Files
dashboard/backend/routes/tcp_spoofing.py
SOC Analyst 735d8b6101 fix: TCP spoofing — corrélation OS uniquement si données TCP valides
Problème: TTL=0 (proxy/CDN) ne permet pas de fingerprinter l'OS d'origine.
Les entrées sans données TCP étaient faussement flagguées comme spoofs.

Corrections backend (tcp_spoofing.py):
- Règle: spoof_flag=True UNIQUEMENT si TTL dans plage OS connue (52-65 Linux, 110-135 Windows)
  ET OS déclaré incompatible avec l'OS fingerprinté
- TTL=0 → 'Unknown' (pas de corrélation possible)
- TTL hors plage connue → 'Unknown' (pas de corrélation possible)
- /list: filtre WHERE tcp_ttl > 0 (exclut les entrées sans données TCP)
- /list: paramètre spoof_only=true → filtre SQL sur plages TTL corrélables uniquement
- /overview: nouvelles métriques (with_tcp_data, no_tcp_data, linux_fingerprint, windows_fingerprint)
- /matrix: ajout is_spoof par cellule

Corrections frontend (TcpSpoofingView.tsx):
- Stat cards: total entries, avec TCP, fingerprint Linux/Windows (plus TTL<60)
- Bandeau informatif: nombre d'entrées sans données TCP exclues
- Checkbox 'Spoofs uniquement' → re-fetch avec spoof_only=true (filtre SQL)
- Matrice OS: cellules de vrai spoof surlignées en rouge avec icône 🚨
- useEffect séparé pour overview et items (items se recharge si spoofOnly change)

Résultat: 36 699 entrées Linux/Mac (TTL 52-65), dont 16 226 spoofant Windows UA

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-16 00:05:19 +01:00

223 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Endpoints pour la détection du TCP spoofing (TTL / window size anormaux)
Règle de corrélation :
- TTL=0 ou tcp_window_size=0 → données TCP absentes (proxy/LB) → pas de corrélation possible
- TTL 55-65 → fingerprint Linux/Mac (initial TTL 64)
- TTL 120-135 → fingerprint Windows (initial TTL 128)
- TTL 110-120 → fingerprint Windows (initial TTL 128, quelques sauts)
- Toute autre valeur → OS indéterminé → pas de flag spoofing
- spoof_flag = True UNIQUEMENT si OS fingerprinting TCP possible ET incompatible avec l'UA
"""
from fastapi import APIRouter, HTTPException, Query
from ..database import db
router = APIRouter(prefix="/api/tcp-spoofing", tags=["tcp_spoofing"])
# Plages TTL qui permettent une corrélation fiable
_TTL_LINUX = (range(52, 66), "Linux/Mac") # initial 64, 1-12 sauts
_TTL_WINDOWS = (range(110, 136), "Windows") # initial 128, 1-18 sauts
_TTL_CISCO = (range(240, 256), "Cisco/BSD") # initial 255
def _suspected_os(ttl: int) -> str:
"""Retourne l'OS probable à partir du TTL observé.
Retourne 'Unknown' si le TTL ne permet pas une corrélation fiable
(TTL=0 = pas de données TCP, ou hors plage connue).
"""
if ttl <= 0:
return "Unknown" # Pas de données TCP (proxy/CDN)
for rng, name in (_TTL_LINUX, _TTL_WINDOWS, _TTL_CISCO):
if ttl in rng:
return name
return "Unknown"
def _declared_os(ua: str) -> str:
ua = ua or ""
if "Windows" in ua:
return "Windows"
if "Mac OS X" in ua:
return "macOS"
if "Linux" in ua or "Android" in ua:
return "Linux/Android"
return "Unknown"
def _is_spoof(suspected_os: str, declared_os: str) -> bool:
"""Spoof confirmé uniquement si on a un fingerprint TCP fiable ET une incompatibilité d'OS."""
if suspected_os == "Unknown" or declared_os == "Unknown":
return False # Pas de corrélation possible
# Linux/Mac fingerprint TCP mais UA déclare Windows
if suspected_os == "Linux/Mac" and declared_os == "Windows":
return True
# Windows fingerprint TCP mais UA déclare Linux/Android ou macOS
if suspected_os == "Windows" and declared_os in ("Linux/Android", "macOS"):
return True
return False
@router.get("/overview")
async def get_tcp_spoofing_overview():
"""Statistiques globales : seules les entrées avec données TCP valides sont analysées."""
try:
sql = """
SELECT
count() AS total_entries,
uniq(src_ip) AS unique_ips,
countIf(tcp_ttl = 0) AS no_tcp_data,
countIf(tcp_ttl > 0) AS with_tcp_data,
countIf(tcp_ttl BETWEEN 52 AND 65) AS linux_fingerprint,
countIf(tcp_ttl BETWEEN 110 AND 135) AS windows_fingerprint
FROM mabase_prod.view_tcp_spoofing_detected
"""
result = db.query(sql)
row = result.result_rows[0]
total_entries = int(row[0])
unique_ips = int(row[1])
no_tcp_data = int(row[2])
with_tcp_data = int(row[3])
linux_fp = int(row[4])
windows_fp = int(row[5])
# Distribution TTL uniquement pour les entrées avec données TCP valides
ttl_sql = """
SELECT
tcp_ttl,
count() AS cnt,
uniq(src_ip) AS ips
FROM mabase_prod.view_tcp_spoofing_detected
WHERE tcp_ttl > 0
GROUP BY tcp_ttl
ORDER BY cnt DESC
LIMIT 15
"""
ttl_res = db.query(ttl_sql)
ttl_distribution = [
{"ttl": int(r[0]), "count": int(r[1]), "ips": int(r[2])}
for r in ttl_res.result_rows
]
# Distribution window_size pour entrées avec données TCP
win_sql = """
SELECT
tcp_window_size,
count() AS cnt
FROM mabase_prod.view_tcp_spoofing_detected
WHERE tcp_ttl > 0
GROUP BY tcp_window_size
ORDER BY cnt DESC
LIMIT 10
"""
win_res = db.query(win_sql)
window_size_distribution = [
{"window_size": int(r[0]), "count": int(r[1])}
for r in win_res.result_rows
]
return {
"total_entries": total_entries,
"unique_ips": unique_ips,
"no_tcp_data": no_tcp_data,
"with_tcp_data": with_tcp_data,
"linux_fingerprint": linux_fp,
"windows_fingerprint": windows_fp,
"ttl_distribution": ttl_distribution,
"window_size_distribution": window_size_distribution,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/list")
async def get_tcp_spoofing_list(
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
spoof_only: bool = Query(False, description="Ne retourner que les vrais spoofs (TTL corrélable + OS mismatch)"),
):
"""Liste des entrées avec données TCP valides (tcp_ttl > 0).
Entrées sans données TCP (TTL=0) exclues : pas de corrélation possible.
Si spoof_only=True, retourne uniquement les entrées avec fingerprint OS identifiable (Linux/Mac TTL 52-65).
"""
try:
# Filtre SQL : seules les entrées avec TTL valide, et si spoof_only les plages corrélables
if spoof_only:
# Seules les plages de TTL qui permettent une identification OS fiable
ttl_filter = "tcp_ttl BETWEEN 52 AND 65 OR tcp_ttl BETWEEN 110 AND 135 OR tcp_ttl BETWEEN 240 AND 255"
else:
ttl_filter = "tcp_ttl > 0"
count_sql = f"SELECT count() FROM mabase_prod.view_tcp_spoofing_detected WHERE {ttl_filter}"
total = int(db.query(count_sql).result_rows[0][0])
sql = f"""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS src_ip,
ja4, tcp_ttl, tcp_window_size, first_ua
FROM mabase_prod.view_tcp_spoofing_detected
WHERE {ttl_filter}
ORDER BY tcp_ttl ASC
LIMIT %(limit)s OFFSET %(offset)s
"""
result = db.query(sql, {"limit": limit, "offset": offset})
items = []
for row in result.result_rows:
ip = str(row[0])
ja4 = str(row[1] or "")
ttl = int(row[2])
window_size = int(row[3])
ua = str(row[4] or "")
sus_os = _suspected_os(ttl)
dec_os = _declared_os(ua)
spoof_flag = _is_spoof(sus_os, dec_os)
if spoof_only and not spoof_flag:
continue
items.append({
"ip": ip,
"ja4": ja4,
"tcp_ttl": ttl,
"tcp_window_size": window_size,
"first_ua": ua,
"suspected_os": sus_os,
"declared_os": dec_os,
"spoof_flag": spoof_flag,
})
return {"items": items, "total": total}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/matrix")
async def get_tcp_spoofing_matrix():
"""Matrice suspected_os × declared_os — uniquement entrées avec TTL valide."""
try:
sql = """
SELECT tcp_ttl, first_ua
FROM mabase_prod.view_tcp_spoofing_detected
WHERE tcp_ttl > 0
"""
result = db.query(sql)
counts: dict = {}
for row in result.result_rows:
ttl = int(row[0])
ua = str(row[1] or "")
sus_os = _suspected_os(ttl)
dec_os = _declared_os(ua)
key = (sus_os, dec_os)
counts[key] = counts.get(key, 0) + 1
matrix = [
{
"suspected_os": k[0],
"declared_os": k[1],
"count": v,
"is_spoof": _is_spoof(k[0], k[1]),
}
for k, v in counts.items()
]
matrix.sort(key=lambda x: x["count"], reverse=True)
return {"matrix": matrix}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))