""" Integration tests for authentication and authorization. Tests JWT authentication flow, user permissions, and access control. """ import pytest import asyncio from datetime import datetime, timedelta from typing import Dict, Any, Optional from unittest.mock import AsyncMock, MagicMock, patch import jwt import json from fastapi import HTTPException, status from fastapi.security import HTTPAuthorizationCredentials class MockJWTToken: """Mock JWT token for testing.""" def __init__(self, payload: Dict[str, Any], secret: str = "test-secret"): self.payload = payload self.secret = secret self.token = jwt.encode(payload, secret, algorithm="HS256") def decode(self, token: str, secret: str) -> Dict[str, Any]: """Decode JWT token.""" return jwt.decode(token, secret, algorithms=["HS256"]) class TestJWTAuthentication: """Test JWT authentication functionality.""" @pytest.fixture def valid_user_payload(self): """Valid user payload for JWT token.""" return { "sub": "user-001", "username": "testuser", "email": "test@example.com", "is_admin": False, "is_active": True, "permissions": ["read", "write"], "exp": datetime.utcnow() + timedelta(hours=1), "iat": datetime.utcnow() } @pytest.fixture def admin_user_payload(self): """Admin user payload for JWT token.""" return { "sub": "admin-001", "username": "admin", "email": "admin@example.com", "is_admin": True, "is_active": True, "permissions": ["read", "write", "admin"], "exp": datetime.utcnow() + timedelta(hours=1), "iat": datetime.utcnow() } @pytest.fixture def expired_user_payload(self): """Expired user payload for JWT token.""" return { "sub": "user-002", "username": "expireduser", "email": "expired@example.com", "is_admin": False, "is_active": True, "permissions": ["read"], "exp": datetime.utcnow() - timedelta(hours=1), # Expired "iat": datetime.utcnow() - timedelta(hours=2) } @pytest.fixture def mock_jwt_service(self): """Mock JWT service.""" class MockJWTService: def __init__(self): self.secret = "test-secret-key" self.algorithm = "HS256" def create_token(self, user_data: Dict[str, Any]) -> str: """Create JWT token.""" payload = { **user_data, "exp": datetime.utcnow() + timedelta(hours=1), "iat": datetime.utcnow() } return jwt.encode(payload, self.secret, algorithm=self.algorithm) def verify_token(self, token: str) -> Dict[str, Any]: """Verify JWT token.""" try: payload = jwt.decode(token, self.secret, algorithms=[self.algorithm]) return payload except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired" ) except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token" ) def refresh_token(self, token: str) -> str: """Refresh JWT token.""" payload = self.verify_token(token) # Remove exp and iat for new token payload.pop("exp", None) payload.pop("iat", None) return self.create_token(payload) return MockJWTService() def test_jwt_token_creation_should_fail_initially(self, mock_jwt_service, valid_user_payload): """Test JWT token creation - should fail initially.""" token = mock_jwt_service.create_token(valid_user_payload) # This will fail initially assert isinstance(token, str) assert len(token) > 0 # Verify token can be decoded decoded = mock_jwt_service.verify_token(token) assert decoded["sub"] == valid_user_payload["sub"] assert decoded["username"] == valid_user_payload["username"] def test_jwt_token_verification_should_fail_initially(self, mock_jwt_service, valid_user_payload): """Test JWT token verification - should fail initially.""" token = mock_jwt_service.create_token(valid_user_payload) decoded = mock_jwt_service.verify_token(token) # This will fail initially assert decoded["sub"] == valid_user_payload["sub"] assert decoded["is_admin"] == valid_user_payload["is_admin"] assert "exp" in decoded assert "iat" in decoded def test_expired_token_rejection_should_fail_initially(self, mock_jwt_service, expired_user_payload): """Test expired token rejection - should fail initially.""" # Create token with expired payload token = jwt.encode(expired_user_payload, mock_jwt_service.secret, algorithm=mock_jwt_service.algorithm) # This should fail initially with pytest.raises(HTTPException) as exc_info: mock_jwt_service.verify_token(token) assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED assert "expired" in exc_info.value.detail.lower() def test_invalid_token_rejection_should_fail_initially(self, mock_jwt_service): """Test invalid token rejection - should fail initially.""" invalid_token = "invalid.jwt.token" # This should fail initially with pytest.raises(HTTPException) as exc_info: mock_jwt_service.verify_token(invalid_token) assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED assert "invalid" in exc_info.value.detail.lower() def test_token_refresh_should_fail_initially(self, mock_jwt_service, valid_user_payload): """Test token refresh functionality - should fail initially.""" original_token = mock_jwt_service.create_token(valid_user_payload) # Wait a moment to ensure different timestamps import time time.sleep(0.1) refreshed_token = mock_jwt_service.refresh_token(original_token) # This will fail initially assert refreshed_token != original_token # Verify both tokens are valid but have different timestamps original_payload = mock_jwt_service.verify_token(original_token) refreshed_payload = mock_jwt_service.verify_token(refreshed_token) assert original_payload["sub"] == refreshed_payload["sub"] assert original_payload["iat"] != refreshed_payload["iat"] class TestUserAuthentication: """Test user authentication scenarios.""" @pytest.fixture def mock_user_service(self): """Mock user service.""" class MockUserService: def __init__(self): self.users = { "testuser": { "id": "user-001", "username": "testuser", "email": "test@example.com", "password_hash": "hashed_password", "is_admin": False, "is_active": True, "permissions": ["read", "write"], "zones": ["zone1", "zone2"], "created_at": datetime.utcnow() }, "admin": { "id": "admin-001", "username": "admin", "email": "admin@example.com", "password_hash": "admin_hashed_password", "is_admin": True, "is_active": True, "permissions": ["read", "write", "admin"], "zones": [], # Admin has access to all zones "created_at": datetime.utcnow() } } async def authenticate_user(self, username: str, password: str) -> Optional[Dict[str, Any]]: """Authenticate user with username and password.""" user = self.users.get(username) if not user: return None # Mock password verification if password == "correct_password": return user return None async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: """Get user by ID.""" for user in self.users.values(): if user["id"] == user_id: return user return None async def update_user_activity(self, user_id: str): """Update user last activity.""" user = await self.get_user_by_id(user_id) if user: user["last_activity"] = datetime.utcnow() return MockUserService() @pytest.mark.asyncio async def test_user_authentication_success_should_fail_initially(self, mock_user_service): """Test successful user authentication - should fail initially.""" user = await mock_user_service.authenticate_user("testuser", "correct_password") # This will fail initially assert user is not None assert user["username"] == "testuser" assert user["is_active"] is True assert "read" in user["permissions"] @pytest.mark.asyncio async def test_user_authentication_failure_should_fail_initially(self, mock_user_service): """Test failed user authentication - should fail initially.""" user = await mock_user_service.authenticate_user("testuser", "wrong_password") # This will fail initially assert user is None # Test with non-existent user user = await mock_user_service.authenticate_user("nonexistent", "any_password") assert user is None @pytest.mark.asyncio async def test_admin_user_authentication_should_fail_initially(self, mock_user_service): """Test admin user authentication - should fail initially.""" admin = await mock_user_service.authenticate_user("admin", "correct_password") # This will fail initially assert admin is not None assert admin["is_admin"] is True assert "admin" in admin["permissions"] assert admin["zones"] == [] # Admin has access to all zones class TestAuthorizationDependencies: """Test authorization dependency functions.""" @pytest.fixture def mock_request(self): """Mock FastAPI request.""" class MockRequest: def __init__(self): self.state = MagicMock() self.state.user = None return MockRequest() @pytest.fixture def mock_credentials(self): """Mock HTTP authorization credentials.""" def create_credentials(token: str): return HTTPAuthorizationCredentials( scheme="Bearer", credentials=token ) return create_credentials @pytest.mark.asyncio async def test_get_current_user_with_valid_token_should_fail_initially(self, mock_request, mock_credentials): """Test get_current_user with valid token - should fail initially.""" # Mock the get_current_user dependency async def mock_get_current_user(request, credentials): if not credentials: return None # Mock token validation if credentials.credentials == "valid_token": return { "id": "user-001", "username": "testuser", "is_admin": False, "is_active": True, "permissions": ["read", "write"] } return None credentials = mock_credentials("valid_token") user = await mock_get_current_user(mock_request, credentials) # This will fail initially assert user is not None assert user["username"] == "testuser" assert user["is_active"] is True @pytest.mark.asyncio async def test_get_current_user_without_credentials_should_fail_initially(self, mock_request): """Test get_current_user without credentials - should fail initially.""" async def mock_get_current_user(request, credentials): if not credentials: return None return {"id": "user-001"} user = await mock_get_current_user(mock_request, None) # This will fail initially assert user is None @pytest.mark.asyncio async def test_require_active_user_should_fail_initially(self): """Test require active user dependency - should fail initially.""" async def mock_get_current_active_user(current_user): if not current_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required" ) if not current_user.get("is_active", True): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return current_user # Test with active user active_user = {"id": "user-001", "is_active": True} result = await mock_get_current_active_user(active_user) # This will fail initially assert result == active_user # Test with inactive user inactive_user = {"id": "user-002", "is_active": False} with pytest.raises(HTTPException) as exc_info: await mock_get_current_active_user(inactive_user) assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN # Test with no user with pytest.raises(HTTPException) as exc_info: await mock_get_current_active_user(None) assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED @pytest.mark.asyncio async def test_require_admin_user_should_fail_initially(self): """Test require admin user dependency - should fail initially.""" async def mock_get_admin_user(current_user): if not current_user.get("is_admin", False): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required" ) return current_user # Test with admin user admin_user = {"id": "admin-001", "is_admin": True} result = await mock_get_admin_user(admin_user) # This will fail initially assert result == admin_user # Test with regular user regular_user = {"id": "user-001", "is_admin": False} with pytest.raises(HTTPException) as exc_info: await mock_get_admin_user(regular_user) assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN @pytest.mark.asyncio async def test_permission_checking_should_fail_initially(self): """Test permission checking functionality - should fail initially.""" def require_permission(permission: str): async def check_permission(current_user): user_permissions = current_user.get("permissions", []) # Admin users have all permissions if current_user.get("is_admin", False): return current_user # Check specific permission if permission not in user_permissions: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission '{permission}' required" ) return current_user return check_permission # Test with user having required permission user_with_permission = { "id": "user-001", "permissions": ["read", "write"], "is_admin": False } check_read = require_permission("read") result = await check_read(user_with_permission) # This will fail initially assert result == user_with_permission # Test with user missing permission check_admin = require_permission("admin") with pytest.raises(HTTPException) as exc_info: await check_admin(user_with_permission) assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN assert "admin" in exc_info.value.detail # Test with admin user (should have all permissions) admin_user = {"id": "admin-001", "is_admin": True, "permissions": ["read"]} result = await check_admin(admin_user) assert result == admin_user class TestZoneAndRouterAccess: """Test zone and router access control.""" @pytest.fixture def mock_domain_config(self): """Mock domain configuration.""" class MockDomainConfig: def __init__(self): self.zones = { "zone1": {"id": "zone1", "name": "Zone 1", "enabled": True}, "zone2": {"id": "zone2", "name": "Zone 2", "enabled": True}, "zone3": {"id": "zone3", "name": "Zone 3", "enabled": False} } self.routers = { "router1": {"id": "router1", "name": "Router 1", "enabled": True}, "router2": {"id": "router2", "name": "Router 2", "enabled": False} } def get_zone(self, zone_id: str): return self.zones.get(zone_id) def get_router(self, router_id: str): return self.routers.get(router_id) return MockDomainConfig() @pytest.mark.asyncio async def test_zone_access_validation_should_fail_initially(self, mock_domain_config): """Test zone access validation - should fail initially.""" async def validate_zone_access(zone_id: str, current_user=None): zone = mock_domain_config.get_zone(zone_id) if not zone: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Zone '{zone_id}' not found" ) if not zone["enabled"]: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Zone '{zone_id}' is disabled" ) if current_user: if current_user.get("is_admin", False): return zone_id user_zones = current_user.get("zones", []) if user_zones and zone_id not in user_zones: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Access denied to zone '{zone_id}'" ) return zone_id # Test valid zone access result = await validate_zone_access("zone1") # This will fail initially assert result == "zone1" # Test invalid zone with pytest.raises(HTTPException) as exc_info: await validate_zone_access("nonexistent") assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND # Test disabled zone with pytest.raises(HTTPException) as exc_info: await validate_zone_access("zone3") assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN # Test user with zone access user_with_access = {"id": "user-001", "zones": ["zone1", "zone2"]} result = await validate_zone_access("zone1", user_with_access) assert result == "zone1" # Test user without zone access with pytest.raises(HTTPException) as exc_info: await validate_zone_access("zone2", {"id": "user-002", "zones": ["zone1"]}) assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN @pytest.mark.asyncio async def test_router_access_validation_should_fail_initially(self, mock_domain_config): """Test router access validation - should fail initially.""" async def validate_router_access(router_id: str, current_user=None): router = mock_domain_config.get_router(router_id) if not router: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Router '{router_id}' not found" ) if not router["enabled"]: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Router '{router_id}' is disabled" ) return router_id # Test valid router access result = await validate_router_access("router1") # This will fail initially assert result == "router1" # Test disabled router with pytest.raises(HTTPException) as exc_info: await validate_router_access("router2") assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN