Files
dashboard/backend/routes/tcp_spoofing.py
SOC Analyst e2db8ca84e feat: clustering multi-métriques + TCP fingerprinting amélioré
- TCP fingerprinting: 20 signatures OS (p0f-style), scoring multi-signal
  TTL/MSS/scale/fenêtre, détection Masscan 97% confiance, réseau path
  (Ethernet/PPPoE/VPN/Tunnel), estimation hop-count

- Clustering IPs: K-means++ (Arthur & Vassilvitskii 2007) sur 21 features
  TCP stack + anomalie ML + TLS/protocole + navigateur + temporel
  PCA-2D par puissance itérative (Hotelling) pour positionnement

- Visualisation redesign: 2 vues lisibles
  - Tableau de bord: grille de cartes groupées par niveau de risque
    (Bots / Suspects / Légitimes), métriques clés + mini-barres
  - Graphe de relations: ReactFlow avec nœuds-cartes en colonnes
    par niveau de menace, arêtes colorées par similarité, légende
  - Sidebar: RadarChart comportemental + toutes métriques + export CSV

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-18 18:22:57 +01:00

224 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Endpoints pour la détection du TCP spoofing / fingerprinting OS
Approche multi-signal (p0f-style) :
- TTL initial estimé → famille OS (Linux/Mac=64, Windows=128, Cisco/BSD=255)
- MSS → type de réseau (Ethernet=1460, PPPoE=1452, VPN=1380-1420)
- Taille de fenêtre → signature OS précise
- Facteur d'échelle → affine la version kernel/stack TCP
Détection bots : signatures connues (Masscan/ZMap/Mirai) identifiées par combinaison
win+scale+mss indépendamment de l'UA.
"""
from fastapi import APIRouter, HTTPException, Query
from ..database import db
from ..services.tcp_fingerprint import (
fingerprint_os,
detect_spoof,
declared_os_from_ua,
)
router = APIRouter(prefix="/api/tcp-spoofing", tags=["tcp_spoofing"])
@router.get("/overview")
async def get_tcp_spoofing_overview():
"""Statistiques globales avec fingerprinting multi-signal (TTL + MSS + fenêtre + scale)."""
try:
sql = """
SELECT
count() AS total_entries,
uniq(src_ip) AS unique_ips,
countIf(tcp_ttl_raw = 0) AS no_tcp_data,
countIf(tcp_ttl_raw > 0) AS with_tcp_data,
countIf(tcp_ttl_raw > 0 AND tcp_ttl_raw <= 64) AS linux_mac_fp,
countIf(tcp_ttl_raw > 64 AND tcp_ttl_raw <= 128) AS windows_fp,
countIf(tcp_ttl_raw > 128) AS cisco_bsd_fp,
countIf(tcp_win_raw = 5808 AND tcp_mss_raw = 1452 AND tcp_scale_raw = 4) AS bot_scanner_fp
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
"""
result = db.query(sql)
row = result.result_rows[0]
# Distribution TTL (top 15)
ttl_sql = """
SELECT tcp_ttl_raw AS ttl, count() AS cnt, uniq(src_ip) AS ips
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR AND tcp_ttl_raw > 0
GROUP BY ttl ORDER BY cnt DESC LIMIT 15
"""
ttl_res = db.query(ttl_sql)
# Distribution MSS — nouveau signal clé (top 12)
mss_sql = """
SELECT tcp_mss_raw AS mss, count() AS cnt, uniq(src_ip) AS ips
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR AND tcp_mss_raw > 0
GROUP BY mss ORDER BY cnt DESC LIMIT 12
"""
mss_res = db.query(mss_sql)
# Distribution fenêtre (top 10)
win_sql = """
SELECT tcp_win_raw AS win, count() AS cnt
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR AND tcp_ttl_raw > 0
GROUP BY win ORDER BY cnt DESC LIMIT 10
"""
win_res = db.query(win_sql)
return {
"total_entries": int(row[0]),
"unique_ips": int(row[1]),
"no_tcp_data": int(row[2]),
"with_tcp_data": int(row[3]),
"linux_mac_fingerprint": int(row[4]),
"windows_fingerprint": int(row[5]),
"cisco_bsd_fingerprint": int(row[6]),
"bot_scanner_fingerprint": int(row[7]),
"ttl_distribution": [
{"ttl": int(r[0]), "count": int(r[1]), "ips": int(r[2])}
for r in ttl_res.result_rows
],
"mss_distribution": [
{"mss": int(r[0]), "count": int(r[1]), "ips": int(r[2])}
for r in mss_res.result_rows
],
"window_size_distribution": [
{"window_size": int(r[0]), "count": int(r[1])}
for r in win_res.result_rows
],
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/list")
async def get_tcp_spoofing_list(
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
spoof_only: bool = Query(False, description="Retourner uniquement les spoofs/bots confirmés"),
):
"""Liste avec fingerprinting multi-signal (TTL + MSS + fenêtre + scale).
Inclut les champs enrichis : mss, win_scale, initial_ttl, hop_count, confidence, network_path, is_bot_tool.
"""
try:
count_sql = """
SELECT count() FROM (
SELECT src_ip, ja4
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR AND tcp_ttl_raw > 0
GROUP BY src_ip, ja4
)
"""
total = int(db.query(count_sql).result_rows[0][0])
sql = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS src_ip,
ja4,
any(tcp_ttl_raw) AS tcp_ttl,
any(tcp_win_raw) AS tcp_window_size,
any(tcp_scale_raw) AS tcp_win_scale,
any(tcp_mss_raw) AS tcp_mss,
any(first_ua) AS first_ua,
sum(hits) AS hits
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR AND tcp_ttl_raw > 0
GROUP BY src_ip, ja4
ORDER BY hits DESC
LIMIT %(limit)s OFFSET %(offset)s
"""
result = db.query(sql, {"limit": limit, "offset": offset})
items = []
for row in result.result_rows:
ip = str(row[0])
ja4 = str(row[1] or "")
ttl = int(row[2] or 0)
win = int(row[3] or 0)
scale = int(row[4] or 0)
mss = int(row[5] or 0)
ua = str(row[6] or "")
hits = int(row[7] or 0)
fp = fingerprint_os(ttl, win, scale, mss)
dec_os = declared_os_from_ua(ua)
spoof_res = detect_spoof(fp, dec_os)
if spoof_only and not spoof_res.is_spoof:
continue
items.append({
"ip": ip,
"ja4": ja4,
"tcp_ttl": ttl,
"tcp_window_size": win,
"tcp_win_scale": scale,
"tcp_mss": mss,
"hits": hits,
"first_ua": ua,
"suspected_os": fp.os_name,
"initial_ttl": fp.initial_ttl,
"hop_count": fp.hop_count,
"confidence": fp.confidence,
"network_path": fp.network_path,
"is_bot_tool": fp.is_bot_tool,
"declared_os": dec_os,
"spoof_flag": spoof_res.is_spoof,
"spoof_reason": spoof_res.reason,
})
return {"items": items, "total": total}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/matrix")
async def get_tcp_spoofing_matrix():
"""Matrice OS suspecté × OS déclaré avec fingerprinting multi-signal."""
try:
sql = """
SELECT
any(tcp_ttl_raw) AS ttl,
any(tcp_win_raw) AS win,
any(tcp_scale_raw) AS scale,
any(tcp_mss_raw) AS mss,
any(first_ua) AS ua,
count() AS cnt
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR AND tcp_ttl_raw > 0
GROUP BY src_ip, ja4
"""
result = db.query(sql)
counts: dict = {}
for row in result.result_rows:
ttl = int(row[0] or 0)
win = int(row[1] or 0)
scale = int(row[2] or 0)
mss = int(row[3] or 0)
ua = str(row[4] or "")
cnt = int(row[5] or 1)
fp = fingerprint_os(ttl, win, scale, mss)
dec_os = declared_os_from_ua(ua)
spoof_res = detect_spoof(fp, dec_os)
key = (fp.os_name, dec_os, spoof_res.is_spoof, fp.is_bot_tool)
counts[key] = counts.get(key, 0) + cnt
matrix = [
{
"suspected_os": k[0],
"declared_os": k[1],
"count": v,
"is_spoof": k[2],
"is_bot_tool": k[3],
}
for k, v in counts.items()
]
matrix.sort(key=lambda x: x["count"], reverse=True)
return {"matrix": matrix}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))