""" Router interface for WiFi CSI data collection """ import logging import asyncio import time from typing import Dict, List, Optional, Any from datetime import datetime import numpy as np logger = logging.getLogger(__name__) class RouterInterface: """Interface for connecting to WiFi routers and collecting CSI data.""" def __init__( self, router_id: str, host: str, port: int = 22, username: str = "admin", password: str = "", interface: str = "wlan0", mock_mode: bool = False ): """Initialize router interface. Args: router_id: Unique identifier for the router host: Router IP address or hostname port: SSH port for connection username: SSH username password: SSH password interface: WiFi interface name mock_mode: Whether to use mock data instead of real connection """ self.router_id = router_id self.host = host self.port = port self.username = username self.password = password self.interface = interface self.mock_mode = mock_mode self.logger = logging.getLogger(f"{__name__}.{router_id}") # Connection state self.is_connected = False self.connection = None self.last_error = None # Data collection state self.last_data_time = None self.error_count = 0 self.sample_count = 0 # Mock data generation self.mock_data_generator = None if mock_mode: self._initialize_mock_generator() def _initialize_mock_generator(self): """Initialize mock data generator.""" self.mock_data_generator = { 'phase': 0, 'amplitude_base': 1.0, 'frequency': 0.1, 'noise_level': 0.1 } async def connect(self): """Connect to the router.""" if self.mock_mode: self.is_connected = True self.logger.info(f"Mock connection established to router {self.router_id}") return try: self.logger.info(f"Connecting to router {self.router_id} at {self.host}:{self.port}") # In a real implementation, this would establish SSH connection # For now, we'll simulate the connection await asyncio.sleep(0.1) # Simulate connection delay self.is_connected = True self.error_count = 0 self.logger.info(f"Connected to router {self.router_id}") except Exception as e: self.last_error = str(e) self.error_count += 1 self.logger.error(f"Failed to connect to router {self.router_id}: {e}") raise async def disconnect(self): """Disconnect from the router.""" try: if self.connection: # Close SSH connection self.connection = None self.is_connected = False self.logger.info(f"Disconnected from router {self.router_id}") except Exception as e: self.logger.error(f"Error disconnecting from router {self.router_id}: {e}") async def reconnect(self): """Reconnect to the router.""" await self.disconnect() await asyncio.sleep(1) # Wait before reconnecting await self.connect() async def get_csi_data(self) -> Optional[np.ndarray]: """Get CSI data from the router. Returns: CSI data as numpy array, or None if no data available """ if not self.is_connected: raise RuntimeError(f"Router {self.router_id} is not connected") try: if self.mock_mode: csi_data = self._generate_mock_csi_data() else: csi_data = await self._collect_real_csi_data() if csi_data is not None: self.last_data_time = datetime.now() self.sample_count += 1 self.error_count = 0 return csi_data except Exception as e: self.last_error = str(e) self.error_count += 1 self.logger.error(f"Error getting CSI data from router {self.router_id}: {e}") return None def _generate_mock_csi_data(self) -> np.ndarray: """Generate mock CSI data for testing.""" # Simulate CSI data with realistic characteristics num_subcarriers = 64 num_antennas = 4 num_samples = 100 # Update mock generator state self.mock_data_generator['phase'] += self.mock_data_generator['frequency'] # Generate amplitude and phase data time_axis = np.linspace(0, 1, num_samples) # Create realistic CSI patterns csi_data = np.zeros((num_antennas, num_subcarriers, num_samples), dtype=complex) for antenna in range(num_antennas): for subcarrier in range(num_subcarriers): # Base signal with some variation per antenna/subcarrier amplitude = ( self.mock_data_generator['amplitude_base'] * (1 + 0.2 * np.sin(2 * np.pi * subcarrier / num_subcarriers)) * (1 + 0.1 * antenna) ) # Phase with spatial and frequency variation phase_offset = ( self.mock_data_generator['phase'] + 2 * np.pi * subcarrier / num_subcarriers + np.pi * antenna / num_antennas ) # Add some movement simulation movement_freq = 0.5 # Hz movement_amplitude = 0.3 movement = movement_amplitude * np.sin(2 * np.pi * movement_freq * time_axis) # Generate complex signal signal_amplitude = amplitude * (1 + movement) signal_phase = phase_offset + movement * 0.5 # Add noise noise_real = np.random.normal(0, self.mock_data_generator['noise_level'], num_samples) noise_imag = np.random.normal(0, self.mock_data_generator['noise_level'], num_samples) noise = noise_real + 1j * noise_imag # Create complex signal signal = signal_amplitude * np.exp(1j * signal_phase) + noise csi_data[antenna, subcarrier, :] = signal return csi_data async def _collect_real_csi_data(self) -> Optional[np.ndarray]: """Collect real CSI data from router (placeholder implementation).""" # This would implement the actual CSI data collection # For now, return None to indicate no real implementation self.logger.warning("Real CSI data collection not implemented") return None async def check_health(self) -> bool: """Check if the router connection is healthy. Returns: True if healthy, False otherwise """ if not self.is_connected: return False try: # In mock mode, always healthy if self.mock_mode: return True # For real connections, we could ping the router or check SSH connection # For now, consider healthy if error count is low return self.error_count < 5 except Exception as e: self.logger.error(f"Error checking health of router {self.router_id}: {e}") return False async def get_status(self) -> Dict[str, Any]: """Get router status information. Returns: Dictionary containing router status """ return { "router_id": self.router_id, "connected": self.is_connected, "mock_mode": self.mock_mode, "last_data_time": self.last_data_time.isoformat() if self.last_data_time else None, "error_count": self.error_count, "sample_count": self.sample_count, "last_error": self.last_error, "configuration": { "host": self.host, "port": self.port, "username": self.username, "interface": self.interface } } async def get_router_info(self) -> Dict[str, Any]: """Get router hardware information. Returns: Dictionary containing router information """ if self.mock_mode: return { "model": "Mock Router", "firmware": "1.0.0-mock", "wifi_standard": "802.11ac", "antennas": 4, "supported_bands": ["2.4GHz", "5GHz"], "csi_capabilities": { "max_subcarriers": 64, "max_antennas": 4, "sampling_rate": 1000 } } # For real routers, this would query the actual hardware return { "model": "Unknown", "firmware": "Unknown", "wifi_standard": "Unknown", "antennas": 1, "supported_bands": ["Unknown"], "csi_capabilities": { "max_subcarriers": 64, "max_antennas": 1, "sampling_rate": 100 } } async def configure_csi_collection(self, config: Dict[str, Any]) -> bool: """Configure CSI data collection parameters. Args: config: Configuration dictionary Returns: True if configuration successful, False otherwise """ try: if self.mock_mode: # Update mock generator parameters if 'sampling_rate' in config: self.mock_data_generator['frequency'] = config['sampling_rate'] / 1000.0 if 'noise_level' in config: self.mock_data_generator['noise_level'] = config['noise_level'] self.logger.info(f"Mock CSI collection configured for router {self.router_id}") return True # For real routers, this would send configuration commands self.logger.warning("Real CSI configuration not implemented") return False except Exception as e: self.logger.error(f"Error configuring CSI collection for router {self.router_id}: {e}") return False def get_metrics(self) -> Dict[str, Any]: """Get router interface metrics. Returns: Dictionary containing metrics """ uptime = 0 if self.last_data_time: uptime = (datetime.now() - self.last_data_time).total_seconds() success_rate = 0 if self.sample_count > 0: success_rate = (self.sample_count - self.error_count) / self.sample_count return { "router_id": self.router_id, "sample_count": self.sample_count, "error_count": self.error_count, "success_rate": success_rate, "uptime_seconds": uptime, "is_connected": self.is_connected, "mock_mode": self.mock_mode } def reset_stats(self): """Reset statistics counters.""" self.error_count = 0 self.sample_count = 0 self.last_error = None self.logger.info(f"Statistics reset for router {self.router_id}")