"""
Multi-metric K-means++ clustering engine (pure Python).

References:
    Arthur & Vassilvitskii (2007) — k-means++: The Advantages of Careful Seeding
    Hotelling (1933) — PCA by power iteration (deflation)

Features (21 dimensions, normalised to [0, 1]):
     0  ttl_n          : normalised initial TTL (estimated hop count)
     1  mss_n          : normalised MSS → network type (Ethernet/PPPoE/VPN)
     2  scale_n        : TCP window-scale factor
     3  win_n          : normalised TCP window
     4  score_n        : ML anomaly score (absolute value)
     5  velocity_n     : request velocity (log1p)
     6  fuzzing_n      : fuzzing index (log1p)
     7  headless_n     : ratio of headless sessions
     8  post_n         : POST/total ratio
     9  ip_id_zero_n   : ratio of IP-ID=0 (Linux/spoofed)
    10  entropy_n      : temporal entropy
    11  browser_n      : modern-browser score (normalised, max 50)
    12  alpn_n         : ALPN/protocol mismatch
    13  alpn_absent_n  : ratio of missing ALPN
    14  h2_n           : H2 multiplexing efficiency (log1p)
    15  hdr_conf_n     : header-order confidence
    16  ua_ch_n        : User-Agent / Client-Hints mismatch
    17  asset_n        : ratio of static assets
    18  direct_n       : ratio of direct accesses
    19  ja4_div_n      : JA4 diversity (log1p)
    20  ua_rot_n       : rotating UA (boolean)
"""
from __future__ import annotations

import math
import random
from dataclasses import dataclass, field


# ─── Normalisation helpers ────────────────────────────────────────────────────
# Every normaliser accepts the raw SQL value (possibly None, an int, a float,
# a Decimal or a numeric string) and returns a float in [0, 1].

def _clamp01(x: float) -> float:
    """Clamp *x* to the closed interval [0, 1]."""
    return max(0.0, min(1.0, x))


def _ratio(v) -> float:
    """Normalise a value that is already a ratio; None/0 maps to 0.0."""
    return _clamp01(float(v or 0))


def _abs_ratio(v) -> float:
    """Normalise a signed score by its magnitude (the feature is defined as abs)."""
    return _clamp01(abs(float(v or 0)))


def _flag(v) -> float:
    """Map any strictly positive value to 1.0, everything else to 0.0."""
    return 1.0 if float(v or 0) > 0 else 0.0


def _scaled(max_value: float):
    """Build a normaliser that divides by *max_value* and clamps to [0, 1]."""
    def norm(v) -> float:
        return _clamp01(float(v or 0) / max_value)
    return norm


def _log_scaled(cap: float):
    """Build a normaliser computing log1p(v) / log1p(cap), clamped to [0, 1]."""
    denom = math.log1p(cap)

    def norm(v) -> float:
        raw = float(v or 0)
        # log1p is undefined for raw <= -1; a negative count carries no signal.
        if raw < 0.0:
            return 0.0
        return _clamp01(math.log1p(raw) / denom)
    return norm


# ─── Feature definitions ──────────────────────────────────────────────────────
# (SQL key, human-readable name, normalisation function)

FEATURES = [
    # TCP stack
    ("ttl", "TTL Initial", _scaled(255.0)),
    ("mss", "MSS Réseau", _scaled(1460.0)),
    ("scale", "Scale TCP", _scaled(14.0)),
    ("win", "Fenêtre TCP", _scaled(65535.0)),
    # ML anomaly
    ("avg_score", "Score Anomalie", _abs_ratio),
    ("avg_velocity", "Vélocité (rps)", _log_scaled(100)),
    ("avg_fuzzing", "Fuzzing", _log_scaled(300)),
    ("pct_headless", "Headless", _ratio),
    ("avg_post", "Ratio POST", _ratio),
    # IP-ID
    ("ip_id_zero", "IP-ID Zéro", _ratio),
    # Temporal
    ("entropy", "Entropie Temporelle", _log_scaled(10)),
    # Browser
    ("browser_score", "Score Navigateur", _scaled(50.0)),
    # TLS / protocol
    ("alpn_mismatch", "ALPN Mismatch", _ratio),
    ("alpn_missing", "ALPN Absent", _ratio),
    ("h2_eff", "H2 Multiplexing", _log_scaled(20)),
    ("hdr_conf", "Ordre Headers", _ratio),
    ("ua_ch_mismatch", "UA-CH Mismatch", _ratio),
    # HTTP behaviour
    ("asset_ratio", "Ratio Assets", _ratio),
    ("direct_ratio", "Accès Direct", _ratio),
    # JA4 diversity
    ("ja4_count", "Diversité JA4", _log_scaled(30)),
    # Rotating UA
    ("ua_rotating", "UA Rotatif", _flag),
]

FEATURE_KEYS = [f[0] for f in FEATURES]
FEATURE_NAMES = [f[1] for f in FEATURES]
FEATURE_NORMS = [f[2] for f in FEATURES]
N_FEATURES = len(FEATURES)

# Position of each feature inside a vector, keyed by its SQL key, so that the
# naming / scoring code does not depend on hand-maintained magic indices.
_FEATURE_INDEX = {key: i for i, key in enumerate(FEATURE_KEYS)}


# ─── Vector utilities (pure Python) ───────────────────────────────────────────

def _dist2(a: list[float], b: list[float]) -> float:
    """Return the squared Euclidean distance between *a* and *b*."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def _mean_vec(vecs: list[list[float]]) -> list[float]:
    """Return the component-wise mean of *vecs*.

    An empty input yields a zero vector of length N_FEATURES; otherwise the
    result has the dimension of the supplied vectors.
    """
    n = len(vecs)
    if n == 0:
        return [0.0] * N_FEATURES
    return [sum(column) / n for column in zip(*vecs)]


# ─── Feature-vector construction ──────────────────────────────────────────────

def build_feature_vector(row: dict) -> list[float]:
    """Normalise a dict of SQL columns into a vector in [0, 1]^N_FEATURES.

    Missing keys and None values are treated as 0.
    """
    return [fn(row.get(key)) for key, fn in zip(FEATURE_KEYS, FEATURE_NORMS)]


# ─── K-means++ ────────────────────────────────────────────────────────────────

@dataclass
class KMeansResult:
    """Outcome of one k-means run."""

    # Final centroids; may hold fewer than k entries when the data contains
    # fewer distinct points than requested clusters.
    centroids: list[list[float]]
    # Index into `centroids` for each input point, in input order.
    labels: list[int]
    # Sum of squared distances from each point to its assigned centroid.
    inertia: float
    # Number of assignment passes performed.
    n_iter: int


def _kmeanspp_init(
    points: list[list[float]], k: int, rng: random.Random
) -> list[list[float]]:
    """Pick up to *k* initial centroids with D²-weighted sampling."""
    centroids = [points[rng.randrange(len(points))][:]]
    for _ in range(k - 1):
        d2 = [min(_dist2(p, c) for c in centroids) for p in points]
        total = sum(d2)
        if total == 0:
            # Every point coincides with an existing centroid: nothing left
            # to seed, so stop with fewer than k centroids.
            break
        threshold = rng.random() * total
        cumul = 0.0
        for p, d in zip(points, d2):
            cumul += d
            if cumul >= threshold:
                centroids.append(p[:])
                break
        else:
            # Floating-point rounding kept cumul below the threshold.
            centroids.append(points[rng.randrange(len(points))][:])
    return centroids


def _lloyd(
    points: list[list[float]], centroids: list[list[float]], max_iter: int
) -> tuple[list[int], int]:
    """Run Lloyd iterations, updating *centroids* in place.

    Returns the final labels and the number of assignment passes.
    """
    labels: list[int] = [0] * len(points)
    n_centroids = len(centroids)
    n_iter = 0
    for iteration in range(max_iter):
        n_iter = iteration + 1
        # Assignment step: nearest centroid for every point.
        new_labels = [
            min(range(n_centroids), key=lambda c: _dist2(p, centroids[c]))
            for p in points
        ]
        # The very first pass never counts as converged, because `labels`
        # starts as an arbitrary all-zero assignment.
        if iteration > 0 and new_labels == labels:
            break
        labels = new_labels

        # Update step: an empty cluster keeps its previous centroid.
        members: list[list[list[float]]] = [[] for _ in range(n_centroids)]
        for p, label in zip(points, labels):
            members[label].append(p)
        for j, group in enumerate(members):
            if group:
                centroids[j] = _mean_vec(group)
    return labels, n_iter


def kmeans_pp(
    points: list[list[float]],
    k: int,
    max_iter: int = 60,
    seed: int = 42,
    n_init: int = 3,
) -> KMeansResult:
    """
    K-means with k-means++ initialisation (Arthur & Vassilvitskii, 2007).

    Runs `n_init` times from one seeded RNG and returns the run with the
    lowest inertia.

    Args:
        points: vectors to cluster (all of the same dimension).
        k: requested number of clusters; must be >= 1.
        max_iter: maximum assignment passes per run (values < 1 act as 1).
        seed: seed of the random generator, for reproducible results.
        n_init: number of independent runs (values < 1 act as 1).

    Returns:
        The best KMeansResult. For empty `points` an empty result is
        returned (no centroids, no labels, inertia 0.0, n_iter 0).

    Raises:
        ValueError: if `k` is smaller than 1.
    """
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    if not points:
        return KMeansResult(centroids=[], labels=[], inertia=0.0, n_iter=0)
    max_iter = max(1, max_iter)
    n_init = max(1, n_init)

    rng = random.Random(seed)
    best: KMeansResult | None = None

    for _ in range(n_init):
        centroids = _kmeanspp_init(points, k, rng)
        labels, n_iter = _lloyd(points, centroids, max_iter)
        inertia = sum(
            _dist2(p, centroids[label]) for p, label in zip(points, labels)
        )
        if best is None or inertia < best.inertia:
            best = KMeansResult(
                centroids=centroids,
                labels=labels,
                inertia=inertia,
                n_iter=n_iter,
            )

    assert best is not None  # n_init >= 1 guarantees at least one run
    return best


# ─── 2D PCA by power iteration ────────────────────────────────────────────────

def _power_iteration(
    rows: list[list[float]], dim: int, n_iter: int = 30
) -> list[float]:
    """Approximate the leading eigenvector of rowsᵀ·rows by power iteration."""
    v = [1.0 / math.sqrt(dim)] * dim
    for _ in range(n_iter):
        # xv = X @ v
        xv = [sum(a * b for a, b in zip(row, v)) for row in rows]
        # xtxv = Xᵀ @ xv
        xtxv = [sum(row[j] * s for row, s in zip(rows, xv)) for j in range(dim)]
        # A zero product (no variance left) would divide by zero; the tiny
        # fallback norm keeps the result finite.
        norm = math.sqrt(sum(x ** 2 for x in xtxv)) or 1e-10
        v = [x / norm for x in xtxv]
    return v


def _rescale01(vals: list[float]) -> list[float]:
    """Min-max rescale *vals* to [0, 1]; a constant list maps to all zeros."""
    lo, hi = min(vals), max(vals)
    span = hi - lo or 1e-10
    return [(v - lo) / span for v in vals]


def pca_2d(points: list[list[float]]) -> list[tuple[float, float]]:
    """
    2D PCA projection by power iteration with deflation (Hotelling).

    Returns one (pc1, pc2) pair per input point, each coordinate min-max
    normalised to [0, 1]. An empty input returns an empty list.
    """
    if not points:
        return []
    dim = len(points[0])

    # Centering
    mean = _mean_vec(points)
    centered = [[x - m for x, m in zip(p, mean)] for p in points]

    # PC1
    v1 = _power_iteration(centered, dim)
    proj1 = [sum(a * b for a, b in zip(row, v1)) for row in centered]

    # Deflation: remove the PC1 component from the centered data
    deflated = [
        [x - t * w for x, w in zip(row, v1)]
        for row, t in zip(centered, proj1)
    ]

    # PC2
    v2 = _power_iteration(deflated, dim)
    proj2 = [sum(a * b for a, b in zip(row, v2)) for row in deflated]

    return list(zip(_rescale01(proj1), _rescale01(proj2)))


# ─── Automatic cluster naming ─────────────────────────────────────────────────

def name_cluster(centroid: list[float], raw_stats: dict | None = None) -> str:
    """
    Build a readable name from a normalised centroid and optional raw stats.

    Rules are evaluated in priority order, most discriminating signals first;
    the first matching rule decides the name.

    Args:
        centroid: normalised centroid laid out like FEATURES.
        raw_stats: optional dict; the keys read are "mean_mss", "mean_ttl"
            and "mean_scale". When "mean_ttl" is missing or falsy the TTL is
            recovered from the centroid (component × 255).
    """
    idx = _FEATURE_INDEX
    score = centroid[idx["avg_score"]]        # ML anomaly
    vel = centroid[idx["avg_velocity"]]       # velocity
    fuzz = centroid[idx["avg_fuzzing"]]       # fuzzing (log1p-normalised)
    hless = centroid[idx["pct_headless"]]     # headless
    post = centroid[idx["avg_post"]]          # POST ratio
    alpn = centroid[idx["alpn_mismatch"]]     # ALPN mismatch
    h2 = centroid[idx["h2_eff"]]              # H2 efficiency
    ua_ch = centroid[idx["ua_ch_mismatch"]]   # UA-CH mismatch
    ja4d = centroid[idx["ja4_count"]]         # JA4 diversity
    ua_rot = centroid[idx["ua_rotating"]]     # rotating UA

    stats = raw_stats or {}
    raw_mss = stats.get("mean_mss", 0)
    raw_ttl = stats.get("mean_ttl", 0) or (centroid[idx["ttl"]] * 255)
    raw_scale = stats.get("mean_scale", 0)

    # ── Strong (deterministic) signals ───────────────────────────────────
    # Masscan-like pattern: MSS in 1440..1460, scale in 3..5, TTL below 60.
    if (
        raw_mss and 1440 <= raw_mss <= 1460
        and raw_scale and 3 <= raw_scale <= 5
        and raw_ttl < 60
    ):
        return "🤖 Masscan / Scanner IP"

    # Aggressive fuzzer.
    # NOTE(review): with log1p(x)/log1p(300) normalisation, 0.35 corresponds
    # to a raw fuzzing index of about 6.4, not the ~100 an earlier comment
    # claimed (100 maps to about 0.81). Confirm which threshold is intended.
    if fuzz > 0.35:
        return "🤖 Bot Fuzzer / Scanner"

    # Rotating UA + UA-CH mismatch: sophisticated bot imitating a browser.
    if ua_rot > 0.5 and ua_ch > 0.7:
        return "🤖 Bot UA Rotatif + CH Mismatch"

    # Strong UA-CH mismatch alone (simulated browser without CH headers).
    if ua_ch > 0.8:
        return "⚠️ Bot UA-CH Incohérent"

    # ── Moderate ML score + behavioural signal ───────────────────────────
    if score > 0.20:
        if hless > 0.3:
            return "⚠️ Navigateur Headless Suspect"
        if vel > 0.25:
            return "⚠️ Bot Haute Vélocité"
        if post > 0.4:
            return "⚠️ Bot POST Automatisé"
        if alpn > 0.5 or h2 > 0.5:
            return "⚠️ TLS/H2 Anormal"
        if ua_ch > 0.4:
            return "⚠️ Anomalie UA-CH"
        return "⚠️ Anomalie ML Modérée"

    # ── Weak signals ─────────────────────────────────────────────────────
    if ua_ch > 0.4:
        return "🔎 UA-CH Incohérent"
    if ja4d > 0.5:
        return "🔄 Client Multi-Fingerprint"

    # ── Network / OS classification ──────────────────────────────────────
    # Low MSS → VPN or tunnel.
    if raw_mss and raw_mss < 1360:
        return "🌐 VPN / Tunnel"
    if raw_ttl < 70:
        return "🐧 Linux / Mobile"
    if raw_ttl > 110:
        return "🪟 Windows"
    return "✅ Trafic Légitime"


def risk_score_from_centroid(centroid: list[float]) -> float:
    """Weighted risk score in [0, 1], calibrated for observed ML scores (~0.3).

    The ML score component is stretched by 1/0.5 so that the useful range
    0–0.5 spans 0–1 before weighting. The weighted sum is clamped to [0, 1].
    """
    idx = _FEATURE_INDEX
    score_n = min(1.0, centroid[idx["avg_score"]] / 0.5)
    weighted = (
        0.25 * score_n
        + 0.20 * centroid[idx["ua_ch_mismatch"]]
        + 0.15 * centroid[idx["avg_fuzzing"]]
        + 0.12 * centroid[idx["ua_rotating"]]
        + 0.10 * centroid[idx["pct_headless"]]
        + 0.07 * centroid[idx["avg_velocity"]]
        + 0.04 * centroid[idx["ip_id_zero"]]
        + 0.04 * centroid[idx["alpn_mismatch"]]
        + 0.03 * centroid[idx["ja4_count"]]
        + 0.03 * centroid[idx["avg_post"]]
    )
    return max(0.0, min(1.0, weighted))