""" Routes pour l'audit et les logs d'activité """ from fastapi import APIRouter, HTTPException, Query, Request from typing import List, Optional from datetime import datetime, timedelta from ..database import db router = APIRouter(prefix="/api/audit", tags=["audit"]) @router.post("/logs") async def create_audit_log( request: Request, action: str, entity_type: Optional[str] = None, entity_id: Optional[str] = None, entity_count: Optional[int] = None, details: Optional[dict] = None, user: Optional[str] = "soc_user" ): """ Crée un log d'audit pour une action utilisateur """ try: # Récupérer l'IP du client client_ip = request.client.host if request.client else "unknown" # Insérer dans ClickHouse insert_query = """ INSERT INTO mabase_prod.audit_logs (timestamp, user_name, action, entity_type, entity_id, entity_count, details, client_ip) VALUES (%(timestamp)s, %(user)s, %(action)s, %(entity_type)s, %(entity_id)s, %(entity_count)s, %(details)s, %(client_ip)s) """ params = { 'timestamp': datetime.now(), 'user': user, 'action': action, 'entity_type': entity_type, 'entity_id': entity_id, 'entity_count': entity_count, 'details': str(details) if details else '', 'client_ip': client_ip } # Note: This requires the audit_logs table to exist # See deploy_audit_logs_table.sql try: db.query(insert_query, params) except Exception as e: # Table might not exist yet, log warning print(f"Warning: Could not insert audit log: {e}") return { "status": "success", "message": "Audit log created", "action": action, "timestamp": params['timestamp'].isoformat() } except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") @router.get("/logs") async def get_audit_logs( hours: int = Query(24, ge=1, le=720, description="Fenêtre temporelle en heures"), user: Optional[str] = Query(None, description="Filtrer par utilisateur"), action: Optional[str] = Query(None, description="Filtrer par action"), entity_type: Optional[str] = Query(None, description="Filtrer par type d'entité"), limit: int = Query(100, ge=1, le=1000, description="Nombre maximum de résultats") ): """ Récupère les logs d'audit avec filtres """ try: where_clauses = ["timestamp >= now() - INTERVAL %(hours)s HOUR"] params = {"hours": hours, "limit": limit} if user: where_clauses.append("user_name = %(user)s") params["user"] = user if action: where_clauses.append("action = %(action)s") params["action"] = action if entity_type: where_clauses.append("entity_type = %(entity_type)s") params["entity_type"] = entity_type where_clause = " AND ".join(where_clauses) query = f""" SELECT timestamp, user_name, action, entity_type, entity_id, entity_count, details, client_ip FROM mabase_prod.audit_logs WHERE {where_clause} ORDER BY timestamp DESC LIMIT %(limit)s """ result = db.query(query, params) logs = [] for row in result.result_rows: logs.append({ "timestamp": row[0].isoformat() if row[0] else "", "user_name": row[1] or "", "action": row[2] or "", "entity_type": row[3] or "", "entity_id": row[4] or "", "entity_count": row[5] or 0, "details": row[6] or "", "client_ip": row[7] or "" }) return { "items": logs, "total": len(logs), "period_hours": hours } except Exception as e: # If table doesn't exist, return empty result if "Table" in str(e) and "doesn't exist" in str(e): return { "items": [], "total": 0, "period_hours": hours, "warning": "Audit logs table not created yet" } raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") @router.get("/stats") async def get_audit_stats( hours: int = Query(24, ge=1, le=720) ): """ Statistiques d'audit """ try: query = """ SELECT action, count() AS count, uniq(user_name) AS unique_users, sum(entity_count) AS total_entities FROM mabase_prod.audit_logs WHERE timestamp >= now() - INTERVAL %(hours)s HOUR GROUP BY action ORDER BY count DESC """ result = db.query(query, {"hours": hours}) stats = [] for row in result.result_rows: stats.append({ "action": row[0] or "", "count": row[1] or 0, "unique_users": row[2] or 0, "total_entities": row[3] or 0 }) return { "items": stats, "period_hours": hours } except Exception as e: if "Table" in str(e) and "doesn't exist" in str(e): return { "items": [], "period_hours": hours, "warning": "Audit logs table not created yet" } raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") @router.get("/users/activity") async def get_user_activity( hours: int = Query(24, ge=1, le=720) ): """ Activité par utilisateur """ try: query = """ SELECT user_name, count() AS actions, uniq(action) AS action_types, min(timestamp) AS first_action, max(timestamp) AS last_action FROM mabase_prod.audit_logs WHERE timestamp >= now() - INTERVAL %(hours)s HOUR GROUP BY user_name ORDER BY actions DESC """ result = db.query(query, {"hours": hours}) users = [] for row in result.result_rows: users.append({ "user_name": row[0] or "", "actions": row[1] or 0, "action_types": row[2] or 0, "first_action": row[3].isoformat() if row[3] else "", "last_action": row[4].isoformat() if row[4] else "" }) return { "items": users, "period_hours": hours } except Exception as e: if "Table" in str(e) and "doesn't exist" in str(e): return { "items": [], "period_hours": hours, "warning": "Audit logs table not created yet" } raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")