"""Endpoints for analysing JA4 fingerprints and User-Agents.

Goals:
- Detect JA4 spoofing (a TLS fingerprint that claims to be a browser while its
  User-Agents, HTTP headers or behavioural metrics betray a bot/script origin).
- Build a JA4 x User-Agent matrix to visualise suspicious associations.
- Analyse the User-Agent distribution to identify rotators and bots that
  impersonate legitimate browser UAs.
"""
import logging
import re

from fastapi import APIRouter, HTTPException, Query

from ..database import db

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/fingerprints", tags=["fingerprints"])

# ─── Helpers ──────────────────────────────────────────────────────────────────

# Patterns that clearly indicate a bot/script making no attempt to look like
# a browser.
_BOT_PATTERNS = re.compile(
    r"bot|crawler|spider|scraper|python|curl|wget|go-http|java/|axios|"
    r"libwww|httpclient|okhttp|requests|aiohttp|httpx|playwright|puppeteer|"
    r"selenium|headless|phantomjs",
    re.IGNORECASE,
)

# Common legitimate browsers: a "browser"-looking JA4 is expected to come with
# one of these UAs.
_BROWSER_PATTERNS = re.compile(
    r"mozilla|chrome|safari|firefox|edge|opera|trident",
    re.IGNORECASE,
)


def _classify_ua(ua: str) -> str:
    """Classify a User-Agent string.

    Returns:
        ``"empty"`` for a falsy value, ``"bot"`` when a bot/tool pattern
        matches, ``"browser"`` when only a browser pattern matches, and
        ``"script"`` otherwise. Bot patterns are checked first, so a UA such
        as ``"Mozilla/5.0 ... HeadlessChrome"`` is reported as ``"bot"``.
    """
    if not ua:
        return "empty"
    if _BOT_PATTERNS.search(ua):
        return "bot"
    if _BROWSER_PATTERNS.search(ua):
        return "browser"
    return "script"


def _sql_quote(value: str) -> str:
    """Return *value* as a single-quoted ClickHouse string literal.

    Both the backslash and the single quote are escaped. Escaping only the
    quote is not enough: a value ending in a backslash would swallow the
    closing quote of the literal.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _sql_in_list(values) -> str:
    """Render *values* as the comma-separated body of an SQL ``IN (...)``."""
    return ", ".join(_sql_quote(v) for v in values)


# =============================================================================
# ENDPOINT 1 — JA4 spoofing detection
# =============================================================================

@router.get("/spoofing")
async def get_ja4_spoofing(
    hours: int = Query(24, ge=1, le=168, description="Fenêtre temporelle"),
    min_detections: int = Query(10, ge=1, description="Nombre minimum de détections"),
    limit: int = Query(50, ge=1, le=200),
):
    """Identify JA4 fingerprints suspected of browser spoofing.

    A JA4 is considered suspicious when:
    - it shows a high ua_ch_mismatch rate (UA header != Client Hints);
    - its modern_browser_score is high but the associated UAs are bots/scripts;
    - it shows a high sni_host_mismatch or alpn_http_mismatch rate;
    - is_rare_ja4 is true on a significant volume.

    Returns a spoofing confidence score in [0, 100] for each JA4, together
    with the indicators used, the top User-Agents and a per-class summary.

    Raises:
        HTTPException: 500 when the main query or the post-processing fails.
    """
    try:
        # Aggregate per JA4 with every spoofing indicator. The column order
        # matters: rows are read by index further down.
        query = """
            SELECT
                ja4,
                count() AS total_detections,
                uniq(src_ip) AS unique_ips,
                -- Indicateurs de mismatch
                countIf(ua_ch_mismatch = true) AS ua_ch_mismatch_count,
                round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
                countIf(sni_host_mismatch = true) AS sni_mismatch_count,
                round(countIf(sni_host_mismatch = true) * 100.0 / count(), 2) AS sni_mismatch_pct,
                countIf(alpn_http_mismatch = true) AS alpn_mismatch_count,
                round(countIf(alpn_http_mismatch = true) * 100.0 / count(), 2) AS alpn_mismatch_pct,
                -- Indicateurs comportementaux
                avg(modern_browser_score) AS avg_browser_score,
                countIf(is_rare_ja4 = true) AS rare_ja4_count,
                round(countIf(is_rare_ja4 = true) * 100.0 / count(), 2) AS rare_ja4_pct,
                countIf(is_ua_rotating = true) AS ua_rotating_count,
                round(countIf(is_ua_rotating = true) * 100.0 / count(), 2) AS ua_rotating_pct,
                -- Métriques TLS/TCP
                countIf(is_alpn_missing = true) AS alpn_missing_count,
                avg(distinct_ja4_count) AS avg_distinct_ja4_per_ip,
                -- Répartition threat levels
                countIf(threat_level = 'CRITICAL') AS critical_count,
                countIf(threat_level = 'HIGH') AS high_count,
                -- Botnet indicators
                avg(ja4_asn_concentration) AS avg_asn_concentration,
                avg(ja4_country_concentration) AS avg_country_concentration,
                argMax(threat_level, detected_at) AS last_threat_level
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND ja4 != '' AND ja4 IS NOT NULL
            GROUP BY ja4
            HAVING total_detections >= %(min_detections)s
            ORDER BY ua_ch_mismatch_pct DESC, total_detections DESC
            LIMIT %(limit)s
        """
        result = db.query(query, {
            "hours": hours,
            "min_detections": min_detections,
            "limit": limit,
        })

        # Fetch the top UAs per JA4 from view_dashboard_user_agents.
        # NOTE(review): only the first 100 JA4s are looked up although `limit`
        # may go up to 200 — the remaining ones get no UA information.
        ja4_list = [str(r[0]) for r in result.result_rows if r[0]]
        ua_by_ja4: dict = {}
        if ja4_list:
            # JA4 values are escaped before being interpolated into the query.
            ja4_sql = _sql_in_list(ja4_list[:100])
            ua_q = f"""
                SELECT ja4, groupArray(5)(ua) AS top_uas
                FROM (
                    SELECT ja4, arrayJoin(user_agents) AS ua, sum(requests) AS cnt
                    FROM view_dashboard_user_agents
                    WHERE ja4 IN ({ja4_sql})
                      AND hour >= now() - INTERVAL {hours} HOUR
                      AND ua != ''
                    GROUP BY ja4, ua
                    ORDER BY ja4, cnt DESC
                )
                GROUP BY ja4
            """
            try:
                ua_res = db.query(ua_q)
                for ua_row in ua_res.result_rows:
                    if ua_row[1]:
                        ua_by_ja4[str(ua_row[0])] = list(ua_row[1])
            except Exception:
                # Best effort: UA enrichment is optional, but a failure must
                # not go unnoticed.
                logger.warning("Top-UA lookup failed for /spoofing", exc_info=True)

        items = []
        for row in result.result_rows:
            ja4 = str(row[0])
            ua_ch_mismatch_pct = float(row[4] or 0)
            sni_mismatch_pct = float(row[6] or 0)
            alpn_mismatch_pct = float(row[8] or 0)
            avg_browser_score = float(row[9] or 0)
            rare_ja4_pct = float(row[11] or 0)
            ua_rotating_pct = float(row[13] or 0)
            alpn_missing_count = int(row[14] or 0)
            total = int(row[1] or 1)

            top_uas = ua_by_ja4.get(ja4, [])
            # Classify each UA once and reuse the result for the response.
            classified = [{"ua": u, "type": _classify_ua(u)} for u in top_uas]
            has_bot_ua = any(c["type"] == "bot" for c in classified)
            has_browser_ua = any(c["type"] == "browser" for c in classified)

            # Spoofing confidence score [0-100]:
            #   UA/CH mismatch is the strongest signal (weight 40)
            #   browser-like score combined with a bot UA (weight 25)
            #   SNI / ALPN mismatches (weight 10 + 5)
            #   is_rare_ja4 (weight 10)
            #   UA rotation (weight 10)
            #   flat +10 when ALPN is missing on more than 30% of detections
            spoof_score = min(100, round(
                ua_ch_mismatch_pct * 0.40
                + (avg_browser_score * 25 / 100 if has_bot_ua else 0)
                + sni_mismatch_pct * 0.10
                + alpn_mismatch_pct * 0.05
                + rare_ja4_pct * 0.10
                + ua_rotating_pct * 0.10
                + (10 if alpn_missing_count > total * 0.3 else 0)
            ))

            # JA4 classification.
            if spoof_score >= 60:
                classification = "spoofed_browser"
            elif has_bot_ua and avg_browser_score < 30:
                classification = "known_bot"
            elif has_browser_ua and ua_ch_mismatch_pct < 10:
                classification = "legitimate_browser"
            else:
                classification = "suspicious"

            items.append({
                "ja4": ja4,
                "classification": classification,
                "spoofing_score": spoof_score,
                "total_detections": int(row[1] or 0),
                "unique_ips": int(row[2] or 0),
                "indicators": {
                    "ua_ch_mismatch_pct": ua_ch_mismatch_pct,
                    "sni_mismatch_pct": sni_mismatch_pct,
                    "alpn_mismatch_pct": alpn_mismatch_pct,
                    "avg_browser_score": round(avg_browser_score, 1),
                    "rare_ja4_pct": rare_ja4_pct,
                    "ua_rotating_pct": ua_rotating_pct,
                    "alpn_missing_count": alpn_missing_count,
                    "avg_asn_concentration": round(float(row[18] or 0), 3),
                    "avg_country_concentration": round(float(row[19] or 0), 3),
                },
                "top_user_agents": classified,
                "threat_breakdown": {
                    "critical": int(row[16] or 0),
                    "high": int(row[17] or 0),
                    "last_level": str(row[20] or "LOW"),
                },
            })

        # Highest score first (so spoofed_browser entries, score >= 60, lead),
        # ties broken by detection volume.
        items.sort(key=lambda x: (-x["spoofing_score"], -x["total_detections"]))

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
            "summary": {
                "spoofed_browser": sum(1 for i in items if i["classification"] == "spoofed_browser"),
                "known_bot": sum(1 for i in items if i["classification"] == "known_bot"),
                "suspicious": sum(1 for i in items if i["classification"] == "suspicious"),
                "legitimate_browser": sum(1 for i in items if i["classification"] == "legitimate_browser"),
            },
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Erreur: {exc}") from exc


# =============================================================================
# ENDPOINT 2 — JA4 x User-Agent matrix
# =============================================================================

@router.get("/ja4-ua-matrix")
async def get_ja4_ua_matrix(
    hours: int = Query(24, ge=1, le=168),
    min_ips: int = Query(3, ge=1, description="Nombre minimum d'IPs pour inclure un JA4"),
    limit: int = Query(30, ge=1, le=100),
):
    """Build the JA4 x User-Agent matrix.

    For each JA4:
    - top associated User-Agents (from view_dashboard_user_agents, at most 8);
    - ua_ch_mismatch rate;
    - UA classification (bot / browser / script);
    - spoofing flag when the browser score is high while UAs are non-browser.

    Raises:
        HTTPException: 500 when the stats query or the post-processing fails.
    """
    try:
        # JA4 statistics from ml_detected_anomalies (rows are read by index).
        stats_query = """
            SELECT
                ja4,
                uniq(src_ip) AS unique_ips,
                count() AS total_detections,
                round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
                avg(modern_browser_score) AS avg_browser_score,
                countIf(is_rare_ja4 = true) AS rare_count,
                countIf(is_ua_rotating = true) AS rotating_count,
                argMax(threat_level, detected_at) AS last_threat
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND ja4 != '' AND ja4 IS NOT NULL
            GROUP BY ja4
            HAVING unique_ips >= %(min_ips)s
            ORDER BY ua_ch_mismatch_pct DESC, unique_ips DESC
            LIMIT %(limit)s
        """
        stats_res = db.query(stats_query, {"hours": hours, "min_ips": min_ips, "limit": limit})
        ja4_list = [str(r[0]) for r in stats_res.result_rows]

        if not ja4_list:
            return {"items": [], "total": 0, "period_hours": hours}

        # UAs per JA4 from view_dashboard_user_agents; JA4 values are escaped
        # before being interpolated into the query.
        ja4_sql = _sql_in_list(ja4_list)
        ua_query = f"""
            SELECT ja4, ua, sum(requests) AS cnt
            FROM view_dashboard_user_agents
            ARRAY JOIN user_agents AS ua
            WHERE ja4 IN ({ja4_sql})
              AND hour >= now() - INTERVAL {hours} HOUR
              AND ua != ''
            GROUP BY ja4, ua
            ORDER BY ja4, cnt DESC
        """
        ua_by_ja4: dict = {}
        try:
            ua_res = db.query(ua_query)
            for row in ua_res.result_rows:
                bucket = ua_by_ja4.setdefault(str(row[0]), [])
                # Rows arrive sorted by count, so the first 8 are the top 8.
                if len(bucket) < 8:
                    bucket.append({"ua": str(row[1]), "count": int(row[2] or 0)})
        except Exception:
            # Best effort: the matrix is still useful without UA details.
            logger.warning("UA lookup failed for /ja4-ua-matrix", exc_info=True)

        items = []
        for row in stats_res.result_rows:
            ja4 = str(row[0])
            unique_ips = int(row[1] or 0)
            ua_ch_mismatch_pct = float(row[3] or 0)
            avg_browser_score = float(row[4] or 0)

            top_uas = ua_by_ja4.get(ja4, [])
            # `or 1` avoids a division by zero when every count is 0.
            ua_total = sum(u["count"] for u in top_uas) or 1

            classified_uas = [
                {
                    "ua": u["ua"],
                    "count": u["count"],
                    "pct": round(u["count"] * 100 / ua_total, 1),
                    "type": _classify_ua(u["ua"]),
                }
                for u in top_uas
            ]

            bot_pct = sum(u["pct"] for u in classified_uas if u["type"] == "bot")
            browser_pct = sum(u["pct"] for u in classified_uas if u["type"] == "browser")
            # Without any UA there is nothing to attribute to "script";
            # reporting 100% would be misleading.
            script_pct = round(100 - bot_pct - browser_pct, 1) if classified_uas else 0.0

            # Spoofing flag: the JA4 looks like a browser (high browser score)
            # while the UAs are bots/scripts.
            is_spoofing = avg_browser_score > 50 and bot_pct > 30 and ua_ch_mismatch_pct > 20

            items.append({
                "ja4": ja4,
                "unique_ips": unique_ips,
                "total_detections": int(row[2] or 0),
                "ua_ch_mismatch_pct": ua_ch_mismatch_pct,
                "avg_browser_score": round(avg_browser_score, 1),
                "rare_count": int(row[5] or 0),
                "rotating_count": int(row[6] or 0),
                "last_threat": str(row[7] or "LOW"),
                "user_agents": classified_uas,
                "ua_summary": {
                    "bot_pct": round(bot_pct, 1),
                    "browser_pct": round(browser_pct, 1),
                    "script_pct": script_pct,
                    "total_distinct": len(top_uas),
                },
                "is_spoofing_suspect": is_spoofing,
            })

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Erreur: {exc}") from exc


# =============================================================================
# ENDPOINT 3 — Global User-Agent analysis
# =============================================================================

@router.get("/ua-analysis")
async def get_ua_analysis(
    hours: int = Query(24, ge=1, le=168),
    limit: int = Query(50, ge=1, le=200),
):
    """Analyse User-Agents globally across detections.

    Identifies:
    - bot/script UAs;
    - UAs used with many distinct JA4 fingerprints (rotation);
    - IPs flagged with is_ua_rotating.

    Raises:
        HTTPException: 500 when the main query or the post-processing fails.
    """
    try:
        # Global top UAs from view_dashboard_user_agents.
        # NOTE(review): the column is named ip_count but is sum(requests);
        # the key is kept for API compatibility.
        ua_global_query = """
            SELECT
                ua,
                sum(requests) AS ip_count
            FROM view_dashboard_user_agents
            ARRAY JOIN user_agents AS ua
            WHERE hour >= now() - INTERVAL %(hours)s HOUR
              AND ua != ''
            GROUP BY ua
            ORDER BY ip_count DESC
            LIMIT %(limit)s
        """
        ua_global_res = db.query(ua_global_query, {"hours": hours, "limit": limit})
        top_uas = [str(r[0]) for r in ua_global_res.result_rows]

        # For each UA, look up its JA4s. UAs are attacker-controlled text, so
        # they are escaped (backslash and quote) before interpolation.
        # NOTE(review): only the first 50 UAs are looked up although `limit`
        # may go up to 200.
        ja4_by_ua: dict = {}
        if top_uas:
            ua_sql = _sql_in_list(top_uas[:50])
            ja4_per_ua_query = f"""
                SELECT
                    ua,
                    uniq(ja4) AS unique_ja4s,
                    groupUniqArray(3)(ja4) AS sample_ja4s
                FROM view_dashboard_user_agents
                ARRAY JOIN user_agents AS ua
                WHERE ua IN ({ua_sql})
                  AND hour >= now() - INTERVAL {hours} HOUR
                  AND ua != ''
                  AND ja4 != ''
                GROUP BY ua
            """
            try:
                ja4_res = db.query(ja4_per_ua_query)
                for r in ja4_res.result_rows:
                    ja4_by_ua[str(r[0])] = {
                        "unique_ja4s": int(r[1] or 0),
                        "sample_ja4s": list(r[2] or []),
                    }
            except Exception:
                # Best effort: UAs simply get no JA4 information.
                logger.warning("JA4-per-UA lookup failed for /ua-analysis", exc_info=True)

        # IPs flagged is_ua_rotating in ml_detected_anomalies.
        rotating_query = """
            SELECT
                replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
                avg(ua_ch_mismatch) AS avg_ua_ch_mismatch
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND is_ua_rotating = true
            GROUP BY clean_ip
            ORDER BY avg_ua_ch_mismatch DESC
        """
        rotating_ips: list = []
        try:
            rot_res = db.query(rotating_query, {"hours": hours})
            rotating_ips = [str(r[0]) for r in rot_res.result_rows]
        except Exception:
            # Best effort: rotation stats are reported as empty.
            logger.warning("Rotating-IP lookup failed for /ua-analysis", exc_info=True)

        # Build the response.
        items = []
        for row in ua_global_res.result_rows:
            ua = str(row[0])
            ip_count = int(row[1] or 0)
            ua_type = _classify_ua(ua)
            ja4_info = ja4_by_ua.get(ua, {"unique_ja4s": 0, "sample_ja4s": []})
            unique_ja4s = ja4_info["unique_ja4s"]

            items.append({
                "user_agent": ua,
                "type": ua_type,
                "ip_count": ip_count,
                "unique_ja4_count": unique_ja4s,
                "sample_ja4s": ja4_info["sample_ja4s"],
                # A UA seen with many JA4s is suspicious: a real browser
                # usually has 1-2 JA4s.
                "is_multi_ja4_suspect": unique_ja4s > 3,
                "risk_flags": _build_ua_risk_flags(ua, ua_type, unique_ja4s, ip_count),
            })

        # IPs with UA rotation.
        ua_rotating_stats = {
            "rotating_ip_count": len(rotating_ips),
            "sample_rotating_ips": rotating_ips[:10],
        }

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
            "ua_rotating_stats": ua_rotating_stats,
            "summary": {
                "bot_count": sum(1 for i in items if i["type"] == "bot"),
                "browser_count": sum(1 for i in items if i["type"] == "browser"),
                "script_count": sum(1 for i in items if i["type"] == "script"),
                "multi_ja4_suspect_count": sum(1 for i in items if i["is_multi_ja4_suspect"]),
            },
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Erreur: {exc}") from exc


def _build_ua_risk_flags(ua: str, ua_type: str, unique_ja4s: int, ip_count: int) -> list:
    """Return the list of risk flag names that apply to one User-Agent.

    Args:
        ua: The User-Agent string (currently unused, kept for the signature).
        ua_type: Classification as returned by ``_classify_ua``.
        unique_ja4s: Number of distinct JA4s seen with this UA.
        ip_count: Volume value reported for this UA.
    """
    flags = []
    if ua_type == "bot":
        flags.append("ua_bot_signature")
    elif ua_type == "script":
        flags.append("ua_script_library")
    if unique_ja4s > 5:
        flags.append("ja4_rotation_suspect")
    if unique_ja4s > 3 and ua_type == "browser":
        flags.append("browser_ua_multi_fingerprint")
    if ip_count > 100:
        flags.append("high_volume")
    return flags


# =============================================================================
# ENDPOINT 4 — JA4 of a specific IP: UA/JA4 coherence analysis
# =============================================================================

@router.get("/ip/{ip}/coherence")
async def get_ip_fingerprint_coherence(ip: str):
    """Analyse JA4/UA coherence for a specific IP.

    Answers the question: "Was this IP spoofing its fingerprint?"

    The score is based on:
    - ua_ch_mismatch (User-Agent vs Client Hints);
    - modern_browser_score versus the actual UA type;
    - number of distinct JA4s used (rotation);
    - sni_host_mismatch and alpn_http_mismatch;
    - is_ua_rotating and is_rare_ja4.

    Raises:
        HTTPException: 404 when the IP has no detection, 500 on any other
            failure.
    """
    try:
        # Data from ml_detected_anomalies, most recent first (rows are read
        # by index).
        ml_query = """
            SELECT
                ja4,
                ua_ch_mismatch,
                modern_browser_score,
                sni_host_mismatch,
                alpn_http_mismatch,
                is_alpn_missing,
                is_rare_ja4,
                is_ua_rotating,
                distinct_ja4_count,
                header_count,
                has_accept_language,
                has_cookie,
                has_referer,
                header_order_shared_count,
                detected_at,
                threat_level,
                window_mss_ratio,
                tcp_jitter_variance,
                multiplexing_efficiency
            FROM ml_detected_anomalies
            WHERE src_ip = %(ip)s
            ORDER BY detected_at DESC
        """
        ml_res = db.query(ml_query, {"ip": ip})
        if not ml_res.result_rows:
            raise HTTPException(status_code=404, detail="IP non trouvée dans les détections")

        # Actual User-Agents from view_dashboard_user_agents (last 72 hours).
        ua_query = """
            SELECT ua, sum(requests) AS cnt
            FROM view_dashboard_user_agents
            ARRAY JOIN user_agents AS ua
            WHERE toString(src_ip) = %(ip)s
              AND hour >= now() - INTERVAL 72 HOUR
              AND ua != ''
            GROUP BY ua
            ORDER BY cnt DESC
        """
        ua_res = db.query(ua_query, {"ip": ip})
        top_uas = [
            {"ua": str(r[0]), "count": int(r[1] or 0), "type": _classify_ua(str(r[0]))}
            for r in ua_res.result_rows
        ]

        # Aggregate the indicators over every detection of this IP.
        rows = ml_res.result_rows
        latest = rows[0]
        total_rows = len(rows)

        ua_ch_mismatch_count = sum(1 for r in rows if r[1])
        sni_mismatch_count = sum(1 for r in rows if r[3])
        alpn_mismatch_count = sum(1 for r in rows if r[4])
        is_rare_count = sum(1 for r in rows if r[6])
        is_rotating = any(r[7] for r in rows)
        distinct_ja4s = {str(r[0]) for r in rows if r[0]}
        avg_browser_score = sum(int(r[2] or 0) for r in rows) / total_rows

        # UA analysis.
        has_browser_ua = any(u["type"] == "browser" for u in top_uas)
        has_bot_ua = any(u["type"] == "bot" for u in top_uas)

        # Spoofing score, capped at 100.
        spoof_score = min(100, round(
            (ua_ch_mismatch_count / total_rows * 100) * 0.40
            + (avg_browser_score * 0.20 if has_bot_ua else 0)
            + (sni_mismatch_count / total_rows * 100) * 0.10
            + (alpn_mismatch_count / total_rows * 100) * 0.05
            + (len(distinct_ja4s) * 5 if len(distinct_ja4s) > 2 else 0)
            + (15 if is_rotating else 0)
            + (10 if is_rare_count > total_rows * 0.5 else 0)
        ))

        # Verdict.
        if spoof_score >= 70:
            verdict = "high_confidence_spoofing"
        elif spoof_score >= 40:
            verdict = "suspicious_spoofing"
        elif has_bot_ua and avg_browser_score < 20:
            verdict = "known_bot_no_spoofing"
        elif has_browser_ua and spoof_score < 20:
            verdict = "legitimate_browser"
        else:
            verdict = "inconclusive"

        # Human-readable explanation.
        explanation = []
        if ua_ch_mismatch_count > total_rows * 0.3:
            explanation.append(f"UA-Client-Hints mismatch sur {round(ua_ch_mismatch_count*100/total_rows)}% des requêtes")
        if has_bot_ua and avg_browser_score > 40:
            explanation.append(f"JA4 ressemble à un navigateur (score {round(avg_browser_score)}/100) mais UA est de type bot")
        if len(distinct_ja4s) > 2:
            explanation.append(f"{len(distinct_ja4s)} JA4 distincts utilisés → rotation de fingerprint")
        if is_rotating:
            explanation.append("is_ua_rotating détecté → rotation d'User-Agent confirmée")
        if sni_mismatch_count > 0:
            explanation.append(f"SNI ≠ Host header sur {sni_mismatch_count}/{total_rows} requêtes")
        if not explanation:
            explanation.append("Aucun indicateur de spoofing majeur détecté")

        return {
            "ip": ip,
            "verdict": verdict,
            "spoofing_score": spoof_score,
            "explanation": explanation,
            "indicators": {
                "ua_ch_mismatch_rate": round(ua_ch_mismatch_count / total_rows * 100, 1),
                "sni_mismatch_rate": round(sni_mismatch_count / total_rows * 100, 1),
                "alpn_mismatch_rate": round(alpn_mismatch_count / total_rows * 100, 1),
                "avg_browser_score": round(avg_browser_score, 1),
                "distinct_ja4_count": len(distinct_ja4s),
                "is_ua_rotating": is_rotating,
                "rare_ja4_rate": round(is_rare_count / total_rows * 100, 1),
            },
            "fingerprints": {
                # Sorted so the response is stable across calls (a set has
                # no defined order).
                "ja4_list": sorted(distinct_ja4s),
                "latest_ja4": str(latest[0] or ""),
            },
            "user_agents": top_uas,
            "latest_detection": {
                "detected_at": latest[14].isoformat() if latest[14] else "",
                "threat_level": str(latest[15] or "LOW"),
                "modern_browser_score": int(latest[2] or 0),
                "header_count": int(latest[9] or 0),
                "has_accept_language": bool(latest[10]),
                "has_cookie": bool(latest[11]),
                "has_referer": bool(latest[12]),
                "header_order_shared_count": int(latest[13] or 0),
            },
        }
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Erreur: {exc}") from exc


# =============================================================================
# ENDPOINT 5 — Legitimate JA4s (baseline / whitelist)
# =============================================================================

@router.get("/legitimate-ja4")
async def get_legitimate_ja4(
    hours: int = Query(168, ge=24, le=720, description="Fenêtre pour établir la baseline"),
    min_ips: int = Query(50, ge=5, description="Nombre minimum d'IPs pour qualifier un JA4 de légitime"),
):
    """Establish a baseline of legitimate JA4 fingerprints.

    A JA4 is considered legitimate when:
    - it is used by at least ``min_ips`` distinct IPs;
    - its ua_ch_mismatch rate is low (< 5%);
    - its average modern_browser_score is high (> 60);
    - none of its detections is flagged is_rare_ja4.

    Useful as a whitelist candidate list to reduce false positives.

    Raises:
        HTTPException: 500 when the query or the post-processing fails.
    """
    try:
        query = """
            SELECT
                ja4,
                uniq(src_ip) AS unique_ips,
                count() AS total_detections,
                round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
                avg(modern_browser_score) AS avg_browser_score,
                countIf(is_rare_ja4 = true) AS rare_count,
                round(countIf(threat_level = 'CRITICAL') * 100.0 / count(), 2) AS critical_pct,
                round(countIf(threat_level = 'HIGH') * 100.0 / count(), 2) AS high_pct
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND ja4 != '' AND ja4 IS NOT NULL
            GROUP BY ja4
            HAVING unique_ips >= %(min_ips)s
               AND ua_ch_mismatch_pct < 5.0
               AND avg_browser_score > 60
               AND rare_count = 0
            ORDER BY unique_ips DESC
        """
        result = db.query(query, {"hours": hours, "min_ips": min_ips})

        items = []
        for row in result.result_rows:
            unique_ips = int(row[1] or 0)
            mismatch_pct = float(row[3] or 0)
            browser_score = float(row[4] or 0)
            items.append({
                "ja4": str(row[0]),
                "unique_ips": unique_ips,
                "total_detections": int(row[2] or 0),
                "ua_ch_mismatch_pct": mismatch_pct,
                "avg_browser_score": round(browser_score, 1),
                "critical_pct": float(row[6] or 0),
                "high_pct": float(row[7] or 0),
                # 40 points for low mismatch, 40 for browser score, 20 for
                # IP volume relative to min_ips (saturating at 1).
                "legitimacy_confidence": min(100, round(
                    (1 - mismatch_pct / 100) * 40
                    + browser_score * 0.40
                    + min(unique_ips / min_ips, 1) * 20
                )),
            })

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
            "note": "Ces JA4 sont candidats à une whitelist. Vérifier manuellement avant de whitelister.",
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Erreur: {exc}") from exc


# =============================================================================
# ENDPOINT — JA4 x ASN / country correlation (C5)
# Detects JA4s strongly concentrated on a single ASN or country
# → signal of a targeted botnet or shared test/attack infrastructure
# =============================================================================

@router.get("/asn-correlation")
async def get_ja4_asn_correlation(
    min_concentration: float = Query(0.7, ge=0.0, le=1.0, description="Seuil min de concentration ASN ou pays"),
    min_ips: int = Query(5, ge=1, description="Nombre minimum d'IPs par JA4"),
    limit: int = Query(50, ge=1, le=200),
):
    """Identify JA4 fingerprints strongly concentrated on one ASN or country.

    A JA4 with asn_concentration >= 0.7 means that at least 70% of the IPs
    using this fingerprint come from the same ASN → shared bot infrastructure
    or suspicious datacenter. The query window is fixed to the last 24 hours.

    Raises:
        HTTPException: 500 when the query or the post-processing fails.
    """
    try:
        # Two-pass: first aggregate per (ja4, asn, country) to get IP counts
        # per combination, then aggregate per ja4 to compute the ratios.
        # NOTE(review): country_ips and ips_per_combo are the same expression
        # under the same GROUP BY, so country_concentration always equals
        # asn_concentration and the "geo_targeted" branch below can never be
        # taken. A true per-country count needs its own aggregation — confirm
        # the intended semantics before changing the SQL.
        sql = """
            SELECT
                ja4,
                sum(ips_per_combo) AS unique_ips,
                uniq(src_asn) AS unique_asns,
                uniq(src_country_code) AS unique_countries,
                toString(argMax(src_asn, ips_per_combo)) AS top_asn_number,
                argMax(asn_name, ips_per_combo) AS top_asn_name,
                argMax(src_country_code, country_ips) AS dominant_country,
                sum(total_hits) AS total_hits,
                round(max(ips_per_combo) / greatest(sum(ips_per_combo), 1), 3) AS asn_concentration,
                round(max(country_ips) / greatest(sum(ips_per_combo), 1), 3) AS country_concentration
            FROM (
                SELECT
                    ja4,
                    src_asn,
                    src_country_code,
                    any(src_as_name) AS asn_name,
                    uniq(src_ip) AS ips_per_combo,
                    uniq(src_ip) AS country_ips,
                    sum(hits) AS total_hits
                FROM mabase_prod.agg_host_ip_ja4_1h
                WHERE window_start >= now() - INTERVAL 24 HOUR
                  AND ja4 != ''
                GROUP BY ja4, src_asn, src_country_code
            )
            GROUP BY ja4
            HAVING unique_ips >= %(min_ips)s
               AND (asn_concentration >= %(min_conc)s OR country_concentration >= %(min_conc)s)
            ORDER BY asn_concentration DESC, unique_ips DESC
            LIMIT %(limit)s
        """
        result = db.query(sql, {"min_ips": min_ips, "min_conc": min_concentration, "limit": limit})

        items = []
        for row in result.result_rows:
            asn_concentration = float(row[8] or 0)
            country_concentration = float(row[9] or 0)

            if asn_concentration >= 0.85:
                corr_type, risk = "asn_monopoly", "high"
            elif asn_concentration >= min_concentration:
                corr_type, risk = "asn_dominant", "medium"
            elif country_concentration >= min_concentration:
                corr_type, risk = "geo_targeted", "medium"
            else:
                corr_type, risk = "distributed", "low"

            items.append({
                "ja4": str(row[0]),
                "unique_ips": int(row[1]),
                "unique_asns": int(row[2]),
                "unique_countries": int(row[3]),
                "top_asn_name": str(row[5] or ""),
                "top_asn_number": str(row[4] or ""),
                "dominant_country": str(row[6] or ""),
                "total_hits": int(row[7] or 0),
                "asn_concentration": asn_concentration,
                "country_concentration": country_concentration,
                "correlation_type": corr_type,
                "risk": risk,
            })

        return {"items": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Erreur: {exc}") from exc