- Implemented the WiFi DensePose model in PyTorch, including CSI phase processing, modality translation, and DensePose prediction heads. - Added a comprehensive training utility for the model, including loss functions and training steps. - Created a CSV file to document hardware specifications, architecture details, training parameters, performance metrics, and advantages of the model.
245 lines
9.4 KiB
Python
245 lines
9.4 KiB
Python
# WiFi DensePose Implementation - Core Architecture (NumPy-based prototype)
|
|
# Based on "DensePose From WiFi" by Carnegie Mellon University
|
|
|
|
import numpy as np
|
|
import math
|
|
from typing import Dict, List, Tuple, Optional
|
|
from collections import OrderedDict
|
|
import json
|
|
|
|
class CSIPhaseProcessor:
|
|
"""
|
|
Processes raw CSI phase data through unwrapping, filtering, and linear fitting
|
|
Based on the phase sanitization methodology from the paper
|
|
"""
|
|
|
|
def __init__(self, num_subcarriers: int = 30):
|
|
self.num_subcarriers = num_subcarriers
|
|
print(f"Initialized CSI Phase Processor with {num_subcarriers} subcarriers")
|
|
|
|
def unwrap_phase(self, phase_data: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Unwrap phase values to handle discontinuities
|
|
"""
|
|
unwrapped = np.copy(phase_data)
|
|
|
|
for i in range(1, phase_data.shape[1]): # Along frequency dimension
|
|
diff = unwrapped[:, i] - unwrapped[:, i-1]
|
|
|
|
# Apply unwrapping logic
|
|
unwrapped[:, i] = np.where(diff > np.pi,
|
|
unwrapped[:, i-1] + diff - 2*np.pi,
|
|
unwrapped[:, i])
|
|
unwrapped[:, i] = np.where(diff < -np.pi,
|
|
unwrapped[:, i-1] + diff + 2*np.pi,
|
|
unwrapped[:, i])
|
|
|
|
return unwrapped
|
|
|
|
def apply_filters(self, phase_data: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Apply median and uniform filters to eliminate outliers
|
|
"""
|
|
filtered = np.copy(phase_data)
|
|
|
|
# Apply simple smoothing in time dimension
|
|
for i in range(1, phase_data.shape[0]-1):
|
|
filtered[i] = (phase_data[i-1] + phase_data[i] + phase_data[i+1]) / 3
|
|
|
|
# Apply smoothing in frequency dimension
|
|
for i in range(1, phase_data.shape[1]-1):
|
|
filtered[:, i] = (filtered[:, i-1] + filtered[:, i] + filtered[:, i+1]) / 3
|
|
|
|
return filtered
|
|
|
|
def linear_fitting(self, phase_data: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Apply linear fitting to remove systematic phase drift
|
|
"""
|
|
fitted_data = np.copy(phase_data)
|
|
F = self.num_subcarriers
|
|
|
|
for sample_idx in range(phase_data.shape[0]):
|
|
for ant_i in range(phase_data.shape[2]):
|
|
for ant_j in range(phase_data.shape[3]):
|
|
phase_seq = phase_data[sample_idx, :, ant_i, ant_j]
|
|
|
|
# Calculate linear coefficients
|
|
alpha1 = (phase_seq[-1] - phase_seq[0]) / (2 * np.pi * F)
|
|
alpha0 = np.mean(phase_seq)
|
|
|
|
# Apply linear fitting
|
|
frequencies = np.arange(1, F + 1)
|
|
linear_trend = alpha1 * frequencies + alpha0
|
|
fitted_data[sample_idx, :, ant_i, ant_j] = phase_seq - linear_trend
|
|
|
|
return fitted_data
|
|
|
|
def sanitize_phase(self, raw_phase: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Complete phase sanitization pipeline
|
|
"""
|
|
print("Sanitizing CSI phase data...")
|
|
print(f"Input shape: {raw_phase.shape}")
|
|
|
|
# Step 1: Unwrap phase
|
|
unwrapped = self.unwrap_phase(raw_phase)
|
|
print("✓ Phase unwrapping completed")
|
|
|
|
# Step 2: Apply filters
|
|
filtered = self.apply_filters(unwrapped)
|
|
print("✓ Filtering completed")
|
|
|
|
# Step 3: Linear fitting
|
|
sanitized = self.linear_fitting(filtered)
|
|
print("✓ Linear fitting completed")
|
|
|
|
return sanitized
|
|
|
|
class WiFiDensePoseConfig:
|
|
"""
|
|
Configuration class for WiFi DensePose system
|
|
"""
|
|
def __init__(self):
|
|
# Hardware configuration
|
|
self.num_transmitters = 3
|
|
self.num_receivers = 3
|
|
self.num_subcarriers = 30
|
|
self.sampling_rate = 100 # Hz
|
|
self.consecutive_samples = 5
|
|
|
|
# Network configuration
|
|
self.input_amplitude_shape = (150, 3, 3) # 5 samples * 30 frequencies, 3x3 antennas
|
|
self.input_phase_shape = (150, 3, 3)
|
|
self.output_feature_shape = (3, 720, 1280) # Image-like feature map
|
|
|
|
# DensePose configuration
|
|
self.num_body_parts = 24
|
|
self.num_keypoints = 17
|
|
self.keypoint_heatmap_size = (56, 56)
|
|
self.uv_map_size = (112, 112)
|
|
|
|
# Training configuration
|
|
self.learning_rate = 1e-3
|
|
self.batch_size = 16
|
|
self.num_epochs = 145000
|
|
self.lambda_dp = 0.6 # DensePose loss weight
|
|
self.lambda_kp = 0.3 # Keypoint loss weight
|
|
self.lambda_tr = 0.1 # Transfer learning loss weight
|
|
|
|
class WiFiDataSimulator:
|
|
"""
|
|
Simulates WiFi CSI data for demonstration purposes
|
|
"""
|
|
|
|
def __init__(self, config: WiFiDensePoseConfig):
|
|
self.config = config
|
|
np.random.seed(42) # For reproducibility
|
|
|
|
def generate_csi_sample(self, num_people: int = 1, movement_intensity: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
Generate simulated CSI amplitude and phase data
|
|
"""
|
|
# Base CSI signal (environment)
|
|
amplitude = np.ones(self.config.input_amplitude_shape) * 50 # Base signal strength
|
|
phase = np.zeros(self.config.input_phase_shape)
|
|
|
|
# Add noise
|
|
amplitude += np.random.normal(0, 5, self.config.input_amplitude_shape)
|
|
phase += np.random.normal(0, 0.1, self.config.input_phase_shape)
|
|
|
|
# Simulate human presence effects
|
|
for person in range(num_people):
|
|
# Random position effects
|
|
pos_x = np.random.uniform(0.2, 0.8)
|
|
pos_y = np.random.uniform(0.2, 0.8)
|
|
|
|
# Create interference patterns
|
|
for tx in range(3):
|
|
for rx in range(3):
|
|
# Distance-based attenuation
|
|
distance = np.sqrt((tx/2 - pos_x)**2 + (rx/2 - pos_y)**2)
|
|
attenuation = movement_intensity * np.exp(-distance * 2)
|
|
|
|
# Frequency-dependent effects
|
|
for freq in range(30):
|
|
freq_effect = np.sin(2 * np.pi * freq / 30 + person * np.pi/2)
|
|
|
|
# Amplitude effects
|
|
for sample in range(5):
|
|
sample_idx = sample * 30 + freq
|
|
amplitude[sample_idx, tx, rx] *= (1 - attenuation * 0.3 * freq_effect)
|
|
|
|
# Phase effects
|
|
for sample in range(5):
|
|
sample_idx = sample * 30 + freq
|
|
phase[sample_idx, tx, rx] += attenuation * freq_effect * movement_intensity
|
|
|
|
return amplitude, phase
|
|
|
|
def generate_ground_truth_poses(self, num_people: int = 1) -> Dict:
|
|
"""
|
|
Generate simulated ground truth pose data
|
|
"""
|
|
poses = []
|
|
for person in range(num_people):
|
|
# Simulate a person's bounding box
|
|
x = np.random.uniform(100, 620) # Within 720px width
|
|
y = np.random.uniform(100, 1180) # Within 1280px height
|
|
w = np.random.uniform(80, 200)
|
|
h = np.random.uniform(150, 400)
|
|
|
|
# Simulate keypoints (17 COCO keypoints)
|
|
keypoints = []
|
|
for kp in range(17):
|
|
kp_x = x + np.random.uniform(-w/4, w/4)
|
|
kp_y = y + np.random.uniform(-h/4, h/4)
|
|
confidence = np.random.uniform(0.7, 1.0)
|
|
keypoints.extend([kp_x, kp_y, confidence])
|
|
|
|
poses.append({
|
|
'bbox': [x, y, w, h],
|
|
'keypoints': keypoints,
|
|
'person_id': person
|
|
})
|
|
|
|
return {'poses': poses, 'num_people': num_people}
|
|
|
|
# Initialize the system
|
|
config = WiFiDensePoseConfig()
|
|
phase_processor = CSIPhaseProcessor(config.num_subcarriers)
|
|
data_simulator = WiFiDataSimulator(config)
|
|
|
|
print("WiFi DensePose System Initialized!")
|
|
print(f"Configuration:")
|
|
print(f" - Hardware: {config.num_transmitters}x{config.num_receivers} antenna array")
|
|
print(f" - Frequencies: {config.num_subcarriers} subcarriers at 2.4GHz")
|
|
print(f" - Sampling: {config.sampling_rate}Hz")
|
|
print(f" - Body parts: {config.num_body_parts}")
|
|
print(f" - Keypoints: {config.num_keypoints}")
|
|
|
|
# Demonstrate CSI data processing
|
|
print("\n" + "="*60)
|
|
print("DEMONSTRATING CSI DATA PROCESSING")
|
|
print("="*60)
|
|
|
|
# Generate sample CSI data
|
|
amplitude_data, phase_data = data_simulator.generate_csi_sample(num_people=2, movement_intensity=1.5)
|
|
print(f"Generated CSI data:")
|
|
print(f" Amplitude shape: {amplitude_data.shape}")
|
|
print(f" Phase shape: {phase_data.shape}")
|
|
print(f" Amplitude range: [{amplitude_data.min():.2f}, {amplitude_data.max():.2f}]")
|
|
print(f" Phase range: [{phase_data.min():.2f}, {phase_data.max():.2f}]")
|
|
|
|
# Process phase data
|
|
sanitized_phase = phase_processor.sanitize_phase(phase_data)
|
|
print(f"Sanitized phase range: [{sanitized_phase.min():.2f}, {sanitized_phase.max():.2f}]")
|
|
|
|
# Generate ground truth
|
|
ground_truth = data_simulator.generate_ground_truth_poses(num_people=2)
|
|
print(f"\nGenerated ground truth for {ground_truth['num_people']} people")
|
|
for i, pose in enumerate(ground_truth['poses']):
|
|
bbox = pose['bbox']
|
|
print(f" Person {i+1}: bbox=[{bbox[0]:.1f}, {bbox[1]:.1f}, {bbox[2]:.1f}, {bbox[3]:.1f}]")
|
|
|
|
print("\nCSI processing demonstration completed!") |