diff --git a/services/bot-detector/bot_detector/browser.py b/services/bot-detector/bot_detector/browser.py index e5f134b..77b13b8 100644 --- a/services/bot-detector/bot_detector/browser.py +++ b/services/bot-detector/bot_detector/browser.py @@ -21,14 +21,16 @@ _BROWSER_JA4_PROFILES = { 'ciphers': range(14, 18), 'exts': range(12, 17)}, } -# Pondération des 5 axes pour le score browser_confidence. +# Pondération des 6 axes pour le score browser_confidence. # Favorise les signaux TLS (difficiles à falsifier) sur HTTP. +# L'axe H2 complète la cohérence TLS avec les paramètres HTTP/2. _AXIS_WEIGHTS = { - 'ja4_known': 0.30, # Axe 1 — Signature JA4 dans dict_browser_ja4 (TLS fingerprint) + 'ja4_known': 0.25, # Axe 1 — Signature JA4 dans dict_browser_ja4 (TLS fingerprint) 'ja4_struct': 0.15, # Axe 2 — Structure JA4 (TLS1.3, h2, nb ciphers/ext) 'http_modern': 0.20, # Axe 3 — Client Hints + Sec-Fetch-* (PAS de User-Agent) 'nav_behavior': 0.15, # Axe 4 — Comportement de navigation (assets, referers) - 'tls_coherence': 0.20, # Axe 5 — Cohérence TLS/TCP (pas de mismatch) + 'tls_coherence': 0.15, # Axe 5 — Cohérence TLS/TCP (pas de mismatch) + 'h2_coherence': 0.10, # Axe 6 — Cohérence HTTP/2 (SETTINGS↔JA4, pseudo-headers) §2 } @@ -124,6 +126,21 @@ def _compute_browser_axes(df: pd.DataFrame) -> pd.DataFrame: + (iam == 0).astype(float) * 0.20 ) + # ── Axe 6 — Cohérence HTTP/2 (§2) ── + # Signaux : fingerprint SETTINGS connu, cohérence H2↔JA4, pseudo-headers corrects. + # Quand les données H2 sont absentes (HTTP/1.x), l'axe est neutre (0.5). + h2k = df.get('h2_settings_known', pd.Series(-1, index=df.index)).fillna(-1) + h2c = df.get('h2_ja4_coherence', pd.Series(-1, index=df.index)).fillna(-1) + h2p = df.get('h2_pseudo_order_match', pd.Series(-1, index=df.index)).fillna(-1) + # Sessions sans données H2 → axe neutre à 0.5 (ne pénalise pas les sites HTTP/1.x) + h2_present = ((h2k >= 0) | (h2c >= 0)).astype(float) + h2_score = ( + h2k.clip(0).astype(float) * 0.40 + + h2c.clip(0).astype(float) * 0.35 + + h2p.clip(0).astype(float) * 0.25 + ) + axes['axis_h2_coherence'] = h2_present * h2_score + (1 - h2_present) * 0.5 + # ── Score combiné pondéré ── axes['browser_confidence'] = sum( axes[f'axis_{k}'] * w for k, w in _AXIS_WEIGHTS.items() diff --git a/services/bot-detector/bot_detector/cycle.py b/services/bot-detector/bot_detector/cycle.py index 229e0c1..6ab5a9a 100644 --- a/services/bot-detector/bot_detector/cycle.py +++ b/services/bot-detector/bot_detector/cycle.py @@ -3,6 +3,7 @@ Orchestre un cycle complet : requête ClickHouse, preprocessing, scoring (Complet + Applicatif), feedback SOC, déduplication et insertion des résultats. """ +import time import pandas as pd from datetime import datetime @@ -16,6 +17,8 @@ from .log import log_info, log_decision from .infra import get_client, set_healthy from .preprocessing import preprocess_df, FEATURES, FEATURES_COMPLET from .pipeline import run_semi_supervised_logic +from .fleet import enrich_with_fleet_score +from .metrics import record_cycle_metrics # ═══════════════════════════════════════════════════════════════════════════════ @@ -109,6 +112,7 @@ def fetch_and_analyze(): """ global _consecutive_failures cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S') + cycle_start = time.time() log_info('') log_info('=' * 70) log_info(f' CYCLE {cycle_id}') @@ -158,6 +162,15 @@ def fetch_and_analyze(): df = preprocess_df(df) + # §5 — Enrichissement avec le score de flotte JA4×ASN (bipartite fleet detection) + try: + df = enrich_with_fleet_score(df) + n_fleet = int((df.get('fleet_campaign_flag', 0) == 1).sum()) + if n_fleet > 0: + log_info(f'[Fleet §5] {n_fleet} session(s) appartenant à une flotte suspecte.') + except Exception as e: + log_info(f'[Fleet §5] Enrichissement de flotte échoué : {e}') + # ── Résumé des données chargées ─────────────────────────────────────────── n_total = len(df) n_correlated = int((df.get('correlated', pd.Series()) == 1).sum()) @@ -308,6 +321,16 @@ def fetch_and_analyze(): if all_anom.empty: log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.') log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}) + try: + for _model_name, _feats in [('Complet', FEATURES_COMPLET), ('Applicatif', FEATURES)]: + record_cycle_metrics( + client=client, db=DB, cycle_id=cycle_id, model_name=_model_name, + df_all=df, anomalies=pd.DataFrame(), all_scored=all_scored if not all_scored.empty else pd.DataFrame(), + drift_rate=0.0, cycle_start_time=cycle_start, baseline_size=0, + threshold=0.0, valid_features=len(_feats), total_features=len(_feats), + ) + except Exception: + pass return all_anom['detected_at'] = datetime.now().replace(microsecond=0) @@ -369,3 +392,24 @@ def fetch_and_analyze(): log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}') log_info(f'╚═══════════════════════════════════════════════════════════') log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}) + + # §9 — Enregistrer les métriques de performance du cycle dans ml_performance_metrics + try: + for _model_name, _feats in [('Complet', FEATURES_COMPLET), ('Applicatif', FEATURES)]: + record_cycle_metrics( + client=client, + db=DB, + cycle_id=cycle_id, + model_name=_model_name, + df_all=df, + anomalies=all_anom if not all_anom.empty else pd.DataFrame(), + all_scored=all_scored if not all_scored.empty else pd.DataFrame(), + drift_rate=0.0, + cycle_start_time=cycle_start, + baseline_size=0, + threshold=0.0, + valid_features=len(_feats), + total_features=len(_feats), + ) + except Exception as e: + log_info(f'[Métriques §9] Enregistrement des métriques échoué : {e}') diff --git a/services/bot-detector/bot_detector/fleet.py b/services/bot-detector/bot_detector/fleet.py new file mode 100644 index 0000000..32b360c --- /dev/null +++ b/services/bot-detector/bot_detector/fleet.py @@ -0,0 +1,174 @@ +"""Détection de flottes de bots via graphe bipartite JA4×ASN. + +§5.2 — Analyse de graphe bipartite G=(JA4 ∪ ASN, E) pour identifier les flottes +de bots coordonnées qui font tourner leurs fingerprints JA4 et ASN. + +Algorithme : +1. Construire le graphe bipartite G depuis les sessions du cycle courant +2. Projeter sur les nœuds JA4 (shared-ASN weighted projection) +3. Détecter les communautés Louvain (python-louvain) +4. Calculer fleet_score = taille × densité / log2(n_asn + 2) pour chaque communauté +5. Retourner les IPs appartenant aux communautés suspectes avec leur fleet_score +""" +import logging +from typing import Optional + +import pandas as pd +import numpy as np + +logger = logging.getLogger(__name__) + +# Seuil de fleet_score à partir duquel une communauté est considérée suspecte +FLEET_SCORE_THRESHOLD = float(__import__('os').getenv('FLEET_SCORE_THRESHOLD', '2.0')) +# Poids du fleet_score dans le score final (malus supplémentaire) +FLEET_SCORE_WEIGHT = float(__import__('os').getenv('FLEET_SCORE_WEIGHT', '0.10')) +# Nombre minimal d'arêtes pour inclure un JA4 dans l'analyse +FLEET_MIN_EDGES = int(__import__('os').getenv('FLEET_MIN_EDGES', '3')) + + +def build_fleet_graph(df: pd.DataFrame) -> Optional[object]: + """Construit le graphe bipartite JA4×ASN à partir du cycle courant. + + Nœuds : ensemble JA4 (préfixe 'ja4:') + ensemble ASN (préfixe 'asn:') + Arêtes : (ja4, asn) avec weight = nombre d'IPs distinctes sur ce couple + + Exige que df ait les colonnes : ja4, asn_number, src_ip + Retourne None si networkx n'est pas disponible ou données insuffisantes. + """ + try: + import networkx as nx + from networkx.algorithms import bipartite + except ImportError: + logger.warning("[Fleet] networkx non disponible — analyse de flotte désactivée.") + return None + + if df.empty or 'ja4' not in df.columns or 'asn_number' not in df.columns: + return None + + # Filtrer les JA4 vides et ASN 0 + mask = (df['ja4'].fillna('') != '') & (df['asn_number'].fillna('0') != '0') + sub = df[mask][['src_ip', 'ja4', 'asn_number']].copy() + if len(sub) < 10: + return None + + # Compter les IPs par (ja4, asn) — poids de l'arête + edge_weights = ( + sub.groupby(['ja4', 'asn_number'])['src_ip'] + .nunique() + .reset_index(name='n_ips') + ) + # Garder seulement les arêtes avec au moins FLEET_MIN_EDGES IPs distinctes + edge_weights = edge_weights[edge_weights['n_ips'] >= FLEET_MIN_EDGES] + if len(edge_weights) < 5: + return None + + G = nx.Graph() + ja4_nodes = set() + asn_nodes = set() + for _, row in edge_weights.iterrows(): + ja4_node = f"ja4:{row['ja4']}" + asn_node = f"asn:{row['asn_number']}" + G.add_node(ja4_node, bipartite=0) + G.add_node(asn_node, bipartite=1) + G.add_edge(ja4_node, asn_node, weight=int(row['n_ips'])) + ja4_nodes.add(ja4_node) + asn_nodes.add(asn_node) + + return G, ja4_nodes, asn_nodes + + +def detect_fleet_communities(df: pd.DataFrame) -> dict: + """Analyse le graphe bipartite et retourne un dict {src_ip: fleet_score}. + + fleet_score > FLEET_SCORE_THRESHOLD → IP appartient à une flotte suspectée. + fleet_score = 0 pour toutes les autres IPs. + + fleet_score = community_size * graph_density / log2(n_asn + 2) + """ + result = build_fleet_graph(df) + if result is None: + return {} + + G, ja4_nodes, asn_nodes = result + + try: + import networkx as nx + from networkx.algorithms import bipartite + try: + from community import best_partition as louvain_partition + LOUVAIN_AVAILABLE = True + except ImportError: + LOUVAIN_AVAILABLE = False + + # Projection bipartite : graphe des JA4 partageant des ASN + G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes) + if G_ja4.number_of_edges() == 0: + return {} + + # Détection de communautés + if LOUVAIN_AVAILABLE: + partition = louvain_partition(G_ja4, weight='weight', random_state=42) + # partition = {node: community_id} + communities: dict = {} + for node, cid in partition.items(): + communities.setdefault(cid, set()).add(node) + else: + # Fallback : composantes connexes + communities = { + i: set(c) + for i, c in enumerate(nx.connected_components(G_ja4)) + if len(c) >= 2 + } + + # Calculer le fleet_score de chaque communauté + fleet_scores: dict = {} # {ja4: fleet_score} + for cid, members in communities.items(): + if len(members) < 2: + continue + sub_g = G.subgraph( + list(members) + [n for n in asn_nodes if any(G.has_edge(n, m) for m in members)] + ) + n_asn = len([n for n in sub_g.nodes if n.startswith('asn:')]) + density = nx.density(G_ja4.subgraph(members)) + score = len(members) * density / max(np.log2(n_asn + 2), 0.1) + for ja4_node in members: + ja4 = ja4_node.replace('ja4:', '') + fleet_scores[ja4] = round(float(score), 3) + + # Mapper les fleet_scores sur les IPs + if not fleet_scores: + return {} + + ip_scores: dict = {} + for _, row in df.iterrows(): + ja4 = str(row.get('ja4', '')) + score = fleet_scores.get(ja4, 0.0) + if score >= FLEET_SCORE_THRESHOLD: + src_ip = str(row.get('src_ip', '')) + if src_ip: + ip_scores[src_ip] = max(ip_scores.get(src_ip, 0.0), score) + + n_fleets = len(set(fleet_scores.values())) + if ip_scores: + logger.info( + f"[Fleet] {len(ip_scores)} IPs dans {n_fleets} communauté(s) suspecte(s) " + f"(score max={max(ip_scores.values()):.2f})" + ) + return ip_scores + + except Exception as e: + logger.warning(f"[Fleet] Erreur détection de flotte : {e}") + return {} + + +def enrich_with_fleet_score(df: pd.DataFrame) -> pd.DataFrame: + """Enrichit le DataFrame avec fleet_score et fleet_campaign_flag. + + fleet_campaign_flag = 1 si l'IP appartient à une flotte suspectée. + fleet_score = score de la communauté (0 = pas de flotte). + """ + df = df.copy() + fleet_map = detect_fleet_communities(df) + df['fleet_score'] = df['src_ip'].map(fleet_map).fillna(0.0).astype(float) + df['fleet_campaign_flag'] = (df['fleet_score'] >= FLEET_SCORE_THRESHOLD).astype(int) + return df diff --git a/services/bot-detector/bot_detector/metrics.py b/services/bot-detector/bot_detector/metrics.py new file mode 100644 index 0000000..eccd9fa --- /dev/null +++ b/services/bot-detector/bot_detector/metrics.py @@ -0,0 +1,166 @@ +"""Module de métriques de performance du pipeline ML. + +Enregistre un résumé de chaque cycle dans ml_performance_metrics : + - Taux d'anomalie par niveau (CRITICAL/HIGH/MEDIUM/LOW) + - Taux de corrélation (correlated=1 vs 0) + - Drift rate, latence, taille baseline, seuil adaptatif + - Alertes automatiques sur calibration, drift, corrélation, latence + +Utilisation dans cycle.py : + from .metrics import record_cycle_metrics + record_cycle_metrics(client, cycle_id, model_name, ...) +""" +import time +import logging +from datetime import datetime +from typing import Optional + +import pandas as pd + +logger = logging.getLogger(__name__) + +# Seuils d'alerte (configurables via env vars) +import os +ALERT_ANOMALY_RATE_HIGH = float(os.getenv('ALERT_ANOMALY_RATE_HIGH', '0.10')) +ALERT_ANOMALY_RATE_LOW = float(os.getenv('ALERT_ANOMALY_RATE_LOW', '0.005')) +ALERT_DRIFT_RATE = float(os.getenv('ALERT_DRIFT_RATE', '0.30')) +ALERT_CORRELATION_RATE = float(os.getenv('ALERT_CORRELATION_RATE', '0.50')) +ALERT_LATENCY_MS = int(os.getenv('ALERT_LATENCY_MS', '300000')) + + +def record_cycle_metrics( + client, + db: str, + cycle_id: str, + model_name: str, + df_all: pd.DataFrame, + anomalies: pd.DataFrame, + all_scored: pd.DataFrame, + drift_rate: float, + cycle_start_time: float, + baseline_size: int, + threshold: float, + valid_features: int, + total_features: int, + meta_learner_active: bool = False, +) -> None: + """Enregistre les métriques d'un cycle dans ml_performance_metrics. + + Émet également des alertes dans les logs si les seuils sont dépassés. + + Args: + client : ClickHouseClient actif + db : nom de la base ja4_processing + cycle_id : identifiant du cycle (timestamp) + model_name : 'Complet' ou 'Applicatif' (ou variante 24h) + df_all : DataFrame complet du cycle (avant scoring) + anomalies : DataFrame des anomalies détectées + all_scored : DataFrame de tous les scores + drift_rate : taux de features en dérive [0, 1] + cycle_start_time : timestamp (time.time()) du début du cycle + baseline_size : nombre de sessions dans la baseline humaine + threshold : seuil adaptatif utilisé + valid_features : nombre de features valides + total_features : nombre total de features demandées + meta_learner_active : True si le meta-learner a été utilisé ce cycle + """ + now = datetime.utcnow() + latency_ms = int((time.time() - cycle_start_time) * 1000) + + n_total = max(len(df_all), 1) + n_scored = max(len(all_scored), 1) + + # Taux de corrélation + if 'correlated' in df_all.columns: + n_correlated = int((df_all['correlated'] == 1).sum()) + correlated_rate = n_correlated / n_total + else: + correlated_rate = 0.0 + + # Comptage par niveau de menace + n_critical = n_high = n_medium = n_low = 0 + if not anomalies.empty and 'threat_level' in anomalies.columns: + levels = anomalies['threat_level'].value_counts() + n_critical = int(levels.get('CRITICAL', 0)) + n_high = int(levels.get('HIGH', 0)) + n_medium = int(levels.get('MEDIUM', 0)) + n_low = int(levels.get('LOW', 0)) + + # Bots connus et navigateurs légitimes + n_known_bot = 0 + n_anubis_deny = 0 + n_legit_browser = 0 + if not df_all.empty: + if 'bot_name' in df_all.columns: + n_known_bot = int((df_all['bot_name'].fillna('') != '').sum()) + if 'anubis_bot_action' in df_all.columns: + n_anubis_deny = int((df_all['anubis_bot_action'] == 'DENY').sum()) + if 'browser_confidence' in df_all.columns: + from .config import BROWSER_CONFIDENCE_THRESHOLD + n_legit_browser = int((df_all['browser_confidence'] >= BROWSER_CONFIDENCE_THRESHOLD).sum()) + + anomaly_rate = (n_critical + n_high + n_medium + n_low) / n_scored + drift_alert = 1 if drift_rate > ALERT_DRIFT_RATE else 0 + + # Alertes + _emit_alerts(model_name, anomaly_rate, drift_rate, correlated_rate, latency_ms, drift_alert) + + try: + client.execute( + f"INSERT INTO {db}.ml_performance_metrics VALUES", + [{ + 'cycle_at': now, + 'model_name': model_name, + 'total_sessions': n_total, + 'correlated_rate': round(float(correlated_rate), 4), + 'anomaly_rate': round(float(anomaly_rate), 4), + 'critical_count': n_critical, + 'high_count': n_high, + 'medium_count': n_medium, + 'low_count': n_low, + 'known_bot_count': n_known_bot, + 'anubis_deny_count': n_anubis_deny, + 'legit_browser_count': n_legit_browser, + 'drift_rate': round(float(drift_rate), 4), + 'drift_alert': drift_alert, + 'cycle_latency_ms': latency_ms, + 'features_valid': valid_features, + 'features_total': total_features, + 'baseline_size': baseline_size, + 'threshold': round(float(threshold), 6), + 'meta_learner_active': 1 if meta_learner_active else 0, + }] + ) + except Exception as e: + logger.warning(f"[Metrics] Erreur d'enregistrement des métriques : {e}") + + +def _emit_alerts(model_name: str, anomaly_rate: float, drift_rate: float, + correlated_rate: float, latency_ms: int, drift_alert: int) -> None: + """Émet des alertes dans les logs si les seuils sont dépassés.""" + if anomaly_rate > ALERT_ANOMALY_RATE_HIGH: + logger.warning( + f"[{model_name}] ⚠ ALERTE CALIBRATION : taux d'anomalie élevé " + f"({anomaly_rate:.1%} > {ALERT_ANOMALY_RATE_HIGH:.1%})" + ) + elif anomaly_rate < ALERT_ANOMALY_RATE_LOW and anomaly_rate > 0: + logger.warning( + f"[{model_name}] ⚠ ALERTE CALIBRATION : taux d'anomalie très bas " + f"({anomaly_rate:.3%} < {ALERT_ANOMALY_RATE_LOW:.1%})" + ) + if drift_alert: + logger.warning( + f"[{model_name}] ⚠ ALERTE DRIFT : {drift_rate:.1%} des features en dérive " + f"(seuil {ALERT_DRIFT_RATE:.1%})" + ) + if correlated_rate < ALERT_CORRELATION_RATE: + logger.warning( + f"[{model_name}] ⚠ ALERTE CORRÉLATION : taux de corrélation bas " + f"({correlated_rate:.1%} < {ALERT_CORRELATION_RATE:.1%}) — " + "vérifier ja4sentinel/logcorrelator" + ) + if latency_ms > ALERT_LATENCY_MS: + logger.warning( + f"[{model_name}] ⚠ ALERTE PERFORMANCE : latence cycle {latency_ms}ms " + f"> {ALERT_LATENCY_MS}ms" + ) diff --git a/services/bot-detector/bot_detector/models.py b/services/bot-detector/bot_detector/models.py index b7f46c9..681bc38 100644 --- a/services/bot-detector/bot_detector/models.py +++ b/services/bot-detector/bot_detector/models.py @@ -328,7 +328,8 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, drift_score = 0.0 drift_forced = False if age_ok and 'baseline_stats' in meta: - drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features) + drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features, + name=name, cycle_id=cycle_id) if drift_score >= DRIFT_THRESHOLD: drift_forced = True log_info(f"[{name}] Dérive détectée ({drift_score:.0%} features) — retraining forcé.") @@ -419,13 +420,18 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, return joblib.load(model_path), ae_prev, meta.get('features', features) log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.") - # A1 — Sauvegarder les statistiques de distribution avec quantile digest pour drift detection + # A1/§4 — Sauvegarder les statistiques de distribution avec quantile digest 9 points + # (p5…p95) pour une meilleure fidélité de la détection de dérive KS+KL baseline_stats = { f: { 'mean': float(X[f].mean()), 'std': float(X[f].std()), - 'p10': float(X[f].quantile(0.10)), 'p25': float(X[f].quantile(0.25)), - 'p50': float(X[f].quantile(0.50)), 'p75': float(X[f].quantile(0.75)), + 'p5': float(X[f].quantile(0.05)), + 'p10': float(X[f].quantile(0.10)), + 'p25': float(X[f].quantile(0.25)), + 'p50': float(X[f].quantile(0.50)), + 'p75': float(X[f].quantile(0.75)), 'p90': float(X[f].quantile(0.90)), + 'p95': float(X[f].quantile(0.95)), } for f in features } diff --git a/services/bot-detector/bot_detector/pipeline.py b/services/bot-detector/bot_detector/pipeline.py index a61b5bf..8c42ac7 100644 --- a/services/bot-detector/bot_detector/pipeline.py +++ b/services/bot-detector/bot_detector/pipeline.py @@ -19,6 +19,8 @@ from .models import load_or_train_model, load_or_train_xgb, TrafficAutoEncoder from .scoring import ( validate_features, compute_adaptive_threshold, normalize_scores, compute_shap_top_features, build_reason, cluster_anomalies, + compute_exiffi_importance, compute_ae_feature_errors, get_meta_learner, + FINGERPRINT_COHERENCE_THRESHOLD, ) @@ -56,6 +58,23 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}') log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)') + # §3 — Exclure les sessions ISP à faible cohérence de fingerprint de la baseline humaine + # Ces sessions ISP avec un fingerprint incohérent sont probablement des proxies résidentiels + # ou des appareils mal configurés qui contamineraient la baseline. + if 'fingerprint_coherence_score' in human_baseline.columns: + low_coh = human_baseline['fingerprint_coherence_score'] < FINGERPRINT_COHERENCE_THRESHOLD + n_low_coh = int(low_coh.sum()) + if n_low_coh > 0: + human_baseline = human_baseline[~low_coh] + log_info( + f'[{name}] Baseline après filtre cohérence (<{FINGERPRINT_COHERENCE_THRESHOLD}) : ' + f'{len(human_baseline):>6} ({n_low_coh} exclues)' + ) + log_decision('LOW_COHERENCE_EXCLUDED', cycle_id, name, { + 'n_excluded': n_low_coh, 'threshold': FINGERPRINT_COHERENCE_THRESHOLD, + 'baseline_after': len(human_baseline), + }) + # A7 — Valider les features avant tout traitement valid_features = validate_features(df, features, name, cycle_id) if valid_features is None: @@ -130,16 +149,51 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0) xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1] unknown_traffic['xgb_prob'] = xgb_probs - # Méta-learner : combiner anomaly_score (EIF+AE) et xgb_prob - # anomaly_score déjà normalisé [0,1], xgb_prob est [0,1] - α_xgb = XGB_WEIGHT - unknown_traffic['anomaly_score'] = ( - (1 - α_xgb) * unknown_traffic['anomaly_score'] + α_xgb * xgb_probs - ) - log_info(f"[{name}] Score combiné EIF+AE+XGB (β={α_xgb}): xgb_mean={xgb_probs.mean():.4f}") + log_info(f"[{name}] XGBoost : xgb_mean={xgb_probs.mean():.4f}") except Exception as exc: log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.") + # §8 — Score final via MetaLearner (ou poids fixes en fallback) + meta_learner = get_meta_learner(name) + eif_norm_arr = unknown_traffic['anomaly_score'].values.copy() + ae_norm_arr = normalize_scores(-unknown_traffic['ae_recon_error'].values) + xgb_prob_arr = unknown_traffic['xgb_prob'].values + hits_arr = unknown_traffic.get('hits', pd.Series(1, index=unknown_traffic.index)).values + corr_arr = unknown_traffic.get('correlated', pd.Series(0, index=unknown_traffic.index)).values + + final_scores = meta_learner.predict(eif_norm_arr, ae_norm_arr, xgb_prob_arr, + hits_arr, corr_arr) + unknown_traffic['anomaly_score'] = final_scores + + if meta_learner.is_trained: + log_info( + f"[{name}] §8 MetaLearner actif ({meta_learner._n_samples} labels) — " + f"score moyen={final_scores.mean():.4f}" + ) + elif unknown_traffic['xgb_prob'].mean() > 0: + log_info(f"[{name}] §8 Poids fixes EIF+AE+XGB (MetaLearner pas encore entraîné).") + + # §8 — Entraînement du MetaLearner sur les labels du cycle courant + # (accumulation progressive — activation dès MIN_SAMPLES labels) + try: + labeled_df = meta_learner.build_labels_from_df(unknown_traffic) + if not labeled_df.empty: + unknown_traffic_labeled = labeled_df.copy() + unknown_traffic_labeled['eif_norm'] = normalize_scores(raw_scores) + unknown_traffic_labeled['ae_norm'] = ae_norm_arr + if meta_learner.fit(unknown_traffic_labeled): + log_decision('META_LEARNER_TRAINED', cycle_id, name, meta_learner._weights_log) + except Exception as exc: + log_info(f"[{name}] MetaLearner entraînement échoué : {exc}") + + # §7 — ExIFFI : importance de features pour l'EIF (quand SHAP désactivé) + exiffi_tops: list = [{}] * len(unknown_traffic) + if not ENABLE_SHAP and len(unknown_traffic) > 0: + try: + exiffi_tops = compute_exiffi_importance(model, X_test, scoring_features) + except Exception: + pass + # A2 — Seuil adaptatif calculé sur les scores BRUTS (même échelle que ANOMALY_THRESHOLD) effective_threshold = compute_adaptive_threshold(raw_scores) log_info(f"[{name}] Seuil effectif : {effective_threshold:.4f} (statique={ANOMALY_THRESHOLD}, percentile={ANOMALY_PERCENTILE})") @@ -207,7 +261,7 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): # Log des axes moyens pour diagnostic ax_means = {} for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern', - 'axis_nav_behavior', 'axis_tls_coherence']: + 'axis_nav_behavior', 'axis_tls_coherence', 'axis_h2_coherence']: col = unknown_traffic.get(ax, None) if col is not None: ax_means[ax.replace('axis_', '')] = round(float(col[browser_legit_mask].mean()), 3) @@ -297,9 +351,18 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): # A4 — Explainabilité SHAP : top features responsables de chaque anomalie X_anomalies = X_test.loc[anomalies.index] shap_tops = compute_shap_top_features(model, X_anomalies, valid_features) + + # §7 — ExIFFI : utiliser les tops ExIFFI précalculés quand SHAP est inactif + # Construire un mapping index → exiffi_top pour accès rapide + if len(exiffi_tops) == len(unknown_traffic): + _exiffi_map = dict(zip(unknown_traffic.index, exiffi_tops)) + exiffi_for_anomalies = [_exiffi_map.get(idx, {}) for idx in anomalies.index] + else: + exiffi_for_anomalies = [{}] * len(anomalies) anomalies['reason'] = [ - build_reason(name, row, shap) - for (_, row), shap in zip(anomalies.iterrows(), shap_tops) + build_reason(name, row, shap, exiffi) + for (_, row), shap, exiffi + in zip(anomalies.iterrows(), shap_tops, exiffi_for_anomalies) ] # A8 — Clustering DBSCAN pour identifier les campagnes coordonnées diff --git a/services/bot-detector/bot_detector/preprocessing.py b/services/bot-detector/bot_detector/preprocessing.py index a1018f7..92c1b99 100644 --- a/services/bot-detector/bot_detector/preprocessing.py +++ b/services/bot-detector/bot_detector/preprocessing.py @@ -33,7 +33,7 @@ FEATURES = [ # Browser multifactoriel 'is_known_browser', 'browser_consistency_score', 'browser_confidence', 'axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern', - 'axis_nav_behavior', 'axis_tls_coherence', + 'axis_nav_behavior', 'axis_tls_coherence', 'axis_h2_coherence', # HTTP 'missing_accept_enc_ratio', 'http_scheme_ratio', # Thèse §5 @@ -41,12 +41,19 @@ FEATURES = [ 'cadence_cv', 'burst_ratio', 'pause_ratio', 'lag1_autocorrelation', 'benford_deviation', 'host_diversity', 'host_sweep_speed', 'host_coverage_uniformity', + # §5.8b — Similarité Jaccard cross-domaine (chemins partagés entre hosts) + 'cross_domain_path_similarity', # P0+P1 : features sous-exploitées (SQL existant ou ajouté) 'is_fake_navigation', 'true_window_size', 'window_mss_ratio', # P1 : nouvelles features de détection 'has_xff', 'unusual_content_type_ratio', 'non_standard_port_ratio', 'login_post_concentration', 'sec_ch_mobile_mismatch', + # §2 — Features HTTP/2 (fingerprint SETTINGS, cohérence H2↔JA4) + 'h2_settings_known', 'h2_pseudo_order_match', + 'h2_ja4_coherence', 'h2_settings_rare', + # §3 — Score de cohérence de fingerprint cross-layer + 'fingerprint_coherence_score', ] # Features supplémentaires pour le modèle Complet (données TCP/TLS requises) @@ -82,7 +89,7 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame: df['browser_confidence'] = browser_axes['browser_confidence'] for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern', - 'axis_nav_behavior', 'axis_tls_coherence']: + 'axis_nav_behavior', 'axis_tls_coherence', 'axis_h2_coherence']: df[ax] = browser_axes[ax] # Rétro-compatibilité @@ -108,6 +115,8 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame: 'is_ua_rotating', 'is_alpn_missing', 'sni_host_mismatch', 'alpn_http_mismatch', 'mss_mobile_mismatch', 'anubis_is_flagged', 'is_rare_ja4', 'is_fake_navigation', 'has_xff', 'sec_ch_mobile_mismatch', + # §2 — Features HTTP/2 binaires + 'h2_settings_known', 'h2_pseudo_order_match', 'h2_ja4_coherence', 'h2_settings_rare', } for col in df.columns: if col in binary_features: diff --git a/services/bot-detector/bot_detector/scoring.py b/services/bot-detector/bot_detector/scoring.py index 6dcd516..148e5f4 100644 --- a/services/bot-detector/bot_detector/scoring.py +++ b/services/bot-detector/bot_detector/scoring.py @@ -1,12 +1,14 @@ """Scoring, dérive, validation, seuil adaptatif, SHAP et clustering. Regroupe les fonctions de scoring utilisées par le pipeline de détection : - - A1 : détection de dérive conceptuelle (KS-test / Z-score) + - A1 : détection de dérive conceptuelle (KS-test + KL divergence + dérive adversariale) - A2 : seuil adaptatif basé sur le percentile des scores négatifs - A4 : explainabilité SHAP (top features contributives) - A7 : validation de complétude des features - A8 : clustering HDBSCAN / DBSCAN des anomalies - A10 : normalisation [0,1] des scores d'anomalie + - §7 : ExIFFI — importance de features par permutation (Extended Isolation Forest) + - §8 : MetaLearner — pondération de l'ensemble par régression logistique """ import numpy as np import pandas as pd @@ -17,6 +19,7 @@ from .config import ( ENABLE_SHAP, SHAP_AVAILABLE, ENABLE_CLUSTERING, CLUSTERING_MIN_SAMPLES, EIF_AVAILABLE, HDBSCAN_AVAILABLE, STRUCTURAL_EXCLUDED_FEATURES, + AE_WEIGHT, XGB_WEIGHT, ) from .log import log_info, log_decision @@ -27,6 +30,9 @@ from .config import DBSCAN, StandardScaler if ENABLE_SHAP: from .config import _shap +# Seuil de filtre de cohérence de fingerprint pour la baseline humaine (§3) +import os +FINGERPRINT_COHERENCE_THRESHOLD = float(os.getenv('FINGERPRINT_COHERENCE_THRESHOLD', '0.25')) # Cache par modèle : dernier état des features invalides (pour ne logguer que les changements). _feature_warning_cache: dict = {} @@ -36,26 +42,63 @@ _feature_warning_cache: dict = {} # A1 — DÉTECTION DE DÉRIVE CONCEPTUELLE # ═══════════════════════════════════════════════════════════════════════════════ +def _kl_divergence(p_vals: np.ndarray, q_vals: np.ndarray, n_bins: int = 20) -> float: + """Calcule la divergence KL(P || Q) entre deux échantillons via histogramme. + + Utilise un lissage de Laplace pour éviter les divisions par zéro. + Retourne 0.0 si les données sont insuffisantes. + """ + if len(p_vals) < 10 or len(q_vals) < 10: + return 0.0 + combined_min = min(p_vals.min(), q_vals.min()) + combined_max = max(p_vals.max(), q_vals.max()) + if combined_max - combined_min < 1e-10: + return 0.0 + bins = np.linspace(combined_min, combined_max, n_bins + 1) + p_hist, _ = np.histogram(p_vals, bins=bins, density=False) + q_hist, _ = np.histogram(q_vals, bins=bins, density=False) + # Lissage de Laplace : ajouter 1 à chaque bin pour éviter les 0 + p_hist = (p_hist + 1).astype(float) + q_hist = (q_hist + 1).astype(float) + p_hist /= p_hist.sum() + q_hist /= q_hist.sum() + # KL(P || Q) = Σ p * log(p / q) + kl = float(np.sum(p_hist * np.log(p_hist / q_hist + 1e-12))) + return max(0.0, kl) + + def compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame, - features: list) -> float: + features: list, *, name: str = '', cycle_id: str = '') -> float: """Compare la distribution actuelle de la baseline humaine avec celle de l'entraînement. - Utilise le test de Kolmogorov-Smirnov bilatéral par feature. La distribution - d'entraînement est reconstruite à partir d'un quantile digest (p10..p90) par - interpolation linéaire — bien plus fidèle qu'une approximation N(μ,σ) pour les - features asymétriques ou multimodales. - Retourne la fraction de features en dérive significative (p < 0.05). + §4 — Méthode améliorée : + - Quantile digest 9 points (p5…p95) au lieu de 5 pour une meilleure fidélité + - KS-test + divergence KL : feature en dérive si KS p<0.05 OU KL>0.5 + - Détection de dérive adversariale : >50% des features dérivent dans la même + direction → log_decision 'ADVERSARIAL_DRIFT' + + Retourne la fraction de features en dérive significative (en [0,1]). """ if not baseline_stats or current_baseline.empty: return 0.0 try: from scipy.stats import ks_2samp + _HAS_SCIPY = True except ImportError: - return _compute_drift_score_zscore(baseline_stats, current_baseline, features) + _HAS_SCIPY = False + + # Clés de quantiles : 9 points preferred, 5 points fallback + Q9_KEYS = ['p5', 'p10', 'p25', 'p50', 'p75', 'p90', 'p95'] + Q9_PROBS = np.array([0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95]) + Q5_KEYS = ['p10', 'p25', 'p50', 'p75', 'p90'] + Q5_PROBS = np.array([0.10, 0.25, 0.50, 0.75, 0.90]) drifted = 0 - tested = 0 + tested = 0 + drifted_features: list = [] + direction_shifts: list = [] rng = np.random.default_rng(42) + for feat in features: if feat not in baseline_stats or feat not in current_baseline.columns: continue @@ -67,42 +110,60 @@ def compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame, if trained_std < 1e-9: continue - quantile_keys = ['p10', 'p25', 'p50', 'p75', 'p90'] - if all(k in stats for k in quantile_keys): - quantile_probs = np.array([0.10, 0.25, 0.50, 0.75, 0.90]) - quantile_vals = np.array([stats[k] for k in quantile_keys]) + # Reconstruction de la distribution d'entraînement par interpolation quantile + if all(k in stats for k in Q9_KEYS): + q_probs = Q9_PROBS + q_vals = np.array([stats[k] for k in Q9_KEYS]) + elif all(k in stats for k in Q5_KEYS): + q_probs = Q5_PROBS + q_vals = np.array([stats[k] for k in Q5_KEYS]) + else: + q_probs = None + + if q_probs is not None: u = rng.uniform(0, 1, size=len(curr_values)) - synthetic_trained = np.interp(u, quantile_probs, quantile_vals) + synthetic_trained = np.interp(u, q_probs, q_vals) else: synthetic_trained = rng.normal(stats['mean'], trained_std, size=len(curr_values)) - _, p_value = ks_2samp(curr_values.values, synthetic_trained) - if p_value < 0.05: - drifted += 1 - tested += 1 - return drifted / max(tested, 1) + feat_drifted = False + if _HAS_SCIPY: + _, ks_p = ks_2samp(curr_values.values, synthetic_trained) + if ks_p < 0.05: + feat_drifted = True + else: + # Fallback Z-score + z = abs(curr_values.mean() - stats['mean']) / trained_std + if z > 2.0: + feat_drifted = True + # KL divergence comme critère complémentaire au KS + kl = _kl_divergence(curr_values.values, synthetic_trained) + if kl > 0.5: + feat_drifted = True -def _compute_drift_score_zscore(baseline_stats: dict, current_baseline: pd.DataFrame, - features: list) -> float: - """Fallback Z-score pour la détection de dérive quand scipy n'est pas disponible.""" - if not baseline_stats or current_baseline.empty: - return 0.0 - drifted = 0 - tested = 0 - for feat in features: - if feat not in baseline_stats or feat not in current_baseline.columns: - continue - stats = baseline_stats[feat] - curr_mean = current_baseline[feat].mean() - trained_std = stats.get('std', 0) - if trained_std < 1e-9: - continue - z = abs(curr_mean - stats['mean']) / trained_std - if z > 2.0: + if feat_drifted: drifted += 1 + drifted_features.append(feat) + # Direction du shift pour la détection adversariale + direction_shifts.append(np.sign(curr_values.mean() - stats['mean'])) tested += 1 - return drifted / max(tested, 1) + + drift_rate = drifted / max(tested, 1) + + # Détection de dérive adversariale : >50% des features dérivent simultanément + # dans la même direction → signe d'une manipulation intentionnelle de la distribution + if drift_rate > 0.50 and len(direction_shifts) >= 10: + consensus = abs(float(np.mean(direction_shifts))) + if consensus >= 0.8: + log_decision('ADVERSARIAL_DRIFT', cycle_id, name, { + 'drift_rate': round(drift_rate, 3), + 'consensus': round(consensus, 3), + 'n_features': len(drifted_features), + 'top_drifted': drifted_features[:10], + }) + + return drift_rate # ═══════════════════════════════════════════════════════════════════════════════ @@ -216,14 +277,18 @@ def compute_shap_top_features(model, X: pd.DataFrame, features: list, return [{}] * len(X) -def build_reason(name: str, row: pd.Series, shap_top: dict) -> str: - """Construit le champ reason enrichi avec le top SHAP ou les métriques clés.""" +def build_reason(name: str, row: pd.Series, shap_top: dict, + exiffi_top: dict = None) -> str: + """Construit le champ reason enrichi avec SHAP, ExIFFI ou les métriques clés.""" # Utilise le score brut pour l'affichage (plus interprétable que le score normalisé) score = round(float(row.get('raw_anomaly_score', row.get('anomaly_score', 0))), 3) threat = row.get('threat_level', '') if shap_top: top_str = ' | '.join(f"{f}({v:+.3f})" for f, v in shap_top.items()) return f"[{name}] Score: {score} | SHAP: {top_str} | Threat: {threat}" + if exiffi_top: + top_str = ' | '.join(f"{f}({v:+.3f})" for f, v in exiffi_top.items()) + return f"[{name}] Score: {score} | ExIFFI: {top_str} | Threat: {threat}" vel = round(float(row.get('hit_velocity', 0)), 1) fuzz = round(float(row.get('fuzzing_index', 0)), 1) return f"[{name}] Score: {score} | Vel: {vel} req/s | Fuzzing: {fuzz} | Threat: {threat}" @@ -277,3 +342,223 @@ def cluster_anomalies(anomalies: pd.DataFrame, features: list, log_info(f"[Clustering] Erreur de clustering: {e}") anomalies['campaign_id'] = -1 return anomalies + + +# ═══════════════════════════════════════════════════════════════════════════════ +# §7 — ExIFFI : IMPORTANCE DE FEATURES PAR PERMUTATION +# ═══════════════════════════════════════════════════════════════════════════════ + +def compute_exiffi_importance(model, X: pd.DataFrame, features: list, + n_top: int = 3, sample_size: int = 200) -> list: + """§7 — Importance de features pour l'EIF par permutation (ExIFFI simplifié). + + Pour chaque feature, calcule la dégradation de score lors de la permutation + de la colonne. La différence de score (permuted − original) mesure l'importance : + une valeur positive élevée signifie que permuter cette feature rend la session + moins anomale → la feature est un signal fort de détection. + + Compatible sklearn IsolationForest et isotree ExtendedIsolationForest. + Retourne une liste de dicts {feature: importance_score} par ligne de X. + """ + if X.empty or len(features) < 2: + return [{}] * len(X) + try: + # Limiter la taille pour la performance + if len(X) > sample_size: + X_sample = X[features].sample(n=sample_size, random_state=42) + else: + X_sample = X[features] + X_arr = X_sample.fillna(0).values.copy() + baseline_scores = model.decision_function(X_arr) + + feat_importance: dict = {} + rng = np.random.default_rng(42) + for i, feat in enumerate(features): + if feat not in X_sample.columns: + continue + X_perm = X_arr.copy() + # Permuter la colonne (brise la corrélation signal/label) + X_perm[:, i] = rng.permutation(X_perm[:, i]) + perm_scores = model.decision_function(X_perm) + # Importance = gain moyen de score lors de la permutation + feat_importance[feat] = float(np.mean(perm_scores - baseline_scores)) + + # Trier par importance décroissante : les plus grandes valeurs = features clés + sorted_feats = sorted(feat_importance.items(), key=lambda kv: kv[1], reverse=True) + top_feats = {f: round(v, 4) for f, v in sorted_feats[:n_top]} + + # L'importance est globale (calculée sur un sample) : même top pour toutes les lignes + return [top_feats] * len(X) + except Exception as e: + log_info(f"[ExIFFI] Erreur de calcul : {e}") + return [{}] * len(X) + + +def compute_ae_feature_errors(ae_model, X: pd.DataFrame, features: list, + n_top: int = 3) -> list: + """§7 — Erreur de reconstruction par feature de l'Autoencoder. + + Pour chaque session anomale, détermine quelles features ont la plus grande + erreur de reconstruction individuelle (signal de l'anomalie la plus saillante). + Retourne une liste de dicts {feature: reconstruction_error} par ligne de X. + """ + if ae_model is None or X.empty: + return [{}] * len(X) + try: + import torch + X_arr = X[features].fillna(0).values.astype(np.float32) + X_t = torch.tensor(X_arr) + with torch.no_grad(): + X_rec = ae_model.forward(X_t).numpy() + per_feature_err = (X_arr - X_rec) ** 2 # shape: (n_samples, n_features) + result = [] + for row_err in per_feature_err: + pairs = sorted(zip(features, row_err.tolist()), key=lambda kv: kv[1], reverse=True) + result.append({f: round(v, 6) for f, v in pairs[:n_top]}) + return result + except Exception as e: + log_info(f"[AE features] Erreur de calcul erreur par feature : {e}") + return [{}] * len(X) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# §8 — META-LEARNER : PONDÉRATION APPRISE DE L'ENSEMBLE +# ═══════════════════════════════════════════════════════════════════════════════ + +class MetaLearner: + """§8 — Méta-learner (régression logistique) pour la pondération de l'ensemble. + + Remplace les poids fixes (1−XGB_W)×((1−AE_W)×eif + AE_W×ae) + XGB_W×xgb + par une régression logistique entraînée sur les labels SOC/Anubis/bots connus. + + Formule apprise : + P(bot) = logistic(w1×eif_norm + w2×ae_norm + w3×xgb_prob + + w4×log1p(hits) + w5×correlated + bias) + + Fallback automatique aux poids fixes quand < MIN_SAMPLES labels disponibles. + """ + + MIN_SAMPLES = 1000 # Nombre minimal de labels pour activer le méta-learner + + def __init__(self): + self._clf = None + self._n_samples: int = 0 + self._feature_names = ['eif_norm', 'ae_norm', 'xgb_prob', 'log_hits', 'correlated'] + self._weights_log: dict = {} # Pour la transparence SOC + + def fit(self, df: pd.DataFrame) -> bool: + """Entraîne le méta-learner sur un DataFrame de sessions labelisées. + + Colonnes requises : eif_norm, ae_norm (ou 0), xgb_prob (ou 0), + hits, correlated, label (0=normal, 1=bot). + Retourne True si l'entraînement a réussi. + """ + try: + from sklearn.linear_model import LogisticRegression + from sklearn.preprocessing import StandardScaler as _SS + except ImportError: + return False + + required = {'eif_norm', 'label'} + if not required.issubset(df.columns): + return False + + df = df.dropna(subset=['eif_norm', 'label']) + if len(df) < self.MIN_SAMPLES: + return False + + X_meta = pd.DataFrame({ + 'eif_norm': df['eif_norm'].clip(0, 1), + 'ae_norm': df.get('ae_norm', pd.Series(0.0, index=df.index)).clip(0, 1), + 'xgb_prob': df.get('xgb_prob', pd.Series(0.0, index=df.index)).clip(0, 1), + 'log_hits': np.log1p(df.get('hits', pd.Series(1, index=df.index)).clip(1)), + 'correlated': df.get('correlated', pd.Series(0, index=df.index)).clip(0, 1), + }).fillna(0) + y = df['label'].astype(int) + + scaler = _SS() + X_scaled = scaler.fit_transform(X_meta) + clf = LogisticRegression(max_iter=500, C=1.0, random_state=42) + clf.fit(X_scaled, y) + + # Enregistrer les poids pour la transparence SOC + coefs = dict(zip(self._feature_names, clf.coef_[0].tolist())) + self._clf = (clf, scaler) + self._n_samples = len(df) + self._weights_log = { + 'coefs': {k: round(v, 4) for k, v in coefs.items()}, + 'intercept': round(float(clf.intercept_[0]), 4), + 'n_samples': self._n_samples, + } + log_info(f"[MetaLearner] Entraîné sur {self._n_samples} labels — coefs: {coefs}") + return True + + def predict(self, eif_norm: np.ndarray, ae_norm: np.ndarray, + xgb_prob: np.ndarray, hits: np.ndarray = None, + correlated: np.ndarray = None) -> np.ndarray: + """Prédit P(bot) avec le méta-learner ou les poids fixes en fallback. + + Retourne un array de probabilités dans [0, 1]. + """ + n = len(eif_norm) + if hits is None: + hits = np.ones(n) + if correlated is None: + correlated = np.zeros(n) + + if self.is_trained: + clf, scaler = self._clf + X_meta = np.column_stack([ + np.clip(eif_norm, 0, 1), + np.clip(ae_norm, 0, 1), + np.clip(xgb_prob, 0, 1), + np.log1p(np.clip(hits, 1, None)), + np.clip(correlated, 0, 1), + ]) + try: + X_scaled = scaler.transform(X_meta) + return clf.predict_proba(X_scaled)[:, 1] + except Exception as e: + log_info(f"[MetaLearner] Prédiction échouée ({e}) — fallback poids fixes") + + # Fallback : formule linéaire avec poids fixes + _ae_w = AE_WEIGHT + _xgb_w = XGB_WEIGHT + return (1 - _xgb_w) * ((1 - _ae_w) * eif_norm + _ae_w * ae_norm) + _xgb_w * xgb_prob + + @property + def is_trained(self) -> bool: + """Vrai si le méta-learner est actif (assez de labels pour être fiable).""" + return self._clf is not None and self._n_samples >= self.MIN_SAMPLES + + def build_labels_from_df(self, df: pd.DataFrame) -> pd.DataFrame: + """Construit les labels supervisés pour l'entraînement du méta-learner. + + Label=1 : bot connu (bot_name), Anubis DENY, anomalie CRITICAL/HIGH confirmée. + Label=0 : navigateur légitime LEGITIMATE_BROWSER, Anubis ALLOW. + Retourne seulement les lignes avec un label fiable. + """ + labeled = [] + for _, row in df.iterrows(): + threat = str(row.get('threat_level', '')) + bot_name = str(row.get('bot_name', '')) + action = str(row.get('anubis_bot_action', '')) + if bot_name != '' or action == 'DENY' or threat in ('CRITICAL', 'HIGH', 'KNOWN_BOT', 'ANUBIS_DENY'): + labeled.append({**row, 'label': 1}) + elif threat == 'LEGITIMATE_BROWSER' or action == 'ALLOW': + labeled.append({**row, 'label': 0}) + if not labeled: + return pd.DataFrame() + return pd.DataFrame(labeled) + + +# Singleton partagé entre les modèles Complet et Applicatif +_meta_learner_complet = MetaLearner() +_meta_learner_applicatif = MetaLearner() + + +def get_meta_learner(name: str) -> MetaLearner: + """Retourne le singleton MetaLearner correspondant au modèle ``name``.""" + if 'Applicatif' in name: + return _meta_learner_applicatif + return _meta_learner_complet diff --git a/services/correlator/sql/migrations/05_fleet_metrics_tables.sql b/services/correlator/sql/migrations/05_fleet_metrics_tables.sql new file mode 100644 index 0000000..1286d9a --- /dev/null +++ b/services/correlator/sql/migrations/05_fleet_metrics_tables.sql @@ -0,0 +1,55 @@ +-- === 05_fleet_metrics_tables.sql — Tables fleet_detections et ml_performance_metrics === +-- +-- fleet_detections : résultats du détecteur de flottes §5.2 (JA4×ASN bipartite graph) +-- ml_performance_metrics : métriques de performance du pipeline ML par cycle +-- +-- Appliquer avec : +-- clickhouse-client --multiquery < 05_fleet_metrics_tables.sql + +-- --- fleet_detections --- +CREATE TABLE IF NOT EXISTS ja4_processing.fleet_detections +( + detected_at DateTime, + community_id UInt32, + ja4_set Array(String), + asn_set Array(String), + fleet_score Float32, + n_ips UInt32, + ip_sample Array(String), -- échantillon des 20 premières IPs + model_name LowCardinality(String) DEFAULT '' +) +ENGINE = MergeTree +PARTITION BY toDate(detected_at) +ORDER BY (detected_at, community_id) +TTL detected_at + INTERVAL 7 DAY +SETTINGS ttl_only_drop_parts = 1; + +-- --- ml_performance_metrics --- +CREATE TABLE IF NOT EXISTS ja4_processing.ml_performance_metrics +( + cycle_at DateTime, + model_name LowCardinality(String), + total_sessions UInt64, + correlated_rate Float32, + anomaly_rate Float32, + critical_count UInt32, + high_count UInt32, + medium_count UInt32, + low_count UInt32, + known_bot_count UInt32, + anubis_deny_count UInt32, + legit_browser_count UInt32, + drift_rate Float32, + drift_alert UInt8, + cycle_latency_ms UInt32, + features_valid UInt16, + features_total UInt16, + baseline_size UInt32, + threshold Float32, + meta_learner_active UInt8 DEFAULT 0 +) +ENGINE = MergeTree +PARTITION BY toDate(cycle_at) +ORDER BY (cycle_at, model_name) +TTL cycle_at + INTERVAL 90 DAY +SETTINGS ttl_only_drop_parts = 1; diff --git a/services/dashboard/backend/routes/api.py b/services/dashboard/backend/routes/api.py index ea86a83..c07f8bd 100644 --- a/services/dashboard/backend/routes/api.py +++ b/services/dashboard/backend/routes/api.py @@ -1633,3 +1633,45 @@ async def reflist_stats(name: str): except Exception as exc: logger.exception("reflist stats query failed for %s", name) raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/fleet") +async def fleet() -> dict[str, Any]: + """Détections de flottes JA4×ASN (§5.2).""" + rows = query( + f"SELECT detected_at, community_id, fleet_score, n_ips, ja4_set, asn_set, ip_sample " + f"FROM {_DB}.fleet_detections " + f"WHERE detected_at >= now() - INTERVAL 7 DAY " + f"ORDER BY fleet_score DESC " + f"LIMIT 100" + ) + return {"fleets": rows} + + +@router.get("/health") +async def health_metrics() -> dict[str, Any]: + """Métriques de santé du pipeline ML (Étape 9).""" + rows = query( + f"SELECT cycle_at, model_name, total_sessions, correlated_rate, anomaly_rate, " + f" critical_count, high_count, drift_rate, drift_alert, cycle_latency_ms, " + f" features_valid, features_total, baseline_size, meta_learner_active " + f"FROM {_DB}.ml_performance_metrics " + f"WHERE cycle_at >= now() - INTERVAL 7 DAY " + f"ORDER BY cycle_at DESC " + f"LIMIT 500" + ) + # Statistiques de synthèse + if rows: + latest = {r['model_name']: r for r in rows} + avg_anomaly = sum(r['anomaly_rate'] for r in rows) / len(rows) + avg_latency = sum(r['cycle_latency_ms'] for r in rows) / len(rows) + else: + latest = {} + avg_anomaly = 0 + avg_latency = 0 + return { + "metrics": rows, + "latest_by_model": latest, + "avg_anomaly_rate": round(avg_anomaly, 4), + "avg_latency_ms": round(avg_latency), + } diff --git a/services/dashboard/backend/routes/pages.py b/services/dashboard/backend/routes/pages.py index f2cf765..d151745 100644 --- a/services/dashboard/backend/routes/pages.py +++ b/services/dashboard/backend/routes/pages.py @@ -81,3 +81,13 @@ async def tactics_page(request: Request): @router.get("/reflists") async def reflists_page(request: Request): return templates.TemplateResponse("reflists.html", _ctx(request, "reflists")) + + +@router.get("/fleet") +async def fleet_page(request: Request): + return templates.TemplateResponse("fleet.html", _ctx(request, "fleet")) + + +@router.get("/health") +async def health_page(request: Request): + return templates.TemplateResponse("health.html", _ctx(request, "health")) diff --git a/services/dashboard/backend/templates/base.html b/services/dashboard/backend/templates/base.html index c4bbb1a..98174c7 100644 --- a/services/dashboard/backend/templates/base.html +++ b/services/dashboard/backend/templates/base.html @@ -155,6 +155,10 @@ + + + +
@@ -183,6 +187,10 @@ + + + +Chaque ligne = une communauté du graphe bipartite. Les flottes avec un + fleet_score élevé sont les plus coordonnées.
+ja4_set : Fingerprints TLS utilisés par la flotte.
+asn_set : Réseaux (ASN) de la flotte.
+n_ips : Nombre d'IPs distinctes dans la communauté.
+Source : fleet_detections (7j)
+| Détecté le | +Comm. | +Score | +IPs | +JA4 impliqués | +ASNs impliqués | +Échantillon IPs | +
|---|---|---|---|---|---|---|
| Chargement… | ||||||
Pourcentage de sessions classées HIGH/CRITICAL/MEDIUM/LOW par cycle, par modèle.
+Seuil d'alerte : > 10% → sur-détection probable. < 0.5% → sous-détection.
+Source : ml_performance_metrics
+Durée totale du cycle bot-detector en millisecondes (fetch ClickHouse + scoring ML + insert).
+Seuil d'alerte : > 300 000ms (5 min) → le cycle dépasse l'intervalle planifié.
+Source : ml_performance_metrics.cycle_latency_ms
+Proportion de sessions avec correlated=1 (JA4 TLS corrélé par sentinel).
Valeur attendue : > 50%. En dessous, vérifier sentinel et logcorrelator.
+Source : ml_performance_metrics.correlated_rate
+| Cycle | +Modèle | +Sessions | +Corrélation | +Anomalies | +CRITICAL | +HIGH | +Drift | +Latence (ms) | +Features | +Baseline | +Meta | +
|---|---|---|---|---|---|---|---|---|---|---|---|
| Chargement… | |||||||||||