Testing Guide
Overview
This guide provides comprehensive information about testing the WiFi-DensePose system, including test types, frameworks, best practices, and continuous integration setup. Our testing strategy ensures reliability, performance, and maintainability of the codebase.
Table of Contents
- Testing Philosophy
- Test Types and Structure
- Testing Frameworks and Tools
- Unit Testing
- Integration Testing
- End-to-End Testing
- Performance Testing
- Test Data and Fixtures
- Mocking and Test Doubles
- Continuous Integration
- Test Coverage
- Testing Best Practices
Testing Philosophy
Test Pyramid
We follow the test pyramid approach:
        /\
       /  \         E2E Tests (Few)
      /____\        - Full system integration
     /      \       - User journey validation
    /________\      Integration Tests (Some)
   /          \     - Component interaction
  /____________\    - API contract testing
 /              \   Unit Tests (Many)
/________________\  - Individual function testing
                    - Fast feedback loop
Testing Principles
- Fast Feedback: Unit tests provide immediate feedback
- Reliability: Tests should be deterministic and stable
- Maintainability: Tests should be easy to understand and modify
- Coverage: Critical paths must be thoroughly tested
- Isolation: Tests should not depend on external systems
- Documentation: Tests serve as living documentation
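The determinism principle above is easiest to honor by seeding every source of randomness. As a minimal sketch (the helper name is illustrative, not part of the codebase), a seeded NumPy generator makes randomized CSI fixtures reproducible across runs:

```python
import numpy as np

def make_synthetic_csi(seed: int = 42) -> np.ndarray:
    """Generate a reproducible complex CSI matrix for tests."""
    rng = np.random.default_rng(seed)
    # A seeded generator always yields the same data for the same seed,
    # so assertions on derived values stay stable across test runs.
    return rng.standard_normal((3, 56)) + 1j * rng.standard_normal((3, 56))

# Two calls with the same seed produce identical fixtures
a = make_synthetic_csi(seed=7)
b = make_synthetic_csi(seed=7)
assert np.array_equal(a, b)
```

The same idea applies to `torch.manual_seed` and `Faker.seed` when those libraries feed test data.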
Test Types and Structure
Directory Structure
tests/
├── unit/ # Unit tests
│ ├── api/
│ │ ├── test_routers.py
│ │ └── test_middleware.py
│ ├── neural_network/
│ │ ├── test_inference.py
│ │ ├── test_models.py
│ │ └── test_training.py
│ ├── hardware/
│ │ ├── test_csi_processor.py
│ │ ├── test_router_interface.py
│ │ └── test_phase_sanitizer.py
│ ├── tracking/
│ │ ├── test_tracker.py
│ │ └── test_kalman_filter.py
│ └── analytics/
│ ├── test_event_detection.py
│ └── test_metrics.py
├── integration/ # Integration tests
│ ├── test_api_endpoints.py
│ ├── test_database_operations.py
│ ├── test_neural_network_pipeline.py
│ └── test_hardware_integration.py
├── e2e/ # End-to-end tests
│ ├── test_full_pipeline.py
│ ├── test_user_scenarios.py
│ └── test_domain_workflows.py
├── performance/ # Performance tests
│ ├── test_throughput.py
│ ├── test_latency.py
│ └── test_memory_usage.py
├── fixtures/ # Test data and fixtures
│ ├── csi_data/
│ ├── pose_data/
│ ├── config/
│ └── models/
├── conftest.py # Pytest configuration
└── utils/ # Test utilities
├── factories.py
├── helpers.py
└── assertions.py
Test Categories
Unit Tests
- Test individual functions and classes in isolation
- Fast execution (< 1 second per test)
- No external dependencies
- High coverage of business logic
Integration Tests
- Test component interactions
- Database operations
- API contract validation
- External service integration
End-to-End Tests
- Test complete user workflows
- Full system integration
- Real-world scenarios
- Acceptance criteria validation
Performance Tests
- Throughput and latency measurements
- Memory usage profiling
- Scalability testing
- Resource utilization monitoring
Testing Frameworks and Tools
Core Testing Stack
# pytest - Primary testing framework
pytest==7.4.0
pytest-asyncio==0.21.0 # Async test support
pytest-cov==4.1.0 # Coverage reporting
pytest-mock==3.11.1 # Mocking utilities
pytest-xdist==3.3.1 # Parallel test execution
# Testing utilities
factory-boy==3.3.0 # Test data factories
faker==19.3.0 # Fake data generation
freezegun==1.2.2 # Time mocking
responses==0.23.1 # HTTP request mocking
# Performance testing
pytest-benchmark==4.0.0 # Performance benchmarking
memory-profiler==0.60.0 # Memory usage profiling
# API testing
httpx==0.24.1 # HTTP client for testing
pytest-httpx==0.21.3 # HTTP mocking for httpx
Configuration
pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
--strict-markers
--strict-config
--verbose
--tb=short
--cov=src
--cov-report=term-missing
--cov-report=html:htmlcov
--cov-report=xml
--cov-fail-under=80
markers =
unit: Unit tests
integration: Integration tests
e2e: End-to-end tests
performance: Performance tests
slow: Slow running tests
gpu: Tests requiring GPU
hardware: Tests requiring hardware
asyncio_mode = auto
conftest.py
import pytest
from unittest.mock import Mock
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.api.main import app
from src.config.settings import get_settings, get_test_settings
from src.database.models import Base
from tests.utils.factories import CSIDataFactory, PoseEstimationFactory
# Test database setup
@pytest.fixture(scope="session")
def test_db():
"""Create test database."""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
yield TestingSessionLocal
Base.metadata.drop_all(engine)
@pytest.fixture
def db_session(test_db):
"""Create database session for testing."""
session = test_db()
try:
yield session
finally:
session.close()
# API testing setup
@pytest.fixture
def test_client():
"""Create test client with test configuration."""
app.dependency_overrides[get_settings] = get_test_settings
yield TestClient(app)
app.dependency_overrides.clear()
@pytest.fixture
def auth_headers(test_client):
"""Get authentication headers for testing."""
response = test_client.post(
"/api/v1/auth/token",
json={"username": "test_user", "password": "test_password"}
)
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
# Mock hardware components
@pytest.fixture
def mock_csi_processor():
"""Mock CSI processor for testing."""
processor = Mock()
processor.process_frame.return_value = CSIDataFactory()
return processor
@pytest.fixture
def mock_neural_network():
"""Mock neural network for testing."""
network = Mock()
network.predict.return_value = [PoseEstimationFactory()]
return network
# Test data factories
@pytest.fixture
def csi_data():
"""Generate test CSI data."""
return CSIDataFactory()
@pytest.fixture
def pose_estimation():
"""Generate test pose estimation."""
return PoseEstimationFactory()
Unit Testing
Testing Individual Components
CSI Processor Tests
import pytest
import numpy as np
from unittest.mock import Mock, patch
from src.hardware.csi_processor import CSIProcessor, CSIConfig
from src.hardware.models import CSIFrame, ProcessedCSIData
class TestCSIProcessor:
"""Test suite for CSI processor."""
@pytest.fixture
def csi_config(self):
"""Create test CSI configuration."""
return CSIConfig(
buffer_size=100,
sampling_rate=30,
antenna_count=3,
subcarrier_count=56
)
@pytest.fixture
def csi_processor(self, csi_config):
"""Create CSI processor for testing."""
return CSIProcessor(csi_config)
def test_process_frame_valid_data(self, csi_processor):
"""Test processing of valid CSI frame."""
# Arrange
frame = CSIFrame(
timestamp=1704686400.0,
antenna_data=np.random.randn(3, 56) + 1j * np.random.randn(3, 56),
metadata={"router_id": "router_001"}
)
# Act
result = csi_processor.process_frame(frame)
# Assert
assert isinstance(result, ProcessedCSIData)
assert result.timestamp == frame.timestamp
assert result.phase.shape == (3, 56)
assert result.amplitude.shape == (3, 56)
assert np.all(np.isfinite(result.phase))
assert np.all(result.amplitude >= 0)
def test_process_frame_invalid_shape(self, csi_processor):
"""Test processing with invalid data shape."""
# Arrange
frame = CSIFrame(
timestamp=1704686400.0,
antenna_data=np.random.randn(2, 30) + 1j * np.random.randn(2, 30), # Wrong shape
metadata={"router_id": "router_001"}
)
# Act & Assert
with pytest.raises(ValueError, match="Invalid antenna data shape"):
csi_processor.process_frame(frame)
def test_phase_sanitization(self, csi_processor):
"""Test phase unwrapping and sanitization."""
# Arrange
# Create data with phase wrapping
phase_data = np.array([0, np.pi/2, np.pi, -np.pi/2, 0])
complex_data = np.exp(1j * phase_data)
frame = CSIFrame(
timestamp=1704686400.0,
antenna_data=complex_data.reshape(1, -1),
metadata={"router_id": "router_001"}
)
# Act
result = csi_processor.process_frame(frame)
# Assert
# Check that phase is properly unwrapped
phase_diff = np.diff(result.phase[0])
assert np.all(np.abs(phase_diff) < np.pi), "Phase should be unwrapped"
@pytest.mark.asyncio
async def test_process_stream(self, csi_processor):
"""Test continuous stream processing."""
# Arrange
frames = [
CSIFrame(
timestamp=1704686400.0 + i,
antenna_data=np.random.randn(3, 56) + 1j * np.random.randn(3, 56),
metadata={"router_id": "router_001"}
)
for i in range(5)
]
with patch.object(csi_processor, '_receive_frames') as mock_receive:
mock_receive.return_value = iter(frames)
# Act
results = []
async for result in csi_processor.process_stream():
results.append(result)
if len(results) >= 5:
break
# Assert
assert len(results) == 5
for i, result in enumerate(results):
assert result.timestamp == frames[i].timestamp
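The unwrapping behaviour that `test_phase_sanitization` checks can be sketched with NumPy's `unwrap` — a sketch of the idea, not the project's actual `CSIProcessor` implementation:

```python
import numpy as np

def sanitize_phase(complex_csi: np.ndarray) -> np.ndarray:
    """Extract and unwrap phase along the subcarrier axis."""
    raw_phase = np.angle(complex_csi)     # wrapped to (-pi, pi]
    return np.unwrap(raw_phase, axis=-1)  # remove artificial 2*pi jumps

# A ramp that wraps past pi is restored to a smooth ramp
ramp = np.linspace(0, 4 * np.pi, 50)
wrapped = np.exp(1j * ramp)
unwrapped = sanitize_phase(wrapped.reshape(1, -1))
# Consecutive differences stay below pi, the property the test asserts
assert np.all(np.abs(np.diff(unwrapped[0])) < np.pi)
```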
Neural Network Tests
import pytest
import torch
from unittest.mock import Mock, patch
from src.neural_network.inference import PoseEstimationService
from src.neural_network.models import DensePoseNet
from src.config.settings import ModelConfig
class TestPoseEstimationService:
"""Test suite for pose estimation service."""
@pytest.fixture
def model_config(self):
"""Create test model configuration."""
return ModelConfig(
model_path="test_model.pth",
batch_size=16,
confidence_threshold=0.5,
device="cpu"
)
@pytest.fixture
def pose_service(self, model_config):
"""Create pose estimation service for testing."""
with patch('torch.load') as mock_load:
mock_model = Mock(spec=DensePoseNet)
mock_load.return_value = mock_model
service = PoseEstimationService(model_config)
return service
def test_estimate_poses_single_detection(self, pose_service):
"""Test pose estimation with single person detection."""
# Arrange
csi_features = torch.randn(1, 256, 32, 32)
# Mock model output
mock_output = {
'poses': torch.randn(1, 17, 3), # 17 keypoints, 3 coords each
'confidences': torch.tensor([0.8])
}
pose_service.model.return_value = mock_output
# Act
with torch.no_grad():
result = pose_service.estimate_poses(csi_features)
# Assert
assert len(result) == 1
assert result[0].confidence >= 0.5 # Above threshold
assert len(result[0].keypoints) == 17
pose_service.model.assert_called_once()
def test_estimate_poses_multiple_detections(self, pose_service):
"""Test pose estimation with multiple persons."""
# Arrange
csi_features = torch.randn(1, 256, 32, 32)
# Mock model output for 3 persons
mock_output = {
'poses': torch.randn(3, 17, 3),
'confidences': torch.tensor([0.9, 0.7, 0.3]) # One below threshold
}
pose_service.model.return_value = mock_output
# Act
result = pose_service.estimate_poses(csi_features)
# Assert
assert len(result) == 2 # Only 2 above confidence threshold
assert all(pose.confidence >= 0.5 for pose in result)
def test_estimate_poses_empty_input(self, pose_service):
"""Test pose estimation with empty input."""
# Arrange
csi_features = torch.empty(0, 256, 32, 32)
# Act & Assert
with pytest.raises(ValueError, match="Empty input features"):
pose_service.estimate_poses(csi_features)
@pytest.mark.gpu
def test_gpu_inference(self, model_config):
"""Test GPU inference if available."""
if not torch.cuda.is_available():
pytest.skip("GPU not available")
# Arrange
model_config.device = "cuda"
with patch('torch.load') as mock_load:
mock_model = Mock(spec=DensePoseNet)
mock_load.return_value = mock_model
service = PoseEstimationService(model_config)
csi_features = torch.randn(1, 256, 32, 32).cuda()
# Act
result = service.estimate_poses(csi_features)
# Assert
assert service.device.type == "cuda"
mock_model.assert_called_once()
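The threshold behaviour these tests rely on — keeping only detections at or above `confidence_threshold` — can be sketched as a simple post-processing step (the `Pose` type here is illustrative, standing in for the service's real output model):

```python
from dataclasses import dataclass, field

@dataclass
class Pose:
    confidence: float
    keypoints: list = field(default_factory=list)

def filter_by_confidence(poses, threshold=0.5):
    """Drop detections below the confidence threshold."""
    return [p for p in poses if p.confidence >= threshold]

# Mirrors test_estimate_poses_multiple_detections: 0.9 and 0.7 pass,
# 0.3 falls below the 0.5 threshold and is discarded.
detections = [Pose(0.9), Pose(0.7), Pose(0.3)]
kept = filter_by_confidence(detections, threshold=0.5)
assert len(kept) == 2
```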
Tracking Tests
import pytest
import numpy as np
from src.tracking.tracker import PersonTracker, TrackingConfig
from src.tracking.models import Detection, Track
from tests.utils.factories import DetectionFactory
class TestPersonTracker:
"""Test suite for person tracker."""
@pytest.fixture
def tracking_config(self):
"""Create test tracking configuration."""
return TrackingConfig(
max_age=30,
min_hits=3,
iou_threshold=0.3
)
@pytest.fixture
def tracker(self, tracking_config):
"""Create person tracker for testing."""
return PersonTracker(tracking_config)
def test_create_new_track(self, tracker):
"""Test creation of new track from detection."""
# Arrange
detection = DetectionFactory(
bbox=[100, 100, 50, 100],
confidence=0.8
)
# Act
tracks = tracker.update([detection])
# Assert
assert len(tracks) == 0 # Track not confirmed yet (min_hits=3)
assert len(tracker.tracks) == 1
assert tracker.tracks[0].hits == 1
def test_track_confirmation(self, tracker):
"""Test track confirmation after minimum hits."""
# Arrange
detection = DetectionFactory(
bbox=[100, 100, 50, 100],
confidence=0.8
)
# Act - Update tracker multiple times
for _ in range(3):
tracks = tracker.update([detection])
# Assert
assert len(tracks) == 1 # Track should be confirmed
assert tracks[0].is_confirmed()
assert tracks[0].track_id is not None
def test_track_association(self, tracker):
"""Test association of detections with existing tracks."""
# Arrange - Create initial track
detection1 = DetectionFactory(bbox=[100, 100, 50, 100])
for _ in range(3):
tracker.update([detection1])
# Similar detection (should associate)
detection2 = DetectionFactory(bbox=[105, 105, 50, 100])
# Act
tracks = tracker.update([detection2])
# Assert
assert len(tracks) == 1
assert len(tracker.tracks) == 1 # Same track, not new one
# Check that track position was updated
track = tracks[0]
assert abs(track.bbox[0] - 105) < 10 # Position updated
def test_track_loss_and_deletion(self, tracker):
"""Test track loss and deletion after max age."""
# Arrange - Create confirmed track
detection = DetectionFactory(bbox=[100, 100, 50, 100])
for _ in range(3):
tracker.update([detection])
# Act - Update without detections (track should be lost)
for _ in range(35): # Exceed max_age=30
tracks = tracker.update([])
# Assert
assert len(tracks) == 0
assert len(tracker.tracks) == 0 # Track should be deleted
def test_multiple_tracks(self, tracker):
"""Test tracking multiple persons simultaneously."""
# Arrange
detection1 = DetectionFactory(bbox=[100, 100, 50, 100])
detection2 = DetectionFactory(bbox=[300, 100, 50, 100])
# Act - Create two confirmed tracks
for _ in range(3):
tracks = tracker.update([detection1, detection2])
# Assert
assert len(tracks) == 2
track_ids = [track.track_id for track in tracks]
assert len(set(track_ids)) == 2 # Different track IDs
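Association in these tests hinges on the `iou_threshold` from `TrackingConfig`. A self-contained sketch of intersection-over-union for the `[x, y, w, h]` boxes used above (illustrative; the real tracker may use a different matching cost):

```python
def iou(box_a, box_b):
    """Intersection-over-union for [x, y, w, h] bounding boxes."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    # Intersection rectangle (zero area if the boxes do not overlap)
    ix = max(ax, bx)
    iy = max(ay, by)
    ix2 = min(ax + aw, bx + bw)
    iy2 = min(ay + ah, by + bh)
    inter = max(0, ix2 - ix) * max(0, iy2 - iy)
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0

# The nearby boxes from test_track_association overlap well above 0.3...
assert iou([100, 100, 50, 100], [105, 105, 50, 100]) > 0.3
# ...while the distant boxes from test_multiple_tracks do not overlap at all
assert iou([100, 100, 50, 100], [300, 100, 50, 100]) == 0.0
```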
Integration Testing
API Integration Tests
import pytest
import httpx
from fastapi.testclient import TestClient
from unittest.mock import patch, Mock
class TestPoseAPI:
"""Integration tests for pose API endpoints."""
def test_pose_estimation_workflow(self, test_client, auth_headers):
"""Test complete pose estimation workflow."""
# Step 1: Start system
start_response = test_client.post(
"/api/v1/system/start",
json={
"configuration": {
"domain": "healthcare",
"environment_id": "test_room"
}
},
headers=auth_headers
)
assert start_response.status_code == 200
# Step 2: Wait for system to be ready
import time
time.sleep(1) # In real tests, poll status endpoint
# Step 3: Get pose data
pose_response = test_client.get(
"/api/v1/pose/latest",
headers=auth_headers
)
assert pose_response.status_code == 200
pose_data = pose_response.json()
assert "timestamp" in pose_data
assert "persons" in pose_data
# Step 4: Stop system
stop_response = test_client.post(
"/api/v1/system/stop",
headers=auth_headers
)
assert stop_response.status_code == 200
def test_configuration_update_workflow(self, test_client, auth_headers):
"""Test configuration update workflow."""
# Get current configuration
get_response = test_client.get("/api/v1/config", headers=auth_headers)
assert get_response.status_code == 200
original_config = get_response.json()
# Update configuration
update_data = {
"detection": {
"confidence_threshold": 0.8,
"max_persons": 3
}
}
put_response = test_client.put(
"/api/v1/config",
json=update_data,
headers=auth_headers
)
assert put_response.status_code == 200
# Verify configuration was updated
verify_response = test_client.get("/api/v1/config", headers=auth_headers)
updated_config = verify_response.json()
assert updated_config["detection"]["confidence_threshold"] == 0.8
assert updated_config["detection"]["max_persons"] == 3
def test_websocket_connection(self, test_client):
"""Test WebSocket connection and data streaming."""
with test_client.websocket_connect("/ws/pose") as websocket:
# Send subscription message
websocket.send_json({
"type": "subscribe",
"channel": "pose_updates",
"filters": {"min_confidence": 0.7}
})
# Receive confirmation
confirmation = websocket.receive_json()
assert confirmation["type"] == "subscription_confirmed"
# Simulate pose data (in real test, trigger actual detection)
with patch('src.api.websocket.pose_manager.broadcast_pose_update'):
# Receive pose update
data = websocket.receive_json()
assert data["type"] == "pose_update"
assert "data" in data
Database Integration Tests
import pytest
from sqlalchemy.orm import Session
from src.database.models import PoseData, SystemConfig
from src.database.operations import PoseDataRepository
from datetime import datetime, timedelta
class TestDatabaseOperations:
"""Integration tests for database operations."""
def test_pose_data_crud(self, db_session: Session):
"""Test CRUD operations for pose data."""
repo = PoseDataRepository(db_session)
# Create
pose_data = PoseData(
timestamp=datetime.utcnow(),
frame_id=12345,
person_id=1,
confidence=0.85,
keypoints=[{"x": 100, "y": 200, "confidence": 0.9}],
environment_id="test_room"
)
created_pose = repo.create(pose_data)
assert created_pose.id is not None
# Read
retrieved_pose = repo.get_by_id(created_pose.id)
assert retrieved_pose.frame_id == 12345
assert retrieved_pose.confidence == 0.85
# Update
retrieved_pose.confidence = 0.90
updated_pose = repo.update(retrieved_pose)
assert updated_pose.confidence == 0.90
# Delete
repo.delete(updated_pose.id)
deleted_pose = repo.get_by_id(updated_pose.id)
assert deleted_pose is None
def test_time_series_queries(self, db_session: Session):
"""Test time-series queries for pose data."""
repo = PoseDataRepository(db_session)
# Create test data with different timestamps
base_time = datetime.utcnow()
test_data = []
for i in range(10):
pose_data = PoseData(
timestamp=base_time + timedelta(minutes=i),
frame_id=i,
person_id=1,
confidence=0.8,
keypoints=[],
environment_id="test_room"
)
test_data.append(repo.create(pose_data))
# Query by time range
start_time = base_time + timedelta(minutes=2)
end_time = base_time + timedelta(minutes=7)
results = repo.get_by_time_range(start_time, end_time)
assert len(results) == 6 # Minutes 2-7 inclusive
# Query latest N records
latest_results = repo.get_latest(limit=3)
assert len(latest_results) == 3
assert latest_results[0].frame_id == 9 # Most recent first
def test_database_performance(self, db_session: Session):
"""Test database performance with large datasets."""
repo = PoseDataRepository(db_session)
# Insert large batch of data
import time
start_time = time.time()
batch_data = []
for i in range(1000):
pose_data = PoseData(
timestamp=datetime.utcnow(),
frame_id=i,
person_id=i % 5, # 5 different persons
confidence=0.8,
keypoints=[],
environment_id="test_room"
)
batch_data.append(pose_data)
repo.bulk_create(batch_data)
insert_time = time.time() - start_time
# Query performance
start_time = time.time()
results = repo.get_latest(limit=100)
query_time = time.time() - start_time
# Assert performance requirements
assert insert_time < 5.0 # Bulk insert should be fast
assert query_time < 0.1 # Query should be very fast
assert len(results) == 100
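The reason `bulk_create` meets the 5-second budget is that it batches rows in one statement/transaction instead of issuing 1000 individual inserts. A minimal, dependency-free sketch of that pattern with the stdlib `sqlite3` module (the project itself goes through SQLAlchemy repositories):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pose_data (frame_id INTEGER, person_id INTEGER, confidence REAL)"
)

rows = [(i, i % 5, 0.8) for i in range(1000)]  # 5 different persons

start = time.perf_counter()
# executemany sends all rows in a single batched call, avoiding
# per-row statement overhead and per-row commits.
conn.executemany("INSERT INTO pose_data VALUES (?, ?, ?)", rows)
conn.commit()
elapsed = time.perf_counter() - start

count = conn.execute("SELECT COUNT(*) FROM pose_data").fetchone()[0]
assert count == 1000
assert elapsed < 5.0  # matches the performance budget above
```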
End-to-End Testing
Full Pipeline Tests
import pytest
import asyncio
import numpy as np
from unittest.mock import patch, Mock
from src.pipeline.main import WiFiDensePosePipeline
from src.config.settings import get_test_settings
class TestFullPipeline:
"""End-to-end tests for complete system pipeline."""
@pytest.fixture
def pipeline(self):
"""Create test pipeline with mocked hardware."""
settings = get_test_settings()
settings.mock_hardware = True
return WiFiDensePosePipeline(settings)
@pytest.mark.asyncio
async def test_complete_pose_estimation_pipeline(self, pipeline):
"""Test complete pipeline from CSI data to pose output."""
# Arrange
mock_csi_data = np.random.randn(3, 56, 100) + 1j * np.random.randn(3, 56, 100) # 3 antennas, 56 subcarriers, 100 samples
with patch.object(pipeline.csi_processor, 'get_latest_data') as mock_csi:
mock_csi.return_value = mock_csi_data
# Act
await pipeline.start()
# Wait for processing
await asyncio.sleep(2)
# Get results
results = await pipeline.get_latest_poses()
# Assert
assert len(results) > 0
for pose in results:
assert pose.confidence > 0
assert len(pose.keypoints) == 17 # COCO format
assert pose.timestamp is not None
await pipeline.stop()
@pytest.mark.asyncio
async def test_healthcare_domain_workflow(self, pipeline):
"""Test healthcare-specific workflow with fall detection."""
# Configure for healthcare domain
await pipeline.configure_domain("healthcare")
# Mock fall scenario
fall_poses = self._create_fall_sequence()
with patch.object(pipeline.pose_estimator, 'estimate_poses') as mock_estimate:
mock_estimate.side_effect = fall_poses
await pipeline.start()
# Wait for fall detection
alerts = []
for _ in range(10): # Check for 10 iterations
await asyncio.sleep(0.1)
new_alerts = await pipeline.get_alerts()
alerts.extend(new_alerts)
if any(alert.type == "fall_detection" for alert in alerts):
break
# Assert fall was detected
fall_alerts = [a for a in alerts if a.type == "fall_detection"]
assert len(fall_alerts) > 0
assert fall_alerts[0].severity in ["medium", "high"]
await pipeline.stop()
def _create_fall_sequence(self):
"""Create sequence of poses simulating a fall."""
# Standing pose
standing_pose = Mock()
standing_pose.keypoints = [
{"name": "head", "y": 100},
{"name": "hip", "y": 200},
{"name": "knee", "y": 300},
{"name": "ankle", "y": 400}
]
# Falling pose (head getting lower)
falling_pose = Mock()
falling_pose.keypoints = [
{"name": "head", "y": 300},
{"name": "hip", "y": 350},
{"name": "knee", "y": 380},
{"name": "ankle", "y": 400}
]
# Fallen pose (horizontal)
fallen_pose = Mock()
fallen_pose.keypoints = [
{"name": "head", "y": 380},
{"name": "hip", "y": 385},
{"name": "knee", "y": 390},
{"name": "ankle", "y": 395}
]
return [
[standing_pose] * 5, # Standing for 5 frames
[falling_pose] * 3, # Falling for 3 frames
[fallen_pose] * 10 # Fallen for 10 frames
]
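The fall sequence above implies a detector that watches head height over time. As a heuristic sketch only (the keypoint format follows the mocks above; the thresholds are illustrative, not the system's tuned values): in image coordinates y grows downward, so a fall shows up as the head y-value rising sharply while the head-to-ankle spread collapses.

```python
def looks_like_fall(frames, drop_threshold=150, flat_threshold=50):
    """Heuristic fall check over a sequence of keypoint lists.

    Flags a fall when the head drops sharply (image y increases)
    and the body ends up nearly horizontal (small vertical spread
    between head and ankle in the final frame).
    """
    def y_of(frame, name):
        return next(kp["y"] for kp in frame if kp["name"] == name)

    head_drop = y_of(frames[-1], "head") - y_of(frames[0], "head")
    final_spread = y_of(frames[-1], "ankle") - y_of(frames[-1], "head")
    return head_drop > drop_threshold and final_spread < flat_threshold

standing = [{"name": "head", "y": 100}, {"name": "ankle", "y": 400}]
fallen = [{"name": "head", "y": 380}, {"name": "ankle", "y": 395}]
assert looks_like_fall([standing, fallen])
assert not looks_like_fall([standing, standing])
```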
User Scenario Tests
import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class TestUserScenarios:
"""End-to-end tests for user scenarios."""
@pytest.fixture
def driver(self):
"""Create web driver for UI testing."""
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
yield driver
driver.quit()
def test_dashboard_monitoring_workflow(self, driver):
"""Test user monitoring workflow through dashboard."""
# Navigate to dashboard
driver.get("http://localhost:8000/dashboard")
# Login
username_field = driver.find_element(By.ID, "username")
password_field = driver.find_element(By.ID, "password")
login_button = driver.find_element(By.ID, "login")
username_field.send_keys("test_user")
password_field.send_keys("test_password")
login_button.click()
# Wait for dashboard to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "pose-visualization"))
)
# Check that pose data is displayed
pose_count = driver.find_element(By.ID, "person-count")
assert pose_count.text.isdigit()
# Check real-time updates
initial_timestamp = driver.find_element(By.ID, "last-update").text
# Wait for update
WebDriverWait(driver, 5).until(
lambda d: d.find_element(By.ID, "last-update").text != initial_timestamp
)
# Verify update occurred
updated_timestamp = driver.find_element(By.ID, "last-update").text
assert updated_timestamp != initial_timestamp
def test_alert_notification_workflow(self, driver):
"""Test alert notification workflow."""
driver.get("http://localhost:8000/dashboard")
# Login and navigate to alerts page
self._login(driver)
alerts_tab = driver.find_element(By.ID, "alerts-tab")
alerts_tab.click()
# Configure alert settings
fall_detection_toggle = driver.find_element(By.ID, "fall-detection-enabled")
if not fall_detection_toggle.is_selected():
fall_detection_toggle.click()
sensitivity_slider = driver.find_element(By.ID, "fall-sensitivity")
driver.execute_script("arguments[0].value = 0.8", sensitivity_slider)
save_button = driver.find_element(By.ID, "save-settings")
save_button.click()
# Trigger test alert
test_alert_button = driver.find_element(By.ID, "test-fall-alert")
test_alert_button.click()
# Wait for alert notification
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "alert-notification"))
)
# Verify alert details
alert_notification = driver.find_element(By.CLASS_NAME, "alert-notification")
assert "Fall detected" in alert_notification.text
def _login(self, driver):
"""Helper method to login."""
username_field = driver.find_element(By.ID, "username")
password_field = driver.find_element(By.ID, "password")
login_button = driver.find_element(By.ID, "login")
username_field.send_keys("test_user")
password_field.send_keys("test_password")
login_button.click()
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "dashboard"))
)
Performance Testing
Throughput and Latency Tests
import pytest
import time
import asyncio
import statistics
import torch
from concurrent.futures import ThreadPoolExecutor
from src.neural_network.inference import PoseEstimationService
class TestPerformance:
"""Performance tests for critical system components."""
@pytest.mark.performance
def test_pose_estimation_latency(self, pose_service):
"""Test pose estimation latency requirements."""
csi_features = torch.randn(1, 256, 32, 32)
# Warm up
for _ in range(5):
pose_service.estimate_poses(csi_features)
# Measure latency
latencies = []
for _ in range(100):
start_time = time.perf_counter()
result = pose_service.estimate_poses(csi_features)
end_time = time.perf_counter()
latency_ms = (end_time - start_time) * 1000
latencies.append(latency_ms)
# Assert latency requirements
avg_latency = statistics.mean(latencies)
p95_latency = statistics.quantiles(latencies, n=20)[18] # 95th percentile
assert avg_latency < 50, f"Average latency {avg_latency:.1f}ms exceeds 50ms"
assert p95_latency < 100, f"P95 latency {p95_latency:.1f}ms exceeds 100ms"
@pytest.mark.performance
async def test_system_throughput(self, pipeline):
"""Test system throughput requirements."""
# Generate test data
test_frames = [
torch.randn(1, 256, 32, 32) for _ in range(1000)
]
start_time = time.perf_counter()
# Process frames concurrently
tasks = []
for frame in test_frames:
task = asyncio.create_task(pipeline.process_frame(frame))
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.perf_counter()
# Calculate throughput
total_time = end_time - start_time
fps = len(test_frames) / total_time
assert fps >= 30, f"Throughput {fps:.1f} FPS below 30 FPS requirement"
assert len(results) == len(test_frames)
@pytest.mark.performance
def test_memory_usage(self, pose_service):
"""Test memory usage during processing."""
import psutil
import gc
process = psutil.Process()
# Baseline memory
gc.collect()
baseline_memory = process.memory_info().rss / 1024 / 1024 # MB
# Process large batch
large_batch = torch.randn(64, 256, 32, 32)
for _ in range(10):
result = pose_service.estimate_poses(large_batch)
del result
# Measure peak memory
peak_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = peak_memory - baseline_memory
# Clean up
gc.collect()
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_leak = final_memory - baseline_memory
# Assert memory requirements
assert memory_increase < 2000, f"Memory usage {memory_increase:.1f}MB exceeds 2GB"
assert memory_leak < 100, f"Memory leak {memory_leak:.1f}MB detected"
@pytest.mark.performance
def test_concurrent_requests(self, test_client, auth_headers):
"""Test API performance under concurrent load."""
def make_request():
response = test_client.get("/api/v1/pose/latest", headers=auth_headers)
return response.status_code, response.elapsed.total_seconds()
# Concurrent requests
with ThreadPoolExecutor(max_workers=50) as executor:
start_time = time.perf_counter()
futures = [executor.submit(make_request) for _ in range(200)]
results = [future.result() for future in futures]
end_time = time.perf_counter()
# Analyze results
status_codes = [result[0] for result in results]
response_times = [result[1] for result in results]
success_rate = sum(1 for code in status_codes if code == 200) / len(status_codes)
avg_response_time = statistics.mean(response_times)
total_time = end_time - start_time
# Assert performance requirements
assert success_rate >= 0.95, f"Success rate {success_rate:.2%} below 95%"
assert avg_response_time < 1.0, f"Average response time {avg_response_time:.2f}s exceeds 1s"
assert total_time < 30, f"Total time {total_time:.1f}s exceeds 30s"
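The percentile index used in the latency test deserves a note: `statistics.quantiles(data, n=20)` returns the 19 cut points between twenty equal slices, so index 18 is the 95th percentile. A self-contained sketch with synthetic latencies:

```python
import statistics

latencies_ms = [10, 12, 11, 13, 45, 12, 11, 10, 14, 12,
                11, 13, 12, 48, 11, 12, 13, 11, 12, 10]

# n=20 splits the distribution into twenty equal slices; the 19th
# cut point (index 18) separates the fastest 95% from the slowest 5%.
p95 = statistics.quantiles(latencies_ms, n=20)[18]
avg = statistics.mean(latencies_ms)

assert avg < p95            # tail outliers sit well above the mean
assert p95 <= max(latencies_ms)
```

This is why the test asserts on both the mean and p95: the mean alone would hide the two slow outliers.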
Test Data and Fixtures
Data Factories
import factory
import numpy as np
from datetime import datetime
from src.hardware.models import CSIFrame, CSIData
from src.neural_network.models import PoseEstimation, Keypoint
class CSIFrameFactory(factory.Factory):
"""Factory for generating test CSI frames."""
class Meta:
model = CSIFrame
timestamp = factory.LazyFunction(lambda: datetime.utcnow().timestamp())
antenna_data = factory.LazyFunction(
lambda: np.random.randn(3, 56) + 1j * np.random.randn(3, 56)
)
metadata = factory.Dict({
"router_id": factory.Sequence(lambda n: f"router_{n:03d}"),
"signal_strength": factory.Faker("pyfloat", min_value=-80, max_value=-20),
"noise_level": factory.Faker("pyfloat", min_value=-100, max_value=-60)
})
class KeypointFactory(factory.Factory):
"""Factory for generating test keypoints."""
class Meta:
model = Keypoint
name = factory.Iterator([
"nose", "left_eye", "right_eye", "left_ear", "right_ear",
"left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip",
"left_knee", "right_knee", "left_ankle", "right_ankle"
])
x = factory.Faker("pyfloat", min_value=0, max_value=640)
y = factory.Faker("pyfloat", min_value=0, max_value=480)
confidence = factory.Faker("pyfloat", min_value=0.5, max_value=1.0)
visible = factory.Faker("pybool")
class PoseEstimationFactory(factory.Factory):
"""Factory for generating test pose estimations."""
class Meta:
model = PoseEstimation
person_id = factory.Sequence(lambda n: n)
confidence = factory.Faker("pyfloat", min_value=0.5, max_value=1.0)
bounding_box = factory.LazyFunction(
lambda: {
"x": np.random.randint(0, 400),
"y": np.random.randint(0, 300),
"width": np.random.randint(50, 200),
"height": np.random.randint(100, 300)
}
)
keypoints = factory.List([factory.SubFactory(KeypointFactory) for _ in range(17)])
timestamp = factory.LazyFunction(datetime.utcnow)
Test Fixtures
# tests/fixtures/csi_data.py
import numpy as np
from pathlib import Path
def load_test_csi_data():
"""Load pre-recorded CSI data for testing."""
fixture_path = Path(__file__).parent / "csi_data" / "sample_data.npz"
if fixture_path.exists():
data = np.load(fixture_path)
return {
"amplitude": data["amplitude"],
"phase": data["phase"],
"timestamps": data["timestamps"]
}
else:
# Generate synthetic data if fixture doesn't exist
return generate_synthetic_csi_data()
def generate_synthetic_csi_data():
"""Generate synthetic CSI data for testing."""
num_samples = 1000
num_antennas = 3
num_subcarriers = 56
# Generate realistic CSI patterns
amplitude = np.random.exponential(scale=10, size=(num_samples, num_antennas, num_subcarriers))
phase = np.random.uniform(-np.pi, np.pi, size=(num_samples, num_antennas, num_subcarriers))
timestamps = np.linspace(0, 33.33, num_samples) # 30 FPS for 33.33 seconds
return {
"amplitude": amplitude,
"phase": phase,
"timestamps": timestamps
}
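The amplitude/phase pair stored by the fixture above recombines into the complex CSI matrix the pipeline consumes via amplitude * e^(j*phase). A self-contained round-trip sketch:

```python
import numpy as np

rng = np.random.default_rng(0)
amplitude = rng.exponential(scale=10, size=(5, 3, 56))
phase = rng.uniform(-np.pi, np.pi, size=(5, 3, 56))

# Recombine the stored polar representation into complex CSI samples.
complex_csi = amplitude * np.exp(1j * phase)

# The round trip preserves both components
assert np.allclose(np.abs(complex_csi), amplitude)
assert np.allclose(np.angle(complex_csi), phase)
```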
# tests/fixtures/pose_data.py
import numpy as np
def load_test_pose_sequences():
"""Load test pose sequences for different scenarios."""
return {
"walking": load_walking_sequence(),
"sitting": load_sitting_sequence(),
"falling": load_falling_sequence(),
"multiple_persons": load_multiple_persons_sequence()
}
def load_walking_sequence():
"""Load walking pose sequence."""
# Simplified walking pattern
poses = []
for frame in range(30): # 1 second at 30 FPS
pose = {
"keypoints": generate_walking_keypoints(frame),
"confidence": 0.8 + 0.1 * np.sin(frame * 0.2),
"timestamp": frame / 30.0
}
poses.append(pose)
return poses
def generate_walking_keypoints(frame):
"""Generate keypoints for walking motion."""
# Simplified walking pattern with leg movement
base_keypoints = {
"nose": {"x": 320, "y": 100},
"left_shoulder": {"x": 300, "y": 150},
"right_shoulder": {"x": 340, "y": 150},
"left_hip": {"x": 310, "y": 250},
"right_hip": {"x": 330, "y": 250},
}
# Add walking motion to legs
leg_offset = 20 * np.sin(frame * 0.4) # Walking cycle
base_keypoints["left_knee"] = {"x": 305 + leg_offset, "y": 350}
base_keypoints["right_knee"] = {"x": 335 - leg_offset, "y": 350}
base_keypoints["left_ankle"] = {"x": 300 + leg_offset, "y": 450}
base_keypoints["right_ankle"] = {"x": 340 - leg_offset, "y": 450}
return base_keypoints
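Tests that consume these sequences usually assert invariants (confidence ranges, timestamp spacing) rather than exact keypoint values. A minimal sketch; `validate_sequence` is a hypothetical helper, and the inline list stands in for `load_walking_sequence()` output:

```python
import numpy as np

def validate_sequence(poses, fps=30):
    """Sanity checks a fixture-based test can apply to any pose sequence."""
    assert len(poses) > 0, "empty sequence"
    for i, pose in enumerate(poses):
        # Confidence must be a valid probability
        assert 0.0 <= pose["confidence"] <= 1.0
        # Timestamps must advance at the expected frame rate
        assert abs(pose["timestamp"] - i / fps) < 1e-6
    return True

# Minimal stand-in for the output of load_walking_sequence()
poses = [
    {"confidence": 0.8 + 0.1 * np.sin(f * 0.2), "timestamp": f / 30.0}
    for f in range(30)
]
ok = validate_sequence(poses)
```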
Mocking and Test Doubles
Hardware Mocking
# tests/mocks/hardware.py
from unittest.mock import Mock, AsyncMock
import numpy as np
import asyncio
class MockCSIProcessor:
"""Mock CSI processor for testing."""
def __init__(self, config=None):
self.config = config or {}
self.is_running = False
self._data_generator = self._generate_mock_data()
async def start(self):
"""Start mock CSI processing."""
self.is_running = True
async def stop(self):
"""Stop mock CSI processing."""
self.is_running = False
async def get_latest_frame(self):
"""Get latest mock CSI frame."""
if not self.is_running:
raise RuntimeError("CSI processor not running")
return next(self._data_generator)
def _generate_mock_data(self):
"""Generate realistic mock CSI data."""
frame_id = 0
while True:
# Generate data with some patterns
amplitude = np.random.exponential(scale=10, size=(3, 56))
phase = np.random.uniform(-np.pi, np.pi, size=(3, 56))
# Add some motion patterns
if frame_id % 30 < 15: # Simulate person movement
amplitude *= 1.2
phase += 0.1 * np.sin(frame_id * 0.1)
yield {
"frame_id": frame_id,
"timestamp": frame_id / 30.0,
"amplitude": amplitude,
"phase": phase,
"metadata": {"router_id": "mock_router"}
}
frame_id += 1
class MockNeuralNetwork:
"""Mock neural network for testing."""
def __init__(self, model_config=None):
self.model_config = model_config or {}
self.is_loaded = False
def load_model(self, model_path):
"""Mock model loading."""
self.is_loaded = True
return True
def predict(self, csi_features):
"""Mock pose prediction."""
if not self.is_loaded:
raise RuntimeError("Model not loaded")
batch_size = csi_features.shape[0]
# Generate mock predictions
predictions = []
for i in range(batch_size):
# Simulate 0-2 persons detected
num_persons = np.random.choice([0, 1, 2], p=[0.1, 0.7, 0.2])
frame_predictions = []
for person_id in range(num_persons):
pose = {
"person_id": person_id,
"confidence": np.random.uniform(0.6, 0.95),
"keypoints": self._generate_mock_keypoints(),
"bounding_box": self._generate_mock_bbox()
}
frame_predictions.append(pose)
predictions.append(frame_predictions)
return predictions
def _generate_mock_keypoints(self):
"""Generate mock keypoints."""
keypoints = []
for i in range(17): # COCO format
keypoint = {
"x": np.random.uniform(50, 590),
"y": np.random.uniform(50, 430),
"confidence": np.random.uniform(0.5, 1.0),
"visible": np.random.choice([True, False], p=[0.8, 0.2])
}
keypoints.append(keypoint)
return keypoints
def _generate_mock_bbox(self):
"""Generate mock bounding box."""
x = np.random.uniform(0, 400)
y = np.random.uniform(0, 300)
width = np.random.uniform(50, 200)
height = np.random.uniform(100, 300)
return {"x": x, "y": y, "width": width, "height": height}
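In a test, these mock classes stand in for the real components; for one-off stubs, `unittest.mock.Mock` achieves the same thing with less code. A sketch, where `count_detections` is a hypothetical function under test:

```python
from unittest.mock import Mock

import numpy as np

def count_detections(network, csi_batch):
    """Hypothetical function under test: persons detected per frame."""
    predictions = network.predict(csi_batch)
    return [len(frame) for frame in predictions]

# Stub predict() with a fixed return value so the test is deterministic
network = Mock()
network.predict.return_value = [[{"person_id": 0, "confidence": 0.9}]]

counts = count_detections(network, np.zeros((1, 3, 56)))
network.predict.assert_called_once()
```

Prefer the richer `MockNeuralNetwork` when a test needs varied, realistic output; prefer a plain `Mock` when it only needs one fixed response.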
API Mocking
# tests/mocks/external_apis.py
import responses
import json
@responses.activate
def test_external_api_integration():
"""Test integration with external APIs using mocked responses."""
# Mock external pose estimation API
responses.add(
responses.POST,
"https://external-api.com/pose/estimate",
json={
"poses": [
{
"id": 1,
"confidence": 0.85,
"keypoints": [...]
}
]
},
status=200
)
# Mock webhook endpoint
responses.add(
responses.POST,
"https://webhook.example.com/alerts",
json={"status": "received"},
status=200
)
# Test code that makes external API calls
# ...
class MockWebhookServer:
"""Mock webhook server for testing notifications."""
def __init__(self):
self.received_webhooks = []
def start(self, port=8080):
"""Start mock webhook server."""
from flask import Flask, request
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def receive_webhook():
data = request.get_json()
self.received_webhooks.append(data)
return {"status": "received"}, 200
        app.run(port=port, debug=False)  # Note: app.run() blocks; call start() from a background thread in tests
def get_received_webhooks(self):
"""Get all received webhooks."""
return self.received_webhooks.copy()
def clear_webhooks(self):
"""Clear received webhooks."""
self.received_webhooks.clear()
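Because `app.run()` blocks the calling thread, tests normally run the server in a daemon thread and shut it down afterwards. A stdlib-only variant of the same idea, using `http.server` instead of Flask so it needs no extra dependencies:

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = self.rfile.read(int(self.headers["Content-Length"]))
        received.append(json.loads(body))
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'{"status": "received"}')

    def log_message(self, *args):
        pass  # keep test output quiet

server = HTTPServer(("127.0.0.1", 0), WebhookHandler)  # port 0 = any free port
threading.Thread(target=server.serve_forever, daemon=True).start()

# Exercise the endpoint the way production code would
url = f"http://127.0.0.1:{server.server_address[1]}/webhook"
req = urllib.request.Request(
    url,
    data=json.dumps({"event": "fall_detected"}).encode(),
    headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req)
server.shutdown()
```

Binding to port 0 avoids collisions when tests run in parallel; the assigned port is read back from `server.server_address`.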
Continuous Integration
GitHub Actions Configuration
# .github/workflows/test.yml
name: Test Suite
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main, develop ]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
        python-version: ["3.8", "3.9", "3.10", "3.11"]
services:
postgres:
image: timescale/timescaledb:latest-pg14
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_wifi_densepose
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
redis:
image: redis:7-alpine
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Cache pip dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements*.txt') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libopencv-dev ffmpeg
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Lint with flake8
run: |
flake8 src/ tests/ --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 src/ tests/ --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy
run: |
mypy src/
- name: Test with pytest
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_wifi_densepose
REDIS_URL: redis://localhost:6379/0
SECRET_KEY: test-secret-key
MOCK_HARDWARE: true
run: |
pytest tests/ -v --cov=src --cov-report=xml --cov-report=term-missing
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
flags: unittests
name: codecov-umbrella
performance-test:
runs-on: ubuntu-latest
needs: test
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Run performance tests
run: |
pytest tests/performance/ -v --benchmark-only --benchmark-json=benchmark.json
- name: Store benchmark result
uses: benchmark-action/github-action-benchmark@v1
with:
tool: 'pytest'
output-file-path: benchmark.json
github-token: ${{ secrets.GITHUB_TOKEN }}
auto-push: true
integration-test:
runs-on: ubuntu-latest
needs: test
steps:
- uses: actions/checkout@v3
- name: Build Docker images
run: |
docker-compose -f docker-compose.test.yml build
- name: Run integration tests
run: |
docker-compose -f docker-compose.test.yml up --abort-on-container-exit
- name: Cleanup
run: |
docker-compose -f docker-compose.test.yml down -v
Pre-commit Configuration
# .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
language_version: python3
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
additional_dependencies: [flake8-docstrings]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
hooks:
- id: mypy
additional_dependencies: [types-all]
- repo: local
hooks:
- id: pytest-check
name: pytest-check
entry: pytest
language: system
pass_filenames: false
always_run: true
args: [tests/unit/, --tb=short]
Test Coverage
Coverage Configuration
# .coveragerc
[run]
source = src/
omit =
src/*/tests/*
src/*/test_*
*/venv/*
*/virtualenv/*
*/.tox/*
*/migrations/*
*/settings/*
[report]
exclude_lines =
pragma: no cover
def __repr__
if self.debug:
if settings.DEBUG
raise AssertionError
raise NotImplementedError
if 0:
if __name__ == .__main__.:
class .*\bProtocol\):
@(abc\.)?abstractmethod
[html]
directory = htmlcov
Coverage Targets
- Overall Coverage: Minimum 80%
- Critical Components: Minimum 90%
- Neural network inference
- CSI processing
- Person tracking
- API endpoints
- New Code: Minimum 95%
Coverage Reporting
# Generate coverage report
pytest --cov=src --cov-report=html --cov-report=term-missing
# View HTML report
open htmlcov/index.html
# Check coverage thresholds
pytest --cov=src --cov-fail-under=80
Testing Best Practices
Test Organization
- One Test Class per Component: Group related tests together
- Descriptive Test Names: Use clear, descriptive test method names
- Arrange-Act-Assert: Structure tests with clear sections
- Test Independence: Each test should be independent and isolated
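The Arrange-Act-Assert structure keeps each phase of a test visible at a glance. An illustrative sketch; the dict-based tracker here is a stand-in, not the real `Tracker` class:

```python
def test_tracker_assigns_fresh_id():
    # Arrange: an empty tracker and one detection
    tracker = {"next_id": 0, "tracks": {}}
    detection = {"x": 120, "y": 240, "confidence": 0.9}

    # Act: register the detection
    track_id = tracker["next_id"]
    tracker["tracks"][track_id] = detection
    tracker["next_id"] += 1

    # Assert: the detection received a fresh, stable id
    assert track_id == 0
    assert tracker["tracks"][0] is detection
    assert tracker["next_id"] == 1

test_tracker_assigns_fresh_id()
```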
Test Data Management
- Use Factories: Generate test data with factories instead of hardcoded values
- Realistic Data: Use realistic test data that represents actual usage
- Edge Cases: Test boundary conditions and edge cases
- Error Conditions: Test error handling and exception cases
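Edge cases and error conditions are conveniently covered together with `pytest.mark.parametrize`. A sketch, assuming pytest is installed; `clamp_confidence` is a hypothetical helper invented for the example:

```python
import math

import pytest

def clamp_confidence(value):
    """Hypothetical helper: force a confidence score into [0, 1]."""
    if math.isnan(value):
        raise ValueError("confidence is NaN")
    return min(max(value, 0.0), 1.0)

@pytest.mark.parametrize("raw,expected", [
    (0.5, 0.5),    # typical value
    (-0.1, 0.0),   # below lower bound
    (1.7, 1.0),    # above upper bound
    (0.0, 0.0),    # boundary
    (1.0, 1.0),    # boundary
])
def test_clamp_confidence(raw, expected):
    assert clamp_confidence(raw) == expected

def test_clamp_confidence_rejects_nan():
    with pytest.raises(ValueError):
        clamp_confidence(float("nan"))
```

Each parameter tuple becomes its own test case in the report, so a boundary failure is pinpointed immediately.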
Performance Considerations
- Fast Unit Tests: Keep unit tests fast (< 1 second each)
- Parallel Execution: Use pytest-xdist for parallel test execution
- Test Categorization: Use markers to categorize slow tests
- Resource Cleanup: Properly clean up resources after tests
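Marker-based categorization works best when the marker is registered, so `pytest --strict-markers` does not reject it. A sketch of the usual setup; the `slow` marker name is a common convention, not taken from this repository:

```ini
# pytest.ini
[pytest]
markers =
    slow: long-running tests, deselect with -m "not slow"
```

Tag expensive tests with `@pytest.mark.slow`, then run `pytest -m "not slow"` in the fast local loop and the full suite (including `-m slow`) in CI or nightly jobs.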
Maintenance
- Regular Updates: Keep test dependencies updated
- Flaky Test Detection: Monitor and fix flaky tests
- Test Documentation: Document complex test scenarios
- Refactoring: Refactor tests when production code changes
This testing guide provides a comprehensive framework for ensuring the reliability and quality of the WiFi-DensePose system. Regular testing and continuous improvement of the test suite are essential for maintaining a robust and reliable system.
For more information, see: