- Implemented the WiFi DensePose model in PyTorch, including CSI phase processing, modality translation, and DensePose prediction heads. - Added a comprehensive training utility for the model, including loss functions and training steps. - Created a CSV file to document hardware specifications, architecture details, training parameters, performance metrics, and advantages of the model.
1378 lines
45 KiB
Markdown
1378 lines
45 KiB
Markdown
# Technical Specification
|
||
## WiFi-DensePose System
|
||
|
||
### Document Information
|
||
- **Version**: 1.0
|
||
- **Date**: 2025-01-07
|
||
- **Project**: InvisPose - WiFi-Based Dense Human Pose Estimation
|
||
- **Status**: Draft
|
||
|
||
---
|
||
|
||
## 1. Introduction
|
||
|
||
### 1.1 Purpose
|
||
This document provides detailed technical specifications for the WiFi-DensePose system implementation, including architecture design, component interfaces, data structures, and implementation strategies.
|
||
|
||
### 1.2 Scope
|
||
The technical specification covers system architecture, neural network design, data processing pipelines, API implementation, hardware interfaces, and deployment considerations.
|
||
|
||
### 1.3 Technical Overview
|
||
The system employs a modular architecture with five primary components: Hardware Interface Layer, Neural Network Pipeline, Pose Estimation Engine, API Services, and Configuration Management.
|
||
|
||
---
|
||
|
||
## 2. System Architecture
|
||
|
||
### 2.1 High-Level Architecture
|
||
|
||
```
|
||
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||
│ WiFi Routers │ │ CSI Receiver │ │ Neural Network │
|
||
│ (Hardware) │───▶│ Module │───▶│ Pipeline │
|
||
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
||
│
|
||
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||
│ Web Dashboard │◄───│ API Services │◄───│ Pose Estimation │
|
||
│ (Frontend) │ │ Module │ │ Engine │
|
||
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
||
│
|
||
┌─────────────────┐
|
||
│ Configuration │
|
||
│ Management │
|
||
└─────────────────┘
|
||
```
|
||
|
||
### 2.2 Component Architecture
|
||
|
||
#### 2.2.1 Hardware Interface Layer
|
||
**Purpose**: Interface with WiFi hardware for CSI data extraction
|
||
**Components**:
|
||
- CSI Data Collector
|
||
- Router Communication Manager
|
||
- Signal Preprocessor
|
||
- Data Stream Manager
|
||
|
||
**Technology Stack**:
|
||
- Python 3.8+ with asyncio for concurrent processing
|
||
- Socket programming for UDP data streams
|
||
- NumPy for signal processing operations
|
||
- Threading for parallel data collection
|
||
|
||
#### 2.2.2 Neural Network Pipeline
|
||
**Purpose**: Transform CSI signals to pose estimates
|
||
**Components**:
|
||
- Modality Translation Network
|
||
- DensePose Estimation Network
|
||
- Feature Fusion Module
|
||
- Temporal Consistency Filter
|
||
|
||
**Technology Stack**:
|
||
- PyTorch 1.12+ for deep learning framework
|
||
- CUDA 11.6+ for GPU acceleration
|
||
- TorchVision for computer vision utilities
|
||
- OpenCV for image processing operations
|
||
|
||
#### 2.2.3 Pose Estimation Engine
|
||
**Purpose**: Orchestrate end-to-end processing pipeline
|
||
**Components**:
|
||
- Pipeline Coordinator
|
||
- Multi-Person Tracker
|
||
- Performance Monitor
|
||
- Error Recovery Manager
|
||
|
||
**Technology Stack**:
|
||
- Python asyncio for asynchronous processing
|
||
- Threading and multiprocessing for parallelization
|
||
- Queue management for data flow control
|
||
- Logging framework for monitoring
|
||
|
||
#### 2.2.4 API Services Module
|
||
**Purpose**: Provide external interfaces and streaming
|
||
**Components**:
|
||
- FastAPI REST Server
|
||
- WebSocket Manager
|
||
- Streaming Service
|
||
- Authentication Handler
|
||
|
||
**Technology Stack**:
|
||
- FastAPI 0.95+ for REST API framework
|
||
- WebSockets for real-time communication
|
||
- FFmpeg for video encoding
|
||
- Pydantic for data validation
|
||
|
||
#### 2.2.5 Configuration Management
|
||
**Purpose**: Handle system configuration and templates
|
||
**Components**:
|
||
- Configuration Parser
|
||
- Template Manager
|
||
- Validation Engine
|
||
- Runtime Configuration
|
||
|
||
**Technology Stack**:
|
||
- YAML for configuration files
|
||
- JSON Schema for validation
|
||
- File system monitoring for dynamic updates
|
||
- Environment variable integration
|
||
|
||
### 2.3 Data Flow Architecture
|
||
|
||
```
|
||
CSI Raw Data → Preprocessing → Neural Network → Post-processing → Output
|
||
│ │ │ │ │
|
||
▼ ▼ ▼ ▼ ▼
|
||
UDP Stream Signal Clean Feature Extract Tracking API/Stream
|
||
Buffer Mgmt Calibration Pose Estimation Smoothing Distribution
|
||
Error Handle Noise Filter Multi-Person ID Assign Visualization
|
||
```
|
||
|
||
---
|
||
|
||
## 3. Neural Network Design
|
||
|
||
### 3.1 Modality Translation Network
|
||
|
||
#### 3.1.1 Architecture Overview
|
||
**Input**: CSI tensor (3×3×N) where N is temporal window
|
||
**Output**: Spatial features (720×1280×3) compatible with DensePose
|
||
|
||
**Network Structure**:
|
||
```python
|
||
class ModalityTranslationNetwork(nn.Module):
|
||
def __init__(self):
|
||
# Amplitude branch encoder
|
||
self.amplitude_encoder = nn.Sequential(
|
||
nn.Conv1d(9, 64, kernel_size=3, padding=1),
|
||
nn.BatchNorm1d(64),
|
||
nn.ReLU(),
|
||
nn.Conv1d(64, 128, kernel_size=3, padding=1),
|
||
nn.BatchNorm1d(128),
|
||
nn.ReLU(),
|
||
nn.AdaptiveAvgPool1d(256)
|
||
)
|
||
|
||
# Phase branch encoder
|
||
self.phase_encoder = nn.Sequential(
|
||
nn.Conv1d(9, 64, kernel_size=3, padding=1),
|
||
nn.BatchNorm1d(64),
|
||
nn.ReLU(),
|
||
nn.Conv1d(64, 128, kernel_size=3, padding=1),
|
||
nn.BatchNorm1d(128),
|
||
nn.ReLU(),
|
||
nn.AdaptiveAvgPool1d(256)
|
||
)
|
||
|
||
# Feature fusion and upsampling
|
||
self.fusion_network = nn.Sequential(
|
||
nn.Linear(512, 1024),
|
||
nn.ReLU(),
|
||
nn.Linear(1024, 720*1280*3),
|
||
nn.Sigmoid()
|
||
)
|
||
```
|
||
|
||
#### 3.1.2 CSI Preprocessing Pipeline
|
||
**Phase Unwrapping Algorithm**:
|
||
```python
|
||
def unwrap_phase(phase_data):
|
||
"""
|
||
Unwrap CSI phase data to remove 2π discontinuities
|
||
"""
|
||
unwrapped = np.unwrap(phase_data, axis=-1)
|
||
# Apply linear detrending
|
||
detrended = signal.detrend(unwrapped, axis=-1)
|
||
# Temporal filtering
|
||
filtered = apply_moving_average(detrended, window=5)
|
||
return filtered
|
||
|
||
def apply_moving_average(data, window=5):
|
||
"""
|
||
Apply moving average filter for noise reduction
|
||
"""
|
||
kernel = np.ones(window) / window
|
||
return np.convolve(data, kernel, mode='same')
|
||
```
|
||
|
||
**Amplitude Processing**:
|
||
```python
|
||
def process_amplitude(amplitude_data):
|
||
"""
|
||
Process CSI amplitude data for neural network input
|
||
"""
|
||
# Convert to dB scale
|
||
amplitude_db = 20 * np.log10(np.abs(amplitude_data) + 1e-10)
|
||
# Normalize to [0, 1] range
|
||
normalized = (amplitude_db - amplitude_db.min()) / (amplitude_db.max() - amplitude_db.min())
|
||
return normalized
|
||
```
|
||
|
||
#### 3.1.3 Feature Fusion Strategy
|
||
**Fusion Architecture**:
|
||
- Concatenate amplitude and phase features
|
||
- Apply fully connected layers for dimension reduction
|
||
- Use residual connections for gradient flow
|
||
- Apply dropout for regularization
|
||
|
||
### 3.2 DensePose Integration
|
||
|
||
#### 3.2.1 Network Adaptation
|
||
**Base Architecture**: DensePose-RCNN with ResNet-FPN backbone
|
||
**Modifications**:
|
||
- Replace RGB input with WiFi-translated features
|
||
- Adapt feature pyramid network for WiFi domain
|
||
- Modify region proposal network for WiFi characteristics
|
||
- Fine-tune detection heads for WiFi-specific patterns
|
||
|
||
#### 3.2.2 Transfer Learning Framework
|
||
**Teacher-Student Architecture**:
|
||
```python
|
||
class TransferLearningFramework:
|
||
def __init__(self):
|
||
self.teacher_model = load_pretrained_densepose()
|
||
self.student_model = WiFiDensePoseModel()
|
||
self.translation_network = ModalityTranslationNetwork()
|
||
|
||
def knowledge_distillation_loss(self, wifi_features, image_features):
|
||
"""
|
||
Compute knowledge distillation loss between teacher and student
|
||
"""
|
||
teacher_output = self.teacher_model(image_features)
|
||
student_output = self.student_model(wifi_features)
|
||
|
||
# Feature matching loss
|
||
feature_loss = F.mse_loss(student_output.features, teacher_output.features)
|
||
|
||
# Pose estimation loss
|
||
pose_loss = F.cross_entropy(student_output.poses, teacher_output.poses)
|
||
|
||
return feature_loss + pose_loss
|
||
```
|
||
|
||
### 3.3 Multi-Person Tracking
|
||
|
||
#### 3.3.1 Tracking Algorithm
|
||
**Hungarian Algorithm Implementation**:
|
||
```python
|
||
class MultiPersonTracker:
|
||
def __init__(self, max_persons=5):
|
||
self.max_persons = max_persons
|
||
self.active_tracks = {}
|
||
self.next_id = 1
|
||
|
||
def update(self, detections):
|
||
"""
|
||
Update tracks with new detections using Hungarian algorithm
|
||
"""
|
||
if not self.active_tracks:
|
||
# Initialize tracks for first frame
|
||
return self.initialize_tracks(detections)
|
||
|
||
# Compute cost matrix
|
||
cost_matrix = self.compute_cost_matrix(detections)
|
||
|
||
# Solve assignment problem
|
||
assignments = self.hungarian_assignment(cost_matrix)
|
||
|
||
# Update tracks
|
||
return self.update_tracks(detections, assignments)
|
||
|
||
def compute_cost_matrix(self, detections):
|
||
"""
|
||
Compute cost matrix for track-detection assignment
|
||
"""
|
||
costs = np.zeros((len(self.active_tracks), len(detections)))
|
||
|
||
for i, track in enumerate(self.active_tracks.values()):
|
||
for j, detection in enumerate(detections):
|
||
# Compute distance-based cost
|
||
distance = np.linalg.norm(track.position - detection.position)
|
||
# Add appearance similarity cost
|
||
appearance_cost = 1 - self.compute_appearance_similarity(track, detection)
|
||
costs[i, j] = distance + appearance_cost
|
||
|
||
return costs
|
||
```
|
||
|
||
#### 3.3.2 Kalman Filtering
|
||
**State Prediction Model**:
|
||
```python
|
||
class KalmanTracker:
|
||
def __init__(self):
|
||
# State vector: [x, y, vx, vy, ax, ay]
|
||
self.state = np.zeros(6)
|
||
|
||
# State transition matrix
|
||
self.F = np.array([
|
||
[1, 0, 1, 0, 0.5, 0],
|
||
[0, 1, 0, 1, 0, 0.5],
|
||
[0, 0, 1, 0, 1, 0],
|
||
[0, 0, 0, 1, 0, 1],
|
||
[0, 0, 0, 0, 1, 0],
|
||
[0, 0, 0, 0, 0, 1]
|
||
])
|
||
|
||
# Measurement matrix
|
||
self.H = np.array([
|
||
[1, 0, 0, 0, 0, 0],
|
||
[0, 1, 0, 0, 0, 0]
|
||
])
|
||
|
||
# Process and measurement noise
|
||
self.Q = np.eye(6) * 0.1 # Process noise
|
||
self.R = np.eye(2) * 1.0 # Measurement noise
|
||
self.P = np.eye(6) * 100 # Initial covariance
|
||
|
||
def predict(self):
|
||
"""Predict next state"""
|
||
self.state = self.F @ self.state
|
||
self.P = self.F @ self.P @ self.F.T + self.Q
|
||
return self.state[:2] # Return position
|
||
|
||
def update(self, measurement):
|
||
"""Update state with measurement"""
|
||
y = measurement - self.H @ self.state
|
||
S = self.H @ self.P @ self.H.T + self.R
|
||
K = self.P @ self.H.T @ np.linalg.inv(S)
|
||
|
||
self.state = self.state + K @ y
|
||
self.P = (np.eye(6) - K @ self.H) @ self.P
|
||
```
|
||
|
||
---
|
||
|
||
## 4. Hardware Interface Implementation
|
||
|
||
### 4.1 CSI Data Collection
|
||
|
||
#### 4.1.1 Router Communication Protocol
|
||
**UDP Socket Implementation**:
|
||
```python
|
||
class CSIReceiver:
|
||
def __init__(self, port=5500, buffer_size=1024):
|
||
self.port = port
|
||
self.buffer_size = buffer_size
|
||
self.socket = None
|
||
self.running = False
|
||
|
||
async def start_collection(self):
|
||
"""Start CSI data collection"""
|
||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
self.socket.bind(('0.0.0.0', self.port))
|
||
self.socket.setblocking(False)
|
||
self.running = True
|
||
|
||
while self.running:
|
||
try:
|
||
data, addr = await asyncio.wait_for(
|
||
self.socket.recvfrom(self.buffer_size),
|
||
timeout=1.0
|
||
)
|
||
await self.process_csi_packet(data, addr)
|
||
except asyncio.TimeoutError:
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"CSI collection error: {e}")
|
||
|
||
async def process_csi_packet(self, data, addr):
|
||
"""Process incoming CSI packet"""
|
||
try:
|
||
csi_data = self.parse_csi_packet(data)
|
||
await self.data_queue.put(csi_data)
|
||
except Exception as e:
|
||
logger.error(f"CSI parsing error: {e}")
|
||
```
|
||
|
||
#### 4.1.2 CSI Packet Parsing
|
||
**Atheros CSI Format**:
|
||
```python
|
||
class AtheriosCSIParser:
|
||
def __init__(self):
|
||
self.packet_format = struct.Struct('<HHHHH') # Header format
|
||
|
||
def parse_packet(self, raw_data):
|
||
"""Parse Atheros CSI packet format"""
|
||
if len(raw_data) < 10: # Minimum header size
|
||
raise ValueError("Packet too short")
|
||
|
||
# Parse header
|
||
header = self.packet_format.unpack(raw_data[:10])
|
||
timestamp, length, rate, channel, rssi = header
|
||
|
||
# Extract CSI data
|
||
csi_start = 10
|
||
csi_length = length - 10
|
||
csi_raw = raw_data[csi_start:csi_start + csi_length]
|
||
|
||
# Parse complex CSI values
|
||
csi_complex = self.parse_complex_csi(csi_raw)
|
||
|
||
return {
|
||
'timestamp': timestamp,
|
||
'channel': channel,
|
||
'rssi': rssi,
|
||
'csi_data': csi_complex,
|
||
'amplitude': np.abs(csi_complex),
|
||
'phase': np.angle(csi_complex)
|
||
}
|
||
|
||
def parse_complex_csi(self, csi_raw):
|
||
"""Parse complex CSI values from raw bytes"""
|
||
# Atheros format: 3x3 MIMO, 56 subcarriers
|
||
num_subcarriers = 56
|
||
num_antennas = 9 # 3x3 MIMO
|
||
|
||
csi_complex = np.zeros((num_antennas, num_subcarriers), dtype=complex)
|
||
|
||
for i in range(num_antennas):
|
||
for j in range(num_subcarriers):
|
||
idx = (i * num_subcarriers + j) * 4 # 4 bytes per complex value
|
||
if idx + 4 <= len(csi_raw):
|
||
real = struct.unpack('<h', csi_raw[idx:idx+2])[0]
|
||
imag = struct.unpack('<h', csi_raw[idx+2:idx+4])[0]
|
||
csi_complex[i, j] = complex(real, imag)
|
||
|
||
return csi_complex
|
||
```
|
||
|
||
### 4.2 Signal Processing Pipeline
|
||
|
||
#### 4.2.1 Real-Time Processing
|
||
**Streaming Data Processor**:
|
||
```python
|
||
class StreamingProcessor:
|
||
def __init__(self, window_size=100):
|
||
self.window_size = window_size
|
||
self.data_buffer = collections.deque(maxlen=window_size)
|
||
self.background_model = None
|
||
|
||
async def process_stream(self, csi_data):
|
||
"""Process streaming CSI data"""
|
||
# Add to buffer
|
||
self.data_buffer.append(csi_data)
|
||
|
||
if len(self.data_buffer) < self.window_size:
|
||
return None # Wait for sufficient data
|
||
|
||
# Extract current window
|
||
window_data = np.array(list(self.data_buffer))
|
||
|
||
# Apply preprocessing
|
||
processed_data = self.preprocess_window(window_data)
|
||
|
||
# Background subtraction
|
||
if self.background_model is not None:
|
||
processed_data = processed_data - self.background_model
|
||
|
||
return processed_data
|
||
|
||
def preprocess_window(self, window_data):
|
||
"""Apply preprocessing to data window"""
|
||
# Phase unwrapping
|
||
phase_data = np.angle(window_data)
|
||
unwrapped_phase = np.unwrap(phase_data, axis=-1)
|
||
|
||
# Amplitude processing
|
||
amplitude_data = np.abs(window_data)
|
||
amplitude_db = 20 * np.log10(amplitude_data + 1e-10)
|
||
|
||
# Temporal filtering
|
||
filtered_amplitude = self.apply_temporal_filter(amplitude_db)
|
||
filtered_phase = self.apply_temporal_filter(unwrapped_phase)
|
||
|
||
# Combine amplitude and phase
|
||
processed = np.stack([filtered_amplitude, filtered_phase], axis=-1)
|
||
|
||
return processed
|
||
```
|
||
|
||
#### 4.2.2 Background Subtraction
|
||
**Adaptive Background Model**:
|
||
```python
|
||
class AdaptiveBackgroundModel:
|
||
def __init__(self, learning_rate=0.01):
|
||
self.learning_rate = learning_rate
|
||
self.background = None
|
||
self.variance = None
|
||
|
||
def update_background(self, csi_data):
|
||
"""Update background model with new data"""
|
||
if self.background is None:
|
||
self.background = csi_data.copy()
|
||
self.variance = np.ones_like(csi_data)
|
||
return
|
||
|
||
# Exponential moving average
|
||
self.background = (1 - self.learning_rate) * self.background + \
|
||
self.learning_rate * csi_data
|
||
|
||
# Update variance estimate
|
||
diff = csi_data - self.background
|
||
self.variance = (1 - self.learning_rate) * self.variance + \
|
||
self.learning_rate * (diff ** 2)
|
||
|
||
def subtract_background(self, csi_data):
|
||
"""Subtract background from CSI data"""
|
||
if self.background is None:
|
||
return csi_data
|
||
|
||
# Subtract background
|
||
foreground = csi_data - self.background
|
||
|
||
# Normalize by variance
|
||
normalized = foreground / (np.sqrt(self.variance) + 1e-10)
|
||
|
||
return normalized
|
||
```
|
||
|
||
---
|
||
|
||
## 5. API Implementation
|
||
|
||
### 5.1 REST API Architecture
|
||
|
||
#### 5.1.1 FastAPI Server Implementation
|
||
**Main Server Structure**:
|
||
```python
|
||
from fastapi import FastAPI, WebSocket, HTTPException
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
import asyncio
|
||
|
||
app = FastAPI(title="WiFi-DensePose API", version="1.0.0")
|
||
|
||
# CORS middleware
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# Global state
|
||
pose_estimator = None
|
||
websocket_manager = WebSocketManager()
|
||
|
||
@app.on_event("startup")
|
||
async def startup_event():
|
||
"""Initialize system on startup"""
|
||
global pose_estimator
|
||
pose_estimator = PoseEstimator()
|
||
await pose_estimator.initialize()
|
||
|
||
@app.get("/pose/latest")
|
||
async def get_latest_pose():
|
||
"""Get latest pose estimation results"""
|
||
if pose_estimator is None:
|
||
raise HTTPException(status_code=503, detail="System not initialized")
|
||
|
||
latest_pose = await pose_estimator.get_latest_pose()
|
||
if latest_pose is None:
|
||
raise HTTPException(status_code=404, detail="No pose data available")
|
||
|
||
return {
|
||
"timestamp": latest_pose.timestamp,
|
||
"persons": [person.to_dict() for person in latest_pose.persons],
|
||
"metadata": latest_pose.metadata
|
||
}
|
||
|
||
@app.get("/pose/history")
|
||
async def get_pose_history(
|
||
start_time: Optional[datetime] = None,
|
||
end_time: Optional[datetime] = None,
|
||
limit: int = 100
|
||
):
|
||
"""Get historical pose data"""
|
||
history = await pose_estimator.get_pose_history(
|
||
start_time=start_time,
|
||
end_time=end_time,
|
||
limit=limit
|
||
)
|
||
|
||
return {
|
||
"poses": [pose.to_dict() for pose in history],
|
||
"count": len(history)
|
||
}
|
||
```
|
||
|
||
#### 5.1.2 WebSocket Implementation
|
||
**Real-Time Streaming**:
|
||
```python
|
||
class WebSocketManager:
|
||
def __init__(self):
|
||
self.active_connections: List[WebSocket] = []
|
||
self.connection_info: Dict[WebSocket, dict] = {}
|
||
|
||
async def connect(self, websocket: WebSocket, client_info: dict):
|
||
"""Accept new WebSocket connection"""
|
||
await websocket.accept()
|
||
self.active_connections.append(websocket)
|
||
self.connection_info[websocket] = client_info
|
||
logger.info(f"Client connected: {client_info}")
|
||
|
||
def disconnect(self, websocket: WebSocket):
|
||
"""Remove WebSocket connection"""
|
||
if websocket in self.active_connections:
|
||
self.active_connections.remove(websocket)
|
||
del self.connection_info[websocket]
|
||
|
||
async def broadcast_pose_data(self, pose_data: dict):
|
||
"""Broadcast pose data to all connected clients"""
|
||
if not self.active_connections:
|
||
return
|
||
|
||
message = {
|
||
"type": "pose_update",
|
||
"data": pose_data,
|
||
"timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
# Send to all connections
|
||
disconnected = []
|
||
for connection in self.active_connections:
|
||
try:
|
||
await connection.send_json(message)
|
||
except Exception as e:
|
||
logger.error(f"WebSocket send error: {e}")
|
||
disconnected.append(connection)
|
||
|
||
# Clean up disconnected clients
|
||
for connection in disconnected:
|
||
self.disconnect(connection)
|
||
|
||
@app.websocket("/ws/pose")
|
||
async def websocket_pose_endpoint(websocket: WebSocket):
|
||
"""WebSocket endpoint for real-time pose data"""
|
||
client_info = {
|
||
"client_ip": websocket.client.host,
|
||
"connect_time": datetime.utcnow()
|
||
}
|
||
|
||
await websocket_manager.connect(websocket, client_info)
|
||
|
||
try:
|
||
while True:
|
||
# Keep connection alive and handle client messages
|
||
data = await websocket.receive_text()
|
||
# Process client commands if needed
|
||
await handle_websocket_command(websocket, data)
|
||
except Exception as e:
|
||
logger.error(f"WebSocket error: {e}")
|
||
finally:
|
||
websocket_manager.disconnect(websocket)
|
||
```
|
||
|
||
### 5.2 External Integration APIs
|
||
|
||
#### 5.2.1 MQTT Integration
|
||
**MQTT Publisher Implementation**:
|
||
```python
|
||
import paho.mqtt.client as mqtt
|
||
import json
|
||
|
||
class MQTTPublisher:
|
||
def __init__(self, broker_host, broker_port=1883):
|
||
self.broker_host = broker_host
|
||
self.broker_port = broker_port
|
||
self.client = mqtt.Client()
|
||
self.connected = False
|
||
|
||
async def connect(self):
|
||
"""Connect to MQTT broker"""
|
||
def on_connect(client, userdata, flags, rc):
|
||
if rc == 0:
|
||
self.connected = True
|
||
logger.info("Connected to MQTT broker")
|
||
else:
|
||
logger.error(f"MQTT connection failed: {rc}")
|
||
|
||
def on_disconnect(client, userdata, rc):
|
||
self.connected = False
|
||
logger.info("Disconnected from MQTT broker")
|
||
|
||
self.client.on_connect = on_connect
|
||
self.client.on_disconnect = on_disconnect
|
||
|
||
try:
|
||
self.client.connect(self.broker_host, self.broker_port, 60)
|
||
self.client.loop_start()
|
||
except Exception as e:
|
||
logger.error(f"MQTT connection error: {e}")
|
||
|
||
async def publish_pose_data(self, pose_data):
|
||
"""Publish pose data to MQTT topics"""
|
||
if not self.connected:
|
||
return
|
||
|
||
# Publish individual person data
|
||
for person in pose_data.persons:
|
||
topic = f"wifi-densepose/pose/person/{person.id}"
|
||
payload = {
|
||
"id": person.id,
|
||
"keypoints": person.keypoints,
|
||
"confidence": person.confidence,
|
||
"timestamp": pose_data.timestamp
|
||
}
|
||
|
||
self.client.publish(topic, json.dumps(payload))
|
||
|
||
# Publish summary data
|
||
summary_topic = "wifi-densepose/summary"
|
||
summary_payload = {
|
||
"person_count": len(pose_data.persons),
|
||
"timestamp": pose_data.timestamp,
|
||
"processing_time": pose_data.metadata.get("processing_time", 0)
|
||
}
|
||
|
||
self.client.publish(summary_topic, json.dumps(summary_payload))
|
||
```
|
||
|
||
#### 5.2.2 Webhook Integration
|
||
**Webhook Delivery System**:
|
||
```python
|
||
import aiohttp
|
||
import asyncio
|
||
from typing import List, Dict
|
||
|
||
class WebhookManager:
|
||
def __init__(self):
|
||
self.webhooks: List[Dict] = []
|
||
self.session = None
|
||
|
||
async def initialize(self):
|
||
"""Initialize HTTP session"""
|
||
self.session = aiohttp.ClientSession()
|
||
|
||
def add_webhook(self, url: str, events: List[str], auth: Dict = None):
|
||
"""Add webhook configuration"""
|
||
webhook = {
|
||
"url": url,
|
||
"events": events,
|
||
"auth": auth,
|
||
"retry_count": 0,
|
||
"max_retries": 3
|
||
}
|
||
self.webhooks.append(webhook)
|
||
|
||
async def send_webhook(self, event_type: str, data: Dict):
|
||
"""Send webhook notifications for event"""
|
||
relevant_webhooks = [
|
||
wh for wh in self.webhooks
|
||
if event_type in wh["events"]
|
||
]
|
||
|
||
tasks = []
|
||
for webhook in relevant_webhooks:
|
||
task = asyncio.create_task(
|
||
self._deliver_webhook(webhook, event_type, data)
|
||
)
|
||
tasks.append(task)
|
||
|
||
if tasks:
|
||
await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
async def _deliver_webhook(self, webhook: Dict, event_type: str, data: Dict):
|
||
"""Deliver individual webhook with retry logic"""
|
||
payload = {
|
||
"event": event_type,
|
||
"timestamp": datetime.utcnow().isoformat(),
|
||
"data": data
|
||
}
|
||
|
||
headers = {"Content-Type": "application/json"}
|
||
|
||
# Add authentication if configured
|
||
if webhook.get("auth"):
|
||
auth = webhook["auth"]
|
||
if auth.get("type") == "bearer":
|
||
headers["Authorization"] = f"Bearer {auth['token']}"
|
||
elif auth.get("type") == "basic":
|
||
# Handle basic auth
|
||
pass
|
||
|
||
for attempt in range(webhook["max_retries"]):
|
||
try:
|
||
async with self.session.post(
|
||
webhook["url"],
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=aiohttp.ClientTimeout(total=10)
|
||
) as response:
|
||
if response.status < 400:
|
||
logger.info(f"Webhook delivered: {webhook['url']}")
|
||
return
|
||
else:
|
||
logger.warning(f"Webhook failed: {response.status}")
|
||
|
||
except Exception as e:
|
||
|
||
logger.error(f"Webhook delivery failed: {e}")
|
||
await asyncio.sleep(2 ** attempt) # Exponential backoff
|
||
|
||
logger.error(f"Webhook delivery failed after {webhook['max_retries']} attempts")
|
||
```
|
||
|
||
---
|
||
|
||
## 6. Performance Requirements and Optimization
|
||
|
||
### 6.1 System Performance Specifications
|
||
|
||
#### 6.1.1 Processing Performance
|
||
**Real-Time Processing Requirements**:
|
||
- **End-to-End Latency**: <100ms (95th percentile)
|
||
- **Processing Throughput**: 10-30 FPS depending on hardware configuration
|
||
- **Memory Usage**: <4GB RAM for standard operation
|
||
- **GPU Memory**: <2GB VRAM for neural network inference
|
||
|
||
**Performance Scaling**:
|
||
```python
|
||
class PerformanceManager:
|
||
def __init__(self):
|
||
self.performance_targets = {
|
||
"cpu_only": {"fps": 10, "latency_ms": 150},
|
||
"gpu_basic": {"fps": 20, "latency_ms": 100},
|
||
"gpu_high_end": {"fps": 30, "latency_ms": 75}
|
||
}
|
||
|
||
def detect_hardware_capability(self):
|
||
"""Detect available hardware and set performance targets"""
|
||
if torch.cuda.is_available():
|
||
gpu_memory = torch.cuda.get_device_properties(0).total_memory
|
||
if gpu_memory > 8e9: # 8GB+
|
||
return "gpu_high_end"
|
||
else:
|
||
return "gpu_basic"
|
||
return "cpu_only"
|
||
|
||
def optimize_for_hardware(self, capability):
|
||
"""Optimize processing pipeline for detected hardware"""
|
||
targets = self.performance_targets[capability]
|
||
|
||
# Adjust batch sizes
|
||
if capability == "gpu_high_end":
|
||
self.batch_size = 8
|
||
self.model_precision = torch.float16
|
||
elif capability == "gpu_basic":
|
||
self.batch_size = 4
|
||
self.model_precision = torch.float32
|
||
else:
|
||
self.batch_size = 1
|
||
self.model_precision = torch.float32
|
||
```
|
||
|
||
// TEST: Verify performance targets are met on different hardware configurations
|
||
// TEST: Confirm automatic hardware detection and optimization
|
||
// TEST: Validate memory usage stays within specified limits
|
||
|
||
#### 6.1.2 Scalability Requirements
|
||
**Concurrent Processing**: Support multiple simultaneous operations
|
||
- **API Requests**: 1000 concurrent REST API requests
|
||
- **WebSocket Connections**: 100 simultaneous streaming clients
|
||
- **Data Processing**: Parallel CSI stream processing
|
||
- **Storage Operations**: Concurrent read/write operations
|
||
|
||
**Resource Management**:
|
||
```python
|
||
class ResourceManager:
|
||
def __init__(self, max_memory_gb=4, max_gpu_memory_gb=2):
|
||
self.max_memory = max_memory_gb * 1e9
|
||
self.max_gpu_memory = max_gpu_memory_gb * 1e9
|
||
self.memory_monitor = MemoryMonitor()
|
||
|
||
async def monitor_resources(self):
|
||
"""Continuously monitor system resources"""
|
||
while True:
|
||
memory_usage = self.memory_monitor.get_memory_usage()
|
||
gpu_usage = self.memory_monitor.get_gpu_memory_usage()
|
||
|
||
if memory_usage > 0.9 * self.max_memory:
|
||
await self.trigger_memory_cleanup()
|
||
|
||
if gpu_usage > 0.9 * self.max_gpu_memory:
|
||
await self.trigger_gpu_cleanup()
|
||
|
||
await asyncio.sleep(5) # Check every 5 seconds
|
||
|
||
async def trigger_memory_cleanup(self):
|
||
"""Trigger memory cleanup procedures"""
|
||
# Clear data buffers
|
||
self.data_buffer.clear_old_entries()
|
||
# Force garbage collection
|
||
gc.collect()
|
||
# Reduce batch sizes temporarily
|
||
self.reduce_batch_sizes()
|
||
```
|
||
|
||
// TEST: Verify system handles specified concurrent load
|
||
// TEST: Confirm resource monitoring prevents memory exhaustion
|
||
// TEST: Validate automatic resource cleanup procedures
|
||
|
||
### 6.2 Neural Network Optimization
|
||
|
||
#### 6.2.1 Model Optimization Techniques
|
||
**Quantization**: Reduce model size and improve inference speed
|
||
```python
|
||
class ModelOptimizer:
|
||
def __init__(self, model):
|
||
self.model = model
|
||
|
||
def apply_quantization(self, quantization_type="dynamic"):
|
||
"""Apply quantization to reduce model size"""
|
||
if quantization_type == "dynamic":
|
||
# Dynamic quantization for CPU inference
|
||
quantized_model = torch.quantization.quantize_dynamic(
|
||
self.model,
|
||
{torch.nn.Linear, torch.nn.Conv2d},
|
||
dtype=torch.qint8
|
||
)
|
||
elif quantization_type == "static":
|
||
# Static quantization for better performance
|
||
quantized_model = self.apply_static_quantization()
|
||
|
||
return quantized_model
|
||
|
||
def apply_pruning(self, sparsity=0.3):
|
||
"""Apply structured pruning to reduce model complexity"""
|
||
import torch.nn.utils.prune as prune
|
||
|
||
for module in self.model.modules():
|
||
if isinstance(module, torch.nn.Conv2d):
|
||
prune.l1_unstructured(module, name='weight', amount=sparsity)
|
||
|
||
return self.model
|
||
```
|
||
|
||
// TEST: Verify quantization maintains accuracy while improving speed
|
||
// TEST: Confirm pruning reduces model size without significant accuracy loss
|
||
// TEST: Validate optimization techniques work on target hardware
|
||
|
||
#### 6.2.2 Inference Optimization
|
||
**Batch Processing**: Optimize throughput with intelligent batching
|
||
```python
|
||
class InferenceBatcher:
|
||
def __init__(self, max_batch_size=8, max_wait_time=0.01):
|
||
self.max_batch_size = max_batch_size
|
||
self.max_wait_time = max_wait_time
|
||
self.pending_requests = []
|
||
self.batch_timer = None
|
||
|
||
async def add_request(self, csi_data, callback):
|
||
"""Add inference request to batch"""
|
||
request = {
|
||
'data': csi_data,
|
||
'callback': callback,
|
||
'timestamp': time.time()
|
||
}
|
||
|
||
self.pending_requests.append(request)
|
||
|
||
if len(self.pending_requests) >= self.max_batch_size:
|
||
await self.process_batch()
|
||
elif self.batch_timer is None:
|
||
self.batch_timer = asyncio.create_task(
|
||
self.wait_and_process()
|
||
)
|
||
|
||
async def process_batch(self):
|
||
"""Process current batch of requests"""
|
||
if not self.pending_requests:
|
||
return
|
||
|
||
# Extract data and callbacks
|
||
batch_data = [req['data'] for req in self.pending_requests]
|
||
callbacks = [req['callback'] for req in self.pending_requests]
|
||
|
||
# Process batch
|
||
batch_tensor = torch.stack(batch_data)
|
||
with torch.no_grad():
|
||
batch_results = self.model(batch_tensor)
|
||
|
||
# Return results to callbacks
|
||
for i, callback in enumerate(callbacks):
|
||
await callback(batch_results[i])
|
||
|
||
# Clear processed requests
|
||
self.pending_requests.clear()
|
||
```
|
||
|
||
// TEST: Verify batch processing improves overall throughput
|
||
// TEST: Confirm batching maintains acceptable latency
|
||
// TEST: Validate batch timer prevents indefinite waiting
|
||
|
||
### 6.3 Hardware Interface Optimization
|
||
|
||
#### 6.3.1 CSI Data Processing Optimization
|
||
**Parallel Processing**: Optimize CSI data collection and processing
|
||
```python
|
||
class OptimizedCSIProcessor:
|
||
def __init__(self, num_workers=4):
|
||
self.num_workers = num_workers
|
||
self.processing_pool = ProcessPoolExecutor(max_workers=num_workers)
|
||
self.data_queue = asyncio.Queue(maxsize=1000)
|
||
|
||
async def start_processing(self):
|
||
"""Start parallel CSI processing workers"""
|
||
tasks = []
|
||
for i in range(self.num_workers):
|
||
task = asyncio.create_task(self.processing_worker(i))
|
||
tasks.append(task)
|
||
|
||
await asyncio.gather(*tasks)
|
||
|
||
def process_csi_data(self, csi_data):
|
||
"""CPU-intensive CSI processing in separate process"""
|
||
# Phase unwrapping
|
||
phase_unwrapped = np.unwrap(np.angle(csi_data), axis=-1)
|
||
|
||
# Amplitude processing
|
||
amplitude_db = 20 * np.log10(np.abs(csi_data) + 1e-10)
|
||
|
||
# Apply filters using optimized NumPy operations
|
||
filtered_phase = scipy.signal.savgol_filter(phase_unwrapped, 5, 2, axis=-1)
|
||
filtered_amplitude = scipy.signal.savgol_filter(amplitude_db, 5, 2, axis=-1)
|
||
|
||
# Combine and normalize
|
||
processed = np.stack([filtered_amplitude, filtered_phase], axis=-1)
|
||
normalized = (processed - processed.mean()) / (processed.std() + 1e-10)
|
||
|
||
return normalized
|
||
```
|
||
|
||
// TEST: Verify parallel processing improves CSI data throughput
|
||
// TEST: Confirm worker processes handle errors gracefully
|
||
// TEST: Validate processed data quality meets neural network requirements
|
||
|
||
---
|
||
|
||
## 7. Deployment and Infrastructure
|
||
|
||
### 7.1 Container Architecture
|
||
|
||
#### 7.1.1 Docker Configuration
|
||
**Multi-Stage Build**: Optimize container size and security
|
||
```dockerfile
|
||
# Build stage
|
||
FROM python:3.9-slim as builder
|
||
|
||
WORKDIR /app
|
||
COPY requirements.txt .
|
||
RUN pip install --no-cache-dir --user -r requirements.txt
|
||
|
||
# Production stage
|
||
FROM python:3.9-slim
|
||
|
||
# Install system dependencies
|
||
RUN apt-get update && apt-get install -y \
|
||
libgl1-mesa-glx \
|
||
libglib2.0-0 \
|
||
libsm6 \
|
||
libxext6 \
|
||
libxrender-dev \
|
||
libgomp1 \
|
||
&& rm -rf /var/lib/apt/lists/*
|
||
|
||
# Copy Python packages from builder
|
||
COPY --from=builder /root/.local /root/.local
|
||
ENV PATH=/root/.local/bin:$PATH
|
||
|
||
# Copy application code
|
||
WORKDIR /app
|
||
COPY . .
|
||
|
||
# Create non-root user
|
||
RUN useradd -m -u 1000 wifipose && \
|
||
chown -R wifipose:wifipose /app
|
||
USER wifipose
|
||
|
||
# Health check
|
||
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
|
||
CMD curl -f http://localhost:8000/health || exit 1
|
||
|
||
EXPOSE 8000
|
||
CMD ["python", "-m", "wifi_densepose.main"]
|
||
```
|
||
|
||
#### 7.1.2 Kubernetes Deployment
|
||
**Production Deployment Configuration**:
|
||
```yaml
|
||
apiVersion: apps/v1
|
||
kind: Deployment
|
||
metadata:
|
||
name: wifi-densepose
|
||
labels:
|
||
app: wifi-densepose
|
||
spec:
|
||
replicas: 3
|
||
selector:
|
||
matchLabels:
|
||
app: wifi-densepose
|
||
template:
|
||
metadata:
|
||
labels:
|
||
app: wifi-densepose
|
||
spec:
|
||
containers:
|
||
- name: wifi-densepose
|
||
image: wifi-densepose:latest
|
||
ports:
|
||
- containerPort: 8000
|
||
env:
|
||
- name: CUDA_VISIBLE_DEVICES
|
||
value: "0"
|
||
- name: LOG_LEVEL
|
||
value: "INFO"
|
||
resources:
|
||
requests:
|
||
memory: "2Gi"
|
||
cpu: "1000m"
|
||
nvidia.com/gpu: 1
|
||
limits:
|
||
memory: "4Gi"
|
||
cpu: "2000m"
|
||
nvidia.com/gpu: 1
|
||
livenessProbe:
|
||
httpGet:
|
||
path: /health
|
||
port: 8000
|
||
initialDelaySeconds: 60
|
||
periodSeconds: 30
|
||
readinessProbe:
|
||
httpGet:
|
||
path: /ready
|
||
port: 8000
|
||
initialDelaySeconds: 30
|
||
periodSeconds: 10
|
||
```
|
||
|
||
// TEST: Verify Docker container builds and runs correctly
|
||
// TEST: Confirm Kubernetes deployment scales properly
|
||
// TEST: Validate health checks and resource limits
|
||
|
||
### 7.2 Monitoring and Observability
|
||
|
||
#### 7.2.1 Metrics Collection
|
||
**Prometheus Integration**: Comprehensive metrics collection
|
||
```python
|
||
from prometheus_client import Counter, Histogram, Gauge, start_http_server
|
||
|
||
class MetricsCollector:
|
||
def __init__(self):
|
||
# Performance metrics
|
||
self.inference_duration = Histogram(
|
||
'inference_duration_seconds',
|
||
'Time spent on neural network inference',
|
||
buckets=[0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0]
|
||
)
|
||
|
||
self.pose_detection_count = Counter(
|
||
'pose_detections_total',
|
||
'Total number of pose detections',
|
||
['confidence_level']
|
||
)
|
||
|
||
self.active_persons = Gauge(
|
||
'active_persons_current',
|
||
'Current number of tracked persons'
|
||
)
|
||
|
||
# System metrics
|
||
self.memory_usage = Gauge(
|
||
'memory_usage_bytes',
|
||
'Current memory usage in bytes'
|
||
)
|
||
|
||
self.gpu_utilization = Gauge(
|
||
'gpu_utilization_percent',
|
||
'GPU utilization percentage'
|
||
)
|
||
|
||
def record_inference_time(self, duration):
|
||
"""Record neural network inference time"""
|
||
self.inference_duration.observe(duration)
|
||
|
||
def start_metrics_server(self, port=8001):
|
||
"""Start Prometheus metrics server"""
|
||
start_http_server(port)
|
||
logger.info(f"Metrics server started on port {port}")
|
||
```
|
||
|
||
// TEST: Verify metrics collection captures all key performance indicators
|
||
// TEST: Confirm Prometheus integration works correctly
|
||
// TEST: Validate metrics provide actionable insights
|
||
|
||
---
|
||
|
||
## 8. Security and Compliance
|
||
|
||
### 8.1 Data Security
|
||
|
||
#### 8.1.1 Privacy-Preserving Design
|
||
**Data Minimization**: Collect only necessary data for pose estimation
|
||
```python
|
||
class PrivacyPreservingProcessor:
|
||
def __init__(self):
|
||
self.data_retention_days = 7 # Configurable retention period
|
||
self.anonymization_enabled = True
|
||
|
||
def process_pose_data(self, raw_poses):
|
||
"""Process poses with privacy preservation"""
|
||
if self.anonymization_enabled:
|
||
# Remove personally identifiable features
|
||
anonymized_poses = self.anonymize_poses(raw_poses)
|
||
return anonymized_poses
|
||
return raw_poses
|
||
|
||
def anonymize_poses(self, poses):
|
||
"""Remove identifying characteristics from pose data"""
|
||
anonymized = []
|
||
|
||
for pose in poses:
|
||
# Remove fine-grained features that could identify individuals
|
||
anonymized_pose = {
|
||
'keypoints': self.generalize_keypoints(pose['keypoints']),
|
||
'confidence': pose['confidence'],
|
||
'timestamp': pose['timestamp'],
|
||
'activity': pose.get('activity', 'unknown')
|
||
}
|
||
anonymized.append(anonymized_pose)
|
||
|
||
return anonymized
|
||
|
||
async def cleanup_old_data(self):
|
||
"""Automatically delete old data based on retention policy"""
|
||
cutoff_date = datetime.now() - timedelta(days=self.data_retention_days)
|
||
|
||
# Delete old pose data
|
||
await self.database.delete_poses_before(cutoff_date)
|
||
|
||
# Delete old CSI data
|
||
await self.database.delete_csi_before(cutoff_date)
|
||
|
||
logger.info(f"Cleaned up data older than {cutoff_date}")
|
||
```
|
||
|
||
// TEST: Verify anonymization removes identifying characteristics
|
||
// TEST: Confirm data retention policies are enforced automatically
|
||
// TEST: Validate privacy preservation doesn't impact functionality
|
||
|
||
---
|
||
|
||
## 9. Testing and Quality Assurance
|
||
|
||
### 9.1 London School TDD Implementation
|
||
|
||
#### 9.1.1 Test-First Development
|
||
**Comprehensive Test Coverage**: Following London School TDD principles
|
||
```python
|
||
import pytest
|
||
import asyncio
|
||
from unittest.mock import Mock, AsyncMock, patch
|
||
import numpy as np
|
||
import torch
|
||
|
||
class TestPoseEstimationPipeline:
|
||
"""Test suite following London School TDD principles"""
|
||
|
||
@pytest.fixture
|
||
def mock_csi_data(self):
|
||
"""Generate synthetic CSI data for testing"""
|
||
return np.random.complex128((3, 3, 56, 100)) # 3x3 MIMO, 56 subcarriers, 100 samples
|
||
|
||
@pytest.fixture
|
||
def mock_neural_network(self):
|
||
"""Mock neural network for isolated testing"""
|
||
mock_network = Mock()
|
||
mock_network.forward.return_value = torch.randn(1, 17, 3) # Mock pose output
|
||
return mock_network
|
||
|
||
async def test_csi_preprocessing_pipeline(self, mock_csi_data):
|
||
"""Test CSI preprocessing produces valid output"""
|
||
# Arrange
|
||
processor = CSIProcessor()
|
||
|
||
# Act
|
||
processed_data = await processor.preprocess(mock_csi_data)
|
||
|
||
# Assert
|
||
assert processed_data.shape == (3, 3, 56, 100)
|
||
assert not np.isnan(processed_data).any()
|
||
assert not np.isinf(processed_data).any()
|
||
|
||
# Verify phase unwrapping
|
||
phase_data = np.angle(processed_data)
|
||
phase_diff = np.diff(phase_data, axis=-1)
|
||
assert np.abs(phase_diff).max() < np.pi # No phase jumps > π
|
||
|
||
async def test_neural_network_inference_performance(self, mock_csi_data, mock_neural_network):
|
||
"""Test neural network inference meets performance requirements"""
|
||
# Arrange
|
||
estimator = PoseEstimator()
|
||
estimator.neural_network = mock_neural_network
|
||
|
||
# Act
|
||
start_time = time.time()
|
||
result = await estimator.neural_inference(mock_csi_data)
|
||
inference_time = time.time() - start_time
|
||
|
||
# Assert
|
||
assert inference_time < 0.05 # <50ms requirement
|
||
assert result is not None
|
||
mock_neural_network.forward.assert_called_once()
|
||
|
||
async def test_fall_detection_accuracy(self):
|
||
"""Test fall detection algorithm accuracy"""
|
||
# Arrange
|
||
fall_detector = FallDetector()
|
||
|
||
# Simulate fall trajectory
|
||
fall_trajectory = [
|
||
{'position': np.array([100, 100]), 'timestamp': 0.0}, # Standing
|
||
{'position': np.array([100, 120]), 'timestamp': 0.5}, # Falling
|
||
{'position': np.array([100, 180]), 'timestamp': 1.0}, # On ground
|
||
{'position': np.array([100, 180]), 'timestamp': 1.5}, # Still on ground
|
||
]
|
||
|
||
# Act
|
||
fall_detected = False
|
||
for pose in fall_trajectory:
|
||
result = fall_detector.analyze_pose(pose)
|
||
if result['fall_detected']:
|
||
fall_detected = True
|
||
break
|
||
|
||
# Assert
|
||
assert fall_detected
|
||
assert result['confidence'] > 0.8
|
||
```
|
||
|
||
// TEST: Verify all test cases pass with >95% coverage
|
||
// TEST: Confirm TDD approach catches regressions early
|
||
// TEST: Validate integration tests cover real-world scenarios
|
||
|
||
---
|
||
|
||
## 10. Acceptance Criteria
|
||
|
||
### 10.1 Technical Implementation Criteria
|
||
- **CSI Processing Pipeline**: Real-time CSI data collection and preprocessing functional
|
||
- **Neural Network Integration**: DensePose model integration with <50ms inference time
|
||
- **Multi-Person Tracking**: Robust tracking of up to 5 individuals simultaneously
|
||
- **API Implementation**: Complete REST and WebSocket API implementation
|
||
- **Performance Targets**: All latency and throughput requirements met
|
||
|
||
### 10.2 Integration Criteria
|
||
- **Hardware Integration**: Successful integration with WiFi routers and CSI extraction
|
||
- **External Service Integration**: MQTT, webhook, and Restream integrations operational
|
||
- **Database Integration**: Efficient data storage and retrieval implementation
|
||
- **Monitoring Integration**: Comprehensive system monitoring and alerting
|
||
|
||
### 10.3 Quality Assurance Criteria
|
||
- **Test Coverage**: >90% unit test coverage, complete integration test suite
|
||
- **Performance Validation**: All performance benchmarks met under load testing
|
||
- **Security Validation**: Security measures tested and vulnerabilities addressed
|
||
- **Documentation Completeness**: Technical documentation complete and accurate
|
||
|
||
// TEST: Verify all technical implementation criteria are met
|
||
// TEST: Confirm integration criteria are satisfied
|
||
// TEST: Validate quality assurance criteria through comprehensive testing
|