Major changes: - Organized Python v1 implementation into v1/ subdirectory - Created Rust workspace with 9 modular crates: - wifi-densepose-core: Core types, traits, errors - wifi-densepose-signal: CSI processing, phase sanitization, FFT - wifi-densepose-nn: Neural network inference (ONNX/Candle/tch) - wifi-densepose-api: Axum-based REST/WebSocket API - wifi-densepose-db: SQLx database layer - wifi-densepose-config: Configuration management - wifi-densepose-hardware: Hardware abstraction - wifi-densepose-wasm: WebAssembly bindings - wifi-densepose-cli: Command-line interface Documentation: - ADR-001: Workspace structure - ADR-002: Signal processing library selection - ADR-003: Neural network inference strategy - DDD domain model with bounded contexts Testing: - 69 tests passing across all crates - Signal processing: 45 tests - Neural networks: 21 tests - Core: 3 doc tests Performance targets: - 10x faster CSI processing (~0.5ms vs ~5ms) - 5x lower memory usage (~100MB vs ~500MB) - WASM support for browser deployment
571 lines
22 KiB
Python
571 lines
22 KiB
Python
"""
|
|
Integration tests for authentication and authorization.
|
|
|
|
Tests JWT authentication flow, user permissions, and access control.
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, Optional
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
import jwt
|
|
import json
|
|
|
|
from fastapi import HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials
|
|
|
|
|
|
class MockJWTToken:
|
|
"""Mock JWT token for testing."""
|
|
|
|
def __init__(self, payload: Dict[str, Any], secret: str = "test-secret"):
|
|
self.payload = payload
|
|
self.secret = secret
|
|
self.token = jwt.encode(payload, secret, algorithm="HS256")
|
|
|
|
def decode(self, token: str, secret: str) -> Dict[str, Any]:
|
|
"""Decode JWT token."""
|
|
return jwt.decode(token, secret, algorithms=["HS256"])
|
|
|
|
|
|
class TestJWTAuthentication:
|
|
"""Test JWT authentication functionality."""
|
|
|
|
@pytest.fixture
|
|
def valid_user_payload(self):
|
|
"""Valid user payload for JWT token."""
|
|
return {
|
|
"sub": "user-001",
|
|
"username": "testuser",
|
|
"email": "test@example.com",
|
|
"is_admin": False,
|
|
"is_active": True,
|
|
"permissions": ["read", "write"],
|
|
"exp": datetime.utcnow() + timedelta(hours=1),
|
|
"iat": datetime.utcnow()
|
|
}
|
|
|
|
@pytest.fixture
|
|
def admin_user_payload(self):
|
|
"""Admin user payload for JWT token."""
|
|
return {
|
|
"sub": "admin-001",
|
|
"username": "admin",
|
|
"email": "admin@example.com",
|
|
"is_admin": True,
|
|
"is_active": True,
|
|
"permissions": ["read", "write", "admin"],
|
|
"exp": datetime.utcnow() + timedelta(hours=1),
|
|
"iat": datetime.utcnow()
|
|
}
|
|
|
|
@pytest.fixture
|
|
def expired_user_payload(self):
|
|
"""Expired user payload for JWT token."""
|
|
return {
|
|
"sub": "user-002",
|
|
"username": "expireduser",
|
|
"email": "expired@example.com",
|
|
"is_admin": False,
|
|
"is_active": True,
|
|
"permissions": ["read"],
|
|
"exp": datetime.utcnow() - timedelta(hours=1), # Expired
|
|
"iat": datetime.utcnow() - timedelta(hours=2)
|
|
}
|
|
|
|
@pytest.fixture
|
|
def mock_jwt_service(self):
|
|
"""Mock JWT service."""
|
|
class MockJWTService:
|
|
def __init__(self):
|
|
self.secret = "test-secret-key"
|
|
self.algorithm = "HS256"
|
|
|
|
def create_token(self, user_data: Dict[str, Any]) -> str:
|
|
"""Create JWT token."""
|
|
payload = {
|
|
**user_data,
|
|
"exp": datetime.utcnow() + timedelta(hours=1),
|
|
"iat": datetime.utcnow()
|
|
}
|
|
return jwt.encode(payload, self.secret, algorithm=self.algorithm)
|
|
|
|
def verify_token(self, token: str) -> Dict[str, Any]:
|
|
"""Verify JWT token."""
|
|
try:
|
|
payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token has expired"
|
|
)
|
|
except jwt.InvalidTokenError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token"
|
|
)
|
|
|
|
def refresh_token(self, token: str) -> str:
|
|
"""Refresh JWT token."""
|
|
payload = self.verify_token(token)
|
|
# Remove exp and iat for new token
|
|
payload.pop("exp", None)
|
|
payload.pop("iat", None)
|
|
return self.create_token(payload)
|
|
|
|
return MockJWTService()
|
|
|
|
def test_jwt_token_creation_should_fail_initially(self, mock_jwt_service, valid_user_payload):
|
|
"""Test JWT token creation - should fail initially."""
|
|
token = mock_jwt_service.create_token(valid_user_payload)
|
|
|
|
# This will fail initially
|
|
assert isinstance(token, str)
|
|
assert len(token) > 0
|
|
|
|
# Verify token can be decoded
|
|
decoded = mock_jwt_service.verify_token(token)
|
|
assert decoded["sub"] == valid_user_payload["sub"]
|
|
assert decoded["username"] == valid_user_payload["username"]
|
|
|
|
def test_jwt_token_verification_should_fail_initially(self, mock_jwt_service, valid_user_payload):
|
|
"""Test JWT token verification - should fail initially."""
|
|
token = mock_jwt_service.create_token(valid_user_payload)
|
|
decoded = mock_jwt_service.verify_token(token)
|
|
|
|
# This will fail initially
|
|
assert decoded["sub"] == valid_user_payload["sub"]
|
|
assert decoded["is_admin"] == valid_user_payload["is_admin"]
|
|
assert "exp" in decoded
|
|
assert "iat" in decoded
|
|
|
|
def test_expired_token_rejection_should_fail_initially(self, mock_jwt_service, expired_user_payload):
|
|
"""Test expired token rejection - should fail initially."""
|
|
# Create token with expired payload
|
|
token = jwt.encode(expired_user_payload, mock_jwt_service.secret, algorithm=mock_jwt_service.algorithm)
|
|
|
|
# This should fail initially
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
mock_jwt_service.verify_token(token)
|
|
|
|
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
|
|
assert "expired" in exc_info.value.detail.lower()
|
|
|
|
def test_invalid_token_rejection_should_fail_initially(self, mock_jwt_service):
|
|
"""Test invalid token rejection - should fail initially."""
|
|
invalid_token = "invalid.jwt.token"
|
|
|
|
# This should fail initially
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
mock_jwt_service.verify_token(invalid_token)
|
|
|
|
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
|
|
assert "invalid" in exc_info.value.detail.lower()
|
|
|
|
def test_token_refresh_should_fail_initially(self, mock_jwt_service, valid_user_payload):
|
|
"""Test token refresh functionality - should fail initially."""
|
|
original_token = mock_jwt_service.create_token(valid_user_payload)
|
|
|
|
# Wait a moment to ensure different timestamps
|
|
import time
|
|
time.sleep(0.1)
|
|
|
|
refreshed_token = mock_jwt_service.refresh_token(original_token)
|
|
|
|
# This will fail initially
|
|
assert refreshed_token != original_token
|
|
|
|
# Verify both tokens are valid but have different timestamps
|
|
original_payload = mock_jwt_service.verify_token(original_token)
|
|
refreshed_payload = mock_jwt_service.verify_token(refreshed_token)
|
|
|
|
assert original_payload["sub"] == refreshed_payload["sub"]
|
|
assert original_payload["iat"] != refreshed_payload["iat"]
|
|
|
|
|
|
class TestUserAuthentication:
|
|
"""Test user authentication scenarios."""
|
|
|
|
@pytest.fixture
|
|
def mock_user_service(self):
|
|
"""Mock user service."""
|
|
class MockUserService:
|
|
def __init__(self):
|
|
self.users = {
|
|
"testuser": {
|
|
"id": "user-001",
|
|
"username": "testuser",
|
|
"email": "test@example.com",
|
|
"password_hash": "hashed_password",
|
|
"is_admin": False,
|
|
"is_active": True,
|
|
"permissions": ["read", "write"],
|
|
"zones": ["zone1", "zone2"],
|
|
"created_at": datetime.utcnow()
|
|
},
|
|
"admin": {
|
|
"id": "admin-001",
|
|
"username": "admin",
|
|
"email": "admin@example.com",
|
|
"password_hash": "admin_hashed_password",
|
|
"is_admin": True,
|
|
"is_active": True,
|
|
"permissions": ["read", "write", "admin"],
|
|
"zones": [], # Admin has access to all zones
|
|
"created_at": datetime.utcnow()
|
|
}
|
|
}
|
|
|
|
async def authenticate_user(self, username: str, password: str) -> Optional[Dict[str, Any]]:
|
|
"""Authenticate user with username and password."""
|
|
user = self.users.get(username)
|
|
if not user:
|
|
return None
|
|
|
|
# Mock password verification
|
|
if password == "correct_password":
|
|
return user
|
|
return None
|
|
|
|
async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get user by ID."""
|
|
for user in self.users.values():
|
|
if user["id"] == user_id:
|
|
return user
|
|
return None
|
|
|
|
async def update_user_activity(self, user_id: str):
|
|
"""Update user last activity."""
|
|
user = await self.get_user_by_id(user_id)
|
|
if user:
|
|
user["last_activity"] = datetime.utcnow()
|
|
|
|
return MockUserService()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_user_authentication_success_should_fail_initially(self, mock_user_service):
|
|
"""Test successful user authentication - should fail initially."""
|
|
user = await mock_user_service.authenticate_user("testuser", "correct_password")
|
|
|
|
# This will fail initially
|
|
assert user is not None
|
|
assert user["username"] == "testuser"
|
|
assert user["is_active"] is True
|
|
assert "read" in user["permissions"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_user_authentication_failure_should_fail_initially(self, mock_user_service):
|
|
"""Test failed user authentication - should fail initially."""
|
|
user = await mock_user_service.authenticate_user("testuser", "wrong_password")
|
|
|
|
# This will fail initially
|
|
assert user is None
|
|
|
|
# Test with non-existent user
|
|
user = await mock_user_service.authenticate_user("nonexistent", "any_password")
|
|
assert user is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_admin_user_authentication_should_fail_initially(self, mock_user_service):
|
|
"""Test admin user authentication - should fail initially."""
|
|
admin = await mock_user_service.authenticate_user("admin", "correct_password")
|
|
|
|
# This will fail initially
|
|
assert admin is not None
|
|
assert admin["is_admin"] is True
|
|
assert "admin" in admin["permissions"]
|
|
assert admin["zones"] == [] # Admin has access to all zones
|
|
|
|
|
|
class TestAuthorizationDependencies:
|
|
"""Test authorization dependency functions."""
|
|
|
|
@pytest.fixture
|
|
def mock_request(self):
|
|
"""Mock FastAPI request."""
|
|
class MockRequest:
|
|
def __init__(self):
|
|
self.state = MagicMock()
|
|
self.state.user = None
|
|
|
|
return MockRequest()
|
|
|
|
@pytest.fixture
|
|
def mock_credentials(self):
|
|
"""Mock HTTP authorization credentials."""
|
|
def create_credentials(token: str):
|
|
return HTTPAuthorizationCredentials(
|
|
scheme="Bearer",
|
|
credentials=token
|
|
)
|
|
return create_credentials
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_current_user_with_valid_token_should_fail_initially(self, mock_request, mock_credentials):
|
|
"""Test get_current_user with valid token - should fail initially."""
|
|
# Mock the get_current_user dependency
|
|
async def mock_get_current_user(request, credentials):
|
|
if not credentials:
|
|
return None
|
|
|
|
# Mock token validation
|
|
if credentials.credentials == "valid_token":
|
|
return {
|
|
"id": "user-001",
|
|
"username": "testuser",
|
|
"is_admin": False,
|
|
"is_active": True,
|
|
"permissions": ["read", "write"]
|
|
}
|
|
return None
|
|
|
|
credentials = mock_credentials("valid_token")
|
|
user = await mock_get_current_user(mock_request, credentials)
|
|
|
|
# This will fail initially
|
|
assert user is not None
|
|
assert user["username"] == "testuser"
|
|
assert user["is_active"] is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_current_user_without_credentials_should_fail_initially(self, mock_request):
|
|
"""Test get_current_user without credentials - should fail initially."""
|
|
async def mock_get_current_user(request, credentials):
|
|
if not credentials:
|
|
return None
|
|
return {"id": "user-001"}
|
|
|
|
user = await mock_get_current_user(mock_request, None)
|
|
|
|
# This will fail initially
|
|
assert user is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_require_active_user_should_fail_initially(self):
|
|
"""Test require active user dependency - should fail initially."""
|
|
async def mock_get_current_active_user(current_user):
|
|
if not current_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentication required"
|
|
)
|
|
|
|
if not current_user.get("is_active", True):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Inactive user"
|
|
)
|
|
|
|
return current_user
|
|
|
|
# Test with active user
|
|
active_user = {"id": "user-001", "is_active": True}
|
|
result = await mock_get_current_active_user(active_user)
|
|
|
|
# This will fail initially
|
|
assert result == active_user
|
|
|
|
# Test with inactive user
|
|
inactive_user = {"id": "user-002", "is_active": False}
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await mock_get_current_active_user(inactive_user)
|
|
|
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
# Test with no user
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await mock_get_current_active_user(None)
|
|
|
|
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_require_admin_user_should_fail_initially(self):
|
|
"""Test require admin user dependency - should fail initially."""
|
|
async def mock_get_admin_user(current_user):
|
|
if not current_user.get("is_admin", False):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin privileges required"
|
|
)
|
|
return current_user
|
|
|
|
# Test with admin user
|
|
admin_user = {"id": "admin-001", "is_admin": True}
|
|
result = await mock_get_admin_user(admin_user)
|
|
|
|
# This will fail initially
|
|
assert result == admin_user
|
|
|
|
# Test with regular user
|
|
regular_user = {"id": "user-001", "is_admin": False}
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await mock_get_admin_user(regular_user)
|
|
|
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_permission_checking_should_fail_initially(self):
|
|
"""Test permission checking functionality - should fail initially."""
|
|
def require_permission(permission: str):
|
|
async def check_permission(current_user):
|
|
user_permissions = current_user.get("permissions", [])
|
|
|
|
# Admin users have all permissions
|
|
if current_user.get("is_admin", False):
|
|
return current_user
|
|
|
|
# Check specific permission
|
|
if permission not in user_permissions:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Permission '{permission}' required"
|
|
)
|
|
|
|
return current_user
|
|
|
|
return check_permission
|
|
|
|
# Test with user having required permission
|
|
user_with_permission = {
|
|
"id": "user-001",
|
|
"permissions": ["read", "write"],
|
|
"is_admin": False
|
|
}
|
|
|
|
check_read = require_permission("read")
|
|
result = await check_read(user_with_permission)
|
|
|
|
# This will fail initially
|
|
assert result == user_with_permission
|
|
|
|
# Test with user missing permission
|
|
check_admin = require_permission("admin")
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await check_admin(user_with_permission)
|
|
|
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
|
assert "admin" in exc_info.value.detail
|
|
|
|
# Test with admin user (should have all permissions)
|
|
admin_user = {"id": "admin-001", "is_admin": True, "permissions": ["read"]}
|
|
result = await check_admin(admin_user)
|
|
assert result == admin_user
|
|
|
|
|
|
class TestZoneAndRouterAccess:
|
|
"""Test zone and router access control."""
|
|
|
|
@pytest.fixture
|
|
def mock_domain_config(self):
|
|
"""Mock domain configuration."""
|
|
class MockDomainConfig:
|
|
def __init__(self):
|
|
self.zones = {
|
|
"zone1": {"id": "zone1", "name": "Zone 1", "enabled": True},
|
|
"zone2": {"id": "zone2", "name": "Zone 2", "enabled": True},
|
|
"zone3": {"id": "zone3", "name": "Zone 3", "enabled": False}
|
|
}
|
|
self.routers = {
|
|
"router1": {"id": "router1", "name": "Router 1", "enabled": True},
|
|
"router2": {"id": "router2", "name": "Router 2", "enabled": False}
|
|
}
|
|
|
|
def get_zone(self, zone_id: str):
|
|
return self.zones.get(zone_id)
|
|
|
|
def get_router(self, router_id: str):
|
|
return self.routers.get(router_id)
|
|
|
|
return MockDomainConfig()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_zone_access_validation_should_fail_initially(self, mock_domain_config):
|
|
"""Test zone access validation - should fail initially."""
|
|
async def validate_zone_access(zone_id: str, current_user=None):
|
|
zone = mock_domain_config.get_zone(zone_id)
|
|
if not zone:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Zone '{zone_id}' not found"
|
|
)
|
|
|
|
if not zone["enabled"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Zone '{zone_id}' is disabled"
|
|
)
|
|
|
|
if current_user:
|
|
if current_user.get("is_admin", False):
|
|
return zone_id
|
|
|
|
user_zones = current_user.get("zones", [])
|
|
if user_zones and zone_id not in user_zones:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Access denied to zone '{zone_id}'"
|
|
)
|
|
|
|
return zone_id
|
|
|
|
# Test valid zone access
|
|
result = await validate_zone_access("zone1")
|
|
|
|
# This will fail initially
|
|
assert result == "zone1"
|
|
|
|
# Test invalid zone
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await validate_zone_access("nonexistent")
|
|
|
|
assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND
|
|
|
|
# Test disabled zone
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await validate_zone_access("zone3")
|
|
|
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
# Test user with zone access
|
|
user_with_access = {"id": "user-001", "zones": ["zone1", "zone2"]}
|
|
result = await validate_zone_access("zone1", user_with_access)
|
|
assert result == "zone1"
|
|
|
|
# Test user without zone access
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await validate_zone_access("zone2", {"id": "user-002", "zones": ["zone1"]})
|
|
|
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_router_access_validation_should_fail_initially(self, mock_domain_config):
|
|
"""Test router access validation - should fail initially."""
|
|
async def validate_router_access(router_id: str, current_user=None):
|
|
router = mock_domain_config.get_router(router_id)
|
|
if not router:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Router '{router_id}' not found"
|
|
)
|
|
|
|
if not router["enabled"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Router '{router_id}' is disabled"
|
|
)
|
|
|
|
return router_id
|
|
|
|
# Test valid router access
|
|
result = await validate_router_access("router1")
|
|
|
|
# This will fail initially
|
|
assert result == "router1"
|
|
|
|
# Test disabled router
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await validate_router_access("router2")
|
|
|
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN |