- Implemented the WiFi DensePose model in PyTorch, including CSI phase processing, modality translation, and DensePose prediction heads.
- Added a comprehensive training utility for the model, including loss functions and training steps.
- Created a CSV file documenting hardware specifications, architecture details, training parameters, performance metrics, and advantages of the model.
202 lines · 7.5 KiB · Python
# WiFi DensePose Implementation - Core Neural Network Architecture
|
|
# Based on "DensePose From WiFi" by Carnegie Mellon University
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import numpy as np
|
|
import math
|
|
from typing import Dict, List, Tuple, Optional
|
|
from collections import OrderedDict
|
|
|
|
# CSI Phase Sanitization Module
class CSIPhaseProcessor:
    """Sanitize raw CSI phase measurements.

    Pipeline (per "DensePose From WiFi", CMU): unwrap the phase along the
    subcarrier (frequency) axis, smooth it in time and frequency, then
    subtract a per-link linear fit to remove systematic phase drift.

    All methods expect arrays shaped
    (samples, frequencies, antennas, antennas).
    """

    def __init__(self, num_subcarriers: int = 30):
        # Kept for backward compatibility; the methods below derive the
        # frequency count from the data itself, so inputs whose second axis
        # differs from this value are still handled correctly.
        self.num_subcarriers = num_subcarriers

    def unwrap_phase(self, phase_data: np.ndarray) -> np.ndarray:
        """Remove 2*pi discontinuities along the frequency axis.

        Uses np.unwrap, which — unlike the previous single +/-2*pi
        correction per step — also handles jumps larger than one full
        cycle, and runs vectorized instead of looping in Python.

        Args:
            phase_data: Raw phase, shape (samples, frequencies, antennas, antennas).

        Returns:
            Unwrapped phase data, same shape.
        """
        return np.unwrap(phase_data, axis=1)

    def apply_filters(self, phase_data: np.ndarray) -> np.ndarray:
        """Smooth outliers with a 3-tap moving average in time, then frequency.

        Both passes read from a snapshot of their input; the original
        frequency-pass loop read partially-smoothed values in place, which
        made the result depend on iteration order. Edge elements on each
        axis are left unmodified, matching the original loop bounds.
        """
        filtered = np.copy(phase_data)

        # Time-dimension smoothing: each interior sample is averaged with its
        # temporal neighbours, reading only the untouched input.
        filtered[1:-1] = (phase_data[:-2] + phase_data[1:-1] + phase_data[2:]) / 3

        # Frequency-dimension smoothing on the time-smoothed result; snapshot
        # first so every output uses pre-pass neighbours.
        snapshot = filtered.copy()
        filtered[:, 1:-1] = (snapshot[:, :-2] + snapshot[:, 1:-1] + snapshot[:, 2:]) / 3

        return filtered

    def linear_fitting(self, phase_data: np.ndarray) -> np.ndarray:
        """Subtract a per-link linear trend to remove systematic phase drift.

        Implements the paper's coefficients
            alpha1 = (phi_F - phi_1) / (2*pi*F),  alpha0 = mean(phi)
        with the trend alpha1 * f + alpha0 evaluated at f = 1..F.

        F is taken from the data (phase_data.shape[1]) rather than the
        constructor value, so inputs with a different subcarrier count no
        longer raise a broadcast error or use a wrong slope. Fully
        vectorized — the original triple Python loop was
        O(samples * antennas^2) iterations.
        """
        num_freq = phase_data.shape[1]

        # Slope and intercept per (sample, tx, rx) link: shapes (S, A, A).
        alpha1 = (phase_data[:, -1] - phase_data[:, 0]) / (2 * np.pi * num_freq)
        alpha0 = phase_data.mean(axis=1)

        # Broadcast the linear trend back over the frequency axis.
        frequencies = np.arange(1, num_freq + 1).reshape(1, -1, 1, 1)
        linear_trend = alpha1[:, None] * frequencies + alpha0[:, None]

        return phase_data - linear_trend

    def sanitize_phase(self, raw_phase: np.ndarray) -> np.ndarray:
        """Full sanitization pipeline: unwrap -> filter -> linear fit."""
        unwrapped = self.unwrap_phase(raw_phase)
        filtered = self.apply_filters(unwrapped)
        return self.linear_fitting(filtered)
# Modality Translation Network
class ModalityTranslationNetwork(nn.Module):
    """Map CSI-domain features to an image-plane feature map.

    Two identical MLP encoders compress the flattened 150x3x3 amplitude and
    phase tensors; their concatenated codes are projected into a coarse
    24x24 map, refined by convolutions, grown with transposed convolutions,
    and finally interpolated to 3 x output_height x output_width.
    """

    def __init__(self, input_dim: int = 1350, hidden_dim: int = 512,
                 output_height: int = 720, output_width: int = 1280):
        super().__init__()

        self.input_dim = input_dim
        self.output_height = output_height
        self.output_width = output_width

        # One MLP stack per CSI modality (same architecture for both).
        self.amplitude_encoder = self._make_encoder(input_dim, hidden_dim)
        self.phase_encoder = self._make_encoder(input_dim, hidden_dim)

        # Fuse the two hidden_dim//4 codes and project to a flat 24x24 map.
        self.fusion_mlp = nn.Sequential(
            nn.Linear(hidden_dim // 2, hidden_dim // 4),
            nn.ReLU(),
            nn.Linear(hidden_dim // 4, 24 * 24),  # Reshape to 24x24
            nn.ReLU(),
        )

        # Convolutional refinement, pooled down to a 6x6 grid.
        self.spatial_conv = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((6, 6)),  # Compress to 6x6
        )

        # Learned 2x upsampling chain: 6 -> 12 -> 24 -> 48 -> 96.
        self.upsample = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
        )

        # 1x1 projection to 3 channels (spatial size unchanged here; the
        # final resize happens via interpolation in forward()).
        self.final_upsample = nn.ConvTranspose2d(8, 3, kernel_size=1)

    @staticmethod
    def _make_encoder(in_features: int, hidden: int) -> nn.Sequential:
        """Three-layer ReLU MLP: in_features -> hidden -> hidden//2 -> hidden//4."""
        return nn.Sequential(
            nn.Linear(in_features, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden // 2),
            nn.ReLU(),
            nn.Linear(hidden // 2, hidden // 4),
            nn.ReLU(),
        )

    def forward(self, amplitude_tensor: torch.Tensor, phase_tensor: torch.Tensor) -> torch.Tensor:
        """Return a (B, 3, output_height, output_width) feature map."""
        batch = amplitude_tensor.shape[0]

        # Flatten and encode each modality: [B, input_dim] -> [B, hidden//4].
        amp_code = self.amplitude_encoder(amplitude_tensor.view(batch, -1))
        phase_code = self.phase_encoder(phase_tensor.view(batch, -1))

        # Concatenate ([B, hidden//2]) and project to a flat 24x24 code.
        flat_map = self.fusion_mlp(torch.cat([amp_code, phase_code], dim=1))
        coarse = flat_map.view(batch, 1, 24, 24)

        # Refine to [B, 128, 6, 6], grow to [B, 8, 96, 96], project to 3ch.
        refined = self.spatial_conv(coarse)
        grown = self.final_upsample(self.upsample(refined))

        # Bilinear resize to the requested output resolution.
        return F.interpolate(
            grown,
            size=(self.output_height, self.output_width),
            mode='bilinear',
            align_corners=False,
        )
|
# Import-time banner summarizing the module's tensor contract.
_BANNER_LINES = (
    "Modality Translation Network implementation completed!",
    "Input: 150x3x3 amplitude and phase tensors",
    "Output: 3x720x1280 feature map",
)
for _line in _BANNER_LINES:
    print(_line)