Files
wifi-densepose/v1/src/services/stream_service.py
Claude 6ed69a3d48 feat: Complete Rust port of WiFi-DensePose with modular crates
Major changes:
- Organized Python v1 implementation into v1/ subdirectory
- Created Rust workspace with 9 modular crates:
  - wifi-densepose-core: Core types, traits, errors
  - wifi-densepose-signal: CSI processing, phase sanitization, FFT
  - wifi-densepose-nn: Neural network inference (ONNX/Candle/tch)
  - wifi-densepose-api: Axum-based REST/WebSocket API
  - wifi-densepose-db: SQLx database layer
  - wifi-densepose-config: Configuration management
  - wifi-densepose-hardware: Hardware abstraction
  - wifi-densepose-wasm: WebAssembly bindings
  - wifi-densepose-cli: Command-line interface

Documentation:
- ADR-001: Workspace structure
- ADR-002: Signal processing library selection
- ADR-003: Neural network inference strategy
- DDD domain model with bounded contexts

Testing:
- 69 tests passing across all crates
- Signal processing: 45 tests
- Neural networks: 21 tests
- Core: 3 doc tests

Performance targets:
- 10x faster CSI processing (~0.5ms vs ~5ms)
- 5x lower memory usage (~100MB vs ~500MB)
- WASM support for browser deployment
2026-01-13 03:11:16 +00:00

397 lines
14 KiB
Python

"""
Real-time streaming service for WiFi-DensePose API
"""
import logging
import asyncio
import json
from typing import Dict, List, Optional, Any, Set
from datetime import datetime
from collections import deque
import numpy as np
from fastapi import WebSocket
from src.config.settings import Settings
from src.config.domains import DomainConfig
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class StreamService:
    """Service for real-time data streaming over WebSocket connections.

    The service keeps the set of accepted WebSocket connections, bounded
    buffers holding the most recent pose and CSI messages, and delivery
    counters in ``self.stats``.  Every outgoing message is a dict that is
    JSON-encoded and sent as a text frame.
    """

    def __init__(self, settings: Settings, domain_config: DomainConfig):
        """Initialize stream service.

        Args:
            settings: Application settings.  This class reads
                ``stream_buffer_size``, ``enable_real_time_processing``,
                ``websocket_ping_interval``, ``stream_fps`` and
                ``websocket_timeout`` from it.
            domain_config: Domain configuration; stored on the instance but
                not otherwise used by this class.
        """
        self.settings = settings
        self.domain_config = domain_config
        self.logger = logging.getLogger(__name__)

        # WebSocket connections and the per-connection metadata supplied by
        # the caller of ``add_connection``.
        self.connections: Set[WebSocket] = set()
        self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {}

        # Stream buffers; ``maxlen`` makes the deques drop their oldest entry
        # once ``stream_buffer_size`` items are held.
        self.pose_buffer = deque(maxlen=self.settings.stream_buffer_size)
        self.csi_buffer = deque(maxlen=self.settings.stream_buffer_size)

        # Service state
        self.is_running = False
        self.last_error: Optional[str] = None

        # Streaming statistics
        self.stats = self._new_stats()

        # Background heartbeat task, created by ``start``.
        self.streaming_task: Optional[asyncio.Task] = None

    @staticmethod
    def _new_stats(active_connections: int = 0) -> Dict[str, Any]:
        """Return a zeroed statistics dict.

        Args:
            active_connections: Value to store under ``"active_connections"``.

        Note:
            ``"average_latency_ms"`` is only ever initialised here; nothing
            in this class updates it.
        """
        return {
            "active_connections": active_connections,
            "total_connections": 0,
            "messages_sent": 0,
            "messages_failed": 0,
            "data_points_streamed": 0,
            "average_latency_ms": 0.0,
        }

    async def initialize(self):
        """Initialize the stream service (currently only logs)."""
        self.logger.info("Stream service initialized")

    async def start(self):
        """Start the stream service.

        Does nothing if the service is already running.  The background
        heartbeat task is only created when
        ``settings.enable_real_time_processing`` is truthy.
        """
        if self.is_running:
            return

        self.is_running = True
        self.logger.info("Stream service started")

        # Start background streaming task
        if self.settings.enable_real_time_processing:
            self.streaming_task = asyncio.create_task(self._streaming_loop())

    async def stop(self):
        """Stop the stream service.

        Cancels the background task (if any), waits for it to finish and
        then closes and forgets every connection.
        """
        self.is_running = False

        # Detach the task first so a finished task is never left behind on
        # the instance.
        task, self.streaming_task = self.streaming_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Close all connections
        await self._close_all_connections()

        self.logger.info("Stream service stopped")

    async def add_connection(
        self, websocket: WebSocket, metadata: Optional[Dict[str, Any]] = None
    ):
        """Accept and register a new WebSocket connection.

        After registration the connection is sent the buffered initial data
        (see ``_send_initial_data``).

        Args:
            websocket: The connection to accept.
            metadata: Optional per-connection information; an empty dict is
                stored when omitted or falsy.

        Raises:
            Exception: Any error raised while accepting or registering the
                connection is logged and re-raised.
        """
        try:
            await websocket.accept()

            self.connections.add(websocket)
            self.connection_metadata[websocket] = metadata or {}

            self.stats["active_connections"] = len(self.connections)
            self.stats["total_connections"] += 1

            self.logger.info(
                f"New WebSocket connection added. Total: {len(self.connections)}"
            )

            # Send initial data if available
            await self._send_initial_data(websocket)

        except Exception as e:
            self.logger.error(f"Error adding WebSocket connection: {e}")
            raise

    async def remove_connection(self, websocket: WebSocket):
        """Forget a WebSocket connection.

        The socket itself is not closed here.  Unknown connections are
        ignored, and errors are logged rather than raised.
        """
        try:
            if websocket in self.connections:
                self.connections.remove(websocket)
                self.connection_metadata.pop(websocket, None)

                self.stats["active_connections"] = len(self.connections)

                self.logger.info(
                    f"WebSocket connection removed. Total: {len(self.connections)}"
                )

        except Exception as e:
            self.logger.error(f"Error removing WebSocket connection: {e}")

    async def broadcast_pose_data(self, pose_data: Dict[str, Any]):
        """Buffer pose data and broadcast it to all connected clients.

        Does nothing while the service is not running.
        """
        if not self.is_running:
            return

        # One timestamp for both copies so the buffered entry and the
        # broadcast message describe the same instant.
        timestamp = datetime.now().isoformat()

        # Add to buffer
        self.pose_buffer.append({
            "type": "pose_data",
            "timestamp": timestamp,
            "data": pose_data,
        })

        # Broadcast to all connections
        await self._broadcast_message({
            "type": "pose_update",
            "timestamp": timestamp,
            "data": pose_data,
        })

    async def broadcast_csi_data(self, csi_data: np.ndarray, metadata: Dict[str, Any]):
        """Buffer CSI data and broadcast it to all connected clients.

        Does nothing while the service is not running.

        Args:
            csi_data: CSI samples.  A numpy array is converted with
                ``tolist()``; any other value is passed through unchanged.
            metadata: Extra information attached to the message as-is.
        """
        if not self.is_running:
            return

        # Convert numpy array to list for JSON serialization
        csi_list = csi_data.tolist() if isinstance(csi_data, np.ndarray) else csi_data
        timestamp = datetime.now().isoformat()

        # Add to buffer
        self.csi_buffer.append({
            "type": "csi_data",
            "timestamp": timestamp,
            "data": csi_list,
            "metadata": metadata,
        })

        # Broadcast to all connections
        await self._broadcast_message({
            "type": "csi_update",
            "timestamp": timestamp,
            "data": csi_list,
            "metadata": metadata,
        })

    async def broadcast_system_status(self, status_data: Dict[str, Any]):
        """Broadcast system status to all connected clients.

        Does nothing while the service is not running.  Status messages are
        not buffered.
        """
        if not self.is_running:
            return

        await self._broadcast_message({
            "type": "system_status",
            "timestamp": datetime.now().isoformat(),
            "data": status_data,
        })

    def _serialize(self, message: Dict[str, Any]) -> Optional[str]:
        """JSON-encode ``message``; log and return ``None`` if it cannot be encoded."""
        try:
            return json.dumps(message)
        except (TypeError, ValueError) as e:
            self.logger.error(
                f"Failed to serialize message of type {message.get('type')!r}: {e}"
            )
            return None

    async def send_to_connection(self, websocket: WebSocket, message: Dict[str, Any]):
        """Send a message to one registered connection.

        Connections that are not registered are ignored.  A transport error
        is logged, counted in ``messages_failed`` and causes the connection
        to be removed.  A message that cannot be JSON-encoded is counted as
        failed but leaves the connection registered, because the fault lies
        with the payload and not with the client.
        """
        if websocket not in self.connections:
            return

        payload = self._serialize(message)
        if payload is None:
            self.stats["messages_failed"] += 1
            return

        try:
            await websocket.send_text(payload)
        except Exception as e:
            self.logger.error(f"Error sending message to connection: {e}")
            self.stats["messages_failed"] += 1
            await self.remove_connection(websocket)
        else:
            self.stats["messages_sent"] += 1

    async def _broadcast_message(self, message: Dict[str, Any]):
        """Broadcast a message to all connected clients.

        The message is encoded once and the same text is sent to every
        connection.  Connections whose send raises are removed afterwards.
        If the message cannot be encoded, one failure per connection is
        counted and no connection is removed.
        """
        if not self.connections:
            return

        payload = self._serialize(message)
        if payload is None:
            # Nothing was sent to anyone; the clients themselves are fine.
            self.stats["messages_failed"] += len(self.connections)
            return

        disconnected = set()

        # Iterate over a snapshot: awaiting may let other coroutines add or
        # remove connections while we are sending.
        for websocket in self.connections.copy():
            try:
                await websocket.send_text(payload)
            except Exception as e:
                self.logger.warning(f"Failed to send message to connection: {e}")
                self.stats["messages_failed"] += 1
                disconnected.add(websocket)
            else:
                self.stats["messages_sent"] += 1

        # Remove disconnected clients
        for websocket in disconnected:
            await self.remove_connection(websocket)

        if message.get("type") in ("pose_update", "csi_update"):
            self.stats["data_points_streamed"] += 1

    async def _send_initial_data(self, websocket: WebSocket):
        """Send buffered data and the service status to a new connection.

        Sends up to the last 10 buffered pose entries, up to the last 5
        buffered CSI entries, and the result of ``get_status``.  Errors are
        logged and not raised.
        """
        try:
            # Send recent pose data
            if self.pose_buffer:
                recent_poses = list(self.pose_buffer)[-10:]  # Last 10 poses
                await self.send_to_connection(websocket, {
                    "type": "initial_poses",
                    "timestamp": datetime.now().isoformat(),
                    "data": recent_poses,
                })

            # Send recent CSI data
            if self.csi_buffer:
                recent_csi = list(self.csi_buffer)[-5:]  # Last 5 CSI readings
                await self.send_to_connection(websocket, {
                    "type": "initial_csi",
                    "timestamp": datetime.now().isoformat(),
                    "data": recent_csi,
                })

            # Send service status
            status = await self.get_status()
            await self.send_to_connection(websocket, {
                "type": "service_status",
                "timestamp": datetime.now().isoformat(),
                "data": status,
            })

        except Exception as e:
            self.logger.error(f"Error sending initial data: {e}")

    async def _streaming_loop(self):
        """Background loop that broadcasts a heartbeat to connected clients.

        Runs while ``is_running`` is true, sleeping
        ``settings.websocket_ping_interval`` between iterations.  Cancellation
        is logged and ends the loop; any other exception also ends the loop
        and is recorded in ``last_error``.
        """
        try:
            while self.is_running:
                # Send periodic heartbeat
                if self.connections:
                    await self._broadcast_message({
                        "type": "heartbeat",
                        "timestamp": datetime.now().isoformat(),
                        "active_connections": len(self.connections),
                    })

                # Wait for next iteration
                await asyncio.sleep(self.settings.websocket_ping_interval)

        except asyncio.CancelledError:
            self.logger.info("Streaming loop cancelled")
        except Exception as e:
            self.logger.error(f"Error in streaming loop: {e}")
            self.last_error = str(e)

    async def _close_all_connections(self):
        """Close and forget every WebSocket connection.

        A failure to close one socket is logged and does not prevent the
        remaining sockets from being closed; the connection is forgotten
        either way.
        """
        for websocket in self.connections.copy():
            try:
                await websocket.close()
            except Exception as e:
                self.logger.warning(f"Error closing connection: {e}")
            await self.remove_connection(websocket)

    async def get_status(self) -> Dict[str, Any]:
        """Get service status.

        Returns:
            A dict with the health status, running flag, last error,
            connection counts, buffer sizes, a copy of the statistics and
            the relevant configuration values.
        """
        return {
            "status": "healthy" if self.is_running and not self.last_error else "unhealthy",
            "running": self.is_running,
            "last_error": self.last_error,
            "connections": {
                "active": len(self.connections),
                "total": self.stats["total_connections"],
            },
            "buffers": {
                "pose_buffer_size": len(self.pose_buffer),
                "csi_buffer_size": len(self.csi_buffer),
                "max_buffer_size": self.settings.stream_buffer_size,
            },
            "statistics": self.stats.copy(),
            "configuration": {
                "stream_fps": self.settings.stream_fps,
                "buffer_size": self.settings.stream_buffer_size,
                "ping_interval": self.settings.websocket_ping_interval,
                "timeout": self.settings.websocket_timeout,
            },
        }

    async def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics.

        ``message_success_rate`` is ``messages_sent`` divided by the number
        of attempted messages, and 0 when nothing has been attempted yet.
        """
        total_messages = self.stats["messages_sent"] + self.stats["messages_failed"]
        # max(1, ...) avoids a division by zero before any message is sent.
        success_rate = self.stats["messages_sent"] / max(1, total_messages)

        return {
            "stream_service": {
                "active_connections": self.stats["active_connections"],
                "total_connections": self.stats["total_connections"],
                "messages_sent": self.stats["messages_sent"],
                "messages_failed": self.stats["messages_failed"],
                "message_success_rate": success_rate,
                "data_points_streamed": self.stats["data_points_streamed"],
                "average_latency_ms": self.stats["average_latency_ms"],
            }
        }

    async def get_connection_info(self) -> List[Dict[str, Any]]:
        """Get information about active connections.

        Each entry is built from the metadata stored for the connection;
        missing fields are reported as ``"unknown"`` (or an empty list for
        ``subscription_types``).  ``id`` is ``id(websocket)``.
        """
        connections_info = []

        for websocket in self.connections:
            metadata = self.connection_metadata.get(websocket, {})
            connections_info.append({
                "id": id(websocket),
                "connected_at": metadata.get("connected_at", "unknown"),
                "user_agent": metadata.get("user_agent", "unknown"),
                "ip_address": metadata.get("ip_address", "unknown"),
                "subscription_types": metadata.get("subscription_types", []),
            })

        return connections_info

    async def reset(self):
        """Reset service state.

        Clears both buffers, zeroes the statistics (keeping
        ``active_connections`` equal to the current number of connections)
        and clears ``last_error``.  Connections are left open.
        """
        # Clear buffers
        self.pose_buffer.clear()
        self.csi_buffer.clear()

        # Reset statistics
        self.stats = self._new_stats(active_connections=len(self.connections))

        self.last_error = None
        self.logger.info("Stream service reset")

    def get_buffer_data(self, buffer_type: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Get the most recent entries from a buffer.

        Args:
            buffer_type: ``"pose"`` or ``"csi"``; any other value yields an
                empty list.
            limit: Maximum number of entries to return, newest last.  A
                non-positive limit yields an empty list.

        Returns:
            Up to ``limit`` of the newest buffered entries, oldest first.
        """
        buffers = {"pose": self.pose_buffer, "csi": self.csi_buffer}
        buffer = buffers.get(buffer_type)

        # ``[-0:]`` would return the whole list and a negative limit would
        # slice from the front, so handle non-positive limits explicitly.
        if buffer is None or limit <= 0:
            return []

        return list(buffer)[-limit:]

    @property
    def is_active(self) -> bool:
        """Check if stream service is active (i.e. running)."""
        return self.is_running

    async def health_check(self) -> Dict[str, Any]:
        """Perform health check.

        Returns:
            A dict with ``status`` (``"healthy"`` only when running with no
            recorded error), a ``message``, the number of active connections
            and the main delivery counters.  If building that report fails,
            an ``"unhealthy"`` dict describing the failure is returned.
        """
        try:
            status = "healthy" if self.is_running and not self.last_error else "unhealthy"

            return {
                "status": status,
                "message": self.last_error if self.last_error else "Stream service is running normally",
                "active_connections": len(self.connections),
                "metrics": {
                    "messages_sent": self.stats["messages_sent"],
                    "messages_failed": self.stats["messages_failed"],
                    "data_points_streamed": self.stats["data_points_streamed"],
                },
            }

        except Exception as e:
            return {
                "status": "unhealthy",
                "message": f"Health check failed: {str(e)}",
            }

    async def is_ready(self) -> bool:
        """Check if service is ready (i.e. running)."""
        return self.is_running