"""Configuration centralisée du bot-detector. Toutes les variables d'environnement, constantes et imports optionnels. Aucun effet de bord (pas de logging, pas de connexion) — pur configuration. """ import os import re import warnings warnings.filterwarnings('ignore') # ─── Utilitaire de lecture d'env var ──────────────────────────────────────── def _require_float(name, default, lo=None, hi=None): """Lit une variable d'environnement comme flottant et valide la plage si spécifiée. Lève SystemExit si la valeur est non numérique ou hors plage (lo, hi) exclusive. """ raw = os.getenv(name, str(default)) try: v = float(raw) except ValueError: raise SystemExit(f"[CONFIG] {name}={raw!r} invalide — doit être un nombre décimal.") if lo is not None and not (lo < v < hi): raise SystemExit(f"[CONFIG] {name}={v} hors plage ({lo} < valeur < {hi}).") return v # ─── Validation identifiants SQL ──────────────────────────────────────────── _SAFE_IDENTIFIER_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') # ─── ClickHouse ───────────────────────────────────────────────────────────── DB = os.getenv('CLICKHOUSE_DB_PROCESSING', os.getenv('CLICKHOUSE_DB', 'ja4_processing')) DB_LOGS = os.getenv('CLICKHOUSE_DB_LOGS', 'ja4_logs') for _db_name, _db_val in [('CLICKHOUSE_DB_PROCESSING', DB), ('CLICKHOUSE_DB_LOGS', DB_LOGS)]: if not _SAFE_IDENTIFIER_RE.match(_db_val): raise SystemExit(f"[CONFIG] {_db_name}={_db_val!r} invalide — doit être un identifiant SQL valide.") # ─── Isolation Forest ─────────────────────────────────────────────────────── CONTAMINATION = _require_float('ISOLATION_CONTAMINATION', 0.001, 0, 0.5) N_ESTIMATORS = int(os.getenv('N_ESTIMATORS', '300')) ANOMALY_THRESHOLD = _require_float('ANOMALY_THRESHOLD', -0.05) ANOMALY_PERCENTILE = int(os.getenv('ANOMALY_PERCENTILE', '5')) # ─── Modèles ─────────────────────────────────────────────────────────────── MODEL_DIR = os.getenv('MODEL_DIR', '/var/lib/bot_detector') MODEL_HISTORY_COUNT = int(os.getenv('MODEL_HISTORY_COUNT', '10')) 
RETRAIN_INTERVAL_H = int(os.getenv('RETRAIN_INTERVAL_HOURS', '24'))
DRIFT_THRESHOLD = _require_float('DRIFT_THRESHOLD', 0.30, 0, 1)
MIN_VALID_FEATURE_RATIO = _require_float('MIN_VALID_FEATURE_RATIO', 0.50, 0, 1)
MAX_FAILURES = int(os.getenv('MAX_CONSECUTIVE_FAILURES', '3'))

# ─── Cycle & logging ────────────────────────────────────────────────────────
CYCLE_INTERVAL = int(os.getenv('CYCLE_INTERVAL_SEC', '300'))
LOG_FILE = os.getenv('BOT_DETECTOR_LOG', '/var/log/bot_detector/decisions.jsonl')
LOG_BACKUP_COUNT = int(os.getenv('LOG_BACKUP_COUNT', '7'))
TRAINING_HISTORY_FILE = os.path.join(MODEL_DIR, 'training_history.jsonl')

# ─── Health check ───────────────────────────────────────────────────────────
HEALTH_PORT = int(os.getenv('HEALTH_PORT', '8080'))

# ─── Deduplication and recurrence ───────────────────────────────────────────
DEDUP_TTL_MIN = int(os.getenv('DEDUP_TTL_MIN', '60'))
RECURRENCE_WEIGHT = _require_float('RECURRENCE_WEIGHT', 0.005)

# ─── Minimum baseline — minimum number of human sessions for the IF ─────────
MIN_HUMAN_BASELINE = int(os.getenv('MIN_HUMAN_BASELINE', '500'))
# In test mode, private IPs have no 'isp' ASN — use 'unknown' as a fallback.
BASELINE_ACCEPT_UNKNOWN = os.getenv('BASELINE_ACCEPT_UNKNOWN', 'false').lower() == 'true'

# ─── Autoencoder (AE) — second, parallel scorer ─────────────────────────────
AE_WEIGHT = _require_float('AE_WEIGHT', 0.30, 0, 1)
AE_EPOCHS = int(os.getenv('AE_EPOCHS', '50'))
AE_LATENT_DIM = int(os.getenv('AE_LATENT_DIM', '16'))
# Read through _require_float so a malformed value yields the same [CONFIG]
# exit message as every other float setting instead of a raw ValueError.
AE_LEARNING_RATE = _require_float('AE_LEARNING_RATE', 1e-3)

# ─── NFEnsemble — Deep Ensemble (M=5) uncertainty ───────────────────────────
NF_UNCERTAINTY_THRESHOLD = _require_float('NF_UNCERTAINTY_THRESHOLD', 1.0)
SESSION_TRANSFORMER_PATH = os.getenv(
    'SESSION_TRANSFORMER_PATH',
    os.path.join(MODEL_DIR, 'session_transformer.pt'),
)

# ─── XGBoost — third, supervised voice ──────────────────────────────────────
XGB_WEIGHT = _require_float('XGB_WEIGHT', 0.20, 0, 1)
XGB_MIN_LABELS = int(os.getenv('XGB_MIN_LABELS', '100'))
XGB_RETRAIN_INTERVAL_H = int(os.getenv('XGB_RETRAIN_INTERVAL_HOURS', '168'))

# ─── A9 — Multi-factor browser classification ───────────────────────────────
BROWSER_CONFIDENCE_THRESHOLD = _require_float('BROWSER_CONFIDENCE_THRESHOLD', 0.55, 0, 1)
BROWSER_COHORT_RATIO = _require_float('BROWSER_COHORT_RATIO', 0.70, 0, 1)

# ─── SHAP / Clustering / Multi-window / Feedback ────────────────────────────
ENABLE_CLUSTERING = os.getenv('ENABLE_CLUSTERING', 'true').lower() == 'true'
CLUSTERING_MIN_SAMPLES = int(os.getenv('CLUSTERING_MIN_SAMPLES', '3'))
ENABLE_MULTIWINDOW = os.getenv('ENABLE_MULTIWINDOW', 'false').lower() == 'true'
MULTIWINDOW_VIEW = os.getenv('MULTIWINDOW_VIEW', 'view_ai_features_24h')
# The view name is validated with the same identifier pattern as the DB names.
if not _SAFE_IDENTIFIER_RE.match(MULTIWINDOW_VIEW):
    raise SystemExit(f"[CONFIG] MULTIWINDOW_VIEW={MULTIWINDOW_VIEW!r} invalide.")
ENABLE_FEEDBACK = os.getenv('ENABLE_FEEDBACK', 'true').lower() == 'true'
FEEDBACK_WINDOW_DAYS = int(os.getenv('FEEDBACK_WINDOW_DAYS', '7'))

# ─── Features structurally excluded, per model ──────────────────────────────
STRUCTURAL_EXCLUDED_FEATURES: dict[str, list[str]] = {
    'Complet': ['orphan_ratio'],
    'Applicatif': [
        'orphan_ratio',
        'is_rare_ja4',
        'tcp_shared_count',
        'request_size_variance',
        'mss_mobile_mismatch',
        'ja3_diversity_ratio',
        'syn_timing_cv',
        'tls12_ratio',
        'ip_df_variance',
        'avg_ttl',
        'ttl_std',
        'no_window_scale_ratio',
        'ja4_drift_ratio',
        'true_window_size',
        'window_mss_ratio',
    ],
}

# ─── Optional imports (heavy libraries) ─────────────────────────────────────
# Each optional library sets a *_AVAILABLE flag instead of failing the import.
# NOTE(review): the sklearn imports are assumed to be unconditional
# (module-level) rather than nested in the `except` branches — confirm.
try:
    from isotree import IsolationForest as ExtendedIsolationForest
except ImportError:
    EIF_AVAILABLE = False
else:
    EIF_AVAILABLE = True

from sklearn.ensemble import IsolationForest

try:
    import hdbscan as _hdbscan
except ImportError:
    HDBSCAN_AVAILABLE = False
else:
    HDBSCAN_AVAILABLE = True

from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

try:
    import shap as _shap
except ImportError:
    SHAP_AVAILABLE = False
else:
    SHAP_AVAILABLE = True
# ENABLE_SHAP is truthy only when SHAP_AVAILABLE is truthy and the ENABLE_SHAP
# environment variable (default 'true') equals 'true', case-insensitively.
ENABLE_SHAP = SHAP_AVAILABLE and (
    os.getenv('ENABLE_SHAP', 'true').lower() == 'true'
)

# Optional dependency: record whether torch (and torch.nn) can be imported.
try:
    import torch
    import torch.nn as nn
except ImportError:
    TORCH_AVAILABLE = False
else:
    TORCH_AVAILABLE = True

# Optional dependency: record whether xgboost can be imported.
try:
    import xgboost as xgb
except ImportError:
    XGB_AVAILABLE = False
else:
    XGB_AVAILABLE = True