419 lines
16 KiB
Python
419 lines
16 KiB
Python
"""
Integration tests for WebSocket streaming functionality.

Tests WebSocket connections, message handling, and real-time data streaming.
"""
|
|
|
|
import asyncio
import json
import time
from datetime import datetime, timezone
from typing import Any, Dict, List
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import websockets
from fastapi import FastAPI, WebSocket
from fastapi.testclient import TestClient
|
|
|
|
|
|
class MockWebSocket:
    """In-memory stand-in for a WebSocket connection used by the tests.

    Everything sent through ``send_json`` / ``send_text`` is recorded in
    ``messages_sent``.  Text queued with ``add_received_message`` is handed
    back, oldest first, by ``receive_text``.
    """

    def __init__(self):
        # Lifecycle flags flipped by accept() and close().
        self.accept_called = False
        self.closed = False
        # Outbound log and inbound queue.
        self.messages_sent = []
        self.messages_received = []

    async def accept(self):
        """Record that the connection was accepted."""
        self.accept_called = True

    async def send_json(self, data: Dict[str, Any]):
        """Record a JSON-style payload as sent."""
        self.messages_sent.append(data)

    async def send_text(self, text: str):
        """Record a raw text payload as sent."""
        self.messages_sent.append(text)

    async def receive_text(self) -> str:
        """Return the oldest queued message.

        Raises:
            WebSocketDisconnect: when no message is queued, which simulates
                the peer closing the connection.
        """
        if not self.messages_received:
            from fastapi import WebSocketDisconnect

            raise WebSocketDisconnect()
        return self.messages_received.pop(0)

    async def close(self):
        """Record that the connection was closed."""
        self.closed = True

    def add_received_message(self, message: str):
        """Queue ``message`` so a later ``receive_text`` call returns it."""
        self.messages_received.append(message)
|
|
|
|
|
|
class TestWebSocketStreaming:
    """Integration-style tests for WebSocket streaming handlers.

    Each test drives a handler defined locally inside the test against
    ``MockWebSocket`` / ``AsyncMock`` collaborators.  The
    ``_should_fail_initially`` suffix in the test names is kept unchanged so
    existing test IDs and ``-k`` selections stay stable.
    """

    @pytest.fixture
    def mock_websocket(self):
        """Provide a fresh MockWebSocket."""
        return MockWebSocket()

    @pytest.fixture
    def mock_connection_manager(self):
        """Provide an AsyncMock connection manager with canned return values."""
        manager = AsyncMock()
        manager.connect.return_value = "client-001"
        manager.disconnect.return_value = True
        manager.get_connection_stats.return_value = {
            "total_clients": 1,
            "active_streams": ["pose"]
        }
        manager.broadcast.return_value = 1
        return manager

    @pytest.fixture
    def mock_stream_service(self):
        """Provide an AsyncMock stream service with canned return values."""
        service = AsyncMock()
        service.get_status.return_value = {
            "is_active": True,
            "active_streams": [],
            "uptime_seconds": 3600.0
        }
        service.is_active.return_value = True
        service.start.return_value = None
        service.stop.return_value = None
        return service

    @pytest.mark.asyncio
    async def test_websocket_pose_connection_should_fail_initially(
        self, mock_websocket, mock_connection_manager
    ):
        """Pose stream handshake: accept, register the client, confirm."""
        zone_ids = "zone1,zone2"
        min_confidence = 0.7
        max_fps = 30

        # Local stand-in for the websocket_pose_stream handler.
        async def mock_websocket_handler(websocket, zone_ids, min_confidence, max_fps):
            await websocket.accept()

            # Comma-separated zone IDs -> list, dropping blanks.
            zone_list = [zone.strip() for zone in zone_ids.split(",") if zone.strip()]

            # Register the client with the (mocked) connection manager.
            client_id = await mock_connection_manager.connect(
                websocket=websocket,
                stream_type="pose",
                zone_ids=zone_list,
                min_confidence=min_confidence,
                max_fps=max_fps
            )

            # Confirm the connection to the client.
            await websocket.send_json({
                "type": "connection_established",
                "client_id": client_id,
                # Timezone-aware UTC; datetime.utcnow() is deprecated.
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "config": {
                    "zone_ids": zone_list,
                    "min_confidence": min_confidence,
                    "max_fps": max_fps
                }
            })

            return client_id

        client_id = await mock_websocket_handler(
            mock_websocket, zone_ids, min_confidence, max_fps
        )

        # The handler must hand back the ID issued by the connection manager.
        assert client_id == "client-001"
        assert mock_websocket.accept_called
        assert len(mock_websocket.messages_sent) == 1
        assert mock_websocket.messages_sent[0]["type"] == "connection_established"
        assert mock_websocket.messages_sent[0]["client_id"] == "client-001"
        assert "config" in mock_websocket.messages_sent[0]

    @pytest.mark.asyncio
    async def test_websocket_message_handling_should_fail_initially(self, mock_websocket):
        """Message dispatch: ping, update_config and unknown message types."""

        # Local stand-in for the per-message dispatcher.
        async def handle_websocket_message(client_id: str, data: Dict[str, Any], websocket):
            message_type = data.get("type")

            if message_type == "ping":
                await websocket.send_json({
                    "type": "pong",
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
            elif message_type == "update_config":
                config = data.get("config", {})
                await websocket.send_json({
                    "type": "config_updated",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "config": config
                })
            else:
                await websocket.send_json({
                    "type": "error",
                    "message": f"Unknown message type: {message_type}"
                })

        # Ping -> pong.
        ping_data = {"type": "ping"}
        await handle_websocket_message("client-001", ping_data, mock_websocket)

        assert len(mock_websocket.messages_sent) == 1
        assert mock_websocket.messages_sent[0]["type"] == "pong"

        # Config update is echoed back.
        mock_websocket.messages_sent.clear()
        config_data = {
            "type": "update_config",
            "config": {"min_confidence": 0.8, "max_fps": 15}
        }
        await handle_websocket_message("client-001", config_data, mock_websocket)

        assert len(mock_websocket.messages_sent) == 1
        assert mock_websocket.messages_sent[0]["type"] == "config_updated"
        assert mock_websocket.messages_sent[0]["config"]["min_confidence"] == 0.8

        # Unknown message types produce an error reply.
        mock_websocket.messages_sent.clear()
        await handle_websocket_message("client-001", {"type": "bogus"}, mock_websocket)

        assert len(mock_websocket.messages_sent) == 1
        assert mock_websocket.messages_sent[0]["type"] == "error"
        assert mock_websocket.messages_sent[0]["message"] == "Unknown message type: bogus"

    @pytest.mark.asyncio
    async def test_websocket_events_stream_should_fail_initially(
        self, mock_websocket, mock_connection_manager
    ):
        """Events stream handshake: accept, register the client, confirm."""

        # Local stand-in for the events stream handler.
        async def mock_events_handler(websocket, event_types, zone_ids):
            await websocket.accept()

            # Comma-separated parameters -> lists; empty/None input -> None.
            event_list = (
                [event.strip() for event in event_types.split(",") if event.strip()]
                if event_types else None
            )
            zone_list = (
                [zone.strip() for zone in zone_ids.split(",") if zone.strip()]
                if zone_ids else None
            )

            # Register the client with the (mocked) connection manager.
            client_id = await mock_connection_manager.connect(
                websocket=websocket,
                stream_type="events",
                zone_ids=zone_list,
                event_types=event_list
            )

            # Confirm the connection to the client.
            await websocket.send_json({
                "type": "connection_established",
                "client_id": client_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "config": {
                    "event_types": event_list,
                    "zone_ids": zone_list
                }
            })

            return client_id

        client_id = await mock_events_handler(
            mock_websocket, "fall_detection,intrusion", "zone1"
        )

        # The handler must hand back the ID issued by the connection manager.
        assert client_id == "client-001"
        assert mock_websocket.accept_called
        assert len(mock_websocket.messages_sent) == 1
        assert mock_websocket.messages_sent[0]["type"] == "connection_established"
        assert mock_websocket.messages_sent[0]["config"]["event_types"] == [
            "fall_detection",
            "intrusion",
        ]

    @pytest.mark.asyncio
    async def test_websocket_disconnect_handling_should_fail_initially(
        self, mock_websocket, mock_connection_manager
    ):
        """Disconnecting a client is forwarded to the connection manager."""
        client_id = "client-001"

        disconnect_result = await mock_connection_manager.disconnect(client_id)

        assert disconnect_result is True
        mock_connection_manager.disconnect.assert_called_once_with(client_id)
|
|
|
|
|
|
class TestWebSocketConnectionManager:
    """Tests for connection bookkeeping against an in-test connection manager.

    The ``_should_fail_initially`` suffix in the test names is kept unchanged
    so existing test IDs stay stable.
    """

    @pytest.fixture
    def mock_websocket(self):
        """Provide a fresh MockWebSocket.

        Declared on this class because a fixture defined as a method of
        another test class is not visible to the tests here.  If a
        module- or conftest-level fixture of the same name exists, this one
        takes precedence for this class.
        """
        return MockWebSocket()

    @pytest.fixture
    def connection_manager(self):
        """Provide an in-memory connection manager with a fresh client counter."""

        class MockConnectionManager:
            """Tracks connections in a dict keyed by generated client ID."""

            def __init__(self):
                self.connections = {}
                self.client_counter = 0

            async def connect(self, websocket, stream_type, zone_ids=None, **kwargs):
                """Register a connection and return its ``client-NNN`` ID."""
                self.client_counter += 1
                client_id = f"client-{self.client_counter:03d}"
                self.connections[client_id] = {
                    "websocket": websocket,
                    "stream_type": stream_type,
                    "zone_ids": zone_ids or [],
                    # Timezone-aware UTC; datetime.utcnow() is deprecated.
                    "connected_at": datetime.now(timezone.utc),
                    **kwargs
                }
                return client_id

            async def disconnect(self, client_id):
                """Remove a connection; return False if the ID is unknown."""
                if client_id in self.connections:
                    del self.connections[client_id]
                    return True
                return False

            async def get_connected_clients(self):
                """Return the IDs of all registered clients."""
                return list(self.connections.keys())

            async def get_connection_stats(self):
                """Return the client count and the distinct stream types in use."""
                return {
                    "total_clients": len(self.connections),
                    "active_streams": list(
                        {conn["stream_type"] for conn in self.connections.values()}
                    )
                }

            async def broadcast(self, data, stream_type=None, zone_ids=None):
                """Count the connections matching the optional filters.

                Nothing is actually written to the websockets; the return
                value is the number of clients that would have been sent to.
                """
                sent_count = 0
                for conn in self.connections.values():
                    if stream_type and conn["stream_type"] != stream_type:
                        continue
                    # A client matches if it subscribes to any requested zone.
                    if zone_ids and not any(zone in conn["zone_ids"] for zone in zone_ids):
                        continue

                    sent_count += 1

                return sent_count

        return MockConnectionManager()

    @pytest.mark.asyncio
    async def test_connection_manager_connect_should_fail_initially(
        self, connection_manager, mock_websocket
    ):
        """connect() issues a sequential ID and stores the connection details."""
        client_id = await connection_manager.connect(
            websocket=mock_websocket,
            stream_type="pose",
            zone_ids=["zone1", "zone2"],
            min_confidence=0.7
        )

        assert client_id == "client-001"
        assert client_id in connection_manager.connections
        assert connection_manager.connections[client_id]["stream_type"] == "pose"
        assert connection_manager.connections[client_id]["zone_ids"] == ["zone1", "zone2"]

    @pytest.mark.asyncio
    async def test_connection_manager_disconnect_should_fail_initially(
        self, connection_manager, mock_websocket
    ):
        """disconnect() removes a previously registered client."""
        client_id = await connection_manager.connect(
            websocket=mock_websocket,
            stream_type="pose"
        )

        result = await connection_manager.disconnect(client_id)

        assert result is True
        assert client_id not in connection_manager.connections

    @pytest.mark.asyncio
    async def test_connection_manager_broadcast_should_fail_initially(self, connection_manager):
        """broadcast() filters recipients by stream type and by zone."""
        ws1 = MockWebSocket()
        ws2 = MockWebSocket()

        # One pose client in zone1, one events client in zone2.
        await connection_manager.connect(ws1, "pose", zone_ids=["zone1"])
        await connection_manager.connect(ws2, "events", zone_ids=["zone2"])

        # Only the pose client matches the stream filter.
        sent_count = await connection_manager.broadcast(
            data={"type": "pose_data", "data": {}},
            stream_type="pose"
        )

        assert sent_count == 1

        # Only the zone1 client matches the zone filter.
        sent_count = await connection_manager.broadcast(
            data={"type": "zone_event", "data": {}},
            zone_ids=["zone1"]
        )

        assert sent_count == 1
|
|
|
|
|
|
class TestWebSocketPerformance:
    """Coarse performance checks run against MockWebSocket instances.

    The ``_should_fail_initially`` suffix in the test names is kept unchanged
    so existing test IDs stay stable.
    """

    @pytest.mark.asyncio
    async def test_multiple_concurrent_connections_should_fail_initially(self):
        """Ten mock connections can be accepted and confirmed concurrently."""
        connection_count = 10
        connections = [MockWebSocket() for _ in range(connection_count)]

        # Minimal handshake: accept, then send a confirmation.
        async def simulate_connection(websocket, client_id):
            await websocket.accept()
            await websocket.send_json({
                "type": "connection_established",
                "client_id": client_id
            })
            return True

        tasks = [
            simulate_connection(ws, f"client-{i:03d}")
            for i, ws in enumerate(connections)
        ]

        results = await asyncio.gather(*tasks)

        assert len(results) == connection_count
        assert all(results)
        assert all(ws.accept_called for ws in connections)
        # Every connection must have received exactly its own confirmation.
        assert all(len(ws.messages_sent) == 1 for ws in connections)

    @pytest.mark.asyncio
    async def test_websocket_message_throughput_should_fail_initially(self):
        """100 messages are recorded by the mock in under one second."""
        mock_ws = MockWebSocket()
        message_count = 100

        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by wall-clock adjustments.
        start_time = time.perf_counter()

        for i in range(message_count):
            await mock_ws.send_json({
                "type": "pose_data",
                "frame_id": f"frame-{i:04d}",
                "timestamp": datetime.now(timezone.utc).isoformat()
            })

        duration = time.perf_counter() - start_time

        assert len(mock_ws.messages_sent) == message_count
        assert duration < 1.0  # Should handle 100 messages in under 1 second

        # Guard against a zero-length interval on very coarse timers.
        throughput = message_count / duration if duration > 0 else float("inf")
        assert throughput > 100  # Should handle at least 100 messages per second