diff --git a/services/bot-detector/bot_detector/bot_detector.py b/services/bot-detector/bot_detector/bot_detector.py deleted file mode 100644 index cca0e49..0000000 --- a/services/bot-detector/bot_detector/bot_detector.py +++ /dev/null @@ -1,1759 +0,0 @@ -"""Détecteur de bots par apprentissage automatique semi-supervisé (Extended Isolation Forest + Ensemble). - -Ce module implémente le cycle de détection IA du service bot_detector : - - chargement et retraining automatique du modèle Extended Isolation Forest, - - scoring, normalisation et classification du trafic (fenêtre 1h / 24h), - - intégration des règles Anubis (ALLOW / DENY / WEIGH), - - clustering comportemental HDBSCAN, déduplication inter-cycles, - - explainabilité SHAP, détection de dérive conceptuelle, - - boucle de feedback SOC, élagage dynamique des features, - - écriture des résultats dans ClickHouse (ml_detected_anomalies, ml_all_scores). -""" -import time -import os -import json -import glob -import signal -import sys -import logging -import threading -import joblib -import pandas as pd -import numpy as np -import clickhouse_connect -from logging.handlers import RotatingFileHandler -from http.server import HTTPServer, BaseHTTPRequestHandler - -# Extended Isolation Forest (Hariri et al., IEEE TKDE 2021) -# Élimine les artefacts de score axis-aligned de l'IF standard dans les espaces >10 dimensions -try: - from isotree import IsolationForest as ExtendedIsolationForest - EIF_AVAILABLE = True -except ImportError: - EIF_AVAILABLE = False - -from sklearn.ensemble import IsolationForest - -try: - import hdbscan as _hdbscan - HDBSCAN_AVAILABLE = True -except ImportError: - HDBSCAN_AVAILABLE = False - -from sklearn.cluster import DBSCAN -from sklearn.preprocessing import StandardScaler -import warnings -from datetime import datetime - -try: - import shap as _shap - SHAP_AVAILABLE = True -except ImportError: - SHAP_AVAILABLE = False - -# PyTorch Autoencoder (Baptiste et al., arXiv 2602; Kitsune, Mirsky et al., NDSS 2018) -# Second scorer parallèle : détecte les anomalies de reconstruction que l'IF manque -try: - import torch - import torch.nn as nn - TORCH_AVAILABLE = True -except ImportError: - TORCH_AVAILABLE = False - -# XGBoost supervisé (Osama et al. 2025, Chen & Guestrin 2016) -# Troisième voix de l'ensemble : classificateur sur labels historiques + feedback SOC -try: - import xgboost as xgb - XGB_AVAILABLE = True -except ImportError: - XGB_AVAILABLE = False - -warnings.filterwarnings('ignore') - -# ═══════════════════════════════════════════════════════════════════════════════ -# CONFIGURATION -# ═══════════════════════════════════════════════════════════════════════════════ -def _require_float(name, default, lo=None, hi=None): - """Lit une variable d'environnement comme flottant et valide la plage si spécifiée. - - Lève SystemExit si la valeur est non numérique ou hors plage (lo, hi) exclusive. - """ - raw = os.getenv(name, str(default)) - try: - v = float(raw) - except ValueError: - raise SystemExit(f"[CONFIG] {name}={raw!r} invalide — doit être un nombre décimal.") - if lo is not None and not (lo < v < hi): - raise SystemExit(f"[CONFIG] {name}={v} hors plage ({lo} < valeur < {hi}).") - return v - -# Nom de la base de données ClickHouse -# Note : Utilisé dans des requêtes SQL via f-string (ex: f'SELECT * FROM {DB}.view_ai_features_1h') -# Cette variable provient uniquement de variables d'environnement contrôlées (docker-compose, K8s, etc.) -# et n'est jamais exposée à des entrées utilisateur. Le risque d'injection SQL est considéré comme négligeable. -DB = os.getenv('CLICKHOUSE_DB_PROCESSING', os.getenv('CLICKHOUSE_DB', 'ja4_processing')) -DB_LOGS = os.getenv('CLICKHOUSE_DB_LOGS', 'ja4_logs') - -import re -_SAFE_IDENTIFIER_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') -for _db_name, _db_val in [('CLICKHOUSE_DB_PROCESSING', DB), ('CLICKHOUSE_DB_LOGS', DB_LOGS)]: - if not _SAFE_IDENTIFIER_RE.match(_db_val): - raise SystemExit(f"[CONFIG] {_db_name}={_db_val!r} invalide — doit être un identifiant SQL valide.") - -CONTAMINATION = _require_float('ISOLATION_CONTAMINATION', 0.001, 0, 0.5) -ANOMALY_THRESHOLD = _require_float('ANOMALY_THRESHOLD', -0.05) -LOG_FILE = os.getenv('BOT_DETECTOR_LOG', '/var/log/bot_detector/decisions.jsonl') -LOG_BACKUP_COUNT = int(os.getenv('LOG_BACKUP_COUNT', '7')) -MODEL_DIR = os.getenv('MODEL_DIR', '/var/lib/bot_detector') -RETRAIN_INTERVAL_H = int(os.getenv('RETRAIN_INTERVAL_HOURS', '24')) -MODEL_HISTORY_COUNT = int(os.getenv('MODEL_HISTORY_COUNT', '10')) -MAX_FAILURES = int(os.getenv('MAX_CONSECUTIVE_FAILURES', '3')) -HEALTH_PORT = int(os.getenv('HEALTH_PORT', '8080')) -CYCLE_INTERVAL = int(os.getenv('CYCLE_INTERVAL_SEC', '300')) - -# ── Améliorations A1 / A2 / A3 / A4 / A5 / A6 / A7 / A8 / A10 ────────────── -# A1 — Dérive conceptuelle (concept drift) -DRIFT_THRESHOLD = _require_float('DRIFT_THRESHOLD', 0.30, 0, 1) - -# A2 — Seuil adaptatif -ANOMALY_PERCENTILE = int(os.getenv('ANOMALY_PERCENTILE', '5')) - -# A3 — Analyse multi-fenêtres -ENABLE_MULTIWINDOW = os.getenv('ENABLE_MULTIWINDOW', 'false').lower() == 'true' -MULTIWINDOW_VIEW = os.getenv('MULTIWINDOW_VIEW', 'view_ai_features_24h') -if not _SAFE_IDENTIFIER_RE.match(MULTIWINDOW_VIEW): - raise SystemExit(f"[CONFIG] MULTIWINDOW_VIEW={MULTIWINDOW_VIEW!r} invalide — doit être un identifiant SQL valide.") - -# A4 — Explainabilité SHAP -ENABLE_SHAP = SHAP_AVAILABLE and os.getenv('ENABLE_SHAP', 'true').lower() == 'true' - -# A5 — Déduplication inter-cycles avec TTL -DEDUP_TTL_MIN = int(os.getenv('DEDUP_TTL_MIN', '60')) - -# A6 — Pondération par récurrence -RECURRENCE_WEIGHT = _require_float('RECURRENCE_WEIGHT', 0.005) - -# A7 — Validation de complétude des features -MIN_VALID_FEATURE_RATIO = _require_float('MIN_VALID_FEATURE_RATIO', 0.50, 0, 1) - -# A8 — Clustering comportemental des anomalies -ENABLE_CLUSTERING = os.getenv('ENABLE_CLUSTERING', 'true').lower() == 'true' -CLUSTERING_MIN_SAMPLES = int(os.getenv('CLUSTERING_MIN_SAMPLES', '3')) - -# Autoencoder — Second scorer parallèle (Baptiste et al. 2026, Kitsune / Mirsky et al. NDSS 2018) -AE_WEIGHT = _require_float('AE_WEIGHT', 0.30, 0, 1) # pondération dans le score combiné (0 = IF seul) -AE_EPOCHS = int(os.getenv('AE_EPOCHS', '50')) -AE_LATENT_DIM = int(os.getenv('AE_LATENT_DIM', '16')) -AE_LEARNING_RATE = float(os.getenv('AE_LEARNING_RATE', '1e-3')) - -# XGBoost — Troisième voix supervisée (Chen & Guestrin 2016, Osama et al. 2025) -XGB_WEIGHT = _require_float('XGB_WEIGHT', 0.20, 0, 1) # pondération dans le score combiné (0 = désactivé) -XGB_MIN_LABELS = int(os.getenv('XGB_MIN_LABELS', '100')) # nb minimum de labels historiques pour entraîner -XGB_RETRAIN_INTERVAL_H = int(os.getenv('XGB_RETRAIN_INTERVAL_HOURS', '168')) # retrain hebdomadaire - -# A9 — Classification navigateurs légitimes (JA4 + cohérence comportementale) -# Seuil minimal de browser_consistency_score [0..5] pour classer LEGITIMATE_BROWSER. -# 4/5 = navigateur reconnu + ≥3 signaux comportementaux (sec-ch-ua, cookies, Accept-Language, Sec-Fetch). -BROWSER_LEGIT_MIN_CONSISTENCY = int(os.getenv('BROWSER_LEGIT_MIN_CONSISTENCY', '4')) - -# Features structurellement indisponibles par modèle (pas de données L4 pour trafic non-corrélé) -# Ces features ne génèrent pas de warnings "pipeline" — leur absence est by-design. -STRUCTURAL_EXCLUDED_FEATURES: dict[str, list] = { - 'Complet': ['orphan_ratio'], - 'Applicatif': ['orphan_ratio', 'is_rare_ja4', 'tcp_shared_count', - 'request_size_variance', 'mss_mobile_mismatch', - # B features TLS/TCP : indisponibles pour trafic non-corrélé - 'ja3_diversity_ratio', 'syn_timing_cv', 'tls12_ratio', 'ip_df_variance', - # L4 uniquement : TTL et window scale indisponibles sans capture TCP - 'avg_ttl', 'ttl_std', 'no_window_scale_ratio', - # §5.5 JA4 Drift nécessite une corrélation fiable pour le JA4 - 'ja4_drift_ratio'], -} - -TRAINING_HISTORY_FILE = os.path.join(MODEL_DIR, 'training_history.jsonl') - -# ═══════════════════════════════════════════════════════════════════════════════ -# LOGGING -# ═══════════════════════════════════════════════════════════════════════════════ -os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) -os.makedirs(MODEL_DIR, exist_ok=True) - -logger = logging.getLogger('bot_detector') -logger.setLevel(logging.DEBUG) - -_console_handler = logging.StreamHandler() -_console_handler.setFormatter(logging.Formatter('[%(asctime)s] %(message)s', '%Y-%m-%d %H:%M:%S')) -logger.addHandler(_console_handler) - -_file_handler = RotatingFileHandler( - LOG_FILE, maxBytes=50 * 1024 * 1024, backupCount=LOG_BACKUP_COUNT, encoding='utf-8' -) -_file_handler.setFormatter(logging.Formatter('%(message)s')) -logger.addHandler(_file_handler) - -# Wrapper court pour homogénéiser les appels de logging (évite d'importer logger partout). -def log_info(message: str): - """Enregistre un message de niveau INFO dans le logger du service.""" - logger.info(message) - -def log_decision(event: str, cycle_id: str, model: str = '', row: dict = None): - """Enregistre un événement de décision IA au format JSONL dans le fichier de log rotatif. - - Chaque ligne contient l'horodatage, le cycle_id, l'événement, le modèle, - la contamination, le seuil et les données supplémentaires de ``row``. - """ - entry = { - 'ts': datetime.now().strftime('%Y-%m-%dT%H:%M:%S'), - 'cycle_id': cycle_id, - 'event': event, - 'model': model, - 'contamination': CONTAMINATION, - 'threshold': ANOMALY_THRESHOLD, - } - if row: - entry.update(row) - _file_handler.stream.write(json.dumps(entry, ensure_ascii=False, default=str) + '\n') - _file_handler.stream.flush() - -def _append_training_history(entry: dict): - """Ajoute une entrée de métadonnées d'entraînement au fichier d'historique JSONL.""" - with open(TRAINING_HISTORY_FILE, 'a', encoding='utf-8') as f: - f.write(json.dumps(entry, ensure_ascii=False, default=str) + '\n') - -# ═══════════════════════════════════════════════════════════════════════════════ -# ARRÊT PROPRE ET HEALTH CHECK -# ═══════════════════════════════════════════════════════════════════════════════ -def _shutdown(sig, frame): - """Gestionnaire de signal SIGTERM/SIGINT : journalise l'arrêt et quitte proprement.""" - log_info(f"Signal {sig} reçu — arrêt propre.") - log_decision('SERVICE_STOP', 'shutdown', '', {'signal': sig}) - sys.exit(0) - -signal.signal(signal.SIGTERM, _shutdown) -signal.signal(signal.SIGINT, _shutdown) - -_service_healthy = True -_health_lock = threading.Lock() - -class _HealthHandler(BaseHTTPRequestHandler): - """Gestionnaire HTTP minimal pour le point de santé du service. - - Répond 200/OK si le service est sain, 503/DEGRADED dans le cas contraire. - """ - - def do_GET(self): - """Répond à la requête GET : renvoie 200 OK ou 503 DEGRADED selon l'état du service.""" - with _health_lock: - healthy = _service_healthy - code = 200 if healthy else 503 - self.send_response(code) - self.end_headers() - self.wfile.write(b'OK' if healthy else b'DEGRADED') - def log_message(self, *args): - """Supprime les logs HTTP internes pour ne pas polluer la sortie standard.""" - pass - -threading.Thread( - target=lambda: HTTPServer(('', HEALTH_PORT), _HealthHandler).serve_forever(), - daemon=True -).start() - -# ═══════════════════════════════════════════════════════════════════════════════ -# CONNEXION CLICKHOUSE — delegated to ja4_common shared client -# ═══════════════════════════════════════════════════════════════════════════════ -from ja4_common.clickhouse import get_client as _ja4_get_client - -def get_client(): - """Return the shared ja4_common ClickHouse client, reconnecting on ping failure.""" - return _ja4_get_client().connect() - -def score_to_threat_level(score: float) -> str: - """Convertit un score d'anomalie brut IsolationForest en niveau de menace textuel. - - Seuils : CRITICAL < −0.30 | HIGH < −0.15 | MEDIUM < −0.05 | LOW < 0 | NORMAL ≥ 0. - """ - if score < -0.30: return 'CRITICAL' - if score < -0.15: return 'HIGH' - if score < -0.05: return 'MEDIUM' - if score < 0: return 'LOW' - return 'NORMAL' - -# ═══════════════════════════════════════════════════════════════════════════════ -# GESTION DES MODÈLES -# ═══════════════════════════════════════════════════════════════════════════════ -def _current_pointer_path(name: str) -> str: - """Retourne le chemin du fichier pointeur vers la version courante du modèle ``name``.""" - return os.path.join(MODEL_DIR, f'model_{name}.current') - -def _get_current_version(name: str): - """Lit le fichier pointeur et retourne (chemin_modèle, métadonnées) ou (None, None) si absent.""" - pointer = _current_pointer_path(name) - if not os.path.exists(pointer): return None, None - with open(pointer) as f: version_id = f.read().strip() - model_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.joblib') - meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json') - if not os.path.exists(model_path) or not os.path.exists(meta_path): return None, None - with open(meta_path) as f: meta = json.load(f) - return model_path, meta - -def _purge_old_versions(name: str): - """Supprime les versions excédentaires du modèle ``name`` en ne conservant que MODEL_HISTORY_COUNT fichiers.""" - pattern = os.path.join(MODEL_DIR, f'model_{name}_*.joblib') - versions = sorted(glob.glob(pattern)) - to_delete = versions[:-MODEL_HISTORY_COUNT] if len(versions) > MODEL_HISTORY_COUNT else [] - for joblib_path in to_delete: - version_id = os.path.basename(joblib_path).replace(f'model_{name}_', '').replace('.joblib', '') - meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json') - os.remove(joblib_path) - if os.path.exists(meta_path): os.remove(meta_path) - log_info(f"[{name}] Version purgée : {version_id} (limite={MODEL_HISTORY_COUNT})") - -# ═══════════════════════════════════════════════════════════════════════════════ -# AUTOENCODER — Second scorer parallèle (détection d'anomalies par reconstruction) -# ═══════════════════════════════════════════════════════════════════════════════ -class TrafficAutoEncoder: - """Autoencoder symétrique pour détection d'anomalies par erreur de reconstruction. - - Architecture : encoder (n→64→32→latent_dim) — decoder (latent_dim→32→64→n) - Activation : ReLU + BatchNorm (encoder/decoder), sigmoid (sortie — données normalisées [0,1]) - Score = MSE(input, reconstruction) par échantillon. - L'espace latent (16-dim par défaut) peut servir de features compressées pour HDBSCAN. - """ - - def __init__(self, n_features: int, latent_dim: int = AE_LATENT_DIM): - if not TORCH_AVAILABLE: - raise RuntimeError("PyTorch non disponible — autoencoder désactivé.") - self.n_features = n_features - self.latent_dim = latent_dim - self.device = torch.device('cpu') - self._build_model() - self._scaler_min = None - self._scaler_range = None - - def _build_model(self): - dim1 = min(64, max(self.n_features, self.latent_dim + 4)) - dim2 = min(32, max(dim1 // 2, self.latent_dim + 2)) - self.encoder = nn.Sequential( - nn.Linear(self.n_features, dim1), nn.BatchNorm1d(dim1), nn.ReLU(), - nn.Linear(dim1, dim2), nn.BatchNorm1d(dim2), nn.ReLU(), - nn.Linear(dim2, self.latent_dim), - ).to(self.device) - self.decoder = nn.Sequential( - nn.Linear(self.latent_dim, dim2), nn.BatchNorm1d(dim2), nn.ReLU(), - nn.Linear(dim2, dim1), nn.BatchNorm1d(dim1), nn.ReLU(), - nn.Linear(dim1, self.n_features), nn.Sigmoid(), - ).to(self.device) - self._all_params = list(self.encoder.parameters()) + list(self.decoder.parameters()) - - def _to_tensor(self, X: np.ndarray) -> torch.Tensor: - """Normalise [0,1] via min-max puis convertit en Tensor.""" - if self._scaler_min is not None: - X_norm = (X - self._scaler_min) / (self._scaler_range + 1e-9) - else: - X_norm = X - return torch.tensor(np.clip(X_norm, 0, 1), dtype=torch.float32, device=self.device) - - def fit(self, X: np.ndarray, epochs: int = AE_EPOCHS, lr: float = AE_LEARNING_RATE, - batch_size: int = 256) -> dict: - """Entraîne l'autoencoder sur la baseline humaine (données normales).""" - self._scaler_min = X.min(axis=0) - self._scaler_range = X.max(axis=0) - self._scaler_min - X_t = self._to_tensor(X) - - dataset = torch.utils.data.TensorDataset(X_t) - loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True) - optimizer = torch.optim.Adam(self._all_params, lr=lr, weight_decay=1e-5) - criterion = nn.MSELoss() - - self.encoder.train() - self.decoder.train() - losses = [] - for epoch in range(epochs): - epoch_loss = 0.0 - for (batch,) in loader: - latent = self.encoder(batch) - recon = self.decoder(latent) - loss = criterion(recon, batch) - optimizer.zero_grad() - loss.backward() - optimizer.step() - epoch_loss += loss.item() * len(batch) - losses.append(epoch_loss / len(X_t)) - return {'final_loss': losses[-1], 'epochs': epochs, 'n_samples': len(X)} - - def score_samples(self, X: np.ndarray) -> np.ndarray: - """Retourne l'erreur de reconstruction MSE par échantillon (plus élevé = plus anomal).""" - self.encoder.eval() - self.decoder.eval() - X_t = self._to_tensor(X) - with torch.no_grad(): - latent = self.encoder(X_t) - recon = self.decoder(latent) - mse = ((recon - X_t) ** 2).mean(dim=1).numpy() - return mse - - def encode(self, X: np.ndarray) -> np.ndarray: - """Retourne l'espace latent (pour HDBSCAN clustering).""" - self.encoder.eval() - X_t = self._to_tensor(X) - with torch.no_grad(): - return self.encoder(X_t).numpy() - - def state_dict(self) -> dict: - return { - 'encoder': self.encoder.state_dict(), - 'decoder': self.decoder.state_dict(), - 'scaler_min': self._scaler_min, - 'scaler_range': self._scaler_range, - 'n_features': self.n_features, - 'latent_dim': self.latent_dim, - } - - @classmethod - def load_state_dict(cls, state: dict) -> 'TrafficAutoEncoder': - ae = cls(state['n_features'], state['latent_dim']) - ae._scaler_min = state['scaler_min'] - ae._scaler_range = state['scaler_range'] - ae.encoder.load_state_dict(state['encoder']) - ae.decoder.load_state_dict(state['decoder']) - return ae - - -def _ae_model_path(name: str, version_id: str) -> str: - return os.path.join(MODEL_DIR, f'ae_{name}_{version_id}.pt') - - -# ═══════════════════════════════════════════════════════════════════════════════ -# XGBOOST — Troisième voix supervisée (labels historiques + feedback SOC) -# ═══════════════════════════════════════════════════════════════════════════════ -def _xgb_model_path(name: str) -> str: - return os.path.join(MODEL_DIR, f'xgb_{name}.json') - - -def _xgb_meta_path(name: str) -> str: - return os.path.join(MODEL_DIR, f'xgb_{name}.meta.json') - - -def _load_xgb_labels(client, features: list, min_labels: int = XGB_MIN_LABELS) -> tuple: - """Charge les labels historiques depuis ml_all_scores + view_ai_features_1h. - - Les labels (threat_level) viennent de ml_all_scores, les features de - view_ai_features_1h via une jointure sur (src_ip, ja4, host). - Les features absentes de la vue (ex: thesis §5 features) sont ignorées. - - Positifs : threat_level IN ('HIGH', 'CRITICAL', 'ANUBIS_DENY', 'KNOWN_BOT') → label=1 - Négatifs : threat_level IN ('NORMAL', 'LEGITIMATE_BROWSER') → label=0 - Retourne (X, y, usable_features) ou (None, None, None) si insuffisant. - """ - try: - # Découvrir les colonnes disponibles dans la vue - cols_result = client.query( - f"SELECT name FROM system.columns " - f"WHERE database = '{DB}' AND table = 'view_ai_features_1h'" - ) - available_cols = {row[0] for row in cols_result.result_rows} if cols_result.result_rows else set() - usable_features = [f for f in features if f in available_cols] - if len(usable_features) < 10: - log_info(f"[XGB] Seulement {len(usable_features)} features disponibles dans view_ai_features_1h — insuffisant.") - return None, None, None - - feature_cols = ', '.join(f'f.{c}' for c in usable_features) - result = client.query( - f"SELECT {feature_cols}, s.threat_level " - f"FROM {DB}.ml_all_scores AS s " - f"INNER JOIN {DB}.view_ai_features_1h AS f " - f" ON s.src_ip = f.src_ip AND s.ja4 = f.ja4 AND s.host = f.host " - f"WHERE s.threat_level IN ('NORMAL', 'LEGITIMATE_BROWSER', 'HIGH', 'CRITICAL', 'ANUBIS_DENY', 'KNOWN_BOT') " - f"AND s.window_start >= now() - INTERVAL 7 DAY " - f"ORDER BY rand() LIMIT 50000" - ) - if not result.result_rows: - return None, None, None - cols = usable_features + ['threat_level'] - df = pd.DataFrame(result.result_rows, columns=cols) - df[usable_features] = df[usable_features].apply(pd.to_numeric, errors='coerce') - df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=usable_features) - y = (~df['threat_level'].isin(['NORMAL', 'LEGITIMATE_BROWSER'])).astype(int) - if y.sum() < 10 or len(y) < min_labels: - return None, None, None - X = df[usable_features].values - return X, y.values, usable_features - except Exception as exc: - log_info(f"[XGB] Erreur chargement labels : {exc}") - return None, None, None - - -def load_or_train_xgb(name: str, client, features: list, cycle_id: str): - """Charge ou entraîne le modèle XGBoost supervisé. - - Retourne (XGBClassifier, list[str] features) ou (None, None) si indisponible. - """ - if not XGB_AVAILABLE or XGB_WEIGHT <= 0: - return None, None - - model_path = _xgb_model_path(name) - meta_path = _xgb_meta_path(name) - - # Charger le modèle existant si récent - if os.path.exists(model_path) and os.path.exists(meta_path): - try: - with open(meta_path) as f: - meta = json.load(f) - trained_at = datetime.fromisoformat(meta['trained_at']) - age_h = (datetime.now() - trained_at).total_seconds() / 3600 - if age_h < XGB_RETRAIN_INTERVAL_H: - model = xgb.XGBClassifier() - model.load_model(model_path) - log_info(f"[XGB][{name}] Modèle rechargé ({age_h:.1f}h / {XGB_RETRAIN_INTERVAL_H}h, {meta.get('n_labels', '?')} labels).") - return model, meta.get('features', features) - except Exception as exc: - log_info(f"[XGB][{name}] Erreur chargement : {exc}") - - # Entraîner un nouveau modèle - X, y, xgb_features = _load_xgb_labels(client, features) - if X is None: - log_info(f"[XGB][{name}] Labels insuffisants (< {XGB_MIN_LABELS}) — XGBoost désactivé ce cycle.") - # Tenter de réutiliser un modèle ancien - if os.path.exists(model_path) and os.path.exists(meta_path): - try: - model = xgb.XGBClassifier() - model.load_model(model_path) - with open(meta_path) as f: - meta = json.load(f) - return model, meta.get('features', features) - except Exception: - pass - return None, None - - scale_pos = max(1, int((y == 0).sum() / max((y == 1).sum(), 1))) - model = xgb.XGBClassifier( - n_estimators=200, max_depth=6, learning_rate=0.1, - scale_pos_weight=scale_pos, eval_metric='logloss', - random_state=42, n_jobs=-1, - tree_method='hist', - ) - model.fit(X, y, verbose=False) - - model.save_model(model_path) - meta = { - 'trained_at': datetime.now().isoformat(), - 'n_labels': len(y), 'n_positive': int(y.sum()), - 'n_negative': int((y == 0).sum()), 'n_features': len(xgb_features), - 'features': xgb_features, - 'scale_pos_weight': scale_pos, 'model_name': name, - } - with open(meta_path, 'w') as f: - json.dump(meta, f, indent=2) - - log_info(f"[XGB][{name}] Modèle entraîné : {len(y)} labels ({y.sum()} positifs), scale_pos_weight={scale_pos}") - log_decision('XGB_TRAINED', cycle_id, name, meta) - return model, xgb_features - - -def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, cycle_id: str): - """Charge le modèle IsolationForest existant ou en entraîne un nouveau si nécessaire. - - Réutilise le modèle si son âge est inférieur à RETRAIN_INTERVAL_H et si aucune - dérive conceptuelle significative n'est détectée (A1). En cas d'expiration ou de - dérive, entraîne un nouveau modèle sur ``human_baseline``, le sérialise sur disque, - met à jour le fichier pointeur et purge les anciennes versions. - - Retourne (IsolationForest, TrafficAutoEncoder|None, list[str] features). - """ - model_path, meta = _get_current_version(name) - if model_path and meta: - trained_at = datetime.fromisoformat(meta['trained_at']) - age_h = (datetime.now() - trained_at).total_seconds() / 3600 - age_ok = age_h < RETRAIN_INTERVAL_H - - # A1 — Dérive conceptuelle : comparer la distribution actuelle avec celle de l'entraînement - drift_score = 0.0 - drift_forced = False - if age_ok and 'baseline_stats' in meta: - drift_score = _compute_drift_score(meta['baseline_stats'], human_baseline, features) - if drift_score >= DRIFT_THRESHOLD: - drift_forced = True - log_info(f"[{name}] Dérive détectée ({drift_score:.0%} features) — retraining forcé.") - log_decision('DRIFT_DETECTED', cycle_id, name, { - 'version_id': meta['version_id'], 'drift_score': round(drift_score, 3), - 'drift_threshold': DRIFT_THRESHOLD, 'model_age_hours': round(age_h, 2) - }) - - if age_ok and not drift_forced: - log_info(f"[{name}] Modèle v{meta['version_id']} valide ({age_h:.1f}h / {RETRAIN_INTERVAL_H}h, drift={drift_score:.0%}) — réutilisation.") - log_decision('MODEL_LOADED', cycle_id, name, { - 'version_id': meta['version_id'], 'model_age_hours': round(age_h, 2), - 'trained_at': meta['trained_at'], 'human_samples': meta.get('human_samples', '?'), - 'retrain_in_hours': round(RETRAIN_INTERVAL_H - age_h, 1), 'drift_score': round(drift_score, 3) - }) - ae_loaded = None - if TORCH_AVAILABLE and AE_WEIGHT > 0: - ae_path = _ae_model_path(name, meta['version_id']) - if os.path.exists(ae_path): - try: - ae_loaded = TrafficAutoEncoder.load_state_dict(torch.load(ae_path, weights_only=False)) - log_info(f"[{name}] Autoencoder v{meta['version_id']} rechargé.") - except Exception as exc: - log_info(f"[{name}] Erreur chargement AE : {exc} — AE désactivé ce cycle.") - return joblib.load(model_path), ae_loaded, meta.get('features', features) - elif not drift_forced: - log_info(f"[{name}] Modèle v{meta['version_id']} expiré ({age_h:.1f}h ≥ {RETRAIN_INTERVAL_H}h) — retraining.") - - version_id = datetime.now().strftime('%Y%m%d_%H%M%S') - log_info(f"[{name}] Entraînement EIF v{version_id} — {len(human_baseline)} sessions ISP, {len(features)} features, contamination={CONTAMINATION}") - - X = human_baseline[features].replace([np.inf, -np.inf], np.nan) - X = X.fillna(X.median()) - - # Feature pruning : retirer les features à variance quasi-nulle (inutiles pour les arbres) - PRUNE_VARIANCE_THRESHOLD = float(os.getenv('PRUNE_VARIANCE_THRESHOLD', '1e-6')) - feature_variances = X.var() - low_var_features = feature_variances[feature_variances < PRUNE_VARIANCE_THRESHOLD].index.tolist() - if low_var_features: - log_info(f"[{name}] Élagage : {len(low_var_features)} feature(s) à variance < {PRUNE_VARIANCE_THRESHOLD} retirées : {low_var_features}") - X = X.drop(columns=low_var_features) - features = [f for f in features if f not in low_var_features] - log_decision('FEATURE_PRUNED', cycle_id, name, {'pruned': low_var_features, 'remaining': len(features)}) - - # Validation split : réserver 20% pour évaluation offline - val_size = max(1, int(len(X) * 0.2)) - X_train = X.iloc[:-val_size] - X_val = X.iloc[-val_size:] - - if EIF_AVAILABLE: - model = ExtendedIsolationForest( - ntrees=300, ndim=min(3, len(features)), - sample_size='auto', missing_action='impute', - random_seed=42, nthreads=-1 - ) - else: - model = IsolationForest(n_estimators=300, contamination=CONTAMINATION, random_state=42, n_jobs=-1) - model.fit(X_train) - - # Évaluation offline : score moyen sur la validation (devrait être > 0 pour du trafic humain sklearn) - val_scores = model.decision_function(X_val) - # Unifier la convention : négatif = anomal (isotree: 0.5 - score) - if EIF_AVAILABLE: - val_scores = 0.5 - val_scores - val_mean_score = float(np.mean(val_scores)) - val_anomaly_rate = float(np.mean(val_scores < 0)) - log_info(f"[{name}] Validation : score moyen={val_mean_score:.4f}, taux anomalie={val_anomaly_rate:.2%} ({len(X_val)} échantillons)") - - # GATE CONDITION : rejeter le modèle si la baseline semble contaminée - VAL_ANOMALY_GATE = float(os.getenv('VAL_ANOMALY_GATE', '0.20')) - if val_anomaly_rate > VAL_ANOMALY_GATE: - log_info(f"[{name}] ⚠ REJET : val_anomaly_rate={val_anomaly_rate:.2%} > gate={VAL_ANOMALY_GATE:.0%} — baseline probablement contaminée.") - log_decision('MODEL_REJECTED', cycle_id, name, { - 'val_anomaly_rate': round(val_anomaly_rate, 4), 'gate': VAL_ANOMALY_GATE, - 'val_mean_score': round(val_mean_score, 4), 'version_id': version_id, - }) - # Tenter de réutiliser le modèle précédent - if model_path and os.path.exists(model_path): - log_info(f"[{name}] Conservation du modèle précédent v{meta.get('version_id', '?')}.") - ae_prev = None - if TORCH_AVAILABLE and AE_WEIGHT > 0: - ae_prev_path = _ae_model_path(name, meta.get('version_id', '')) - if os.path.exists(ae_prev_path): - try: - ae_prev = TrafficAutoEncoder.load_state_dict(torch.load(ae_prev_path, weights_only=False)) - except Exception: - pass - return joblib.load(model_path), ae_prev, meta.get('features', features) - log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.") - - # A1 — Sauvegarder les statistiques de distribution avec quantile digest pour drift detection - baseline_stats = { - f: { - 'mean': float(X[f].mean()), 'std': float(X[f].std()), - 'p10': float(X[f].quantile(0.10)), 'p25': float(X[f].quantile(0.25)), - 'p50': float(X[f].quantile(0.50)), 'p75': float(X[f].quantile(0.75)), - 'p90': float(X[f].quantile(0.90)), - } - for f in features - } - - new_model_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.joblib') - new_meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json') - joblib.dump(model, new_model_path) - - # Entraînement de l'Autoencoder en parallèle (si PyTorch disponible et AE_WEIGHT > 0) - ae_model = None - if TORCH_AVAILABLE and AE_WEIGHT > 0: - try: - ae_model = TrafficAutoEncoder(n_features=len(features)) - ae_stats = ae_model.fit(X_train.values) - ae_path = _ae_model_path(name, version_id) - torch.save(ae_model.state_dict(), ae_path) - log_info(f"[{name}] Autoencoder entraîné : loss={ae_stats['final_loss']:.6f}, epochs={ae_stats['epochs']}") - except Exception as exc: - log_info(f"[{name}] Autoencoder training échoué : {exc} — AE désactivé.") - ae_model = None - - previous_version = meta.get('version_id', None) if meta else None - new_meta = { - 'version_id': version_id, 'trained_at': datetime.now().isoformat(), - 'human_samples': len(human_baseline), 'contamination': CONTAMINATION, - 'threshold': ANOMALY_THRESHOLD, 'features': features, - 'model_name': name, 'previous_version': previous_version, - 'retrain_interval': RETRAIN_INTERVAL_H, 'baseline_stats': baseline_stats, - 'algorithm': 'ExtendedIsolationForest' if EIF_AVAILABLE else 'IsolationForest', - 'autoencoder': ae_model is not None, - 'ae_weight': AE_WEIGHT if ae_model else 0.0, - 'validation': { - 'val_size': len(X_val), 'train_size': len(X_train), - 'val_mean_score': round(val_mean_score, 4), - 'val_anomaly_rate': round(val_anomaly_rate, 4), - } - } - with open(new_meta_path, 'w') as f: json.dump(new_meta, f, indent=2) - with open(_current_pointer_path(name), 'w') as f: f.write(version_id) - - _append_training_history({k: v for k, v in new_meta.items() if k != 'baseline_stats'}) - _purge_old_versions(name) - - log_info(f"[{name}] Modèle v{version_id} sauvegardé → {new_model_path} (AE={'oui' if ae_model is not None else 'non'})") - log_decision('MODEL_TRAINED', cycle_id, name, { - 'version_id': version_id, 'previous_version': previous_version, - 'human_samples': len(human_baseline), 'next_retrain_in_h': RETRAIN_INTERVAL_H, - 'history_kept': MODEL_HISTORY_COUNT - }) - return model, ae_model, features - -# ═══════════════════════════════════════════════════════════════════════════════ -# A1 — DÉTECTION DE DÉRIVE CONCEPTUELLE (CONCEPT DRIFT) -# ═══════════════════════════════════════════════════════════════════════════════ -def _compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame, features: list) -> float: - """Compare la distribution actuelle de la baseline humaine avec celle de l'entraînement. - - Utilise le test de Kolmogorov-Smirnov bilatéral par feature. La distribution - d'entraînement est reconstruite à partir d'un quantile digest (p10..p90) par - interpolation linéaire — bien plus fidèle qu'une approximation N(μ,σ) pour les - features asymétriques ou multimodales. - Retourne la fraction de features en dérive significative (p < 0.05). - """ - if not baseline_stats or current_baseline.empty: - return 0.0 - try: - from scipy.stats import ks_2samp - except ImportError: - return _compute_drift_score_zscore(baseline_stats, current_baseline, features) - - drifted = 0 - tested = 0 - rng = np.random.default_rng(42) - for feat in features: - if feat not in baseline_stats or feat not in current_baseline.columns: - continue - stats = baseline_stats[feat] - curr_values = current_baseline[feat].dropna() - if len(curr_values) < 30: - continue - trained_std = stats.get('std', 0) - if trained_std < 1e-9: - continue - - # Reconstruire un échantillon via quantile inverse si quantiles disponibles - quantile_keys = ['p10', 'p25', 'p50', 'p75', 'p90'] - if all(k in stats for k in quantile_keys): - quantile_probs = np.array([0.10, 0.25, 0.50, 0.75, 0.90]) - quantile_vals = np.array([stats[k] for k in quantile_keys]) - # Interpolation linéaire : tirage uniforme → quantile inverse - u = rng.uniform(0, 1, size=len(curr_values)) - synthetic_trained = np.interp(u, quantile_probs, quantile_vals) - else: - # Fallback N(μ,σ) si anciens metadata sans quantiles - synthetic_trained = rng.normal(stats['mean'], trained_std, size=len(curr_values)) - - _, p_value = ks_2samp(curr_values.values, synthetic_trained) - if p_value < 0.05: - drifted += 1 - tested += 1 - return drifted / max(tested, 1) - - -def _compute_drift_score_zscore(baseline_stats: dict, current_baseline: pd.DataFrame, features: list) -> float: - """Fallback Z-score pour la détection de dérive quand scipy n'est pas disponible.""" - if not baseline_stats or current_baseline.empty: - return 0.0 - drifted = 0 - tested = 0 - for feat in features: - if feat not in baseline_stats or feat not in current_baseline.columns: - continue - stats = baseline_stats[feat] - curr_mean = current_baseline[feat].mean() - trained_std = stats.get('std', 0) - if trained_std < 1e-9: - continue - z = abs(curr_mean - stats['mean']) / trained_std - if z > 2.0: - drifted += 1 - tested += 1 - return drifted / max(tested, 1) - - -# Cache par modèle conservant le dernier état des features invalides. -# Permet de supprimer les logs répétitifs : on ne loggue que si l'état a changé depuis le cycle précédent. -_feature_warning_cache: dict = {} - - -# ═══════════════════════════════════════════════════════════════════════════════ -# A7 — VALIDATION DE COMPLÉTUDE DES FEATURES -# ═══════════════════════════════════════════════════════════════════════════════ -def validate_features(df: pd.DataFrame, features: list, name: str, cycle_id: str): - """ - Vérifie que les features sont présentes et non constantes dans le DataFrame. - Catégorise les features invalides : - - structural : absente par design pour ce modèle (défini dans STRUCTURAL_EXCLUDED_FEATURES) - - zero : colonne toujours à 0 — problème de pipeline - - unique : colonne avec une seule valeur non-nulle — agrégat global non discriminant - - missing : colonne absente du DataFrame - Retourne la liste des features valides, ou None si trop de features sont invalides. - Les avertissements ne sont logués que si l'état a changé depuis le cycle précédent - (grâce à _feature_warning_cache), pour éviter de polluer les logs à chaque cycle. - """ - structural = STRUCTURAL_EXCLUDED_FEATURES.get(name, []) - # Exclure les features structurelles d'emblée (sans warning pipeline) - active_features = [f for f in features if f not in structural] - - missing = [f for f in active_features if f not in df.columns] - present = [f for f in active_features if f in df.columns] - - zero_val = [f for f in present if df[f].nunique() == 1 and df[f].max() == 0] - unique_val = [f for f in present if df[f].nunique() == 1 and df[f].max() != 0] - constant = zero_val + unique_val - valid = [f for f in present if f not in constant] - - current_state = (frozenset(missing), frozenset(zero_val), frozenset(unique_val)) - state_changed = _feature_warning_cache.get(name) != current_state - _feature_warning_cache[name] = current_state - - if structural: - log_info(f"[{name}] Features exclues (structurelles / L4 indisponible) : {structural}") - # Ne logguer les avertissements que si l'état a changé (nouveau problème ou résolution) - if state_changed: - if missing: - log_info(f"[{name}] Features absentes du schéma : {missing}") - if zero_val: - log_info(f"[{name}] Features à 0 (pipeline non-alimenté) : {zero_val}") - if unique_val: - log_info(f"[{name}] Features non-discriminantes (agrégat global) : {unique_val}") - if missing or zero_val or unique_val: - log_decision('FEATURE_WARNING', cycle_id, name, { - 'structural': structural, 'missing': missing, - 'zero': zero_val, 'unique_nonzero': unique_val, - 'valid_count': len(valid), 'total': len(active_features) - }) - - ratio = len(valid) / max(len(active_features), 1) - if ratio < MIN_VALID_FEATURE_RATIO: - log_info(f"[{name}] Ratio features valides insuffisant ({ratio:.0%} < {MIN_VALID_FEATURE_RATIO:.0%}) — cycle ignoré.") - log_decision('SKIPPED_INVALID_FEATURES', cycle_id, name, { - 'valid_ratio': round(ratio, 3), 'threshold': MIN_VALID_FEATURE_RATIO - }) - return None - return valid - - -# ═══════════════════════════════════════════════════════════════════════════════ -# A2 / A10 — SEUIL ADAPTATIF ET NORMALISATION DES SCORES -# ═══════════════════════════════════════════════════════════════════════════════ -def compute_adaptive_threshold(scores: np.ndarray) -> float: - """ - A2 : Calcule un seuil adaptatif basé sur le percentile ANOMALY_PERCENTILE des scores négatifs. - Retourne le min entre le seuil adaptatif et le seuil statique configuré. - """ - neg_scores = scores[scores < 0] - if len(neg_scores) == 0: - return ANOMALY_THRESHOLD - adaptive = float(np.percentile(neg_scores, ANOMALY_PERCENTILE)) - return min(adaptive, ANOMALY_THRESHOLD) - - -def normalize_scores(scores: np.ndarray) -> np.ndarray: - """ - A10 : Normalise les scores d'anomalie en [0, 1] avec 1 = le plus anomal. - - Les scores positifs (trafic normal) reçoivent un score normalisé de 0. - Les scores négatifs sont mappés linéairement : le plus négatif → 1.0, zéro → 0.0. - """ - result = np.zeros_like(scores) - mask = scores < 0 - if mask.sum() == 0: - return result - s_min = scores[mask].min() - if s_min == 0: - return result - result[mask] = np.clip(-scores[mask] / (-s_min + 1e-9), 0.0, 1.0) - return result - - -# ═══════════════════════════════════════════════════════════════════════════════ -# A4 — EXPLAINABILITÉ PAR SHAP -# ═══════════════════════════════════════════════════════════════════════════════ -def _compute_shap_top_features(model, X: pd.DataFrame, features: list, n_top: int = 5) -> list: - """ - Calcule les valeurs SHAP pour chaque ligne de X et retourne les n_top features - les plus contributives (valeur SHAP la plus négative = plus responsable de l'anomalie). - Retourne une liste de dicts {feature: shap_value} par ligne. - - Utilise TreeExplainer pour sklearn, et un échantillon Permutation pour isotree. - """ - if not ENABLE_SHAP or X.empty: - return [{}] * len(X) - try: - if EIF_AVAILABLE: - # isotree non supporté par TreeExplainer — utiliser un échantillon + Permutation - sample_size = min(100, len(X)) - X_sample = X.sample(n=sample_size, random_state=42) if len(X) > sample_size else X - explainer = _shap.Explainer(model.decision_function, X_sample) - shap_values = explainer(X).values - else: - explainer = _shap.TreeExplainer(model) - shap_values = explainer.shap_values(X) - result = [] - for sv in shap_values: - # Features les plus négatives = les plus responsables de l'anomalie - pairs = sorted(zip(features, sv), key=lambda x: x[1]) - result.append({f: round(float(v), 4) for f, v in pairs[:n_top]}) - return result - except Exception as e: - log_info(f"[SHAP] Erreur de calcul SHAP: {e}") - return [{}] * len(X) - - -def _build_reason(name: str, row: pd.Series, shap_top: dict) -> str: - """Construit le champ reason enrichi avec le top SHAP ou les métriques clés.""" - # Utilise le score brut pour l'affichage (plus interprétable que le score normalisé) - score = round(float(row.get('raw_anomaly_score', row.get('anomaly_score', 0))), 3) - threat = row.get('threat_level', '') - if shap_top: - top_str = ' | '.join(f"{f}({v:+.3f})" for f, v in shap_top.items()) - return f"[{name}] Score: {score} | SHAP: {top_str} | Threat: {threat}" - vel = round(float(row.get('hit_velocity', 0)), 1) - fuzz = round(float(row.get('fuzzing_index', 0)), 1) - return f"[{name}] Score: {score} | Vel: {vel} req/s | Fuzzing: {fuzz} | Threat: {threat}" - - -# ═══════════════════════════════════════════════════════════════════════════════ -# A8 — CLUSTERING COMPORTEMENTAL DES ANOMALIES (DBSCAN) -# ═══════════════════════════════════════════════════════════════════════════════ -def _cluster_anomalies(anomalies: pd.DataFrame, features: list, ae_model=None) -> pd.DataFrame: - """A8 : Applique HDBSCAN (ou DBSCAN en fallback) sur les features normalisées des anomalies. - - HDBSCAN est préféré car il détermine automatiquement le nombre de clusters - et la densité optimale (pas de paramètre eps à régler manuellement). - Si un autoencoder est disponible, utilise l'espace latent (16-dim) au lieu des features brutes - pour un clustering plus expressif dans un espace de dimension réduite. - Ajoute une colonne campaign_id : −1 = IP isolée, ≥0 = identifiant de campagne coordonnée. - """ - anomalies = anomalies.copy() - if len(anomalies) < CLUSTERING_MIN_SAMPLES: - anomalies['campaign_id'] = -1 - return anomalies - try: - X = anomalies[features].replace([np.inf, -np.inf], np.nan).fillna(0) - # Utiliser l'espace latent AE si disponible (meilleure séparation dans un espace 16-dim) - if ae_model is not None: - try: - X_scaled = ae_model.encode(X.values) - algo_prefix = 'AE+' - except Exception: - X_scaled = StandardScaler().fit_transform(X) - algo_prefix = '' - else: - X_scaled = StandardScaler().fit_transform(X) - algo_prefix = '' - if HDBSCAN_AVAILABLE: - clusterer = _hdbscan.HDBSCAN( - min_cluster_size=CLUSTERING_MIN_SAMPLES, - min_samples=max(2, CLUSTERING_MIN_SAMPLES - 1), - cluster_selection_method='eom' - ) - labels = clusterer.fit_predict(X_scaled) - else: - labels = DBSCAN(eps=0.5, min_samples=CLUSTERING_MIN_SAMPLES).fit_predict(X_scaled) - anomalies['campaign_id'] = labels - n_campaigns = len(set(labels)) - (1 if -1 in labels else 0) - algo = algo_prefix + ('HDBSCAN' if HDBSCAN_AVAILABLE else 'DBSCAN') - if n_campaigns > 0: - log_info(f"[{algo}] {n_campaigns} campagne(s) détectée(s) parmi {len(anomalies)} anomalies.") - except Exception as e: - log_info(f"[Clustering] Erreur de clustering: {e}") - anomalies['campaign_id'] = -1 - return anomalies - - -# ═══════════════════════════════════════════════════════════════════════════════ -# ANALYSE SEMI-SUPERVISÉE -# ═══════════════════════════════════════════════════════════════════════════════ -def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): - """Applique le pipeline de détection semi-supervisée sur un sous-ensemble du trafic. - - Trifurque le trafic en bots connus, bots Anubis ALLOW et trafic inconnu, - entraîne ou charge le modèle IsolationForest sur la baseline humaine, - score le trafic inconnu, applique les améliorations A2/A4/A6/A8, - et retourne (threats, all_scored) sous forme de DataFrames. - - Effets de bord : écriture dans les logs de décision via log_decision. - """ - # 1. Bots connus (dict_bot_ip / dict_bot_ja4) → exclus du scoring IF - known_bots = df[df['bot_name'] != ''].copy() - rest = df[df['bot_name'] == ''].copy() - - # 2. Bots Anubis ALLOW → bots légitimes, exclus du scoring IF - anubis_allow = rest[rest['anubis_bot_action'] == 'ALLOW'].copy() - - # 3. Tout le reste passe par l'IsolationForest pour un score réel : - # - DENY : menaces identifiées par règles Anubis → IF donne le score de sévérité - # - WEIGH / inconnu → scorés normalement (anubis_is_flagged=1 pour WEIGH) - # Les DENY sont TOUJOURS inclus dans les threats, indépendamment du seuil IF. - unknown_traffic = rest[rest['anubis_bot_action'] != 'ALLOW'].copy() - human_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'isp'] - - log_info(f'[{name}] ── Triage ──────────────────────────────────────') - log_info(f'[{name}] Total sessions : {len(df):>6}') - log_info(f'[{name}] Bots connus (dict) : {len(known_bots):>6}') - log_info(f'[{name}] Anubis ALLOW : {len(anubis_allow):>6}') - log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}') - log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)') - - # A7 — Valider les features avant tout traitement - valid_features = validate_features(df, features, name, cycle_id) - if valid_features is None: - return pd.DataFrame(), pd.DataFrame() - - if len(human_baseline) < 500: - log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < 500) — cycle ignoré.") - log_info(f"[{name}] Distribution asn_label dans le trafic à scorer :") - if 'asn_label' in unknown_traffic.columns: - for label, cnt in unknown_traffic['asn_label'].value_counts().head(8).items(): - log_info(f"[{name}] {label:>15} : {cnt}") - log_decision('SKIPPED_LOW_DATA', cycle_id, name, { - 'human_count': len(human_baseline), 'unknown_count': len(unknown_traffic) - }) - return pd.DataFrame(), pd.DataFrame() - - log_info(f'[{name}] ── Modèle EIF ─────────────────────────────────') - log_info(f'[{name}] Features validées : {len(valid_features)}/{len(features)} ({", ".join(valid_features[:5])}{"…" if len(valid_features) > 5 else ""})') - - # A1 — Dérive conceptuelle intégrée dans load_or_train_model - model, ae_model, model_features = load_or_train_model(name, human_baseline, valid_features, cycle_id) - # Utiliser les features du modèle (possiblement différentes après pruning/chargement) - scoring_features = [f for f in model_features if f in unknown_traffic.columns] - unknown_traffic = unknown_traffic.copy() - - X_test = unknown_traffic[scoring_features].replace([np.inf, -np.inf], np.nan) - X_test = X_test.fillna(X_test.median()) - raw_scores = model.decision_function(X_test) - - # isotree renvoie des scores dans [0, 1] : 0.5 = frontière, >0.5 = anomal - # sklearn renvoie des scores centrés sur 0 : <0 = anomal, >0 = normal - # Conversion : sklearn_equiv = 0.5 - isotree_score - # isotree 0.8 → -0.3 (CRITICAL) | isotree 0.5 → 0.0 (frontière) - # isotree 0.3 → +0.2 (NORMAL) - if EIF_AVAILABLE: - raw_scores = 0.5 - raw_scores - - log_info(f'[{name}] Scoring EIF : {len(X_test)} sessions scorées (min={raw_scores.min():.4f}, max={raw_scores.max():.4f}, mean={raw_scores.mean():.4f})') - - # Combinaison EIF + Autoencoder si disponible - # Score final = (1-α) * eif_norm + α * ae_norm où α = AE_WEIGHT - if ae_model is not None and AE_WEIGHT > 0: - try: - ae_recon_errors = ae_model.score_samples(X_test.values) - ae_norm = normalize_scores(-ae_recon_errors) # plus élevé = plus anomal - eif_norm = normalize_scores(raw_scores) - combined_norm = (1 - AE_WEIGHT) * eif_norm + AE_WEIGHT * ae_norm - unknown_traffic['ae_recon_error'] = ae_recon_errors - unknown_traffic['anomaly_score'] = combined_norm - log_info(f"[{name}] Score combiné EIF+AE (α={AE_WEIGHT}): ae_mean={ae_recon_errors.mean():.6f}") - except Exception as exc: - log_info(f"[{name}] AE scoring échoué : {exc} — utilisation EIF seul.") - unknown_traffic['ae_recon_error'] = 0.0 - unknown_traffic['anomaly_score'] = normalize_scores(raw_scores) - else: - unknown_traffic['ae_recon_error'] = 0.0 - unknown_traffic['anomaly_score'] = normalize_scores(raw_scores) - - # raw_anomaly_score : score brut IF pour comparaison au seuil et assignation du threat_level - unknown_traffic['raw_anomaly_score'] = raw_scores - unknown_traffic['model_name'] = name - - # XGBoost supervisé — troisième voix (si labels historiques disponibles) - unknown_traffic['xgb_prob'] = 0.0 - if XGB_AVAILABLE and XGB_WEIGHT > 0: - try: - xgb_client = get_client() - xgb_model, xgb_feats = load_or_train_xgb(name, xgb_client, scoring_features, cycle_id) - if xgb_model is not None and xgb_feats is not None: - # XGB peut utiliser un sous-ensemble de features (celles disponibles dans la vue) - xgb_cols = [f for f in xgb_feats if f in unknown_traffic.columns] - X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0) - xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1] - unknown_traffic['xgb_prob'] = xgb_probs - # Méta-learner : combiner anomaly_score (EIF+AE) et xgb_prob - # anomaly_score déjà normalisé [0,1], xgb_prob est [0,1] - α_xgb = XGB_WEIGHT - unknown_traffic['anomaly_score'] = ( - (1 - α_xgb) * unknown_traffic['anomaly_score'] + α_xgb * xgb_probs - ) - log_info(f"[{name}] Score combiné EIF+AE+XGB (β={α_xgb}): xgb_mean={xgb_probs.mean():.4f}") - except Exception as exc: - log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.") - - # A2 — Seuil adaptatif calculé sur les scores BRUTS (même échelle que ANOMALY_THRESHOLD) - effective_threshold = compute_adaptive_threshold(raw_scores) - log_info(f"[{name}] Seuil effectif : {effective_threshold:.4f} (statique={ANOMALY_THRESHOLD}, percentile={ANOMALY_PERCENTILE})") - - # A6 — Pénaliser les IPs récurrentes sur le score BRUT avant comparaison au seuil - if RECURRENCE_WEIGHT > 0: - recurrences = unknown_traffic['src_ip'].map(recurrence_map).fillna(0) - penalty = np.log1p(recurrences.values) * RECURRENCE_WEIGHT - unknown_traffic['raw_anomaly_score'] = unknown_traffic['raw_anomaly_score'] - penalty - - # Assigner threat_level à TOUTES les sessions scorées (pour ml_all_scores) - unknown_traffic['threat_level'] = unknown_traffic['raw_anomaly_score'].apply(score_to_threat_level) - unknown_traffic['recurrence'] = unknown_traffic['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1 - unknown_traffic['campaign_id'] = -1 - - # Extraire les DENY (maintenant avec leur vrai score IF) et forcer leur threat_level - deny_mask = unknown_traffic['anubis_bot_action'] == 'DENY' - unknown_traffic.loc[deny_mask, 'threat_level'] = 'ANUBIS_DENY' - - # ── Classification des navigateurs légitimes ───────────────────────────── - # Principe : un cluster/session est classé LEGITIMATE_BROWSER si et seulement si : - # 1. Le JA4 correspond à un navigateur connu (browser_family != '') - # 2. Le comportement est cohérent avec un vrai navigateur (browser_consistency_score >= 4) - # 3. Le score IF ne signale pas de menace (threat_level in NORMAL/LOW) - # 4. Ce n'est pas un DENY Anubis - # Cela réduit les faux positifs sur les vrais utilisateurs tout en détectant - # le spoofing de JA4 (navigateur déclaré mais comportement incohérent). - _bf = unknown_traffic.get('browser_family', pd.Series('', index=unknown_traffic.index)).fillna('').astype(str) - _bcs = unknown_traffic.get('browser_consistency_score', pd.Series(0, index=unknown_traffic.index)).fillna(0) - browser_legit_mask = ( - (_bf != '') & # JA4 navigateur reconnu - (_bcs >= BROWSER_LEGIT_MIN_CONSISTENCY) & # comportement cohérent - (unknown_traffic['threat_level'].isin(['NORMAL', 'LOW'])) & # pas de menace IF - (~deny_mask) # pas un DENY Anubis - ) - if browser_legit_mask.any(): - unknown_traffic.loc[browser_legit_mask, 'threat_level'] = 'LEGITIMATE_BROWSER' - unknown_traffic.loc[browser_legit_mask, 'reason'] = ( - '[Navigateur légitime] ' + _bf[browser_legit_mask] + - ' (cohérence=' + _bcs[browser_legit_mask].astype(int).astype(str) + '/5)' - ) - n_legit = browser_legit_mask.sum() - families = _bf[browser_legit_mask].value_counts().to_dict() - log_info(f"[{name}] {n_legit} session(s) classée(s) LEGITIMATE_BROWSER : {families}") - log_decision('LEGITIMATE_BROWSER', cycle_id, name, { - 'count': int(n_legit), 'families': families, - 'mean_consistency': round(float(_bcs[browser_legit_mask].mean()), 2), - }) - - # Capturer toutes les sessions scorées (avant filtrage par seuil) — pour ml_all_scores - all_scored = unknown_traffic.copy() - - if not known_bots.empty: - known_bots = known_bots.copy() - known_bots['anomaly_score'] = 0.0 - known_bots['raw_anomaly_score'] = 0.0 - known_bots['ae_recon_error'] = 0.0 - known_bots['xgb_prob'] = 0.0 - known_bots['threat_level'] = 'KNOWN_BOT' - known_bots['model_name'] = name - known_bots['campaign_id'] = -1 - known_bots['reason'] = '[Identification] Bot légitime: ' + known_bots['bot_name'] - known_bots['recurrence'] = known_bots['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1 - for _, row in known_bots.iterrows(): - log_decision('KNOWN_BOT', cycle_id, name, { - 'src_ip': row.get('src_ip', ''), 'bot_name': row.get('bot_name', ''), - 'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''), - 'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''), - 'recurrence': int(row.get('recurrence', 1)) - }) - - # ── Anubis ALLOW : bots légitimes identifiés par règles Anubis ─────────── - if not anubis_allow.empty: - anubis_allow = anubis_allow.copy() - anubis_allow['anomaly_score'] = 0.0 - anubis_allow['raw_anomaly_score'] = 0.0 - anubis_allow['ae_recon_error'] = 0.0 - anubis_allow['xgb_prob'] = 0.0 - anubis_allow['threat_level'] = 'KNOWN_BOT' - anubis_allow['bot_name'] = anubis_allow['anubis_bot_name'] - anubis_allow['model_name'] = name - anubis_allow['campaign_id'] = -1 - anubis_allow['reason'] = '[Anubis ALLOW] ' + anubis_allow['anubis_bot_name'] - anubis_allow['recurrence'] = anubis_allow['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1 - for _, row in anubis_allow.iterrows(): - log_decision('KNOWN_BOT', cycle_id, name, { - 'src_ip': row.get('src_ip', ''), 'bot_name': row.get('anubis_bot_name', ''), - 'anubis_bot_name': row.get('anubis_bot_name', ''), - 'anubis_bot_action': row.get('anubis_bot_action', ''), - 'anubis_bot_category': row.get('anubis_bot_category', ''), - 'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''), - 'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''), - 'recurrence': int(row.get('recurrence', 1)), - }) - - # ── Anubis DENY : scorés par IF, toujours inclus dans les threats ──────── - # Extraits de unknown_traffic après scoring — ils ont leur vrai score IF. - anubis_deny = unknown_traffic[deny_mask].copy() - if not anubis_deny.empty: - anubis_deny['reason'] = '[Anubis DENY] ' + anubis_deny['anubis_bot_name'].fillna('') + \ - ' | ' + anubis_deny['raw_anomaly_score'].apply(lambda s: f'IF={s:.4f}') - log_info(f"[{name}] Anubis DENY: {len(anubis_deny)} IP(s) scorées par IF " - f"(score moyen: {anubis_deny['raw_anomaly_score'].mean():.4f}).") - for _, row in anubis_deny.iterrows(): - log_decision('ANUBIS_DENY', cycle_id, name, { - 'src_ip': row.get('src_ip', ''), 'anubis_bot_name': row.get('anubis_bot_name', ''), - 'anubis_bot_action': row.get('anubis_bot_action', ''), - 'anubis_bot_category': row.get('anubis_bot_category', ''), - 'anomaly_score': round(float(row.get('anomaly_score', 0)), 4), - 'raw_anomaly_score': round(float(row.get('raw_anomaly_score', 0)), 4), - 'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''), - 'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''), - 'recurrence': int(row.get('recurrence', 1)), - }) - - # Filtrer sur raw_anomaly_score (A6 inclus) — seulement le trafic non-DENY et non-navigateur légitime - # Les DENY sont toujours des threats, indépendamment du seuil IF - # Les LEGITIMATE_BROWSER sont exclus des anomalies (navigateurs confirmés) - non_deny_traffic = unknown_traffic[~deny_mask & (unknown_traffic['threat_level'] != 'LEGITIMATE_BROWSER')] - anomalies = non_deny_traffic[non_deny_traffic['raw_anomaly_score'] < effective_threshold].copy() - if not anomalies.empty: - log_info(f"[{name}] ALERT: {len(anomalies)} anomalies détectées (seuil={effective_threshold:.4f}).") - anomalies['recurrence'] = anomalies['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1 - - # A4 — Explainabilité SHAP : top features responsables de chaque anomalie - X_anomalies = X_test.loc[anomalies.index] - shap_tops = _compute_shap_top_features(model, X_anomalies, valid_features) - anomalies['reason'] = [ - _build_reason(name, row, shap) - for (_, row), shap in zip(anomalies.iterrows(), shap_tops) - ] - - # A8 — Clustering DBSCAN pour identifier les campagnes coordonnées - if ENABLE_CLUSTERING: - anomalies = _cluster_anomalies(anomalies, scoring_features, ae_model=ae_model) - - anomalies['ja4'] = anomalies['ja4'].replace({'': 'HTTP_CLEAR_TEXT'}) - for _, row in anomalies.iterrows(): - log_decision('ANOMALY', cycle_id, name, { - 'src_ip': row.get('src_ip', ''), 'anomaly_score': round(float(row.get('anomaly_score', 0)), 4), - 'raw_anomaly_score': round(float(row.get('raw_anomaly_score', 0)), 4), - 'threat_level': row.get('threat_level', ''), 'recurrence': int(row.get('recurrence', 1)), - 'hit_velocity': round(float(row.get('hit_velocity', 0)), 2), - 'fuzzing_index': round(float(row.get('fuzzing_index', 0)), 2), - 'post_ratio': round(float(row.get('post_ratio', 0)), 3), - 'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''), - 'asn_detail': row.get('asn_detail', ''), 'asn_domain': row.get('asn_domain', ''), - 'country_code': row.get('country_code', ''), 'asn_label': row.get('asn_label', ''), - 'ja4': row.get('ja4', ''), 'host': row.get('host', ''), - 'correlated': int(row.get('correlated', 0)), 'campaign_id': int(row.get('campaign_id', -1)), - 'effective_threshold': round(effective_threshold, 4), 'reason': row.get('reason', '') - }) - - threats = pd.concat([df for df in [ - anomalies if not anomalies.empty else None, - known_bots if not known_bots.empty else None, - anubis_allow if not anubis_allow.empty else None, - anubis_deny if not anubis_deny.empty else None, - ] if df is not None], ignore_index=True) - - # Inclure anubis_allow dans all_scored pour traçabilité dans ml_all_scores. - # Ces IPs sont exclues de l'analyse IF mais doivent apparaître dans la table - # de scores avec threat_level='KNOWN_BOT' et anomaly_score=0.0. - if not anubis_allow.empty: - all_scored = pd.concat([all_scored, anubis_allow], ignore_index=True) - - # ── Résumé du modèle ───────────────────────────────────────────────────── - n_threats = len(threats) if not threats.empty else 0 - n_anomalies = len(anomalies) if not anomalies.empty else 0 - n_legit_browser = int(browser_legit_mask.sum()) if browser_legit_mask is not None else 0 - n_deny = len(anubis_deny) if not anubis_deny.empty else 0 - tl_counts = threats['threat_level'].value_counts().to_dict() if not threats.empty else {} - tl_str = ', '.join(f'{k}={v}' for k, v in sorted(tl_counts.items())) if tl_counts else 'aucune' - log_info(f'[{name}] ── Résultat ────────────────────────────────────') - log_info(f'[{name}] Menaces totales : {n_threats:>6} ({tl_str})') - log_info(f'[{name}] Anomalies IF : {n_anomalies:>6} (seuil={effective_threshold:.4f})') - log_info(f'[{name}] Navigateurs légit. : {n_legit_browser:>6}') - log_info(f'[{name}] Anubis DENY (forcé) : {n_deny:>6}') - log_info(f'[{name}] Sessions scorées : {len(all_scored):>6} (→ ml_all_scores)') - - return threats, all_scored - -# ═══════════════════════════════════════════════════════════════════════════════ -# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL -# ═══════════════════════════════════════════════════════════════════════════════ - -# ═══════════════════════════════════════════════════════════════════════════════ -# FEEDBACK LOOP — Intégration des classifications SOC dans la baseline -# ═══════════════════════════════════════════════════════════════════════════════ -ENABLE_FEEDBACK = os.getenv('ENABLE_FEEDBACK', 'true').lower() == 'true' -FEEDBACK_WINDOW_DAYS = int(os.getenv('FEEDBACK_WINDOW_DAYS', '7')) - -def _load_soc_feedback(client) -> dict: - """Charge les classifications SOC récentes pour ajuster la baseline. - - Retourne un dict {src_ip: classification} où classification est - 'true_positive', 'false_positive', 'suspicious', etc. - Les faux positifs sont exclus du scoring (considérés comme humains), - les vrais positifs sont exclus de la baseline humaine. - """ - if not ENABLE_FEEDBACK: - return {} - try: - feedback_df = client.query_df( - f"SELECT entity_id AS src_ip, " - f" argMax(JSONExtractString(details, 'classification'), timestamp) AS classification " - f"FROM {DB}.audit_logs " - f"WHERE action = 'create_classification' " - f" AND entity_type = 'ip' " - f" AND timestamp >= now() - INTERVAL {FEEDBACK_WINDOW_DAYS} DAY " - f"GROUP BY entity_id" - ) - if feedback_df is None or feedback_df.empty: - return {} - result = dict(zip(feedback_df['src_ip'], feedback_df['classification'])) - log_info(f"[Feedback] {len(result)} classification(s) SOC chargées ({FEEDBACK_WINDOW_DAYS}j).") - return result - except Exception as e: - log_info(f"[Feedback] Impossible de charger les classifications SOC : {e}") - return {} - - -# ═══════════════════════════════════════════════════════════════════════════════ -# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL -# ═══════════════════════════════════════════════════════════════════════════════ -def _filter_recent_detections(client, all_anom: pd.DataFrame) -> pd.DataFrame: - """ - A5 : Filtre les IPs déjà insérées dans ml_detected_anomalies dans les DEDUP_TTL_MIN dernières minutes. - Exception : une IP est réinsérée si son nouveau score est ≥ 0.05 points plus bas (aggravation). - """ - if DEDUP_TTL_MIN <= 0 or all_anom.empty: - return all_anom - try: - recent_df = client.query_df( - f"SELECT src_ip, min(anomaly_score) AS best_score " - f"FROM {DB}.ml_detected_anomalies " - f"WHERE detected_at > now() - INTERVAL {DEDUP_TTL_MIN} MINUTE " - f"GROUP BY src_ip" - ) - if recent_df.empty: - return all_anom - recent_map = dict(zip(recent_df['src_ip'], recent_df['best_score'])) - def _should_insert(row): - """Détermine si une anomalie doit être réinsérée selon l'évolution du score.""" - prev = recent_map.get(row['src_ip']) - if prev is None: - return True - # Réinsérer seulement si le score brut s'est significativement aggravé - return float(row.get('raw_anomaly_score', row['anomaly_score'])) < float(prev) - 0.05 - mask = all_anom.apply(_should_insert, axis=1) - filtered = all_anom[mask] - skipped = len(all_anom) - len(filtered) - if skipped > 0: - log_info(f"[Dedup TTL={DEDUP_TTL_MIN}min] {skipped} IP(s) filtrée(s) (déjà détectées récemment).") - return filtered - except Exception as e: - log_info(f"[Dedup] Erreur lors de la déduplication TTL : {e}") - return all_anom - - -# ═══════════════════════════════════════════════════════════════════════════════ -# A3 — ANALYSE MULTI-FENÊTRES : PRÉTRAITEMENT COMMUN -# ═══════════════════════════════════════════════════════════════════════════════ -def _preprocess_df(df: pd.DataFrame) -> pd.DataFrame: - """Normalise les colonnes et remplit les valeurs manquantes (commun 1h et 24h).""" - df.columns = [c.split('.')[-1] for c in df.columns] - for col in ['src_ip', 'ja4', 'host', 'bot_name', 'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category', - 'asn_number', 'asn_org', 'asn_detail', 'asn_domain', 'country_code', 'asn_label']: - if col in df.columns: - df[col] = df[col].fillna('').astype(str) - - # ── Feature dérivée : navigateur légitime connu (dict_browser_ja4) ── - # is_known_browser : 1 si le JA4 correspond à un navigateur dans dict_browser_ja4. - # Signal fort de légitimité passé à l'IF pour réduire les faux positifs sur les - # vrais navigateurs. Le modèle apprend que browser_family ≠ '' corrèle avec le - # comportement humain normal (navigation, assets, referer, cookies). - df['is_known_browser'] = ( - df.get('browser_family', pd.Series('', index=df.index)).fillna('').astype(str) != '' - ).astype(int) - - # ── browser_consistency_score : [0..5] — cohérence comportementale navigateur ── - # Combine 5 signaux binaires attendus d'un vrai navigateur : - # 1. browser_family reconnu (JA4 dans dict_browser_ja4) - # 2. modern_browser_score ≥ 50 (sec-ch-ua / UA modernes) - # 3. Accept-Language présent - # 4. Cookies présents (session active) - # 5. Sec-Fetch-* présent (absence faible) - # Un score ≥ 4 indique un navigateur cohérent. Un score bas avec - # is_known_browser=1 signale un possible spoofing de JA4. - _mbs = df.get('modern_browser_score', pd.Series(0, index=df.index)).fillna(0) - _hal = df.get('has_accept_language', pd.Series(0, index=df.index)).fillna(0) - _hck = df.get('has_cookie', pd.Series(0, index=df.index)).fillna(0) - _sfa = df.get('sec_fetch_absence_rate', pd.Series(1, index=df.index)).fillna(1) - df['browser_consistency_score'] = ( - df['is_known_browser'] - + (_mbs >= 50).astype(int) - + (_hal > 0).astype(int) - + (_hck > 0).astype(int) - + (_sfa < 0.5).astype(int) - ) - - # ── Features numériques dérivées des labels Anubis (pour IsolationForest) ── - # anubis_is_flagged : 1 si le trafic est marqué WEIGH/CHALLENGE par Anubis - # → signal de suspicion modéré passé à l'IF (ALLOW/DENY sont exclus du pipeline) - df['anubis_is_flagged'] = ( - (df.get('anubis_bot_name', pd.Series('', index=df.index)) != '') & - (~df.get('anubis_bot_action', pd.Series('', index=df.index)).isin(['ALLOW', 'DENY', ''])) - ).astype(int) - - # ── Imputation intelligente des valeurs manquantes ── - # Les features binaires (0/1) et les ratios utilisent -1 comme sentinelle pour "inconnu" - # afin de ne pas biaiser le modèle (0 = valeur légitime pour beaucoup de features). - binary_features = { - 'has_accept_language', 'has_cookie', 'has_referer', 'ua_ch_mismatch', - 'is_ua_rotating', 'is_alpn_missing', 'sni_host_mismatch', 'alpn_http_mismatch', - 'mss_mobile_mismatch', 'anubis_is_flagged', 'is_rare_ja4', - } - for col in df.columns: - if col in binary_features: - df[col] = df[col].fillna(-1) - elif df[col].dtype in ('float64', 'float32', 'int64', 'int32', 'uint64', 'uint32'): - df[col] = df[col].replace([np.inf, -np.inf], np.nan).fillna(df[col].median()) - - return df - - -# ═══════════════════════════════════════════════════════════════════════════════ -# CYCLE PRINCIPAL -# ═══════════════════════════════════════════════════════════════════════════════ -_consecutive_failures = 0 -def fetch_and_analyze(): - """Exécute un cycle complet de détection : requête ClickHouse, scoring et insertion des résultats. - - Récupère le trafic depuis la vue view_ai_features_1h (et optionnellement view_ai_features_24h), - applique run_semi_supervised_logic sur les deux modèles (Complet / Applicatif), - insère les scores dans ml_all_scores et les anomalies dans ml_detected_anomalies. - Met à jour _service_healthy et _consecutive_failures en cas d'échec de requête. - """ - global _consecutive_failures, _service_healthy - cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S') - log_info('') - log_info('=' * 70) - log_info(f' CYCLE {cycle_id}') - log_info('=' * 70) - - client = get_client() - - # ── Récupération du trafic (fenêtre 1h) ────────────────────────────────── - try: - df = client.query_df(f'SELECT * FROM {DB}.view_ai_features_1h') - except Exception as e: - log_info(f'ERREUR REQUETE: {e}') - with _health_lock: - _consecutive_failures += 1 - if _consecutive_failures >= MAX_FAILURES: - _service_healthy = False - log_decision('CONSECUTIVE_FAILURES', cycle_id, '', {'count': _consecutive_failures, 'error': str(e)}) - return - - with _health_lock: - _consecutive_failures = 0 - _service_healthy = True - - if df is None or df.empty: - log_info('[Données] Aucun trafic trouvé dans view_ai_features_1h.') - return - - log_info(f'[Données] {len(df)} sessions chargées depuis view_ai_features_1h ({len(df.columns)} colonnes).') - - # ── Enrichissement avec les features avancées de la thèse §5 ───────────── - try: - df_thesis = client.query_df(f'SELECT * FROM {DB}.view_thesis_features_1h') - if df_thesis is not None and not df_thesis.empty: - df_thesis.columns = [c.split('.')[-1] for c in df_thesis.columns] - df.columns = [c.split('.')[-1] for c in df.columns] - thesis_cols = [c for c in df_thesis.columns if c not in ('window_start', 'src_ip', 'ja4', 'host')] - df = df.merge( - df_thesis, on=['window_start', 'src_ip', 'ja4', 'host'], - how='left', suffixes=('', '_thesis') - ) - for col in thesis_cols: - if col in df.columns: - df[col] = df[col].fillna(df[col].median() if df[col].notna().any() else 0) - log_info(f'[Thèse §5] {len(df_thesis)} sessions enrichies avec {len(thesis_cols)} features avancées.') - else: - log_info('[Thèse §5] view_thesis_features_1h vide — features avancées indisponibles.') - except Exception as e: - log_info(f'[Thèse §5] view_thesis_features_1h inaccessible : {e} — features avancées ignorées.') - - df = _preprocess_df(df) - - # ── Résumé des données chargées ─────────────────────────────────────────── - n_total = len(df) - n_correlated = int((df.get('correlated', pd.Series()) == 1).sum()) - n_uncorrelated = n_total - n_correlated - n_isp = int((df.get('asn_label', pd.Series()) == 'isp').sum()) - n_datacenter = int((df.get('asn_label', pd.Series()) == 'datacenter').sum()) - n_cdn = int((df.get('asn_label', pd.Series()) == 'cdn').sum()) - n_known_bot = int((df.get('bot_name', pd.Series()) != '').sum()) - n_anubis_allow = int((df.get('anubis_bot_action', pd.Series()) == 'ALLOW').sum()) - n_anubis_deny = int((df.get('anubis_bot_action', pd.Series()) == 'DENY').sum()) - n_anubis_weigh = int((df.get('anubis_bot_action', pd.Series()) == 'WEIGH').sum()) - n_unique_ips = int(df['src_ip'].nunique()) if 'src_ip' in df.columns else 0 - n_unique_ja4 = int(df['ja4'].nunique()) if 'ja4' in df.columns else 0 - - log_info(f'[Données] Après preprocessing : {n_total} sessions, {n_unique_ips} IP uniques, {n_unique_ja4} JA4 uniques.') - log_info(f'[Données] Corrélées (L3-L7) : {n_correlated:>6} | Non-corrélées (L7) : {n_uncorrelated:>6}') - log_info(f'[Données] ASN ISP : {n_isp:>6} | Datacenter : {n_datacenter:>6} | CDN : {n_cdn}') - log_info(f'[Données] Bots connus (dict) : {n_known_bot:>6} | Anubis ALLOW : {n_anubis_allow:>6}') - log_info(f'[Données] Anubis DENY : {n_anubis_deny:>6} | Anubis WEIGH : {n_anubis_weigh:>6}') - - # Distribution browser_family - if 'browser_family' in df.columns: - bf_counts = df['browser_family'].value_counts() - bf_known = bf_counts[bf_counts.index != ''] - if not bf_known.empty: - bf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in bf_known.head(5).items()) - log_info(f'[Données] Navigateurs JA4 : {bf_known.sum():>6} sessions ({len(bf_known)} familles : {bf_summary})') - - log_decision('CYCLE_START', cycle_id, '', { - 'total_rows': n_total, - 'human_rows': n_isp, - 'known_bot_rows': n_known_bot, - 'correlated_rows': n_correlated, - 'anubis_allow_rows': n_anubis_allow, - 'anubis_deny_rows': n_anubis_deny, - 'anubis_weigh_rows': n_anubis_weigh, - 'multiwindow': ENABLE_MULTIWINDOW, - }) - - try: - rec_df = client.query_df(f'SELECT src_ip, recurrence FROM {DB}.view_ip_recurrence') - recurrence_map = dict(zip(rec_df['src_ip'], rec_df['recurrence'])) - except Exception: - recurrence_map = {} - - # ── Feedback SOC : ajuster la baseline selon les classifications humaines ─ - soc_feedback = _load_soc_feedback(client) - if soc_feedback: - fp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('false_positive', 'legitimate')} - tp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('true_positive', 'malicious', 'bot')} - if fp_ips: - # Les faux positifs confirmés rejoignent le pool humain - mask_fp = df['src_ip'].isin(fp_ips) & (df.get('asn_label', pd.Series(dtype=str)) != 'isp') - df.loc[mask_fp, 'asn_label'] = 'isp' - log_info(f"[Feedback] {mask_fp.sum()} lignes reclassées 'isp' (FP confirmés).") - if tp_ips: - # Les vrais positifs confirmés sont exclus de la baseline humaine - mask_tp = df['src_ip'].isin(tp_ips) & (df.get('asn_label', pd.Series(dtype=str)) == 'isp') - df.loc[mask_tp, 'asn_label'] = 'soc_confirmed_bot' - log_info(f"[Feedback] {mask_tp.sum()} lignes exclues de la baseline humaine (TP confirmés).") - log_decision('SOC_FEEDBACK', cycle_id, '', { - 'fp_ips': len(fp_ips), 'tp_ips': len(tp_ips), - 'total_classifications': len(soc_feedback), - }) - - # ── Features par modèle (voir DOCUMENTATION.md §4) ─────────────────────── - # Features communes aux deux modèles (L7 HTTP pur, disponibles correlated=0 et 1) - feats = [ - 'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'port_exhaustion_ratio', - 'orphan_ratio', 'max_keepalives', 'tcp_shared_count', 'header_order_shared_count', - 'header_count', 'has_accept_language', 'has_cookie', 'has_referer', - 'modern_browser_score', 'ua_ch_mismatch', 'ip_id_zero_ratio', - 'request_size_variance', 'multiplexing_efficiency', 'mss_mobile_mismatch', - 'asset_ratio', 'direct_access_ratio', 'is_ua_rotating', 'distinct_ja4_count', - 'src_port_density', 'ja4_asn_concentration', 'ja4_country_concentration', 'is_rare_ja4', - 'header_order_confidence', 'distinct_header_orders', 'temporal_entropy', - 'path_diversity_ratio', 'url_depth_variance', 'anomalous_payload_ratio', - # B4-B7 : features L7 pures (disponibles correlated=0 et 1) - 'head_ratio', 'sec_fetch_absence_rate', 'generic_accept_ratio', 'http10_ratio', - # Anubis : signal de suspicion modéré (WEIGH/CHALLENGE) — bypass pour ALLOW/DENY - 'anubis_is_flagged', - # Browser : signaux de navigateur légitime (dict_browser_ja4 + cohérence comportementale) - 'is_known_browser', 'browser_consistency_score', - # HTTP : header incomplet et usage HTTP plain (disponibles pour les deux modèles) - 'missing_accept_enc_ratio', 'http_scheme_ratio', - # ── Thèse §5 : features avancées (optionnelles — ignorées si indisponibles) ── - # §5.1 — Entropie de séquence de chemins - 'path_transition_entropy', - # §5.3 — Cadence inter-requêtes - 'cadence_cv', 'burst_ratio', 'pause_ratio', - 'lag1_autocorrelation', 'benford_deviation', - # §5.8 — Cross-domain (par IP, sans décomposition host) - 'host_diversity', 'host_sweep_speed', 'host_coverage_uniformity', - ] - # Features supplémentaires pour le modèle Complet (nécessitent des données TCP/TLS) - feats_complet = feats + [ - 'tcp_jitter_variance', 'alpn_http_mismatch', 'is_alpn_missing', 'sni_host_mismatch', - # B1-B3, B8 : features TLS/TCP (disponibles correlated=1 uniquement) - 'ja3_diversity_ratio', 'syn_timing_cv', 'tls12_ratio', 'ip_df_variance', - # TTL fingerprinting OS + TCP window scale (L4 uniquement) - 'avg_ttl', 'ttl_std', 'no_window_scale_ratio', - # §5.5 — Dérive JA4 intra-session (nécessite corrélation pour JA4 fiable) - 'ja4_drift_ratio', - ] - - # ── Analyse fenêtre 1h ──────────────────────────────────────────────────── - df_corr = df[df['correlated'] == 1].copy() - df_uncorr = df[df['correlated'] == 0].copy() - log_info('') - log_info(f'── Modèle Complet (L3→L7, corrélé) : {len(df_corr)} sessions, {len(feats_complet)} features ──') - anom_a, scored_a = run_semi_supervised_logic(df_corr, feats_complet, 'Complet', cycle_id, recurrence_map) - log_info('') - log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──') - anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map) - all_anom = pd.concat([anom_a, anom_b], ignore_index=True) - all_scored = pd.concat([scored_a, scored_b], ignore_index=True) - - # ── A3 : Analyse fenêtre 24h (optionnelle) ──────────────────────────────── - if ENABLE_MULTIWINDOW: - try: - df_24h = client.query_df(f'SELECT * FROM {DB}.{MULTIWINDOW_VIEW}') - if df_24h is not None and not df_24h.empty: - df_24h = _preprocess_df(df_24h) - log_info(f"[24h] {len(df_24h)} sessions dans la fenêtre 24h.") - anom_c, scored_c = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 1].copy(), feats_complet, 'Complet_24h', cycle_id, recurrence_map) - anom_d, scored_d = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 0].copy(), feats, 'Applicatif_24h', cycle_id, recurrence_map) - all_anom_24h = pd.concat([anom_c, anom_d], ignore_index=True) - all_scored_24h = pd.concat([scored_c, scored_d], ignore_index=True) - # Fusion : pour les IPs présentes dans les deux fenêtres, conserver le score le plus bas - if not all_anom_24h.empty: - all_anom = pd.concat([all_anom, all_anom_24h], ignore_index=True) - log_info(f"[24h] Fusion 1h+24h : {len(all_anom)} entrées avant déduplication.") - all_scored = pd.concat([all_scored, all_scored_24h], ignore_index=True) - else: - log_info(f"[24h] Vue {MULTIWINDOW_VIEW} vide — analyse mono-fenêtre.") - except Exception as e: - log_info(f"[24h] Vue {MULTIWINDOW_VIEW} inaccessible : {e} — analyse mono-fenêtre.") - - # ── Insertion de toutes les classifications dans ml_all_scores ─────────── - if not all_scored.empty: - try: - now = datetime.now().replace(microsecond=0) - all_scored['detected_at'] = now - all_scored['ja4'] = all_scored['ja4'].replace({'': 'HTTP_CLEAR_TEXT'}) - all_scores_cols = [ - 'detected_at', 'window_start', 'src_ip', 'ja4', 'host', 'bot_name', - 'browser_family', - 'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category', - 'anomaly_score', 'raw_anomaly_score', 'threat_level', 'model_name', - 'correlated', 'asn_number', 'asn_org', 'country_code', 'asn_label', - 'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'campaign_id', - 'ae_recon_error', 'xgb_prob' - ] - scores_df = all_scored[[c for c in all_scores_cols if c in all_scored.columns]] - client.insert_df(f'{DB}.ml_all_scores', scores_df) - log_info(f'[ml_all_scores] {len(scores_df)} sessions scorées enregistrées.') - except Exception as e: - log_info(f'[ml_all_scores] ERREUR INSERTION: {e}') - - if not all_anom.empty: - all_anom = all_anom.sort_values('raw_anomaly_score', ascending=True).drop_duplicates(subset=['src_ip'], keep='first') - log_info(f'[Dédup] Intra-cycle : {len(all_anom)} IP uniques après déduplication.') - - # A5 — Déduplication inter-cycles avec TTL - all_anom = _filter_recent_detections(client, all_anom) - - if all_anom.empty: - log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.') - log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}) - return - - all_anom['detected_at'] = datetime.now().replace(microsecond=0) - fake_nav_col = 'is_fake_navigation' - all_anom['is_headless'] = all_anom[fake_nav_col].astype(int) if fake_nav_col in all_anom.columns else 0 - - cols = [ - 'detected_at', 'src_ip', 'ja4', 'host', 'bot_name', 'browser_family', 'anomaly_score', - 'raw_anomaly_score', 'campaign_id', - 'threat_level', 'model_name', 'recurrence', - 'asn_number', 'asn_org', 'asn_detail', 'asn_domain', 'country_code', 'asn_label', - 'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'port_exhaustion_ratio', 'max_keepalives', 'orphan_ratio', - 'tcp_jitter_variance', 'tcp_shared_count', 'true_window_size', 'window_mss_ratio', - 'alpn_http_mismatch', 'is_alpn_missing', 'sni_host_mismatch', - 'header_count', 'has_accept_language', 'has_cookie', 'has_referer', - 'modern_browser_score', 'is_headless', 'ua_ch_mismatch', - 'header_order_shared_count', 'ip_id_zero_ratio', 'request_size_variance', - 'multiplexing_efficiency', 'mss_mobile_mismatch', - 'correlated', 'reason', 'asset_ratio', 'direct_access_ratio', 'is_ua_rotating', - 'distinct_ja4_count', 'src_port_density', 'ja4_asn_concentration', - 'ja4_country_concentration', 'is_rare_ja4', - 'header_order_confidence', 'distinct_header_orders', 'temporal_entropy', - 'path_diversity_ratio', 'url_depth_variance', 'anomalous_payload_ratio', - 'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category', - ] - - try: - final_df = all_anom[[c for c in cols if c in all_anom.columns]] - client.insert_df(f'{DB}.ml_detected_anomalies', final_df) - n_critical = int((final_df.get('threat_level', pd.Series()) == 'CRITICAL').sum()) - n_high = int((final_df.get('threat_level', pd.Series()) == 'HIGH').sum()) - n_medium = int((final_df.get('threat_level', pd.Series()) == 'MEDIUM').sum()) - n_known = int((final_df.get('bot_name', pd.Series()) != '').sum()) - log_info('') - log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════') - log_info(f'║ {len(final_df)} menaces insérées dans ml_detected_anomalies') - log_info(f'║ CRITICAL={n_critical} HIGH={n_high} MEDIUM={n_medium} KNOWN_BOT={n_known}') - if not all_scored.empty: - log_info(f'║ {len(all_scored)} sessions scorées dans ml_all_scores') - log_info(f'╚═══════════════════════════════════════════════════════════') - log_decision('CYCLE_END', cycle_id, '', { - 'inserted': len(final_df), - 'anomalies': int((final_df.get('bot_name', pd.Series()) == '').sum()), - 'known_bots': n_known, - 'critical': n_critical, - 'high': n_high, - 'dedup_ttl_min': DEDUP_TTL_MIN, - }) - except Exception as e: - log_info(f'ERREUR INSERTION: {e}') - else: - log_info('') - log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════') - log_info(f'║ Aucune menace détectée ce cycle.') - if not all_scored.empty: - log_info(f'║ {len(all_scored)} sessions scorées dans ml_all_scores') - tl_dist = all_scored['threat_level'].value_counts().to_dict() if not all_scored.empty and 'threat_level' in all_scored.columns else {} - if tl_dist: - log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}') - log_info(f'╚═══════════════════════════════════════════════════════════') - log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}) - -if __name__ == '__main__': - log_info('') - log_info('╔═══════════════════════════════════════════════════════════════╗') - log_info('║ BOT DETECTOR IA v12 — Pipeline de détection semi-supervisé ║') - log_info('╚═══════════════════════════════════════════════════════════════╝') - log_info(f' Base de données : {DB}') - log_info(f' Contamination EIF : {CONTAMINATION}') - log_info(f' Seuil anomalie : {ANOMALY_THRESHOLD} (adaptatif p{ANOMALY_PERCENTILE})') - log_info(f' Cycle d\'analyse : toutes les {CYCLE_INTERVAL}s') - log_info(f' Retraining : toutes les {RETRAIN_INTERVAL_H}h (drift seuil={DRIFT_THRESHOLD:.0%})') - log_info(f' Modèles : {MODEL_DIR}') - log_info(f' Autoencoder (AE) : poids={AE_WEIGHT} {"(PyTorch disponible)" if TORCH_AVAILABLE else "(PyTorch absent — désactivé)"}') - log_info(f' XGBoost : poids={XGB_WEIGHT} (min labels={XGB_MIN_LABELS})') - log_info(f' SHAP explainabilité : {"activé" if ENABLE_SHAP else "désactivé (shap non installé)" if not SHAP_AVAILABLE else "désactivé"}') - log_info(f' Clustering HDBSCAN : {"activé" if ENABLE_CLUSTERING else "désactivé"}') - log_info(f' Dédup inter-cycles : TTL={DEDUP_TTL_MIN}min') - log_info(f' Récurrence : weight={RECURRENCE_WEIGHT}') - log_info(f' Multi-fenêtres (24h) : {"activé" if ENABLE_MULTIWINDOW else "désactivé"}') - log_info(f' Feature ratio minimum : {MIN_VALID_FEATURE_RATIO:.0%}') - log_info(f' Anubis : ALLOW→KNOWN_BOT, DENY→forcé menace (score IF réel)') - log_info(f' Browser légitime : consistency≥{BROWSER_LEGIT_MIN_CONSISTENCY}/5 + JA4 reconnu') - log_info(f' Feedback SOC : {"activé" if ENABLE_FEEDBACK else "désactivé"} (fenêtre={FEEDBACK_WINDOW_DAYS}j)') - log_info('') - log_decision('SERVICE_START', 'boot', '', { - 'db': DB, 'contamination': CONTAMINATION, 'anomaly_threshold': ANOMALY_THRESHOLD, - 'cycle_interval': CYCLE_INTERVAL, 'retrain_interval_h': RETRAIN_INTERVAL_H - }) - log_info(f'En attente du premier cycle dans {CYCLE_INTERVAL}s…') - while True: - try: fetch_and_analyze() - except Exception as e: log_info(f"Erreur globale : {e}") - time.sleep(CYCLE_INTERVAL) - diff --git a/services/bot-detector/bot_detector/tests/test_detector.py b/services/bot-detector/bot_detector/tests/test_detector.py index ce5daa3..12d4e60 100644 --- a/services/bot-detector/bot_detector/tests/test_detector.py +++ b/services/bot-detector/bot_detector/tests/test_detector.py @@ -626,23 +626,23 @@ def test_browser_consistency_score_range(): def test_legitimate_browser_classification_threshold(): - """LEGITIMATE_BROWSER requires browser_family + consistency >= threshold + NORMAL/LOW threat.""" - BROWSER_LEGIT_MIN_CONSISTENCY = 4 + """LEGITIMATE_BROWSER requires browser_confidence >= threshold + family + NORMAL/LOW threat.""" + BROWSER_CONFIDENCE_THRESHOLD = 0.55 sessions = [ - # (browser_family, bcs, threat_level) → expected classification - ('Chromium', 5, 'NORMAL'), # → LEGITIMATE_BROWSER - ('Chromium', 5, 'MEDIUM'), # threat too high → keep MEDIUM - ('Firefox', 4, 'LOW'), # → LEGITIMATE_BROWSER - ('Firefox', 3, 'NORMAL'), # consistency too low → keep NORMAL - ('', 5, 'NORMAL'), # no browser → keep NORMAL (can't be 5 without browser, but edge case) - ('Chromium', 5, 'ANUBIS_DENY'), # Anubis DENY → keep ANUBIS_DENY + # (inferred_browser_family, browser_confidence, threat_level) → expected + ('Chromium', 0.80, 'NORMAL'), # → LEGITIMATE_BROWSER + ('Chromium', 0.80, 'MEDIUM'), # threat too high → keep MEDIUM + ('Firefox', 0.60, 'LOW'), # → LEGITIMATE_BROWSER + ('Firefox', 0.40, 'NORMAL'), # confidence too low → keep NORMAL + ('', 0.90, 'NORMAL'), # no family → keep NORMAL + ('Chromium', 0.80, 'ANUBIS_DENY'), # Anubis DENY → keep ANUBIS_DENY ] results = [] - for bf, bcs, tl in sessions: + for bf, conf, tl in sessions: is_legit = ( + conf >= BROWSER_CONFIDENCE_THRESHOLD and bf != '' and - bcs >= BROWSER_LEGIT_MIN_CONSISTENCY and tl in ('NORMAL', 'LOW') ) results.append('LEGITIMATE_BROWSER' if is_legit else tl) @@ -673,17 +673,18 @@ def test_legitimate_browser_excluded_from_anomalies(): def test_browser_spoofing_detection(): - """Inconsistent browser behavior (known JA4 but low consistency) stays in normal scoring.""" - BROWSER_LEGIT_MIN_CONSISTENCY = 4 + """Spoofed browser (known JA4 but low overall confidence) stays in normal scoring.""" + BROWSER_CONFIDENCE_THRESHOLD = 0.55 - # Spoofed: JA4 looks like Chrome but no cookies, no Accept-Language, high sec_fetch_absence + # Spoofed: JA4 looks like Chrome (axis_ja4_known=1) but no cookies, + # no Accept-Language, high sec_fetch_absence → low overall confidence spoofed_bf = 'Chromium' - spoofed_bcs = 1 # only is_known_browser=1, all others fail + spoofed_confidence = 0.30 # only JA4 known axis scores high spoofed_tl = 'MEDIUM' is_legit = ( + spoofed_confidence >= BROWSER_CONFIDENCE_THRESHOLD and spoofed_bf != '' and - spoofed_bcs >= BROWSER_LEGIT_MIN_CONSISTENCY and spoofed_tl in ('NORMAL', 'LOW') ) assert not is_legit, "Spoofed browser should NOT be classified as LEGITIMATE_BROWSER"