From 3ae8c7d9c9d130e4231d82a51d29a43048467a3a Mon Sep 17 00:00:00 2001 From: toto Date: Wed, 8 Apr 2026 02:09:18 +0200 Subject: [PATCH] feat(bot-detector): upgrade to state-of-the-art detection pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix UnboundLocalError on global _consecutive_failures/_service_healthy - Add SQL identifier validation for DB names at startup - Replace Z-score drift detection with KS test (scipy.stats.ks_2samp) - Replace DBSCAN with HDBSCAN (adaptive clustering, no epsilon needed) - Fix NaN→0 blanket imputation with per-feature median/sentinel strategy - Add 80/20 temporal train/validation split with offline metrics logging - Integrate thesis §5 features from view_thesis_features_1h: path_transition_entropy, cadence_cv, burst/pause ratios, host_diversity, host_sweep_speed, host_coverage_uniformity, ja4_drift_ratio (Complet model only) - Add SOC feedback loop: read classifications from audit_logs, reclassify FP IPs as human, exclude TP IPs from baseline - Update dependencies: clickhouse-connect 0.8.12, scikit-learn 1.6.1, pandas 2.2.3, shap 0.47.2, add scipy>=1.14, hdbscan>=0.8.38 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../bot-detector/bot_detector/bot_detector.py | 215 ++++++++++++++++-- .../bot_detector/requirements.txt | 10 +- 2 files changed, 205 insertions(+), 20 deletions(-) diff --git a/services/bot-detector/bot_detector/bot_detector.py b/services/bot-detector/bot_detector/bot_detector.py index 8841b6c..ff22337 100644 --- a/services/bot-detector/bot_detector/bot_detector.py +++ b/services/bot-detector/bot_detector/bot_detector.py @@ -23,6 +23,12 @@ import clickhouse_connect from logging.handlers import RotatingFileHandler from http.server import HTTPServer, BaseHTTPRequestHandler from sklearn.ensemble import IsolationForest +try: + import hdbscan as _hdbscan + HDBSCAN_AVAILABLE = True +except ImportError: + HDBSCAN_AVAILABLE = False + from sklearn.cluster import DBSCAN from sklearn.preprocessing import StandardScaler import warnings @@ -60,6 +66,12 @@ def _require_float(name, default, lo=None, hi=None): DB = os.getenv('CLICKHOUSE_DB_PROCESSING', os.getenv('CLICKHOUSE_DB', 'ja4_processing')) DB_LOGS = os.getenv('CLICKHOUSE_DB_LOGS', 'ja4_logs') +import re +_SAFE_IDENTIFIER_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') +for _db_name, _db_val in [('CLICKHOUSE_DB_PROCESSING', DB), ('CLICKHOUSE_DB_LOGS', DB_LOGS)]: + if not _SAFE_IDENTIFIER_RE.match(_db_val): + raise SystemExit(f"[CONFIG] {_db_name}={_db_val!r} invalide — doit être un identifiant SQL valide.") + CONTAMINATION = _require_float('ISOLATION_CONTAMINATION', 0.001, 0, 0.5) ANOMALY_THRESHOLD = _require_float('ANOMALY_THRESHOLD', -0.05) LOG_FILE = os.getenv('BOT_DETECTOR_LOG', '/var/log/bot_detector/decisions.jsonl') @@ -81,6 +93,8 @@ ANOMALY_PERCENTILE = int(os.getenv('ANOMALY_PERCENTILE', '5')) # A3 — Analyse multi-fenêtres ENABLE_MULTIWINDOW = os.getenv('ENABLE_MULTIWINDOW', 'false').lower() == 'true' MULTIWINDOW_VIEW = os.getenv('MULTIWINDOW_VIEW', 'view_ai_features_24h') +if not _SAFE_IDENTIFIER_RE.match(MULTIWINDOW_VIEW): + raise SystemExit(f"[CONFIG] MULTIWINDOW_VIEW={MULTIWINDOW_VIEW!r} invalide — doit être un identifiant SQL valide.") # A4 — Explainabilité SHAP ENABLE_SHAP = SHAP_AVAILABLE and os.getenv('ENABLE_SHAP', 'true').lower() == 'true' @@ -107,7 +121,9 @@ STRUCTURAL_EXCLUDED_FEATURES: dict[str, list] = { # B features TLS/TCP : indisponibles pour trafic non-corrélé 'ja3_diversity_ratio', 'syn_timing_cv', 'tls12_ratio', 'ip_df_variance', # L4 uniquement : TTL et window scale indisponibles sans capture TCP - 'avg_ttl', 'ttl_std', 'no_window_scale_ratio'], + 'avg_ttl', 'ttl_std', 'no_window_scale_ratio', + # §5.5 JA4 Drift nécessite une corrélation fiable pour le JA4 + 'ja4_drift_ratio'], } TRAINING_HISTORY_FILE = os.path.join(MODEL_DIR, 'training_history.jsonl') @@ -287,9 +303,22 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, version_id = datetime.now().strftime('%Y%m%d_%H%M%S') log_info(f"[{name}] Entraînement version {version_id} sur {len(human_baseline)} sessions humaines... (contamination={CONTAMINATION})") - X = human_baseline[features].replace([np.inf, -np.inf], np.nan).fillna(0) + X = human_baseline[features].replace([np.inf, -np.inf], np.nan) + X = X.fillna(X.median()) + + # Validation split : réserver 20% pour évaluation offline + val_size = max(1, int(len(X) * 0.2)) + X_train = X.iloc[:-val_size] + X_val = X.iloc[-val_size:] + model = IsolationForest(n_estimators=300, contamination=CONTAMINATION, random_state=42, n_jobs=-1) - model.fit(X) + model.fit(X_train) + + # Évaluation offline : score moyen sur la validation (devrait être > 0 pour du trafic humain) + val_scores = model.decision_function(X_val) + val_mean_score = float(np.mean(val_scores)) + val_anomaly_rate = float(np.mean(val_scores < 0)) + log_info(f"[{name}] Validation : score moyen={val_mean_score:.4f}, taux anomalie={val_anomaly_rate:.2%} ({len(X_val)} échantillons)") # A1 — Sauvegarder les statistiques de distribution de la baseline pour la détection de dérive future baseline_stats = { @@ -307,7 +336,12 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, 'human_samples': len(human_baseline), 'contamination': CONTAMINATION, 'threshold': ANOMALY_THRESHOLD, 'features': features, 'model_name': name, 'previous_version': previous_version, - 'retrain_interval': RETRAIN_INTERVAL_H, 'baseline_stats': baseline_stats + 'retrain_interval': RETRAIN_INTERVAL_H, 'baseline_stats': baseline_stats, + 'validation': { + 'val_size': len(X_val), 'train_size': len(X_train), + 'val_mean_score': round(val_mean_score, 4), + 'val_anomaly_rate': round(val_anomaly_rate, 4), + } } with open(new_meta_path, 'w') as f: json.dump(new_meta, f, indent=2) with open(_current_pointer_path(name), 'w') as f: f.write(version_id) @@ -327,11 +361,47 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, # A1 — DÉTECTION DE DÉRIVE CONCEPTUELLE (CONCEPT DRIFT) # ═══════════════════════════════════════════════════════════════════════════════ def _compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame, features: list) -> float: - """ - Compare la distribution actuelle de la baseline humaine avec celle utilisée à l'entraînement. - Utilise un test de Kolmogorov-Smirnov par feature. Retourne la fraction de features déroutantes. + """Compare la distribution actuelle de la baseline humaine avec celle de l'entraînement. + + Utilise le test de Kolmogorov-Smirnov bilatéral par feature pour détecter + les changements de distribution (forme, moyenne, variance). + Retourne la fraction de features en dérive significative (p < 0.05). Une valeur >= DRIFT_THRESHOLD déclenche un retraining forcé. """ + if not baseline_stats or current_baseline.empty: + return 0.0 + try: + from scipy.stats import ks_2samp + except ImportError: + # Fallback Z-score si scipy indisponible + return _compute_drift_score_zscore(baseline_stats, current_baseline, features) + + drifted = 0 + tested = 0 + for feat in features: + if feat not in baseline_stats or feat not in current_baseline.columns: + continue + stats = baseline_stats[feat] + curr_values = current_baseline[feat].dropna() + if len(curr_values) < 30: + continue + # Reconstruire un échantillon synthétique de la distribution d'entraînement + # à partir des statistiques sauvegardées (mean, std, p25, p75) + trained_std = stats.get('std', 0) + if trained_std < 1e-9: + continue + # Générer un échantillon normal avec les mêmes paramètres + rng = np.random.default_rng(42) + synthetic_trained = rng.normal(stats['mean'], trained_std, size=len(curr_values)) + _, p_value = ks_2samp(curr_values.values, synthetic_trained) + if p_value < 0.05: + drifted += 1 + tested += 1 + return drifted / max(tested, 1) + + +def _compute_drift_score_zscore(baseline_stats: dict, current_baseline: pd.DataFrame, features: list) -> float: + """Fallback Z-score pour la détection de dérive quand scipy n'est pas disponible.""" if not baseline_stats or current_baseline.empty: return 0.0 drifted = 0 @@ -344,9 +414,7 @@ def _compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame, f trained_std = stats.get('std', 0) if trained_std < 1e-9: continue - # Z-score : écart entre la moyenne actuelle et celle de l'entraînement z = abs(curr_mean - stats['mean']) / trained_std - # Un z > 2 indique une dérive significative de la distribution if z > 2.0: drifted += 1 tested += 1 @@ -494,8 +562,10 @@ def _build_reason(name: str, row: pd.Series, shap_top: dict) -> str: # A8 — CLUSTERING COMPORTEMENTAL DES ANOMALIES (DBSCAN) # ═══════════════════════════════════════════════════════════════════════════════ def _cluster_anomalies(anomalies: pd.DataFrame, features: list) -> pd.DataFrame: - """ - A8 : Applique DBSCAN sur les features normalisées des anomalies. + """A8 : Applique HDBSCAN (ou DBSCAN en fallback) sur les features normalisées des anomalies. + + HDBSCAN est préféré car il détermine automatiquement le nombre de clusters + et la densité optimale (pas de paramètre eps à régler manuellement). Ajoute une colonne campaign_id : −1 = IP isolée, ≥0 = identifiant de campagne coordonnée. """ anomalies = anomalies.copy() @@ -505,13 +575,22 @@ def _cluster_anomalies(anomalies: pd.DataFrame, features: list) -> pd.DataFrame: try: X = anomalies[features].replace([np.inf, -np.inf], np.nan).fillna(0) X_scaled = StandardScaler().fit_transform(X) - labels = DBSCAN(eps=0.5, min_samples=CLUSTERING_MIN_SAMPLES).fit_predict(X_scaled) + if HDBSCAN_AVAILABLE: + clusterer = _hdbscan.HDBSCAN( + min_cluster_size=CLUSTERING_MIN_SAMPLES, + min_samples=max(2, CLUSTERING_MIN_SAMPLES - 1), + cluster_selection_method='eom' + ) + labels = clusterer.fit_predict(X_scaled) + else: + labels = DBSCAN(eps=0.5, min_samples=CLUSTERING_MIN_SAMPLES).fit_predict(X_scaled) anomalies['campaign_id'] = labels n_campaigns = len(set(labels)) - (1 if -1 in labels else 0) + algo = 'HDBSCAN' if HDBSCAN_AVAILABLE else 'DBSCAN' if n_campaigns > 0: - log_info(f"[DBSCAN] {n_campaigns} campagne(s) détectée(s) parmi {len(anomalies)} anomalies.") + log_info(f"[{algo}] {n_campaigns} campagne(s) détectée(s) parmi {len(anomalies)} anomalies.") except Exception as e: - log_info(f"[DBSCAN] Erreur de clustering: {e}") + log_info(f"[Clustering] Erreur de clustering: {e}") anomalies['campaign_id'] = -1 return anomalies @@ -559,7 +638,8 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): model = load_or_train_model(name, human_baseline, valid_features, cycle_id) unknown_traffic = unknown_traffic.copy() - X_test = unknown_traffic[valid_features].replace([np.inf, -np.inf], np.nan).fillna(0) + X_test = unknown_traffic[valid_features].replace([np.inf, -np.inf], np.nan) + X_test = X_test.fillna(X_test.median()) raw_scores = model.decision_function(X_test) # raw_anomaly_score : score brut IF pour comparaison au seuil et assignation du threat_level @@ -700,6 +780,46 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): return threats, all_scored +# ═══════════════════════════════════════════════════════════════════════════════ +# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL +# ═══════════════════════════════════════════════════════════════════════════════ + +# ═══════════════════════════════════════════════════════════════════════════════ +# FEEDBACK LOOP — Intégration des classifications SOC dans la baseline +# ═══════════════════════════════════════════════════════════════════════════════ +ENABLE_FEEDBACK = os.getenv('ENABLE_FEEDBACK', 'true').lower() == 'true' +FEEDBACK_WINDOW_DAYS = int(os.getenv('FEEDBACK_WINDOW_DAYS', '7')) + +def _load_soc_feedback(client) -> dict: + """Charge les classifications SOC récentes pour ajuster la baseline. + + Retourne un dict {src_ip: classification} où classification est + 'true_positive', 'false_positive', 'suspicious', etc. + Les faux positifs sont exclus du scoring (considérés comme humains), + les vrais positifs sont exclus de la baseline humaine. + """ + if not ENABLE_FEEDBACK: + return {} + try: + feedback_df = client.query_df( + f"SELECT entity_id AS src_ip, " + f" argMax(JSONExtractString(details, 'classification'), timestamp) AS classification " + f"FROM {DB}.audit_logs " + f"WHERE action = 'create_classification' " + f" AND entity_type = 'ip' " + f" AND timestamp >= now() - INTERVAL {FEEDBACK_WINDOW_DAYS} DAY " + f"GROUP BY entity_id" + ) + if feedback_df is None or feedback_df.empty: + return {} + result = dict(zip(feedback_df['src_ip'], feedback_df['classification'])) + log_info(f"[Feedback] {len(result)} classification(s) SOC chargées ({FEEDBACK_WINDOW_DAYS}j).") + return result + except Exception as e: + log_info(f"[Feedback] Impossible de charger les classifications SOC : {e}") + return {} + + # ═══════════════════════════════════════════════════════════════════════════════ # A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL # ═══════════════════════════════════════════════════════════════════════════════ @@ -748,7 +868,6 @@ def _preprocess_df(df: pd.DataFrame) -> pd.DataFrame: 'asn_number', 'asn_org', 'asn_detail', 'asn_domain', 'country_code', 'asn_label']: if col in df.columns: df[col] = df[col].fillna('').astype(str) - df.fillna(0, inplace=True) # ── Features numériques dérivées des labels Anubis (pour IsolationForest) ── # anubis_is_flagged : 1 si le trafic est marqué WEIGH/CHALLENGE par Anubis @@ -758,6 +877,20 @@ def _preprocess_df(df: pd.DataFrame) -> pd.DataFrame: (~df.get('anubis_bot_action', pd.Series('', index=df.index)).isin(['ALLOW', 'DENY', ''])) ).astype(int) + # ── Imputation intelligente des valeurs manquantes ── + # Les features binaires (0/1) et les ratios utilisent -1 comme sentinelle pour "inconnu" + # afin de ne pas biaiser le modèle (0 = valeur légitime pour beaucoup de features). + binary_features = { + 'has_accept_language', 'has_cookie', 'has_referer', 'ua_ch_mismatch', + 'is_ua_rotating', 'is_alpn_missing', 'sni_host_mismatch', 'alpn_http_mismatch', + 'mss_mobile_mismatch', 'anubis_is_flagged', 'is_rare_ja4', + } + for col in df.columns: + if col in binary_features: + df[col] = df[col].fillna(-1) + elif df[col].dtype in ('float64', 'float32', 'int64', 'int32', 'uint64', 'uint32'): + df[col] = df[col].replace([np.inf, -np.inf], np.nan).fillna(df[col].median()) + return df @@ -773,6 +906,7 @@ def fetch_and_analyze(): insère les scores dans ml_all_scores et les anomalies dans ml_detected_anomalies. Met à jour _service_healthy et _consecutive_failures en cas d'échec de requête. """ + global _consecutive_failures, _service_healthy cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S') log_info('=== Lancement cycle IA ===') @@ -796,6 +930,26 @@ def fetch_and_analyze(): log_info('Aucun trafic trouvé.') return + # ── Enrichissement avec les features avancées de la thèse §5 ───────────── + try: + df_thesis = client.query_df(f'SELECT * FROM {DB}.view_thesis_features_1h') + if df_thesis is not None and not df_thesis.empty: + df_thesis.columns = [c.split('.')[-1] for c in df_thesis.columns] + df.columns = [c.split('.')[-1] for c in df.columns] + thesis_cols = [c for c in df_thesis.columns if c not in ('window_start', 'src_ip', 'ja4', 'host')] + df = df.merge( + df_thesis, on=['window_start', 'src_ip', 'ja4', 'host'], + how='left', suffixes=('', '_thesis') + ) + for col in thesis_cols: + if col in df.columns: + df[col] = df[col].fillna(df[col].median() if df[col].notna().any() else 0) + log_info(f'[Thèse §5] {len(df_thesis)} sessions enrichies avec {len(thesis_cols)} features avancées.') + else: + log_info('[Thèse §5] view_thesis_features_1h vide — features avancées indisponibles.') + except Exception as e: + log_info(f'[Thèse §5] view_thesis_features_1h inaccessible : {e} — features avancées ignorées.') + df = _preprocess_df(df) log_decision('CYCLE_START', cycle_id, '', { @@ -815,6 +969,26 @@ def fetch_and_analyze(): except Exception: recurrence_map = {} + # ── Feedback SOC : ajuster la baseline selon les classifications humaines ─ + soc_feedback = _load_soc_feedback(client) + if soc_feedback: + fp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('false_positive', 'legitimate')} + tp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('true_positive', 'malicious', 'bot')} + if fp_ips: + # Les faux positifs confirmés rejoignent le pool humain + mask_fp = df['src_ip'].isin(fp_ips) & (df.get('asn_label', pd.Series(dtype=str)) != 'human') + df.loc[mask_fp, 'asn_label'] = 'human' + log_info(f"[Feedback] {mask_fp.sum()} lignes reclassées 'human' (FP confirmés).") + if tp_ips: + # Les vrais positifs confirmés sont exclus de la baseline humaine + mask_tp = df['src_ip'].isin(tp_ips) & (df.get('asn_label', pd.Series(dtype=str)) == 'human') + df.loc[mask_tp, 'asn_label'] = 'soc_confirmed_bot' + log_info(f"[Feedback] {mask_tp.sum()} lignes exclues de la baseline humaine (TP confirmés).") + log_decision('SOC_FEEDBACK', cycle_id, '', { + 'fp_ips': len(fp_ips), 'tp_ips': len(tp_ips), + 'total_classifications': len(soc_feedback), + }) + # ── Features par modèle (voir DOCUMENTATION.md §4) ─────────────────────── # Features communes aux deux modèles (L7 HTTP pur, disponibles correlated=0 et 1) feats = [ @@ -833,6 +1007,13 @@ def fetch_and_analyze(): 'anubis_is_flagged', # HTTP : header incomplet et usage HTTP plain (disponibles pour les deux modèles) 'missing_accept_enc_ratio', 'http_scheme_ratio', + # ── Thèse §5 : features avancées (optionnelles — ignorées si indisponibles) ── + # §5.1 — Entropie de séquence de chemins + 'path_transition_entropy', + # §5.3 — Cadence inter-requêtes + 'cadence_cv', 'burst_ratio', 'pause_ratio', + # §5.8 — Cross-domain (par IP, sans décomposition host) + 'host_diversity', 'host_sweep_speed', 'host_coverage_uniformity', ] # Features supplémentaires pour le modèle Complet (nécessitent des données TCP/TLS) feats_complet = feats + [ @@ -841,6 +1022,8 @@ def fetch_and_analyze(): 'ja3_diversity_ratio', 'syn_timing_cv', 'tls12_ratio', 'ip_df_variance', # TTL fingerprinting OS + TCP window scale (L4 uniquement) 'avg_ttl', 'ttl_std', 'no_window_scale_ratio', + # §5.5 — Dérive JA4 intra-session (nécessite corrélation pour JA4 fiable) + 'ja4_drift_ratio', ] # ── Analyse fenêtre 1h ──────────────────────────────────────────────────── diff --git a/services/bot-detector/bot_detector/requirements.txt b/services/bot-detector/bot_detector/requirements.txt index 356e79e..c2742a7 100644 --- a/services/bot-detector/bot_detector/requirements.txt +++ b/services/bot-detector/bot_detector/requirements.txt @@ -1,6 +1,8 @@ -clickhouse-connect==0.8.0 -pandas==2.2.0 -scikit-learn==1.4.0 -shap==0.44.1 +clickhouse-connect==0.8.12 +pandas==2.2.3 +scikit-learn==1.6.1 +shap==0.47.2 +scipy>=1.14 +hdbscan>=0.8.38 pyyaml>=6.0 ja4-common @ file:///app/shared/ja4_common