""" Endpoints pour la détection du TCP spoofing (TTL / window size anormaux) Règle de corrélation : - TTL=0 ou tcp_window_size=0 → données TCP absentes (proxy/LB) → pas de corrélation possible - TTL 55-65 → fingerprint Linux/Mac (initial TTL 64) - TTL 120-135 → fingerprint Windows (initial TTL 128) - TTL 110-120 → fingerprint Windows (initial TTL 128, quelques sauts) - Toute autre valeur → OS indéterminé → pas de flag spoofing - spoof_flag = True UNIQUEMENT si OS fingerprinting TCP possible ET incompatible avec l'UA """ from fastapi import APIRouter, HTTPException, Query from ..database import db router = APIRouter(prefix="/api/tcp-spoofing", tags=["tcp_spoofing"]) # Plages TTL qui permettent une corrélation fiable _TTL_LINUX = (range(52, 66), "Linux/Mac") # initial 64, 1-12 sauts _TTL_WINDOWS = (range(110, 136), "Windows") # initial 128, 1-18 sauts _TTL_CISCO = (range(240, 256), "Cisco/BSD") # initial 255 def _suspected_os(ttl: int) -> str: """Retourne l'OS probable à partir du TTL observé. Retourne 'Unknown' si le TTL ne permet pas une corrélation fiable (TTL=0 = pas de données TCP, ou hors plage connue). """ if ttl <= 0: return "Unknown" # Pas de données TCP (proxy/CDN) for rng, name in (_TTL_LINUX, _TTL_WINDOWS, _TTL_CISCO): if ttl in rng: return name return "Unknown" def _declared_os(ua: str) -> str: ua = ua or "" if "Windows" in ua: return "Windows" if "Mac OS X" in ua: return "macOS" if "Linux" in ua or "Android" in ua: return "Linux/Android" return "Unknown" def _is_spoof(suspected_os: str, declared_os: str) -> bool: """Spoof confirmé uniquement si on a un fingerprint TCP fiable ET une incompatibilité d'OS.""" if suspected_os == "Unknown" or declared_os == "Unknown": return False # Pas de corrélation possible # Linux/Mac fingerprint TCP mais UA déclare Windows if suspected_os == "Linux/Mac" and declared_os == "Windows": return True # Windows fingerprint TCP mais UA déclare Linux/Android ou macOS if suspected_os == "Windows" and declared_os in ("Linux/Android", "macOS"): return True return False @router.get("/overview") async def get_tcp_spoofing_overview(): """Statistiques globales : seules les entrées avec données TCP valides sont analysées.""" try: sql = """ SELECT count() AS total_entries, uniq(src_ip) AS unique_ips, countIf(tcp_ttl = 0) AS no_tcp_data, countIf(tcp_ttl > 0) AS with_tcp_data, countIf(tcp_ttl BETWEEN 52 AND 65) AS linux_fingerprint, countIf(tcp_ttl BETWEEN 110 AND 135) AS windows_fingerprint FROM mabase_prod.view_tcp_spoofing_detected """ result = db.query(sql) row = result.result_rows[0] total_entries = int(row[0]) unique_ips = int(row[1]) no_tcp_data = int(row[2]) with_tcp_data = int(row[3]) linux_fp = int(row[4]) windows_fp = int(row[5]) # Distribution TTL uniquement pour les entrées avec données TCP valides ttl_sql = """ SELECT tcp_ttl, count() AS cnt, uniq(src_ip) AS ips FROM mabase_prod.view_tcp_spoofing_detected WHERE tcp_ttl > 0 GROUP BY tcp_ttl ORDER BY cnt DESC LIMIT 15 """ ttl_res = db.query(ttl_sql) ttl_distribution = [ {"ttl": int(r[0]), "count": int(r[1]), "ips": int(r[2])} for r in ttl_res.result_rows ] # Distribution window_size pour entrées avec données TCP win_sql = """ SELECT tcp_window_size, count() AS cnt FROM mabase_prod.view_tcp_spoofing_detected WHERE tcp_ttl > 0 GROUP BY tcp_window_size ORDER BY cnt DESC LIMIT 10 """ win_res = db.query(win_sql) window_size_distribution = [ {"window_size": int(r[0]), "count": int(r[1])} for r in win_res.result_rows ] return { "total_entries": total_entries, "unique_ips": unique_ips, "no_tcp_data": no_tcp_data, "with_tcp_data": with_tcp_data, "linux_fingerprint": linux_fp, "windows_fingerprint": windows_fp, "ttl_distribution": ttl_distribution, "window_size_distribution": window_size_distribution, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/list") async def get_tcp_spoofing_list( limit: int = Query(100, ge=1, le=1000), offset: int = Query(0, ge=0), spoof_only: bool = Query(False, description="Ne retourner que les vrais spoofs (TTL corrélable + OS mismatch)"), ): """Liste des entrées avec données TCP valides (tcp_ttl > 0). Entrées sans données TCP (TTL=0) exclues : pas de corrélation possible. Si spoof_only=True, retourne uniquement les entrées avec fingerprint OS identifiable (Linux/Mac TTL 52-65). """ try: # Filtre SQL : seules les entrées avec TTL valide, et si spoof_only les plages corrélables if spoof_only: # Seules les plages de TTL qui permettent une identification OS fiable ttl_filter = "tcp_ttl BETWEEN 52 AND 65 OR tcp_ttl BETWEEN 110 AND 135 OR tcp_ttl BETWEEN 240 AND 255" else: ttl_filter = "tcp_ttl > 0" count_sql = f"SELECT count() FROM mabase_prod.view_tcp_spoofing_detected WHERE {ttl_filter}" total = int(db.query(count_sql).result_rows[0][0]) sql = f""" SELECT replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS src_ip, ja4, tcp_ttl, tcp_window_size, first_ua FROM mabase_prod.view_tcp_spoofing_detected WHERE {ttl_filter} ORDER BY tcp_ttl ASC LIMIT %(limit)s OFFSET %(offset)s """ result = db.query(sql, {"limit": limit, "offset": offset}) items = [] for row in result.result_rows: ip = str(row[0]) ja4 = str(row[1] or "") ttl = int(row[2]) window_size = int(row[3]) ua = str(row[4] or "") sus_os = _suspected_os(ttl) dec_os = _declared_os(ua) spoof_flag = _is_spoof(sus_os, dec_os) if spoof_only and not spoof_flag: continue items.append({ "ip": ip, "ja4": ja4, "tcp_ttl": ttl, "tcp_window_size": window_size, "first_ua": ua, "suspected_os": sus_os, "declared_os": dec_os, "spoof_flag": spoof_flag, }) return {"items": items, "total": total} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/matrix") async def get_tcp_spoofing_matrix(): """Matrice suspected_os × declared_os — uniquement entrées avec TTL valide.""" try: sql = """ SELECT tcp_ttl, first_ua FROM mabase_prod.view_tcp_spoofing_detected WHERE tcp_ttl > 0 """ result = db.query(sql) counts: dict = {} for row in result.result_rows: ttl = int(row[0]) ua = str(row[1] or "") sus_os = _suspected_os(ttl) dec_os = _declared_os(ua) key = (sus_os, dec_os) counts[key] = counts.get(key, 0) + 1 matrix = [ { "suspected_os": k[0], "declared_os": k[1], "count": v, "is_spoof": _is_spoof(k[0], k[1]), } for k, v in counts.items() ] matrix.sort(key=lambda x: x["count"], reverse=True) return {"matrix": matrix} except Exception as e: raise HTTPException(status_code=500, detail=str(e))