diff --git a/v1/src/core/router_interface.py b/v1/src/core/router_interface.py index 18c1e27..f009530 100644 --- a/v1/src/core/router_interface.py +++ b/v1/src/core/router_interface.py @@ -195,11 +195,23 @@ class RouterInterface: return csi_data async def _collect_real_csi_data(self) -> Optional[np.ndarray]: - """Collect real CSI data from router (placeholder implementation).""" - # This would implement the actual CSI data collection - # For now, return None to indicate no real implementation - self.logger.warning("Real CSI data collection not implemented") - return None + """Collect real CSI data from the router. + + Raises: + RuntimeError: Always in the current state, because real CSI + data collection requires hardware setup that has not been + configured. This method must never silently return random + or placeholder data. + """ + raise RuntimeError( + f"Real CSI data collection from router '{self.router_id}' requires " + "hardware setup that is not configured. You must: " + "(1) install CSI-capable firmware (e.g., Atheros CSI Tool, Nexmon CSI) on the router, " + "(2) configure the SSH connection to the router, and " + "(3) implement the CSI extraction command for your specific firmware. " + "For development/testing, use mock_mode=True. " + "See docs/hardware-setup.md for complete setup instructions." + ) async def check_health(self) -> bool: """Check if the router connection is healthy. diff --git a/v1/src/hardware/router_interface.py b/v1/src/hardware/router_interface.py index e20e849..3167204 100644 --- a/v1/src/hardware/router_interface.py +++ b/v1/src/hardware/router_interface.py @@ -197,25 +197,25 @@ class RouterInterface: def _parse_csi_response(self, response: str) -> CSIData: """Parse CSI response data. - + Args: response: Raw response from router - + Returns: Parsed CSI data + + Raises: + RouterConnectionError: Always in current state, because real CSI + parsing from router command output requires hardware-specific + format knowledge that must be implemented per router model. """ - # Mock implementation for testing - # In real implementation, this would parse actual router CSI format - return CSIData( - timestamp=datetime.now(timezone.utc), - amplitude=np.random.rand(3, 56), - phase=np.random.rand(3, 56), - frequency=2.4e9, - bandwidth=20e6, - num_subcarriers=56, - num_antennas=3, - snr=15.0, - metadata={'source': 'router', 'raw_response': response} + raise RouterConnectionError( + "Real CSI data parsing from router responses is not yet implemented. " + "Collecting CSI data from a router requires: " + "(1) a router with CSI-capable firmware (e.g., Atheros CSI Tool, Nexmon), " + "(2) proper hardware setup and configuration, and " + "(3) a parser for the specific binary/text format produced by the firmware. " + "See docs/hardware-setup.md for instructions on configuring your router for CSI collection." ) def _parse_status_response(self, response: str) -> Dict[str, Any]: diff --git a/v1/src/sensing/__init__.py b/v1/src/sensing/__init__.py new file mode 100644 index 0000000..e6f6c33 --- /dev/null +++ b/v1/src/sensing/__init__.py @@ -0,0 +1,56 @@ +""" +Commodity WiFi Sensing Module (ADR-013) +======================================= + +RSSI-based presence and motion detection using standard Linux WiFi metrics. +This module provides real signal processing from commodity WiFi hardware, +extracting presence and motion features from RSSI time series. + +Components: + - rssi_collector: Data collection from Linux WiFi interfaces + - feature_extractor: Time-domain and frequency-domain feature extraction + - classifier: Presence and motion classification from features + - backend: Common sensing backend interface + +Capabilities: + - PRESENCE: Detect whether a person is present in the sensing area + - MOTION: Classify motion level (absent / still / active) + +Note: This module uses RSSI only. For higher-fidelity sensing (respiration, +pose estimation), CSI-capable hardware and the full DensePose pipeline +are required. +""" + +from v1.src.sensing.rssi_collector import ( + LinuxWifiCollector, + SimulatedCollector, + WifiSample, +) +from v1.src.sensing.feature_extractor import ( + RssiFeatureExtractor, + RssiFeatures, +) +from v1.src.sensing.classifier import ( + PresenceClassifier, + SensingResult, + MotionLevel, +) +from v1.src.sensing.backend import ( + SensingBackend, + CommodityBackend, + Capability, +) + +__all__ = [ + "LinuxWifiCollector", + "SimulatedCollector", + "WifiSample", + "RssiFeatureExtractor", + "RssiFeatures", + "PresenceClassifier", + "SensingResult", + "MotionLevel", + "SensingBackend", + "CommodityBackend", + "Capability", +] diff --git a/v1/src/testing/__init__.py b/v1/src/testing/__init__.py new file mode 100644 index 0000000..96de9f7 --- /dev/null +++ b/v1/src/testing/__init__.py @@ -0,0 +1,21 @@ +""" +Testing utilities for WiFi-DensePose. + +This module contains mock data generators and testing helpers that are +ONLY intended for use in development/testing environments. These generators +produce synthetic data that mimics real CSI and pose data patterns. + +WARNING: Code in this module uses random number generation intentionally +for mock/test data. Do NOT import from this module in production code paths +unless behind an explicit mock_mode flag with appropriate logging. +""" + +from .mock_csi_generator import MockCSIGenerator +from .mock_pose_generator import generate_mock_poses, generate_mock_keypoints, generate_mock_bounding_box + +__all__ = [ + "MockCSIGenerator", + "generate_mock_poses", + "generate_mock_keypoints", + "generate_mock_bounding_box", +] diff --git a/v1/src/testing/mock_csi_generator.py b/v1/src/testing/mock_csi_generator.py new file mode 100644 index 0000000..77dc350 --- /dev/null +++ b/v1/src/testing/mock_csi_generator.py @@ -0,0 +1,178 @@ +""" +Mock CSI data generator for testing and development. + +This module provides synthetic CSI (Channel State Information) data generation +for use in development and testing environments ONLY. The generated data mimics +realistic WiFi CSI patterns including multipath effects, human motion signatures, +and noise characteristics. + +WARNING: This module uses np.random intentionally for test data generation. +Do NOT use this module in production data paths. +""" + +import logging +import numpy as np +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + +# Banner displayed when mock mode is active +MOCK_MODE_BANNER = """ +================================================================================ + WARNING: MOCK MODE ACTIVE - Using synthetic CSI data + + All CSI data is randomly generated and does NOT represent real WiFi signals. + For real pose estimation, configure hardware per docs/hardware-setup.md. +================================================================================ +""" + + +class MockCSIGenerator: + """Generator for synthetic CSI data used in testing and development. + + This class produces complex-valued CSI matrices that simulate realistic + WiFi channel characteristics including: + - Per-antenna and per-subcarrier amplitude/phase variation + - Simulated human movement signatures + - Configurable noise levels + - Temporal coherence across consecutive frames + + This is ONLY for testing. Production code must use real hardware data. + """ + + def __init__( + self, + num_subcarriers: int = 64, + num_antennas: int = 4, + num_samples: int = 100, + noise_level: float = 0.1, + movement_freq: float = 0.5, + movement_amplitude: float = 0.3, + ): + """Initialize mock CSI generator. + + Args: + num_subcarriers: Number of OFDM subcarriers to simulate + num_antennas: Number of antenna elements + num_samples: Number of temporal samples per frame + noise_level: Standard deviation of additive Gaussian noise + movement_freq: Frequency of simulated human movement (Hz) + movement_amplitude: Amplitude of movement-induced CSI variation + """ + self.num_subcarriers = num_subcarriers + self.num_antennas = num_antennas + self.num_samples = num_samples + self.noise_level = noise_level + self.movement_freq = movement_freq + self.movement_amplitude = movement_amplitude + + # Internal state for temporal coherence + self._phase = 0.0 + self._frequency = 0.1 + self._amplitude_base = 1.0 + + self._banner_shown = False + + def show_banner(self) -> None: + """Display the mock mode warning banner (once per session).""" + if not self._banner_shown: + logger.warning(MOCK_MODE_BANNER) + self._banner_shown = True + + def generate(self) -> np.ndarray: + """Generate a single frame of mock CSI data. + + Returns: + Complex-valued numpy array of shape + (num_antennas, num_subcarriers, num_samples). + """ + self.show_banner() + + # Advance internal phase for temporal coherence + self._phase += self._frequency + + time_axis = np.linspace(0, 1, self.num_samples) + + csi_data = np.zeros( + (self.num_antennas, self.num_subcarriers, self.num_samples), + dtype=complex, + ) + + for antenna in range(self.num_antennas): + for subcarrier in range(self.num_subcarriers): + # Base amplitude varies with antenna and subcarrier + amplitude = ( + self._amplitude_base + * (1 + 0.2 * np.sin(2 * np.pi * subcarrier / self.num_subcarriers)) + * (1 + 0.1 * antenna) + ) + + # Phase with spatial and frequency variation + phase_offset = ( + self._phase + + 2 * np.pi * subcarrier / self.num_subcarriers + + np.pi * antenna / self.num_antennas + ) + + # Simulated human movement + movement = self.movement_amplitude * np.sin( + 2 * np.pi * self.movement_freq * time_axis + ) + + signal_amplitude = amplitude * (1 + movement) + signal_phase = phase_offset + movement * 0.5 + + # Additive complex Gaussian noise + noise = np.random.normal(0, self.noise_level, self.num_samples) + 1j * np.random.normal( + 0, self.noise_level, self.num_samples + ) + + csi_data[antenna, subcarrier, :] = ( + signal_amplitude * np.exp(1j * signal_phase) + noise + ) + + return csi_data + + def configure(self, config: Dict[str, Any]) -> None: + """Update generator parameters. + + Args: + config: Dictionary with optional keys: + - sampling_rate: Adjusts internal frequency + - noise_level: Sets noise standard deviation + - num_subcarriers: Updates subcarrier count + - num_antennas: Updates antenna count + - movement_freq: Updates simulated movement frequency + - movement_amplitude: Updates movement amplitude + """ + if "sampling_rate" in config: + self._frequency = config["sampling_rate"] / 1000.0 + if "noise_level" in config: + self.noise_level = config["noise_level"] + if "num_subcarriers" in config: + self.num_subcarriers = config["num_subcarriers"] + if "num_antennas" in config: + self.num_antennas = config["num_antennas"] + if "movement_freq" in config: + self.movement_freq = config["movement_freq"] + if "movement_amplitude" in config: + self.movement_amplitude = config["movement_amplitude"] + + def get_router_info(self) -> Dict[str, Any]: + """Return mock router hardware information. + + Returns: + Dictionary mimicking router hardware info for testing. + """ + return { + "model": "Mock Router", + "firmware": "1.0.0-mock", + "wifi_standard": "802.11ac", + "antennas": self.num_antennas, + "supported_bands": ["2.4GHz", "5GHz"], + "csi_capabilities": { + "max_subcarriers": self.num_subcarriers, + "max_antennas": self.num_antennas, + "sampling_rate": 1000, + }, + }