# WiFi DensePose Implementation - Fixed version # Based on "DensePose From WiFi" by Carnegie Mellon University import numpy as np import math from typing import Dict, List, Tuple, Optional from collections import OrderedDict import json class CSIPhaseProcessor: """ Processes raw CSI phase data through unwrapping, filtering, and linear fitting Based on the phase sanitization methodology from the paper """ def __init__(self, num_subcarriers: int = 30): self.num_subcarriers = num_subcarriers print(f"Initialized CSI Phase Processor with {num_subcarriers} subcarriers") def unwrap_phase(self, phase_data: np.ndarray) -> np.ndarray: """ Unwrap phase values to handle discontinuities Phase data shape: (freq_samples, ant_tx, ant_rx) = (150, 3, 3) """ unwrapped = np.copy(phase_data) # Unwrap along frequency dimension (groups of 30 frequencies) for sample_group in range(5): # 5 consecutive samples start_idx = sample_group * 30 end_idx = start_idx + 30 for tx in range(3): for rx in range(3): for i in range(start_idx + 1, end_idx): diff = unwrapped[i, tx, rx] - unwrapped[i-1, tx, rx] if diff > np.pi: unwrapped[i, tx, rx] = unwrapped[i-1, tx, rx] + diff - 2*np.pi elif diff < -np.pi: unwrapped[i, tx, rx] = unwrapped[i-1, tx, rx] + diff + 2*np.pi return unwrapped def apply_filters(self, phase_data: np.ndarray) -> np.ndarray: """ Apply median and uniform filters to eliminate outliers """ filtered = np.copy(phase_data) # Apply smoothing in frequency dimension for i in range(1, phase_data.shape[0]-1): filtered[i] = (phase_data[i-1] + phase_data[i] + phase_data[i+1]) / 3 return filtered def linear_fitting(self, phase_data: np.ndarray) -> np.ndarray: """ Apply linear fitting to remove systematic phase drift """ fitted_data = np.copy(phase_data) F = self.num_subcarriers # Process each sample group (5 consecutive samples) for sample_group in range(5): start_idx = sample_group * 30 end_idx = start_idx + 30 for tx in range(3): for rx in range(3): phase_seq = phase_data[start_idx:end_idx, tx, rx] # Calculate linear coefficients if len(phase_seq) > 1: alpha1 = (phase_seq[-1] - phase_seq[0]) / (2 * np.pi * F) alpha0 = np.mean(phase_seq) # Apply linear fitting frequencies = np.arange(1, len(phase_seq) + 1) linear_trend = alpha1 * frequencies + alpha0 fitted_data[start_idx:end_idx, tx, rx] = phase_seq - linear_trend return fitted_data def sanitize_phase(self, raw_phase: np.ndarray) -> np.ndarray: """ Complete phase sanitization pipeline """ print("Sanitizing CSI phase data...") print(f"Input shape: {raw_phase.shape}") # Step 1: Unwrap phase unwrapped = self.unwrap_phase(raw_phase) print("✓ Phase unwrapping completed") # Step 2: Apply filters filtered = self.apply_filters(unwrapped) print("✓ Filtering completed") # Step 3: Linear fitting sanitized = self.linear_fitting(filtered) print("✓ Linear fitting completed") return sanitized class ModalityTranslationNetwork: """ Simulates the modality translation network behavior Translates CSI domain features to spatial domain features """ def __init__(self, input_shape=(150, 3, 3), output_shape=(3, 720, 1280)): self.input_shape = input_shape self.output_shape = output_shape self.hidden_dim = 512 # Initialize simulated weights np.random.seed(42) self.amp_weights = np.random.normal(0, 0.1, (np.prod(input_shape), self.hidden_dim//4)) self.phase_weights = np.random.normal(0, 0.1, (np.prod(input_shape), self.hidden_dim//4)) self.fusion_weights = np.random.normal(0, 0.1, (self.hidden_dim//2, 24*24)) print(f"Initialized Modality Translation Network:") print(f" Input: {input_shape} -> Output: {output_shape}") def encode_features(self, amplitude_data, phase_data): """ Simulate feature encoding from amplitude and phase data """ # Flatten inputs amp_flat = amplitude_data.flatten() phase_flat = phase_data.flatten() # Simple linear transformation (simulating MLP) amp_features = np.tanh(np.dot(amp_flat, self.amp_weights)) phase_features = np.tanh(np.dot(phase_flat, self.phase_weights)) return amp_features, phase_features def fuse_and_translate(self, amp_features, phase_features): """ Fuse features and translate to spatial domain """ # Concatenate features fused = np.concatenate([amp_features, phase_features]) # Apply fusion transformation spatial_features = np.tanh(np.dot(fused, self.fusion_weights)) # Reshape to spatial map spatial_map = spatial_features.reshape(24, 24) # Simulate upsampling to target resolution # Using simple bilinear interpolation simulation from scipy.ndimage import zoom upsampled = zoom(spatial_map, (self.output_shape[1]/24, self.output_shape[2]/24), order=1) # Create 3-channel output output = np.stack([upsampled, upsampled * 0.8, upsampled * 0.6]) return output def forward(self, amplitude_data, phase_data): """ Complete forward pass """ # Encode features amp_features, phase_features = self.encode_features(amplitude_data, phase_data) # Translate to spatial domain spatial_output = self.fuse_and_translate(amp_features, phase_features) return spatial_output class WiFiDensePoseSystem: """ Complete WiFi DensePose system """ def __init__(self): self.config = WiFiDensePoseConfig() self.phase_processor = CSIPhaseProcessor(self.config.num_subcarriers) self.modality_network = ModalityTranslationNetwork() print("WiFi DensePose System initialized!") def process_csi_data(self, amplitude_data, phase_data): """ Process raw CSI data through the complete pipeline """ # Step 1: Phase sanitization sanitized_phase = self.phase_processor.sanitize_phase(phase_data) # Step 2: Modality translation spatial_features = self.modality_network.forward(amplitude_data, sanitized_phase) # Step 3: Simulate DensePose prediction pose_prediction = self.simulate_densepose_prediction(spatial_features) return { 'sanitized_phase': sanitized_phase, 'spatial_features': spatial_features, 'pose_prediction': pose_prediction } def simulate_densepose_prediction(self, spatial_features): """ Simulate DensePose-RCNN prediction """ # Simulate person detection num_detected = np.random.randint(1, 4) # 1-3 people predictions = [] for i in range(num_detected): # Simulate bounding box x = np.random.uniform(50, spatial_features.shape[1] - 150) y = np.random.uniform(50, spatial_features.shape[2] - 300) w = np.random.uniform(80, 150) h = np.random.uniform(200, 300) # Simulate confidence confidence = np.random.uniform(0.7, 0.95) # Simulate keypoints keypoints = [] for kp in range(17): kp_x = x + np.random.uniform(-w/4, w/4) kp_y = y + np.random.uniform(-h/4, h/4) kp_conf = np.random.uniform(0.6, 0.9) keypoints.extend([kp_x, kp_y, kp_conf]) # Simulate UV map (simplified) uv_map = np.random.uniform(0, 1, (24, 112, 112)) predictions.append({ 'bbox': [x, y, w, h], 'confidence': confidence, 'keypoints': keypoints, 'uv_map': uv_map }) return predictions # Configuration and utility classes class WiFiDensePoseConfig: """Configuration class for WiFi DensePose system""" def __init__(self): # Hardware configuration self.num_transmitters = 3 self.num_receivers = 3 self.num_subcarriers = 30 self.sampling_rate = 100 # Hz self.consecutive_samples = 5 # Network configuration self.input_amplitude_shape = (150, 3, 3) # 5 samples * 30 frequencies, 3x3 antennas self.input_phase_shape = (150, 3, 3) self.output_feature_shape = (3, 720, 1280) # Image-like feature map # DensePose configuration self.num_body_parts = 24 self.num_keypoints = 17 self.keypoint_heatmap_size = (56, 56) self.uv_map_size = (112, 112) class WiFiDataSimulator: """Simulates WiFi CSI data for demonstration purposes""" def __init__(self, config: WiFiDensePoseConfig): self.config = config np.random.seed(42) # For reproducibility def generate_csi_sample(self, num_people: int = 1, movement_intensity: float = 1.0) -> Tuple[np.ndarray, np.ndarray]: """Generate simulated CSI amplitude and phase data""" # Base CSI signal (environment) amplitude = np.ones(self.config.input_amplitude_shape) * 50 # Base signal strength phase = np.zeros(self.config.input_phase_shape) # Add noise amplitude += np.random.normal(0, 5, self.config.input_amplitude_shape) phase += np.random.normal(0, 0.1, self.config.input_phase_shape) # Simulate human presence effects for person in range(num_people): # Random position effects pos_x = np.random.uniform(0.2, 0.8) pos_y = np.random.uniform(0.2, 0.8) # Create interference patterns for tx in range(3): for rx in range(3): # Distance-based attenuation distance = np.sqrt((tx/2 - pos_x)**2 + (rx/2 - pos_y)**2) attenuation = movement_intensity * np.exp(-distance * 2) # Frequency-dependent effects for freq in range(30): freq_effect = np.sin(2 * np.pi * freq / 30 + person * np.pi/2) # Apply effects to all 5 samples for this frequency for sample in range(5): sample_idx = sample * 30 + freq amplitude[sample_idx, tx, rx] *= (1 - attenuation * 0.3 * freq_effect) phase[sample_idx, tx, rx] += attenuation * freq_effect * movement_intensity return amplitude, phase # Install scipy for zoom function try: from scipy.ndimage import zoom except ImportError: print("Installing scipy...") import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "scipy"]) from scipy.ndimage import zoom # Initialize the complete system print("="*60) print("WIFI DENSEPOSE SYSTEM DEMONSTRATION") print("="*60) config = WiFiDensePoseConfig() data_simulator = WiFiDataSimulator(config) wifi_system = WiFiDensePoseSystem() # Generate and process sample data print("\n1. Generating sample CSI data...") amplitude_data, phase_data = data_simulator.generate_csi_sample(num_people=2, movement_intensity=1.5) print(f" Generated CSI data shapes: Amplitude {amplitude_data.shape}, Phase {phase_data.shape}") print("\n2. Processing through WiFi DensePose pipeline...") results = wifi_system.process_csi_data(amplitude_data, phase_data) print(f"\n3. Results:") print(f" Sanitized phase range: [{results['sanitized_phase'].min():.3f}, {results['sanitized_phase'].max():.3f}]") print(f" Spatial features shape: {results['spatial_features'].shape}") print(f" Detected {len(results['pose_prediction'])} people") for i, pred in enumerate(results['pose_prediction']): bbox = pred['bbox'] print(f" Person {i+1}: bbox=[{bbox[0]:.1f}, {bbox[1]:.1f}, {bbox[2]:.1f}, {bbox[3]:.1f}], confidence={pred['confidence']:.3f}") print("\nWiFi DensePose system demonstration completed successfully!") print(f"System specifications:") print(f" - Hardware cost: ~$30 (2 TP-Link AC1750 routers)") print(f" - Frequency: 2.4GHz ± 20MHz") print(f" - Sampling rate: {config.sampling_rate}Hz") print(f" - Body parts detected: {config.num_body_parts}") print(f" - Keypoints tracked: {config.num_keypoints}") print(f" - Works through walls: ✓") print(f" - Privacy preserving: ✓") print(f" - Real-time capable: ✓")