Files
wifi-densepose/v1/tests/fixtures/csi_data.py
Claude 6ed69a3d48 feat: Complete Rust port of WiFi-DensePose with modular crates
Major changes:
- Organized Python v1 implementation into v1/ subdirectory
- Created Rust workspace with 9 modular crates:
  - wifi-densepose-core: Core types, traits, errors
  - wifi-densepose-signal: CSI processing, phase sanitization, FFT
  - wifi-densepose-nn: Neural network inference (ONNX/Candle/tch)
  - wifi-densepose-api: Axum-based REST/WebSocket API
  - wifi-densepose-db: SQLx database layer
  - wifi-densepose-config: Configuration management
  - wifi-densepose-hardware: Hardware abstraction
  - wifi-densepose-wasm: WebAssembly bindings
  - wifi-densepose-cli: Command-line interface

Documentation:
- ADR-001: Workspace structure
- ADR-002: Signal processing library selection
- ADR-003: Neural network inference strategy
- DDD domain model with bounded contexts

Testing:
- 69 tests passing across all crates
- Signal processing: 45 tests
- Neural networks: 21 tests
- Core: 3 doc tests

Performance targets:
- 10x faster CSI processing (~0.5ms vs ~5ms)
- 5x lower memory usage (~100MB vs ~500MB)
- WASM support for browser deployment
2026-01-13 03:11:16 +00:00

487 lines
18 KiB
Python

"""
Test data generation utilities for CSI data.
Provides realistic CSI data samples for testing pose estimation pipeline.
"""
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
import json
import random
class CSIDataGenerator:
"""Generate realistic CSI data for testing."""
def __init__(self,
frequency: float = 5.8e9,
bandwidth: float = 80e6,
num_antennas: int = 4,
num_subcarriers: int = 64):
self.frequency = frequency
self.bandwidth = bandwidth
self.num_antennas = num_antennas
self.num_subcarriers = num_subcarriers
self.sample_rate = 1000 # Hz
self.noise_level = 0.1
# Pre-computed patterns for different scenarios
self._initialize_patterns()
def _initialize_patterns(self):
"""Initialize CSI patterns for different scenarios."""
# Empty room pattern (baseline)
self.empty_room_pattern = {
"amplitude_mean": 0.3,
"amplitude_std": 0.05,
"phase_variance": 0.1,
"temporal_stability": 0.95
}
# Single person patterns
self.single_person_patterns = {
"standing": {
"amplitude_mean": 0.5,
"amplitude_std": 0.08,
"phase_variance": 0.2,
"temporal_stability": 0.85,
"movement_frequency": 0.1
},
"walking": {
"amplitude_mean": 0.6,
"amplitude_std": 0.15,
"phase_variance": 0.4,
"temporal_stability": 0.6,
"movement_frequency": 2.0
},
"sitting": {
"amplitude_mean": 0.4,
"amplitude_std": 0.06,
"phase_variance": 0.15,
"temporal_stability": 0.9,
"movement_frequency": 0.05
},
"fallen": {
"amplitude_mean": 0.35,
"amplitude_std": 0.04,
"phase_variance": 0.08,
"temporal_stability": 0.95,
"movement_frequency": 0.02
}
}
# Multi-person patterns
self.multi_person_patterns = {
2: {"amplitude_multiplier": 1.4, "phase_complexity": 1.6},
3: {"amplitude_multiplier": 1.7, "phase_complexity": 2.1},
4: {"amplitude_multiplier": 2.0, "phase_complexity": 2.8}
}
def generate_empty_room_sample(self, timestamp: Optional[datetime] = None) -> Dict[str, Any]:
"""Generate CSI sample for empty room."""
if timestamp is None:
timestamp = datetime.utcnow()
pattern = self.empty_room_pattern
# Generate amplitude matrix
amplitude = np.random.normal(
pattern["amplitude_mean"],
pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
amplitude = np.clip(amplitude, 0, 1)
# Generate phase matrix
phase = np.random.uniform(
-np.pi, np.pi,
(self.num_antennas, self.num_subcarriers)
)
# Add temporal stability
if hasattr(self, '_last_empty_sample'):
stability = pattern["temporal_stability"]
amplitude = stability * self._last_empty_sample["amplitude"] + (1 - stability) * amplitude
phase = stability * self._last_empty_sample["phase"] + (1 - stability) * phase
sample = {
"timestamp": timestamp.isoformat(),
"router_id": "router_001",
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
"frequency": self.frequency,
"bandwidth": self.bandwidth,
"num_antennas": self.num_antennas,
"num_subcarriers": self.num_subcarriers,
"sample_rate": self.sample_rate,
"scenario": "empty_room",
"signal_quality": np.random.uniform(0.85, 0.95)
}
self._last_empty_sample = {
"amplitude": amplitude,
"phase": phase
}
return sample
def generate_single_person_sample(self,
activity: str = "standing",
timestamp: Optional[datetime] = None) -> Dict[str, Any]:
"""Generate CSI sample for single person activity."""
if timestamp is None:
timestamp = datetime.utcnow()
if activity not in self.single_person_patterns:
raise ValueError(f"Unknown activity: {activity}")
pattern = self.single_person_patterns[activity]
# Generate base amplitude
amplitude = np.random.normal(
pattern["amplitude_mean"],
pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
# Add movement-induced variations
movement_freq = pattern["movement_frequency"]
time_factor = timestamp.timestamp()
movement_modulation = 0.1 * np.sin(2 * np.pi * movement_freq * time_factor)
amplitude += movement_modulation
amplitude = np.clip(amplitude, 0, 1)
# Generate phase with activity-specific variance
phase_base = np.random.uniform(-np.pi, np.pi, (self.num_antennas, self.num_subcarriers))
phase_variance = pattern["phase_variance"]
phase_noise = np.random.normal(0, phase_variance, (self.num_antennas, self.num_subcarriers))
phase = phase_base + phase_noise
phase = np.mod(phase + np.pi, 2 * np.pi) - np.pi # Wrap to [-π, π]
# Add temporal correlation
if hasattr(self, f'_last_{activity}_sample'):
stability = pattern["temporal_stability"]
last_sample = getattr(self, f'_last_{activity}_sample')
amplitude = stability * last_sample["amplitude"] + (1 - stability) * amplitude
phase = stability * last_sample["phase"] + (1 - stability) * phase
sample = {
"timestamp": timestamp.isoformat(),
"router_id": "router_001",
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
"frequency": self.frequency,
"bandwidth": self.bandwidth,
"num_antennas": self.num_antennas,
"num_subcarriers": self.num_subcarriers,
"sample_rate": self.sample_rate,
"scenario": f"single_person_{activity}",
"signal_quality": np.random.uniform(0.7, 0.9),
"activity": activity
}
setattr(self, f'_last_{activity}_sample', {
"amplitude": amplitude,
"phase": phase
})
return sample
def generate_multi_person_sample(self,
num_persons: int = 2,
activities: Optional[List[str]] = None,
timestamp: Optional[datetime] = None) -> Dict[str, Any]:
"""Generate CSI sample for multiple persons."""
if timestamp is None:
timestamp = datetime.utcnow()
if num_persons < 2 or num_persons > 4:
raise ValueError("Number of persons must be between 2 and 4")
if activities is None:
activities = random.choices(list(self.single_person_patterns.keys()), k=num_persons)
if len(activities) != num_persons:
raise ValueError("Number of activities must match number of persons")
# Start with empty room baseline
amplitude = np.random.normal(
self.empty_room_pattern["amplitude_mean"],
self.empty_room_pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
phase = np.random.uniform(
-np.pi, np.pi,
(self.num_antennas, self.num_subcarriers)
)
# Add contribution from each person
for i, activity in enumerate(activities):
person_pattern = self.single_person_patterns[activity]
# Generate person-specific contribution
person_amplitude = np.random.normal(
person_pattern["amplitude_mean"] * 0.7, # Reduced for multi-person
person_pattern["amplitude_std"],
(self.num_antennas, self.num_subcarriers)
)
# Add spatial variation (different persons at different locations)
spatial_offset = i * self.num_subcarriers // num_persons
person_amplitude = np.roll(person_amplitude, spatial_offset, axis=1)
# Add movement modulation
movement_freq = person_pattern["movement_frequency"]
time_factor = timestamp.timestamp() + i * 0.5 # Phase offset between persons
movement_modulation = 0.05 * np.sin(2 * np.pi * movement_freq * time_factor)
person_amplitude += movement_modulation
amplitude += person_amplitude
# Add phase contribution
person_phase = np.random.normal(0, person_pattern["phase_variance"],
(self.num_antennas, self.num_subcarriers))
person_phase = np.roll(person_phase, spatial_offset, axis=1)
phase += person_phase
# Apply multi-person complexity
pattern = self.multi_person_patterns[num_persons]
amplitude *= pattern["amplitude_multiplier"]
phase *= pattern["phase_complexity"]
# Clip and normalize
amplitude = np.clip(amplitude, 0, 1)
phase = np.mod(phase + np.pi, 2 * np.pi) - np.pi
sample = {
"timestamp": timestamp.isoformat(),
"router_id": "router_001",
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
"frequency": self.frequency,
"bandwidth": self.bandwidth,
"num_antennas": self.num_antennas,
"num_subcarriers": self.num_subcarriers,
"sample_rate": self.sample_rate,
"scenario": f"multi_person_{num_persons}",
"signal_quality": np.random.uniform(0.6, 0.8),
"num_persons": num_persons,
"activities": activities
}
return sample
def generate_time_series(self,
duration_seconds: int = 10,
scenario: str = "single_person_walking",
**kwargs) -> List[Dict[str, Any]]:
"""Generate time series of CSI samples."""
samples = []
start_time = datetime.utcnow()
for i in range(duration_seconds * self.sample_rate):
timestamp = start_time + timedelta(seconds=i / self.sample_rate)
if scenario == "empty_room":
sample = self.generate_empty_room_sample(timestamp)
elif scenario.startswith("single_person_"):
activity = scenario.replace("single_person_", "")
sample = self.generate_single_person_sample(activity, timestamp)
elif scenario.startswith("multi_person_"):
num_persons = int(scenario.split("_")[-1])
sample = self.generate_multi_person_sample(num_persons, timestamp=timestamp, **kwargs)
else:
raise ValueError(f"Unknown scenario: {scenario}")
samples.append(sample)
return samples
def add_noise(self, sample: Dict[str, Any], noise_level: Optional[float] = None) -> Dict[str, Any]:
"""Add noise to CSI sample."""
if noise_level is None:
noise_level = self.noise_level
noisy_sample = sample.copy()
# Add amplitude noise
amplitude = np.array(sample["amplitude"])
amplitude_noise = np.random.normal(0, noise_level, amplitude.shape)
noisy_amplitude = amplitude + amplitude_noise
noisy_amplitude = np.clip(noisy_amplitude, 0, 1)
noisy_sample["amplitude"] = noisy_amplitude.tolist()
# Add phase noise
phase = np.array(sample["phase"])
phase_noise = np.random.normal(0, noise_level * np.pi, phase.shape)
noisy_phase = phase + phase_noise
noisy_phase = np.mod(noisy_phase + np.pi, 2 * np.pi) - np.pi
noisy_sample["phase"] = noisy_phase.tolist()
# Reduce signal quality
noisy_sample["signal_quality"] *= (1 - noise_level)
return noisy_sample
def simulate_hardware_artifacts(self, sample: Dict[str, Any]) -> Dict[str, Any]:
"""Simulate hardware-specific artifacts."""
artifact_sample = sample.copy()
amplitude = np.array(sample["amplitude"])
phase = np.array(sample["phase"])
# Simulate antenna coupling
coupling_matrix = np.random.uniform(0.95, 1.05, (self.num_antennas, self.num_antennas))
amplitude = coupling_matrix @ amplitude
# Simulate frequency-dependent gain variations
freq_response = 1 + 0.1 * np.sin(np.linspace(0, 2*np.pi, self.num_subcarriers))
amplitude *= freq_response[np.newaxis, :]
# Simulate phase drift
phase_drift = np.random.uniform(-0.1, 0.1) * np.arange(self.num_subcarriers)
phase += phase_drift[np.newaxis, :]
# Clip and wrap
amplitude = np.clip(amplitude, 0, 1)
phase = np.mod(phase + np.pi, 2 * np.pi) - np.pi
artifact_sample["amplitude"] = amplitude.tolist()
artifact_sample["phase"] = phase.tolist()
return artifact_sample
# Convenience functions for common test scenarios
def generate_fall_detection_sequence() -> List[Dict[str, Any]]:
"""Generate CSI sequence showing fall detection scenario."""
generator = CSIDataGenerator()
sequence = []
# Normal standing (5 seconds)
sequence.extend(generator.generate_time_series(5, "single_person_standing"))
# Walking (3 seconds)
sequence.extend(generator.generate_time_series(3, "single_person_walking"))
# Fall event (1 second transition)
sequence.extend(generator.generate_time_series(1, "single_person_fallen"))
# Fallen state (3 seconds)
sequence.extend(generator.generate_time_series(3, "single_person_fallen"))
return sequence
def generate_multi_person_scenario() -> List[Dict[str, Any]]:
"""Generate CSI sequence for multi-person scenario."""
generator = CSIDataGenerator()
sequence = []
# Start with empty room
sequence.extend(generator.generate_time_series(2, "empty_room"))
# One person enters
sequence.extend(generator.generate_time_series(3, "single_person_walking"))
# Second person enters
sequence.extend(generator.generate_time_series(5, "multi_person_2",
activities=["standing", "walking"]))
# Third person enters
sequence.extend(generator.generate_time_series(4, "multi_person_3",
activities=["standing", "walking", "sitting"]))
return sequence
def generate_noisy_environment_data() -> List[Dict[str, Any]]:
"""Generate CSI data with various noise levels."""
generator = CSIDataGenerator()
# Generate clean data
clean_samples = generator.generate_time_series(5, "single_person_walking")
# Add different noise levels
noisy_samples = []
noise_levels = [0.05, 0.1, 0.2, 0.3]
for noise_level in noise_levels:
for sample in clean_samples[:10]: # Take first 10 samples
noisy_sample = generator.add_noise(sample, noise_level)
noisy_samples.append(noisy_sample)
return noisy_samples
def generate_hardware_test_data() -> List[Dict[str, Any]]:
"""Generate CSI data with hardware artifacts."""
generator = CSIDataGenerator()
# Generate base samples
base_samples = generator.generate_time_series(3, "single_person_standing")
# Add hardware artifacts
artifact_samples = []
for sample in base_samples:
artifact_sample = generator.simulate_hardware_artifacts(sample)
artifact_samples.append(artifact_sample)
return artifact_samples
# Test data validation utilities
def validate_csi_sample(sample: Dict[str, Any]) -> bool:
"""Validate CSI sample structure and data ranges."""
required_fields = [
"timestamp", "router_id", "amplitude", "phase",
"frequency", "bandwidth", "num_antennas", "num_subcarriers"
]
# Check required fields
for field in required_fields:
if field not in sample:
return False
# Validate data types and ranges
amplitude = np.array(sample["amplitude"])
phase = np.array(sample["phase"])
# Check shapes
expected_shape = (sample["num_antennas"], sample["num_subcarriers"])
if amplitude.shape != expected_shape or phase.shape != expected_shape:
return False
# Check value ranges
if not (0 <= amplitude.min() and amplitude.max() <= 1):
return False
if not (-np.pi <= phase.min() and phase.max() <= np.pi):
return False
return True
def extract_features_from_csi(sample: Dict[str, Any]) -> Dict[str, Any]:
"""Extract features from CSI sample for testing."""
amplitude = np.array(sample["amplitude"])
phase = np.array(sample["phase"])
features = {
"amplitude_mean": float(np.mean(amplitude)),
"amplitude_std": float(np.std(amplitude)),
"amplitude_max": float(np.max(amplitude)),
"amplitude_min": float(np.min(amplitude)),
"phase_variance": float(np.var(phase)),
"phase_range": float(np.max(phase) - np.min(phase)),
"signal_energy": float(np.sum(amplitude ** 2)),
"phase_coherence": float(np.abs(np.mean(np.exp(1j * phase)))),
"spatial_correlation": float(np.mean(np.corrcoef(amplitude))),
"frequency_diversity": float(np.std(np.mean(amplitude, axis=0)))
}
return features