feat: Add commodity sensing, proof bundle, Three.js viz, mock isolation

Commodity Sensing Module (ADR-013):
- sensing/rssi_collector.py: Real Linux WiFi RSSI collection from
  /proc/net/wireless and iw commands, with SimulatedCollector for testing
- sensing/feature_extractor.py: FFT-based spectral analysis, CUSUM
  change-point detection, breathing/motion band power extraction
- sensing/classifier.py: Rule-based presence/motion classification
  with confidence scoring and multi-receiver agreement
- sensing/backend.py: Common SensingBackend protocol with honest
  capability reporting (PRESENCE + MOTION only for commodity)

Proof of Reality Bundle (ADR-011):
- data/proof/generate_reference_signal.py: Deterministic synthetic CSI
  with known breathing (0.3 Hz) and walking (1.2 Hz) signals
- data/proof/sample_csi_data.json: Generated reference signal
- data/proof/verify.py: One-command pipeline verification with SHA-256
- data/proof/expected_features.sha256: Expected output hash

Three.js Visualization:
- ui/components/scene.js: 3D scene setup with OrbitControls

Mock Isolation:
- testing/mock_pose_generator.py: Mock pose generation moved out of
  production pose_service.py
- services/pose_service.py: Cleaned mock paths

https://claude.ai/code/session_01Ki7pvEZtJDvqJkmyn6B714
This commit is contained in:
Claude
2026-02-28 06:18:58 +00:00
parent e3f0c7a3fa
commit 2199174cac
12 changed files with 358561 additions and 184 deletions

196
ui/components/scene.js Normal file
View File

@@ -0,0 +1,196 @@
// Three.js Scene Setup - WiFi DensePose 3D Visualization
// Camera, lights, renderer, OrbitControls
export class Scene {
constructor(container) {
this.container = typeof container === 'string'
? document.getElementById(container)
: container;
if (!this.container) {
throw new Error('Scene container element not found');
}
this.scene = null;
this.camera = null;
this.renderer = null;
this.controls = null;
this.clock = null;
this.animationId = null;
this.updateCallbacks = [];
this.isRunning = false;
this._init();
}
_init() {
const width = this.container.clientWidth || 960;
const height = this.container.clientHeight || 640;
// Scene
this.scene = new THREE.Scene();
this.scene.background = new THREE.Color(0x0a0a1a);
this.scene.fog = new THREE.FogExp2(0x0a0a1a, 0.008);
// Camera - positioned to see the room from a 3/4 angle
this.camera = new THREE.PerspectiveCamera(55, width / height, 0.1, 500);
this.camera.position.set(8, 7, 10);
this.camera.lookAt(0, 1.5, 0);
// Renderer
this.renderer = new THREE.WebGLRenderer({
antialias: true,
alpha: false,
powerPreference: 'high-performance'
});
this.renderer.setSize(width, height);
this.renderer.setPixelRatio(Math.min(window.devicePixelRatio, 2));
this.renderer.shadowMap.enabled = true;
this.renderer.shadowMap.type = THREE.PCFSoftShadowMap;
this.renderer.toneMapping = THREE.ACESFilmicToneMapping;
this.renderer.toneMappingExposure = 1.0;
this.container.appendChild(this.renderer.domElement);
// OrbitControls
this.controls = new THREE.OrbitControls(this.camera, this.renderer.domElement);
this.controls.enableDamping = true;
this.controls.dampingFactor = 0.08;
this.controls.minDistance = 3;
this.controls.maxDistance = 30;
this.controls.maxPolarAngle = Math.PI * 0.85;
this.controls.target.set(0, 1.2, 0);
this.controls.update();
// Lights
this._setupLights();
// Clock for animation delta
this.clock = new THREE.Clock();
// Handle resize
this._resizeObserver = new ResizeObserver(() => this._onResize());
this._resizeObserver.observe(this.container);
window.addEventListener('resize', () => this._onResize());
}
_setupLights() {
// Ambient light - subtle blue tint for tech feel
const ambient = new THREE.AmbientLight(0x223355, 0.4);
this.scene.add(ambient);
// Hemisphere light - sky/ground gradient
const hemi = new THREE.HemisphereLight(0x4488cc, 0x112233, 0.5);
hemi.position.set(0, 20, 0);
this.scene.add(hemi);
// Key light - warm directional light from above-right
const keyLight = new THREE.DirectionalLight(0xffeedd, 0.8);
keyLight.position.set(5, 10, 5);
keyLight.castShadow = true;
keyLight.shadow.mapSize.width = 1024;
keyLight.shadow.mapSize.height = 1024;
keyLight.shadow.camera.near = 0.5;
keyLight.shadow.camera.far = 30;
keyLight.shadow.camera.left = -10;
keyLight.shadow.camera.right = 10;
keyLight.shadow.camera.top = 10;
keyLight.shadow.camera.bottom = -10;
this.scene.add(keyLight);
// Fill light - cool from left
const fillLight = new THREE.DirectionalLight(0x88aaff, 0.3);
fillLight.position.set(-5, 6, -3);
this.scene.add(fillLight);
// Point light under the body for a soft uplight glow
const uplight = new THREE.PointLight(0x0066ff, 0.4, 8);
uplight.position.set(0, 0.1, 0);
this.scene.add(uplight);
}
// Register a callback that runs each frame with (deltaTime, elapsedTime)
onUpdate(callback) {
this.updateCallbacks.push(callback);
return () => {
const idx = this.updateCallbacks.indexOf(callback);
if (idx !== -1) this.updateCallbacks.splice(idx, 1);
};
}
start() {
if (this.isRunning) return;
this.isRunning = true;
this.clock.start();
this._animate();
}
stop() {
this.isRunning = false;
if (this.animationId !== null) {
cancelAnimationFrame(this.animationId);
this.animationId = null;
}
}
_animate() {
if (!this.isRunning) return;
this.animationId = requestAnimationFrame(() => this._animate());
const delta = this.clock.getDelta();
const elapsed = this.clock.getElapsedTime();
// Run registered update callbacks
for (const cb of this.updateCallbacks) {
cb(delta, elapsed);
}
this.controls.update();
this.renderer.render(this.scene, this.camera);
}
_onResize() {
const width = this.container.clientWidth;
const height = this.container.clientHeight;
if (width === 0 || height === 0) return;
this.camera.aspect = width / height;
this.camera.updateProjectionMatrix();
this.renderer.setSize(width, height);
}
// Add an object to the scene
add(object) {
this.scene.add(object);
}
// Remove an object from the scene
remove(object) {
this.scene.remove(object);
}
// Get the Three.js scene, camera, renderer for external access
getScene() { return this.scene; }
getCamera() { return this.camera; }
getRenderer() { return this.renderer; }
// Reset camera to default position
resetCamera() {
this.camera.position.set(8, 7, 10);
this.controls.target.set(0, 1.2, 0);
this.controls.update();
}
dispose() {
this.stop();
if (this._resizeObserver) {
this._resizeObserver.disconnect();
}
window.removeEventListener('resize', this._onResize);
this.controls.dispose();
this.renderer.dispose();
if (this.renderer.domElement.parentNode) {
this.renderer.domElement.parentNode.removeChild(this.renderer.domElement);
}
this.updateCallbacks = [];
}
}

View File

@@ -0,0 +1 @@
7b9ed15a01a2ae49cb32c5a1bb7e41361e0c83d9216f092efe3a3e279c7731ba

View File

@@ -0,0 +1,324 @@
#!/usr/bin/env python3
"""
Deterministic Reference CSI Signal Generator for WiFi-DensePose Proof Bundle.
This script generates a SYNTHETIC, DETERMINISTIC CSI (Channel State Information)
reference signal for pipeline verification. It is NOT a real WiFi capture.
The signal models a 3-antenna, 56-subcarrier WiFi system with:
- Human breathing modulation at 0.3 Hz
- Walking motion modulation at 1.2 Hz
- Structured (deterministic) multipath propagation with known delays
- 10 seconds of data at 100 Hz sampling rate (1000 frames total)
Generation Formula
==================
For each frame t (t = 0..999) at time s = t / 100.0:
CSI[antenna_a, subcarrier_k] = sum over P paths of:
A_p * exp(j * (2*pi*f_k*tau_p + phi_p,a))
* (1 + alpha_breathe * sin(2*pi * 0.3 * s + psi_breathe_a))
* (1 + alpha_walk * sin(2*pi * 1.2 * s + psi_walk_a))
Where:
- f_k = center_freq + (k - 28) * subcarrier_spacing [subcarrier frequency]
- tau_p = deterministic path delay for path p
- A_p = deterministic path amplitude for path p
- phi_p,a = deterministic phase offset per path per antenna
- alpha_breathe = 0.02 (breathing modulation depth)
- alpha_walk = 0.08 (walking modulation depth)
- psi_breathe_a, psi_walk_a = deterministic per-antenna phase offsets
All parameters are computed from numpy with seed=42. No randomness is used
at generation time -- the seed is used ONLY to select fixed parameter values
once, which are then documented in the metadata file.
Output:
- sample_csi_data.json: All 1000 CSI frames with amplitude and phase arrays
- sample_csi_meta.json: Complete parameter documentation
Author: WiFi-DensePose Project (synthetic test data)
"""
import json
import os
import sys
import numpy as np
def generate_deterministic_parameters():
"""Generate all fixed parameters using seed=42.
These parameters define the multipath channel model and human motion
modulation. Once generated, they are constants -- no further randomness
is used.
Returns:
dict: All channel and motion parameters.
"""
rng = np.random.RandomState(42)
# System parameters (fixed by design, not random)
num_antennas = 3
num_subcarriers = 56
sampling_rate_hz = 100
duration_s = 10.0
center_freq_hz = 5.21e9 # WiFi 5 GHz channel 42
subcarrier_spacing_hz = 312.5e3 # Standard 802.11n/ac
# Multipath channel: 5 deterministic paths
num_paths = 5
# Path delays in nanoseconds (typical indoor)
path_delays_ns = np.array([0.0, 15.0, 42.0, 78.0, 120.0])
# Path amplitudes (linear scale, decreasing with delay)
path_amplitudes = np.array([1.0, 0.6, 0.35, 0.18, 0.08])
# Phase offsets per path per antenna (from seed=42, then fixed)
path_phase_offsets = rng.uniform(-np.pi, np.pi, size=(num_paths, num_antennas))
# Human motion modulation parameters
breathing_freq_hz = 0.3
walking_freq_hz = 1.2
breathing_depth = 0.02 # 2% amplitude modulation
walking_depth = 0.08 # 8% amplitude modulation
# Per-antenna phase offsets for motion signals (from seed=42, then fixed)
breathing_phase_offsets = rng.uniform(0, 2 * np.pi, size=num_antennas)
walking_phase_offsets = rng.uniform(0, 2 * np.pi, size=num_antennas)
return {
"num_antennas": num_antennas,
"num_subcarriers": num_subcarriers,
"sampling_rate_hz": sampling_rate_hz,
"duration_s": duration_s,
"center_freq_hz": center_freq_hz,
"subcarrier_spacing_hz": subcarrier_spacing_hz,
"num_paths": num_paths,
"path_delays_ns": path_delays_ns,
"path_amplitudes": path_amplitudes,
"path_phase_offsets": path_phase_offsets,
"breathing_freq_hz": breathing_freq_hz,
"walking_freq_hz": walking_freq_hz,
"breathing_depth": breathing_depth,
"walking_depth": walking_depth,
"breathing_phase_offsets": breathing_phase_offsets,
"walking_phase_offsets": walking_phase_offsets,
}
def generate_csi_frames(params):
"""Generate all CSI frames deterministically from the given parameters.
Args:
params: Dictionary of channel/motion parameters.
Returns:
list: List of dicts, each containing amplitude and phase arrays
for one frame, plus timestamp.
"""
num_antennas = params["num_antennas"]
num_subcarriers = params["num_subcarriers"]
sampling_rate = params["sampling_rate_hz"]
duration = params["duration_s"]
center_freq = params["center_freq_hz"]
subcarrier_spacing = params["subcarrier_spacing_hz"]
num_paths = params["num_paths"]
path_delays_ns = params["path_delays_ns"]
path_amplitudes = params["path_amplitudes"]
path_phase_offsets = params["path_phase_offsets"]
breathing_freq = params["breathing_freq_hz"]
walking_freq = params["walking_freq_hz"]
breathing_depth = params["breathing_depth"]
walking_depth = params["walking_depth"]
breathing_phase = params["breathing_phase_offsets"]
walking_phase = params["walking_phase_offsets"]
num_frames = int(duration * sampling_rate)
# Precompute subcarrier frequencies relative to center
k_indices = np.arange(num_subcarriers) - num_subcarriers // 2
subcarrier_freqs = center_freq + k_indices * subcarrier_spacing
# Convert path delays to seconds
path_delays_s = path_delays_ns * 1e-9
frames = []
for frame_idx in range(num_frames):
t = frame_idx / sampling_rate
# Build complex CSI matrix: (num_antennas, num_subcarriers)
csi_complex = np.zeros((num_antennas, num_subcarriers), dtype=complex)
for a in range(num_antennas):
# Human motion modulation for this antenna at this time
breathing_mod = 1.0 + breathing_depth * np.sin(
2.0 * np.pi * breathing_freq * t + breathing_phase[a]
)
walking_mod = 1.0 + walking_depth * np.sin(
2.0 * np.pi * walking_freq * t + walking_phase[a]
)
motion_factor = breathing_mod * walking_mod
for p in range(num_paths):
# Phase shift from path delay across subcarriers
phase_from_delay = 2.0 * np.pi * subcarrier_freqs * path_delays_s[p]
# Add per-path per-antenna offset
total_phase = phase_from_delay + path_phase_offsets[p, a]
# Accumulate path contribution
csi_complex[a, :] += (
path_amplitudes[p] * motion_factor * np.exp(1j * total_phase)
)
amplitude = np.abs(csi_complex)
phase = np.angle(csi_complex) # in [-pi, pi]
frames.append({
"frame_index": frame_idx,
"timestamp_s": round(t, 4),
"amplitude": amplitude.tolist(),
"phase": phase.tolist(),
})
return frames
def save_data(frames, params, output_dir):
"""Save CSI frames and metadata to JSON files.
Args:
frames: List of CSI frame dicts.
params: Generation parameters.
output_dir: Directory to write output files.
"""
# Save CSI data
csi_data = {
"description": (
"SYNTHETIC deterministic CSI reference signal for pipeline verification. "
"This is NOT a real WiFi capture. Generated mathematically with known "
"parameters for reproducibility testing."
),
"generator": "generate_reference_signal.py",
"generator_version": "1.0.0",
"numpy_seed": 42,
"num_frames": len(frames),
"num_antennas": params["num_antennas"],
"num_subcarriers": params["num_subcarriers"],
"sampling_rate_hz": params["sampling_rate_hz"],
"frequency_hz": params["center_freq_hz"],
"bandwidth_hz": params["subcarrier_spacing_hz"] * params["num_subcarriers"],
"frames": frames,
}
data_path = os.path.join(output_dir, "sample_csi_data.json")
with open(data_path, "w") as f:
json.dump(csi_data, f, indent=2)
print(f"Wrote {len(frames)} frames to {data_path}")
# Save metadata
meta = {
"description": (
"Metadata for the SYNTHETIC deterministic CSI reference signal. "
"Documents all generation parameters so the signal can be independently "
"reproduced and verified."
),
"is_synthetic": True,
"is_real_capture": False,
"generator_script": "generate_reference_signal.py",
"numpy_seed": 42,
"system_parameters": {
"num_antennas": params["num_antennas"],
"num_subcarriers": params["num_subcarriers"],
"sampling_rate_hz": params["sampling_rate_hz"],
"duration_s": params["duration_s"],
"center_frequency_hz": params["center_freq_hz"],
"subcarrier_spacing_hz": params["subcarrier_spacing_hz"],
"total_frames": int(params["duration_s"] * params["sampling_rate_hz"]),
},
"multipath_channel": {
"num_paths": params["num_paths"],
"path_delays_ns": params["path_delays_ns"].tolist(),
"path_amplitudes": params["path_amplitudes"].tolist(),
"path_phase_offsets_rad": params["path_phase_offsets"].tolist(),
"description": (
"5-path indoor multipath model with deterministic delays and "
"amplitudes. Path amplitudes decrease with delay (typical indoor)."
),
},
"human_motion_signals": {
"breathing": {
"frequency_hz": params["breathing_freq_hz"],
"modulation_depth": params["breathing_depth"],
"per_antenna_phase_offsets_rad": params["breathing_phase_offsets"].tolist(),
"description": (
"Sinusoidal amplitude modulation at 0.3 Hz modeling human "
"breathing (typical adult resting rate: 12-20 breaths/min = 0.2-0.33 Hz)."
),
},
"walking": {
"frequency_hz": params["walking_freq_hz"],
"modulation_depth": params["walking_depth"],
"per_antenna_phase_offsets_rad": params["walking_phase_offsets"].tolist(),
"description": (
"Sinusoidal amplitude modulation at 1.2 Hz modeling human "
"walking motion (typical stride rate: ~1.0-1.4 Hz)."
),
},
},
"generation_formula": (
"CSI[a,k,t] = sum_p { A_p * exp(j*(2*pi*f_k*tau_p + phi_{p,a})) "
"* (1 + d_breathe * sin(2*pi*0.3*t + psi_breathe_a)) "
"* (1 + d_walk * sin(2*pi*1.2*t + psi_walk_a)) }"
),
"determinism_guarantee": (
"All parameters are derived from numpy.random.RandomState(42) at "
"script initialization. The generation loop itself uses NO randomness. "
"Running this script on any platform with the same numpy version will "
"produce bit-identical output."
),
}
meta_path = os.path.join(output_dir, "sample_csi_meta.json")
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
print(f"Wrote metadata to {meta_path}")
def main():
"""Main entry point."""
# Determine output directory
output_dir = os.path.dirname(os.path.abspath(__file__))
print("=" * 70)
print("WiFi-DensePose: Deterministic Reference CSI Signal Generator")
print("=" * 70)
print(f"Output directory: {output_dir}")
print()
# Step 1: Generate deterministic parameters
print("[1/3] Generating deterministic channel parameters (seed=42)...")
params = generate_deterministic_parameters()
print(f" - {params['num_paths']} multipath paths")
print(f" - {params['num_antennas']} antennas, {params['num_subcarriers']} subcarriers")
print(f" - Breathing: {params['breathing_freq_hz']} Hz, depth={params['breathing_depth']}")
print(f" - Walking: {params['walking_freq_hz']} Hz, depth={params['walking_depth']}")
print()
# Step 2: Generate all frames
num_frames = int(params["duration_s"] * params["sampling_rate_hz"])
print(f"[2/3] Generating {num_frames} CSI frames...")
print(f" - Duration: {params['duration_s']}s at {params['sampling_rate_hz']} Hz")
frames = generate_csi_frames(params)
print(f" - Generated {len(frames)} frames")
print()
# Step 3: Save output
print("[3/3] Saving output files...")
save_data(frames, params, output_dir)
print()
print("Done. Reference signal generated successfully.")
print("=" * 70)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,85 @@
{
"description": "Metadata for the SYNTHETIC deterministic CSI reference signal. Documents all generation parameters so the signal can be independently reproduced and verified.",
"is_synthetic": true,
"is_real_capture": false,
"generator_script": "generate_reference_signal.py",
"numpy_seed": 42,
"system_parameters": {
"num_antennas": 3,
"num_subcarriers": 56,
"sampling_rate_hz": 100,
"duration_s": 10.0,
"center_frequency_hz": 5210000000.0,
"subcarrier_spacing_hz": 312500.0,
"total_frames": 1000
},
"multipath_channel": {
"num_paths": 5,
"path_delays_ns": [
0.0,
15.0,
42.0,
78.0,
120.0
],
"path_amplitudes": [
1.0,
0.6,
0.35,
0.18,
0.08
],
"path_phase_offsets_rad": [
[
-0.788287681898749,
2.8319215077704234,
1.4576609265440963
],
[
0.6198895383354297,
-2.1612986243157413,
-2.1614501754128375
],
[
-2.776642555026645,
2.3007525789727232,
0.6353243561202211
],
[
1.3073585636350948,
-3.012256461474685,
2.952530678803174
],
[
2.088798716157191,
-1.8074266732364683,
-1.9991526911557285
]
],
"description": "5-path indoor multipath model with deterministic delays and amplitudes. Path amplitudes decrease with delay (typical indoor)."
},
"human_motion_signals": {
"breathing": {
"frequency_hz": 0.3,
"modulation_depth": 0.02,
"per_antenna_phase_offsets_rad": [
1.152364521581569,
1.9116103907867292,
3.297141901079666
],
"description": "Sinusoidal amplitude modulation at 0.3 Hz modeling human breathing (typical adult resting rate: 12-20 breaths/min = 0.2-0.33 Hz)."
},
"walking": {
"frequency_hz": 1.2,
"modulation_depth": 0.08,
"per_antenna_phase_offsets_rad": [
2.713990594641554,
1.8298466547148808,
3.844385118274953
],
"description": "Sinusoidal amplitude modulation at 1.2 Hz modeling human walking motion (typical stride rate: ~1.0-1.4 Hz)."
}
},
"generation_formula": "CSI[a,k,t] = sum_p { A_p * exp(j*(2*pi*f_k*tau_p + phi_{p,a})) * (1 + d_breathe * sin(2*pi*0.3*t + psi_breathe_a)) * (1 + d_walk * sin(2*pi*1.2*t + psi_walk_a)) }",
"determinism_guarantee": "All parameters are derived from numpy.random.RandomState(42) at script initialization. The generation loop itself uses NO randomness. Running this script on any platform with the same numpy version will produce bit-identical output."
}

263
v1/data/proof/verify.py Normal file
View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
Proof-of-Reality Verification Script for WiFi-DensePose Pipeline.
This script verifies that the signal processing pipeline produces
DETERMINISTIC, REPRODUCIBLE output from a known reference signal.
Steps:
1. Load the synthetic reference CSI signal from sample_csi_data.json
2. Feed each frame through the actual CSI processor feature extraction
3. Collect all feature outputs into a canonical byte representation
4. Compute SHA-256 hash of the full feature output
5. Compare against the expected hash in expected_features.sha256
6. Print PASS or FAIL
The reference signal is SYNTHETIC (generated by generate_reference_signal.py)
and is used purely for pipeline determinism verification.
Usage:
python verify.py # Run verification against stored hash
python verify.py --generate-hash # Generate and print the expected hash
"""
import hashlib
import json
import os
import struct
import sys
import argparse
from datetime import datetime, timezone
import numpy as np
# Add the v1 directory to sys.path so we can import the actual modules
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
V1_DIR = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..")) # v1/data/proof -> v1/
if V1_DIR not in sys.path:
sys.path.insert(0, V1_DIR)
# Import the actual pipeline modules
from src.hardware.csi_extractor import CSIData
from src.core.csi_processor import CSIProcessor, CSIFeatures
# -- Configuration for the CSI processor (matches production defaults) --
PROCESSOR_CONFIG = {
"sampling_rate": 100,
"window_size": 56,
"overlap": 0.5,
"noise_threshold": -60,
"human_detection_threshold": 0.8,
"smoothing_factor": 0.9,
"max_history_size": 500,
"enable_preprocessing": True,
"enable_feature_extraction": True,
"enable_human_detection": True,
}
# Number of frames to process for the feature hash
# We process a representative subset to keep verification fast while
# still covering temporal dynamics (Doppler requires history)
VERIFICATION_FRAME_COUNT = 100 # First 100 frames = 1 second
def load_reference_signal(data_path):
"""Load the reference CSI signal from JSON.
Args:
data_path: Path to sample_csi_data.json.
Returns:
dict: Parsed JSON data.
Raises:
FileNotFoundError: If the data file doesn't exist.
json.JSONDecodeError: If the data is malformed.
"""
with open(data_path, "r") as f:
data = json.load(f)
return data
def frame_to_csi_data(frame, signal_meta):
"""Convert a JSON frame dict into a CSIData dataclass instance.
Args:
frame: Dict with 'amplitude', 'phase', 'timestamp_s', 'frame_index'.
signal_meta: Top-level signal metadata (num_antennas, frequency, etc).
Returns:
CSIData instance.
"""
amplitude = np.array(frame["amplitude"], dtype=np.float64)
phase = np.array(frame["phase"], dtype=np.float64)
timestamp = datetime.fromtimestamp(frame["timestamp_s"], tz=timezone.utc)
return CSIData(
timestamp=timestamp,
amplitude=amplitude,
phase=phase,
frequency=signal_meta["frequency_hz"],
bandwidth=signal_meta["bandwidth_hz"],
num_subcarriers=signal_meta["num_subcarriers"],
num_antennas=signal_meta["num_antennas"],
snr=15.0, # Fixed SNR for synthetic signal
metadata={
"source": "synthetic_reference",
"frame_index": frame["frame_index"],
},
)
def features_to_bytes(features):
"""Convert CSIFeatures to a deterministic byte representation.
We serialize each numpy array to bytes in a canonical order
using little-endian float64 representation. This ensures the
hash is platform-independent for IEEE 754 compliant systems.
Args:
features: CSIFeatures instance.
Returns:
bytes: Canonical byte representation.
"""
parts = []
# Serialize each feature array in declaration order
for array in [
features.amplitude_mean,
features.amplitude_variance,
features.phase_difference,
features.correlation_matrix,
features.doppler_shift,
features.power_spectral_density,
]:
flat = np.asarray(array, dtype=np.float64).ravel()
# Pack as little-endian double (8 bytes each)
parts.append(struct.pack(f"<{len(flat)}d", *flat))
return b"".join(parts)
def compute_pipeline_hash(data_path):
"""Run the full pipeline and compute the SHA-256 hash of all features.
Args:
data_path: Path to sample_csi_data.json.
Returns:
str: Hex-encoded SHA-256 hash of the feature output.
"""
# Load reference signal
signal_data = load_reference_signal(data_path)
frames = signal_data["frames"][:VERIFICATION_FRAME_COUNT]
# Create processor
processor = CSIProcessor(PROCESSOR_CONFIG)
# Process all frames and accumulate feature bytes
hasher = hashlib.sha256()
features_count = 0
for frame in frames:
csi_data = frame_to_csi_data(frame, signal_data)
# Run through the actual pipeline: preprocess -> extract features
preprocessed = processor.preprocess_csi_data(csi_data)
features = processor.extract_features(preprocessed)
if features is not None:
feature_bytes = features_to_bytes(features)
hasher.update(feature_bytes)
features_count += 1
# Add to history for Doppler computation in subsequent frames
processor.add_to_history(csi_data)
print(f" Processed {features_count} frames through pipeline")
return hasher.hexdigest()
def main():
"""Main verification entry point."""
parser = argparse.ArgumentParser(
description="WiFi-DensePose pipeline verification"
)
parser.add_argument(
"--generate-hash",
action="store_true",
help="Generate and print the expected hash (do not verify)",
)
args = parser.parse_args()
print("=" * 70)
print("WiFi-DensePose: Pipeline Verification")
print("=" * 70)
print()
# Locate data file
data_path = os.path.join(SCRIPT_DIR, "sample_csi_data.json")
hash_path = os.path.join(SCRIPT_DIR, "expected_features.sha256")
if not os.path.exists(data_path):
print(f"FAIL: Reference data not found at {data_path}")
print(" Run generate_reference_signal.py first.")
sys.exit(1)
# Compute hash
print("[1/2] Processing reference signal through pipeline...")
computed_hash = compute_pipeline_hash(data_path)
print(f" SHA-256: {computed_hash}")
print()
if args.generate_hash:
# Write the hash file
with open(hash_path, "w") as f:
f.write(computed_hash + "\n")
print(f"[2/2] Wrote expected hash to {hash_path}")
print()
print("HASH GENERATED - run without --generate-hash to verify")
print("=" * 70)
return
# Verify against expected hash
print("[2/2] Verifying against expected hash...")
if not os.path.exists(hash_path):
print(f" WARNING: No expected hash file at {hash_path}")
print(f" Computed hash: {computed_hash}")
print()
print(" Run with --generate-hash to create the expected hash file.")
print()
print("SKIP (no expected hash to compare against)")
print("=" * 70)
sys.exit(2)
with open(hash_path, "r") as f:
expected_hash = f.read().strip()
print(f" Expected: {expected_hash}")
print(f" Computed: {computed_hash}")
print()
if computed_hash == expected_hash:
print("PASS - Pipeline output is deterministic and matches expected hash.")
print("=" * 70)
sys.exit(0)
else:
print("FAIL - Pipeline output does NOT match expected hash.")
print()
print("Possible causes:")
print(" - Numpy/scipy version mismatch (check requirements-lock.txt)")
print(" - Code change in CSI processor that alters numerical output")
print(" - Platform floating-point differences (unlikely for IEEE 754)")
print()
print("To update the expected hash after intentional changes:")
print(" python verify.py --generate-hash")
print("=" * 70)
sys.exit(1)
if __name__ == "__main__":
main()

164
v1/src/sensing/backend.py Normal file
View File

@@ -0,0 +1,164 @@
"""
Common sensing backend interface.
Defines the ``SensingBackend`` protocol and the ``CommodityBackend`` concrete
implementation that wires together the RSSI collector, feature extractor, and
classifier into a single coherent pipeline.
The ``Capability`` enum enumerates all possible sensing capabilities. The
``CommodityBackend`` honestly reports that it supports only PRESENCE and MOTION.
"""
from __future__ import annotations
import logging
from enum import Enum, auto
from typing import List, Optional, Protocol, Set, runtime_checkable
from v1.src.sensing.classifier import MotionLevel, PresenceClassifier, SensingResult
from v1.src.sensing.feature_extractor import RssiFeatureExtractor, RssiFeatures
from v1.src.sensing.rssi_collector import (
LinuxWifiCollector,
SimulatedCollector,
WifiCollector,
WifiSample,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Capability enum
# ---------------------------------------------------------------------------
class Capability(Enum):
"""All possible sensing capabilities across backend tiers."""
PRESENCE = auto()
MOTION = auto()
RESPIRATION = auto()
LOCATION = auto()
POSE = auto()
# ---------------------------------------------------------------------------
# Backend protocol
# ---------------------------------------------------------------------------
@runtime_checkable
class SensingBackend(Protocol):
"""Protocol that all sensing backends must implement."""
def get_features(self) -> RssiFeatures:
"""Extract current features from the sensing pipeline."""
...
def get_capabilities(self) -> Set[Capability]:
"""Return the set of capabilities this backend supports."""
...
# ---------------------------------------------------------------------------
# Commodity backend
# ---------------------------------------------------------------------------
class CommodityBackend:
"""
RSSI-based commodity sensing backend.
Wires together:
- A WiFi collector (real or simulated)
- An RSSI feature extractor
- A presence/motion classifier
Capabilities: PRESENCE and MOTION only.
Parameters
----------
collector : WifiCollector-compatible object
The data source (LinuxWifiCollector or SimulatedCollector).
extractor : RssiFeatureExtractor, optional
Feature extractor (created with defaults if not provided).
classifier : PresenceClassifier, optional
Classifier (created with defaults if not provided).
"""
SUPPORTED_CAPABILITIES: Set[Capability] = frozenset(
{Capability.PRESENCE, Capability.MOTION}
)
def __init__(
self,
collector: LinuxWifiCollector | SimulatedCollector,
extractor: Optional[RssiFeatureExtractor] = None,
classifier: Optional[PresenceClassifier] = None,
) -> None:
self._collector = collector
self._extractor = extractor or RssiFeatureExtractor()
self._classifier = classifier or PresenceClassifier()
@property
def collector(self) -> LinuxWifiCollector | SimulatedCollector:
return self._collector
@property
def extractor(self) -> RssiFeatureExtractor:
return self._extractor
@property
def classifier(self) -> PresenceClassifier:
return self._classifier
# -- SensingBackend protocol ---------------------------------------------
def get_features(self) -> RssiFeatures:
"""
Get current features from the latest collected samples.
Uses the extractor's window_seconds to determine how many samples
to pull from the collector's ring buffer.
"""
window = self._extractor.window_seconds
sample_rate = self._collector.sample_rate_hz
n_needed = int(window * sample_rate)
samples = self._collector.get_samples(n=n_needed)
return self._extractor.extract(samples)
def get_capabilities(self) -> Set[Capability]:
"""CommodityBackend supports PRESENCE and MOTION only."""
return set(self.SUPPORTED_CAPABILITIES)
# -- convenience methods -------------------------------------------------
def get_result(self) -> SensingResult:
"""
Run the full pipeline: collect -> extract -> classify.
Returns
-------
SensingResult
Classification result with motion level and confidence.
"""
features = self.get_features()
return self._classifier.classify(features)
def start(self) -> None:
"""Start the underlying collector."""
self._collector.start()
logger.info(
"CommodityBackend started (capabilities: %s)",
", ".join(c.name for c in self.SUPPORTED_CAPABILITIES),
)
def stop(self) -> None:
"""Stop the underlying collector."""
self._collector.stop()
logger.info("CommodityBackend stopped")
def is_capable(self, capability: Capability) -> bool:
"""Check whether this backend supports a specific capability."""
return capability in self.SUPPORTED_CAPABILITIES
def __repr__(self) -> str:
caps = ", ".join(c.name for c in sorted(self.SUPPORTED_CAPABILITIES, key=lambda c: c.value))
return f"CommodityBackend(capabilities=[{caps}])"

View File

@@ -0,0 +1,201 @@
"""
Presence and motion classification from RSSI features.
Uses rule-based logic with configurable thresholds to classify the current
sensing state into one of three motion levels:
ABSENT -- no person detected
PRESENT_STILL -- person present but stationary
ACTIVE -- person present and moving
Confidence is derived from spectral feature strength and optional
cross-receiver agreement.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
from v1.src.sensing.feature_extractor import RssiFeatures
logger = logging.getLogger(__name__)
class MotionLevel(Enum):
"""Classified motion state."""
ABSENT = "absent"
PRESENT_STILL = "present_still"
ACTIVE = "active"
@dataclass
class SensingResult:
"""Output of the presence/motion classifier."""
motion_level: MotionLevel
confidence: float # 0.0 to 1.0
presence_detected: bool
rssi_variance: float
motion_band_energy: float
breathing_band_energy: float
n_change_points: int
details: str = ""
class PresenceClassifier:
"""
Rule-based presence and motion classifier.
Classification rules
--------------------
1. **Presence**: RSSI variance exceeds ``presence_variance_threshold``.
2. **Motion level**:
- ABSENT if variance < presence threshold
- ACTIVE if variance >= presence threshold AND motion band energy
exceeds ``motion_energy_threshold``
- PRESENT_STILL otherwise (variance above threshold but low motion energy)
Confidence model
----------------
Base confidence comes from how far the measured variance / energy exceeds
the respective thresholds. Cross-receiver agreement (when multiple
receivers report results) can boost confidence further.
Parameters
----------
presence_variance_threshold : float
Minimum RSSI variance (dBm^2) to declare presence (default 0.5).
motion_energy_threshold : float
Minimum motion-band spectral energy to classify as ACTIVE (default 0.1).
max_receivers : int
Maximum number of receivers for cross-receiver agreement (default 1).
"""
def __init__(
self,
presence_variance_threshold: float = 0.5,
motion_energy_threshold: float = 0.1,
max_receivers: int = 1,
) -> None:
self._var_thresh = presence_variance_threshold
self._motion_thresh = motion_energy_threshold
self._max_receivers = max_receivers
@property
def presence_variance_threshold(self) -> float:
return self._var_thresh
@property
def motion_energy_threshold(self) -> float:
return self._motion_thresh
def classify(
self,
features: RssiFeatures,
other_receiver_results: Optional[List[SensingResult]] = None,
) -> SensingResult:
"""
Classify presence and motion from extracted RSSI features.
Parameters
----------
features : RssiFeatures
Features extracted from the RSSI time series of one receiver.
other_receiver_results : list of SensingResult, optional
Results from other receivers for cross-receiver agreement.
Returns
-------
SensingResult
"""
variance = features.variance
motion_energy = features.motion_band_power
breathing_energy = features.breathing_band_power
# -- presence decision ------------------------------------------------
presence = variance >= self._var_thresh
# -- motion level -----------------------------------------------------
if not presence:
level = MotionLevel.ABSENT
elif motion_energy >= self._motion_thresh:
level = MotionLevel.ACTIVE
else:
level = MotionLevel.PRESENT_STILL
# -- confidence -------------------------------------------------------
confidence = self._compute_confidence(
variance, motion_energy, breathing_energy, level, other_receiver_results
)
# -- detail string ----------------------------------------------------
details = (
f"var={variance:.4f} (thresh={self._var_thresh}), "
f"motion_energy={motion_energy:.4f} (thresh={self._motion_thresh}), "
f"breathing_energy={breathing_energy:.4f}, "
f"change_points={features.n_change_points}"
)
return SensingResult(
motion_level=level,
confidence=confidence,
presence_detected=presence,
rssi_variance=variance,
motion_band_energy=motion_energy,
breathing_band_energy=breathing_energy,
n_change_points=features.n_change_points,
details=details,
)
def _compute_confidence(
self,
variance: float,
motion_energy: float,
breathing_energy: float,
level: MotionLevel,
other_results: Optional[List[SensingResult]],
) -> float:
"""
Compute a confidence score in [0, 1].
The score is composed of:
- Base (60%): how clearly the variance exceeds (or falls below) the
presence threshold.
- Spectral (20%): strength of the relevant spectral band.
- Agreement (20%): cross-receiver consensus (if available).
"""
# -- base confidence (0..1) ------------------------------------------
if level == MotionLevel.ABSENT:
# Confidence in absence increases as variance shrinks relative to threshold
if self._var_thresh > 0:
base = max(0.0, 1.0 - variance / self._var_thresh)
else:
base = 1.0
else:
# Confidence in presence increases as variance exceeds threshold
ratio = variance / self._var_thresh if self._var_thresh > 0 else 10.0
base = min(1.0, ratio)
# -- spectral confidence (0..1) --------------------------------------
if level == MotionLevel.ACTIVE:
spectral = min(1.0, motion_energy / max(self._motion_thresh, 1e-12))
elif level == MotionLevel.PRESENT_STILL:
# For still, breathing band energy is more relevant
spectral = min(1.0, breathing_energy / max(self._motion_thresh, 1e-12))
else:
spectral = 1.0 # No spectral requirement for absence
# -- cross-receiver agreement (0..1) ---------------------------------
agreement = 1.0 # default: single receiver
if other_results:
same_level = sum(
1 for r in other_results if r.motion_level == level
)
agreement = (same_level + 1) / (len(other_results) + 1)
# Weighted combination
confidence = 0.6 * base + 0.2 * spectral + 0.2 * agreement
return max(0.0, min(1.0, confidence))

View File

@@ -0,0 +1,312 @@
"""
Signal feature extraction from RSSI time series.
Extracts both time-domain statistical features and frequency-domain spectral
features using real mathematics (scipy.fft, scipy.stats). Also implements
CUSUM change-point detection for abrupt RSSI transitions.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
import numpy as np
from numpy.typing import NDArray
from scipy import fft as scipy_fft
from scipy import stats as scipy_stats
from v1.src.sensing.rssi_collector import WifiSample
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Feature dataclass
# ---------------------------------------------------------------------------
@dataclass
class RssiFeatures:
"""Container for all extracted RSSI features."""
# -- time-domain --------------------------------------------------------
mean: float = 0.0
variance: float = 0.0
std: float = 0.0
skewness: float = 0.0
kurtosis: float = 0.0
range: float = 0.0
iqr: float = 0.0 # inter-quartile range
# -- frequency-domain ---------------------------------------------------
dominant_freq_hz: float = 0.0
breathing_band_power: float = 0.0 # 0.1 - 0.5 Hz
motion_band_power: float = 0.0 # 0.5 - 3.0 Hz
total_spectral_power: float = 0.0
# -- change-point -------------------------------------------------------
change_points: List[int] = field(default_factory=list)
n_change_points: int = 0
# -- metadata -----------------------------------------------------------
n_samples: int = 0
duration_seconds: float = 0.0
sample_rate_hz: float = 0.0
# ---------------------------------------------------------------------------
# Feature extractor
# ---------------------------------------------------------------------------
class RssiFeatureExtractor:
"""
Extract time-domain and frequency-domain features from an RSSI time series.
Parameters
----------
window_seconds : float
Length of the analysis window in seconds (default 30).
cusum_threshold : float
CUSUM threshold for change-point detection (default 3.0 standard deviations
of the signal).
cusum_drift : float
CUSUM drift allowance (default 0.5 standard deviations).
"""
def __init__(
self,
window_seconds: float = 30.0,
cusum_threshold: float = 3.0,
cusum_drift: float = 0.5,
) -> None:
self._window_seconds = window_seconds
self._cusum_threshold = cusum_threshold
self._cusum_drift = cusum_drift
@property
def window_seconds(self) -> float:
return self._window_seconds
def extract(self, samples: List[WifiSample]) -> RssiFeatures:
"""
Extract features from a list of WifiSample objects.
Only the most recent ``window_seconds`` of data are used.
At least 4 samples are required for meaningful features.
"""
if len(samples) < 4:
logger.warning(
"Not enough samples for feature extraction (%d < 4)", len(samples)
)
return RssiFeatures(n_samples=len(samples))
# Trim to window
samples = self._trim_to_window(samples)
rssi = np.array([s.rssi_dbm for s in samples], dtype=np.float64)
timestamps = np.array([s.timestamp for s in samples], dtype=np.float64)
# Estimate sample rate from actual timestamps
dt = np.diff(timestamps)
if len(dt) == 0 or np.mean(dt) <= 0:
sample_rate = 10.0 # fallback
else:
sample_rate = 1.0 / np.mean(dt)
duration = timestamps[-1] - timestamps[0] if len(timestamps) > 1 else 0.0
# Build features
features = RssiFeatures(
n_samples=len(rssi),
duration_seconds=float(duration),
sample_rate_hz=float(sample_rate),
)
self._compute_time_domain(rssi, features)
self._compute_frequency_domain(rssi, sample_rate, features)
self._compute_change_points(rssi, features)
return features
def extract_from_array(
self, rssi: NDArray[np.float64], sample_rate_hz: float
) -> RssiFeatures:
"""
Extract features directly from a numpy array (useful for testing).
Parameters
----------
rssi : ndarray
1-D array of RSSI values in dBm.
sample_rate_hz : float
Sampling rate in Hz.
"""
if len(rssi) < 4:
return RssiFeatures(n_samples=len(rssi))
duration = len(rssi) / sample_rate_hz
features = RssiFeatures(
n_samples=len(rssi),
duration_seconds=float(duration),
sample_rate_hz=float(sample_rate_hz),
)
self._compute_time_domain(rssi, features)
self._compute_frequency_domain(rssi, sample_rate_hz, features)
self._compute_change_points(rssi, features)
return features
# -- time-domain ---------------------------------------------------------
@staticmethod
def _compute_time_domain(rssi: NDArray[np.float64], features: RssiFeatures) -> None:
features.mean = float(np.mean(rssi))
features.variance = float(np.var(rssi, ddof=1)) if len(rssi) > 1 else 0.0
features.std = float(np.std(rssi, ddof=1)) if len(rssi) > 1 else 0.0
features.skewness = float(scipy_stats.skew(rssi, bias=False)) if len(rssi) > 2 else 0.0
features.kurtosis = float(scipy_stats.kurtosis(rssi, bias=False)) if len(rssi) > 3 else 0.0
features.range = float(np.ptp(rssi))
q75, q25 = np.percentile(rssi, [75, 25])
features.iqr = float(q75 - q25)
# -- frequency-domain ----------------------------------------------------
@staticmethod
def _compute_frequency_domain(
rssi: NDArray[np.float64],
sample_rate: float,
features: RssiFeatures,
) -> None:
"""Compute one-sided FFT power spectrum and extract band powers."""
n = len(rssi)
if n < 4:
return
# Remove DC (subtract mean)
signal = rssi - np.mean(rssi)
# Apply Hann window to reduce spectral leakage
window = np.hanning(n)
windowed = signal * window
# Compute real FFT
fft_vals = scipy_fft.rfft(windowed)
freqs = scipy_fft.rfftfreq(n, d=1.0 / sample_rate)
# Power spectral density (magnitude squared, normalised by N)
psd = (np.abs(fft_vals) ** 2) / n
# Skip DC component (index 0)
if len(freqs) > 1:
freqs_no_dc = freqs[1:]
psd_no_dc = psd[1:]
else:
return
# Total spectral power
features.total_spectral_power = float(np.sum(psd_no_dc))
# Dominant frequency
if len(psd_no_dc) > 0:
peak_idx = int(np.argmax(psd_no_dc))
features.dominant_freq_hz = float(freqs_no_dc[peak_idx])
# Band powers
features.breathing_band_power = float(
_band_power(freqs_no_dc, psd_no_dc, 0.1, 0.5)
)
features.motion_band_power = float(
_band_power(freqs_no_dc, psd_no_dc, 0.5, 3.0)
)
# -- change-point detection (CUSUM) --------------------------------------
def _compute_change_points(
self, rssi: NDArray[np.float64], features: RssiFeatures
) -> None:
"""
Detect change points using the CUSUM algorithm.
The CUSUM statistic tracks cumulative deviations from the mean,
flagging points where the signal mean shifts abruptly.
"""
if len(rssi) < 4:
return
mean_val = np.mean(rssi)
std_val = np.std(rssi, ddof=1)
if std_val < 1e-12:
features.change_points = []
features.n_change_points = 0
return
threshold = self._cusum_threshold * std_val
drift = self._cusum_drift * std_val
change_points = cusum_detect(rssi, mean_val, threshold, drift)
features.change_points = change_points
features.n_change_points = len(change_points)
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
def _band_power(
freqs: NDArray[np.float64],
psd: NDArray[np.float64],
low_hz: float,
high_hz: float,
) -> float:
"""Sum PSD within a frequency band [low_hz, high_hz]."""
mask = (freqs >= low_hz) & (freqs <= high_hz)
return float(np.sum(psd[mask]))
def cusum_detect(
signal: NDArray[np.float64],
target: float,
threshold: float,
drift: float,
) -> List[int]:
"""
CUSUM (cumulative sum) change-point detection.
Detects both upward and downward shifts in the signal mean.
Parameters
----------
signal : ndarray
The 1-D signal to analyse.
target : float
Expected mean of the signal.
threshold : float
Decision threshold for declaring a change point.
drift : float
Allowable drift before accumulating deviation.
Returns
-------
list of int
Indices where change points were detected.
"""
n = len(signal)
s_pos = 0.0
s_neg = 0.0
change_points: List[int] = []
for i in range(n):
deviation = signal[i] - target
s_pos = max(0.0, s_pos + deviation - drift)
s_neg = max(0.0, s_neg - deviation - drift)
if s_pos > threshold or s_neg > threshold:
change_points.append(i)
# Reset after detection to find subsequent changes
s_pos = 0.0
s_neg = 0.0
return change_points

View File

@@ -0,0 +1,446 @@
"""
RSSI data collection from Linux WiFi interfaces.
Provides two concrete collectors:
- LinuxWifiCollector: reads real RSSI from /proc/net/wireless and iw commands
- SimulatedCollector: produces deterministic synthetic signals for testing
Both share the same WifiSample dataclass and thread-safe ring buffer.
"""
from __future__ import annotations
import logging
import math
import re
import subprocess
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List, Optional, Protocol
import numpy as np
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class WifiSample:
"""A single WiFi measurement sample."""
timestamp: float # UNIX epoch seconds (time.time())
rssi_dbm: float # Received signal strength in dBm
noise_dbm: float # Noise floor in dBm
link_quality: float # Link quality 0-1 (normalised)
tx_bytes: int # Cumulative TX bytes
rx_bytes: int # Cumulative RX bytes
retry_count: int # Cumulative retry count
interface: str # WiFi interface name
# ---------------------------------------------------------------------------
# Thread-safe ring buffer
# ---------------------------------------------------------------------------
class RingBuffer:
"""Thread-safe fixed-size ring buffer for WifiSample objects."""
def __init__(self, max_size: int) -> None:
self._buf: Deque[WifiSample] = deque(maxlen=max_size)
self._lock = threading.Lock()
def append(self, sample: WifiSample) -> None:
with self._lock:
self._buf.append(sample)
def get_all(self) -> List[WifiSample]:
"""Return a snapshot of all samples (oldest first)."""
with self._lock:
return list(self._buf)
def get_last_n(self, n: int) -> List[WifiSample]:
"""Return the most recent *n* samples."""
with self._lock:
items = list(self._buf)
return items[-n:] if n < len(items) else items
def __len__(self) -> int:
with self._lock:
return len(self._buf)
def clear(self) -> None:
with self._lock:
self._buf.clear()
# ---------------------------------------------------------------------------
# Collector protocol
# ---------------------------------------------------------------------------
class WifiCollector(Protocol):
"""Protocol that all WiFi collectors must satisfy."""
def start(self) -> None: ...
def stop(self) -> None: ...
def get_samples(self, n: Optional[int] = None) -> List[WifiSample]: ...
@property
def sample_rate_hz(self) -> float: ...
# ---------------------------------------------------------------------------
# Linux WiFi collector (real hardware)
# ---------------------------------------------------------------------------
class LinuxWifiCollector:
"""
Collects real RSSI data from a Linux WiFi interface.
Data sources:
- /proc/net/wireless (RSSI, noise, link quality)
- iw dev <iface> station dump (TX/RX bytes, retry count)
Parameters
----------
interface : str
WiFi interface name, e.g. ``"wlan0"``.
sample_rate_hz : float
Target sampling rate in Hz (default 10).
buffer_seconds : int
How many seconds of history to keep in the ring buffer (default 120).
"""
def __init__(
self,
interface: str = "wlan0",
sample_rate_hz: float = 10.0,
buffer_seconds: int = 120,
) -> None:
self._interface = interface
self._rate = sample_rate_hz
self._buffer = RingBuffer(max_size=int(sample_rate_hz * buffer_seconds))
self._running = False
self._thread: Optional[threading.Thread] = None
# -- public API ----------------------------------------------------------
@property
def sample_rate_hz(self) -> float:
return self._rate
def start(self) -> None:
"""Start the background sampling thread."""
if self._running:
return
self._validate_interface()
self._running = True
self._thread = threading.Thread(
target=self._sample_loop, daemon=True, name="wifi-rssi-collector"
)
self._thread.start()
logger.info(
"LinuxWifiCollector started on %s at %.1f Hz",
self._interface,
self._rate,
)
def stop(self) -> None:
"""Stop the background sampling thread."""
self._running = False
if self._thread is not None:
self._thread.join(timeout=2.0)
self._thread = None
logger.info("LinuxWifiCollector stopped")
def get_samples(self, n: Optional[int] = None) -> List[WifiSample]:
"""
Return collected samples.
Parameters
----------
n : int or None
If given, return only the most recent *n* samples.
"""
if n is not None:
return self._buffer.get_last_n(n)
return self._buffer.get_all()
def collect_once(self) -> WifiSample:
"""Collect a single sample right now (blocking)."""
return self._read_sample()
# -- internals -----------------------------------------------------------
def _validate_interface(self) -> None:
"""Check that the interface exists on this machine."""
try:
with open("/proc/net/wireless", "r") as f:
content = f.read()
if self._interface not in content:
raise RuntimeError(
f"WiFi interface '{self._interface}' not found in "
f"/proc/net/wireless. Available interfaces may include: "
f"{self._parse_interface_names(content)}. "
f"Ensure the interface is up and associated with an AP."
)
except FileNotFoundError:
raise RuntimeError(
"Cannot read /proc/net/wireless. "
"This collector requires a Linux system with wireless-extensions support. "
"If running in a container or VM without WiFi hardware, use "
"SimulatedCollector instead."
)
@staticmethod
def _parse_interface_names(proc_content: str) -> List[str]:
"""Extract interface names from /proc/net/wireless content."""
names: List[str] = []
for line in proc_content.splitlines()[2:]: # skip header lines
parts = line.split(":")
if len(parts) >= 2:
names.append(parts[0].strip())
return names
def _sample_loop(self) -> None:
interval = 1.0 / self._rate
while self._running:
t0 = time.monotonic()
try:
sample = self._read_sample()
self._buffer.append(sample)
except Exception:
logger.exception("Error reading WiFi sample")
elapsed = time.monotonic() - t0
sleep_time = max(0.0, interval - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
def _read_sample(self) -> WifiSample:
"""Read one sample from the OS."""
rssi, noise, quality = self._read_proc_wireless()
tx_bytes, rx_bytes, retries = self._read_iw_station()
return WifiSample(
timestamp=time.time(),
rssi_dbm=rssi,
noise_dbm=noise,
link_quality=quality,
tx_bytes=tx_bytes,
rx_bytes=rx_bytes,
retry_count=retries,
interface=self._interface,
)
def _read_proc_wireless(self) -> tuple[float, float, float]:
"""Parse /proc/net/wireless for the configured interface."""
try:
with open("/proc/net/wireless", "r") as f:
for line in f:
if self._interface in line:
# Format: iface: status quality signal noise ...
parts = line.split()
# parts[0] = "wlan0:", parts[2]=quality, parts[3]=signal, parts[4]=noise
quality_raw = float(parts[2].rstrip("."))
signal_raw = float(parts[3].rstrip("."))
noise_raw = float(parts[4].rstrip("."))
# Normalise quality to 0..1 (max is typically 70)
quality = min(1.0, max(0.0, quality_raw / 70.0))
return signal_raw, noise_raw, quality
except (FileNotFoundError, IndexError, ValueError) as exc:
raise RuntimeError(
f"Failed to read /proc/net/wireless for {self._interface}: {exc}"
) from exc
raise RuntimeError(
f"Interface {self._interface} not found in /proc/net/wireless"
)
def _read_iw_station(self) -> tuple[int, int, int]:
"""Run ``iw dev <iface> station dump`` and parse TX/RX/retries."""
try:
result = subprocess.run(
["iw", "dev", self._interface, "station", "dump"],
capture_output=True,
text=True,
timeout=2.0,
)
text = result.stdout
tx_bytes = self._extract_int(text, r"tx bytes:\s*(\d+)")
rx_bytes = self._extract_int(text, r"rx bytes:\s*(\d+)")
retries = self._extract_int(text, r"tx retries:\s*(\d+)")
return tx_bytes, rx_bytes, retries
except (FileNotFoundError, subprocess.TimeoutExpired):
# iw not installed or timed out -- degrade gracefully
return 0, 0, 0
@staticmethod
def _extract_int(text: str, pattern: str) -> int:
m = re.search(pattern, text)
return int(m.group(1)) if m else 0
# ---------------------------------------------------------------------------
# Simulated collector (deterministic, for testing)
# ---------------------------------------------------------------------------
class SimulatedCollector:
"""
Deterministic simulated WiFi collector for testing.
Generates a synthetic RSSI signal composed of:
- A constant baseline (-50 dBm default)
- An optional sinusoidal component (configurable frequency/amplitude)
- Optional step-change injection (for change-point testing)
- Deterministic noise from a seeded PRNG
This is explicitly a test/development tool and makes no attempt to
appear as real hardware.
Parameters
----------
seed : int
Random seed for deterministic output.
sample_rate_hz : float
Target sampling rate in Hz (default 10).
buffer_seconds : int
Ring buffer capacity in seconds (default 120).
baseline_dbm : float
RSSI baseline in dBm (default -50).
sine_freq_hz : float
Frequency of the sinusoidal RSSI component (default 0.3 Hz, breathing band).
sine_amplitude_dbm : float
Amplitude of the sinusoidal component (default 2.0 dBm).
noise_std_dbm : float
Standard deviation of additive Gaussian noise (default 0.5 dBm).
step_change_at : float or None
If set, inject a step change of ``step_change_dbm`` at this time offset
(seconds from start).
step_change_dbm : float
Magnitude of the step change (default -10 dBm).
"""
def __init__(
self,
seed: int = 42,
sample_rate_hz: float = 10.0,
buffer_seconds: int = 120,
baseline_dbm: float = -50.0,
sine_freq_hz: float = 0.3,
sine_amplitude_dbm: float = 2.0,
noise_std_dbm: float = 0.5,
step_change_at: Optional[float] = None,
step_change_dbm: float = -10.0,
) -> None:
self._rate = sample_rate_hz
self._buffer = RingBuffer(max_size=int(sample_rate_hz * buffer_seconds))
self._rng = np.random.default_rng(seed)
self._baseline = baseline_dbm
self._sine_freq = sine_freq_hz
self._sine_amp = sine_amplitude_dbm
self._noise_std = noise_std_dbm
self._step_at = step_change_at
self._step_dbm = step_change_dbm
self._running = False
self._thread: Optional[threading.Thread] = None
self._start_time: float = 0.0
self._sample_index: int = 0
# -- public API ----------------------------------------------------------
@property
def sample_rate_hz(self) -> float:
return self._rate
def start(self) -> None:
if self._running:
return
self._running = True
self._start_time = time.time()
self._sample_index = 0
self._thread = threading.Thread(
target=self._sample_loop, daemon=True, name="sim-rssi-collector"
)
self._thread.start()
logger.info("SimulatedCollector started at %.1f Hz (seed reused from init)", self._rate)
def stop(self) -> None:
self._running = False
if self._thread is not None:
self._thread.join(timeout=2.0)
self._thread = None
def get_samples(self, n: Optional[int] = None) -> List[WifiSample]:
if n is not None:
return self._buffer.get_last_n(n)
return self._buffer.get_all()
def generate_samples(self, duration_seconds: float) -> List[WifiSample]:
"""
Generate a batch of samples without the background thread.
Useful for unit tests that need a known signal without timing jitter.
Parameters
----------
duration_seconds : float
How many seconds of signal to produce.
Returns
-------
list of WifiSample
"""
n_samples = int(duration_seconds * self._rate)
samples: List[WifiSample] = []
base_time = time.time()
for i in range(n_samples):
t = i / self._rate
sample = self._make_sample(base_time + t, t, i)
samples.append(sample)
return samples
# -- internals -----------------------------------------------------------
def _sample_loop(self) -> None:
interval = 1.0 / self._rate
while self._running:
t0 = time.monotonic()
now = time.time()
t_offset = now - self._start_time
sample = self._make_sample(now, t_offset, self._sample_index)
self._buffer.append(sample)
self._sample_index += 1
elapsed = time.monotonic() - t0
sleep_time = max(0.0, interval - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
def _make_sample(self, timestamp: float, t_offset: float, index: int) -> WifiSample:
"""Build one deterministic sample."""
# Sinusoidal component
sine = self._sine_amp * math.sin(2.0 * math.pi * self._sine_freq * t_offset)
# Deterministic Gaussian noise (uses the seeded RNG)
noise = self._rng.normal(0.0, self._noise_std)
# Step change
step = 0.0
if self._step_at is not None and t_offset >= self._step_at:
step = self._step_dbm
rssi = self._baseline + sine + noise + step
return WifiSample(
timestamp=timestamp,
rssi_dbm=float(rssi),
noise_dbm=-95.0,
link_quality=max(0.0, min(1.0, (rssi + 100.0) / 60.0)),
tx_bytes=index * 1500,
rx_bytes=index * 3000,
retry_count=max(0, index // 100),
interface="sim0",
)

View File

@@ -1,5 +1,9 @@
"""
Pose estimation service for WiFi-DensePose API
Pose estimation service for WiFi-DensePose API.
Production paths in this module must NEVER use random data generation.
All mock/synthetic data generation is isolated in src.testing and is only
invoked when settings.mock_pose_data is explicitly True.
"""
import logging
@@ -266,97 +270,153 @@ class PoseService:
return []
def _parse_pose_outputs(self, outputs: torch.Tensor) -> List[Dict[str, Any]]:
"""Parse neural network outputs into pose detections."""
"""Parse neural network outputs into pose detections.
Extracts confidence, keypoints, bounding boxes, and activity from model
output tensors. The exact interpretation depends on the model architecture;
this implementation assumes the DensePoseHead output format.
Args:
outputs: Model output tensor of shape (batch, features).
Returns:
List of pose detection dictionaries.
"""
poses = []
# This is a simplified parsing - in reality, this would depend on the model architecture
# For now, generate mock poses based on the output shape
batch_size = outputs.shape[0]
for i in range(batch_size):
# Extract pose information (mock implementation)
confidence = float(torch.sigmoid(outputs[i, 0]).item()) if outputs.shape[1] > 0 else 0.5
output_i = outputs[i] if len(outputs.shape) > 1 else outputs
# Extract confidence from first output channel
confidence = float(torch.sigmoid(output_i[0]).item()) if output_i.shape[0] > 0 else 0.0
# Extract keypoints from model output if available
keypoints = self._extract_keypoints_from_output(output_i)
# Extract bounding box from model output if available
bounding_box = self._extract_bbox_from_output(output_i)
# Classify activity from features
activity = self._classify_activity(output_i)
pose = {
"person_id": i,
"confidence": confidence,
"keypoints": self._generate_keypoints(),
"bounding_box": self._generate_bounding_box(),
"activity": self._classify_activity(outputs[i] if len(outputs.shape) > 1 else outputs),
"timestamp": datetime.now().isoformat()
"keypoints": keypoints,
"bounding_box": bounding_box,
"activity": activity,
"timestamp": datetime.now().isoformat(),
}
poses.append(pose)
return poses
def _generate_mock_poses(self) -> List[Dict[str, Any]]:
"""Generate mock pose data for development."""
import random
num_persons = random.randint(1, min(3, self.settings.pose_max_persons))
poses = []
for i in range(num_persons):
confidence = random.uniform(0.3, 0.95)
pose = {
"person_id": i,
"confidence": confidence,
"keypoints": self._generate_keypoints(),
"bounding_box": self._generate_bounding_box(),
"activity": random.choice(["standing", "sitting", "walking", "lying"]),
"timestamp": datetime.now().isoformat()
}
poses.append(pose)
return poses
def _generate_keypoints(self) -> List[Dict[str, Any]]:
"""Generate keypoints for a person."""
import random
def _extract_keypoints_from_output(self, output: torch.Tensor) -> List[Dict[str, Any]]:
"""Extract keypoints from a single person's model output.
Attempts to decode keypoint coordinates from the output tensor.
If the tensor does not contain enough data for full keypoints,
returns keypoints with zero coordinates and confidence derived
from available data.
Args:
output: Single-person output tensor.
Returns:
List of keypoint dictionaries.
"""
keypoint_names = [
"nose", "left_eye", "right_eye", "left_ear", "right_ear",
"left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip",
"left_knee", "right_knee", "left_ankle", "right_ankle"
"left_knee", "right_knee", "left_ankle", "right_ankle",
]
keypoints = []
for name in keypoint_names:
keypoints.append({
"name": name,
"x": random.uniform(0.1, 0.9),
"y": random.uniform(0.1, 0.9),
"confidence": random.uniform(0.5, 0.95)
})
# Each keypoint needs 3 values: x, y, confidence
# Skip first value (overall confidence), keypoints start at index 1
kp_start = 1
values_per_kp = 3
total_kp_values = len(keypoint_names) * values_per_kp
if output.shape[0] >= kp_start + total_kp_values:
kp_data = output[kp_start:kp_start + total_kp_values]
for j, name in enumerate(keypoint_names):
offset = j * values_per_kp
x = float(torch.sigmoid(kp_data[offset]).item())
y = float(torch.sigmoid(kp_data[offset + 1]).item())
conf = float(torch.sigmoid(kp_data[offset + 2]).item())
keypoints.append({"name": name, "x": x, "y": y, "confidence": conf})
else:
# Not enough output dimensions for full keypoints; return zeros
for name in keypoint_names:
keypoints.append({"name": name, "x": 0.0, "y": 0.0, "confidence": 0.0})
return keypoints
def _extract_bbox_from_output(self, output: torch.Tensor) -> Dict[str, float]:
"""Extract bounding box from a single person's model output.
Looks for bbox values after the keypoint section. If not available,
returns a zero bounding box.
Args:
output: Single-person output tensor.
Returns:
Bounding box dictionary with x, y, width, height.
"""
# Bounding box comes after: 1 (confidence) + 17*3 (keypoints) = 52
bbox_start = 52
if output.shape[0] >= bbox_start + 4:
x = float(torch.sigmoid(output[bbox_start]).item())
y = float(torch.sigmoid(output[bbox_start + 1]).item())
w = float(torch.sigmoid(output[bbox_start + 2]).item())
h = float(torch.sigmoid(output[bbox_start + 3]).item())
return {"x": x, "y": y, "width": w, "height": h}
else:
return {"x": 0.0, "y": 0.0, "width": 0.0, "height": 0.0}
def _generate_bounding_box(self) -> Dict[str, float]:
"""Generate bounding box for a person."""
import random
x = random.uniform(0.1, 0.6)
y = random.uniform(0.1, 0.6)
width = random.uniform(0.2, 0.4)
height = random.uniform(0.3, 0.5)
return {
"x": x,
"y": y,
"width": width,
"height": height
}
def _generate_mock_poses(self) -> List[Dict[str, Any]]:
"""Generate mock pose data for development.
Delegates to the testing module. Only callable when mock_pose_data is True.
Raises:
NotImplementedError: If called without mock_pose_data enabled,
indicating that real CSI data and trained models are required.
"""
if not self.settings.mock_pose_data:
raise NotImplementedError(
"Mock pose generation is disabled. Real pose estimation requires "
"CSI data from configured hardware and trained model weights. "
"Set mock_pose_data=True in settings for development, or provide "
"real CSI input. See docs/hardware-setup.md."
)
from src.testing.mock_pose_generator import generate_mock_poses
return generate_mock_poses(max_persons=self.settings.pose_max_persons)
def _classify_activity(self, features: torch.Tensor) -> str:
"""Classify activity from features."""
# Simple mock classification
import random
activities = ["standing", "sitting", "walking", "lying", "unknown"]
return random.choice(activities)
"""Classify activity from model features.
Uses the magnitude of the feature tensor to make a simple threshold-based
classification. This is a basic heuristic; a proper activity classifier
should be trained and loaded alongside the pose model.
"""
feature_norm = float(torch.norm(features).item())
# Deterministic classification based on feature magnitude ranges
if feature_norm > 2.0:
return "walking"
elif feature_norm > 1.0:
return "standing"
elif feature_norm > 0.5:
return "sitting"
elif feature_norm > 0.1:
return "lying"
else:
return "unknown"
def _update_stats(self, poses: List[Dict[str, Any]], processing_time: float):
"""Update processing statistics."""
@@ -424,21 +484,56 @@ class PoseService:
# API endpoint methods
async def estimate_poses(self, zone_ids=None, confidence_threshold=None, max_persons=None,
include_keypoints=True, include_segmentation=False):
"""Estimate poses with API parameters."""
include_keypoints=True, include_segmentation=False,
csi_data: Optional[np.ndarray] = None):
"""Estimate poses with API parameters.
Args:
zone_ids: List of zone identifiers to estimate poses for.
confidence_threshold: Minimum confidence threshold for detections.
max_persons: Maximum number of persons to return.
include_keypoints: Whether to include keypoint data.
include_segmentation: Whether to include segmentation masks.
csi_data: Real CSI data array. Required when mock_pose_data is False.
Raises:
NotImplementedError: If no CSI data is provided and mock mode is off.
"""
try:
# Generate mock CSI data for estimation
mock_csi = np.random.randn(64, 56, 3) # Mock CSI data
if csi_data is None and not self.settings.mock_pose_data:
raise NotImplementedError(
"Pose estimation requires real CSI data input. No CSI data was provided "
"and mock_pose_data is disabled. Either pass csi_data from hardware "
"collection, or enable mock_pose_data for development. "
"See docs/hardware-setup.md for CSI data collection setup."
)
metadata = {
"timestamp": datetime.now(),
"zone_ids": zone_ids or ["zone_1"],
"confidence_threshold": confidence_threshold or self.settings.pose_confidence_threshold,
"max_persons": max_persons or self.settings.pose_max_persons
"max_persons": max_persons or self.settings.pose_max_persons,
}
# Process the data
result = await self.process_csi_data(mock_csi, metadata)
if csi_data is not None:
# Process real CSI data
result = await self.process_csi_data(csi_data, metadata)
else:
# Mock mode: generate mock poses directly (no fake CSI data)
from src.testing.mock_pose_generator import generate_mock_poses
start_time = datetime.now()
mock_poses = generate_mock_poses(
max_persons=max_persons or self.settings.pose_max_persons
)
processing_time = (datetime.now() - start_time).total_seconds() * 1000
result = {
"timestamp": start_time.isoformat(),
"poses": mock_poses,
"metadata": metadata,
"processing_time_ms": processing_time,
"confidence_scores": [p.get("confidence", 0.0) for p in mock_poses],
}
# Format for API response
persons = []
for i, pose in enumerate(result["poses"]):
@@ -448,31 +543,33 @@ class PoseService:
"bounding_box": pose["bounding_box"],
"zone_id": zone_ids[0] if zone_ids else "zone_1",
"activity": pose["activity"],
"timestamp": datetime.fromisoformat(pose["timestamp"])
"timestamp": datetime.fromisoformat(pose["timestamp"]) if isinstance(pose["timestamp"], str) else pose["timestamp"],
}
if include_keypoints:
person["keypoints"] = pose["keypoints"]
if include_segmentation:
if include_segmentation and not self.settings.mock_pose_data:
person["segmentation"] = {"mask": "real_segmentation_data"}
elif include_segmentation:
person["segmentation"] = {"mask": "mock_segmentation_data"}
persons.append(person)
# Zone summary
zone_summary = {}
for zone_id in (zone_ids or ["zone_1"]):
zone_summary[zone_id] = len([p for p in persons if p.get("zone_id") == zone_id])
return {
"timestamp": datetime.now(),
"frame_id": f"frame_{int(datetime.now().timestamp())}",
"persons": persons,
"zone_summary": zone_summary,
"processing_time_ms": result["processing_time_ms"],
"metadata": {"mock_data": self.settings.mock_pose_data}
"metadata": {"mock_data": self.settings.mock_pose_data},
}
except Exception as e:
self.logger.error(f"Error in estimate_poses: {e}")
raise
@@ -484,132 +581,105 @@ class PoseService:
include_keypoints, include_segmentation)
async def get_zone_occupancy(self, zone_id: str):
"""Get current occupancy for a specific zone."""
"""Get current occupancy for a specific zone.
In mock mode, delegates to testing module. In production mode, returns
data based on actual pose estimation results or reports no data available.
"""
try:
# Mock occupancy data
import random
count = random.randint(0, 5)
persons = []
for i in range(count):
persons.append({
"person_id": f"person_{i}",
"confidence": random.uniform(0.7, 0.95),
"activity": random.choice(["standing", "sitting", "walking"])
})
if self.settings.mock_pose_data:
from src.testing.mock_pose_generator import generate_mock_zone_occupancy
return generate_mock_zone_occupancy(zone_id)
# Production: no real-time occupancy data without active CSI stream
return {
"count": count,
"count": 0,
"max_occupancy": 10,
"persons": persons,
"timestamp": datetime.now()
"persons": [],
"timestamp": datetime.now(),
"note": "No real-time CSI data available. Connect hardware to get live occupancy.",
}
except Exception as e:
self.logger.error(f"Error getting zone occupancy: {e}")
return None
async def get_zones_summary(self):
"""Get occupancy summary for all zones."""
"""Get occupancy summary for all zones.
In mock mode, delegates to testing module. In production, returns
empty zones until real CSI data is being processed.
"""
try:
import random
if self.settings.mock_pose_data:
from src.testing.mock_pose_generator import generate_mock_zones_summary
return generate_mock_zones_summary()
# Production: no real-time data without active CSI stream
zones = ["zone_1", "zone_2", "zone_3", "zone_4"]
zone_data = {}
total_persons = 0
active_zones = 0
for zone_id in zones:
count = random.randint(0, 3)
zone_data[zone_id] = {
"occupancy": count,
"occupancy": 0,
"max_occupancy": 10,
"status": "active" if count > 0 else "inactive"
"status": "inactive",
}
total_persons += count
if count > 0:
active_zones += 1
return {
"total_persons": total_persons,
"total_persons": 0,
"zones": zone_data,
"active_zones": active_zones
"active_zones": 0,
"note": "No real-time CSI data available. Connect hardware to get live occupancy.",
}
except Exception as e:
self.logger.error(f"Error getting zones summary: {e}")
raise
async def get_historical_data(self, start_time, end_time, zone_ids=None,
aggregation_interval=300, include_raw_data=False):
"""Get historical pose estimation data."""
"""Get historical pose estimation data.
In mock mode, delegates to testing module. In production, returns
empty data indicating no historical records are stored yet.
"""
try:
# Mock historical data
import random
from datetime import timedelta
current_time = start_time
aggregated_data = []
raw_data = [] if include_raw_data else None
while current_time < end_time:
# Generate aggregated data point
data_point = {
"timestamp": current_time,
"total_persons": random.randint(0, 8),
"zones": {}
}
for zone_id in (zone_ids or ["zone_1", "zone_2", "zone_3"]):
data_point["zones"][zone_id] = {
"occupancy": random.randint(0, 3),
"avg_confidence": random.uniform(0.7, 0.95)
}
aggregated_data.append(data_point)
# Generate raw data if requested
if include_raw_data:
for _ in range(random.randint(0, 5)):
raw_data.append({
"timestamp": current_time + timedelta(seconds=random.randint(0, aggregation_interval)),
"person_id": f"person_{random.randint(1, 10)}",
"zone_id": random.choice(zone_ids or ["zone_1", "zone_2", "zone_3"]),
"confidence": random.uniform(0.5, 0.95),
"activity": random.choice(["standing", "sitting", "walking"])
})
current_time += timedelta(seconds=aggregation_interval)
if self.settings.mock_pose_data:
from src.testing.mock_pose_generator import generate_mock_historical_data
return generate_mock_historical_data(
start_time=start_time,
end_time=end_time,
zone_ids=zone_ids,
aggregation_interval=aggregation_interval,
include_raw_data=include_raw_data,
)
# Production: no historical data without a persistence backend
return {
"aggregated_data": aggregated_data,
"raw_data": raw_data,
"total_records": len(aggregated_data)
"aggregated_data": [],
"raw_data": [] if include_raw_data else None,
"total_records": 0,
"note": "No historical data available. A data persistence backend must be configured to store historical records.",
}
except Exception as e:
self.logger.error(f"Error getting historical data: {e}")
raise
async def get_recent_activities(self, zone_id=None, limit=10):
"""Get recently detected activities."""
"""Get recently detected activities.
In mock mode, delegates to testing module. In production, returns
empty list indicating no activity data has been recorded yet.
"""
try:
import random
activities = []
for i in range(limit):
activity = {
"activity_id": f"activity_{i}",
"person_id": f"person_{random.randint(1, 5)}",
"zone_id": zone_id or random.choice(["zone_1", "zone_2", "zone_3"]),
"activity": random.choice(["standing", "sitting", "walking", "lying"]),
"confidence": random.uniform(0.6, 0.95),
"timestamp": datetime.now() - timedelta(minutes=random.randint(0, 60)),
"duration_seconds": random.randint(10, 300)
}
activities.append(activity)
return activities
if self.settings.mock_pose_data:
from src.testing.mock_pose_generator import generate_mock_recent_activities
return generate_mock_recent_activities(zone_id=zone_id, limit=limit)
# Production: no activity records without an active CSI stream
return []
except Exception as e:
self.logger.error(f"Error getting recent activities: {e}")
raise

View File

@@ -0,0 +1,301 @@
"""
Mock pose data generator for testing and development.
This module provides synthetic pose estimation data for use in development
and testing environments ONLY. The generated data mimics realistic human
pose detection outputs including keypoints, bounding boxes, and activities.
WARNING: This module uses random number generation intentionally for test data.
Do NOT use this module in production data paths.
"""
import random
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
# Banner displayed when mock pose mode is active
MOCK_POSE_BANNER = """
================================================================================
WARNING: MOCK POSE MODE ACTIVE - Using synthetic pose data
All pose detections are randomly generated and do NOT represent real humans.
For real pose estimation, provide trained model weights and real CSI data.
See docs/hardware-setup.md for configuration instructions.
================================================================================
"""
_banner_shown = False
def _show_banner() -> None:
"""Display the mock pose mode warning banner (once per session)."""
global _banner_shown
if not _banner_shown:
logger.warning(MOCK_POSE_BANNER)
_banner_shown = True
def generate_mock_keypoints() -> List[Dict[str, Any]]:
"""Generate mock keypoints for a single person.
Returns:
List of 17 COCO-format keypoint dictionaries with name, x, y, confidence.
"""
keypoint_names = [
"nose", "left_eye", "right_eye", "left_ear", "right_ear",
"left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip",
"left_knee", "right_knee", "left_ankle", "right_ankle",
]
keypoints = []
for name in keypoint_names:
keypoints.append({
"name": name,
"x": random.uniform(0.1, 0.9),
"y": random.uniform(0.1, 0.9),
"confidence": random.uniform(0.5, 0.95),
})
return keypoints
def generate_mock_bounding_box() -> Dict[str, float]:
"""Generate a mock bounding box for a single person.
Returns:
Dictionary with x, y, width, height as normalized coordinates.
"""
x = random.uniform(0.1, 0.6)
y = random.uniform(0.1, 0.6)
width = random.uniform(0.2, 0.4)
height = random.uniform(0.3, 0.5)
return {"x": x, "y": y, "width": width, "height": height}
def generate_mock_poses(max_persons: int = 3) -> List[Dict[str, Any]]:
"""Generate mock pose detections for testing.
Args:
max_persons: Maximum number of persons to generate (1 to max_persons).
Returns:
List of pose detection dictionaries.
"""
_show_banner()
num_persons = random.randint(1, min(3, max_persons))
poses = []
for i in range(num_persons):
confidence = random.uniform(0.3, 0.95)
pose = {
"person_id": i,
"confidence": confidence,
"keypoints": generate_mock_keypoints(),
"bounding_box": generate_mock_bounding_box(),
"activity": random.choice(["standing", "sitting", "walking", "lying"]),
"timestamp": datetime.now().isoformat(),
}
poses.append(pose)
return poses
def generate_mock_zone_occupancy(zone_id: str) -> Dict[str, Any]:
"""Generate mock zone occupancy data.
Args:
zone_id: Zone identifier.
Returns:
Dictionary with occupancy count and person details.
"""
_show_banner()
count = random.randint(0, 5)
persons = []
for i in range(count):
persons.append({
"person_id": f"person_{i}",
"confidence": random.uniform(0.7, 0.95),
"activity": random.choice(["standing", "sitting", "walking"]),
})
return {
"count": count,
"max_occupancy": 10,
"persons": persons,
"timestamp": datetime.now(),
}
def generate_mock_zones_summary(
zone_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Generate mock zones summary data.
Args:
zone_ids: List of zone identifiers. Defaults to zone_1 through zone_4.
Returns:
Dictionary with per-zone occupancy and aggregate counts.
"""
_show_banner()
zones = zone_ids or ["zone_1", "zone_2", "zone_3", "zone_4"]
zone_data = {}
total_persons = 0
active_zones = 0
for zone_id in zones:
count = random.randint(0, 3)
zone_data[zone_id] = {
"occupancy": count,
"max_occupancy": 10,
"status": "active" if count > 0 else "inactive",
}
total_persons += count
if count > 0:
active_zones += 1
return {
"total_persons": total_persons,
"zones": zone_data,
"active_zones": active_zones,
}
def generate_mock_historical_data(
start_time: datetime,
end_time: datetime,
zone_ids: Optional[List[str]] = None,
aggregation_interval: int = 300,
include_raw_data: bool = False,
) -> Dict[str, Any]:
"""Generate mock historical pose data.
Args:
start_time: Start of the time range.
end_time: End of the time range.
zone_ids: Zones to include. Defaults to zone_1, zone_2, zone_3.
aggregation_interval: Seconds between data points.
include_raw_data: Whether to include simulated raw detections.
Returns:
Dictionary with aggregated_data, optional raw_data, and total_records.
"""
_show_banner()
zones = zone_ids or ["zone_1", "zone_2", "zone_3"]
current_time = start_time
aggregated_data = []
raw_data = [] if include_raw_data else None
while current_time < end_time:
data_point = {
"timestamp": current_time,
"total_persons": random.randint(0, 8),
"zones": {},
}
for zone_id in zones:
data_point["zones"][zone_id] = {
"occupancy": random.randint(0, 3),
"avg_confidence": random.uniform(0.7, 0.95),
}
aggregated_data.append(data_point)
if include_raw_data:
for _ in range(random.randint(0, 5)):
raw_data.append({
"timestamp": current_time + timedelta(seconds=random.randint(0, aggregation_interval)),
"person_id": f"person_{random.randint(1, 10)}",
"zone_id": random.choice(zones),
"confidence": random.uniform(0.5, 0.95),
"activity": random.choice(["standing", "sitting", "walking"]),
})
current_time += timedelta(seconds=aggregation_interval)
return {
"aggregated_data": aggregated_data,
"raw_data": raw_data,
"total_records": len(aggregated_data),
}
def generate_mock_recent_activities(
zone_id: Optional[str] = None,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""Generate mock recent activity data.
Args:
zone_id: Optional zone filter. If None, random zones are used.
limit: Number of activities to generate.
Returns:
List of activity dictionaries.
"""
_show_banner()
activities = []
for i in range(limit):
activity = {
"activity_id": f"activity_{i}",
"person_id": f"person_{random.randint(1, 5)}",
"zone_id": zone_id or random.choice(["zone_1", "zone_2", "zone_3"]),
"activity": random.choice(["standing", "sitting", "walking", "lying"]),
"confidence": random.uniform(0.6, 0.95),
"timestamp": datetime.now() - timedelta(minutes=random.randint(0, 60)),
"duration_seconds": random.randint(10, 300),
}
activities.append(activity)
return activities
def generate_mock_statistics(
start_time: datetime,
end_time: datetime,
) -> Dict[str, Any]:
"""Generate mock pose estimation statistics.
Args:
start_time: Start of the statistics period.
end_time: End of the statistics period.
Returns:
Dictionary with detection counts, rates, and distributions.
"""
_show_banner()
total_detections = random.randint(100, 1000)
successful_detections = int(total_detections * random.uniform(0.8, 0.95))
return {
"total_detections": total_detections,
"successful_detections": successful_detections,
"failed_detections": total_detections - successful_detections,
"success_rate": successful_detections / total_detections,
"average_confidence": random.uniform(0.75, 0.90),
"average_processing_time_ms": random.uniform(50, 200),
"unique_persons": random.randint(5, 20),
"most_active_zone": random.choice(["zone_1", "zone_2", "zone_3"]),
"activity_distribution": {
"standing": random.uniform(0.3, 0.5),
"sitting": random.uniform(0.2, 0.4),
"walking": random.uniform(0.1, 0.3),
"lying": random.uniform(0.0, 0.1),
},
}