""" Endpoint d'investigation enrichie pour une IP donnée. Agrège en une seule requête les données provenant de toutes les sources : ml_detected_anomalies, view_form_bruteforce_detected, view_tcp_spoofing_detected, agg_host_ip_ja4_1h (rotation JA4), view_ip_recurrence, view_ai_features_1h. """ from fastapi import APIRouter, HTTPException from ..database import db from ..services.tcp_fingerprint import fingerprint_os, detect_spoof, declared_os_from_ua router = APIRouter(prefix="/api/investigation", tags=["investigation"]) @router.get("/{ip}/summary") async def get_ip_full_summary(ip: str): """ Synthèse complète pour une IP : toutes les sources en un appel. Normalise l'IP (accepte ::ffff:x.x.x.x ou x.x.x.x). """ clean_ip = ip.replace("::ffff:", "").strip() try: # ── 1. Score ML / features ───────────────────────────────────────────── ml_sql = """ SELECT max(abs(anomaly_score)) AS max_score, any(threat_level) AS threat_level, any(bot_name) AS bot_name, count() AS total_detections, uniq(host) AS distinct_hosts, uniq(ja4) AS distinct_ja4 FROM mabase_prod.ml_detected_anomalies WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s """ ml_res = db.query(ml_sql, {"ip": clean_ip}) ml_row = ml_res.result_rows[0] if ml_res.result_rows else None ml_data = { "max_score": round(float(ml_row[0] or 0), 2) if ml_row else 0, "threat_level": str(ml_row[1] or "") if ml_row else "", "attack_type": str(ml_row[2] or "") if ml_row else "", "total_detections": int(ml_row[3] or 0) if ml_row else 0, "distinct_hosts": int(ml_row[4] or 0) if ml_row else 0, "distinct_ja4": int(ml_row[5] or 0) if ml_row else 0, } # ── 2. Brute force ───────────────────────────────────────────────────── bf_sql = """ SELECT uniq(host) AS hosts_attacked, sum(hits) AS total_hits, sum(query_params_count) AS total_params, groupArray(3)(host) AS top_hosts FROM mabase_prod.view_form_bruteforce_detected WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s """ bf_res = db.query(bf_sql, {"ip": clean_ip}) bf_row = bf_res.result_rows[0] if bf_res.result_rows else None bf_data = { "active": bool(bf_row and int(bf_row[1] or 0) > 0), "hosts_attacked": int(bf_row[0] or 0) if bf_row else 0, "total_hits": int(bf_row[1] or 0) if bf_row else 0, "total_params": int(bf_row[2] or 0) if bf_row else 0, "top_hosts": [str(h) for h in (bf_row[3] or [])] if bf_row else [], } # ── 3. TCP spoofing — fingerprinting multi-signal ───────────────────── tcp_sql = """ SELECT any(tcp_ttl_raw) AS ttl, any(tcp_win_raw) AS win, any(tcp_scale_raw) AS scale, any(tcp_mss_raw) AS mss, any(first_ua) AS ua FROM mabase_prod.agg_host_ip_ja4_1h WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s AND window_start >= now() - INTERVAL 24 HOUR AND tcp_ttl_raw > 0 LIMIT 1 """ tcp_res = db.query(tcp_sql, {"ip": clean_ip}) tcp_data = {"detected": False, "tcp_ttl": None, "suspected_os": None} if tcp_res.result_rows: r = tcp_res.result_rows[0] ttl = int(r[0] or 0) win = int(r[1] or 0) scale = int(r[2] or 0) mss = int(r[3] or 0) ua = str(r[4] or "") fp = fingerprint_os(ttl, win, scale, mss) dec_os = declared_os_from_ua(ua) spoof_res = detect_spoof(fp, dec_os) tcp_data = { "detected": spoof_res.is_spoof, "tcp_ttl": ttl, "tcp_mss": mss, "tcp_win_scale": scale, "initial_ttl": fp.initial_ttl, "hop_count": fp.hop_count, "suspected_os": fp.os_name, "declared_os": dec_os, "confidence": fp.confidence, "network_path": fp.network_path, "is_bot_tool": fp.is_bot_tool, "spoof_reason": spoof_res.reason, } # ── 4. JA4 rotation ──────────────────────────────────────────────────── rot_sql = """ SELECT distinct_ja4_count, total_hits FROM mabase_prod.view_host_ip_ja4_rotation WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s LIMIT 1 """ rot_res = db.query(rot_sql, {"ip": clean_ip}) rot_data = {"rotating": False, "distinct_ja4_count": 0} if rot_res.result_rows: row = rot_res.result_rows[0] cnt = int(row[0] or 0) rot_data = {"rotating": cnt > 1, "distinct_ja4_count": cnt, "total_hits": int(row[1] or 0)} # ── 5. Persistance ───────────────────────────────────────────────────── pers_sql = """ SELECT recurrence, worst_score, worst_threat_level, first_seen, last_seen FROM mabase_prod.view_ip_recurrence WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s LIMIT 1 """ pers_res = db.query(pers_sql, {"ip": clean_ip}) pers_data = {"persistent": False, "recurrence": 0} if pers_res.result_rows: row = pers_res.result_rows[0] pers_data = { "persistent": True, "recurrence": int(row[0] or 0), "worst_score": round(float(row[1] or 0), 2), "worst_threat_level":str(row[2] or ""), "first_seen": str(row[3]), "last_seen": str(row[4]), } # ── 6. Timeline 24h ──────────────────────────────────────────────────── tl_sql = """ SELECT toHour(window_start) AS hour, sum(hits) AS hits, groupUniqArray(3)(ja4) AS ja4s FROM mabase_prod.agg_host_ip_ja4_1h WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s AND window_start >= now() - INTERVAL 24 HOUR GROUP BY hour ORDER BY hour ASC """ tl_res = db.query(tl_sql, {"ip": clean_ip}) timeline = [ {"hour": int(r[0]), "hits": int(r[1]), "ja4s": [str(j) for j in (r[2] or [])]} for r in tl_res.result_rows ] # ── Global risk score (heuristic) ────────────────────────────────────── risk = 0 risk += min(50, ml_data["max_score"] * 50) if bf_data["active"]: risk += 20 if tcp_data["detected"]: if tcp_data.get("is_bot_tool"): risk += 30 # outil de scan connu else: risk += 15 # spoof OS if rot_data["rotating"]: risk += min(15, rot_data["distinct_ja4_count"] * 3) if pers_data["persistent"]: risk += min(10, pers_data["recurrence"] * 2) risk = min(100, round(risk)) return { "ip": clean_ip, "risk_score": risk, "ml": ml_data, "bruteforce": bf_data, "tcp_spoofing":tcp_data, "ja4_rotation":rot_data, "persistence": pers_data, "timeline_24h":timeline, } except Exception as e: raise HTTPException(status_code=500, detail=str(e))