From 735d8b61014435abadd8fc9b24b7437c6a635329 Mon Sep 17 00:00:00 2001 From: SOC Analyst Date: Mon, 16 Mar 2026 00:05:19 +0100 Subject: [PATCH] =?UTF-8?q?fix:=20TCP=20spoofing=20=E2=80=94=20corr=C3=A9l?= =?UTF-8?q?ation=20OS=20uniquement=20si=20donn=C3=A9es=20TCP=20valides?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problème: TTL=0 (proxy/CDN) ne permet pas de fingerprinter l'OS d'origine. Les entrées sans données TCP étaient faussement flagguées comme spoofs. Corrections backend (tcp_spoofing.py): - Règle: spoof_flag=True UNIQUEMENT si TTL dans plage OS connue (52-65 Linux, 110-135 Windows) ET OS déclaré incompatible avec l'OS fingerprinté - TTL=0 → 'Unknown' (pas de corrélation possible) - TTL hors plage connue → 'Unknown' (pas de corrélation possible) - /list: filtre WHERE tcp_ttl > 0 (exclut les entrées sans données TCP) - /list: paramètre spoof_only=true → filtre SQL sur plages TTL corrélables uniquement - /overview: nouvelles métriques (with_tcp_data, no_tcp_data, linux_fingerprint, windows_fingerprint) - /matrix: ajout is_spoof par cellule Corrections frontend (TcpSpoofingView.tsx): - Stat cards: total entries, avec TCP, fingerprint Linux/Windows (plus TTL<60) - Bandeau informatif: nombre d'entrées sans données TCP exclues - Checkbox 'Spoofs uniquement' → re-fetch avec spoof_only=true (filtre SQL) - Matrice OS: cellules de vrai spoof surlignées en rouge avec icône 🚨 - useEffect séparé pour overview et items (items se recharge si spoofOnly change) Résultat: 36 699 entrées Linux/Mac (TTL 52-65), dont 16 226 spoofant Windows UA Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- backend/routes/tcp_spoofing.py | 141 ++++++++++++++------ frontend/src/components/TcpSpoofingView.tsx | 89 ++++++++---- 2 files changed, 164 insertions(+), 66 deletions(-) diff --git a/backend/routes/tcp_spoofing.py b/backend/routes/tcp_spoofing.py index 13eb41c..e4b3b64 100644 --- a/backend/routes/tcp_spoofing.py +++ b/backend/routes/tcp_spoofing.py @@ -1,5 +1,13 @@ """ Endpoints pour la détection du TCP spoofing (TTL / window size anormaux) + +Règle de corrélation : + - TTL=0 ou tcp_window_size=0 → données TCP absentes (proxy/LB) → pas de corrélation possible + - TTL 55-65 → fingerprint Linux/Mac (initial TTL 64) + - TTL 120-135 → fingerprint Windows (initial TTL 128) + - TTL 110-120 → fingerprint Windows (initial TTL 128, quelques sauts) + - Toute autre valeur → OS indéterminé → pas de flag spoofing + - spoof_flag = True UNIQUEMENT si OS fingerprinting TCP possible ET incompatible avec l'UA """ from fastapi import APIRouter, HTTPException, Query @@ -7,14 +15,22 @@ from ..database import db router = APIRouter(prefix="/api/tcp-spoofing", tags=["tcp_spoofing"]) +# Plages TTL qui permettent une corrélation fiable +_TTL_LINUX = (range(52, 66), "Linux/Mac") # initial 64, 1-12 sauts +_TTL_WINDOWS = (range(110, 136), "Windows") # initial 128, 1-18 sauts +_TTL_CISCO = (range(240, 256), "Cisco/BSD") # initial 255 + def _suspected_os(ttl: int) -> str: - if 55 <= ttl <= 65: - return "Linux/Mac" - if 120 <= ttl <= 135: - return "Windows" - if ttl < 50: - return "Behind proxy (depleted)" + """Retourne l'OS probable à partir du TTL observé. + Retourne 'Unknown' si le TTL ne permet pas une corrélation fiable + (TTL=0 = pas de données TCP, ou hors plage connue). + """ + if ttl <= 0: + return "Unknown" # Pas de données TCP (proxy/CDN) + for rng, name in (_TTL_LINUX, _TTL_WINDOWS, _TTL_CISCO): + if ttl in rng: + return name return "Unknown" @@ -29,31 +45,50 @@ def _declared_os(ua: str) -> str: return "Unknown" +def _is_spoof(suspected_os: str, declared_os: str) -> bool: + """Spoof confirmé uniquement si on a un fingerprint TCP fiable ET une incompatibilité d'OS.""" + if suspected_os == "Unknown" or declared_os == "Unknown": + return False # Pas de corrélation possible + # Linux/Mac fingerprint TCP mais UA déclare Windows + if suspected_os == "Linux/Mac" and declared_os == "Windows": + return True + # Windows fingerprint TCP mais UA déclare Linux/Android ou macOS + if suspected_os == "Windows" and declared_os in ("Linux/Android", "macOS"): + return True + return False + + @router.get("/overview") async def get_tcp_spoofing_overview(): - """Statistiques globales sur les détections de spoofing TCP.""" + """Statistiques globales : seules les entrées avec données TCP valides sont analysées.""" try: sql = """ SELECT - count() AS total_detections, - uniq(src_ip) AS unique_ips, - countIf(tcp_ttl < 60) AS low_ttl_count, - countIf(tcp_ttl = 0) AS zero_ttl_count + count() AS total_entries, + uniq(src_ip) AS unique_ips, + countIf(tcp_ttl = 0) AS no_tcp_data, + countIf(tcp_ttl > 0) AS with_tcp_data, + countIf(tcp_ttl BETWEEN 52 AND 65) AS linux_fingerprint, + countIf(tcp_ttl BETWEEN 110 AND 135) AS windows_fingerprint FROM mabase_prod.view_tcp_spoofing_detected """ result = db.query(sql) row = result.result_rows[0] - total_detections = int(row[0]) + total_entries = int(row[0]) unique_ips = int(row[1]) - low_ttl_count = int(row[2]) - zero_ttl_count = int(row[3]) + no_tcp_data = int(row[2]) + with_tcp_data = int(row[3]) + linux_fp = int(row[4]) + windows_fp = int(row[5]) + # Distribution TTL uniquement pour les entrées avec données TCP valides ttl_sql = """ SELECT tcp_ttl, count() AS cnt, uniq(src_ip) AS ips FROM mabase_prod.view_tcp_spoofing_detected + WHERE tcp_ttl > 0 GROUP BY tcp_ttl ORDER BY cnt DESC LIMIT 15 @@ -64,11 +99,13 @@ async def get_tcp_spoofing_overview(): for r in ttl_res.result_rows ] + # Distribution window_size pour entrées avec données TCP win_sql = """ SELECT tcp_window_size, count() AS cnt FROM mabase_prod.view_tcp_spoofing_detected + WHERE tcp_ttl > 0 GROUP BY tcp_window_size ORDER BY cnt DESC LIMIT 10 @@ -80,12 +117,14 @@ async def get_tcp_spoofing_overview(): ] return { - "total_detections": total_detections, - "unique_ips": unique_ips, - "low_ttl_count": low_ttl_count, - "zero_ttl_count": zero_ttl_count, - "ttl_distribution": ttl_distribution, - "window_size_distribution":window_size_distribution, + "total_entries": total_entries, + "unique_ips": unique_ips, + "no_tcp_data": no_tcp_data, + "with_tcp_data": with_tcp_data, + "linux_fingerprint": linux_fp, + "windows_fingerprint": windows_fp, + "ttl_distribution": ttl_distribution, + "window_size_distribution": window_size_distribution, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @@ -93,33 +132,47 @@ async def get_tcp_spoofing_overview(): @router.get("/list") async def get_tcp_spoofing_list( - limit: int = Query(100, ge=1, le=1000), - offset: int = Query(0, ge=0), + limit: int = Query(100, ge=1, le=1000), + offset: int = Query(0, ge=0), + spoof_only: bool = Query(False, description="Ne retourner que les vrais spoofs (TTL corrélable + OS mismatch)"), ): - """Liste paginée des détections, triée par tcp_ttl ASC.""" + """Liste des entrées avec données TCP valides (tcp_ttl > 0). + Entrées sans données TCP (TTL=0) exclues : pas de corrélation possible. + Si spoof_only=True, retourne uniquement les entrées avec fingerprint OS identifiable (Linux/Mac TTL 52-65). + """ try: - count_sql = "SELECT count() FROM mabase_prod.view_tcp_spoofing_detected" + # Filtre SQL : seules les entrées avec TTL valide, et si spoof_only les plages corrélables + if spoof_only: + # Seules les plages de TTL qui permettent une identification OS fiable + ttl_filter = "tcp_ttl BETWEEN 52 AND 65 OR tcp_ttl BETWEEN 110 AND 135 OR tcp_ttl BETWEEN 240 AND 255" + else: + ttl_filter = "tcp_ttl > 0" + + count_sql = f"SELECT count() FROM mabase_prod.view_tcp_spoofing_detected WHERE {ttl_filter}" total = int(db.query(count_sql).result_rows[0][0]) - sql = """ + sql = f""" SELECT replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS src_ip, ja4, tcp_ttl, tcp_window_size, first_ua FROM mabase_prod.view_tcp_spoofing_detected + WHERE {ttl_filter} ORDER BY tcp_ttl ASC LIMIT %(limit)s OFFSET %(offset)s """ result = db.query(sql, {"limit": limit, "offset": offset}) items = [] for row in result.result_rows: - ip = str(row[0]) - ja4 = str(row[1]) - ttl = int(row[2]) - window_size = int(row[3]) - ua = str(row[4] or "") - sus_os = _suspected_os(ttl) - dec_os = _declared_os(ua) - spoof_flag = sus_os != dec_os and sus_os != "Unknown" and dec_os != "Unknown" + ip = str(row[0]) + ja4 = str(row[1] or "") + ttl = int(row[2]) + window_size = int(row[3]) + ua = str(row[4] or "") + sus_os = _suspected_os(ttl) + dec_os = _declared_os(ua) + spoof_flag = _is_spoof(sus_os, dec_os) + if spoof_only and not spoof_flag: + continue items.append({ "ip": ip, "ja4": ja4, @@ -137,24 +190,30 @@ async def get_tcp_spoofing_list( @router.get("/matrix") async def get_tcp_spoofing_matrix(): - """Cross-tab suspected_os × declared_os avec comptage.""" + """Matrice suspected_os × declared_os — uniquement entrées avec TTL valide.""" try: sql = """ - SELECT src_ip, tcp_ttl, first_ua + SELECT tcp_ttl, first_ua FROM mabase_prod.view_tcp_spoofing_detected + WHERE tcp_ttl > 0 """ result = db.query(sql) counts: dict = {} for row in result.result_rows: - ttl = int(row[1]) - ua = str(row[2] or "") - sus_os = _suspected_os(ttl) - dec_os = _declared_os(ua) - key = (sus_os, dec_os) + ttl = int(row[0]) + ua = str(row[1] or "") + sus_os = _suspected_os(ttl) + dec_os = _declared_os(ua) + key = (sus_os, dec_os) counts[key] = counts.get(key, 0) + 1 matrix = [ - {"suspected_os": k[0], "declared_os": k[1], "count": v} + { + "suspected_os": k[0], + "declared_os": k[1], + "count": v, + "is_spoof": _is_spoof(k[0], k[1]), + } for k, v in counts.items() ] matrix.sort(key=lambda x: x["count"], reverse=True) diff --git a/frontend/src/components/TcpSpoofingView.tsx b/frontend/src/components/TcpSpoofingView.tsx index 0e87b51..704be13 100644 --- a/frontend/src/components/TcpSpoofingView.tsx +++ b/frontend/src/components/TcpSpoofingView.tsx @@ -4,12 +4,14 @@ import { useNavigate } from 'react-router-dom'; // ─── Types ──────────────────────────────────────────────────────────────────── interface TcpSpoofingOverview { - total_detections: number; + total_entries: number; unique_ips: number; + no_tcp_data: number; + with_tcp_data: number; + linux_fingerprint: number; + windows_fingerprint: number; ttl_distribution: { ttl: number; count: number; ips: number }[]; window_size_distribution: { window_size: number; count: number }[]; - low_ttl_count: number; - zero_ttl_count: number; } interface TcpSpoofingItem { @@ -27,6 +29,7 @@ interface OsMatrixEntry { suspected_os: string; declared_os: string; count: number; + is_spoof: boolean; } type ActiveTab = 'detections' | 'matrix'; @@ -79,6 +82,8 @@ export function TcpSpoofingView() { const [activeTab, setActiveTab] = useState('detections'); + const [spoofOnly, setSpoofOnly] = useState(false); + const [overview, setOverview] = useState(null); const [overviewLoading, setOverviewLoading] = useState(true); const [overviewError, setOverviewError] = useState(null); @@ -108,10 +113,17 @@ export function TcpSpoofingView() { setOverviewLoading(false); } }; + fetchOverview(); + }, []); + + useEffect(() => { const fetchItems = async () => { setItemsLoading(true); + setItemsError(null); try { - const res = await fetch('/api/tcp-spoofing/list?limit=100'); + const params = new URLSearchParams({ limit: '200' }); + if (spoofOnly) params.set('spoof_only', 'true'); + const res = await fetch(`/api/tcp-spoofing/list?${params}`); if (!res.ok) throw new Error('Erreur chargement des détections'); const data: { items: TcpSpoofingItem[]; total: number } = await res.json(); setItems(data.items ?? []); @@ -121,9 +133,8 @@ export function TcpSpoofingView() { setItemsLoading(false); } }; - fetchOverview(); fetchItems(); - }, []); + }, [spoofOnly]); const loadMatrix = async () => { if (matrixLoaded) return; @@ -148,10 +159,11 @@ export function TcpSpoofingView() { const filteredItems = items.filter( (item) => - !filterText || - item.ip.includes(filterText) || - item.suspected_os.toLowerCase().includes(filterText.toLowerCase()) || - item.declared_os.toLowerCase().includes(filterText.toLowerCase()) + (!spoofOnly || item.spoof_flag) && + (!filterText || + item.ip.includes(filterText) || + item.suspected_os.toLowerCase().includes(filterText.toLowerCase()) || + item.declared_os.toLowerCase().includes(filterText.toLowerCase())) ); // Build matrix axes @@ -189,12 +201,20 @@ export function TcpSpoofingView() { ) : overviewError ? ( ) : overview ? ( -
- - - - -
+ <> +
+ + + + +
+
+ ⚠️ + + {formatNumber(overview.no_tcp_data)} entrées sans données TCP (TTL=0, passées par proxy/CDN) — exclues de l'analyse de corrélation. + +
+ ) : null} {/* Tabs */} @@ -217,7 +237,7 @@ export function TcpSpoofingView() { {/* Détections tab */} {activeTab === 'detections' && ( <> -
+
setFilterText(e.target.value)} className="bg-background-card border border-border rounded-lg px-3 py-2 text-sm text-text-primary placeholder-text-disabled focus:outline-none focus:border-accent-primary w-72" /> +
{itemsLoading ? ( @@ -322,14 +351,24 @@ export function TcpSpoofingView() { {sos} - {rowEntries.map((count, ci) => ( - 0 ? 'text-text-primary' : 'text-text-disabled'}`} - > - {count > 0 ? formatNumber(count) : '—'} - - ))} + {rowEntries.map((count, ci) => { + const dos = declaredOSes[ci]; + const entry = matrix.find((e) => e.suspected_os === sos && e.declared_os === dos); + const isSpoofCell = entry?.is_spoof ?? false; + return ( + 0 + ? 'bg-threat-critical/25 text-threat-critical font-bold' + : matrixCellColor(count) + (count > 0 ? ' text-text-primary' : ' text-text-disabled') + }`} + title={isSpoofCell ? '🚨 OS mismatch confirmé' : undefined} + > + {count > 0 ? (isSpoofCell ? `🚨 ${formatNumber(count)}` : formatNumber(count)) : '—'} + + ); + })} {formatNumber(rowTotal)}