""" Endpoints pour l'analyse des fingerprints JA4 et User-Agents Objectifs: - Détecter le spoofing JA4 (fingerprint TLS qui prétend être un navigateur mais dont les User-Agents, les headers HTTP ou les métriques comportementales trahissent une origine bot/script) - Construire une matrice JA4 × User-Agent pour visualiser les associations suspectes - Analyser la distribution des User-Agents pour identifier les rotateurs et les bots qui usurpent des UA de navigateurs légitimes """ from fastapi import APIRouter, HTTPException, Query from typing import Optional import re from ..database import db router = APIRouter(prefix="/api/fingerprints", tags=["fingerprints"]) # ─── Helpers ────────────────────────────────────────────────────────────────── # Patterns indiquant clairement un bot/script sans simulation de navigateur _BOT_PATTERNS = re.compile( r"bot|crawler|spider|scraper|python|curl|wget|go-http|java/|axios|" r"libwww|httpclient|okhttp|requests|aiohttp|httpx|playwright|puppeteer|" r"selenium|headless|phantomjs", re.IGNORECASE, ) # Navigateurs légitimes communs — un JA4 de type "browser" devrait venir avec ces UAs _BROWSER_PATTERNS = re.compile( r"mozilla|chrome|safari|firefox|edge|opera|trident", re.IGNORECASE, ) def _classify_ua(ua: str) -> str: """Retourne 'bot', 'browser', ou 'script'""" if not ua: return "empty" if _BOT_PATTERNS.search(ua): return "bot" if _BROWSER_PATTERNS.search(ua): return "browser" return "script" # ============================================================================= # ENDPOINT 1 — Détection de spoofing JA4 # ============================================================================= @router.get("/spoofing") async def get_ja4_spoofing( hours: int = Query(24, ge=1, le=168, description="Fenêtre temporelle"), min_detections: int = Query(10, ge=1, description="Nombre minimum de détections"), limit: int = Query(50, ge=1, le=200), ): """ Identifie les JA4 fingerprints suspects de spoofing navigateur. Un JA4 est considéré suspect quand: - Il présente un taux élevé de ua_ch_mismatch (header UA ≠ Client Hints) - Son modern_browser_score est élevé mais les UAs associés sont des bots/scripts - Il apparaît avec un taux élevé de sni_host_mismatch ou alpn_http_mismatch - is_rare_ja4 = true avec un volume important Retourne un score de confiance de spoofing [0-100] pour chaque JA4. """ try: # Agrégation par JA4 avec tous les indicateurs de spoofing query = """ SELECT ja4, count() AS total_detections, uniq(src_ip) AS unique_ips, -- Indicateurs de mismatch countIf(ua_ch_mismatch = true) AS ua_ch_mismatch_count, round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct, countIf(sni_host_mismatch = true) AS sni_mismatch_count, round(countIf(sni_host_mismatch = true) * 100.0 / count(), 2) AS sni_mismatch_pct, countIf(alpn_http_mismatch = true) AS alpn_mismatch_count, round(countIf(alpn_http_mismatch = true) * 100.0 / count(), 2) AS alpn_mismatch_pct, -- Indicateurs comportementaux avg(modern_browser_score) AS avg_browser_score, countIf(is_rare_ja4 = true) AS rare_ja4_count, round(countIf(is_rare_ja4 = true) * 100.0 / count(), 2) AS rare_ja4_pct, countIf(is_ua_rotating = true) AS ua_rotating_count, round(countIf(is_ua_rotating = true) * 100.0 / count(), 2) AS ua_rotating_pct, -- Métriques TLS/TCP countIf(is_alpn_missing = true) AS alpn_missing_count, avg(distinct_ja4_count) AS avg_distinct_ja4_per_ip, -- Répartition threat levels countIf(threat_level = 'CRITICAL') AS critical_count, countIf(threat_level = 'HIGH') AS high_count, -- Botnet indicators avg(ja4_asn_concentration) AS avg_asn_concentration, avg(ja4_country_concentration) AS avg_country_concentration, argMax(threat_level, detected_at) AS last_threat_level FROM ml_detected_anomalies WHERE detected_at >= now() - INTERVAL %(hours)s HOUR AND ja4 != '' AND ja4 IS NOT NULL GROUP BY ja4 HAVING total_detections >= %(min_detections)s ORDER BY ua_ch_mismatch_pct DESC, total_detections DESC LIMIT %(limit)s """ result = db.query(query, { "hours": hours, "min_detections": min_detections, "limit": limit, }) # Fetch top UA per JA4 from view_dashboard_user_agents ja4_list = [str(r[0]) for r in result.result_rows if r[0]] ua_by_ja4: dict = {} if ja4_list: ja4_sql = ", ".join(f"'{j}'" for j in ja4_list[:100]) ua_q = f""" SELECT ja4, groupArray(5)(ua) AS top_uas FROM ( SELECT ja4, arrayJoin(user_agents) AS ua, sum(requests) AS cnt FROM view_dashboard_user_agents WHERE ja4 IN ({ja4_sql}) AND hour >= now() - INTERVAL {hours} HOUR AND ua != '' GROUP BY ja4, ua ORDER BY ja4, cnt DESC ) GROUP BY ja4 """ try: ua_res = db.query(ua_q) for ua_row in ua_res.result_rows: j4 = str(ua_row[0]) if ua_row[1]: ua_by_ja4[j4] = list(ua_row[1]) except Exception: pass items = [] for row in result.result_rows: ja4 = str(row[0]) ua_ch_mismatch_pct = float(row[4] or 0) sni_mismatch_pct = float(row[6] or 0) alpn_mismatch_pct = float(row[8] or 0) avg_browser_score = float(row[9] or 0) rare_ja4_pct = float(row[11] or 0) ua_rotating_pct = float(row[13] or 0) alpn_missing_count = int(row[14] or 0) total = int(row[1] or 1) top_uas = ua_by_ja4.get(ja4, []) ua_classes = [_classify_ua(u) for u in top_uas] has_bot_ua = any(c == "bot" for c in ua_classes) has_browser_ua = any(c == "browser" for c in ua_classes) # Spoofing confidence score [0-100]: # UA/CH mismatch est le signal le plus fort (poids 40) # Browser UA avec score navigateur élevé mais indicateurs bot (poids 25) # SNI/ALPN mismatches (poids 15) # is_rare_ja4 avec gros volume (poids 10) # UA rotating (poids 10) spoof_score = min(100, round( ua_ch_mismatch_pct * 0.40 + (avg_browser_score * 25 / 100 if has_bot_ua else 0) + sni_mismatch_pct * 0.10 + alpn_mismatch_pct * 0.05 + rare_ja4_pct * 0.10 + ua_rotating_pct * 0.10 + (10 if alpn_missing_count > total * 0.3 else 0) )) # Classification du JA4 if spoof_score >= 60: classification = "spoofed_browser" elif has_bot_ua and avg_browser_score < 30: classification = "known_bot" elif has_browser_ua and ua_ch_mismatch_pct < 10: classification = "legitimate_browser" else: classification = "suspicious" items.append({ "ja4": ja4, "classification": classification, "spoofing_score": spoof_score, "total_detections": int(row[1] or 0), "unique_ips": int(row[2] or 0), "indicators": { "ua_ch_mismatch_pct": ua_ch_mismatch_pct, "sni_mismatch_pct": sni_mismatch_pct, "alpn_mismatch_pct": alpn_mismatch_pct, "avg_browser_score": round(avg_browser_score, 1), "rare_ja4_pct": rare_ja4_pct, "ua_rotating_pct": ua_rotating_pct, "alpn_missing_count": alpn_missing_count, "avg_asn_concentration": round(float(row[18] or 0), 3), "avg_country_concentration": round(float(row[19] or 0), 3), }, "top_user_agents": [ {"ua": u, "type": _classify_ua(u)} for u in top_uas ], "threat_breakdown": { "critical": int(row[16] or 0), "high": int(row[17] or 0), "last_level": str(row[20] or "LOW"), }, }) # Trier: spoofed_browser d'abord, puis par score items.sort(key=lambda x: (-x["spoofing_score"], -x["total_detections"])) return { "items": items, "total": len(items), "period_hours": hours, "summary": { "spoofed_browser": sum(1 for i in items if i["classification"] == "spoofed_browser"), "known_bot": sum(1 for i in items if i["classification"] == "known_bot"), "suspicious": sum(1 for i in items if i["classification"] == "suspicious"), "legitimate_browser": sum(1 for i in items if i["classification"] == "legitimate_browser"), }, } except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") # ============================================================================= # ENDPOINT 2 — Matrice JA4 × User-Agent # ============================================================================= @router.get("/ja4-ua-matrix") async def get_ja4_ua_matrix( hours: int = Query(24, ge=1, le=168), min_ips: int = Query(3, ge=1, description="Nombre minimum d'IPs pour inclure un JA4"), limit: int = Query(30, ge=1, le=100), ): """ Matrice JA4 × User-Agent. Pour chaque JA4: - Top User-Agents associés (depuis view_dashboard_entities) - Taux de ua_ch_mismatch - Classification UA (bot / browser / script) - Indicateur de spoofing si browser_score élevé + UA non-navigateur """ try: # Stats JA4 depuis ml_detected_anomalies stats_query = """ SELECT ja4, uniq(src_ip) AS unique_ips, count() AS total_detections, round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct, avg(modern_browser_score) AS avg_browser_score, countIf(is_rare_ja4 = true) AS rare_count, countIf(is_ua_rotating = true) AS rotating_count, argMax(threat_level, detected_at) AS last_threat FROM ml_detected_anomalies WHERE detected_at >= now() - INTERVAL %(hours)s HOUR AND ja4 != '' AND ja4 IS NOT NULL GROUP BY ja4 HAVING unique_ips >= %(min_ips)s ORDER BY ua_ch_mismatch_pct DESC, unique_ips DESC LIMIT %(limit)s """ stats_res = db.query(stats_query, {"hours": hours, "min_ips": min_ips, "limit": limit}) ja4_list = [str(r[0]) for r in stats_res.result_rows] if not ja4_list: return {"items": [], "total": 0, "period_hours": hours} # UAs par JA4 depuis view_dashboard_user_agents ja4_sql = ", ".join(f"'{j}'" for j in ja4_list) ua_query = f""" SELECT ja4, ua, sum(requests) AS cnt FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE ja4 IN ({ja4_sql}) AND hour >= now() - INTERVAL {hours} HOUR AND ua != '' GROUP BY ja4, ua ORDER BY ja4, cnt DESC """ ua_by_ja4: dict = {} try: ua_res = db.query(ua_query) for row in ua_res.result_rows: j4 = str(row[0]) if j4 not in ua_by_ja4: ua_by_ja4[j4] = [] if len(ua_by_ja4[j4]) < 8: ua_by_ja4[j4].append({"ua": str(row[1]), "count": int(row[2] or 0)}) except Exception: pass items = [] for row in stats_res.result_rows: ja4 = str(row[0]) unique_ips = int(row[1] or 0) ua_ch_mismatch_pct = float(row[3] or 0) avg_browser_score = float(row[4] or 0) top_uas = ua_by_ja4.get(ja4, []) ua_total = sum(u["count"] for u in top_uas) or 1 classified_uas = [] for u in top_uas: ua_type = _classify_ua(u["ua"]) classified_uas.append({ "ua": u["ua"], "count": u["count"], "pct": round(u["count"] * 100 / ua_total, 1), "type": ua_type, }) bot_pct = sum(u["pct"] for u in classified_uas if u["type"] == "bot") browser_pct = sum(u["pct"] for u in classified_uas if u["type"] == "browser") # Spoofing flag: JA4 ressemble à un navigateur (browser_score élevé) # mais les UAs sont des bots/scripts is_spoofing = avg_browser_score > 50 and bot_pct > 30 and ua_ch_mismatch_pct > 20 items.append({ "ja4": ja4, "unique_ips": unique_ips, "total_detections": int(row[2] or 0), "ua_ch_mismatch_pct": ua_ch_mismatch_pct, "avg_browser_score": round(avg_browser_score, 1), "rare_count": int(row[5] or 0), "rotating_count": int(row[6] or 0), "last_threat": str(row[7] or "LOW"), "user_agents": classified_uas, "ua_summary": { "bot_pct": round(bot_pct, 1), "browser_pct": round(browser_pct, 1), "script_pct": round(100 - bot_pct - browser_pct, 1), "total_distinct": len(top_uas), }, "is_spoofing_suspect": is_spoofing, }) return { "items": items, "total": len(items), "period_hours": hours, } except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") # ============================================================================= # ENDPOINT 3 — Analyse globale des User-Agents # ============================================================================= @router.get("/ua-analysis") async def get_ua_analysis( hours: int = Query(24, ge=1, le=168), limit: int = Query(50, ge=1, le=200), ): """ Analyse globale des User-Agents dans les détections. Identifie: - UAs de type bot/script - UAs browser légitimes vs UAs browser utilisés par des bots (via ua_ch_mismatch) - UAs rares/suspects qui tournent (is_ua_rotating) - Distribution JA4 par UA pour détecter les UAs multi-fingerprints (rotation) """ try: # Top UAs globaux depuis view_dashboard_user_agents ua_global_query = """ SELECT ua, sum(requests) AS ip_count FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE hour >= now() - INTERVAL %(hours)s HOUR AND ua != '' GROUP BY ua ORDER BY ip_count DESC LIMIT %(limit)s """ ua_global_res = db.query(ua_global_query, {"hours": hours, "limit": limit}) top_uas = [str(r[0]) for r in ua_global_res.result_rows] # Pour chaque UA, chercher ses JA4 via view_dashboard_user_agents ua_sql = ", ".join(f"'{u.replace(chr(39), chr(39)*2)}'" for u in top_uas[:50]) if top_uas else "''" ja4_per_ua_query = f""" SELECT ua, uniq(ja4) AS unique_ja4s, groupUniqArray(3)(ja4) AS sample_ja4s FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE ua IN ({ua_sql}) AND hour >= now() - INTERVAL {hours} HOUR AND ua != '' AND ja4 != '' GROUP BY ua """ ja4_by_ua: dict = {} try: ja4_res = db.query(ja4_per_ua_query) for r in ja4_res.result_rows: ja4_by_ua[str(r[0])] = { "unique_ja4s": int(r[1] or 0), "sample_ja4s": list(r[2] or []), } except Exception: pass # IPs avec is_ua_rotating depuis ml_detected_anomalies rotating_query = """ SELECT replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip, avg(ua_ch_mismatch) AS avg_ua_ch_mismatch FROM ml_detected_anomalies WHERE detected_at >= now() - INTERVAL %(hours)s HOUR AND is_ua_rotating = true GROUP BY clean_ip ORDER BY avg_ua_ch_mismatch DESC """ rotating_ips: list = [] try: rot_res = db.query(rotating_query, {"hours": hours}) rotating_ips = [str(r[0]) for r in rot_res.result_rows] except Exception: pass # Construire la réponse items = [] for row in ua_global_res.result_rows: ua = str(row[0]) ip_count = int(row[1] or 0) ua_type = _classify_ua(ua) ja4_info = ja4_by_ua.get(ua, {"unique_ja4s": 0, "sample_ja4s": []}) # UA multi-JA4 est suspect: un vrai navigateur a généralement 1-2 JA4 multi_ja4_flag = ja4_info["unique_ja4s"] > 3 items.append({ "user_agent": ua, "type": ua_type, "ip_count": ip_count, "unique_ja4_count": ja4_info["unique_ja4s"], "sample_ja4s": ja4_info["sample_ja4s"], "is_multi_ja4_suspect": multi_ja4_flag, "risk_flags": _build_ua_risk_flags(ua, ua_type, ja4_info["unique_ja4s"], ip_count), }) # IPs avec rotation d'UA ua_rotating_stats = { "rotating_ip_count": len(rotating_ips), "sample_rotating_ips": rotating_ips[:10], } return { "items": items, "total": len(items), "period_hours": hours, "ua_rotating_stats": ua_rotating_stats, "summary": { "bot_count": sum(1 for i in items if i["type"] == "bot"), "browser_count": sum(1 for i in items if i["type"] == "browser"), "script_count": sum(1 for i in items if i["type"] == "script"), "multi_ja4_suspect_count": sum(1 for i in items if i["is_multi_ja4_suspect"]), }, } except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") def _build_ua_risk_flags(ua: str, ua_type: str, unique_ja4s: int, ip_count: int) -> list: flags = [] if ua_type == "bot": flags.append("ua_bot_signature") elif ua_type == "script": flags.append("ua_script_library") if unique_ja4s > 5: flags.append("ja4_rotation_suspect") if unique_ja4s > 3 and ua_type == "browser": flags.append("browser_ua_multi_fingerprint") if ip_count > 100: flags.append("high_volume") return flags # ============================================================================= # ENDPOINT 4 — JA4 d'un IP spécifique: analyse de cohérence UA/JA4 # ============================================================================= @router.get("/ip/{ip}/coherence") async def get_ip_fingerprint_coherence(ip: str): """ Analyse la cohérence JA4/UA pour une IP spécifique. Répond à la question: "Cette IP spoofait-elle son fingerprint?" Calcule un score de cohérence basé sur: - Correspondance entre JA4 (TLS client fingerprint) et User-Agent - ua_ch_mismatch (User-Agent vs Client Hints) - modern_browser_score vs type d'UA réel - Nombre de JA4 distincts utilisés (rotation) - sni_host_mismatch, alpn_http_mismatch """ try: # Données depuis ml_detected_anomalies ml_query = """ SELECT ja4, ua_ch_mismatch, modern_browser_score, sni_host_mismatch, alpn_http_mismatch, is_alpn_missing, is_rare_ja4, is_ua_rotating, distinct_ja4_count, header_count, has_accept_language, has_cookie, has_referer, header_order_shared_count, detected_at, threat_level, window_mss_ratio, tcp_jitter_variance, multiplexing_efficiency FROM ml_detected_anomalies WHERE src_ip = %(ip)s ORDER BY detected_at DESC """ ml_res = db.query(ml_query, {"ip": ip}) if not ml_res.result_rows: raise HTTPException(status_code=404, detail="IP non trouvée dans les détections") # User-agents réels depuis view_dashboard_user_agents ua_query = """ SELECT ua, sum(requests) AS cnt FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE toString(src_ip) = %(ip)s AND hour >= now() - INTERVAL 72 HOUR AND ua != '' GROUP BY ua ORDER BY cnt DESC """ ua_res = db.query(ua_query, {"ip": ip}) top_uas = [{"ua": str(r[0]), "count": int(r[1] or 0), "type": _classify_ua(str(r[0]))} for r in ua_res.result_rows] # Agréger les indicateurs de la dernière session rows = ml_res.result_rows latest = rows[0] total_rows = len(rows) ua_ch_mismatch_count = sum(1 for r in rows if r[1]) sni_mismatch_count = sum(1 for r in rows if r[3]) alpn_mismatch_count = sum(1 for r in rows if r[4]) is_rare_count = sum(1 for r in rows if r[6]) is_rotating = any(r[7] for r in rows) distinct_ja4s = {str(r[0]) for r in rows if r[0]} avg_browser_score = sum(int(r[2] or 0) for r in rows) / total_rows # UA analysis has_browser_ua = any(u["type"] == "browser" for u in top_uas) has_bot_ua = any(u["type"] == "bot" for u in top_uas) primary_ua_type = top_uas[0]["type"] if top_uas else "empty" # Calcul du score de spoofing spoof_score = min(100, round( (ua_ch_mismatch_count / total_rows * 100) * 0.40 + (avg_browser_score * 0.20 if has_bot_ua else 0) + (sni_mismatch_count / total_rows * 100) * 0.10 + (alpn_mismatch_count / total_rows * 100) * 0.05 + (len(distinct_ja4s) * 5 if len(distinct_ja4s) > 2 else 0) + (15 if is_rotating else 0) + (10 if is_rare_count > total_rows * 0.5 else 0) )) # Verdict if spoof_score >= 70: verdict = "high_confidence_spoofing" elif spoof_score >= 40: verdict = "suspicious_spoofing" elif has_bot_ua and avg_browser_score < 20: verdict = "known_bot_no_spoofing" elif has_browser_ua and spoof_score < 20: verdict = "legitimate_browser" else: verdict = "inconclusive" # Explication humaine explanation = [] if ua_ch_mismatch_count > total_rows * 0.3: explanation.append(f"UA-Client-Hints mismatch sur {round(ua_ch_mismatch_count*100/total_rows)}% des requêtes") if has_bot_ua and avg_browser_score > 40: explanation.append(f"JA4 ressemble à un navigateur (score {round(avg_browser_score)}/100) mais UA est de type bot") if len(distinct_ja4s) > 2: explanation.append(f"{len(distinct_ja4s)} JA4 distincts utilisés → rotation de fingerprint") if is_rotating: explanation.append("is_ua_rotating détecté → rotation d'User-Agent confirmée") if sni_mismatch_count > 0: explanation.append(f"SNI ≠ Host header sur {sni_mismatch_count}/{total_rows} requêtes") if not explanation: explanation.append("Aucun indicateur de spoofing majeur détecté") return { "ip": ip, "verdict": verdict, "spoofing_score": spoof_score, "explanation": explanation, "indicators": { "ua_ch_mismatch_rate": round(ua_ch_mismatch_count / total_rows * 100, 1), "sni_mismatch_rate": round(sni_mismatch_count / total_rows * 100, 1), "alpn_mismatch_rate": round(alpn_mismatch_count / total_rows * 100, 1), "avg_browser_score": round(avg_browser_score, 1), "distinct_ja4_count": len(distinct_ja4s), "is_ua_rotating": is_rotating, "rare_ja4_rate": round(is_rare_count / total_rows * 100, 1), }, "fingerprints": { "ja4_list": list(distinct_ja4s), "latest_ja4": str(latest[0] or ""), }, "user_agents": top_uas, "latest_detection": { "detected_at": latest[14].isoformat() if latest[14] else "", "threat_level": str(latest[15] or "LOW"), "modern_browser_score": int(latest[2] or 0), "header_count": int(latest[9] or 0), "has_accept_language": bool(latest[10]), "has_cookie": bool(latest[11]), "has_referer": bool(latest[12]), "header_order_shared_count": int(latest[13] or 0), }, } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") # ============================================================================= # ENDPOINT 5 — JA4 légitimes (baseline / whitelist) # ============================================================================= @router.get("/legitimate-ja4") async def get_legitimate_ja4( hours: int = Query(168, ge=24, le=720, description="Fenêtre pour établir la baseline"), min_ips: int = Query(50, ge=5, description="Nombre minimum d'IPs pour qualifier un JA4 de légitime"), ): """ Établit une baseline des JA4 fingerprints légitimes. Un JA4 est considéré légitime si: - Il est utilisé par un grand nombre d'IPs distinctes (> min_ips) - Son taux de ua_ch_mismatch est faible (< 5%) - Son modern_browser_score est élevé (> 60) - Il n'est PAS is_rare_ja4 - Ses UAs sont dominés par des navigateurs connus Utile comme whitelist pour réduire les faux positifs. """ try: query = """ SELECT ja4, uniq(src_ip) AS unique_ips, count() AS total_detections, round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct, avg(modern_browser_score) AS avg_browser_score, countIf(is_rare_ja4 = true) AS rare_count, round(countIf(threat_level = 'CRITICAL') * 100.0 / count(), 2) AS critical_pct, round(countIf(threat_level = 'HIGH') * 100.0 / count(), 2) AS high_pct FROM ml_detected_anomalies WHERE detected_at >= now() - INTERVAL %(hours)s HOUR AND ja4 != '' AND ja4 IS NOT NULL GROUP BY ja4 HAVING unique_ips >= %(min_ips)s AND ua_ch_mismatch_pct < 5.0 AND avg_browser_score > 60 AND rare_count = 0 ORDER BY unique_ips DESC """ result = db.query(query, {"hours": hours, "min_ips": min_ips}) items = [ { "ja4": str(row[0]), "unique_ips": int(row[1] or 0), "total_detections": int(row[2] or 0), "ua_ch_mismatch_pct": float(row[3] or 0), "avg_browser_score": round(float(row[4] or 0), 1), "critical_pct": float(row[6] or 0), "high_pct": float(row[7] or 0), "legitimacy_confidence": min(100, round( (1 - float(row[3] or 0) / 100) * 40 + float(row[4] or 0) * 0.40 + min(int(row[1] or 0) / min_ips, 1) * 20 )), } for row in result.result_rows ] return { "items": items, "total": len(items), "period_hours": hours, "note": "Ces JA4 sont candidats à une whitelist. Vérifier manuellement avant de whitelister.", } except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") # ============================================================================= # ENDPOINT — Corrélation JA4 × ASN / Pays (C5) # Détecte les JA4 fortement concentrés sur un seul ASN ou pays # → signal de botnet ciblé ou d'infrastructure de test/attaque partagée # ============================================================================= @router.get("/asn-correlation") async def get_ja4_asn_correlation( min_concentration: float = Query(0.7, ge=0.0, le=1.0, description="Seuil min de concentration ASN ou pays"), min_ips: int = Query(5, ge=1, description="Nombre minimum d'IPs par JA4"), limit: int = Query(50, ge=1, le=200), ): """ Identifie les JA4 fingerprints fortement concentrés sur un seul ASN ou pays. Un JA4 avec asn_concentration ≥ 0.7 signifie que ≥70% des IPs utilisant ce fingerprint proviennent du même ASN → infrastructure de bot partagée ou datacenter suspect. """ try: # Two-pass: first aggregate per (ja4, asn) to get IP counts per ASN, # then aggregate per ja4 to compute concentration ratio sql = """ SELECT ja4, sum(ips_per_combo) AS unique_ips, uniq(src_asn) AS unique_asns, uniq(src_country_code) AS unique_countries, toString(argMax(src_asn, ips_per_combo)) AS top_asn_number, argMax(asn_name, ips_per_combo) AS top_asn_name, argMax(src_country_code, country_ips) AS dominant_country, sum(total_hits) AS total_hits, round(max(ips_per_combo) / greatest(sum(ips_per_combo), 1), 3) AS asn_concentration, round(max(country_ips) / greatest(sum(ips_per_combo), 1), 3) AS country_concentration FROM ( SELECT ja4, src_asn, src_country_code, any(src_as_name) AS asn_name, uniq(src_ip) AS ips_per_combo, uniq(src_ip) AS country_ips, sum(hits) AS total_hits FROM mabase_prod.agg_host_ip_ja4_1h WHERE window_start >= now() - INTERVAL 24 HOUR AND ja4 != '' GROUP BY ja4, src_asn, src_country_code ) GROUP BY ja4 HAVING unique_ips >= %(min_ips)s AND (asn_concentration >= %(min_conc)s OR country_concentration >= %(min_conc)s) ORDER BY asn_concentration DESC, unique_ips DESC LIMIT %(limit)s """ result = db.query(sql, {"min_ips": min_ips, "min_conc": min_concentration, "limit": limit}) items = [] for row in result.result_rows: ja4 = str(row[0]) unique_ips = int(row[1]) unique_asns = int(row[2]) unique_countries = int(row[3]) top_asn_number = str(row[4] or "") top_asn_name = str(row[5] or "") dominant_country = str(row[6] or "") total_hits = int(row[7] or 0) asn_concentration = float(row[8] or 0) country_concentration = float(row[9] or 0) if asn_concentration >= 0.85: corr_type, risk = "asn_monopoly", "high" elif asn_concentration >= min_concentration: corr_type, risk = "asn_dominant", "medium" elif country_concentration >= min_concentration: corr_type, risk = "geo_targeted", "medium" else: corr_type, risk = "distributed", "low" items.append({ "ja4": ja4, "unique_ips": unique_ips, "unique_asns": unique_asns, "unique_countries": unique_countries, "top_asn_name": top_asn_name, "top_asn_number": top_asn_number, "dominant_country": dominant_country, "total_hits": total_hits, "asn_concentration": asn_concentration, "country_concentration":country_concentration, "correlation_type": corr_type, "risk": risk, }) return {"items": items, "total": len(items)} except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")