From 1f103392acca4c1ce12ceb3895c79b0f8795c4b5 Mon Sep 17 00:00:00 2001 From: toto Date: Thu, 9 Apr 2026 01:02:04 +0200 Subject: [PATCH] refactor(bot-detector): extract monolith into modular package Split bot_detector.py (~1982 lines) into 10 focused modules: - config.py: all configuration constants and optional imports - log.py: logging utilities (log_info, log_decision, append_training_history) - infra.py: ClickHouse client, health check HTTP server, shutdown - browser.py: multifactorial browser identification (5 axes) - scoring.py: drift detection, feature validation, SHAP, clustering - models.py: EIF, Autoencoder, XGBoost model management - preprocessing.py: data preprocessing and feature list definitions - pipeline.py: core semi-supervised scoring loop - cycle.py: main analysis cycle orchestration - __main__.py: entry point with startup banner Update Dockerfile to copy package directory and use python -m bot_detector. All 36 existing tests pass unchanged. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- services/bot-detector/bot_detector/Dockerfile | 4 +- .../bot-detector/bot_detector/__init__.py | 1 + .../bot-detector/bot_detector/__main__.py | 41 ++ services/bot-detector/bot_detector/browser.py | 170 +++++++ services/bot-detector/bot_detector/config.py | 153 ++++++ services/bot-detector/bot_detector/cycle.py | 371 ++++++++++++++ services/bot-detector/bot_detector/infra.py | 89 ++++ services/bot-detector/bot_detector/log.py | 65 +++ services/bot-detector/bot_detector/models.py | 478 ++++++++++++++++++ .../bot-detector/bot_detector/pipeline.py | 353 +++++++++++++ .../bot_detector/preprocessing.py | 110 ++++ services/bot-detector/bot_detector/scoring.py | 279 ++++++++++ 12 files changed, 2112 insertions(+), 2 deletions(-) create mode 100644 services/bot-detector/bot_detector/__init__.py create mode 100644 services/bot-detector/bot_detector/__main__.py create mode 100644 services/bot-detector/bot_detector/browser.py create mode 100644 
"""Bot-detector entry point: log the startup banner, then run the analysis loop.

Run with ``python -m bot_detector``. The loop never returns; it is only left
through an uncaught ``BaseException`` such as ``KeyboardInterrupt`` or
``SystemExit``, which the ``except Exception`` below deliberately does not catch.
"""
import time
import traceback

from .config import *  # noqa: F403 — the banner reads most configuration constants
from .log import log_info, log_decision
from .cycle import fetch_and_analyze


def _log_startup_banner():
    """Log the effective configuration, one setting per line."""
    log_info('')
    log_info('╔═══════════════════════════════════════════════════════════════╗')
    log_info('║ BOT DETECTOR IA v12 — Pipeline de détection semi-supervisé ║')
    log_info('╚═══════════════════════════════════════════════════════════════╝')
    log_info(f' Base de données : {DB}')
    log_info(f' Contamination EIF : {CONTAMINATION}')
    log_info(f' Seuil anomalie : {ANOMALY_THRESHOLD} (adaptatif p{ANOMALY_PERCENTILE})')
    log_info(f' Cycle d\'analyse : toutes les {CYCLE_INTERVAL}s')
    log_info(f' Retraining : toutes les {RETRAIN_INTERVAL_H}h (drift seuil={DRIFT_THRESHOLD:.0%})')
    log_info(f' Modèles : {MODEL_DIR}')
    log_info(f' Autoencoder (AE) : poids={AE_WEIGHT} {"(PyTorch disponible)" if TORCH_AVAILABLE else "(PyTorch absent — désactivé)"}')
    log_info(f' XGBoost : poids={XGB_WEIGHT} (min labels={XGB_MIN_LABELS})')
    log_info(f' SHAP explainabilité : {"activé" if ENABLE_SHAP else "désactivé (shap non installé)" if not SHAP_AVAILABLE else "désactivé"}')
    log_info(f' Clustering HDBSCAN : {"activé" if ENABLE_CLUSTERING else "désactivé"}')
    log_info(f' Dédup inter-cycles : TTL={DEDUP_TTL_MIN}min')
    log_info(f' Récurrence : weight={RECURRENCE_WEIGHT}')
    log_info(f' Multi-fenêtres (24h) : {"activé" if ENABLE_MULTIWINDOW else "désactivé"}')
    log_info(f' Feature ratio minimum : {MIN_VALID_FEATURE_RATIO:.0%}')
    log_info(f' Anubis : ALLOW→KNOWN_BOT, DENY→forcé menace (score IF réel)')
    log_info(f' Browser légitime : confidence≥{BROWSER_CONFIDENCE_THRESHOLD} (5 axes), cohorte≥{BROWSER_COHORT_RATIO}')
    log_info(f' Feedback SOC : {"activé" if ENABLE_FEEDBACK else "désactivé"} (fenêtre={FEEDBACK_WINDOW_DAYS}j)')
    log_info('')


def main():
    """Log the banner and the SERVICE_START decision, then cycle forever."""
    _log_startup_banner()
    log_decision('SERVICE_START', 'boot', '', {
        'db': DB, 'contamination': CONTAMINATION, 'anomaly_threshold': ANOMALY_THRESHOLD,
        'cycle_interval': CYCLE_INTERVAL, 'retrain_interval_h': RETRAIN_INTERVAL_H
    })
    # NOTE(review): this message announces a wait, but the loop below runs the
    # first cycle immediately and only sleeps afterwards — confirm which of the
    # two (message or ordering) is the intended behaviour.
    log_info(f'En attente du premier cycle dans {CYCLE_INTERVAL}s…')
    while True:
        try:
            fetch_and_analyze()
        except Exception as e:
            # Top-level boundary: keep the service alive, but record the full
            # traceback so the failing call site can be found afterwards.
            log_info(f"Erreur globale : {e}")
            log_info(traceback.format_exc())
        time.sleep(CYCLE_INTERVAL)


if __name__ == '__main__':
    main()
def _compute_browser_axes(df: pd.DataFrame) -> pd.DataFrame:
    """Compute the five browser-identification axes and their weighted combination.

    Every axis is a weighted sum of boolean indicators, so each lies in [0, 1]:

    * ``axis_ja4_known``     — 1.0 when ``browser_family`` is a non-empty string.
    * ``axis_ja4_struct``    — TLS version '13' (0.35), ALPN 'h2'/'h3' (0.25),
      cipher count in [10, 25] (0.20), extension count in [10, 25] (0.20),
      all read from the parsed ``ja4`` column.
    * ``axis_http_modern``   — ``modern_browser_score`` >= 50 (0.35),
      ``has_accept_language`` > 0 (0.20), ``sec_fetch_absence_rate`` < 0.3 (0.25),
      ``generic_accept_ratio`` < 0.3 (0.10), ``ua_ch_mismatch`` == 0 (0.10).
    * ``axis_nav_behavior``  — ``has_cookie`` > 0, ``has_referer`` > 0,
      ``asset_ratio`` > 0.15, ``direct_access_ratio`` < 0.5 (0.25 each).
    * ``axis_tls_coherence`` — ``alpn_http_mismatch`` == 0 (0.25),
      ``no_window_scale_ratio`` == 0 (0.20), ``tls12_ratio`` < 0.1 (0.20),
      ``http10_ratio`` == 0 (0.15), ``is_alpn_missing`` == 0 (0.20).

    Args:
        df: One row per session. Any of the columns named above may be absent;
            an absent column (or a NaN cell) takes the default given in the body.

    Returns:
        A DataFrame indexed like *df* with the five ``axis_*`` columns plus
        ``browser_confidence``, the sum of the axes weighted by ``_AXIS_WEIGHTS``.
    """
    def _col(name, default):
        """Return df[name] with NaN replaced by *default*, or a constant Series when the column is absent."""
        if name in df.columns:
            return df[name].fillna(default)
        return pd.Series(default, index=df.index)

    axes = pd.DataFrame(index=df.index)

    # ── Axis 1 — known JA4 signature ──
    family = _col('browser_family', '').astype(str)
    axes['axis_ja4_known'] = (family != '').astype(float)

    # ── Axis 2 — JA4 structure ──
    ja4_parsed = _parse_ja4_columns(_col('ja4', ''))
    is_tls13 = (ja4_parsed['ja4_tls'] == '13').astype(float)
    is_h2h3 = ja4_parsed['ja4_alpn'].isin(['h2', 'h3']).astype(float)
    cipher_count = ja4_parsed['ja4_ciphers']
    ext_count = ja4_parsed['ja4_exts']
    # Cipher/extension counts typical of a browser: 10 to 25 inclusive.
    ciphers_ok = ((cipher_count >= 10) & (cipher_count <= 25)).astype(float)
    exts_ok = ((ext_count >= 10) & (ext_count <= 25)).astype(float)
    axes['axis_ja4_struct'] = (
        is_tls13 * 0.35 + is_h2h3 * 0.25 + ciphers_ok * 0.20 + exts_ok * 0.20
    )

    # ── Axis 3 — modern HTTP headers ──
    # Defaults are the "not a browser" value for every indicator except
    # ua_ch_mismatch, whose default 0 still earns its 0.10.
    axes['axis_http_modern'] = (
        (_col('modern_browser_score', 0) >= 50).astype(float) * 0.35
        + (_col('has_accept_language', 0) > 0).astype(float) * 0.20
        + (_col('sec_fetch_absence_rate', 1) < 0.3).astype(float) * 0.25
        + (_col('generic_accept_ratio', 1) < 0.3).astype(float) * 0.10
        + (_col('ua_ch_mismatch', 0) == 0).astype(float) * 0.10
    )

    # ── Axis 4 — navigation behaviour ──
    axes['axis_nav_behavior'] = (
        (_col('has_cookie', 0) > 0).astype(float) * 0.25
        + (_col('has_referer', 0) > 0).astype(float) * 0.25
        + (_col('asset_ratio', 0) > 0.15).astype(float) * 0.25
        + (_col('direct_access_ratio', 1) < 0.5).astype(float) * 0.25
    )

    # ── Axis 5 — TLS/TCP coherence ──
    # NOTE(review): every default here satisfies its test, so a frame lacking
    # all five columns scores 1.0 on this axis — confirm that is intended for
    # sessions without L3/L4 data.
    axes['axis_tls_coherence'] = (
        (_col('alpn_http_mismatch', 0) == 0).astype(float) * 0.25
        + (_col('no_window_scale_ratio', 0) == 0).astype(float) * 0.20
        + (_col('tls12_ratio', 0) < 0.1).astype(float) * 0.20
        + (_col('http10_ratio', 0) == 0).astype(float) * 0.15
        + (_col('is_alpn_missing', 0) == 0).astype(float) * 0.20
    )

    # ── Weighted combined score ──
    axes['browser_confidence'] = sum(
        axes[f'axis_{k}'] * w for k, w in _AXIS_WEIGHTS.items()
    )
    return axes
def _require_float(name, default, lo=None, hi=None):
    """Read environment variable *name* as a float, optionally range-checked.

    Args:
        name: Name of the environment variable.
        default: Value used, through ``str(default)``, when the variable is unset.
        lo: Exclusive lower bound, or ``None`` for no lower bound.
        hi: Exclusive upper bound, or ``None`` for no upper bound.

    Returns:
        The parsed value as a ``float``.

    Raises:
        SystemExit: if the value is not a number (NaN included) or violates
            one of the given bounds.
    """
    import math  # function-scope import: the block stays valid without touching the file header

    raw = os.getenv(name, str(default))
    try:
        v = float(raw)
    except ValueError:
        # `from None`: the ValueError adds nothing to the configuration message.
        raise SystemExit(f"[CONFIG] {name}={raw!r} invalide — doit être un nombre décimal.") from None
    if math.isnan(v):
        # float('nan') parses, but every later comparison against it is False.
        raise SystemExit(f"[CONFIG] {name}={raw!r} invalide — doit être un nombre décimal.")

    # Each bound is checked on its own, so passing only one of them works.
    below = lo is not None and not (lo < v)
    above = hi is not None and not (v < hi)
    if below or above:
        if lo is not None and hi is not None:
            expected = f"{lo} < valeur < {hi}"
        elif lo is not None:
            expected = f"{lo} < valeur"
        else:
            expected = f"valeur < {hi}"
        raise SystemExit(f"[CONFIG] {name}={v} hors plage ({expected}).")
    return v
+LOG_BACKUP_COUNT = int(os.getenv('LOG_BACKUP_COUNT', '7')) +TRAINING_HISTORY_FILE = os.path.join(MODEL_DIR, 'training_history.jsonl') + +# ─── Health check ─────────────────────────────────────────────────────────── + +HEALTH_PORT = int(os.getenv('HEALTH_PORT', '8080')) + +# ─── Déduplication et récurrence ──────────────────────────────────────────── + +DEDUP_TTL_MIN = int(os.getenv('DEDUP_TTL_MIN', '60')) +RECURRENCE_WEIGHT = _require_float('RECURRENCE_WEIGHT', 0.005) + +# ─── Autoencoder (AE) — second scorer parallèle ──────────────────────────── + +AE_WEIGHT = _require_float('AE_WEIGHT', 0.30, 0, 1) +AE_EPOCHS = int(os.getenv('AE_EPOCHS', '50')) +AE_LATENT_DIM = int(os.getenv('AE_LATENT_DIM', '16')) +AE_LEARNING_RATE = float(os.getenv('AE_LEARNING_RATE', '1e-3')) + +# ─── XGBoost — troisième voix supervisée ──────────────────────────────────── + +XGB_WEIGHT = _require_float('XGB_WEIGHT', 0.20, 0, 1) +XGB_MIN_LABELS = int(os.getenv('XGB_MIN_LABELS', '100')) +XGB_RETRAIN_INTERVAL_H = int(os.getenv('XGB_RETRAIN_INTERVAL_HOURS', '168')) + +# ─── A9 — Classification multifactorielle des navigateurs ────────────────── + +BROWSER_CONFIDENCE_THRESHOLD = _require_float('BROWSER_CONFIDENCE_THRESHOLD', 0.55, 0, 1) +BROWSER_COHORT_RATIO = _require_float('BROWSER_COHORT_RATIO', 0.70, 0, 1) + +# ─── SHAP / Clustering / Multi-fenêtres / Feedback ───────────────────────── + +ENABLE_CLUSTERING = os.getenv('ENABLE_CLUSTERING', 'true').lower() == 'true' +CLUSTERING_MIN_SAMPLES = int(os.getenv('CLUSTERING_MIN_SAMPLES', '3')) +ENABLE_MULTIWINDOW = os.getenv('ENABLE_MULTIWINDOW', 'false').lower() == 'true' +MULTIWINDOW_VIEW = os.getenv('MULTIWINDOW_VIEW', 'view_ai_features_24h') +if not _SAFE_IDENTIFIER_RE.match(MULTIWINDOW_VIEW): + raise SystemExit(f"[CONFIG] MULTIWINDOW_VIEW={MULTIWINDOW_VIEW!r} invalide.") +ENABLE_FEEDBACK = os.getenv('ENABLE_FEEDBACK', 'true').lower() == 'true' +FEEDBACK_WINDOW_DAYS = int(os.getenv('FEEDBACK_WINDOW_DAYS', '7')) + +# ─── Features structurellement 
exclues par modèle ────────────────────────── + +STRUCTURAL_EXCLUDED_FEATURES: dict[str, list] = { + 'Complet': ['orphan_ratio'], + 'Applicatif': ['orphan_ratio', 'is_rare_ja4', 'tcp_shared_count', + 'request_size_variance', 'mss_mobile_mismatch', + 'ja3_diversity_ratio', 'syn_timing_cv', 'tls12_ratio', 'ip_df_variance', + 'avg_ttl', 'ttl_std', 'no_window_scale_ratio', + 'ja4_drift_ratio'], +} + +# ─── Imports optionnels (bibliothèques lourdes) ──────────────────────────── + +try: + from isotree import IsolationForest as ExtendedIsolationForest + EIF_AVAILABLE = True +except ImportError: + EIF_AVAILABLE = False + +from sklearn.ensemble import IsolationForest + +try: + import hdbscan as _hdbscan + HDBSCAN_AVAILABLE = True +except ImportError: + HDBSCAN_AVAILABLE = False + +from sklearn.cluster import DBSCAN +from sklearn.preprocessing import StandardScaler + +try: + import shap as _shap + SHAP_AVAILABLE = True +except ImportError: + SHAP_AVAILABLE = False + +ENABLE_SHAP = SHAP_AVAILABLE and os.getenv('ENABLE_SHAP', 'true').lower() == 'true' + +try: + import torch + import torch.nn as nn + TORCH_AVAILABLE = True +except ImportError: + TORCH_AVAILABLE = False + +try: + import xgboost as xgb + XGB_AVAILABLE = True +except ImportError: + XGB_AVAILABLE = False diff --git a/services/bot-detector/bot_detector/cycle.py b/services/bot-detector/bot_detector/cycle.py new file mode 100644 index 0000000..df7ac20 --- /dev/null +++ b/services/bot-detector/bot_detector/cycle.py @@ -0,0 +1,371 @@ +"""Cycle principal d'analyse : récupération, scoring et insertion des résultats. + +Orchestre un cycle complet : requête ClickHouse, preprocessing, scoring +(Complet + Applicatif), feedback SOC, déduplication et insertion des résultats. 
+""" +import pandas as pd +from datetime import datetime + +from .config import ( + DB, DB_LOGS, CYCLE_INTERVAL, DEDUP_TTL_MIN, + ENABLE_MULTIWINDOW, MULTIWINDOW_VIEW, + ENABLE_FEEDBACK, FEEDBACK_WINDOW_DAYS, MAX_FAILURES, + BROWSER_CONFIDENCE_THRESHOLD, BROWSER_COHORT_RATIO, +) +from .log import log_info, log_decision +from .infra import get_client, set_healthy +from .preprocessing import preprocess_df, FEATURES_BASE, FEATURES_COMPLET +from .pipeline import run_semi_supervised_logic + + +# ═══════════════════════════════════════════════════════════════════════════════ +# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL +# ═══════════════════════════════════════════════════════════════════════════════ + +# ═══════════════════════════════════════════════════════════════════════════════ +# FEEDBACK LOOP — Intégration des classifications SOC dans la baseline +# ═══════════════════════════════════════════════════════════════════════════════ + +def _load_soc_feedback(client) -> dict: + """Charge les classifications SOC récentes pour ajuster la baseline. + + Retourne un dict {src_ip: classification} où classification est + 'true_positive', 'false_positive', 'suspicious', etc. + Les faux positifs sont exclus du scoring (considérés comme humains), + les vrais positifs sont exclus de la baseline humaine. 
def _filter_recent_detections(client, all_anom: pd.DataFrame) -> pd.DataFrame:
    """A5 — Drop anomalies whose IP was already recorded within the dedup TTL.

    Queries ``{DB}.ml_detected_anomalies`` for the lowest ``anomaly_score``
    stored per ``src_ip`` over the last ``DEDUP_TTL_MIN`` minutes. A row of
    *all_anom* is kept when its IP has no recent entry, or when its score is
    more than 0.05 below that stored minimum (a lower score is treated as an
    aggravation).

    Args:
        client: Object exposing ``query_df(sql)`` and returning a DataFrame.
        all_anom: Candidate anomalies; must contain ``src_ip`` and at least
            one of ``raw_anomaly_score`` / ``anomaly_score``.

    Returns:
        The filtered frame. *all_anom* is returned unchanged when dedup is
        disabled (``DEDUP_TTL_MIN <= 0``), when there is nothing to filter,
        when no recent detection exists, or when anything fails — dedup is
        best-effort and must never block the insertion that follows.
    """
    if DEDUP_TTL_MIN <= 0 or all_anom.empty:
        return all_anom
    try:
        recent_df = client.query_df(
            f"SELECT src_ip, min(anomaly_score) AS best_score "
            f"FROM {DB}.ml_detected_anomalies "
            f"WHERE detected_at > now() - INTERVAL {DEDUP_TTL_MIN} MINUTE "
            f"GROUP BY src_ip"
        )
        if recent_df is None or recent_df.empty:
            return all_anom
        best_by_ip = dict(zip(recent_df['src_ip'], recent_df['best_score']))

        # Prefer the raw score; only fall back when that column is absent.
        # NOTE(review): the stored reference is min(anomaly_score), while the
        # new value is the raw score when available — confirm both are on the
        # same scale.
        score_col = 'raw_anomaly_score' if 'raw_anomaly_score' in all_anom.columns else 'anomaly_score'
        new_score = pd.to_numeric(all_anom[score_col], errors='coerce').astype(float)
        prev_best = pd.to_numeric(all_anom['src_ip'].map(best_by_ip), errors='coerce').astype(float)
        already_seen = all_anom['src_ip'].isin(list(best_by_ip))

        # A NaN on either side makes the comparison False, so an already-seen
        # IP with an unusable score is not re-inserted.
        keep = ~already_seen | (new_score < prev_best - 0.05)
        filtered = all_anom[keep]
        skipped = len(all_anom) - len(filtered)
        if skipped > 0:
            log_info(f"[Dedup TTL={DEDUP_TTL_MIN}min] {skipped} IP(s) filtrée(s) (déjà détectées récemment).")
        return filtered
    except Exception as e:
        log_info(f"[Dedup] Erreur lors de la déduplication TTL : {e}")
        return all_anom
+ """ + global _consecutive_failures + cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S') + log_info('') + log_info('=' * 70) + log_info(f' CYCLE {cycle_id}') + log_info('=' * 70) + + client = get_client() + + # ── Récupération du trafic (fenêtre 1h) ────────────────────────────────── + try: + df = client.query_df(f'SELECT * FROM {DB}.view_ai_features_1h') + except Exception as e: + log_info(f'ERREUR REQUETE: {e}') + _consecutive_failures += 1 + if _consecutive_failures >= MAX_FAILURES: + set_healthy(False) + log_decision('CONSECUTIVE_FAILURES', cycle_id, '', {'count': _consecutive_failures, 'error': str(e)}) + return + + _consecutive_failures = 0 + set_healthy(True) + + if df is None or df.empty: + log_info('[Données] Aucun trafic trouvé dans view_ai_features_1h.') + return + + log_info(f'[Données] {len(df)} sessions chargées depuis view_ai_features_1h ({len(df.columns)} colonnes).') + + # ── Enrichissement avec les features avancées de la thèse §5 ───────────── + try: + df_thesis = client.query_df(f'SELECT * FROM {DB}.view_thesis_features_1h') + if df_thesis is not None and not df_thesis.empty: + df_thesis.columns = [c.split('.')[-1] for c in df_thesis.columns] + df.columns = [c.split('.')[-1] for c in df.columns] + thesis_cols = [c for c in df_thesis.columns if c not in ('window_start', 'src_ip', 'ja4', 'host')] + df = df.merge( + df_thesis, on=['window_start', 'src_ip', 'ja4', 'host'], + how='left', suffixes=('', '_thesis') + ) + for col in thesis_cols: + if col in df.columns: + df[col] = df[col].fillna(df[col].median() if df[col].notna().any() else 0) + log_info(f'[Thèse §5] {len(df_thesis)} sessions enrichies avec {len(thesis_cols)} features avancées.') + else: + log_info('[Thèse §5] view_thesis_features_1h vide — features avancées indisponibles.') + except Exception as e: + log_info(f'[Thèse §5] view_thesis_features_1h inaccessible : {e} — features avancées ignorées.') + + df = preprocess_df(df) + + # ── Résumé des données chargées 
─────────────────────────────────────────── + n_total = len(df) + n_correlated = int((df.get('correlated', pd.Series()) == 1).sum()) + n_uncorrelated = n_total - n_correlated + n_isp = int((df.get('asn_label', pd.Series()) == 'isp').sum()) + n_datacenter = int((df.get('asn_label', pd.Series()) == 'datacenter').sum()) + n_cdn = int((df.get('asn_label', pd.Series()) == 'cdn').sum()) + n_known_bot = int((df.get('bot_name', pd.Series()) != '').sum()) + n_anubis_allow = int((df.get('anubis_bot_action', pd.Series()) == 'ALLOW').sum()) + n_anubis_deny = int((df.get('anubis_bot_action', pd.Series()) == 'DENY').sum()) + n_anubis_weigh = int((df.get('anubis_bot_action', pd.Series()) == 'WEIGH').sum()) + n_unique_ips = int(df['src_ip'].nunique()) if 'src_ip' in df.columns else 0 + n_unique_ja4 = int(df['ja4'].nunique()) if 'ja4' in df.columns else 0 + + log_info(f'[Données] Après preprocessing : {n_total} sessions, {n_unique_ips} IP uniques, {n_unique_ja4} JA4 uniques.') + log_info(f'[Données] Corrélées (L3-L7) : {n_correlated:>6} | Non-corrélées (L7) : {n_uncorrelated:>6}') + log_info(f'[Données] ASN ISP : {n_isp:>6} | Datacenter : {n_datacenter:>6} | CDN : {n_cdn}') + log_info(f'[Données] Bots connus (dict) : {n_known_bot:>6} | Anubis ALLOW : {n_anubis_allow:>6}') + log_info(f'[Données] Anubis DENY : {n_anubis_deny:>6} | Anubis WEIGH : {n_anubis_weigh:>6}') + + # Distribution navigateurs : dict_browser_ja4 (connu) + inféré (structurel) + if 'inferred_browser_family' in df.columns: + ibf_counts = df['inferred_browser_family'].value_counts() + ibf_known = ibf_counts[ibf_counts.index != ''] + if not ibf_known.empty: + ibf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in ibf_known.head(7).items()) + n_dict = int((df.get('browser_family', pd.Series('')).fillna('').astype(str) != '').sum()) + n_inferred = int(ibf_known.sum()) - n_dict + log_info(f'[Données] Navigateurs : {ibf_known.sum():>6} sessions ({len(ibf_known)} familles : {ibf_summary})') + log_info(f'[Données] Dict JA4 
connu : {n_dict:>6} | Inféré structurel : {max(0, n_inferred):>6}') + elif 'browser_family' in df.columns: + bf_counts = df['browser_family'].value_counts() + bf_known = bf_counts[bf_counts.index != ''] + if not bf_known.empty: + bf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in bf_known.head(5).items()) + log_info(f'[Données] Navigateurs JA4 : {bf_known.sum():>6} sessions ({len(bf_known)} familles : {bf_summary})') + # Distribution browser_confidence + if 'browser_confidence' in df.columns: + bc = df['browser_confidence'] + n_high = int((bc >= BROWSER_CONFIDENCE_THRESHOLD).sum()) + log_info(f'[Données] browser_confidence: mean={bc.mean():.3f}, ≥seuil({BROWSER_CONFIDENCE_THRESHOLD})={n_high}') + + log_decision('CYCLE_START', cycle_id, '', { + 'total_rows': n_total, + 'human_rows': n_isp, + 'known_bot_rows': n_known_bot, + 'correlated_rows': n_correlated, + 'anubis_allow_rows': n_anubis_allow, + 'anubis_deny_rows': n_anubis_deny, + 'anubis_weigh_rows': n_anubis_weigh, + 'multiwindow': ENABLE_MULTIWINDOW, + }) + + try: + rec_df = client.query_df(f'SELECT src_ip, recurrence FROM {DB}.view_ip_recurrence') + recurrence_map = dict(zip(rec_df['src_ip'], rec_df['recurrence'])) + except Exception: + recurrence_map = {} + + # ── Feedback SOC : ajuster la baseline selon les classifications humaines ─ + soc_feedback = _load_soc_feedback(client) + if soc_feedback: + fp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('false_positive', 'legitimate')} + tp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('true_positive', 'malicious', 'bot')} + if fp_ips: + # Les faux positifs confirmés rejoignent le pool humain + mask_fp = df['src_ip'].isin(fp_ips) & (df.get('asn_label', pd.Series(dtype=str)) != 'isp') + df.loc[mask_fp, 'asn_label'] = 'isp' + log_info(f"[Feedback] {mask_fp.sum()} lignes reclassées 'isp' (FP confirmés).") + if tp_ips: + # Les vrais positifs confirmés sont exclus de la baseline humaine + mask_tp = df['src_ip'].isin(tp_ips) & (df.get('asn_label', 
pd.Series(dtype=str)) == 'isp') + df.loc[mask_tp, 'asn_label'] = 'soc_confirmed_bot' + log_info(f"[Feedback] {mask_tp.sum()} lignes exclues de la baseline humaine (TP confirmés).") + log_decision('SOC_FEEDBACK', cycle_id, '', { + 'fp_ips': len(fp_ips), 'tp_ips': len(tp_ips), + 'total_classifications': len(soc_feedback), + }) + + # ── Features par modèle (voir DOCUMENTATION.md §4) ─────────────────────── + feats = FEATURES_BASE + feats_complet = FEATURES_COMPLET + + # ── Analyse fenêtre 1h ──────────────────────────────────────────────────── + df_corr = df[df['correlated'] == 1].copy() + df_uncorr = df[df['correlated'] == 0].copy() + log_info('') + log_info(f'── Modèle Complet (L3→L7, corrélé) : {len(df_corr)} sessions, {len(feats_complet)} features ──') + anom_a, scored_a = run_semi_supervised_logic(df_corr, feats_complet, 'Complet', cycle_id, recurrence_map) + log_info('') + log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──') + anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map) + all_anom = pd.concat([anom_a, anom_b], ignore_index=True) + all_scored = pd.concat([scored_a, scored_b], ignore_index=True) + + # ── A3 : Analyse fenêtre 24h (optionnelle) ──────────────────────────────── + if ENABLE_MULTIWINDOW: + try: + df_24h = client.query_df(f'SELECT * FROM {DB}.{MULTIWINDOW_VIEW}') + if df_24h is not None and not df_24h.empty: + df_24h = preprocess_df(df_24h) + log_info(f"[24h] {len(df_24h)} sessions dans la fenêtre 24h.") + anom_c, scored_c = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 1].copy(), feats_complet, 'Complet_24h', cycle_id, recurrence_map) + anom_d, scored_d = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 0].copy(), feats, 'Applicatif_24h', cycle_id, recurrence_map) + all_anom_24h = pd.concat([anom_c, anom_d], ignore_index=True) + all_scored_24h = pd.concat([scored_c, scored_d], ignore_index=True) + # Fusion : pour les 
IPs présentes dans les deux fenêtres, conserver le score le plus bas + if not all_anom_24h.empty: + all_anom = pd.concat([all_anom, all_anom_24h], ignore_index=True) + log_info(f"[24h] Fusion 1h+24h : {len(all_anom)} entrées avant déduplication.") + all_scored = pd.concat([all_scored, all_scored_24h], ignore_index=True) + else: + log_info(f"[24h] Vue {MULTIWINDOW_VIEW} vide — analyse mono-fenêtre.") + except Exception as e: + log_info(f"[24h] Vue {MULTIWINDOW_VIEW} inaccessible : {e} — analyse mono-fenêtre.") + + # ── Insertion de toutes les classifications dans ml_all_scores ─────────── + if not all_scored.empty: + try: + now = datetime.now().replace(microsecond=0) + all_scored['detected_at'] = now + all_scored['ja4'] = all_scored['ja4'].replace({'': 'HTTP_CLEAR_TEXT'}) + # Utiliser la famille inférée (multifactorielle) pour browser_family + if 'inferred_browser_family' in all_scored.columns: + all_scored['browser_family'] = all_scored['inferred_browser_family'] + all_scores_cols = [ + 'detected_at', 'window_start', 'src_ip', 'ja4', 'host', 'bot_name', + 'browser_family', + 'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category', + 'anomaly_score', 'raw_anomaly_score', 'threat_level', 'model_name', + 'correlated', 'asn_number', 'asn_org', 'country_code', 'asn_label', + 'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'campaign_id', + 'ae_recon_error', 'xgb_prob' + ] + scores_df = all_scored[[c for c in all_scores_cols if c in all_scored.columns]] + client.insert_df(f'{DB}.ml_all_scores', scores_df) + log_info(f'[ml_all_scores] {len(scores_df)} sessions scorées enregistrées.') + except Exception as e: + log_info(f'[ml_all_scores] ERREUR INSERTION: {e}') + + if not all_anom.empty: + all_anom = all_anom.sort_values('raw_anomaly_score', ascending=True).drop_duplicates(subset=['src_ip'], keep='first') + log_info(f'[Dédup] Intra-cycle : {len(all_anom)} IP uniques après déduplication.') + + # A5 — Déduplication inter-cycles avec TTL + all_anom = 
_filter_recent_detections(client, all_anom) + + if all_anom.empty: + log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.') + log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}) + return + + all_anom['detected_at'] = datetime.now().replace(microsecond=0) + fake_nav_col = 'is_fake_navigation' + all_anom['is_headless'] = all_anom[fake_nav_col].astype(int) if fake_nav_col in all_anom.columns else 0 + + cols = [ + 'detected_at', 'src_ip', 'ja4', 'host', 'bot_name', 'browser_family', 'anomaly_score', + 'raw_anomaly_score', 'campaign_id', + 'threat_level', 'model_name', 'recurrence', + 'asn_number', 'asn_org', 'asn_detail', 'asn_domain', 'country_code', 'asn_label', + 'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'port_exhaustion_ratio', 'max_keepalives', 'orphan_ratio', + 'tcp_jitter_variance', 'tcp_shared_count', 'true_window_size', 'window_mss_ratio', + 'alpn_http_mismatch', 'is_alpn_missing', 'sni_host_mismatch', + 'header_count', 'has_accept_language', 'has_cookie', 'has_referer', + 'modern_browser_score', 'is_headless', 'ua_ch_mismatch', + 'header_order_shared_count', 'ip_id_zero_ratio', 'request_size_variance', + 'multiplexing_efficiency', 'mss_mobile_mismatch', + 'correlated', 'reason', 'asset_ratio', 'direct_access_ratio', 'is_ua_rotating', + 'distinct_ja4_count', 'src_port_density', 'ja4_asn_concentration', + 'ja4_country_concentration', 'is_rare_ja4', + 'header_order_confidence', 'distinct_header_orders', 'temporal_entropy', + 'path_diversity_ratio', 'url_depth_variance', 'anomalous_payload_ratio', + 'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category', + ] + + try: + final_df = all_anom[[c for c in cols if c in all_anom.columns]] + client.insert_df(f'{DB}.ml_detected_anomalies', final_df) + n_critical = int((final_df.get('threat_level', pd.Series()) == 'CRITICAL').sum()) + n_high = int((final_df.get('threat_level', 
pd.Series()) == 'HIGH').sum()) + n_medium = int((final_df.get('threat_level', pd.Series()) == 'MEDIUM').sum()) + n_known = int((final_df.get('bot_name', pd.Series()) != '').sum()) + log_info('') + log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════') + log_info(f'║ {len(final_df)} menaces insérées dans ml_detected_anomalies') + log_info(f'║ CRITICAL={n_critical} HIGH={n_high} MEDIUM={n_medium} KNOWN_BOT={n_known}') + if not all_scored.empty: + log_info(f'║ {len(all_scored)} sessions scorées dans ml_all_scores') + log_info(f'╚═══════════════════════════════════════════════════════════') + log_decision('CYCLE_END', cycle_id, '', { + 'inserted': len(final_df), + 'anomalies': int((final_df.get('bot_name', pd.Series()) == '').sum()), + 'known_bots': n_known, + 'critical': n_critical, + 'high': n_high, + 'dedup_ttl_min': DEDUP_TTL_MIN, + }) + except Exception as e: + log_info(f'ERREUR INSERTION: {e}') + else: + log_info('') + log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════') + log_info(f'║ Aucune menace détectée ce cycle.') + if not all_scored.empty: + log_info(f'║ {len(all_scored)} sessions scorées dans ml_all_scores') + tl_dist = all_scored['threat_level'].value_counts().to_dict() if not all_scored.empty and 'threat_level' in all_scored.columns else {} + if tl_dist: + log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}') + log_info(f'╚═══════════════════════════════════════════════════════════') + log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}) diff --git a/services/bot-detector/bot_detector/infra.py b/services/bot-detector/bot_detector/infra.py new file mode 100644 index 0000000..413ddad --- /dev/null +++ b/services/bot-detector/bot_detector/infra.py @@ -0,0 +1,89 @@ +"""Infrastructure : client ClickHouse, health check HTTP, arrêt propre. 
# Health flag shared between the analysis loop and the HTTP health endpoint.
_service_healthy = True
_health_lock = threading.Lock()


def set_healthy(healthy: bool):
    """Store the service health flag while holding the module-level lock."""
    global _service_healthy
    _health_lock.acquire()
    try:
        _service_healthy = healthy
    finally:
        _health_lock.release()


def is_healthy() -> bool:
    """Return the health flag last stored by ``set_healthy``."""
    with _health_lock:
        current = _service_healthy
    return current


class _HealthHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler exposing the service health flag."""

    def do_GET(self):
        """Answer any GET with 200/``OK`` when healthy, 503/``DEGRADED`` otherwise."""
        if is_healthy():
            status, payload = 200, b'OK'
        else:
            status, payload = 503, b'DEGRADED'
        self.send_response(status)
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *args):
        """Discard the per-request access log so it does not reach standard output."""
        return None
def score_to_threat_level(score: float) -> str:
    """Map a raw anomaly score to a textual threat level.

    Bands (upper bounds are exclusive):
    CRITICAL < -0.30 | HIGH < -0.15 | MEDIUM < -0.05 | LOW < 0 | NORMAL otherwise.
    """
    # Ordered from most to least severe: the first bound the score is
    # strictly below decides the level.
    bands = (
        (-0.30, 'CRITICAL'),
        (-0.15, 'HIGH'),
        (-0.05, 'MEDIUM'),
        (0, 'LOW'),
    )
    for upper_bound, level in bands:
        if score < upper_bound:
            return level
    return 'NORMAL'
def append_training_history(entry: dict):
    """Append one training-metadata record as a JSON line to the history file.

    Values that ``json`` cannot serialise natively are stringified
    (``default=str``); non-ASCII characters are written as-is.
    """
    with open(TRAINING_HISTORY_FILE, mode='a', encoding='utf-8') as history:
        serialized = json.dumps(entry, ensure_ascii=False, default=str)
        history.write(f'{serialized}\n')
def _current_pointer_path(name: str) -> str:
    """Return the path of the pointer file naming the current version of model ``name``."""
    return os.path.join(MODEL_DIR, f'model_{name}.current')


def _get_current_version(name: str):
    """Resolve the current version of model ``name``.

    Reads the pointer file, then the matching ``.joblib`` / ``.meta.json`` pair.

    Returns:
        ``(model_path, meta)`` when the pointer, the model file and readable
        metadata all exist; ``(None, None)`` otherwise. Unreadable or corrupt
        metadata is reported as "no current version" so that the caller
        retrains instead of failing on every cycle.
    """
    pointer = _current_pointer_path(name)
    if not os.path.exists(pointer):
        return None, None
    with open(pointer, encoding='utf-8') as f:
        version_id = f.read().strip()
    model_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.joblib')
    meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json')
    if not os.path.exists(model_path) or not os.path.exists(meta_path):
        return None, None
    try:
        with open(meta_path, encoding='utf-8') as f:
            meta = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError: a truncated meta file (e.g. a
        # crash in the middle of a write) must not block every later cycle.
        log_info(f"[{name}] Métadonnées illisibles ({meta_path}) : {exc} — version ignorée.")
        return None, None
    return model_path, meta


def _is_version_id(candidate: str) -> bool:
    """Tell whether ``candidate`` looks like a ``%Y%m%d_%H%M%S`` version stamp."""
    if len(candidate) != 15:
        return False
    try:
        datetime.strptime(candidate, '%Y%m%d_%H%M%S')
    except ValueError:
        return False
    return True


def _purge_old_versions(name: str):
    """Delete the oldest versions of model ``name``, keeping MODEL_HISTORY_COUNT of them.

    Only files whose suffix after ``model_{name}_`` is a timestamp version id
    are considered. Without that filter the glob for ``Complet`` also matches
    ``model_Complet_24h_*.joblib``; those names sort after the base model's
    files, so the base model's versions would be deleted first.

    For each purged version the ``.joblib`` file, its ``.meta.json`` and the
    autoencoder checkpoint of the same version (if present) are removed.
    """
    prefix = f'model_{name}_'
    suffix = '.joblib'
    pattern = os.path.join(MODEL_DIR, f'{glob.escape(prefix)}*{suffix}')

    versions = []
    for joblib_path in glob.glob(pattern):
        version_id = os.path.basename(joblib_path)[len(prefix):-len(suffix)]
        if _is_version_id(version_id):
            versions.append((version_id, joblib_path))
    # Timestamp ids sort chronologically as plain strings.
    versions.sort()

    stale = versions[:-MODEL_HISTORY_COUNT] if len(versions) > MODEL_HISTORY_COUNT else []
    for version_id, joblib_path in stale:
        meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json')
        os.remove(joblib_path)
        # Companion files are optional: the AE checkpoint only exists when an
        # autoencoder was trained for that version.
        for companion in (meta_path, _ae_model_path(name, version_id)):
            if os.path.exists(companion):
                os.remove(companion)
        log_info(f"[{name}] Version purgée : {version_id} (limite={MODEL_HISTORY_COUNT})")
+ """ + + def __init__(self, n_features: int, latent_dim: int = AE_LATENT_DIM): + if not TORCH_AVAILABLE: + raise RuntimeError("PyTorch non disponible — autoencoder désactivé.") + self.n_features = n_features + self.latent_dim = latent_dim + self.device = torch.device('cpu') + self._build_model() + self._scaler_min = None + self._scaler_range = None + + def _build_model(self): + dim1 = min(64, max(self.n_features, self.latent_dim + 4)) + dim2 = min(32, max(dim1 // 2, self.latent_dim + 2)) + self.encoder = nn.Sequential( + nn.Linear(self.n_features, dim1), nn.BatchNorm1d(dim1), nn.ReLU(), + nn.Linear(dim1, dim2), nn.BatchNorm1d(dim2), nn.ReLU(), + nn.Linear(dim2, self.latent_dim), + ).to(self.device) + self.decoder = nn.Sequential( + nn.Linear(self.latent_dim, dim2), nn.BatchNorm1d(dim2), nn.ReLU(), + nn.Linear(dim2, dim1), nn.BatchNorm1d(dim1), nn.ReLU(), + nn.Linear(dim1, self.n_features), nn.Sigmoid(), + ).to(self.device) + self._all_params = list(self.encoder.parameters()) + list(self.decoder.parameters()) + + def _to_tensor(self, X: np.ndarray) -> 'torch.Tensor': + """Normalise [0,1] via min-max puis convertit en Tensor.""" + if self._scaler_min is not None: + X_norm = (X - self._scaler_min) / (self._scaler_range + 1e-9) + else: + X_norm = X + return torch.tensor(np.clip(X_norm, 0, 1), dtype=torch.float32, device=self.device) + + def fit(self, X: np.ndarray, epochs: int = AE_EPOCHS, lr: float = AE_LEARNING_RATE, + batch_size: int = 256) -> dict: + """Entraîne l'autoencoder sur la baseline humaine (données normales).""" + self._scaler_min = X.min(axis=0) + self._scaler_range = X.max(axis=0) - self._scaler_min + X_t = self._to_tensor(X) + + dataset = torch.utils.data.TensorDataset(X_t) + loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True) + optimizer = torch.optim.Adam(self._all_params, lr=lr, weight_decay=1e-5) + criterion = nn.MSELoss() + + self.encoder.train() + self.decoder.train() + losses = [] + for epoch in range(epochs): + 
epoch_loss = 0.0 + for (batch,) in loader: + latent = self.encoder(batch) + recon = self.decoder(latent) + loss = criterion(recon, batch) + optimizer.zero_grad() + loss.backward() + optimizer.step() + epoch_loss += loss.item() * len(batch) + losses.append(epoch_loss / len(X_t)) + return {'final_loss': losses[-1], 'epochs': epochs, 'n_samples': len(X)} + + def score_samples(self, X: np.ndarray) -> np.ndarray: + """Retourne l'erreur de reconstruction MSE par échantillon (plus élevé = plus anomal).""" + self.encoder.eval() + self.decoder.eval() + X_t = self._to_tensor(X) + with torch.no_grad(): + latent = self.encoder(X_t) + recon = self.decoder(latent) + mse = ((recon - X_t) ** 2).mean(dim=1).numpy() + return mse + + def encode(self, X: np.ndarray) -> np.ndarray: + """Retourne l'espace latent (pour HDBSCAN clustering).""" + self.encoder.eval() + X_t = self._to_tensor(X) + with torch.no_grad(): + return self.encoder(X_t).numpy() + + def state_dict(self) -> dict: + return { + 'encoder': self.encoder.state_dict(), + 'decoder': self.decoder.state_dict(), + 'scaler_min': self._scaler_min, + 'scaler_range': self._scaler_range, + 'n_features': self.n_features, + 'latent_dim': self.latent_dim, + } + + @classmethod + def load_state_dict(cls, state: dict) -> 'TrafficAutoEncoder': + ae = cls(state['n_features'], state['latent_dim']) + ae._scaler_min = state['scaler_min'] + ae._scaler_range = state['scaler_range'] + ae.encoder.load_state_dict(state['encoder']) + ae.decoder.load_state_dict(state['decoder']) + return ae + + +def _ae_model_path(name: str, version_id: str) -> str: + return os.path.join(MODEL_DIR, f'ae_{name}_{version_id}.pt') + + +# ═══════════════════════════════════════════════════════════════════════════════ +# XGBOOST — Troisième voix supervisée (labels historiques + feedback SOC) +# ═══════════════════════════════════════════════════════════════════════════════ +def _xgb_model_path(name: str) -> str: + return os.path.join(MODEL_DIR, f'xgb_{name}.json') + + +def 
def _load_xgb_labels(client, features: list, min_labels: int = XGB_MIN_LABELS) -> tuple:
    """Load a labelled training set for XGBoost from ClickHouse.

    Labels (``threat_level``) come from ``ml_all_scores`` (last 7 days, random
    sample capped at 50000 rows); feature values come from
    ``view_ai_features_1h`` through a join on ``(src_ip, ja4, host)``.
    Requested features that are not columns of the view are ignored.

    Label mapping: ``NORMAL`` / ``LEGITIMATE_BROWSER`` -> 0, every other
    selected level (``HIGH``, ``CRITICAL``, ``ANUBIS_DENY``, ``KNOWN_BOT``) -> 1.

    Args:
        client: object exposing ``query(sql)`` whose result has ``result_rows``.
        features: candidate feature names.
        min_labels: minimum number of usable labelled rows.

    Returns:
        ``(X, y, usable_features)`` as ``(ndarray, ndarray, list[str])``, or
        ``(None, None, None)`` when fewer than 10 features are available, when
        there are fewer than 10 positives or fewer than ``min_labels`` rows,
        when no negative row remains, or when any query/processing error
        occurs (the error is logged).
    """
    try:
        # Discover which of the requested features the view actually exposes.
        cols_result = client.query(
            f"SELECT name FROM system.columns "
            f"WHERE database = '{DB}' AND table = 'view_ai_features_1h'"
        )
        available_cols = {row[0] for row in cols_result.result_rows} if cols_result.result_rows else set()
        usable_features = [f for f in features if f in available_cols]
        if len(usable_features) < 10:
            log_info(f"[XGB] Seulement {len(usable_features)} features disponibles dans view_ai_features_1h — insuffisant.")
            return None, None, None

        feature_cols = ', '.join(f'f.{c}' for c in usable_features)
        result = client.query(
            f"SELECT {feature_cols}, s.threat_level "
            f"FROM {DB}.ml_all_scores AS s "
            f"INNER JOIN {DB}.view_ai_features_1h AS f "
            f" ON s.src_ip = f.src_ip AND s.ja4 = f.ja4 AND s.host = f.host "
            f"WHERE s.threat_level IN ('NORMAL', 'LEGITIMATE_BROWSER', 'HIGH', 'CRITICAL', 'ANUBIS_DENY', 'KNOWN_BOT') "
            f"AND s.window_start >= now() - INTERVAL 7 DAY "
            f"ORDER BY rand() LIMIT 50000"
        )
        if not result.result_rows:
            return None, None, None

        cols = usable_features + ['threat_level']
        df = pd.DataFrame(result.result_rows, columns=cols)
        # Non-numeric and infinite values become NaN, then incomplete rows are dropped.
        df[usable_features] = df[usable_features].apply(pd.to_numeric, errors='coerce')
        df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=usable_features)

        y = (~df['threat_level'].isin(['NORMAL', 'LEGITIMATE_BROWSER'])).astype(int)
        n_positive = int(y.sum())
        n_negative = len(y) - n_positive
        if n_positive < 10 or len(y) < min_labels:
            return None, None, None
        if n_negative == 0:
            # A binary classifier cannot be fitted on a single class; report the
            # set as unusable instead of letting the training call fail.
            log_info("[XGB] Aucun label négatif exploitable — jeu d'entraînement mono-classe ignoré.")
            return None, None, None

        X = df[usable_features].values
        return X, y.values, usable_features
    except Exception as exc:
        # Best effort: the supervised voice is optional, so failures are logged
        # and reported as "no labels" rather than propagated.
        log_info(f"[XGB] Erreur chargement labels : {exc}")
        return None, None, None
def _load_saved_autoencoder(name: str, version_id: str):
    """Load the autoencoder checkpoint saved for ``name``/``version_id``, or return None.

    Returns None when the autoencoder is disabled (PyTorch unavailable or
    AE_WEIGHT <= 0), when no checkpoint file exists, or when loading fails
    (the failure is logged).
    """
    if not (TORCH_AVAILABLE and AE_WEIGHT > 0):
        return None
    ae_path = _ae_model_path(name, version_id)
    if not os.path.exists(ae_path):
        return None
    try:
        # NOTE(review): weights_only=False unpickles arbitrary objects; this is
        # only acceptable while MODEL_DIR is writable by this service alone.
        ae = TrafficAutoEncoder.load_state_dict(torch.load(ae_path, weights_only=False))
        log_info(f"[{name}] Autoencoder v{version_id} rechargé.")
        return ae
    except Exception as exc:
        log_info(f"[{name}] Erreur chargement AE : {exc} — AE désactivé ce cycle.")
        return None


def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, cycle_id: str):
    """Load the current isolation-forest model or train a new one when needed.

    The stored model is reused when it is younger than RETRAIN_INTERVAL_H and
    the drift score computed against its stored baseline statistics stays
    below DRIFT_THRESHOLD (A1). Otherwise a new model is trained on
    ``human_baseline``, validated on a 20% hold-out, written to disk with its
    metadata, made current through the pointer file, and old versions are
    purged. If the new model fails the validation gate and a previous model
    exists, the previous model is returned and nothing is written.

    Args:
        name: model name, used in file names and log messages.
        human_baseline: rows used as the "normal" training set.
        features: candidate feature columns of ``human_baseline``.
        cycle_id: identifier attached to decision-log entries.

    Returns:
        ``(model, autoencoder_or_None, features)`` where ``features`` is the
        list the returned model expects (after pruning, or as stored in the
        metadata of a reused model).
    """
    model_path, meta = _get_current_version(name)
    if model_path and meta:
        trained_at = datetime.fromisoformat(meta['trained_at'])
        age_h = (datetime.now() - trained_at).total_seconds() / 3600
        age_ok = age_h < RETRAIN_INTERVAL_H

        # A1 — Concept drift: compare the current distribution with the one seen at training time.
        drift_score = 0.0
        drift_forced = False
        if age_ok and 'baseline_stats' in meta:
            drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features)
            if drift_score >= DRIFT_THRESHOLD:
                drift_forced = True
                log_info(f"[{name}] Dérive détectée ({drift_score:.0%} features) — retraining forcé.")
                log_decision('DRIFT_DETECTED', cycle_id, name, {
                    'version_id': meta['version_id'], 'drift_score': round(drift_score, 3),
                    'drift_threshold': DRIFT_THRESHOLD, 'model_age_hours': round(age_h, 2)
                })

        if age_ok and not drift_forced:
            log_info(f"[{name}] Modèle v{meta['version_id']} valide ({age_h:.1f}h / {RETRAIN_INTERVAL_H}h, drift={drift_score:.0%}) — réutilisation.")
            log_decision('MODEL_LOADED', cycle_id, name, {
                'version_id': meta['version_id'], 'model_age_hours': round(age_h, 2),
                'trained_at': meta['trained_at'], 'human_samples': meta.get('human_samples', '?'),
                'retrain_in_hours': round(RETRAIN_INTERVAL_H - age_h, 1), 'drift_score': round(drift_score, 3)
            })
            ae_loaded = _load_saved_autoencoder(name, meta['version_id'])
            return joblib.load(model_path), ae_loaded, meta.get('features', features)
        elif not drift_forced:
            log_info(f"[{name}] Modèle v{meta['version_id']} expiré ({age_h:.1f}h ≥ {RETRAIN_INTERVAL_H}h) — retraining.")

    version_id = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_info(f"[{name}] Entraînement EIF v{version_id} — {len(human_baseline)} sessions ISP, {len(features)} features, contamination={CONTAMINATION}")

    X = human_baseline[features].replace([np.inf, -np.inf], np.nan)
    X = X.fillna(X.median())

    # Feature pruning: drop near-zero-variance features (useless for the trees).
    PRUNE_VARIANCE_THRESHOLD = float(os.getenv('PRUNE_VARIANCE_THRESHOLD', '1e-6'))
    feature_variances = X.var()
    low_var_features = feature_variances[feature_variances < PRUNE_VARIANCE_THRESHOLD].index.tolist()
    if low_var_features:
        log_info(f"[{name}] Élagage : {len(low_var_features)} feature(s) à variance < {PRUNE_VARIANCE_THRESHOLD} retirées : {low_var_features}")
        X = X.drop(columns=low_var_features)
        features = [f for f in features if f not in low_var_features]
        log_decision('FEATURE_PRUNED', cycle_id, name, {'pruned': low_var_features, 'remaining': len(features)})

    # Validation split: hold out the last 20% of rows for offline evaluation.
    val_size = max(1, int(len(X) * 0.2))
    X_train = X.iloc[:-val_size]
    X_val = X.iloc[-val_size:]

    if EIF_AVAILABLE:
        model = ExtendedIsolationForest(
            ntrees=300, ndim=min(3, len(features)),
            sample_size='auto', missing_action='impute',
            random_seed=42, nthreads=-1
        )
    else:
        model = IsolationForest(n_estimators=300, contamination=CONTAMINATION, random_state=42, n_jobs=-1)
    model.fit(X_train)

    # Offline evaluation on the hold-out set.
    val_scores = model.decision_function(X_val)
    # Unify the sign convention (negative = anomalous): isotree scores are mapped with 0.5 - score.
    if EIF_AVAILABLE:
        val_scores = 0.5 - val_scores
    val_mean_score = float(np.mean(val_scores))
    val_anomaly_rate = float(np.mean(val_scores < 0))
    log_info(f"[{name}] Validation : score moyen={val_mean_score:.4f}, taux anomalie={val_anomaly_rate:.2%} ({len(X_val)} échantillons)")

    # Gate: reject the new model when the baseline looks contaminated.
    VAL_ANOMALY_GATE = float(os.getenv('VAL_ANOMALY_GATE', '0.20'))
    if val_anomaly_rate > VAL_ANOMALY_GATE:
        log_info(f"[{name}] ⚠ REJET : val_anomaly_rate={val_anomaly_rate:.2%} > gate={VAL_ANOMALY_GATE:.0%} — baseline probablement contaminée.")
        log_decision('MODEL_REJECTED', cycle_id, name, {
            'val_anomaly_rate': round(val_anomaly_rate, 4), 'gate': VAL_ANOMALY_GATE,
            'val_mean_score': round(val_mean_score, 4), 'version_id': version_id,
        })
        # Fall back to the previous model when one is still on disk.
        if model_path and os.path.exists(model_path):
            log_info(f"[{name}] Conservation du modèle précédent v{meta.get('version_id', '?')}.")
            # Same loader as the reuse path, so a failing checkpoint is logged
            # instead of being silently ignored.
            ae_prev = _load_saved_autoencoder(name, meta.get('version_id', ''))
            return joblib.load(model_path), ae_prev, meta.get('features', features)
        log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.")

    # A1 — Store per-feature distribution statistics (moments + quantiles) for later drift detection.
    baseline_stats = {
        f: {
            'mean': float(X[f].mean()), 'std': float(X[f].std()),
            'p10': float(X[f].quantile(0.10)), 'p25': float(X[f].quantile(0.25)),
            'p50': float(X[f].quantile(0.50)), 'p75': float(X[f].quantile(0.75)),
            'p90': float(X[f].quantile(0.90)),
        }
        for f in features
    }

    new_model_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.joblib')
    new_meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json')
    joblib.dump(model, new_model_path)

    # Train the autoencoder alongside (only if PyTorch is available and AE_WEIGHT > 0).
    ae_model = None
    if TORCH_AVAILABLE and AE_WEIGHT > 0:
        try:
            ae_model = TrafficAutoEncoder(n_features=len(features))
            ae_stats = ae_model.fit(X_train.values)
            ae_path = _ae_model_path(name, version_id)
            torch.save(ae_model.state_dict(), ae_path)
            log_info(f"[{name}] Autoencoder entraîné : loss={ae_stats['final_loss']:.6f}, epochs={ae_stats['epochs']}")
        except Exception as exc:
            log_info(f"[{name}] Autoencoder training échoué : {exc} — AE désactivé.")
            ae_model = None

    previous_version = meta.get('version_id', None) if meta else None
    new_meta = {
        'version_id': version_id, 'trained_at': datetime.now().isoformat(),
        'human_samples': len(human_baseline), 'contamination': CONTAMINATION,
        'threshold': ANOMALY_THRESHOLD, 'features': features,
        'model_name': name, 'previous_version': previous_version,
        'retrain_interval': RETRAIN_INTERVAL_H, 'baseline_stats': baseline_stats,
        'algorithm': 'ExtendedIsolationForest' if EIF_AVAILABLE else 'IsolationForest',
        'autoencoder': ae_model is not None,
        'ae_weight': AE_WEIGHT if ae_model else 0.0,
        'validation': {
            'val_size': len(X_val), 'train_size': len(X_train),
            'val_mean_score': round(val_mean_score, 4),
            'val_anomaly_rate': round(val_anomaly_rate, 4),
        }
    }
    # Metadata first, pointer last: the pointer never names a version whose
    # model file or metadata has not been written yet.
    with open(new_meta_path, 'w') as f:
        json.dump(new_meta, f, indent=2)
    with open(_current_pointer_path(name), 'w') as f:
        f.write(version_id)

    append_training_history({k: v for k, v in new_meta.items() if k != 'baseline_stats'})
    _purge_old_versions(name)

    log_info(f"[{name}] Modèle v{version_id} sauvegardé → {new_model_path} (AE={'oui' if ae_model is not None else 'non'})")
    log_decision('MODEL_TRAINED', cycle_id, name, {
        'version_id': version_id, 'previous_version': previous_version,
        'human_samples': len(human_baseline), 'next_retrain_in_h': RETRAIN_INTERVAL_H,
        'history_kept': MODEL_HISTORY_COUNT
    })
    return model, ae_model, features
def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
    """Run the semi-supervised detection pipeline on one slice of traffic.

    The traffic is split three ways: known bots (non-empty ``bot_name``),
    Anubis ALLOW bots, and everything else.  The isolation-forest model is
    loaded or trained on the human baseline (``asn_label == 'isp'``), the
    remaining traffic is scored, optionally blended with the autoencoder and
    XGBoost voices, and then post-processed (adaptive threshold, recurrence
    penalty, legitimate-browser classification, SHAP reasons, clustering).

    Args:
        df: Pre-processed sessions.  Must contain at least ``bot_name``,
            ``anubis_bot_action``, ``anubis_bot_name``, ``asn_label``,
            ``src_ip`` and ``ja4`` plus the feature columns.
        features: Candidate feature names for this model.
        name: Model name, used as log prefix and stored in ``model_name``.
        cycle_id: Identifier of the current analysis cycle (for decision logs).
        recurrence_map: Mapping ``src_ip -> number of previous detections``.

    Returns:
        ``(threats, all_scored)`` as DataFrames.  Both are empty when the
        features are invalid or the human baseline has fewer than 500 rows.
        ``threats`` is an empty DataFrame when nothing was flagged.

    Side effects: writes to the info log and the decision log.
    """
    # 1. Known bots (dict_bot_ip / dict_bot_ja4) are excluded from IF scoring.
    known_bots = df[df['bot_name'] != ''].copy()
    rest = df[df['bot_name'] == ''].copy()

    # 2. Anubis ALLOW: legitimate bots, also excluded from IF scoring.
    anubis_allow = rest[rest['anubis_bot_action'] == 'ALLOW'].copy()

    # 3. Everything else goes through the isolation forest for a real score:
    #    - DENY: threats identified by Anubis rules; IF provides the severity.
    #    - WEIGH / unknown: scored normally (anubis_is_flagged=1 for WEIGH).
    #    DENY rows are ALWAYS reported as threats, whatever the IF threshold.
    unknown_traffic = rest[rest['anubis_bot_action'] != 'ALLOW'].copy()
    human_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'isp']

    log_info(f'[{name}] ── Triage ──────────────────────────────────────')
    log_info(f'[{name}] Total sessions : {len(df):>6}')
    log_info(f'[{name}] Bots connus (dict) : {len(known_bots):>6}')
    log_info(f'[{name}] Anubis ALLOW : {len(anubis_allow):>6}')
    log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}')
    log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)')

    # A7 - validate the features before doing any work.
    valid_features = validate_features(df, features, name, cycle_id)
    if valid_features is None:
        return pd.DataFrame(), pd.DataFrame()

    if len(human_baseline) < 500:
        log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < 500) — cycle ignoré.")
        log_info(f"[{name}] Distribution asn_label dans le trafic à scorer :")
        if 'asn_label' in unknown_traffic.columns:
            for label, cnt in unknown_traffic['asn_label'].value_counts().head(8).items():
                log_info(f"[{name}] {label:>15} : {cnt}")
        log_decision('SKIPPED_LOW_DATA', cycle_id, name, {
            'human_count': len(human_baseline), 'unknown_count': len(unknown_traffic)
        })
        return pd.DataFrame(), pd.DataFrame()

    log_info(f'[{name}] ── Modèle EIF ─────────────────────────────────')
    log_info(f'[{name}] Features validées : {len(valid_features)}/{len(features)} ({", ".join(valid_features[:5])}{"…" if len(valid_features) > 5 else ""})')

    # A1 - concept-drift handling lives inside load_or_train_model.
    model, ae_model, model_features = load_or_train_model(name, human_baseline, valid_features, cycle_id)
    # Score with the model's own feature list: it can differ from
    # valid_features after pruning or when an older model was loaded.
    scoring_features = [f for f in model_features if f in unknown_traffic.columns]

    X_test = unknown_traffic[scoring_features].replace([np.inf, -np.inf], np.nan)
    X_test = X_test.fillna(X_test.median())
    raw_scores = model.decision_function(X_test)

    # isotree returns scores in [0, 1]: 0.5 = boundary, >0.5 = anomalous.
    # sklearn returns scores centred on 0: <0 = anomalous, >0 = normal.
    # Conversion: sklearn_equiv = 0.5 - isotree_score
    #   isotree 0.8 -> -0.3 (CRITICAL) | isotree 0.5 -> 0.0 (boundary)
    #   isotree 0.3 -> +0.2 (NORMAL)
    if EIF_AVAILABLE:
        raw_scores = 0.5 - raw_scores

    log_info(f'[{name}] Scoring EIF : {len(X_test)} sessions scorées (min={raw_scores.min():.4f}, max={raw_scores.max():.4f}, mean={raw_scores.mean():.4f})')

    # Blend EIF with the autoencoder when one is available:
    # final = (1 - alpha) * eif_norm + alpha * ae_norm, alpha = AE_WEIGHT.
    if ae_model is not None and AE_WEIGHT > 0:
        try:
            ae_recon_errors = ae_model.score_samples(X_test.values)
            # Negated so that a larger reconstruction error normalises closer to 1.
            ae_norm = normalize_scores(-ae_recon_errors)
            eif_norm = normalize_scores(raw_scores)
            combined_norm = (1 - AE_WEIGHT) * eif_norm + AE_WEIGHT * ae_norm
            unknown_traffic['ae_recon_error'] = ae_recon_errors
            unknown_traffic['anomaly_score'] = combined_norm
            log_info(f"[{name}] Score combiné EIF+AE (α={AE_WEIGHT}): ae_mean={ae_recon_errors.mean():.6f}")
        except Exception as exc:
            # Best effort: the autoencoder is optional, fall back to EIF alone.
            log_info(f"[{name}] AE scoring échoué : {exc} — utilisation EIF seul.")
            unknown_traffic['ae_recon_error'] = 0.0
            unknown_traffic['anomaly_score'] = normalize_scores(raw_scores)
    else:
        unknown_traffic['ae_recon_error'] = 0.0
        unknown_traffic['anomaly_score'] = normalize_scores(raw_scores)

    # raw_anomaly_score: raw IF score, compared to the threshold and mapped
    # to a threat level.
    unknown_traffic['raw_anomaly_score'] = raw_scores
    unknown_traffic['model_name'] = name

    # Supervised XGBoost as a third voice (when historical labels exist).
    unknown_traffic['xgb_prob'] = 0.0
    if XGB_AVAILABLE and XGB_WEIGHT > 0:
        try:
            xgb_client = get_client()
            xgb_model, xgb_feats = load_or_train_xgb(name, xgb_client, scoring_features, cycle_id)
            if xgb_model is not None and xgb_feats is not None:
                # XGB may use a subset of features (those available in the view).
                xgb_cols = [f for f in xgb_feats if f in unknown_traffic.columns]
                X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
                xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1]
                unknown_traffic['xgb_prob'] = xgb_probs
                # Meta-learner: anomaly_score (EIF+AE) and xgb_prob are both in [0, 1].
                xgb_alpha = XGB_WEIGHT
                unknown_traffic['anomaly_score'] = (
                    (1 - xgb_alpha) * unknown_traffic['anomaly_score'] + xgb_alpha * xgb_probs
                )
                log_info(f"[{name}] Score combiné EIF+AE+XGB (β={xgb_alpha}): xgb_mean={xgb_probs.mean():.4f}")
        except Exception as exc:
            # Best effort: XGBoost is optional, keep the EIF+AE score.
            log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.")

    # A2 - adaptive threshold computed on the RAW scores (same scale as ANOMALY_THRESHOLD).
    effective_threshold = compute_adaptive_threshold(raw_scores)
    log_info(f"[{name}] Seuil effectif : {effective_threshold:.4f} (statique={ANOMALY_THRESHOLD}, percentile={ANOMALY_PERCENTILE})")

    # A6 - penalise recurrent IPs on the RAW score before the threshold comparison.
    if RECURRENCE_WEIGHT > 0:
        recurrences = unknown_traffic['src_ip'].map(recurrence_map).fillna(0)
        penalty = np.log1p(recurrences.values) * RECURRENCE_WEIGHT
        unknown_traffic['raw_anomaly_score'] = unknown_traffic['raw_anomaly_score'] - penalty

    # Give a threat_level to EVERY scored session (needed for ml_all_scores).
    unknown_traffic['threat_level'] = unknown_traffic['raw_anomaly_score'].apply(score_to_threat_level)
    unknown_traffic['recurrence'] = unknown_traffic['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1
    unknown_traffic['campaign_id'] = -1

    # DENY rows keep their real IF score but their threat_level is forced.
    deny_mask = unknown_traffic['anubis_bot_action'] == 'DENY'
    unknown_traffic.loc[deny_mask, 'threat_level'] = 'ANUBIS_DENY'

    # ── A9 - multi-factor classification of legitimate browsers ─────────────
    # browser_confidence in [0..1] combines five axes (known JA4, JA4
    # structure, modern HTTP headers, navigation behaviour, TLS/TCP
    # coherence).  A JA4 missing from the static dictionary can still be
    # recognised when the other signals are strong.
    _bconf = unknown_traffic.get('browser_confidence', pd.Series(0, index=unknown_traffic.index)).fillna(0)
    _ibf = unknown_traffic.get('inferred_browser_family', pd.Series('', index=unknown_traffic.index)).fillna('').astype(str)
    browser_legit_mask = (
        (_bconf >= BROWSER_CONFIDENCE_THRESHOLD) &                    # multi-factor confidence
        (_ibf != '') &                                                # family identified
        (unknown_traffic['threat_level'].isin(['NORMAL', 'LOW'])) &   # no IF threat
        (~deny_mask)                                                  # not an Anubis DENY
    )

    # JA4 cohort propagation: when at least BROWSER_COHORT_RATIO of a JA4's
    # sessions are already classified as browser, extend the label to the
    # remaining low-threat sessions sharing that JA4.
    if browser_legit_mask.any():
        ja4_col = unknown_traffic['ja4']
        legit_per_ja4 = ja4_col[browser_legit_mask].value_counts()
        total_per_ja4 = ja4_col.value_counts()
        ratio_per_ja4 = (legit_per_ja4 / total_per_ja4).dropna()
        cohort_ja4s = set(ratio_per_ja4[ratio_per_ja4 >= BROWSER_COHORT_RATIO].index)
        if cohort_ja4s:
            cohort_mask = (
                ja4_col.isin(cohort_ja4s) &
                (~browser_legit_mask) &                               # not already classified
                (unknown_traffic['threat_level'].isin(['NORMAL', 'LOW'])) &
                (~deny_mask)
            )
            browser_legit_mask = browser_legit_mask | cohort_mask
            n_cohort = cohort_mask.sum()
            if n_cohort > 0:
                log_info(f"[{name}] Propagation cohorte JA4 : {n_cohort} sessions supplémentaires ({len(cohort_ja4s)} JA4)")

    if browser_legit_mask.any():
        unknown_traffic.loc[browser_legit_mask, 'threat_level'] = 'LEGITIMATE_BROWSER'
        # Cohort-propagated rows may have no inferred family: display 'Unknown'.
        _family_disp = _ibf[browser_legit_mask].where(_ibf[browser_legit_mask] != '', 'Unknown')
        unknown_traffic.loc[browser_legit_mask, 'reason'] = (
            '[Navigateur légitime] ' + _family_disp +
            ' (confiance=' + _bconf[browser_legit_mask].round(2).astype(str) + ')'
        )
        n_legit = browser_legit_mask.sum()
        families = _ibf[browser_legit_mask].value_counts().to_dict()
        # Mean value of each axis, logged for diagnostics.
        ax_means = {}
        for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
                   'axis_nav_behavior', 'axis_tls_coherence']:
            col = unknown_traffic.get(ax, None)
            if col is not None:
                ax_means[ax.replace('axis_', '')] = round(float(col[browser_legit_mask].mean()), 3)
        log_info(f"[{name}] {n_legit} session(s) classée(s) LEGITIMATE_BROWSER : {families}")
        log_info(f"[{name}] Axes moyens : {ax_means}")
        log_decision('LEGITIMATE_BROWSER', cycle_id, name, {
            'count': int(n_legit), 'families': families,
            'mean_confidence': round(float(_bconf[browser_legit_mask].mean()), 3),
            'axis_means': ax_means,
        })

    # Snapshot of every scored session (before threshold filtering) for ml_all_scores.
    all_scored = unknown_traffic.copy()

    if not known_bots.empty:
        known_bots = known_bots.copy()
        known_bots['anomaly_score'] = 0.0
        known_bots['raw_anomaly_score'] = 0.0
        known_bots['ae_recon_error'] = 0.0
        known_bots['xgb_prob'] = 0.0
        known_bots['threat_level'] = 'KNOWN_BOT'
        known_bots['model_name'] = name
        known_bots['campaign_id'] = -1
        known_bots['reason'] = '[Identification] Bot légitime: ' + known_bots['bot_name']
        known_bots['recurrence'] = known_bots['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1
        for _, row in known_bots.iterrows():
            log_decision('KNOWN_BOT', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'bot_name': row.get('bot_name', ''),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''),
                'recurrence': int(row.get('recurrence', 1))
            })

    # ── Anubis ALLOW: legitimate bots identified by Anubis rules ────────────
    if not anubis_allow.empty:
        anubis_allow = anubis_allow.copy()
        anubis_allow['anomaly_score'] = 0.0
        anubis_allow['raw_anomaly_score'] = 0.0
        anubis_allow['ae_recon_error'] = 0.0
        anubis_allow['xgb_prob'] = 0.0
        anubis_allow['threat_level'] = 'KNOWN_BOT'
        anubis_allow['bot_name'] = anubis_allow['anubis_bot_name']
        anubis_allow['model_name'] = name
        anubis_allow['campaign_id'] = -1
        anubis_allow['reason'] = '[Anubis ALLOW] ' + anubis_allow['anubis_bot_name']
        anubis_allow['recurrence'] = anubis_allow['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1
        for _, row in anubis_allow.iterrows():
            log_decision('KNOWN_BOT', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'bot_name': row.get('anubis_bot_name', ''),
                'anubis_bot_name': row.get('anubis_bot_name', ''),
                'anubis_bot_action': row.get('anubis_bot_action', ''),
                'anubis_bot_category': row.get('anubis_bot_category', ''),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''),
                'recurrence': int(row.get('recurrence', 1)),
            })

    # ── Anubis DENY: scored by IF, always reported as threats ───────────────
    # Taken from unknown_traffic after scoring, so they carry their real IF score.
    anubis_deny = unknown_traffic[deny_mask].copy()
    if not anubis_deny.empty:
        anubis_deny['reason'] = '[Anubis DENY] ' + anubis_deny['anubis_bot_name'].fillna('') + \
            ' | ' + anubis_deny['raw_anomaly_score'].apply(lambda s: f'IF={s:.4f}')
        log_info(f"[{name}] Anubis DENY: {len(anubis_deny)} IP(s) scorées par IF "
                 f"(score moyen: {anubis_deny['raw_anomaly_score'].mean():.4f}).")
        for _, row in anubis_deny.iterrows():
            log_decision('ANUBIS_DENY', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'anubis_bot_name': row.get('anubis_bot_name', ''),
                'anubis_bot_action': row.get('anubis_bot_action', ''),
                'anubis_bot_category': row.get('anubis_bot_category', ''),
                'anomaly_score': round(float(row.get('anomaly_score', 0)), 4),
                'raw_anomaly_score': round(float(row.get('raw_anomaly_score', 0)), 4),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''),
                'recurrence': int(row.get('recurrence', 1)),
            })

    # Threshold filter on raw_anomaly_score (A6 penalty included), applied only
    # to traffic that is neither DENY (always a threat) nor a confirmed
    # legitimate browser (never an anomaly).
    non_deny_traffic = unknown_traffic[~deny_mask & (unknown_traffic['threat_level'] != 'LEGITIMATE_BROWSER')]
    anomalies = non_deny_traffic[non_deny_traffic['raw_anomaly_score'] < effective_threshold].copy()
    if not anomalies.empty:
        log_info(f"[{name}] ALERT: {len(anomalies)} anomalies détectées (seuil={effective_threshold:.4f}).")
        anomalies['recurrence'] = anomalies['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1

        # A4 - SHAP explainability: top features behind each anomaly.
        # X_test columns are scoring_features, so the SHAP values must be
        # labelled with that list (valid_features can differ from the model's).
        X_anomalies = X_test.loc[anomalies.index]
        shap_tops = compute_shap_top_features(model, X_anomalies, scoring_features)
        anomalies['reason'] = [
            build_reason(name, row, shap)
            for (_, row), shap in zip(anomalies.iterrows(), shap_tops)
        ]

        # A8 - clustering to identify coordinated campaigns.
        if ENABLE_CLUSTERING:
            anomalies = cluster_anomalies(anomalies, scoring_features, ae_model=ae_model)

        anomalies['ja4'] = anomalies['ja4'].replace({'': 'HTTP_CLEAR_TEXT'})
        for _, row in anomalies.iterrows():
            log_decision('ANOMALY', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'anomaly_score': round(float(row.get('anomaly_score', 0)), 4),
                'raw_anomaly_score': round(float(row.get('raw_anomaly_score', 0)), 4),
                'threat_level': row.get('threat_level', ''), 'recurrence': int(row.get('recurrence', 1)),
                'hit_velocity': round(float(row.get('hit_velocity', 0)), 2),
                'fuzzing_index': round(float(row.get('fuzzing_index', 0)), 2),
                'post_ratio': round(float(row.get('post_ratio', 0)), 3),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_detail': row.get('asn_detail', ''), 'asn_domain': row.get('asn_domain', ''),
                'country_code': row.get('country_code', ''), 'asn_label': row.get('asn_label', ''),
                'ja4': row.get('ja4', ''), 'host': row.get('host', ''),
                'correlated': int(row.get('correlated', 0)), 'campaign_id': int(row.get('campaign_id', -1)),
                'effective_threshold': round(effective_threshold, 4), 'reason': row.get('reason', '')
            })

    # pd.concat raises ValueError on an empty list, so guard the case where
    # nothing at all was flagged in this cycle.
    threat_frames = [
        frame for frame in (anomalies, known_bots, anubis_allow, anubis_deny)
        if not frame.empty
    ]
    threats = pd.concat(threat_frames, ignore_index=True) if threat_frames else pd.DataFrame()

    # anubis_allow rows are added to all_scored for traceability in
    # ml_all_scores: they are excluded from IF analysis but must appear in
    # the score table with threat_level='KNOWN_BOT' and anomaly_score=0.0.
    if not anubis_allow.empty:
        all_scored = pd.concat([all_scored, anubis_allow], ignore_index=True)

    # ── Model summary ────────────────────────────────────────────────────────
    n_threats = len(threats)
    n_anomalies = len(anomalies)
    n_legit_browser = int(browser_legit_mask.sum())
    n_deny = len(anubis_deny)
    tl_counts = threats['threat_level'].value_counts().to_dict() if not threats.empty else {}
    tl_str = ', '.join(f'{k}={v}' for k, v in sorted(tl_counts.items())) if tl_counts else 'aucune'
    log_info(f'[{name}] ── Résultat ────────────────────────────────────')
    log_info(f'[{name}] Menaces totales : {n_threats:>6} ({tl_str})')
    log_info(f'[{name}] Anomalies IF : {n_anomalies:>6} (seuil={effective_threshold:.4f})')
    log_info(f'[{name}] Navigateurs légit. : {n_legit_browser:>6}')
    log_info(f'[{name}] Anubis DENY (forcé) : {n_deny:>6}')
    log_info(f'[{name}] Sessions scorées : {len(all_scored):>6} (→ ml_all_scores)')

    return threats, all_scored
# ═══════════════════════════════════════════════════════════════════════════════
# FEATURE LISTS PER MODEL
# ═══════════════════════════════════════════════════════════════════════════════

# Common features (pure L7 HTTP, available for correlated=0 and 1)
FEATURES = [
    'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'port_exhaustion_ratio',
    'orphan_ratio', 'max_keepalives', 'tcp_shared_count', 'header_order_shared_count',
    'header_count', 'has_accept_language', 'has_cookie', 'has_referer',
    'modern_browser_score', 'ua_ch_mismatch', 'ip_id_zero_ratio',
    'request_size_variance', 'multiplexing_efficiency', 'mss_mobile_mismatch',
    'asset_ratio', 'direct_access_ratio', 'is_ua_rotating', 'distinct_ja4_count',
    'src_port_density', 'ja4_asn_concentration', 'ja4_country_concentration', 'is_rare_ja4',
    'header_order_confidence', 'distinct_header_orders', 'temporal_entropy',
    'path_diversity_ratio', 'url_depth_variance', 'anomalous_payload_ratio',
    # B4-B7: pure L7 features
    'head_ratio', 'sec_fetch_absence_rate', 'generic_accept_ratio', 'http10_ratio',
    # Anubis
    'anubis_is_flagged',
    # Multi-factor browser identification
    'is_known_browser', 'browser_consistency_score', 'browser_confidence',
    'axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
    'axis_nav_behavior', 'axis_tls_coherence',
    # HTTP
    'missing_accept_enc_ratio', 'http_scheme_ratio',
    # Thesis §5
    'path_transition_entropy',
    'cadence_cv', 'burst_ratio', 'pause_ratio',
    'lag1_autocorrelation', 'benford_deviation',
    'host_diversity', 'host_sweep_speed', 'host_coverage_uniformity',
]

# Extra features for the "Complet" model (TCP/TLS data required)
FEATURES_COMPLET = FEATURES + [
    'tcp_jitter_variance', 'alpn_http_mismatch', 'is_alpn_missing', 'sni_host_mismatch',
    # B1-B3, B8: TLS/TCP features
    'ja3_diversity_ratio', 'syn_timing_cv', 'tls12_ratio', 'ip_df_variance',
    # TTL-based OS fingerprinting + TCP window scale
    'avg_ttl', 'ttl_std', 'no_window_scale_ratio',
    # §5.5 - intra-session JA4 drift
    'ja4_drift_ratio',
]


# ═══════════════════════════════════════════════════════════════════════════════
# PRE-PROCESSING
# ═══════════════════════════════════════════════════════════════════════════════

def _impute_missing_values(df: pd.DataFrame, binary_features) -> pd.DataFrame:
    """Fill missing values in place: -1 for binary flags, median for numeric columns.

    For numeric columns, +/-inf is turned into NaN *before* the median is
    taken, so a column dominated by infinite values cannot have ``inf``
    written back into its gaps.  Columns of any other dtype are left alone.
    """
    numeric_dtypes = ('float64', 'float32', 'int64', 'int32', 'uint64', 'uint32')
    for col in df.columns:
        if col in binary_features:
            # -1 marks "unknown" and keeps it distinct from a real 0/1 value.
            df[col] = df[col].fillna(-1)
        elif df[col].dtype in numeric_dtypes:
            cleaned = df[col].replace([np.inf, -np.inf], np.nan)
            df[col] = cleaned.fillna(cleaned.median())
    return df


def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise column names, derive browser features and fill missing values.

    Shared by the 1h and 24h views.  The DataFrame is modified in place and
    also returned.

    Steps:
      1. Strip any ``table.`` prefix from the column names.
      2. Cast the identifier/label columns to ``str`` (missing -> ``''``).
      3. A9 - add the multi-factor browser identification columns.
      4. Derive ``anubis_is_flagged``.
      5. Impute missing values (see ``_impute_missing_values``).
    """
    df.columns = [c.split('.')[-1] for c in df.columns]
    for col in ['src_ip', 'ja4', 'host', 'bot_name', 'anubis_bot_name', 'anubis_bot_action',
                'anubis_bot_category', 'asn_number', 'asn_org', 'asn_detail', 'asn_domain',
                'country_code', 'asn_label']:
        if col in df.columns:
            df[col] = df[col].fillna('').astype(str)

    # ── A9 - multi-factor browser identification ─────────────────────────────
    browser_axes = _compute_browser_axes(df)
    ja4_parsed = _parse_ja4_columns(df.get('ja4', pd.Series('', index=df.index)))

    df['inferred_browser_family'] = _infer_browser_family(df, ja4_parsed, browser_axes)

    df['browser_confidence'] = browser_axes['browser_confidence']
    for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
               'axis_nav_behavior', 'axis_tls_coherence']:
        df[ax] = browser_axes[ax]

    # Backward compatibility: legacy columns derived from the new axes.
    # browser_consistency_score counts how many of five signals are satisfied.
    df['is_known_browser'] = browser_axes['axis_ja4_known'].astype(int)
    df['browser_consistency_score'] = (
        browser_axes['axis_ja4_known'].clip(0, 1)
        + (browser_axes['axis_http_modern'] >= 0.5).astype(int)
        + (browser_axes['axis_nav_behavior'] >= 0.5).astype(int)
        + (browser_axes['axis_tls_coherence'] >= 0.5).astype(int)
        + (df['inferred_browser_family'] != '').astype(int)
    ).astype(int)

    # anubis_is_flagged: moderate suspicion signal, set when Anubis named the
    # client but the action is neither ALLOW, DENY nor empty.
    df['anubis_is_flagged'] = (
        (df.get('anubis_bot_name', pd.Series('', index=df.index)) != '') &
        (~df.get('anubis_bot_action', pd.Series('', index=df.index)).isin(['ALLOW', 'DENY', '']))
    ).astype(int)

    # Imputation: binary flags get -1, other numeric columns get their median.
    binary_features = {
        'has_accept_language', 'has_cookie', 'has_referer', 'ua_ch_mismatch',
        'is_ua_rotating', 'is_alpn_missing', 'sni_host_mismatch', 'alpn_http_mismatch',
        'mss_mobile_mismatch', 'anubis_is_flagged', 'is_rare_ja4',
    }
    return _impute_missing_values(df, binary_features)
# Per-model cache of the last invalid-feature state, so that feature warnings
# are only logged when that state changes.
_feature_warning_cache: dict = {}


# ═══════════════════════════════════════════════════════════════════════════════
# A1 - CONCEPT DRIFT DETECTION
# ═══════════════════════════════════════════════════════════════════════════════

def _compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame,
                         features: list) -> float:
    """Return the fraction of features whose distribution drifted since training.

    Each feature is checked with a two-sample Kolmogorov-Smirnov test between
    the current human baseline and a synthetic sample standing in for the
    training distribution.  The synthetic sample is drawn by linear
    interpolation over the stored quantile digest (p10..p90) when available,
    otherwise from N(mean, std).  A feature counts as drifted when p < 0.05.

    Features are skipped when they are missing from either side, have fewer
    than 30 current values, or had a (near) zero standard deviation at
    training time.  Falls back to the Z-score variant when scipy is missing.
    Returns 0.0 when there is nothing to compare.
    """
    if not baseline_stats or current_baseline.empty:
        return 0.0
    try:
        from scipy.stats import ks_2samp
    except ImportError:
        return _compute_drift_score_zscore(baseline_stats, current_baseline, features)

    digest_keys = ('p10', 'p25', 'p50', 'p75', 'p90')
    digest_probs = np.array([0.10, 0.25, 0.50, 0.75, 0.90])
    # Fixed seed: the same inputs always yield the same drift score.
    generator = np.random.default_rng(42)

    p_values = []
    for feature in features:
        if feature not in baseline_stats or feature not in current_baseline.columns:
            continue
        reference = baseline_stats[feature]
        observed = current_baseline[feature].dropna()
        if len(observed) < 30:
            continue
        sigma = reference.get('std', 0)
        if sigma < 1e-9:
            continue

        if all(key in reference for key in digest_keys):
            digest_values = np.array([reference[key] for key in digest_keys])
            draws = generator.uniform(0, 1, size=len(observed))
            synthetic = np.interp(draws, digest_probs, digest_values)
        else:
            synthetic = generator.normal(reference['mean'], sigma, size=len(observed))

        p_values.append(ks_2samp(observed.values, synthetic).pvalue)

    n_drifted = sum(1 for p in p_values if p < 0.05)
    return n_drifted / max(len(p_values), 1)


def _compute_drift_score_zscore(baseline_stats: dict, current_baseline: pd.DataFrame,
                                features: list) -> float:
    """Z-score fallback for drift detection when scipy is unavailable.

    A feature counts as drifted when its current mean lies more than two
    training standard deviations away from the training mean.  Returns the
    drifted fraction among the features that could be tested.
    """
    if not baseline_stats or current_baseline.empty:
        return 0.0

    drift_flags = []
    for feature in features:
        if feature not in baseline_stats or feature not in current_baseline.columns:
            continue
        reference = baseline_stats[feature]
        current_mean = current_baseline[feature].mean()
        sigma = reference.get('std', 0)
        if sigma < 1e-9:
            continue
        z_score = abs(current_mean - reference['mean']) / sigma
        drift_flags.append(z_score > 2.0)

    return sum(1 for flag in drift_flags if flag) / max(len(drift_flags), 1)
# ═══════════════════════════════════════════════════════════════════════════════
# A7 - FEATURE COMPLETENESS VALIDATION
# ═══════════════════════════════════════════════════════════════════════════════

def validate_features(df: pd.DataFrame, features: list, name: str,
                      cycle_id: str):
    """Check that the features are present and non-constant in ``df``.

    Features listed in ``STRUCTURAL_EXCLUDED_FEATURES[name]`` are ignored.
    The remaining ones are classified as:
      * missing  - column absent from ``df``;
      * zero     - a single distinct value equal to 0, or no value at all
                   (all-NaN column: nothing usable was fed by the pipeline);
      * unique   - a single distinct non-zero value;
      * valid    - everything else.

    Warnings are logged only when this classification changes for ``name``.

    Returns:
        The list of valid features (in input order), or ``None`` when the
        valid ratio is below ``MIN_VALID_FEATURE_RATIO``.
    """
    structural = STRUCTURAL_EXCLUDED_FEATURES.get(name, [])
    active_features = [f for f in features if f not in structural]

    missing = [f for f in active_features if f not in df.columns]
    present = [f for f in active_features if f in df.columns]

    zero_val, unique_val = [], []
    for feat in present:
        column = df[feat]
        n_distinct = column.nunique()  # NaN is not counted
        if n_distinct > 1:
            continue
        if n_distinct == 0 or column.max() == 0:
            zero_val.append(feat)
        else:
            unique_val.append(feat)
    constant = set(zero_val) | set(unique_val)
    valid = [f for f in present if f not in constant]

    current_state = (frozenset(missing), frozenset(zero_val), frozenset(unique_val))
    state_changed = _feature_warning_cache.get(name) != current_state
    _feature_warning_cache[name] = current_state

    if structural:
        log_info(f"[{name}] Features exclues (structurelles / L4 indisponible) : {structural}")
    if state_changed:
        if missing:
            log_info(f"[{name}] Features absentes du schéma : {missing}")
        if zero_val:
            log_info(f"[{name}] Features à 0 (pipeline non-alimenté) : {zero_val}")
        if unique_val:
            log_info(f"[{name}] Features non-discriminantes (agrégat global) : {unique_val}")
        if missing or zero_val or unique_val:
            log_decision('FEATURE_WARNING', cycle_id, name, {
                'structural': structural, 'missing': missing,
                'zero': zero_val, 'unique_nonzero': unique_val,
                'valid_count': len(valid), 'total': len(active_features)
            })

    ratio = len(valid) / max(len(active_features), 1)
    if ratio < MIN_VALID_FEATURE_RATIO:
        log_info(f"[{name}] Ratio features valides insuffisant ({ratio:.0%} < {MIN_VALID_FEATURE_RATIO:.0%}) — cycle ignoré.")
        log_decision('SKIPPED_INVALID_FEATURES', cycle_id, name, {
            'valid_ratio': round(ratio, 3), 'threshold': MIN_VALID_FEATURE_RATIO
        })
        return None
    return valid


# ═══════════════════════════════════════════════════════════════════════════════
# A2 / A10 - ADAPTIVE THRESHOLD AND NORMALISATION
# ═══════════════════════════════════════════════════════════════════════════════

def compute_adaptive_threshold(scores: np.ndarray) -> float:
    """A2: adaptive threshold based on a percentile of the negative scores.

    Returns ``ANOMALY_THRESHOLD`` when no score is negative; otherwise the
    smaller of the ``ANOMALY_PERCENTILE``-th percentile of the negative
    scores and ``ANOMALY_THRESHOLD``, so the adaptive value can only make
    the cut-off stricter.
    """
    neg_scores = scores[scores < 0]
    if len(neg_scores) == 0:
        return ANOMALY_THRESHOLD
    adaptive = float(np.percentile(neg_scores, ANOMALY_PERCENTILE))
    return min(adaptive, ANOMALY_THRESHOLD)


def normalize_scores(scores: np.ndarray) -> np.ndarray:
    """A10: map scores to [0, 1], where 1 is the most anomalous.

    Only negative scores are considered anomalous: they are scaled by the
    most negative score, and non-negative scores map to 0.  The result is
    always a float array, even for integer input (an integer output buffer
    would truncate every scaled value to 0).
    """
    scores = np.asarray(scores, dtype=float)
    result = np.zeros_like(scores)
    mask = scores < 0
    if not mask.any():
        return result
    s_min = scores[mask].min()  # strictly negative, so the divisor is > 0
    result[mask] = np.clip(-scores[mask] / (-s_min + 1e-9), 0.0, 1.0)
    return result


# ═══════════════════════════════════════════════════════════════════════════════
# A4 - SHAP EXPLAINABILITY
# ═══════════════════════════════════════════════════════════════════════════════

def compute_shap_top_features(model, X: pd.DataFrame, features: list,
                              n_top: int = 5) -> list:
    """Return, for each row of ``X``, the ``n_top`` most anomaly-driving features.

    The result is a list with one ``{feature: shap_value}`` dict per row,
    where the most negative SHAP value is the feature most responsible for
    the anomaly.  One empty dict per row is returned when SHAP is disabled,
    ``X`` is empty, or the computation fails.

    sklearn models go through ``TreeExplainer``; isotree models go through
    the generic ``Explainer`` on ``decision_function`` with a background
    sample of at most 100 rows.

    ``features`` is kept for interface compatibility: SHAP values are
    produced per column of ``X``, so the labels are taken from ``X.columns``.
    """
    if not ENABLE_SHAP or X.empty:
        return [{} for _ in range(len(X))]
    try:
        if EIF_AVAILABLE:
            sample_size = min(100, len(X))
            X_sample = X.sample(n=sample_size, random_state=42) if len(X) > sample_size else X
            explainer = _shap.Explainer(model.decision_function, X_sample)
            # isotree scores grow with anomaly (the pipeline converts them with
            # 0.5 - score).  Negate so that, as on the sklearn path, the most
            # negative contribution is the one pushing towards "anomalous".
            shap_values = -explainer(X).values
        else:
            explainer = _shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X)
        column_names = list(X.columns)
        result = []
        for sv in shap_values:
            pairs = sorted(zip(column_names, sv), key=lambda x: x[1])
            result.append({f: round(float(v), 4) for f, v in pairs[:n_top]})
        return result
    except Exception as e:
        # Best effort: explanations are optional, never fail the cycle on them.
        log_info(f"[SHAP] Erreur de calcul SHAP: {e}")
        return [{} for _ in range(len(X))]


def build_reason(name: str, row: pd.Series, shap_top: dict) -> str:
    """Build the ``reason`` text from the top SHAP features or key metrics.

    The raw score is displayed when present (falling back to the normalised
    one).  Without SHAP data, hit velocity and fuzzing index are shown.
    """
    score = round(float(row.get('raw_anomaly_score', row.get('anomaly_score', 0))), 3)
    threat = row.get('threat_level', '')
    if shap_top:
        top_str = ' | '.join(f"{f}({v:+.3f})" for f, v in shap_top.items())
        return f"[{name}] Score: {score} | SHAP: {top_str} | Threat: {threat}"
    vel = round(float(row.get('hit_velocity', 0)), 1)
    fuzz = round(float(row.get('fuzzing_index', 0)), 1)
    return f"[{name}] Score: {score} | Vel: {vel} req/s | Fuzzing: {fuzz} | Threat: {threat}"


# ═══════════════════════════════════════════════════════════════════════════════
# A8 - BEHAVIOURAL CLUSTERING OF ANOMALIES
# ═══════════════════════════════════════════════════════════════════════════════

def cluster_anomalies(anomalies: pd.DataFrame, features: list,
                      ae_model=None) -> pd.DataFrame:
    """A8: cluster anomalies with HDBSCAN (or DBSCAN as a fallback).

    When an autoencoder is given, its ``encode`` output is clustered instead
    of the standard-scaled raw features; if encoding fails, the scaled
    features are used.  Returns a copy of ``anomalies`` with a
    ``campaign_id`` column: -1 = isolated, >= 0 = cluster label.  With fewer
    than ``CLUSTERING_MIN_SAMPLES`` rows, or on any error, every row gets -1.
    """
    anomalies = anomalies.copy()
    if len(anomalies) < CLUSTERING_MIN_SAMPLES:
        anomalies['campaign_id'] = -1
        return anomalies
    try:
        X = anomalies[features].replace([np.inf, -np.inf], np.nan).fillna(0)
        algo_prefix = ''
        X_scaled = None
        if ae_model is not None:
            try:
                X_scaled = ae_model.encode(X.values)
                algo_prefix = 'AE+'
            except Exception:
                # Best effort: fall back to the raw feature space below.
                X_scaled = None
        if X_scaled is None:
            X_scaled = StandardScaler().fit_transform(X)
        if HDBSCAN_AVAILABLE:
            clusterer = _hdbscan.HDBSCAN(
                min_cluster_size=CLUSTERING_MIN_SAMPLES,
                min_samples=max(2, CLUSTERING_MIN_SAMPLES - 1),
                cluster_selection_method='eom'
            )
            labels = clusterer.fit_predict(X_scaled)
        else:
            labels = DBSCAN(eps=0.5, min_samples=CLUSTERING_MIN_SAMPLES).fit_predict(X_scaled)
        anomalies['campaign_id'] = labels
        # -1 is the noise label, not a campaign.
        n_campaigns = len(set(labels)) - (1 if -1 in labels else 0)
        algo = algo_prefix + ('HDBSCAN' if HDBSCAN_AVAILABLE else 'DBSCAN')
        if n_campaigns > 0:
            log_info(f"[{algo}] {n_campaigns} campagne(s) détectée(s) parmi {len(anomalies)} anomalies.")
    except Exception as e:
        log_info(f"[Clustering] Erreur de clustering: {e}")
        anomalies['campaign_id'] = -1
    return anomalies


# Backward-compatible private aliases: these helpers were originally defined
# with a leading underscore, while the pipeline imports the public names.
_compute_shap_top_features = compute_shap_top_features
_build_reason = build_reason
_cluster_anomalies = cluster_anomalies