# WiFi DensePose Implementation - Core Neural Network Architecture # Based on "DensePose From WiFi" by Carnegie Mellon University import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import math from typing import Dict, List, Tuple, Optional from collections import OrderedDict # CSI Phase Sanitization Module class CSIPhaseProcessor: """ Processes raw CSI phase data through unwrapping, filtering, and linear fitting Based on the phase sanitization methodology from the paper """ def __init__(self, num_subcarriers: int = 30): self.num_subcarriers = num_subcarriers def unwrap_phase(self, phase_data: np.ndarray) -> np.ndarray: """ Unwrap phase values to handle discontinuities Args: phase_data: Raw phase data of shape (samples, frequencies, antennas, antennas) Returns: Unwrapped phase data """ unwrapped = np.copy(phase_data) for i in range(1, phase_data.shape[1]): # Along frequency dimension diff = unwrapped[:, i] - unwrapped[:, i-1] # Apply unwrapping logic unwrapped[:, i] = np.where(diff > np.pi, unwrapped[:, i-1] + diff - 2*np.pi, unwrapped[:, i]) unwrapped[:, i] = np.where(diff < -np.pi, unwrapped[:, i-1] + diff + 2*np.pi, unwrapped[:, i]) return unwrapped def apply_filters(self, phase_data: np.ndarray) -> np.ndarray: """ Apply median and uniform filters to eliminate outliers """ from scipy.ndimage import median_filter, uniform_filter # Apply median filter in time domain filtered = median_filter(phase_data, size=(3, 1, 1, 1)) # Apply uniform filter in frequency domain filtered = uniform_filter(filtered, size=(1, 3, 1, 1)) return filtered def linear_fitting(self, phase_data: np.ndarray) -> np.ndarray: """ Apply linear fitting to remove systematic phase drift """ fitted_data = np.copy(phase_data) F = self.num_subcarriers for sample_idx in range(phase_data.shape[0]): for ant_i in range(phase_data.shape[2]): for ant_j in range(phase_data.shape[3]): phase_seq = phase_data[sample_idx, :, ant_i, ant_j] # Calculate linear coefficients alpha1 = (phase_seq[-1] - phase_seq[0]) / (2 * np.pi * F) alpha0 = np.mean(phase_seq) # Apply linear fitting frequencies = np.arange(1, F + 1) linear_trend = alpha1 * frequencies + alpha0 fitted_data[sample_idx, :, ant_i, ant_j] = phase_seq - linear_trend return fitted_data def sanitize_phase(self, raw_phase: np.ndarray) -> np.ndarray: """ Complete phase sanitization pipeline """ # Step 1: Unwrap phase unwrapped = self.unwrap_phase(raw_phase) # Step 2: Apply filters filtered = self.apply_filters(unwrapped) # Step 3: Linear fitting sanitized = self.linear_fitting(filtered) return sanitized print("CSI Phase Processor implementation completed!")