# WiFi DensePose Implementation - Core Neural Network Architecture # Based on "DensePose From WiFi" by Carnegie Mellon University import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import math from typing import Dict, List, Tuple, Optional from collections import OrderedDict # CSI Phase Sanitization Module class CSIPhaseProcessor: """ Processes raw CSI phase data through unwrapping, filtering, and linear fitting Based on the phase sanitization methodology from the paper """ def __init__(self, num_subcarriers: int = 30): self.num_subcarriers = num_subcarriers def unwrap_phase(self, phase_data: np.ndarray) -> np.ndarray: """ Unwrap phase values to handle discontinuities Args: phase_data: Raw phase data of shape (samples, frequencies, antennas, antennas) Returns: Unwrapped phase data """ unwrapped = np.copy(phase_data) for i in range(1, phase_data.shape[1]): # Along frequency dimension diff = unwrapped[:, i] - unwrapped[:, i-1] # Apply unwrapping logic unwrapped[:, i] = np.where(diff > np.pi, unwrapped[:, i-1] + diff - 2*np.pi, unwrapped[:, i]) unwrapped[:, i] = np.where(diff < -np.pi, unwrapped[:, i-1] + diff + 2*np.pi, unwrapped[:, i]) return unwrapped def apply_filters(self, phase_data: np.ndarray) -> np.ndarray: """ Apply median and uniform filters to eliminate outliers """ # Simple moving average as approximation for filters filtered = np.copy(phase_data) # Apply simple smoothing in time dimension for i in range(1, phase_data.shape[0]-1): filtered[i] = (phase_data[i-1] + phase_data[i] + phase_data[i+1]) / 3 # Apply smoothing in frequency dimension for i in range(1, phase_data.shape[1]-1): filtered[:, i] = (filtered[:, i-1] + filtered[:, i] + filtered[:, i+1]) / 3 return filtered def linear_fitting(self, phase_data: np.ndarray) -> np.ndarray: """ Apply linear fitting to remove systematic phase drift """ fitted_data = np.copy(phase_data) F = self.num_subcarriers for sample_idx in range(phase_data.shape[0]): for ant_i in range(phase_data.shape[2]): for ant_j in range(phase_data.shape[3]): phase_seq = phase_data[sample_idx, :, ant_i, ant_j] # Calculate linear coefficients alpha1 = (phase_seq[-1] - phase_seq[0]) / (2 * np.pi * F) alpha0 = np.mean(phase_seq) # Apply linear fitting frequencies = np.arange(1, F + 1) linear_trend = alpha1 * frequencies + alpha0 fitted_data[sample_idx, :, ant_i, ant_j] = phase_seq - linear_trend return fitted_data def sanitize_phase(self, raw_phase: np.ndarray) -> np.ndarray: """ Complete phase sanitization pipeline """ # Step 1: Unwrap phase unwrapped = self.unwrap_phase(raw_phase) # Step 2: Apply filters filtered = self.apply_filters(unwrapped) # Step 3: Linear fitting sanitized = self.linear_fitting(filtered) return sanitized # Modality Translation Network class ModalityTranslationNetwork(nn.Module): """ Translates CSI domain features to spatial domain features Input: 150x3x3 amplitude and phase tensors Output: 3x720x1280 feature map """ def __init__(self, input_dim: int = 1350, hidden_dim: int = 512, output_height: int = 720, output_width: int = 1280): super(ModalityTranslationNetwork, self).__init__() self.input_dim = input_dim self.output_height = output_height self.output_width = output_width # Amplitude encoder self.amplitude_encoder = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, hidden_dim//2), nn.ReLU(), nn.Linear(hidden_dim//2, hidden_dim//4), nn.ReLU() ) # Phase encoder self.phase_encoder = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, hidden_dim//2), nn.ReLU(), nn.Linear(hidden_dim//2, hidden_dim//4), nn.ReLU() ) # Feature fusion self.fusion_mlp = nn.Sequential( nn.Linear(hidden_dim//2, hidden_dim//4), nn.ReLU(), nn.Linear(hidden_dim//4, 24*24), # Reshape to 24x24 nn.ReLU() ) # Spatial processing self.spatial_conv = nn.Sequential( nn.Conv2d(1, 64, kernel_size=3, padding=1), nn.ReLU(), nn.Conv2d(64, 128, kernel_size=3, padding=1), nn.ReLU(), nn.AdaptiveAvgPool2d((6, 6)) # Compress to 6x6 ) # Upsampling to target resolution self.upsample = nn.Sequential( nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1), # 12x12 nn.ReLU(), nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1), # 24x24 nn.ReLU(), nn.ConvTranspose2d(32, 16, kernel_size=4, stride=2, padding=1), # 48x48 nn.ReLU(), nn.ConvTranspose2d(16, 8, kernel_size=4, stride=2, padding=1), # 96x96 nn.ReLU(), ) # Final upsampling to target size self.final_upsample = nn.ConvTranspose2d(8, 3, kernel_size=1) def forward(self, amplitude_tensor: torch.Tensor, phase_tensor: torch.Tensor) -> torch.Tensor: batch_size = amplitude_tensor.shape[0] # Flatten input tensors amplitude_flat = amplitude_tensor.view(batch_size, -1) # [B, 1350] phase_flat = phase_tensor.view(batch_size, -1) # [B, 1350] # Encode features amp_features = self.amplitude_encoder(amplitude_flat) # [B, 128] phase_features = self.phase_encoder(phase_flat) # [B, 128] # Fuse features fused_features = torch.cat([amp_features, phase_features], dim=1) # [B, 256] spatial_features = self.fusion_mlp(fused_features) # [B, 576] # Reshape to 2D feature map spatial_map = spatial_features.view(batch_size, 1, 24, 24) # [B, 1, 24, 24] # Apply spatial convolutions conv_features = self.spatial_conv(spatial_map) # [B, 128, 6, 6] # Upsample upsampled = self.upsample(conv_features) # [B, 8, 96, 96] # Final upsampling using interpolation to reach target size final_features = self.final_upsample(upsampled) # [B, 3, 96, 96] # Interpolate to target resolution output = F.interpolate(final_features, size=(self.output_height, self.output_width), mode='bilinear', align_corners=False) return output print("Modality Translation Network implementation completed!") print("Input: 150x3x3 amplitude and phase tensors") print("Output: 3x720x1280 feature map")