"""Routes for managing clustered incidents."""
import re
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Query

from ..database import db
from ..models import BaseModel

router = APIRouter(prefix="/api/incidents", tags=["incidents"])

# Leading "::ffff:" prefix, matched case-insensitively; compiled once so
# cleanIP() does not have to import/compile on every call.
_IPV4_MAPPED_PREFIX = re.compile(r'^::ffff:', re.IGNORECASE)

# Severity labels produced by _score_cluster(), ordered from least to most
# severe. Used to apply the severity filters of the list endpoints.
_SEVERITY_RANK = {"LOW": 0, "MEDIUM": 1, "HIGH": 2, "CRITICAL": 3}

# Groups detections of the requested time window by the first three
# dot-separated components of the source IP (rendered as "a.b.c.0/24") and
# keeps only groups with at least two detections.
# NOTE(review): ORDER BY avg_score ASC presumably relies on lower scores being
# more anomalous -- confirm against the scoring model.
_CLUSTER_QUERY = """
    WITH subnet_groups AS (
        SELECT
            concat(
                splitByChar('.', toString(src_ip))[1], '.',
                splitByChar('.', toString(src_ip))[2], '.',
                splitByChar('.', toString(src_ip))[3], '.0/24'
            ) AS subnet,
            count() AS total_detections,
            uniq(src_ip) AS unique_ips,
            min(detected_at) AS first_seen,
            max(detected_at) AS last_seen,
            argMax(ja4, detected_at) AS ja4,
            argMax(country_code, detected_at) AS country_code,
            argMax(asn_number, detected_at) AS asn_number,
            argMax(threat_level, detected_at) AS threat_level,
            avg(anomaly_score) AS avg_score,
            any(src_ip) AS sample_ip
        FROM ml_detected_anomalies
        WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
        GROUP BY subnet
        HAVING total_detections >= 2
    )
    SELECT
        subnet,
        total_detections,
        unique_ips,
        first_seen,
        last_seen,
        ja4,
        country_code,
        asn_number,
        threat_level,
        avg_score,
        sample_ip
    FROM subnet_groups
    ORDER BY avg_score ASC, total_detections DESC
    LIMIT %(limit)s
"""


def cleanIP(address: str) -> str:
    """Strip a leading ``::ffff:`` prefix from an IP address string.

    Args:
        address: The address to clean. Falsy values (``None``, ``""``)
            are accepted.

    Returns:
        The address without the prefix, or ``''`` when *address* is falsy.
    """
    if not address:
        return ''
    return _IPV4_MAPPED_PREFIX.sub('', address)


def _severity_rank(level) -> Optional[int]:
    """Return the rank of a severity label, or ``None`` if unrecognised.

    Non-string values also yield ``None``; this covers route functions being
    called directly, where an omitted parameter is still its ``Query(...)``
    default object instead of a string.
    """
    if not isinstance(level, str):
        return None
    return _SEVERITY_RANK.get(level.strip().upper())


def _score_cluster(threat_level, unique_ips, avg_score, total_detections):
    """Compute ``(risk_score, severity)`` for one subnet cluster.

    The risk score is capped at 100 and adds 30 for a CRITICAL threat level,
    20 for HIGH, 5 per unique IP and 100 times the absolute average score.
    Missing values fall back to ``'LOW'``, 1 unique IP, a score of 0 and
    1 detection respectively.
    """
    threat_level = threat_level or 'LOW'
    unique_ips = unique_ips or 1
    avg_score = abs(avg_score or 0)

    critical_count = 1 if threat_level == 'CRITICAL' else 0
    high_count = 1 if threat_level == 'HIGH' else 0

    risk_score = min(100, round(
        (critical_count * 30) +
        (high_count * 20) +
        (unique_ips * 5) +
        (avg_score * 100)
    ))

    if critical_count > 0 or risk_score >= 80:
        severity = "CRITICAL"
    # NOTE(review): high_count is a 0/1 flag compared against 30% of the
    # detection count, so a HIGH threat level alone only yields "HIGH" for
    # clusters of at most 3 detections. Kept as-is; confirm the intent.
    elif high_count > (total_detections or 1) * 0.3 or risk_score >= 60:
        severity = "HIGH"
    elif high_count > 0 or risk_score >= 40:
        severity = "MEDIUM"
    else:
        severity = "LOW"

    return risk_score, severity


@router.get("/clusters")
async def get_incident_clusters(
    hours: int = Query(24, ge=1, le=168, description="Fenêtre temporelle en heures"),
    min_severity: str = Query("LOW", description="Niveau de sévérité minimum"),
    limit: int = Query(20, ge=1, le=100, description="Nombre maximum de clusters")
):
    """Return automatically clustered incidents.

    Detections from the last *hours* hours are grouped by /24 subnet; each
    cluster also reports the most recent JA4 fingerprint, country, ASN and
    threat level seen in that subnet.

    Args:
        hours: Size of the time window, in hours.
        min_severity: Minimum severity (LOW, MEDIUM, HIGH or CRITICAL,
            case-insensitive) a cluster must reach to be returned.
            Unrecognised values disable the filter.
        limit: Maximum number of clusters fetched from the database. The
            severity filter is applied afterwards, so fewer clusters may be
            returned.

    Returns:
        A dict with ``items`` (the clusters), ``total`` (their count) and
        ``period_hours``.

    Raises:
        HTTPException: 500 if the query or the result processing fails.
    """
    min_rank = _severity_rank(min_severity) or 0

    try:
        result = db.query(_CLUSTER_QUERY, {"hours": hours, "limit": limit})

        date_stamp = datetime.now().strftime('%Y%m%d')
        clusters = []

        # IDs are numbered by position in the query result (not in the
        # filtered list) so a cluster keeps the same ID whatever the
        # min_severity filter is.
        for position, row in enumerate(result.result_rows, start=1):
            (subnet, total_detections, unique_ips, first_seen, last_seen,
             ja4, country_code, asn_number, threat_level, avg_score,
             sample_ip) = row

            risk_score, severity = _score_cluster(
                threat_level, unique_ips, avg_score, total_detections
            )
            if _SEVERITY_RANK[severity] < min_rank:
                continue

            # Placeholder values: the trend is not computed yet.
            trend = "up"
            trend_percentage = 23

            clusters.append({
                "id": f"INC-{date_stamp}-{position:03d}",
                "score": risk_score,
                "severity": severity,
                "total_detections": total_detections,
                "unique_ips": unique_ips,
                "subnet": subnet,
                "sample_ip": cleanIP(sample_ip) if sample_ip else None,
                "ja4": ja4 or "",
                # Placeholder values: not derived from the data yet.
                "primary_ua": "python-requests",
                "primary_target": "Unknown",
                "countries": [{
                    "code": country_code or "XX",
                    "percentage": 100
                }],
                "asn": str(asn_number) if asn_number else "",
                "first_seen": first_seen.isoformat() if first_seen else "",
                "last_seen": last_seen.isoformat() if last_seen else "",
                "trend": trend,
                "trend_percentage": trend_percentage
            })

        return {
            "items": clusters,
            "total": len(clusters),
            "period_hours": hours
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") from e


@router.get("/{cluster_id}")
async def get_incident_details(cluster_id: str):
    """Return the details of a single incident.

    Placeholder implementation: echoes *cluster_id* with empty timeline,
    entities and classifications. A real implementation would need a mapping
    from cluster IDs to subnets.
    """
    return {
        "id": cluster_id,
        "details": "Implementation en cours",
        "timeline": [],
        "entities": [],
        "classifications": []
    }


@router.post("/{cluster_id}/classify")
async def classify_incident(
    cluster_id: str,
    label: str,
    tags: Optional[List[str]] = None,
    comment: str = ""
):
    """Quickly classify an incident.

    Placeholder implementation: nothing is persisted yet; the submitted
    values are echoed back with a ``success`` status.

    Args:
        cluster_id: Identifier of the incident to classify.
        label: Classification label.
        tags: Optional list of tags; ``None`` is returned as an empty list.
        comment: Optional free-text comment.
    """
    return {
        "status": "success",
        "cluster_id": cluster_id,
        "label": label,
        "tags": tags or [],
        "comment": comment
    }


@router.get("")
async def list_incidents(
    status: str = Query("active", description="Statut des incidents"),
    severity: Optional[str] = Query(None, description="Filtrer par sévérité"),
    hours: int = Query(24, ge=1, le=168)
):
    """List incidents, optionally filtered by severity.

    Currently delegates to :func:`get_incident_clusters` with a limit of 50.

    Args:
        status: Incident status. Accepted but not applied yet, since
            clusters carry no status.
        severity: When set to a recognised level (LOW, MEDIUM, HIGH or
            CRITICAL, case-insensitive), only clusters with exactly that
            severity are returned. Other values are ignored.
        hours: Size of the time window, in hours.

    Raises:
        HTTPException: 500 if the underlying cluster lookup fails.
    """
    try:
        # min_severity is passed explicitly: on a direct call an omitted
        # argument would be the Query(...) default object, not a string.
        payload = await get_incident_clusters(
            hours=hours, min_severity="LOW", limit=50
        )
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message ("Erreur: 500: Erreur: ...").
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") from e

    wanted_rank = _severity_rank(severity)
    if wanted_rank is not None:
        items = [
            cluster for cluster in payload["items"]
            if _SEVERITY_RANK[cluster["severity"]] == wanted_rank
        ]
        payload = {**payload, "items": items, "total": len(items)}

    return payload