Commodity Sensing Module (ADR-013): - sensing/rssi_collector.py: Real Linux WiFi RSSI collection from /proc/net/wireless and iw commands, with SimulatedCollector for testing - sensing/feature_extractor.py: FFT-based spectral analysis, CUSUM change-point detection, breathing/motion band power extraction - sensing/classifier.py: Rule-based presence/motion classification with confidence scoring and multi-receiver agreement - sensing/backend.py: Common SensingBackend protocol with honest capability reporting (PRESENCE + MOTION only for commodity) Proof of Reality Bundle (ADR-011): - data/proof/generate_reference_signal.py: Deterministic synthetic CSI with known breathing (0.3 Hz) and walking (1.2 Hz) signals - data/proof/sample_csi_data.json: Generated reference signal - data/proof/verify.py: One-command pipeline verification with SHA-256 - data/proof/expected_features.sha256: Expected output hash Three.js Visualization: - ui/components/scene.js: 3D scene setup with OrbitControls Mock Isolation: - testing/mock_pose_generator.py: Mock pose generation moved out of production pose_service.py - services/pose_service.py: Cleaned mock paths https://claude.ai/code/session_01Ki7pvEZtJDvqJkmyn6B714
264 lines
8.2 KiB
Python
264 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Proof-of-Reality Verification Script for WiFi-DensePose Pipeline.
|
|
|
|
This script verifies that the signal processing pipeline produces
|
|
DETERMINISTIC, REPRODUCIBLE output from a known reference signal.
|
|
|
|
Steps:
|
|
1. Load the synthetic reference CSI signal from sample_csi_data.json
|
|
2. Feed each frame through the actual CSI processor feature extraction
|
|
3. Collect all feature outputs into a canonical byte representation
|
|
4. Compute SHA-256 hash of the full feature output
|
|
5. Compare against the expected hash in expected_features.sha256
|
|
6. Print PASS or FAIL
|
|
|
|
The reference signal is SYNTHETIC (generated by generate_reference_signal.py)
|
|
and is used purely for pipeline determinism verification.
|
|
|
|
Usage:
|
|
python verify.py # Run verification against stored hash
|
|
python verify.py --generate-hash # Generate and print the expected hash
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import struct
|
|
import sys
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
|
|
import numpy as np
|
|
|
|
# Add the v1 directory to sys.path so we can import the actual modules
|
|
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
V1_DIR = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..")) # v1/data/proof -> v1/
|
|
if V1_DIR not in sys.path:
|
|
sys.path.insert(0, V1_DIR)
|
|
|
|
# Import the actual pipeline modules
|
|
from src.hardware.csi_extractor import CSIData
|
|
from src.core.csi_processor import CSIProcessor, CSIFeatures
|
|
|
|
|
|
# -- Configuration for the CSI processor (matches production defaults) --
|
|
PROCESSOR_CONFIG = {
|
|
"sampling_rate": 100,
|
|
"window_size": 56,
|
|
"overlap": 0.5,
|
|
"noise_threshold": -60,
|
|
"human_detection_threshold": 0.8,
|
|
"smoothing_factor": 0.9,
|
|
"max_history_size": 500,
|
|
"enable_preprocessing": True,
|
|
"enable_feature_extraction": True,
|
|
"enable_human_detection": True,
|
|
}
|
|
|
|
# Number of frames to process for the feature hash
|
|
# We process a representative subset to keep verification fast while
|
|
# still covering temporal dynamics (Doppler requires history)
|
|
VERIFICATION_FRAME_COUNT = 100 # First 100 frames = 1 second
|
|
|
|
|
|
def load_reference_signal(data_path):
|
|
"""Load the reference CSI signal from JSON.
|
|
|
|
Args:
|
|
data_path: Path to sample_csi_data.json.
|
|
|
|
Returns:
|
|
dict: Parsed JSON data.
|
|
|
|
Raises:
|
|
FileNotFoundError: If the data file doesn't exist.
|
|
json.JSONDecodeError: If the data is malformed.
|
|
"""
|
|
with open(data_path, "r") as f:
|
|
data = json.load(f)
|
|
return data
|
|
|
|
|
|
def frame_to_csi_data(frame, signal_meta):
|
|
"""Convert a JSON frame dict into a CSIData dataclass instance.
|
|
|
|
Args:
|
|
frame: Dict with 'amplitude', 'phase', 'timestamp_s', 'frame_index'.
|
|
signal_meta: Top-level signal metadata (num_antennas, frequency, etc).
|
|
|
|
Returns:
|
|
CSIData instance.
|
|
"""
|
|
amplitude = np.array(frame["amplitude"], dtype=np.float64)
|
|
phase = np.array(frame["phase"], dtype=np.float64)
|
|
timestamp = datetime.fromtimestamp(frame["timestamp_s"], tz=timezone.utc)
|
|
|
|
return CSIData(
|
|
timestamp=timestamp,
|
|
amplitude=amplitude,
|
|
phase=phase,
|
|
frequency=signal_meta["frequency_hz"],
|
|
bandwidth=signal_meta["bandwidth_hz"],
|
|
num_subcarriers=signal_meta["num_subcarriers"],
|
|
num_antennas=signal_meta["num_antennas"],
|
|
snr=15.0, # Fixed SNR for synthetic signal
|
|
metadata={
|
|
"source": "synthetic_reference",
|
|
"frame_index": frame["frame_index"],
|
|
},
|
|
)
|
|
|
|
|
|
def features_to_bytes(features):
|
|
"""Convert CSIFeatures to a deterministic byte representation.
|
|
|
|
We serialize each numpy array to bytes in a canonical order
|
|
using little-endian float64 representation. This ensures the
|
|
hash is platform-independent for IEEE 754 compliant systems.
|
|
|
|
Args:
|
|
features: CSIFeatures instance.
|
|
|
|
Returns:
|
|
bytes: Canonical byte representation.
|
|
"""
|
|
parts = []
|
|
|
|
# Serialize each feature array in declaration order
|
|
for array in [
|
|
features.amplitude_mean,
|
|
features.amplitude_variance,
|
|
features.phase_difference,
|
|
features.correlation_matrix,
|
|
features.doppler_shift,
|
|
features.power_spectral_density,
|
|
]:
|
|
flat = np.asarray(array, dtype=np.float64).ravel()
|
|
# Pack as little-endian double (8 bytes each)
|
|
parts.append(struct.pack(f"<{len(flat)}d", *flat))
|
|
|
|
return b"".join(parts)
|
|
|
|
|
|
def compute_pipeline_hash(data_path):
|
|
"""Run the full pipeline and compute the SHA-256 hash of all features.
|
|
|
|
Args:
|
|
data_path: Path to sample_csi_data.json.
|
|
|
|
Returns:
|
|
str: Hex-encoded SHA-256 hash of the feature output.
|
|
"""
|
|
# Load reference signal
|
|
signal_data = load_reference_signal(data_path)
|
|
frames = signal_data["frames"][:VERIFICATION_FRAME_COUNT]
|
|
|
|
# Create processor
|
|
processor = CSIProcessor(PROCESSOR_CONFIG)
|
|
|
|
# Process all frames and accumulate feature bytes
|
|
hasher = hashlib.sha256()
|
|
features_count = 0
|
|
|
|
for frame in frames:
|
|
csi_data = frame_to_csi_data(frame, signal_data)
|
|
|
|
# Run through the actual pipeline: preprocess -> extract features
|
|
preprocessed = processor.preprocess_csi_data(csi_data)
|
|
features = processor.extract_features(preprocessed)
|
|
|
|
if features is not None:
|
|
feature_bytes = features_to_bytes(features)
|
|
hasher.update(feature_bytes)
|
|
features_count += 1
|
|
|
|
# Add to history for Doppler computation in subsequent frames
|
|
processor.add_to_history(csi_data)
|
|
|
|
print(f" Processed {features_count} frames through pipeline")
|
|
return hasher.hexdigest()
|
|
|
|
|
|
def main():
|
|
"""Main verification entry point."""
|
|
parser = argparse.ArgumentParser(
|
|
description="WiFi-DensePose pipeline verification"
|
|
)
|
|
parser.add_argument(
|
|
"--generate-hash",
|
|
action="store_true",
|
|
help="Generate and print the expected hash (do not verify)",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
print("=" * 70)
|
|
print("WiFi-DensePose: Pipeline Verification")
|
|
print("=" * 70)
|
|
print()
|
|
|
|
# Locate data file
|
|
data_path = os.path.join(SCRIPT_DIR, "sample_csi_data.json")
|
|
hash_path = os.path.join(SCRIPT_DIR, "expected_features.sha256")
|
|
|
|
if not os.path.exists(data_path):
|
|
print(f"FAIL: Reference data not found at {data_path}")
|
|
print(" Run generate_reference_signal.py first.")
|
|
sys.exit(1)
|
|
|
|
# Compute hash
|
|
print("[1/2] Processing reference signal through pipeline...")
|
|
computed_hash = compute_pipeline_hash(data_path)
|
|
print(f" SHA-256: {computed_hash}")
|
|
print()
|
|
|
|
if args.generate_hash:
|
|
# Write the hash file
|
|
with open(hash_path, "w") as f:
|
|
f.write(computed_hash + "\n")
|
|
print(f"[2/2] Wrote expected hash to {hash_path}")
|
|
print()
|
|
print("HASH GENERATED - run without --generate-hash to verify")
|
|
print("=" * 70)
|
|
return
|
|
|
|
# Verify against expected hash
|
|
print("[2/2] Verifying against expected hash...")
|
|
if not os.path.exists(hash_path):
|
|
print(f" WARNING: No expected hash file at {hash_path}")
|
|
print(f" Computed hash: {computed_hash}")
|
|
print()
|
|
print(" Run with --generate-hash to create the expected hash file.")
|
|
print()
|
|
print("SKIP (no expected hash to compare against)")
|
|
print("=" * 70)
|
|
sys.exit(2)
|
|
|
|
with open(hash_path, "r") as f:
|
|
expected_hash = f.read().strip()
|
|
|
|
print(f" Expected: {expected_hash}")
|
|
print(f" Computed: {computed_hash}")
|
|
print()
|
|
|
|
if computed_hash == expected_hash:
|
|
print("PASS - Pipeline output is deterministic and matches expected hash.")
|
|
print("=" * 70)
|
|
sys.exit(0)
|
|
else:
|
|
print("FAIL - Pipeline output does NOT match expected hash.")
|
|
print()
|
|
print("Possible causes:")
|
|
print(" - Numpy/scipy version mismatch (check requirements-lock.txt)")
|
|
print(" - Code change in CSI processor that alters numerical output")
|
|
print(" - Platform floating-point differences (unlikely for IEEE 754)")
|
|
print()
|
|
print("To update the expected hash after intentional changes:")
|
|
print(" python verify.py --generate-hash")
|
|
print("=" * 70)
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|