Major changes: - Organized Python v1 implementation into v1/ subdirectory - Created Rust workspace with 9 modular crates: - wifi-densepose-core: Core types, traits, errors - wifi-densepose-signal: CSI processing, phase sanitization, FFT - wifi-densepose-nn: Neural network inference (ONNX/Candle/tch) - wifi-densepose-api: Axum-based REST/WebSocket API - wifi-densepose-db: SQLx database layer - wifi-densepose-config: Configuration management - wifi-densepose-hardware: Hardware abstraction - wifi-densepose-wasm: WebAssembly bindings - wifi-densepose-cli: Command-line interface Documentation: - ADR-001: Workspace structure - ADR-002: Signal processing library selection - ADR-003: Neural network inference strategy - DDD domain model with bounded contexts Testing: - 69 tests passing across all crates - Signal processing: 45 tests - Neural networks: 21 tests - Core: 3 doc tests Performance targets: - 10x faster CSI processing (~0.5ms vs ~5ms) - 5x lower memory usage (~100MB vs ~500MB) - WASM support for browser deployment
397 lines
14 KiB
Python
397 lines
14 KiB
Python
"""
|
|
Real-time streaming service for WiFi-DensePose API
|
|
"""
|
|
|
|
import logging
|
|
import asyncio
|
|
import json
|
|
from typing import Dict, List, Optional, Any, Set
|
|
from datetime import datetime
|
|
from collections import deque
|
|
|
|
import numpy as np
|
|
from fastapi import WebSocket
|
|
|
|
from src.config.settings import Settings
|
|
from src.config.domains import DomainConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamService:
|
|
"""Service for real-time data streaming."""
|
|
|
|
def __init__(self, settings: Settings, domain_config: DomainConfig):
|
|
"""Initialize stream service."""
|
|
self.settings = settings
|
|
self.domain_config = domain_config
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
# WebSocket connections
|
|
self.connections: Set[WebSocket] = set()
|
|
self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {}
|
|
|
|
# Stream buffers
|
|
self.pose_buffer = deque(maxlen=self.settings.stream_buffer_size)
|
|
self.csi_buffer = deque(maxlen=self.settings.stream_buffer_size)
|
|
|
|
# Service state
|
|
self.is_running = False
|
|
self.last_error = None
|
|
|
|
# Streaming statistics
|
|
self.stats = {
|
|
"active_connections": 0,
|
|
"total_connections": 0,
|
|
"messages_sent": 0,
|
|
"messages_failed": 0,
|
|
"data_points_streamed": 0,
|
|
"average_latency_ms": 0.0
|
|
}
|
|
|
|
# Background tasks
|
|
self.streaming_task = None
|
|
|
|
async def initialize(self):
|
|
"""Initialize the stream service."""
|
|
self.logger.info("Stream service initialized")
|
|
|
|
async def start(self):
|
|
"""Start the stream service."""
|
|
if self.is_running:
|
|
return
|
|
|
|
self.is_running = True
|
|
self.logger.info("Stream service started")
|
|
|
|
# Start background streaming task
|
|
if self.settings.enable_real_time_processing:
|
|
self.streaming_task = asyncio.create_task(self._streaming_loop())
|
|
|
|
async def stop(self):
|
|
"""Stop the stream service."""
|
|
self.is_running = False
|
|
|
|
# Cancel background task
|
|
if self.streaming_task:
|
|
self.streaming_task.cancel()
|
|
try:
|
|
await self.streaming_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Close all connections
|
|
await self._close_all_connections()
|
|
|
|
self.logger.info("Stream service stopped")
|
|
|
|
async def add_connection(self, websocket: WebSocket, metadata: Dict[str, Any] = None):
|
|
"""Add a new WebSocket connection."""
|
|
try:
|
|
await websocket.accept()
|
|
self.connections.add(websocket)
|
|
self.connection_metadata[websocket] = metadata or {}
|
|
|
|
self.stats["active_connections"] = len(self.connections)
|
|
self.stats["total_connections"] += 1
|
|
|
|
self.logger.info(f"New WebSocket connection added. Total: {len(self.connections)}")
|
|
|
|
# Send initial data if available
|
|
await self._send_initial_data(websocket)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error adding WebSocket connection: {e}")
|
|
raise
|
|
|
|
async def remove_connection(self, websocket: WebSocket):
|
|
"""Remove a WebSocket connection."""
|
|
try:
|
|
if websocket in self.connections:
|
|
self.connections.remove(websocket)
|
|
self.connection_metadata.pop(websocket, None)
|
|
|
|
self.stats["active_connections"] = len(self.connections)
|
|
|
|
self.logger.info(f"WebSocket connection removed. Total: {len(self.connections)}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error removing WebSocket connection: {e}")
|
|
|
|
async def broadcast_pose_data(self, pose_data: Dict[str, Any]):
|
|
"""Broadcast pose data to all connected clients."""
|
|
if not self.is_running:
|
|
return
|
|
|
|
# Add to buffer
|
|
self.pose_buffer.append({
|
|
"type": "pose_data",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": pose_data
|
|
})
|
|
|
|
# Broadcast to all connections
|
|
await self._broadcast_message({
|
|
"type": "pose_update",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": pose_data
|
|
})
|
|
|
|
async def broadcast_csi_data(self, csi_data: np.ndarray, metadata: Dict[str, Any]):
|
|
"""Broadcast CSI data to all connected clients."""
|
|
if not self.is_running:
|
|
return
|
|
|
|
# Convert numpy array to list for JSON serialization
|
|
csi_list = csi_data.tolist() if isinstance(csi_data, np.ndarray) else csi_data
|
|
|
|
# Add to buffer
|
|
self.csi_buffer.append({
|
|
"type": "csi_data",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": csi_list,
|
|
"metadata": metadata
|
|
})
|
|
|
|
# Broadcast to all connections
|
|
await self._broadcast_message({
|
|
"type": "csi_update",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": csi_list,
|
|
"metadata": metadata
|
|
})
|
|
|
|
async def broadcast_system_status(self, status_data: Dict[str, Any]):
|
|
"""Broadcast system status to all connected clients."""
|
|
if not self.is_running:
|
|
return
|
|
|
|
await self._broadcast_message({
|
|
"type": "system_status",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": status_data
|
|
})
|
|
|
|
async def send_to_connection(self, websocket: WebSocket, message: Dict[str, Any]):
|
|
"""Send message to a specific connection."""
|
|
try:
|
|
if websocket in self.connections:
|
|
await websocket.send_text(json.dumps(message))
|
|
self.stats["messages_sent"] += 1
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error sending message to connection: {e}")
|
|
self.stats["messages_failed"] += 1
|
|
await self.remove_connection(websocket)
|
|
|
|
async def _broadcast_message(self, message: Dict[str, Any]):
|
|
"""Broadcast message to all connected clients."""
|
|
if not self.connections:
|
|
return
|
|
|
|
disconnected = set()
|
|
|
|
for websocket in self.connections.copy():
|
|
try:
|
|
await websocket.send_text(json.dumps(message))
|
|
self.stats["messages_sent"] += 1
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to send message to connection: {e}")
|
|
self.stats["messages_failed"] += 1
|
|
disconnected.add(websocket)
|
|
|
|
# Remove disconnected clients
|
|
for websocket in disconnected:
|
|
await self.remove_connection(websocket)
|
|
|
|
if message.get("type") in ["pose_update", "csi_update"]:
|
|
self.stats["data_points_streamed"] += 1
|
|
|
|
async def _send_initial_data(self, websocket: WebSocket):
|
|
"""Send initial data to a new connection."""
|
|
try:
|
|
# Send recent pose data
|
|
if self.pose_buffer:
|
|
recent_poses = list(self.pose_buffer)[-10:] # Last 10 poses
|
|
await self.send_to_connection(websocket, {
|
|
"type": "initial_poses",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": recent_poses
|
|
})
|
|
|
|
# Send recent CSI data
|
|
if self.csi_buffer:
|
|
recent_csi = list(self.csi_buffer)[-5:] # Last 5 CSI readings
|
|
await self.send_to_connection(websocket, {
|
|
"type": "initial_csi",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": recent_csi
|
|
})
|
|
|
|
# Send service status
|
|
status = await self.get_status()
|
|
await self.send_to_connection(websocket, {
|
|
"type": "service_status",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"data": status
|
|
})
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error sending initial data: {e}")
|
|
|
|
async def _streaming_loop(self):
|
|
"""Background streaming loop for periodic updates."""
|
|
try:
|
|
while self.is_running:
|
|
# Send periodic heartbeat
|
|
if self.connections:
|
|
await self._broadcast_message({
|
|
"type": "heartbeat",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"active_connections": len(self.connections)
|
|
})
|
|
|
|
# Wait for next iteration
|
|
await asyncio.sleep(self.settings.websocket_ping_interval)
|
|
|
|
except asyncio.CancelledError:
|
|
self.logger.info("Streaming loop cancelled")
|
|
except Exception as e:
|
|
self.logger.error(f"Error in streaming loop: {e}")
|
|
self.last_error = str(e)
|
|
|
|
async def _close_all_connections(self):
|
|
"""Close all WebSocket connections."""
|
|
disconnected = []
|
|
|
|
for websocket in self.connections.copy():
|
|
try:
|
|
await websocket.close()
|
|
disconnected.append(websocket)
|
|
except Exception as e:
|
|
self.logger.warning(f"Error closing connection: {e}")
|
|
disconnected.append(websocket)
|
|
|
|
# Clear all connections
|
|
for websocket in disconnected:
|
|
await self.remove_connection(websocket)
|
|
|
|
async def get_status(self) -> Dict[str, Any]:
|
|
"""Get service status."""
|
|
return {
|
|
"status": "healthy" if self.is_running and not self.last_error else "unhealthy",
|
|
"running": self.is_running,
|
|
"last_error": self.last_error,
|
|
"connections": {
|
|
"active": len(self.connections),
|
|
"total": self.stats["total_connections"]
|
|
},
|
|
"buffers": {
|
|
"pose_buffer_size": len(self.pose_buffer),
|
|
"csi_buffer_size": len(self.csi_buffer),
|
|
"max_buffer_size": self.settings.stream_buffer_size
|
|
},
|
|
"statistics": self.stats.copy(),
|
|
"configuration": {
|
|
"stream_fps": self.settings.stream_fps,
|
|
"buffer_size": self.settings.stream_buffer_size,
|
|
"ping_interval": self.settings.websocket_ping_interval,
|
|
"timeout": self.settings.websocket_timeout
|
|
}
|
|
}
|
|
|
|
async def get_metrics(self) -> Dict[str, Any]:
|
|
"""Get service metrics."""
|
|
total_messages = self.stats["messages_sent"] + self.stats["messages_failed"]
|
|
success_rate = self.stats["messages_sent"] / max(1, total_messages)
|
|
|
|
return {
|
|
"stream_service": {
|
|
"active_connections": self.stats["active_connections"],
|
|
"total_connections": self.stats["total_connections"],
|
|
"messages_sent": self.stats["messages_sent"],
|
|
"messages_failed": self.stats["messages_failed"],
|
|
"message_success_rate": success_rate,
|
|
"data_points_streamed": self.stats["data_points_streamed"],
|
|
"average_latency_ms": self.stats["average_latency_ms"]
|
|
}
|
|
}
|
|
|
|
async def get_connection_info(self) -> List[Dict[str, Any]]:
|
|
"""Get information about active connections."""
|
|
connections_info = []
|
|
|
|
for websocket in self.connections:
|
|
metadata = self.connection_metadata.get(websocket, {})
|
|
|
|
connection_info = {
|
|
"id": id(websocket),
|
|
"connected_at": metadata.get("connected_at", "unknown"),
|
|
"user_agent": metadata.get("user_agent", "unknown"),
|
|
"ip_address": metadata.get("ip_address", "unknown"),
|
|
"subscription_types": metadata.get("subscription_types", [])
|
|
}
|
|
|
|
connections_info.append(connection_info)
|
|
|
|
return connections_info
|
|
|
|
async def reset(self):
|
|
"""Reset service state."""
|
|
# Clear buffers
|
|
self.pose_buffer.clear()
|
|
self.csi_buffer.clear()
|
|
|
|
# Reset statistics
|
|
self.stats = {
|
|
"active_connections": len(self.connections),
|
|
"total_connections": 0,
|
|
"messages_sent": 0,
|
|
"messages_failed": 0,
|
|
"data_points_streamed": 0,
|
|
"average_latency_ms": 0.0
|
|
}
|
|
|
|
self.last_error = None
|
|
self.logger.info("Stream service reset")
|
|
|
|
def get_buffer_data(self, buffer_type: str, limit: int = 100) -> List[Dict[str, Any]]:
|
|
"""Get data from buffers."""
|
|
if buffer_type == "pose":
|
|
return list(self.pose_buffer)[-limit:]
|
|
elif buffer_type == "csi":
|
|
return list(self.csi_buffer)[-limit:]
|
|
else:
|
|
return []
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
"""Check if stream service is active."""
|
|
return self.is_running
|
|
|
|
async def health_check(self) -> Dict[str, Any]:
|
|
"""Perform health check."""
|
|
try:
|
|
status = "healthy" if self.is_running and not self.last_error else "unhealthy"
|
|
|
|
return {
|
|
"status": status,
|
|
"message": self.last_error if self.last_error else "Stream service is running normally",
|
|
"active_connections": len(self.connections),
|
|
"metrics": {
|
|
"messages_sent": self.stats["messages_sent"],
|
|
"messages_failed": self.stats["messages_failed"],
|
|
"data_points_streamed": self.stats["data_points_streamed"]
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
"status": "unhealthy",
|
|
"message": f"Health check failed: {str(e)}"
|
|
}
|
|
|
|
async def is_ready(self) -> bool:
|
|
"""Check if service is ready."""
|
|
return self.is_running |