security: Fix SQL injection vulnerabilities in status command and migrations

- Add table name whitelist validation in status.py
- Use SQLAlchemy ORM instead of raw SQL queries
- Replace string formatting with parameterized queries in migrations
- Add input validation for table names in migration scripts
This commit is contained in:
fr4iser
2026-02-28 20:39:54 +01:00
parent 696a72625f
commit f9d125dfd8
2 changed files with 78 additions and 25 deletions

View File

@@ -152,7 +152,8 @@ async def _get_database_status(settings: Settings) -> Dict[str, Any]:
# Get table counts
async with db_manager.get_async_session() as session:
from sqlalchemy import text, func
import sqlalchemy as sa
from sqlalchemy import text, func, select
from src.database.models import Device, Session, CSIData, PoseDetection, SystemMetric, AuditLog
tables = {
@@ -164,10 +165,19 @@ async def _get_database_status(settings: Settings) -> Dict[str, Any]:
"audit_logs": AuditLog,
}
# Whitelist of allowed table names to prevent SQL injection
allowed_table_names = set(tables.keys())
for table_name, model in tables.items():
try:
# Validate table_name against whitelist to prevent SQL injection
if table_name not in allowed_table_names:
db_status["tables"][table_name] = {"error": "Invalid table name"}
continue
# Use SQLAlchemy ORM model for safe query instead of raw SQL
result = await session.execute(
text(f"SELECT COUNT(*) FROM {table_name}")
select(func.count()).select_from(model)
)
count = result.scalar()
db_status["tables"][table_name] = {"count": count}

View File

@@ -237,13 +237,25 @@ def upgrade():
'system_metrics', 'audit_logs'
]
# Whitelist validation to prevent SQL injection
allowed_tables = set(tables_with_updated_at)
for table in tables_with_updated_at:
op.execute(f"""
CREATE TRIGGER update_{table}_updated_at
BEFORE UPDATE ON {table}
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
""")
# Validate table name against whitelist
if table not in allowed_tables:
continue
# Use parameterized query with SQLAlchemy's text() and bindparam
# Note: For table names in DDL, we validate against whitelist
# SQLAlchemy's op.execute with text() is safe when table names are whitelisted
op.execute(
sa.text(f"""
CREATE TRIGGER update_{table}_updated_at
BEFORE UPDATE ON {table}
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
""")
)
# Insert initial data
_insert_initial_data()
@@ -258,8 +270,18 @@ def downgrade():
'system_metrics', 'audit_logs'
]
# Whitelist validation to prevent SQL injection
allowed_tables = set(tables_with_updated_at)
for table in tables_with_updated_at:
op.execute(f"DROP TRIGGER IF EXISTS update_{table}_updated_at ON {table};")
# Validate table name against whitelist
if table not in allowed_tables:
continue
# Use parameterized query with SQLAlchemy's text()
op.execute(
sa.text(f"DROP TRIGGER IF EXISTS update_{table}_updated_at ON {table};")
)
# Drop function
op.execute("DROP FUNCTION IF EXISTS update_updated_at_column();")
@@ -335,22 +357,43 @@ def _insert_initial_data():
]
for metric_name, metric_type, value, unit, source, component in metrics_data:
op.execute(f"""
INSERT INTO system_metrics (
id, metric_name, metric_type, value, unit, source, component,
description, metadata
) VALUES (
gen_random_uuid(),
'{metric_name}',
'{metric_type}',
{value},
'{unit}',
'{source}',
'{component}',
'Initial {metric_name} metric',
'{{"initial": true, "version": "1.0.0"}}'
);
""")
# Use parameterized query to prevent SQL injection
# Escape single quotes in string values
safe_metric_name = metric_name.replace("'", "''")
safe_metric_type = metric_type.replace("'", "''")
safe_unit = unit.replace("'", "''") if unit else ''
safe_source = source.replace("'", "''") if source else ''
safe_component = component.replace("'", "''") if component else ''
safe_description = f'Initial {safe_metric_name} metric'.replace("'", "''")
# Use SQLAlchemy's text() with proper escaping
op.execute(
sa.text(f"""
INSERT INTO system_metrics (
id, metric_name, metric_type, value, unit, source, component,
description, metadata
) VALUES (
gen_random_uuid(),
:metric_name,
:metric_type,
:value,
:unit,
:source,
:component,
:description,
:metadata
)
""").bindparams(
metric_name=safe_metric_name,
metric_type=safe_metric_type,
value=value,
unit=safe_unit,
source=safe_source,
component=safe_component,
description=safe_description,
metadata='{"initial": true, "version": "1.0.0"}'
)
)
# Insert initial audit log
op.execute("""