""" Endpoints pour la liste des détections """ from fastapi import APIRouter, HTTPException, Query from typing import Optional, List from ..database import db from ..models import DetectionsListResponse, Detection router = APIRouter(prefix="/api/detections", tags=["detections"]) # Mapping label ASN → score float (0 = très suspect, 1 = légitime) _ASN_LABEL_SCORES: dict[str, float] = { 'human': 0.9, 'bot': 0.05, 'proxy': 0.25, 'vpn': 0.3, 'tor': 0.1, 'datacenter': 0.4, 'scanner': 0.05, 'malicious': 0.05, } def _label_to_score(label: str) -> float | None: """Convertit un label de réputation ASN en score numérique.""" if not label: return None return _ASN_LABEL_SCORES.get(label.lower(), 0.5) @router.get("", response_model=DetectionsListResponse) async def get_detections( page: int = Query(1, ge=1, description="Numéro de page"), page_size: int = Query(25, ge=1, le=100, description="Nombre de lignes par page"), threat_level: Optional[str] = Query(None, description="Filtrer par niveau de menace"), model_name: Optional[str] = Query(None, description="Filtrer par modèle"), country_code: Optional[str] = Query(None, description="Filtrer par pays"), asn_number: Optional[str] = Query(None, description="Filtrer par ASN"), search: Optional[str] = Query(None, description="Recherche texte (IP, JA4, Host)"), sort_by: str = Query("detected_at", description="Trier par"), sort_order: str = Query("DESC", description="Ordre (ASC/DESC)"), group_by_ip: bool = Query(False, description="Grouper par IP (first_seen/last_seen agrégés)"), score_type: Optional[str] = Query(None, description="Filtrer par type de score: BOT, REGLE, BOT_REGLE, SCORE") ): """ Récupère la liste des détections avec pagination et filtres """ try: # Construction de la requête where_clauses = ["detected_at >= now() - INTERVAL 24 HOUR"] params = {} if threat_level: where_clauses.append("threat_level = %(threat_level)s") params["threat_level"] = threat_level if model_name: where_clauses.append("model_name = %(model_name)s") params["model_name"] = model_name if country_code: where_clauses.append("country_code = %(country_code)s") params["country_code"] = country_code.upper() if asn_number: where_clauses.append("asn_number = %(asn_number)s") params["asn_number"] = asn_number if search: where_clauses.append( "(ilike(toString(src_ip), %(search)s) OR ilike(ja4, %(search)s) OR ilike(host, %(search)s))" ) params["search"] = f"%{search}%" if score_type: st = score_type.upper() if st == "BOT": where_clauses.append("threat_level = 'KNOWN_BOT'") elif st == "REGLE": where_clauses.append("threat_level = 'ANUBIS_DENY'") elif st == "BOT_REGLE": where_clauses.append("threat_level IN ('KNOWN_BOT', 'ANUBIS_DENY')") elif st == "SCORE": where_clauses.append("threat_level NOT IN ('KNOWN_BOT', 'ANUBIS_DENY')") where_clause = " AND ".join(where_clauses) # Requête de comptage count_query = f""" SELECT count() FROM ml_detected_anomalies WHERE {where_clause} """ count_result = db.query(count_query, params) total = count_result.result_rows[0][0] if count_result.result_rows else 0 # Requête principale offset = (page - 1) * page_size sort_order = "DESC" if sort_order.upper() == "DESC" else "ASC" # ── Mode groupé par IP (first_seen / last_seen depuis la DB) ──────────── if group_by_ip: valid_sort_grouped = ["anomaly_score", "hits", "hit_velocity", "first_seen", "last_seen", "src_ip", "detected_at"] grouped_sort = sort_by if sort_by in valid_sort_grouped else "last_seen" # detected_at → last_seen (max(detected_at) dans le GROUP BY) if grouped_sort == "detected_at": grouped_sort = "last_seen" # In outer query, min_score is exposed as anomaly_score — keep the alias outer_sort = "min_score" if grouped_sort == "anomaly_score" else grouped_sort # Count distinct IPs count_ip_query = f""" SELECT uniq(src_ip) FROM ml_detected_anomalies WHERE {where_clause} """ cr = db.query(count_ip_query, params) total = cr.result_rows[0][0] if cr.result_rows else 0 grouped_query = f""" SELECT ip_data.src_ip, ip_data.first_seen, ip_data.last_seen, ip_data.detection_count, ip_data.unique_ja4s, ip_data.unique_hosts, ip_data.min_score AS anomaly_score, ip_data.threat_level_best, ip_data.model_name_best, ip_data.country_code, ip_data.asn_number, ip_data.asn_org, ip_data.hit_velocity, ip_data.hits, ip_data.asn_label, ar.label AS asn_rep_label, ip_data.anubis_bot_name_best, ip_data.anubis_bot_action_best, ip_data.anubis_bot_category_best FROM ( SELECT src_ip, min(detected_at) AS first_seen, max(detected_at) AS last_seen, count() AS detection_count, groupUniqArray(5)(ja4) AS unique_ja4s, groupUniqArray(5)(host) AS unique_hosts, min(anomaly_score) AS min_score, argMin(threat_level, anomaly_score) AS threat_level_best, argMin(model_name, anomaly_score) AS model_name_best, any(country_code) AS country_code, any(asn_number) AS asn_number, any(asn_org) AS asn_org, max(hit_velocity) AS hit_velocity, sum(hits) AS hits, any(asn_label) AS asn_label, argMin(anubis_bot_name, anomaly_score) AS anubis_bot_name_best, argMin(anubis_bot_action, anomaly_score) AS anubis_bot_action_best, argMin(anubis_bot_category, anomaly_score) AS anubis_bot_category_best FROM ml_detected_anomalies WHERE {where_clause} GROUP BY src_ip ) ip_data LEFT JOIN mabase_prod.asn_reputation ar ON ar.src_asn = toUInt32OrZero(ip_data.asn_number) ORDER BY {outer_sort} {sort_order} LIMIT %(limit)s OFFSET %(offset)s """ params["limit"] = page_size params["offset"] = offset gresult = db.query(grouped_query, params) detections = [] for row in gresult.result_rows: # row: src_ip, first_seen, last_seen, detection_count, unique_ja4s, unique_hosts, # anomaly_score, threat_level_best, model_name_best, country_code, asn_number, # asn_org, hit_velocity, hits, asn_label, asn_rep_label, # anubis_bot_name, anubis_bot_action, anubis_bot_category ja4s = list(row[4]) if row[4] else [] hosts = list(row[5]) if row[5] else [] detections.append(Detection( detected_at=row[1], src_ip=str(row[0]), ja4=ja4s[0] if ja4s else "", host=hosts[0] if hosts else "", bot_name="", anomaly_score=float(row[6]) if row[6] else 0.0, threat_level=row[7] or "LOW", model_name=row[8] or "", recurrence=int(row[3] or 0), asn_number=str(row[10]) if row[10] else "", asn_org=row[11] or "", asn_detail="", asn_domain="", country_code=row[9] or "", asn_label=row[14] or "", hits=int(row[13] or 0), hit_velocity=float(row[12]) if row[12] else 0.0, fuzzing_index=0.0, post_ratio=0.0, reason="", asn_rep_label=row[15] or "", asn_score=_label_to_score(row[15] or ""), first_seen=row[1], last_seen=row[2], unique_ja4s=ja4s, unique_hosts=hosts, anubis_bot_name=row[16] or "", anubis_bot_action=row[17] or "", anubis_bot_category=row[18] or "", )) total_pages = (total + page_size - 1) // page_size return DetectionsListResponse( items=detections, total=total, page=page, page_size=page_size, total_pages=total_pages ) # ── Mode individuel (comportement original) ────────────────────────────── # Validation du tri valid_sort_columns = [ "detected_at", "src_ip", "threat_level", "anomaly_score", "asn_number", "country_code", "hits", "hit_velocity" ] if sort_by not in valid_sort_columns: sort_by = "detected_at" main_query = f""" SELECT detected_at, src_ip, ja4, host, bot_name, anomaly_score, threat_level, model_name, recurrence, asn_number, asn_org, asn_detail, asn_domain, country_code, asn_label, hits, hit_velocity, fuzzing_index, post_ratio, reason, ar.label AS asn_rep_label, anubis_bot_name, anubis_bot_action, anubis_bot_category FROM ml_detected_anomalies LEFT JOIN mabase_prod.asn_reputation ar ON ar.src_asn = toUInt32OrZero(asn_number) WHERE {where_clause} ORDER BY {sort_by} {sort_order} LIMIT %(limit)s OFFSET %(offset)s """ params["limit"] = page_size params["offset"] = offset result = db.query(main_query, params) detections = [ Detection( detected_at=row[0], src_ip=str(row[1]), ja4=row[2] or "", host=row[3] or "", bot_name=row[4] or "", anomaly_score=float(row[5]) if row[5] else 0.0, threat_level=row[6] or "LOW", model_name=row[7] or "", recurrence=row[8] or 0, asn_number=str(row[9]) if row[9] else "", asn_org=row[10] or "", asn_detail=row[11] or "", asn_domain=row[12] or "", country_code=row[13] or "", asn_label=row[14] or "", hits=row[15] or 0, hit_velocity=float(row[16]) if row[16] else 0.0, fuzzing_index=float(row[17]) if row[17] else 0.0, post_ratio=float(row[18]) if row[18] else 0.0, reason=row[19] or "", asn_rep_label=row[20] or "", asn_score=_label_to_score(row[20] or ""), anubis_bot_name=row[21] or "", anubis_bot_action=row[22] or "", anubis_bot_category=row[23] or "", ) for row in result.result_rows ] total_pages = (total + page_size - 1) // page_size return DetectionsListResponse( items=detections, total=total, page=page, page_size=page_size, total_pages=total_pages ) except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur lors de la récupération des détections: {str(e)}") @router.get("/{detection_id}") async def get_detection_details(detection_id: str): """ Récupère les détails d'une détection spécifique detection_id peut être une IP ou un identifiant """ try: query = """ SELECT detected_at, src_ip, ja4, host, bot_name, anomaly_score, threat_level, model_name, recurrence, asn_number, asn_org, asn_detail, asn_domain, country_code, asn_label, hits, hit_velocity, fuzzing_index, post_ratio, port_exhaustion_ratio, orphan_ratio, tcp_jitter_variance, tcp_shared_count, true_window_size, window_mss_ratio, alpn_http_mismatch, is_alpn_missing, sni_host_mismatch, header_count, has_accept_language, has_cookie, has_referer, modern_browser_score, ua_ch_mismatch, header_order_shared_count, ip_id_zero_ratio, request_size_variance, multiplexing_efficiency, mss_mobile_mismatch, correlated, reason, asset_ratio, direct_access_ratio, is_ua_rotating, distinct_ja4_count, src_port_density, ja4_asn_concentration, ja4_country_concentration, is_rare_ja4 FROM ml_detected_anomalies WHERE src_ip = %(ip)s ORDER BY detected_at DESC LIMIT 1 """ result = db.query(query, {"ip": detection_id}) if not result.result_rows: raise HTTPException(status_code=404, detail="Détection non trouvée") row = result.result_rows[0] return { "detected_at": row[0], "src_ip": str(row[1]), "ja4": row[2] or "", "host": row[3] or "", "bot_name": row[4] or "", "anomaly_score": float(row[5]) if row[5] else 0.0, "threat_level": row[6] or "LOW", "model_name": row[7] or "", "recurrence": row[8] or 0, "asn": { "number": str(row[9]) if row[9] else "", "org": row[10] or "", "detail": row[11] or "", "domain": row[12] or "", "label": row[14] or "" }, "country": { "code": row[13] or "", }, "metrics": { "hits": row[15] or 0, "hit_velocity": float(row[16]) if row[16] else 0.0, "fuzzing_index": float(row[17]) if row[17] else 0.0, "post_ratio": float(row[18]) if row[18] else 0.0, "port_exhaustion_ratio": float(row[19]) if row[19] else 0.0, "orphan_ratio": float(row[20]) if row[20] else 0.0, }, "tcp": { "jitter_variance": float(row[21]) if row[21] else 0.0, "shared_count": row[22] or 0, "true_window_size": row[23] or 0, "window_mss_ratio": float(row[24]) if row[24] else 0.0, }, "tls": { "alpn_http_mismatch": bool(row[25]) if row[25] is not None else False, "is_alpn_missing": bool(row[26]) if row[26] is not None else False, "sni_host_mismatch": bool(row[27]) if row[27] is not None else False, }, "headers": { "count": row[28] or 0, "has_accept_language": bool(row[29]) if row[29] is not None else False, "has_cookie": bool(row[30]) if row[30] is not None else False, "has_referer": bool(row[31]) if row[31] is not None else False, "modern_browser_score": row[32] or 0, "ua_ch_mismatch": bool(row[33]) if row[33] is not None else False, "header_order_shared_count": row[34] or 0, }, "behavior": { "ip_id_zero_ratio": float(row[35]) if row[35] else 0.0, "request_size_variance": float(row[36]) if row[36] else 0.0, "multiplexing_efficiency": float(row[37]) if row[37] else 0.0, "mss_mobile_mismatch": bool(row[38]) if row[38] is not None else False, "correlated": bool(row[39]) if row[39] is not None else False, }, "advanced": { "asset_ratio": float(row[41]) if row[41] else 0.0, "direct_access_ratio": float(row[42]) if row[42] else 0.0, "is_ua_rotating": bool(row[43]) if row[43] is not None else False, "distinct_ja4_count": row[44] or 0, "src_port_density": float(row[45]) if row[45] else 0.0, "ja4_asn_concentration": float(row[46]) if row[46] else 0.0, "ja4_country_concentration": float(row[47]) if row[47] else 0.0, "is_rare_ja4": bool(row[48]) if row[48] is not None else False, }, "reason": row[40] or "" } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")