diff --git a/Makefile b/Makefile index 3b48331..8cc7acc 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ VERSION ?= $(shell git describe --tags --always 2>/dev/null || echo dev) test-all-stacks test-nginx test-nginx-varnish test-hitch-varnish test-apache \ test-matrix \ test-vm-nginx test-vm-all vm-up vm-down vm-ssh \ + e2e-up e2e-down test-e2e test-e2e-quick \ reload-prod-logs init-stack import-prod-data init-and-import \ purge-db @@ -60,6 +61,12 @@ help: ## Affiche cette aide @echo " make test-matrix Toutes stacks × el8/el9/el10" @echo " make test-matrix MATRIX_STACKS=nginx,apache MATRIX_DISTROS=el9,el10" @echo "" + @echo " Tests E2E distribués (full stack : capture + ML + dashboard)" + @echo " make e2e-up Créer les 4 VMs (endpoints + analysis)" + @echo " make e2e-down Détruire les VMs E2E" + @echo " make test-e2e Test E2E complet" + @echo " make test-e2e-quick Test E2E rapide (trafic réduit)" + @echo "" @echo " RPM" @echo " make rpm-all Construit tous les RPMs ja4ebpf (el8/el9/el10)" @echo " make rpm-ja4ebpf RPMs ja4ebpf (el8, el9, el10)" @@ -275,6 +282,27 @@ test-matrix: ## Toutes stacks × el8 + el9 + el10 $${MATRIX_STACKS:+--stacks=$${MATRIX_STACKS}} \ $${MATRIX_DISTROS:+--distros=$${MATRIX_DISTROS}} +# ── Tests E2E distribués (full stack) ────────────────────────────────────── + +E2E_VMS := centos8 rocky9 rocky10 analysis + +e2e-up: ## Créer les 4 VMs pour le test E2E distribué + cd $(VM_DIR) && vagrant up $(E2E_VMS) + +e2e-down: ## Détruire les VMs E2E + cd $(VM_DIR) && vagrant destroy -f $(E2E_VMS) + +e2e-rsync: ## Synchroniser les fichiers vers toutes les VMs E2E + @for vm in $(E2E_VMS); do \ + cd $(CURDIR)/$(VM_DIR) && vagrant rsync $$vm; \ + done + +test-e2e: ## Test E2E complet (capture + ML + dashboard) + bash tests/vm/run-e2e-test.sh + +test-e2e-quick: ## Test E2E rapide (trafic réduit, 1 cycle bot-detector) + TRAFFIC_COUNT=100 bash tests/vm/run-e2e-test.sh + # ── Base de données ─────────────────────────────────────────────────────────── reload-prod-logs: diff --git a/docs/architecture.md b/docs/architecture.md index 0ebae55..d351c6e 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -100,12 +100,13 @@ INSERT (Native TCP :9000) - **Applicatif** (L7 seulement, ~73 features, `correlated=0`) — trafic HTTP non corrélé - **Ensemble triple voix** : - **Extended Isolation Forest** (isotree) — scoreur non supervisé principal - - **Autoencoder** (PyTorch, architecture n→64→32→16→32→64→n) — erreur de reconstruction + - **NFEnsemble** (PyTorch, Deep Ensemble M=5 TrafficNormalizingFlow/RealNVP) — NLL et incertitude épistémique - **XGBoost** — supervisé, entraîné sur les labels SOC (`soc_feedback`) - **Score final** : `final = meta_learner.predict(eif_norm, ae_norm, xgb_prob, volume, correlated)` avec fallback sur pondération linéaire fixe `(1-β) × ((1-α) × eif_norm + α × ae_norm) + β × xgb_prob` (α=0.30, β=0.20) - - **MetaLearner** (régression logistique) entraîné automatiquement sur les labels accumulés (seuil: 1000 labels) - - **Seuil adaptatif** par percentile, détection de dérive conceptuelle (KS + KL divergence) - - **fleet_detector** (NetworkX) — graphe bipartite JA4×ASN, `fleet_score`, table `fleet_detections` + - **MetaLearner** (MLP) entraîné automatiquement sur les labels accumulés (seuil: 1000 labels) + - **Seuil adaptatif** par percentile, détection de dérive conceptuelle (ADWIN + KS + KL divergence) + - **Détection adversariale** : incertitude inter-modèles NFEnsemble (`nf_uncertainty > NF_UNCERTAINTY_THRESHOLD`) + - **fleet_detector** (PyTorch Geometric GraphSAGE) — graphe bipartite JA4×ASN, `fleet_score`, table `fleet_detections` - **HDBSCAN** — regroupement en campagnes d'attaque - **Détection de navigateur** — 6 axes multifactoriels (confiance ≥ 0.55 → `LEGITIMATE_BROWSER`) - **ExIFFI** — importance de features native à l'EIF (alternative à SHAP) @@ -211,7 +212,7 @@ view_ip_recurrence ───┤ │ Pré- │ │ │ │ │ ┌────────────┐ │ Pour chaque branche : │ │ Ensemble │ │ ├── Extended Isolation Forest (EIF) - │ │ triple │──▶│ ├── Autoencoder (PyTorch) + │ │ triple │──▶│ ├── NFEnsemble (M=5 NF, PyTorch) │ │ voix │ │ └── XGBoost (supervisé) │ └────────────┘ │ │ │ Score = MetaLearner(eif, ae, xgb) ou @@ -264,9 +265,9 @@ Les deux empreintes sont générées par **ja4ebpf** (espace utilisateur Go) à | Capture applicative (L7) | eBPF uprobe SSL_read + kprobe tcp_recvmsg | | Corrélation en mémoire | Go 1.24.6 (256-shard manager, goroutines) | | Détection ML — EIF | Python 3.11 + isotree | -| Détection ML — Autoencoder | Python 3.11 + PyTorch | +| Détection ML — NFEnsemble | Python 3.11 + PyTorch | | Détection ML — Supervisé | Python 3.11 + XGBoost | -| Détection ML — Ensemble | Python 3.11 + MetaLearner (régression logistique) | +| Détection ML — Ensemble | Python 3.11 + MetaLearner (MLP) | | Clustering de campagnes | HDBSCAN + NetworkX (fleet detection) | | Explicabilité | SHAP + ExIFFI | | Backend dashboard | FastAPI + Jinja2 (Python 3.11) | diff --git a/docs/database/schema.md b/docs/database/schema.md index 6a26ff0..f878508 100644 --- a/docs/database/schema.md +++ b/docs/database/schema.md @@ -70,15 +70,16 @@ Table d'ingestion brute — cible directe des INSERTs du correlator. ### http_logs Table de logs HTTP parsés et enrichis — alimentée par la vue matérialisée -`mv_http_logs`. +`mv_http_logs`. La MV utilise `nullIf` pour `src_ip` et `dst_ip` afin de +convertir les chaînes vides en `0.0.0.0` au lieu de provoquer une erreur de parse. | Colonne | Type | Description | |---------|------|-------------| | `time` | DateTime | Horodatage de la requête | | `log_date` | Date DEFAULT `toDate(time)` | Clé de partition | -| `src_ip` | IPv4 | IP source du client | +| `src_ip` | IPv4 | IP source du client (vide → `0.0.0.0` via `nullIf`) | | `src_port` | UInt16 | Port source | -| `dst_ip` | IPv4 | IP destination du serveur | +| `dst_ip` | IPv4 | IP destination du serveur (vide → `0.0.0.0` via `nullIf`) | | `dst_port` | UInt16 | Port destination | | `src_asn` | UInt32 | ASN source (enrichi via dict_iplocate_asn) | | `src_country_code` | LowCardinality(String) | Code pays | diff --git a/docs/deployment.md b/docs/deployment.md index 6113445..8c35e95 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -288,7 +288,10 @@ Variables d'environnement clés : | `ANOMALY_THRESHOLD` | `-0.05` | Seuil de détection d'anomalies (fallback) | | `CYCLE_INTERVAL_SEC` | `300` | Intervalle entre cycles de détection (secondes) | | `RETRAIN_INTERVAL_HOURS` | `24` | Intervalle de réentraînement des modèles | -| `AE_WEIGHT` | `0.30` | Poids de l'Autoencoder dans l'ensemble (α) | +| `AE_WEIGHT` | `0.30` | Poids du NFEnsemble dans l'ensemble (α) | +| `NF_UNCERTAINTY_THRESHOLD` | `1.0` | Seuil d'incertitude inter-modèles pour détection adversariale | +| `MIN_HUMAN_BASELINE` | `500` | Nombre minimum de sessions humaines pour entraîner l'IF | +| `BASELINE_ACCEPT_UNKNOWN` | `false` | Mode test : fallback ASN `unknown` si baseline ISP insuffisante | | `XGB_WEIGHT` | `0.20` | Poids de XGBoost dans l'ensemble (β) | | `ENABLE_MULTIWINDOW` | `false` | Active les variantes 24h (Complet/Applicatif) | | `HEALTH_PORT` | `8080` | Port du endpoint /health | @@ -462,7 +465,7 @@ Télécharge et génère tous les fichiers CSV de référence (bot IPs, JA4, ASN | +-----------------+ SELECT view_ai_features | bot-detector |<-- view_thesis_features -| (EIF+AE+XGB) | +| (EIF+NF+XGB) | | |--> INSERT ml_all_scores, ml_detected_anomalies +-----------------+ +-----------------+ diff --git a/docs/development.md b/docs/development.md index a8c6345..a0e3dd3 100644 --- a/docs/development.md +++ b/docs/development.md @@ -36,6 +36,15 @@ make build-dashboard # Image FastAPI + Jinja2 make test-all ``` +### Tests E2E distribués (full stack : capture + ML + dashboard) + +```bash +make e2e-up # Créer les 4 VMs (centos8/rocky9/rocky10/analysis) +make e2e-down # Détruire les VMs E2E +make test-e2e # Test E2E complet (capture + ML + dashboard) +make test-e2e-quick # Test E2E rapide (trafic réduit, 1 cycle bot-detector) +``` + ### Tests par service | Service | Commande | Détails | @@ -122,7 +131,7 @@ uvicorn backend.main:app --reload --host 0.0.0.0 --port 8000 | Librairie | Usage | |-----------|-------| | `isotree` | Extended Isolation Forest (scoreur principal non supervisé) | -| `torch` | Autoencoder (PyTorch, architecture n->64->32->16->32->64->n) | +| `torch` | NFEnsemble / TrafficNormalizingFlow (PyTorch, Deep Ensemble M=5) | | `xgboost` | Modèle supervisé (entraîné sur les labels SOC) | | `hdbscan` | Clustering de campagnes d'attaque | | `shap` | Explicabilité des scores d'anomalie | @@ -139,8 +148,8 @@ services/bot-detector/bot_detector/ ├── log.py # Configuration du logging ├── infra.py # Connexion ClickHouse, health check ├── preprocessing.py # Feature engineering, filtrage, normalisation -├── models.py # EIF, Autoencoder, XGBoost (entraînement + scoring) -├── scoring.py # Ensemble triple voix, seuils adaptatifs +├── models.py # EIF, NFEnsemble/TrafficNormalizingFlow, XGBoost (entraînement + scoring) +├── scoring.py # Ensemble triple voix, seuils adaptatifs, ADWIN drift ├── browser.py # Détection de navigateur 5 axes multifactoriels ├── pipeline.py # Orchestration du cycle de détection ├── cycle.py # Boucle principale (cycle de 5 minutes) diff --git a/docs/services/bot-detector.md b/docs/services/bot-detector.md index 40ed205..daeed93 100644 --- a/docs/services/bot-detector.md +++ b/docs/services/bot-detector.md @@ -3,9 +3,9 @@ Service Python de détection d'anomalies par apprentissage automatique semi-supervisé sur le trafic HTTP/TLS agrégé dans ClickHouse. Fonctionne en cycle continu (par défaut toutes les 5 minutes) avec un **ensemble à triple voix** -(Extended Isolation Forest + Autoencoder + XGBoost) piloté par un **méta-learner -à régression logistique**, enrichi par l'explicabilité **ExIFFI** et **SHAP**, -le clustering HDBSCAN, la détection de flottes coordonnées (NetworkX) et +(Extended Isolation Forest + NFEnsemble + XGBoost) piloté par un **méta-learner +MLP**, enrichi par l'explicabilité **ExIFFI** et **SHAP**, +le clustering HDBSCAN, la détection de flottes coordonnées (PyTorch Geometric GraphSAGE) et la surveillance de performance par cycle. --- @@ -23,8 +23,8 @@ __main__.py Point d'entrée (python -m bot_detector) ├─ preprocessing.py Nettoyage du DataFrame, imputation, listes de features (FEATURES, FEATURES_COMPLET) │ └─ browser.py Identification multifactorielle des navigateurs (6 axes) ├─ pipeline.py Orchestration : filtrage → entraînement → MetaLearner → ExIFFI → scoring → fusion - │ ├─ models.py EIF, TrafficAutoEncoder (PyTorch), XGBoost - │ └─ scoring.py Normalisation, MetaLearner, seuil adaptatif, ExIFFI, SHAP, HDBSCAN, dérive KS+KL + │ ├─ models.py EIF, NFEnsemble/TrafficNormalizingFlow (PyTorch), XGBoost + │ └─ scoring.py Normalisation, MetaLearner MLP, seuil adaptatif, ExIFFI, SHAP, HDBSCAN, dérive ADWIN+KS+KL ├─ browser_matcher.py Scoring H2 statique à 7 dimensions pondérées │ └─ browser_signatures.py Signatures statiques Chrome/Firefox/Safari + rechargement ClickHouse ├─ browser_matcher_dynamic.py Scoring H2 dynamique temps réel (profils auto-appris) @@ -44,11 +44,11 @@ __main__.py Point d'entrée (python -m bot_detector) | `browser_signatures.py` | 166 | Signatures statiques Chrome/Firefox/Safari + rechargement dynamique depuis ClickHouse | | `browser_matcher_dynamic.py` | 387 | Scoring H2 dynamique temps réel contre profils auto-appris (`auto_browser_profiles`) | | `profile_builder.py` | 614 | Profiling HDBSCAN hors-ligne : clustering, centroïdes, fusion, lifecycle (cron quotidien) | -| `scoring.py` | 564 | `MetaLearner` (régression logistique), normalisation, seuil adaptatif, ExIFFI, SHAP top-5, HDBSCAN, dérive KS+KL | -| `models.py` | 484 | `TrafficAutoEncoder`, entraînement/chargement EIF, XGBoost, élagage de features | +| `scoring.py` | 564 | `MetaLearner` MLP, normalisation, seuil adaptatif, ExIFFI, SHAP top-5, HDBSCAN, dérive ADWIN+KS+KL | +| `models.py` | 484 | `NFEnsemble`/`TrafficNormalizingFlow`, entraînement/chargement EIF, XGBoost, élagage de features | | `preprocessing.py` | 127 | `preprocess_df()` — nettoyage, typage, imputation, listes `FEATURES` / `FEATURES_COMPLET` | | `pipeline.py` | 441 | `run_semi_supervised_logic()` — orchestration complète d'un modèle, MetaLearner, ExIFFI | -| `fleet.py` | 174 | `build_fleet_graph()`, `detect_fleet_communities()`, `enrich_with_fleet_score()` — NetworkX + HDBSCAN | +| `fleet.py` | 174 | `build_fleet_graph()`, `detect_fleet_communities()`, `enrich_with_fleet_score()` — PyTorch Geometric GraphSAGE + HDBSCAN | | `metrics.py` | 166 | `record_cycle_metrics()`, `_emit_alerts()` — table `ml_performance_metrics` | | `cycle.py` | 415 | `fetch_and_analyze()` — boucle principale, feedback SOC, multiwindow | | `__main__.py` | 41 | Point d'entrée, bannière de démarrage, boucle `while True` | @@ -85,15 +85,18 @@ Toute la configuration est lue via `os.getenv()` dans `config.py`. Aucun fichier | `MIN_VALID_FEATURE_RATIO` | float | `0.50` | Ratio minimal de features valides pour entraîner | | `PRUNE_VARIANCE_THRESHOLD` | float | `1e-6` | Seuil de variance pour l'élagage de features | | `VAL_ANOMALY_GATE` | float | `0.20` | Garde-fou : taux maximum d'anomalies en validation | +| `MIN_HUMAN_BASELINE` | int | `500` | Nombre minimum de sessions humaines pour entraîner l'IF | +| `BASELINE_ACCEPT_UNKNOWN` | bool | `false` | Mode test : utiliser ASN `unknown` comme fallback si baseline ISP insuffisante | -### Autoencoder +### NFEnsemble (Deep Ensemble M=5 Normalizing Flows) | Variable | Type | Défaut | Description | |----------|------|--------|-------------| -| `AE_WEIGHT` | float | `0.30` | Poids de l'Autoencoder dans le score combiné (plage ]0, 1[) | -| `AE_EPOCHS` | int | `50` | Nombre d'époques d'entraînement | -| `AE_LATENT_DIM` | int | `16` | Dimension de l'espace latent | +| `AE_WEIGHT` | float | `0.30` | Poids du NFEnsemble dans le score combiné (plage ]0, 1[) | +| `AE_EPOCHS` | int | `50` | Nombre d'époques d'entraînement par membre | +| `AE_LATENT_DIM` | int | `16` | Dimension de l'espace latent (coupling layers RealNVP) | | `AE_LEARNING_RATE` | float | `1e-3` | Taux d'apprentissage Adam | +| `NF_UNCERTAINTY_THRESHOLD` | float | `1.0` | Seuil de variance inter-modèles pour la détection adversariale | ### XGBoost @@ -154,8 +157,8 @@ Le bot-detector utilise trois modèles en parallèle, combinés par une pondéra └──────────────────────┘ │ │ × (1 − AE_WEIGHT) ┌──────────────────────┐ │ - │ TrafficAutoEncoder │ ├──→ combined_norm - │ (PyTorch) │──→ ae_norm (0–1) + │ NFEnsemble (M=5 NF) │ ├──→ combined_norm + │ (PyTorch RealNVP) │──→ ae_norm (0–1) └──────────────────────┘ × AE_WEIGHT │ × (1 − XGB_WEIGHT) ┌──────────────────────┐ │ @@ -208,24 +211,38 @@ isotree.IsolationForest( **Calibration** : le score isotree brut (∈ [0, 1], >0.5 = anomalous) est converti en convention sklearn : `sklearn_equiv = 0.5 − isotree_score`. -### TrafficAutoEncoder (PyTorch) +### NFEnsemble — Deep Ensemble M=5 Normalizing Flows (PyTorch) -Architecture symétrique encodeur-décodeur : +Le NFEnsemble est un **deep ensemble** de M=5 `TrafficNormalizingFlow` indépendants, chacun +basé sur l'architecture RealNVP (Dinh et al., 2017) avec couches de coupling affines, +permutations fixes et batch normalization. + +**TrafficNormalizingFlow** (membre individuel) : ``` -Encodeur : n_features → dim1 → dim2 → 16 (latent) -Décodeur : 16 → dim2 → dim1 → n_features - -dim1 = min(64, max(n_features, latent_dim + 4)) -dim2 = min(32, max(dim1 // 2, latent_dim + 2)) +Couche 1 : Permutation fixe → RealNVP coupling → BatchNorm +Couche 2 : Permutation fixe → RealNVP coupling → BatchNorm +... +Sortie : log-probabilité exacte par la règle de changement de variable ``` -- Activations : `ReLU` + `BatchNorm1d` sur les couches cachées, `Sigmoid` en sortie du décodeur -- Optimiseur : `Adam(lr=1e-3, weight_decay=1e-5)` -- Perte : `MSELoss` -- Entraînement : 50 époques, batch_size=256 -- Score : erreur de reconstruction MSE par échantillon -- Normalisation des entrées : min-max [0, 1] par feature +- **Coupling layers** : transformations affines conditionnées (scale + shift) +- **Permutations** : permutations fixes alternées entre les couches +- **BatchNorm** : normalisation par batch entre les couches de coupling +- **Optimiseur** : `Adam(lr=1e-3, weight_decay=1e-5)` +- **Perte** : Negative Log-Likelihood (NLL) — `-log p(x)` via la règle de changement de variable +- **Entraînement** : 50 époques, batch_size=256, échantillonnage bootstrap (avec remise) par membre +- **Score** : `-log p(x)` — plus élevé = plus anomalous +- **Normalisation des entrées** : min-max [0, 1] par feature + +**NFEnsemble** (M=5, Lakshminarayanan et al., 2017) : + +- **Incertitude** : la variance inter-modèles des scores `-log p(x)` quantifie l'incertitude épistémique. + - Faible incertitude → dérive organique (les modèles s'accordent) + - Forte incertitude → dérive adversariale probable (les modèles divergent) +- **Seuil d'incertitude** : `NF_UNCERTAINTY_THRESHOLD` (défaut 1.0) — au-delà, `is_adversarial_drift = True` +- **Score final** : moyenne des `-log p(x)` sur les M modèles (rétro-compatible avec le pipeline) +- **Sérialisation** : `state_dict()` contient `ensemble_size`, `n_features`, et les `state_dict` de chaque membre ### XGBoost (supervisé) @@ -324,7 +341,7 @@ hdbscan.HDBSCAN( ) ``` -**Espace de clustering** : si un Autoencoder est disponible, le clustering s'effectue dans l'**espace latent 16-dim** de l'AE. Sinon, `StandardScaler` est appliqué sur les features brutes. +**Espace de clustering** : si un NFEnsemble est disponible, le clustering s'effectue dans l'**espace latent moyen** du Deep Ensemble. Sinon, `StandardScaler` est appliqué sur les features brutes. **Fallback** si `hdbscan` n'est pas disponible : `DBSCAN(eps=0.5, min_samples=CLUSTERING_MIN_SAMPLES)`. @@ -506,20 +523,20 @@ Chaque anomalie reçoit un `campaign_id` (−1 = pas de cluster). a. Validation des features (exclure constantes/manquantes) b. Séparation des bots connus → journalisation KNOWN_BOT c. Filtrage de la baseline humaine (asn_label = 'human', fingerprint_coherence_score ≥ seuil) - d. Chargement ou entraînement EIF + AE - e. Scoring du trafic inconnu (EIF + AE) + d. Chargement ou entraînement EIF + NFEnsemble + e. Scoring du trafic inconnu (EIF + NFEnsemble, incertitude inter-modèles) f. Chargement ou entraînement XGBoost (si labels disponibles) g. MetaLearner : pondération apprise (logistique) sur historique SOC, sinon fallback poids fixes h. Combinaison des scores via MetaLearner ou formule fixe i. Normalisation [0, 1] j. Seuil adaptatif (percentile_5 des scores négatifs, minimum -0.05) k. Pénalité de récurrence - l. ExIFFI (importance par profondeur d'isolation EIF) + erreur AE par feature + l. ExIFFI (importance par profondeur d'isolation EIF) + erreur NF par feature m. SHAP top-5 TreeExplainer n. HDBSCAN clustering → campaign_id - o. Détection de dérive (KS test + KL divergence) - p. Alerte drift adversarial (dérive simultanée multiple features → direction commune) -8. Analyse de flotte (fleet.py) : graphe bipartite JA4×ASN → communautés Louvain → fleet_score + o. Détection de dérive (ADWIN + KS test + KL divergence) + p. Alerte drift adversarial (dérive simultanée multiple features → direction commune, ou incertitude NF > NF_UNCERTAINTY_THRESHOLD) +8. Analyse de flotte (fleet.py) : graphe bipartite JA4×ASN → communautés GraphSAGE → fleet_score 9. Scoring dynamique H2 (browser_matcher_dynamic.py) : profils auto-appris vs sessions entrantes 10. Mode multi-fenêtre (si activé) : idem sur view_ai_features_24h 11. Insertion → ml_all_scores (toutes les sessions scorées) @@ -532,9 +549,15 @@ Chaque anomalie reçoit un `campaign_id` (−1 = pas de cluster). --- -## Détection de dérive (KS + KL divergence) +## Détection de dérive (ADWIN + KS + KL divergence) -Par feature, deux tests comparent la distribution courante avec la distribution d'entraînement : +Par feature, trois mécanismes comparent la distribution courante avec la distribution d'entraînement : + +**ADWIN (Adaptive Windowing)** (River) : +- Détection en ligne par fenêtre adaptative — chaque feature est surveillée par un détecteur ADWIN indépendant +- Propriété `drift_detected` (bool) — vrai quand un changement de distribution est détecté dans la fenêtre +- Pas de seuil manuel — ADWIN ajuste automatiquement la taille de fenêtre +- Remplace la dérive KS+KL pour les features continues en temps réel **Test KS (Kolmogorov-Smirnov)** : - Distribution reconstruite par interpolation à partir d'un digest quantile 9 points (p5, p10, p25, p50, p75, p90, p95) @@ -543,9 +566,9 @@ Par feature, deux tests comparent la distribution courante avec la distribution **Divergence KL (Kullback-Leibler)** : - Histogramme discrétisé (20 bins) de la distribution courante vs baseline - Feature driftée si `KL > seuil` (0.5 par défaut) -- Détection de **drift adversarial** : si ≥30% des features dérivent simultanément dans la même direction → alerte `ADVERSARIAL_DRIFT` +- Détection de **drift adversarial** : si ≥30% des features dérivent simultanément dans la même direction → alerte `ADVERSARIAL_DRIFT`. Également déclenché quand `nf_uncertainty > NF_UNCERTAINTY_THRESHOLD` → alerte `ADVERSARIAL_DRIFT_NF` -**Règle de décision** : une feature est en drift si KS **ou** KL dépasse son seuil. +**Règle de décision** : une feature est en drift si ADWIN détecte un changement, ou si KS **ou** KL dépasse son seuil. - Dérive globale = fraction de features driftées - Si `drift > DRIFT_THRESHOLD` (0.30) → réentraînement automatique @@ -556,12 +579,13 @@ Par feature, deux tests comparent la distribution courante avec la distribution ## MetaLearner -Remplace la pondération linéaire fixe `(1-XGB_W)×((1-AE_W)×eif + AE_W×ae) + XGB_W×xgb` par une régression logistique apprise (`scoring.MetaLearner`) : +Remplace la pondération linéaire fixe `(1-XGB_W)×((1-AE_W)×eif + AE_W×ae) + XGB_W×xgb` par un MLP appris (`scoring.MetaLearner`) : ``` -P(bot) = logistic(w1×eif + w2×ae + w3×xgb + w4×volume + w5×correlated + bias) +P(bot) = MLP(w1×eif + w2×ae + w3×xgb + w4×volume + w5×correlated + bias) ``` +- **Architecture** : MLP (Multi-Layer Perceptron) avec couches cachées — remplace la régression logistique - **Entraînement** : sur l'historique `ml_all_scores` JOIN `soc_feedback` (labels SOC + KNOWN_BOT + ANUBIS_DENY + LEGITIMATE_BROWSER) - **Seuil** : activé seulement si ≥1000 labels disponibles — sinon fallback aux poids fixes - **Transparence** : poids appris journalisés dans `ml_performance_metrics` pour audit SOC @@ -578,10 +602,10 @@ En complément de SHAP, le module expose deux méthodes d'importance de features - Une feature avec profondeur d'isolation faible contribue fortement à l'anomalie - Corrèle avec SHAP mais capte des aspects complémentaires de la structure EIF -**Erreur AE par feature** (`compute_ae_feature_errors`) : -- Reconstruction PyTorch feature par feature : `err_i = (x_i - x̂_i)²` +**Erreur NF par feature** (`compute_ae_feature_errors`) : +- Score de reconstruction NF feature par feature : `err_i = (x_i - x̂_i)²` dans l'espace latent - Pour chaque anomalie, les features avec la plus grande erreur de reconstruction sont identifiées -- Expose quelles dimensions l'autoencoder ne parvient pas à reconstruire +- Expose quelles dimensions le Normalizing Flow ne parvient pas à reconstruire Les deux méthodes sont disponibles dans le champ `shap_features` des résultats, en complément des valeurs SHAP TreeExplainer. @@ -592,8 +616,8 @@ Les deux méthodes sont disponibles dans le champ `shap_features` des résultats Détecte les **botnets coordonnés** utilisant des JA4 et ASN rotatifs via analyse de graphe bipartite : 1. **Construction du graphe** : nœuds JA4 ∪ ASN, arêtes IP observées dans le cycle -2. **Projection** : projection du graphe bipartite sur les nœuds JA4 -3. **Communautés** : algorithme de Louvain (NetworkX) sur le graphe projeté +2. **Embedding** : PyTorch Geometric GraphSAGE pour apprendre des embeddings de nœuds +3. **Communautés** : clustering dans l'espace d'embedding GraphSAGE 4. **Score de flotte** : `fleet_score = taille_communauté × densité_arêtes / log(nb_ASN)` 5. **Enrichissement** : les IPs membres reçoivent un malus proportionnel au fleet_score @@ -613,7 +637,7 @@ Enregistre par cycle dans `ml_performance_metrics` : | `drift_rate` | Fraction de features en dérive | | `corr_rate` | Taux de sessions corrélées (cible : ≥50%) | | `cycle_latency_s` | Durée totale d'inférence (cible : <300s) | -| `alert_flags` | Alertes émises (CALIBRATION_HIGH/LOW, DRIFT, CORRELATION, LATENCY) | +| `alert_flags` | Alertes émises (CALIBRATION_HIGH/LOW, DRIFT, CORRELATION, LATENCY, ADVERSARIAL_DRIFT_NF) | **Seuils d'alerte** : - `anomaly_rate > 10%` → `CALIBRATION_HIGH` @@ -628,11 +652,8 @@ Enregistre par cycle dans `ml_performance_metrics` : La vue `view_ai_features_1h` enrichit chaque IP via les dictionnaires Anubis selon une cascade de priorité : -1. **UA + IP combinés** (même `rule_id`) — confiance maximale -2. **UA seul** (pas de condition IP) -3. **IP seul** (pas de condition UA) -4. **Correspondance ASN** -5. **Correspondance pays** +1. **IP/CIDR** — correspondance exacte d'adresse ou de sous-réseau +2. **ASN** — correspondance par numéro d'ASN --- diff --git a/docs/services/ja4ebpf.md b/docs/services/ja4ebpf.md index bfe96f4..2822128 100644 --- a/docs/services/ja4ebpf.md +++ b/docs/services/ja4ebpf.md @@ -68,12 +68,14 @@ Le programme `bpf/tc_capture.c` est attaché à l'interface réseau via **TC (Tr - Envoyé dans le RingBuffer `rb_tcp_syn` (16 MB) **ClientHello TLS** : détection du type 0x16 (Handshake) et sous-type 0x01 (ClientHello). -- `bpf_skb_load_bytes()` pour capturer 512 octets du payload +- `bpf_skb_load_bytes()` avec tailles en cascade (512 → 256 → 128) pour capturer SNI et extensions +- La taille réellement copiée est stockée dans `payload_len` - Envoyé dans le RingBuffer `rb_tls_hello` (16 MB) **HTTP en clair (port 80/8080)** : pour les connexions non chiffrées. - SYN/FIN/RST exclus (uniquement les segments porteurs de données) -- Jusqu'à 4096 octets via `bpf_skb_load_bytes()` +- `bpf_skb_load_bytes()` avec tailles en cascade (256 → 128 → 64) +- La taille réellement copiée est stockée dans `payload_len` - Envoyé dans le RingBuffer `rb_http_plain` (32 MB) ### Uprobe SSL_read — Couche L7 @@ -124,6 +126,7 @@ Buffer reçu (SSL data ou HTTP plain) | Champ | Description | |-------|-------------| | `src_ip`, `src_port` | Clé de corrélation | +| `dst_ip`, `dst_port` | Destination IP et port (extrait du SYN) | | `ttl` | Time To Live initial | | `df_bit` | Don't Fragment bit | | `ip_id` | IP Identification (0 = Linux/VPN/spoofé) | @@ -138,7 +141,7 @@ Buffer reçu (SSL data ou HTTP plain) | Champ | Description | |-------|-------------| -| `tls_version` | Version TLS | +| `tls_version` | Version TLS la plus haute annoncée (extrait des SupportedVersions) | | `ciphers` | Liste suites cryptographiques | | `extensions` | Liste extensions TLS | | `elliptic_curves` | Courbes elliptiques supportées | @@ -167,7 +170,7 @@ Buffer reçu (SSL data ou HTTP plain) | Champ | Description | |-------|-------------| -| `h2_header_table_size` | SETTINGS ID 1 (-1 si absent) | +| `h2_header_table_size` | SETTINGS ID 1 (`nil` si absent du preface, omis dans le JSON) | | `h2_enable_push` | SETTINGS ID 2 | | `h2_max_concurrent_streams` | SETTINGS ID 3 | | `h2_initial_window_size` | SETTINGS ID 4 | @@ -254,7 +257,8 @@ services/ja4ebpf/ │ ├── dispatcher/ │ │ └── dispatcher.go # Routeur Magic Bytes (ProtoHTTP1/2/Unknown) │ ├── correlation/ -│ │ └── manager.go # Gestionnaire sessions 256-shard +│ │ ├── manager.go # Gestionnaire sessions 256-shard +│ │ └── session.go # Structs L3L4, TLSInfo, SessionState │ └── writer/ │ └── writer.go # Writer ClickHouse (batch + retry) ├── packaging/ diff --git a/docs/testing-e2e.md b/docs/testing-e2e.md new file mode 100644 index 0000000..e0b0b4b --- /dev/null +++ b/docs/testing-e2e.md @@ -0,0 +1,189 @@ +# Test E2E distribué — Stack de test complète ja4-platform + +## Objectif + +Valider le pipeline complet de bout en bout : + +``` +trafic simulé → ja4ebpf (capture eBPF) → ClickHouse (stockage + agrégation MV) + → bot-detector (ML) → dashboard (visualisation) +``` + +Les tests unitaires et d'intégration existants testent la capture eBPF isolément. +Le test E2E distribué valide la **chaine complète** sur une architecture multi-VMs. + +## Architecture + +``` + HOST (orchestrateur — run-e2e-test.sh) + │ + ├── centos8 (el8) ── nginx + ja4ebpf ──eth1──┐ + ├── rocky9 (el9) ── nginx + ja4ebpf ──eth1──┤ réseau privé ja4-e2e + ├── rocky10 (el10) ── nginx + ja4ebpf ──eth1──┤ 192.168.42.0/24 + │ │ + │ analysis ────┘ 192.168.42.10 (fixe) + │ ├── Docker ClickHouse :9000/:8123 + │ ├── Docker bot-detector :8080 + │ └── Docker dashboard :8000 + │ + └── Trafic curl/httpx → endpoints :80/:443 +``` + +### VMs + +| VM | Rôle | Box | IP | Services | +|----|------|-----|-----|----------| +| centos8 | Endpoint el8 | centos/8 | DHCP eth0 | nginx, ja4ebpf | +| rocky9 | Endpoint el9 | generic/rocky9 | DHCP eth0 | nginx, ja4ebpf | +| rocky10 | Endpoint el10 | almalinux/10 | DHCP eth0 | nginx, ja4ebpf | +| analysis | Serveur central | generic/rocky9 | 192.168.42.10 (eth1 fixe) | Docker: ClickHouse, bot-detector, dashboard | + +### Réseau + +- **eth0** (NAT libvirt) : SSH depuis le host, réception du trafic de test +- **eth1** (réseau privé `ja4-e2e`) : communication inter-VMs + - Les endpoints utilisent eth1 pour envoyer les logs ja4ebpf vers `192.168.42.10:9000` + - Le host accède au dashboard et ClickHouse via l'IP eth0 de la VM analysis (routée via libvirt NAT) + +## Pipeline de données + +``` +1. HOST → endpoints curl/httpx génère du trafic HTTP/HTTPS/H2 +2. ja4ebpf Capture eBPF : TLS ClientHello (JA4), TCP SYN (L3/L4), + HTTP via uprobe SSL_read (L7) +3. → ClickHouse :9000 ja4ebpf écrit dans ja4_logs.http_logs_raw (batch 100 lignes, flush 1s) +4. MV mv_http_logs Materialized View : http_logs_raw → http_logs (parsed, corrélé) +5. MV mv_agg_host_ip_ja4_1h Agrégation horaire par (host, src_ip, ja4) → agg_host_ip_ja4_1h +6. view_ai_features_1h Vue qui joint les features pour le ML +7. bot-detector Cycle ML toutes les 30s (config de test) : + - Lit view_ai_features_1h + - Pipeline : NFEnsemble (M=5 NF) → ADWIN (River) → MLP fusion + - Écrit ml_all_scores + ml_detected_anomalies +8. dashboard API FastAPI sur :8000, requête ClickHouse +``` + +## Principe clé : DB vierge avant chaque test + +Avant chaque exécution E2E, la Phase 1 fait un `docker compose down -v` qui supprime tous les volumes Docker (y compris les données ClickHouse). Cela garantit que : +- ClickHouse démarre avec un schéma vierge +- Les données observées sont exclusivement celles générées par le test +- Les vérifications de la Phase 5 sont déterministes + +## Stack Docker (VM analysis) + +Définie dans `tests/vm/analysis/docker-compose.yml` : + +### ClickHouse +- Image : `clickhouse/clickhouse-server:24.8` +- Ports : `0.0.0.0:9000` (native, ja4ebpf), `0.0.0.0:8123` (HTTP, API) +- Schéma : 12 fichiers SQL de `shared/clickhouse/*.sql`, exécutés via `clickhouse-init.sh` +- Credentials : `user=default, password=""` (patché par clickhouse-init.sh) +- Dictionnaires : CSV stubs de `tests/integration/platform/csv-stubs/` + +### bot-detector +- Build : `services/bot-detector/bot_detector/Dockerfile` +- Port : `0.0.0.0:8080` (health check) +- Configuration accélérée pour les tests : + - `CYCLE_INTERVAL_SEC: 30` (vs 300 en prod) + - `MIN_VALID_FEATURE_RATIO: 0.10` (vs 0.50 en prod) + - SHAP, clustering, multi-fenêtres désactivés + +### dashboard +- Build : `services/dashboard/Dockerfile` +- Port : `0.0.0.0:8000` +- Routes de vérification : `/health`, `/api/overview`, `/api/detections` + +## Phases du test (run-e2e-test.sh) + +| Phase | Description | Durée | +|-------|-------------|-------| +| 0 | Setup : démarrage VMs, rsync, découverte IPs | ~2 min (si VMs existantes) | +| 1 | Stack analysis : purge volumes (DB vierge), `docker compose up -d --build`, attente healthy | ~3 min | +| 2 | Endpoints : nginx + ja4ebpf (DSN → analysis:9000), en parallèle | ~1 min | +| 3 | Trafic : 500 req/VM × 3 VMs, HTTP/HTTPS/H2, méthodes variées | ~5 min | +| 4 | Attente : flush ja4ebpf 15s, poll ml_all_scores (max 120s) | ~2 min | +| 5 | Vérifications : 15+ checks sur 4 layers | ~1 min | + +## Vérifications (Phase 5) + +### Layer 1 — Données brutes +- `ja4_logs.http_logs_raw` : lignes > 0 +- `uniqExact(host)` : >= 2 hôtes distincts (multi-source) + +### Layer 2 — Pipeline ClickHouse (MVs) +- `ja4_logs.http_logs` : JA4 fingerprints capturés +- `ja4_logs.http_logs` : méthodes HTTP capturées (L7 via uprobe SSL_read) +- `ja4_processing.agg_host_ip_ja4_1h` : agrégation horaire peuplée +- `ja4_processing.view_ai_features_1h` : features ML disponibles + +### Layer 3 — ML bot-detector +- `ja4_processing.ml_all_scores` : classifications produites +- `ja4_processing.ml_detected_anomalies` : anomalies détectées (optionnel) +- Health check `:8080` + +### Layer 4 — Dashboard +- `/health` : OK +- `/api/overview` : données non-vides +- `/api/detections` : accessible + +## Utilisation + +```bash +# Créer les 4 VMs +make e2e-up + +# Test complet (500 req/VM, ~15 min) +make test-e2e + +# Test rapide (100 req/VM) +make test-e2e-quick + +# Garder les VMs après le test (pour debug) +KEEP_RUNNING=true make test-e2e + +# Détruire les VMs +make e2e-down +``` + +## Accès manuel (debug) + +```bash +# ClickHouse — vérifier les données +curl "http://192.168.42.10:8123/?query=SELECT+count()+FROM+ja4_logs.http_logs" + +# Dashboard +curl http://192.168.42.10:8000/health +curl http://192.168.42.10:8000/api/overview | python3 -m json.tool + +# bot-detector +curl http://192.168.42.10:8080/ + +# SSH dans la VM analysis +cd tests/vm && vagrant ssh analysis + +# Logs des conteneurs +vagrant ssh analysis -- "docker logs bot_detector_ai --tail 50" +vagrant ssh analysis -- "docker logs ja4-dashboard --tail 50" +``` + +## Fichiers + +| Fichier | Rôle | +|---------|------| +| `tests/vm/Vagrantfile` | Définition des 4 VMs + réseau ja4-e2e | +| `tests/vm/provision-analysis.sh` | Provisionneur VM analysis (Docker, firewall) | +| `tests/vm/analysis/docker-compose.yml` | Stack centralisée CH + bot-detector + dashboard | +| `tests/vm/run-e2e-test.sh` | Orchestrateur E2E 5 phases | +| `tests/vm/run-tests-vm.sh` | Script endpoint (modifié pour CH_HOST) | +| `Makefile` | Cibles e2e-up, e2e-down, test-e2e, test-e2e-quick | + +## Dépannage + +| Problème | Diagnostic | +|----------|------------| +| ClickHouse inaccessible | `vagrant ssh analysis -- "docker ps"` ; vérifier le port binding | +| ja4ebpf n'écrit pas | `vagrant ssh rocky9 -- "cat /tmp/ja4ebpf.log \| tail 20"` | +| Pas de JA4 | Le hook TC nécessite CAP_BPF ; vérifier `dmesg \| grep bpf` | +| bot-detector ne démarre pas | `vagrant ssh analysis -- "docker logs bot_detector_ai"` | +| Pas de données ML | Volume insuffisant pour les fenêtres d'agrégation horaire | +| Dashboard vide | Le bot-detector doit avoir complété au moins 1 cycle (30s) | diff --git a/docs/thesis/00_resume.md b/docs/thesis/00_resume.md index d9b75ad..2923b05 100644 --- a/docs/thesis/00_resume.md +++ b/docs/thesis/00_resume.md @@ -11,7 +11,7 @@ ## Résumé -Ce document présente une architecture opérationnelle de détection et classification du trafic HTTP malveillant, s'inscrivant dans la continuité des approches de génération 3 (fingerprinting multi-protocole et ML comportemental). Le système exploite 96 features organisées en 8 familles couvrant les couches réseau L3 à L7, corrélant des signaux TCP, TLS et HTTP en un vecteur unifié par session. La détection repose sur un ensemble triple-voix combinant un Extended Isolation Forest (EIF), un Normalizing Flow (NF) et XGBoost, fusionnés par un méta-modèle MLP (Multi-Layer Perceptron) non-linéaire calibré sur les étiquettes accumulées. L'explicabilité est assurée par l'importance des features par profondeur d'isolation (EIF) et SHAP TreeExplainer (XGBoost). Le clustering de campagnes est réalisé par HDBSCAN dans l'espace latent 16 dimensions de l'autoencodeur, et la détection de flottes coordonnées par graphes bipartis via NetworkX. Le fingerprinting HTTP/2 passif — extraction des trames SETTINGS, WINDOW_UPDATE et de l'ordre des pseudo-headers côté serveur — exploite un signal déjà utilisé par des solutions industrielles (Akamai, Cloudflare, F5), ici implémenté via eBPF. L'infrastructure repose sur 16 modules Python (4 800 lignes), une base ClickHouse à double schéma (ja4_logs bruts TTL 2 h, ja4_processing agrégés TTL 7 j), des cycles d'analyse de 300 secondes, et traite en production plus de 3 millions de logs, environ 34 000 sessions par cycle, avec approximativement 777 anomalies détectées par cycle (≈ 2,3 % — chiffre opérationnel brut, non validé comme taux de détection). Le système intègre un moteur de profiling dynamique automatique des navigateurs (HDBSCAN sur les vecteurs H2 observés, centroïdes auto-appris, scoring temps réel par distance normalisée) qui s'adapte aux évolutions des piles HTTP/2 sans intervention manuelle. +Ce document présente une architecture opérationnelle de détection et classification du trafic HTTP malveillant, s'inscrivant dans la continuité des approches de génération 3 (fingerprinting multi-protocole et ML comportemental). Le système exploite 96 features organisées en 8 familles couvrant les couches réseau L3 à L7, corrélant des signaux TCP, TLS et HTTP en un vecteur unifié par session. La détection repose sur un ensemble triple-voix combinant un Extended Isolation Forest (EIF), un Normalizing Flow (NF) et un Hoeffding Adaptive Tree (HAT, River), fusionnés par un MLP non-linéaire calibré sur les étiquettes accumulées. L'explicabilité est assurée par l'importance des features par profondeur d'isolation (EIF) et SHAP TreeExplainer (HAT). Le clustering de campagnes est réalisé par HDBSCAN dans l'espace latent du Normalizing Flow, et la détection de flottes coordonnées par GraphSAGE (PyTorch Geometric). Le fingerprinting HTTP/2 passif — extraction des trames SETTINGS, WINDOW_UPDATE et de l'ordre des pseudo-headers côté serveur — exploite un signal déjà utilisé par des solutions industrielles (Akamai, Cloudflare, F5), ici implémenté via eBPF. L'infrastructure repose sur 16 modules Python (4 800 lignes), une base ClickHouse à double schéma (ja4_logs bruts TTL 2 h, ja4_processing agrégés TTL 7 j), des cycles d'analyse de 300 secondes, et traite en production plus de 3 millions de logs, environ 34 000 sessions par cycle, avec approximativement 777 anomalies détectées par cycle (≈ 2,3 % — chiffre opérationnel brut, non validé comme taux de détection). Le système intègre un moteur de profiling dynamique automatique des navigateurs (HDBSCAN sur les vecteurs H2 observés, centroïdes auto-appris, scoring temps réel par distance normalisée) qui s'adapte aux évolutions des piles HTTP/2 sans intervention manuelle. **Mots-clés** : fingerprinting réseau, JA4+, HTTP/2 fingerprinting, détection de bots, Extended Isolation Forest, autoencodeurs, ensemble hybride, corrélation TCP/TLS/HTTP, WAF, classification de trafic, apprentissage semi-supervisé, clustering HDBSCAN diff --git a/docs/thesis/01_introduction.md b/docs/thesis/01_introduction.md index 001f013..134a789 100644 --- a/docs/thesis/01_introduction.md +++ b/docs/thesis/01_introduction.md @@ -43,9 +43,9 @@ Ce document décrit une architecture opérationnelle s'inscrivant dans la contin 1. **Corrélation TCP/TLS/HTTP** en temps réel via ja4ebpf (clé : `src_ip:src_port`, 256 shards, timeout orphelin 500 ms) 2. **Fingerprinting HTTP/2 passif** : extraction des trames SETTINGS, WINDOW_UPDATE, PRIORITY et de l'ordre des pseudo-headers directement depuis le stream TCP — approche déjà exploitée par des solutions industrielles (Akamai, Cloudflare, F5), ici implémentée via eBPF 3. **Architecture EIF bifurquée** : modèle complet (≈ 45 features L3→L7) et modèle applicatif (≈ 35 features L7 uniquement), évitant le biais de zérotage sur le trafic non corrélé — choix pragmatique de gestion des données manquantes -4. **Ensemble triple-voix avec fusion par MLP non-linéaire** : combinaison EIF + NF + XGBoost avec méta-modèle MLP apprenant les interactions non-linéaires entre les trois voix +4. **Ensemble triple-voix avec fusion par MLP non-linéaire** : combinaison EIF + NF + HAT (River) avec fusion MLP apprenant les interactions non-linéaires entre les trois voix 5. **HDBSCAN dans l'espace latent AE** : clustering de campagnes par similarité de comportement compressé en 16 dimensions -6. **Détection de dérive adversariale** : distinction entre dérive organique (mises à jour navigateur) et manipulation directionnelle coordonnée +6. **Détection de dérive adversariale** : distinction entre dérive organique (mises à jour navigateur) et manipulation adversariale via incertitude épistémique de Deep Ensembles (NFEnsemble M=5) 7. **8 features comportementales avancées** : application de statistiques standard (déviation de Benford, entropie de transition markovienne, autocorrélation lag-1, délai root-to-first-asset, diversité de hosts, uniformité de couverture cross-host) au domaine de la détection de bots 8. **Graphes bipartis NetworkX** pour la détection de flottes diff --git a/docs/thesis/02_etat_de_lart.md b/docs/thesis/02_etat_de_lart.md index 2a0097a..34f3745 100644 --- a/docs/thesis/02_etat_de_lart.md +++ b/docs/thesis/02_etat_de_lart.md @@ -58,7 +58,7 @@ La deuxième couche de défense statique repose sur des dictionnaires de réputa [Anubis](https://github.com/TecharoHQ/anubis) est un système de règles communautaire en YAML permettant de définir des actions granulaires par bot identifié. Les quatre actions disponibles sont : - **ALLOW** : autorisation explicite (bots légitimes : Googlebot, Bingbot, bots de recherche académique) -- **DENY** : blocage avec retour 403 Forbidden — signal de vérité terrain fort pour l'entraînement XGBoost +- **DENY** : blocage avec retour 403 Forbidden — signal de vérité terrain fort pour l'entraînement HAT (River) - **WEIGH** : ajout d'un score de pondération sans blocage — signal auxiliaire dans le vecteur de features - **CHALLENGE** : redirection vers un challenge (PoW ou CAPTCHA) @@ -69,7 +69,7 @@ La deuxième couche de défense statique repose sur des dictionnaires de réputa **Priorité de correspondance** : `COALESCE(IP match, ASN match)` — une correspondance CIDR précise sur l'IP prend la priorité sur la correspondance ASN plus générale. Cela reflète le principe que l'information la plus spécifique est la plus fiable. **Valeur pour le pipeline ML** : -- Les sessions `DENY` fournissent des étiquettes de bot à haute confiance pour l'entraînement supervisé de XGBoost, sans nécessiter d'annotation manuelle. +- Les sessions `DENY` fournissent des étiquettes de bot à haute confiance pour l'entraînement incrémental du HAT (River), sans nécessiter d'annotation manuelle. - Les sessions `WEIGH` contribuent une feature binaire `anubis_is_flagged` dans la famille F7, enrichissant le vecteur de features sans déclencher de blocage. --- @@ -292,7 +292,7 @@ XGBoost ([Chen & Guestrin, 2016](https://arxiv.org/abs/1603.02754)) est un algor **Limites des approches supervisées** : - **Concept drift** : un modèle entraîné sur des bots de 2024 peut être aveugle aux nouvelles techniques de 2025 - **Rareté des étiquettes** : annoter manuellement des millions de sessions HTTP est coûteux et sujet à erreur -- **Bruit des étiquettes** : les labels fournis par les analystes SOC contiennent des erreurs systématiques (faux positifs mal corrigés, biais de confirmation). Ces étiquettes bruitées empoisonnent le modèle supervisé — un problème bien documenté par [Northcutt et al., 2021 (Cleanlab)](https://arxiv.org/abs/1911.00068) qui montre que les jeux de données réels contiennent 8 à 20 % de labels incorrects. Pour mitiger ce risque, notre pipeline intègre un filtre Cleanlab avant l'entraînement XGBoost (détail §3.8). +- **Bruit des étiquettes** : les labels fournis par les analystes SOC contiennent des erreurs systématiques (faux positifs mal corrigés, biais de confirmation). Ces étiquettes bruitées empoisonnent le modèle supervisé — un problème bien documenté par [Northcutt et al., 2021 (Cleanlab)](https://arxiv.org/abs/1911.00068) qui montre que les jeux de données réels contiennent 8 à 20 % de labels incorrects. Pour mitiger ce risque, notre pipeline intègre un filtre Cleanlab avant l'apprentissage incrémental du HAT (détail §3.8). - **Biais de jeu de données** : les modèles entraînés sur des données de laboratoire (CICIDS2017, NSL-KDD) généralisent mal au trafic en production, comme documenté dans la littérature sur les benchmarks de détection d'intrusions - **Attaque par évasion adversariale** : un attaquant ayant accès ou connaissance du modèle peut crafting des sessions qui maximisent le score de légitimité @@ -413,20 +413,19 @@ Le système de détection combine trois « voix » complémentaires : ┌──────────────┼──────────────┐ │ │ │ ┌─────▼─────┐ ┌─────▼─────┐ ┌────▼──────┐ - │ EIF │ │ NF │ │ XGBoost │ - │ (semi- │ │ (Normal- │ │(supervisé)│ - │supervisé) │ │ izing │ │ │ - │ │ │ Flow) │ │ │ + │ EIF │ │ NF │ │ HAT │ + │ (semi- │ │ (Normal- │ │ (River, │ + │supervisé) │ │ izing │ │ supervisé │ + │ │ │ Flow) │ │ online) │ └─────┬─────┘ └─────┬─────┘ └────┬──────┘ │ │ │ - eif_norm nf_norm xgb_prob + eif_norm nf_norm hat_prob │ │ │ └──────────────┼──────────────┘ │ ┌────────▼────────┐ - │ Meta-Model │ - │ Stacking MLP │ - │ (non-linéaire) │ + │ Fusion MLP │ + │ non-linéaire │ └────────┬────────┘ │ ┌────────▼────────┐ @@ -437,7 +436,7 @@ Le système de détection combine trois « voix » complémentaires : **Limites de la fusion linéaire** -Une fusion linéaire — combinaison convexe pondérée ou régression logistique — ne peut capturer que des frontières de décision linéaires dans l'espace des scores intermédiaires. Or les signaux EIF, NF et XGBoost peuvent exhiber des interactions non-linéaires impossibles à modéliser par une combinaison linéaire : +Une fusion linéaire — combinaison convexe pondérée ou régression logistique — ne peut capturer que des frontières de décision linéaires dans l'espace des scores intermédiaires. Or les signaux EIF, NF et HAT peuvent exhiber des interactions non-linéaires impossibles à modéliser par une combinaison linéaire : ``` Problème XOR des scores : @@ -450,18 +449,18 @@ Problème XOR des scores : NF bas ``` -Exemple concret : un bot utilisant un outilHeadless avec un JA4 fingerprint légitime (NF bas) mais un comportement de navigation atypique (EIF élevé). Le XGBoost peut compenser, mais la fusion linéaire ne peut apprendre la relation *« EIF élevé ET XGB élevé MAIS NF bas = bot »* — elle ne fait que sommer les contributions indépendantes. +Exemple concret : un bot utilisant un outilHeadless avec un JA4 fingerprint légitime (NF bas) mais un comportement de navigation atypique (EIF élevé). Le HAT peut compenser, mais la fusion linéaire ne peut apprendre la relation *« EIF élevé ET HAT élevé MAIS NF bas = bot »* — elle ne fait que sommer les contributions indépendantes. **Stacking OOF (Out-of-Fold) et MLP méta-modèle** Pour résoudre cette limitation, le système utilise un méta-modèle non-linéaire de type MLP (*Multi-Layer Perceptron*) entraîné via stacking Out-of-Fold : -1. **Prédictions OOF** : les modèles de base (EIF, NF, XGBoost) produisent des prédictions sur des plis de validation croisée temporelle, garantissant que le méta-modèle n'a jamais vu les données d'entraînement des modèles de base — évitant le surapprentissage (*information leakage*). +1. **Prédictions OOF** : les modèles de base (EIF, NF, HAT) produisent des prédictions sur des plis de validation croisée temporelle, garantissant que le méta-modèle n'a jamais vu les données d'entraînement des modèles de base — évitant le surapprentissage (*information leakage*). 2. **Méta-modèle MLP** : un réseau de neurones à 2 couches apprend la fonction de fusion optimale : ``` -MetaFusionMLP : [eif, nf, xgb] → Linear(3,16) → BatchNorm → ReLU → Dropout(0.2) +MetaFusionMLP : [eif, nf, hat] → Linear(3,16) → BatchNorm → ReLU → Dropout(0.2) → Linear(16,1) → Sigmoid → P(bot) ``` @@ -470,7 +469,7 @@ MetaFusionMLP : [eif, nf, xgb] → Linear(3,16) → BatchNorm → ReLU → Dropo - **Early stopping** (patience = 5 epochs) : arrête l'entraînement dès que la loss de validation ne s'améliore plus, évitant le surapprentissage. - **Weight decay** ($\lambda = 10^{-4}$) : pénalité L2 sur les poids du MLP pour une régularisation supplémentaire. -Le MLP apprend des frontières de décision non-linéaires dans l'espace 3D `[eif_norm, nf_norm, xgb_prob]`, capable de résoudre les patterns XOR et les interactions conditionnelles entre les trois voix. Le système de détection peut ainsi combiner automatiquement les signaux de manière optimale en fonction du type de trafic observé en production. +Le MLP apprend des frontières de décision non-linéaires dans l'espace 3D `[eif_norm, nf_norm, hat_prob]`, capable de résoudre les patterns XOR et les interactions conditionnelles entre les trois voix. Le système de détection peut ainsi combiner automatiquement les signaux de manière optimale en fonction du type de trafic observé en production. **Calendrier de retraining** : - HAT (supervisé) : apprentissage incrémental à chaque cycle (300s) sur les étiquettes accumulées, après filtrage Cleanlab des labels SOC bruyants (voir ci-dessous) @@ -479,18 +478,22 @@ Le MLP apprend des frontières de décision non-linéaires dans l'espace 3D `[ei **Filtrage des labels SOC bruyants (Cleanlab)** : -Avant chaque entraînement XGBoost, les labels fournis par les analystes SOC sont filtrés via [Cleanlab](https://cleanlab.ai/) ([Northcutt et al., 2021](https://arxiv.org/abs/1911.00068)). Ce framework de *confident learning* identifie les exemples dont l'étiquette est probablement erronée en comparant les prédictions out-of-fold d'un modèle aux labels observés. +Avant chaque cycle d'apprentissage incrémental du HAT, les labels fournis par les analystes SOC sont filtrés via [Cleanlab](https://cleanlab.ai/) ([Northcutt et al., 2021](https://arxiv.org/abs/1911.00068)). Ce framework de *confident learning* identifie les exemples dont l'étiquette est probablement erronée en comparant les prédictions out-of-fold d'un modèle aux labels observés. ```python -# 1. Obtenir pred_probs via cross-validation (3 folds) -quick_model = XGBClassifier(n_estimators=80, max_depth=4) -pred_probs = cross_val_predict(quick_model, X, y, cv=3, method='predict_proba') +# 1. Obtenir pred_probs via cross-validation (3 folds) sur les labels accumulés +from river import tree +quick_model = tree.HoeffdingAdaptiveTreeClassifier() +# Les labels accumulés ce cycle sont filtrés avant injection dans le HAT +pred_probs = cross_val_predict(quick_model, X_accumulated, y_accumulated, + cv=3, method='predict_proba') # 2. Identifier les labels douteux issues = find_label_issues(labels=y, pred_probs=pred_probs) -# 3. Exclure les exemples bruités avant l'entraînement final -X_clean, y_clean = X[~noisy_mask], y[~noisy_mask] +# 3. N'injecter que les labels propres via learn_one() +for x_clean, y_clean in clean_samples: + hat_model.learn_one(x_clean, y_clean) ``` Ce mécanisme protège le modèle contre l'empoisonnement par des faux positifs mal corrigés ou des biais de confirmation des analystes. Le taux de labels filtrés est loggé pour surveillance. En cas d'échec de Cleanlab (erreur mémoire, dépendance manquante), le pipeline revient aux données brutes sans interruption. @@ -577,7 +580,20 @@ Le passage au stream mining élimine trois problématiques majeures du batch tra **Validation gate** : conservée — si le taux d'anomalie sur le jeu de validation dépasse 20% après retraining EIF/NF, le nouveau modèle est rejeté et le modèle précédent conservé. -#### 2.4.4 Modélisation des phases d'attaque +**Quantification d'incertitude par Deep Ensembles** + +La détection adversariale par ADWIN reposait sur l'heuristique suivante : si plus de 50% des features driftent simultanément, le drift est qualifié d'adversarial. Cette heuristique est non fondée — un pic de légitime trafic (ex. mise à jour navigateur majeure) peut déclencher un drift massif sur de nombreuses features sans pour autant être adversarial. À l'inverse, une attaque furtive ne touchant que quelques features ne serait jamais détectée. + +Cette heuristique est remplacée par une mesure d'incertitude épistémique via **Deep Ensembles** ([Lakshminarayanan et al., 2017](https://arxiv.org/abs/1612.01474)) : le Normalizing Flow unique est remplacé par un ensemble de $M=5$ modèles indépendants, chacun entraîné sur un échantillon bootstrap (avec remise) de la baseline humaine. L'incertitude est mesurée par la variance inter-modèles : + +$$\sigma^2(x) = \frac{1}{M} \sum_{m=1}^{M} \left( -\log p_m(x) - \overline{-\log p(x)} \right)^2$$ + +La logique de détection repose sur l'intuition suivante : + +- **Dérive organique** (changement naturel du trafic) : les 5 modèles s'accordent sur la nouveauté → variance faible. Tous les manifolds ont capturé les mêmes structures dans la baseline, donc un nouveau pattern légitime est traité de manière cohérente. +- **Dérive adversariale** (évasion délibérée) : les 5 modèles ne s'accordent pas → variance qui explose. Un échantillon adversarial tombe dans une région de l'espace où chaque manifold a appris une frontière légèrement différente (diversité induite par le bootstrap), produisant des scores de vraisemblance très dispersés. + +Le seuil `NF_UNCERTAINTY_THRESHOLD` (défaut : 1.0) est appliqué sur $\sigma^2(x)$ : tout échantillon au-dessus est tagué `is_adversarial_drift = True`. Cette approche est fondée statistiquement (variance sur un ensemble) et ne dépend pas d'un seuil arbitraire sur le nombre de features en drift. La modélisation des phases d'attaque (Reconnaissance → Mouvement latéral → Intrusion → Exfiltration) par des modèles d'état-espace ou des processus de Markov cachés constitue une piste de recherche. L'enrichissement du clustering HDBSCAN avec ce signal de phase permettrait de distinguer des campagnes en phase de reconnaissance de campagnes en phase d'exploitation active. diff --git a/docs/thesis/03_architecture.md b/docs/thesis/03_architecture.md index e5ba43a..cb2d928 100644 --- a/docs/thesis/03_architecture.md +++ b/docs/thesis/03_architecture.md @@ -69,8 +69,8 @@ │ │ 3b. dynamic H2 profiling scoring │ │ │ │ 4. EIF bifurqué (complet/appli) │ │ │ │ 5. NF log-likelihood scoring │ │ - │ │ 6. XGBoost probabilité │ │ - │ │ 7. Meta-Model MLP fusion │ │ + │ │ 6. HAT probabilité (River online)│ │ + │ │ 7. Fusion MLP non-linéaire │ │ │ │ 8. HDBSCAN clustering (NF latent) │ │ │ │ 9. Écriture résultats ClickHouse │ │ │ └──────────────────────────────────┘ │ @@ -240,7 +240,7 @@ Session entrante ├── asn_label == 'human' ? │ ── OUI → baseline EIF training (sans étiquette bot) │ - └── Sinon → Triple-voix : EIF + NF + XGBoost + Meta-Model Stacking (MLP non-linéaire) + └── Sinon → Triple-voix : EIF + NF + HAT (River) + Fusion MLP non-linéaire ``` #### Seuil adaptatif @@ -258,7 +258,7 @@ La valeur `percentile_5` du historique des scores négatifs (anomalies confirmé | EIF Complet | ≈ 45 features L3→L7 | Données L3/L4 disponibles | eif_score_full | | EIF Applicatif | ≈ 35 features L7 | L3/L4 absentes (CDN/proxy) | eif_score_app | | NF | Même dimensionnalité que EIF actif | Toutes sessions | nf_log_likelihood | -| XGBoost | Ensemble complet 96 features | Toutes sessions | xgb_probability | +| HAT (River) | Ensemble complet 96 features | Toutes sessions | hat_probability | #### Niveaux de sévérité diff --git a/docs/thesis/07_discussion_limites.md b/docs/thesis/07_discussion_limites.md index 4b65921..200afd8 100644 --- a/docs/thesis/07_discussion_limites.md +++ b/docs/thesis/07_discussion_limites.md @@ -21,6 +21,7 @@ La détection de bots s'inscrit dans une dynamique de course aux armements où c | asset_ratio | Playwright/Puppeteer chargeant toutes ressources | Détectable via resource dependency tree (§5.4) | | IP reputation | Proxies résidentiels (Bright Data, Oxylabs) | Contournement partiel mais coût élevé par requête | | Comportement navigation | Scripts imitant les patterns de clic humain | Détectable via cadence fingerprint et entropy de séquence | +| Deep Ensembles (NF M=5) | Perturbation continue des features | L'évasion par perturbation continue est difficile car l'attaquant doit tromper 5 manifolds différents simultanément | #### Architecture multi-couches comme contre-mesure structurelle @@ -71,7 +72,7 @@ Cependant, des proxies résidentiels persistants apparaissant dans **chaque cycl | 2 | Signaux orthogonaux (§5.2, §5.3) résistants à contamination | Détecte bots résistants à l'EIF par des axes indépendants | | 3 | Validation : `anomaly_rate > 20%` → rejet du modèle | Détecte les cycles d'entraînement pathologiques | | 4 | Feedback SOC : FP → reclassification "human" ; TP → exclusion baseline | Correction manuelle des erreurs systématiques | -| 5 | Triple ensemble : XGBoost corrige les erreurs systématiques EIF | Supervisé corrige les biais de l'non-supervisé | +| 5 | Triple ensemble : HAT (River) corrige les erreurs systématiques EIF | Supervisé online corrige les biais de l'non-supervisé | #### Impact du feedback SOC @@ -109,14 +110,14 @@ Le fingerprinting réseau opère sans déchiffrement TLS (les métadonnées TLS | Composant | Temps d'exécution | Conditions | |-----------|------------------|------------| | EIF training | < 2 secondes | ~34 000 sessions, 96 features | -| AE inference | ~50 ms | Batch de 34 000 sessions | -| XGBoost inference | ~30 ms | Batch de 34 000 sessions | +| NF (Normalizing Flow) inference | ~50 ms | Batch de 34 000 sessions | +| HAT (River) inference | ~30 ms | Batch de 34 000 sessions | | HDBSCAN (anomalies) | ~100 ms | ~34 000 sessions, espace latent AE | | HDBSCAN (profiling) | ~2–5 s | Quotidien, ~200k sessions H2 dédupliquées, min_cluster=1000 | | Dynamic matcher scoring | < 1 ms | Par session, lookup en mémoire contre ~5–10 profils | | GraphSAGE (fleet.py) | ~80 ms | Graphe d'IPs, 2 couches SAGEConv, GPU/CPU | | Fusion MLP | < 10 ms | MLP 2 couches, négligeable | -| **Cycle complet** | **~300 secondes** | EIF + AE + XGBoost + HDBSCAN + GraphSAGE | +| **Cycle complet** | **~300 secondes** | EIF + NF + HAT + HDBSCAN + GraphSAGE | La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fenêtre d'agrégation ClickHouse** (1 heure glissante avec recalcul toutes les 5 minutes), non par les temps d'exécution ML. @@ -133,7 +134,7 @@ La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fe - À 34 000 sessions/cycle : ~100 ms — acceptable - À 500 000 sessions/cycle (scaling ×15) : ~2 s — encore tolérable -**Fusion MLP** : O(n × d) inférence avec d = 3 features d'entrée (scores EIF, NF, XGBoost), MLP 2 couches (16 neurones). Temps négligeable quelle que soit la taille. +**Fusion MLP** : O(n × d) inférence avec d = 3 features d'entrée (scores EIF, NF, HAT), MLP 2 couches (16 neurones). Temps négligeable quelle que soit la taille. **Limite architecturale principale** : le modèle supervisé (Hoeffding Adaptive Tree) s'améliore incrémentalement à chaque cycle via `learn_one()`, mais nécessite un flux continu de labels fiables. À faible volume de labels (< 500 sessions étiquetées), le HAT converge lentement. Ce problème est partiellement atténué par le filtrage Cleanlab qui élimine les labels douteux (détail §3.8), mais la qualité du feedback SOC reste le goulot d'étranglement principal. @@ -176,7 +177,7 @@ Ce document présente un système opérationnel déployé en production, mais so **Le chiffre de "777 anomalies par cycle (≈ 2,3 %)"** est un compteur opérationnel brut : il mesure le nombre de sessions dépassant le seuil d'anomalie configuré, mais ne distingue pas les vrais positifs des faux positifs. En l'absence de ground truth systématique, ce chiffre ne constitue pas un indicateur de performance de détection. -**Conséquence** : les choix architecturaux (EIF bifurqué, ensemble triple-voix, poids de la fusion LR) sont motivés par des arguments qualitatifs et l'expérience opérationnelle, mais ne sont pas validés par une évaluation quantitative contrôlée. La priorité immédiate pour les travaux futurs est l'établissement d'un protocole d'évaluation sur un dataset labellisé, avec comparaison contre des baselines (Isolation Forest seul, XGBoost seul, LOF, One-Class SVM). +**Conséquence** : les choix architecturaux (EIF bifurqué, ensemble triple-voix, poids de la fusion MLP) sont motivés par des arguments qualitatifs et l'expérience opérationnelle, mais ne sont pas validés par une évaluation quantitative contrôlée. La priorité immédiate pour les travaux futurs est l'établissement d'un protocole d'évaluation sur un dataset labellisé, avec comparaison contre des baselines (Isolation Forest seul, HAT seul, LOF, One-Class SVM). ### 6.7 Travaux futurs et roadmap diff --git a/docs/thesis/08_conclusion_references.md b/docs/thesis/08_conclusion_references.md index e0e7303..78d44a6 100644 --- a/docs/thesis/08_conclusion_references.md +++ b/docs/thesis/08_conclusion_references.md @@ -19,13 +19,13 @@ Un système de détection à couverture complète couvrant cinq couches réseau Un pipeline ML combinant : - **Isolation Forest Étendu (EIF)** ([Hariri et al., 2021](https://ieeexplore.ieee.org/document/8888179)) : modèle non-supervisé fondé sur l'isolation aléatoire d'instances anormales dans des espaces de features basse-dimension -- **Autoencodeur variationnel (AE)** ([Mirsky et al., NDSS 2018](https://www.ndss-symposium.org/ndss-paper/kitsune-an-ensemble-of-autoencoders-for-online-network-intrusion-detection/)) : détection d'anomalies par reconstruction, capturant les corrélations entre features -- **XGBoost supervisé** : correction des erreurs systématiques des modèles non-supervisés via labels SOC accumulés -- **Fusion par MLP méta-modèle** : fusion non-linéaire des trois scores en un score final calibré +- **Normalizing Flow (RealNVP)** : détection d'anomalies par vraisemblance, capturant les corrélations jointes entre features via Deep Ensemble (M=5) +- **HAT supervisé (Hoeffding Adaptive Tree, River)** : correction des erreurs systématiques des modèles non-supervisés via labels SOC accumulés, apprentissage incrémental par `learn_one()` +- **Fusion MLP non-linéaire** : fusion non-linéaire des trois scores en un score final calibré -Le pipeline intègre un mécanisme de **détection de dérive conceptuelle** (basé sur le percentile 5 des scores négatifs) distinguant la dérive organique (évolution naturelle du trafic) de la dérive adversariale (manipulation intentionnelle de la distribution). +Le pipeline intègre un mécanisme de **détection de dérive conceptuelle** via ADWIN (Adaptive Windowing) distinguant la dérive organique (évolution naturelle du trafic) de la dérive adversariale (variance épistémique élevée du NFEnsemble). -L'**explainabilité** est assurée par l'importance des features par profondeur d'isolation (approche de type ExIFFI) pour l'EIF et SHAP ([Lundberg & Lee, 2017](https://shap.readthedocs.io/)) pour XGBoost, permettant l'audit des décisions de blocage par l'équipe SOC. +L'**explainabilité** est assurée par l'importance des features par profondeur d'isolation (approche de type ExIFFI) pour l'EIF et SHAP ([Lundberg & Lee, 2017](https://shap.readthedocs.io/)) pour le HAT, permettant l'audit des décisions de blocage par l'équipe SOC. #### Composant 3 : Fingerprinting HTTP/2 passif structuré (browser_matcher) @@ -76,7 +76,7 @@ Architecture de données fondée sur ClickHouse avec **AggregatingMergeTree view ### Perspective -Le système atteint ses objectifs opérationnels actuels. La capture HTTP/2 passive est intégrée avec 12 colonnes individuelles dans `ja4_logs.http_logs`, et le module `browser_matcher` est opérationnel avec ses 7 dimensions de scoring statique. Le moteur de profiling dynamique automatique (§3.9.6) complète le système statique en apprenant les signatures H2 à partir du trafic réel, éliminant la dépendance aux signatures codées en dur. Les axes d'amélioration prioritaires sont le monitoring de la convergence des clusters dynamiques, l'extension DNS Shadow Analysis pour la couverture DNS (`[todo]` → `[partiel]`), et le passage à l'apprentissage en ligne pour XGBoost. À plus long terme, le support HTTP/3 (QUIC) deviendra nécessaire à mesure que la proportion de trafic HTTP/3 augmente dans la baseline. +Le système atteint ses objectifs opérationnels actuels. La capture HTTP/2 passive est intégrée avec 12 colonnes individuelles dans `ja4_logs.http_logs`, et le module `browser_matcher` est opérationnel avec ses 7 dimensions de scoring statique. Le moteur de profiling dynamique automatique (§3.9.6) complète le système statique en apprenant les signatures H2 à partir du trafic réel, éliminant la dépendance aux signatures codées en dur. Les axes d'amélioration prioritaires sont le monitoring de la convergence des clusters dynamiques et l'extension DNS Shadow Analysis pour la couverture DNS (`[todo]` → `[partiel]`). Le passage à l'apprentissage en ligne via HAT (River) est effectif depuis la section 6.7. À plus long terme, le support HTTP/3 (QUIC) deviendra nécessaire à mesure que la proportion de trafic HTTP/3 augmente dans la baseline. La modélisation des phases d'attaque séquentielles par des modèles d'état-espace constitue une piste de recherche prometteuse, qui permettrait de modéliser explicitement les phases d'attaque séquentielles — comblant la lacune actuelle entre la détection de sessions individuelles et la détection de campagnes d'attaque coordonnées multi-phases. @@ -236,8 +236,8 @@ arXiv preprint arXiv:1210.0921. [35] **HDBSCAN Python library** — Implémentation performante de l'algorithme HDBSCAN. [https://hdbscan.readthedocs.io/en/latest/](https://hdbscan.readthedocs.io/en/latest/) -[36] **XGBoost** — Bibliothèque de gradient boosting optimisée. -[https://xgboost.readthedocs.io/en/stable/](https://xgboost.readthedocs.io/en/stable/) +[36] **River** — Bibliothèque d'apprentissage incrémental et stream mining. +[https://riverml.xyz/](https://riverml.xyz/) --- diff --git a/services/bot-detector/bot_detector/config.py b/services/bot-detector/bot_detector/config.py index bff391c..ee9f3d1 100644 --- a/services/bot-detector/bot_detector/config.py +++ b/services/bot-detector/bot_detector/config.py @@ -72,6 +72,11 @@ HEALTH_PORT = int(os.getenv('HEALTH_PORT', '8080')) DEDUP_TTL_MIN = int(os.getenv('DEDUP_TTL_MIN', '60')) RECURRENCE_WEIGHT = _require_float('RECURRENCE_WEIGHT', 0.005) +# ─── Baseline minimum — nombre minimum de sessions humaines pour l'IF ───── +MIN_HUMAN_BASELINE = int(os.getenv('MIN_HUMAN_BASELINE', '500')) +# En mode test, les IPs privées n'ont pas d'ASN 'isp' — utiliser 'unknown' comme fallback +BASELINE_ACCEPT_UNKNOWN = os.getenv('BASELINE_ACCEPT_UNKNOWN', 'false').lower() == 'true' + # ─── Autoencoder (AE) — second scorer parallèle ──────────────────────────── AE_WEIGHT = _require_float('AE_WEIGHT', 0.30, 0, 1) @@ -79,6 +84,9 @@ AE_EPOCHS = int(os.getenv('AE_EPOCHS', '50')) AE_LATENT_DIM = int(os.getenv('AE_LATENT_DIM', '16')) AE_LEARNING_RATE = float(os.getenv('AE_LEARNING_RATE', '1e-3')) +# ─── NFEnsemble — Deep Ensemble (M=5) incertitude ────────────────────────── +NF_UNCERTAINTY_THRESHOLD = float(os.getenv('NF_UNCERTAINTY_THRESHOLD', '1.0')) + SESSION_TRANSFORMER_PATH = os.getenv( 'SESSION_TRANSFORMER_PATH', os.path.join(MODEL_DIR, 'session_transformer.pt') diff --git a/services/bot-detector/bot_detector/cycle.py b/services/bot-detector/bot_detector/cycle.py index 7ca3630..1e465cb 100644 --- a/services/bot-detector/bot_detector/cycle.py +++ b/services/bot-detector/bot_detector/cycle.py @@ -218,16 +218,28 @@ def fetch_and_analyze(): if not unknown_h2.empty: n_unknown = len(unknown_h2) # Insérer les fingerprints inconnus dans la table ClickHouse - client.command( - "INSERT INTO ja4_processing.unknown_h2_fingerprints " - "(observed_at, src_ip, ja4, h2_fingerprint, h2_settings_fp, " - "h2_window_update, h2_pseudo_order, h2_has_priority, " - "browser_confidence_score, header_user_agent, tls_version) " - "SELECT now(), src_ip, ja4, h2_fingerprint, h2_settings_fp, " - "h2_window_update, h2_pseudo_order, h2_has_priority, " - "browser_confidence, header_user_agent, tls_version " - "FROM input" - ) + cols = [ + 'observed_at', 'src_ip', 'ja4', 'h2_fingerprint', 'h2_settings_fp', + 'h2_window_update', 'h2_pseudo_order', 'h2_has_priority', + 'browser_confidence_score', 'header_user_agent', 'tls_version', + ] + rows = [] + for _, row in unknown_h2.iterrows(): + rows.append({ + 'observed_at': row.get('time', ''), + 'src_ip': row.get('src_ip', ''), + 'ja4': row.get('ja4', ''), + 'h2_fingerprint': row.get('h2_fingerprint', ''), + 'h2_settings_fp': row.get('h2_settings_fp', ''), + 'h2_window_update': int(row.get('h2_window_update', 0)), + 'h2_pseudo_order': row.get('h2_pseudo_order', ''), + 'h2_has_priority': int(row.get('h2_has_priority', 0)), + 'browser_confidence_score': float(row.get('browser_confidence', 0.0)), + 'header_user_agent': row.get('header_user_agent', ''), + 'tls_version': row.get('tls_version', ''), + }) + client.insert('ja4_processing.unknown_h2_fingerprints', rows, + column_names=cols) log_info(f'[H2 Queue] {n_unknown} fingerprint(s) H2 inconnu(s) mis en file d\'examen.') except Exception as e: log_info(f'[H2 Queue] Erreur insertion unknown_h2_fingerprints : {e}') @@ -324,8 +336,12 @@ def fetch_and_analyze(): log_info('') log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──') anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map) - all_anom = pd.concat([anom_a, anom_b], ignore_index=True) - all_scored = pd.concat([scored_a, scored_b], ignore_index=True) + _anom_dfs = [df for df in [anom_a, anom_b] + if df is not None and not df.empty] + all_anom = pd.concat(_anom_dfs, ignore_index=True) if _anom_dfs else pd.DataFrame() + _scored_dfs = [df for df in [scored_a, scored_b] + if df is not None and not df.empty] + all_scored = pd.concat(_scored_dfs, ignore_index=True) if _scored_dfs else pd.DataFrame() # ── A3 : Analyse fenêtre 24h (optionnelle) ──────────────────────────────── if ENABLE_MULTIWINDOW: @@ -336,8 +352,12 @@ def fetch_and_analyze(): log_info(f"[24h] {len(df_24h)} sessions dans la fenêtre 24h.") anom_c, scored_c = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 1].copy(), feats_complet, 'Complet_24h', cycle_id, recurrence_map) anom_d, scored_d = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 0].copy(), feats, 'Applicatif_24h', cycle_id, recurrence_map) - all_anom_24h = pd.concat([anom_c, anom_d], ignore_index=True) - all_scored_24h = pd.concat([scored_c, scored_d], ignore_index=True) + _anom_24h_dfs = [df for df in [anom_c, anom_d] + if df is not None and not df.empty] + all_anom_24h = pd.concat(_anom_24h_dfs, ignore_index=True) if _anom_24h_dfs else pd.DataFrame() + _scored_24h_dfs = [df for df in [scored_c, scored_d] + if df is not None and not df.empty] + all_scored_24h = pd.concat(_scored_24h_dfs, ignore_index=True) if _scored_24h_dfs else pd.DataFrame() # Fusion : pour les IPs présentes dans les deux fenêtres, conserver le score le plus bas if not all_anom_24h.empty: all_anom = pd.concat([all_anom, all_anom_24h], ignore_index=True) diff --git a/services/bot-detector/bot_detector/metrics.py b/services/bot-detector/bot_detector/metrics.py index eccd9fa..91e8bbe 100644 --- a/services/bot-detector/bot_detector/metrics.py +++ b/services/bot-detector/bot_detector/metrics.py @@ -106,31 +106,40 @@ def record_cycle_metrics( _emit_alerts(model_name, anomaly_rate, drift_rate, correlated_rate, latency_ms, drift_alert) try: - client.execute( - f"INSERT INTO {db}.ml_performance_metrics VALUES", - [{ - 'cycle_at': now, - 'model_name': model_name, - 'total_sessions': n_total, - 'correlated_rate': round(float(correlated_rate), 4), - 'anomaly_rate': round(float(anomaly_rate), 4), - 'critical_count': n_critical, - 'high_count': n_high, - 'medium_count': n_medium, - 'low_count': n_low, - 'known_bot_count': n_known_bot, - 'anubis_deny_count': n_anubis_deny, - 'legit_browser_count': n_legit_browser, - 'drift_rate': round(float(drift_rate), 4), - 'drift_alert': drift_alert, - 'cycle_latency_ms': latency_ms, - 'features_valid': valid_features, - 'features_total': total_features, - 'baseline_size': baseline_size, - 'threshold': round(float(threshold), 6), - 'meta_learner_active': 1 if meta_learner_active else 0, - }] + # Vérifier que la table existe avant d'insérer (optionnelle) + table_check = client.query( + f"SELECT name FROM system.tables WHERE database = '{db}' AND name = 'ml_performance_metrics'" ) + if not table_check.result_rows: + logger.debug("[Metrics] Table ml_performance_metrics absente — métriques non enregistrées") + return + + client.insert( + f"{db}.ml_performance_metrics", + [[ + now, + model_name, + n_total, + round(float(correlated_rate), 4), + round(float(anomaly_rate), 4), + n_critical, + n_high, + n_medium, + n_low, + n_known_bot, + n_anubis_deny, + n_legit_browser, + round(float(drift_rate), 4), + drift_alert, + latency_ms, + valid_features, + total_features, + baseline_size, + round(float(threshold), 6), + 1 if meta_learner_active else 0, + ]] + ) + logger.debug(f"[Metrics] Cycle {cycle_id} enregistré ({latency_ms}ms)") except Exception as e: logger.warning(f"[Metrics] Erreur d'enregistrement des métriques : {e}") diff --git a/services/bot-detector/bot_detector/models.py b/services/bot-detector/bot_detector/models.py index 718038e..46380bd 100644 --- a/services/bot-detector/bot_detector/models.py +++ b/services/bot-detector/bot_detector/models.py @@ -203,6 +203,78 @@ class TrafficNormalizingFlow: return nf +class NFEnsemble: + """Deep Ensemble de M=5 Normalizing Flows pour quantification d'incertitude. + + Chaque membre est un TrafficNormalizingFlow indépendant, entraîné sur un + échantillon bootstrap (avec remise) de la baseline humaine. L'incertitude + (variance inter-modèles) discrimine la dérive organique (variance faible, + les modèles s'accordent) de la dérive adversariale (variance élevée, les + modèles ne s'accordent pas sur la nouveauté). + + Référence : Lakshminarayanan et al., 2017 — "Simple and Scalable Predictive + Uncertainty Estimation using Deep Ensembles" (NeurIPS). + """ + + ENSEMBLE_SIZE = 5 + + def __init__(self, n_features: int): + if not TORCH_AVAILABLE: + raise RuntimeError("PyTorch non disponible — NFEnsemble désactivé.") + self.n_features = n_features + self.models = [TrafficNormalizingFlow(n_features) for _ in range(self.ENSEMBLE_SIZE)] + + def fit(self, X: np.ndarray, epochs: int = AE_EPOCHS, lr: float = AE_LEARNING_RATE, + batch_size: int = 256) -> dict: + """Entraîne les M modèles sur des échantillons bootstrapés (avec remise).""" + n = len(X) + all_losses = [] + for i, nf in enumerate(self.models): + idx = np.random.choice(n, size=n, replace=True) + X_boot = X[idx] + stats = nf.fit(X_boot, epochs=epochs, lr=lr, batch_size=batch_size) + all_losses.append(stats['final_loss']) + return { + 'final_losses': all_losses, + 'mean_loss': float(np.mean(all_losses)), + 'ensemble_size': self.ENSEMBLE_SIZE, + 'n_samples': n, + } + + def predict_anomalies(self, X: np.ndarray) -> tuple: + """Retourne (mean_score, uncertainty_score) — tuple de np.ndarray. + + mean_score : moyenne des -log p(x) sur les M modèles. + uncertainty_score : variance des -log p(x) sur les M modèles. + """ + scores = np.stack([nf.score_samples(X) for nf in self.models], axis=0) + return scores.mean(axis=0), scores.var(axis=0) + + def score_samples(self, X: np.ndarray) -> np.ndarray: + """Compatibilité : retourne mean_score seul (comme TrafficNormalizingFlow).""" + mean, _ = self.predict_anomalies(X) + return mean + + def encode(self, X: np.ndarray) -> np.ndarray: + """Espace latent moyen sur l'ensemble.""" + latents = np.stack([nf.encode(X) for nf in self.models], axis=0) + return latents.mean(axis=0) + + def state_dict(self) -> dict: + return { + 'ensemble_size': self.ENSEMBLE_SIZE, + 'n_features': self.n_features, + 'members': [nf.state_dict() for nf in self.models], + } + + @classmethod + def load_state_dict(cls, state: dict) -> 'NFEnsemble': + ens = cls(state['n_features']) + for i, member_state in enumerate(state['members']): + ens.models[i] = TrafficNormalizingFlow.load_state_dict(member_state) + return ens + + def _ae_model_path(name: str, version_id: str) -> str: return os.path.join(MODEL_DIR, f'ae_{name}_{version_id}.pt') @@ -411,7 +483,7 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, dérive, entraîne un nouveau modèle sur ``human_baseline``, le sérialise sur disque, met à jour le fichier pointeur et purge les anciennes versions. - Retourne (IsolationForest, TrafficNormalizingFlow|None, list[str] features). + Retourne (IsolationForest, NFEnsemble|None, list[str] features). """ model_path, meta = _get_current_version(name) if model_path and meta: @@ -455,8 +527,8 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, ae_path = _ae_model_path(name, meta['version_id']) if os.path.exists(ae_path): try: - ae_loaded = TrafficNormalizingFlow.load_state_dict(torch.load(ae_path, weights_only=False)) - log_info(f"[{name}] Normalizing Flow v{meta['version_id']} rechargé.") + ae_loaded = NFEnsemble.load_state_dict(torch.load(ae_path, weights_only=False)) + log_info(f"[{name}] NFEnsemble v{meta['version_id']} rechargé (M={NFEnsemble.ENSEMBLE_SIZE}).") except Exception as exc: log_info(f"[{name}] Erreur chargement AE : {exc} — AE désactivé ce cycle.") return joblib.load(model_path), ae_loaded, meta.get('features', features) @@ -519,7 +591,7 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, ae_prev_path = _ae_model_path(name, meta.get('version_id', '')) if os.path.exists(ae_prev_path): try: - ae_prev = TrafficNormalizingFlow.load_state_dict(torch.load(ae_prev_path, weights_only=False)) + ae_prev = NFEnsemble.load_state_dict(torch.load(ae_prev_path, weights_only=False)) except Exception: pass return joblib.load(model_path), ae_prev, meta.get('features', features) @@ -539,17 +611,17 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, new_meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json') joblib.dump(model, new_model_path) - # Entraînement du Normalizing Flow en parallèle (si PyTorch disponible et AE_WEIGHT > 0) + # Entraînement du NFEnsemble (M=5) en parallèle (si PyTorch disponible et AE_WEIGHT > 0) ae_model = None if TORCH_AVAILABLE and AE_WEIGHT > 0: try: - ae_model = TrafficNormalizingFlow(n_features=len(features)) + ae_model = NFEnsemble(n_features=len(features)) ae_stats = ae_model.fit(X_train.values) ae_path = _ae_model_path(name, version_id) torch.save(ae_model.state_dict(), ae_path) - log_info(f"[{name}] Normalizing Flow entraîné : NLL={ae_stats['final_loss']:.6f}, epochs={ae_stats['epochs']}") + log_info(f"[{name}] NFEnsemble entraîné (M={NFEnsemble.ENSEMBLE_SIZE}) : NLL moyen={ae_stats['mean_loss']:.6f}") except Exception as exc: - log_info(f"[{name}] Normalizing Flow training échoué : {exc} — NF désactivé.") + log_info(f"[{name}] NFEnsemble training échoué : {exc} — NF désactivé.") ae_model = None previous_version = meta.get('version_id', None) if meta else None diff --git a/services/bot-detector/bot_detector/pipeline.py b/services/bot-detector/bot_detector/pipeline.py index 94849b9..23563da 100644 --- a/services/bot-detector/bot_detector/pipeline.py +++ b/services/bot-detector/bot_detector/pipeline.py @@ -11,11 +11,12 @@ from .config import ( ANOMALY_THRESHOLD, ANOMALY_PERCENTILE, ENABLE_CLUSTERING, ENABLE_SHAP, EIF_AVAILABLE, TORCH_AVAILABLE, XGB_AVAILABLE, BROWSER_CONFIDENCE_THRESHOLD, BROWSER_COHORT_RATIO, - MIN_VALID_FEATURE_RATIO, STRUCTURAL_EXCLUDED_FEATURES, + MIN_VALID_FEATURE_RATIO, MIN_HUMAN_BASELINE, BASELINE_ACCEPT_UNKNOWN, STRUCTURAL_EXCLUDED_FEATURES, + NF_UNCERTAINTY_THRESHOLD, ) from .log import log_info, log_decision from .infra import score_to_threat_level, get_client -from .models import load_or_train_model, load_or_train_xgb, TrafficNormalizingFlow +from .models import load_or_train_model, load_or_train_xgb, TrafficNormalizingFlow, NFEnsemble from .scoring import ( validate_features, compute_adaptive_threshold, normalize_scores, compute_shap_top_features, build_reason, cluster_anomalies, @@ -51,13 +52,18 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): # Les DENY sont TOUJOURS inclus dans les threats, indépendamment du seuil IF. unknown_traffic = rest[rest['anubis_bot_action'] != 'ALLOW'].copy() human_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'isp'] + # En mode test (BASELINE_ACCEPT_UNKNOWN), les IPs sans ASN 'isp' utilisent 'unknown' comme fallback + if len(human_baseline) < MIN_HUMAN_BASELINE and BASELINE_ACCEPT_UNKNOWN: + unknown_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'unknown'] + if len(unknown_baseline) > len(human_baseline): + human_baseline = unknown_baseline log_info(f'[{name}] ── Triage ──────────────────────────────────────') log_info(f'[{name}] Total sessions : {len(df):>6}') log_info(f'[{name}] Bots connus (dict) : {len(known_bots):>6}') log_info(f'[{name}] Anubis ALLOW : {len(anubis_allow):>6}') log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}') - log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)') + log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min={MIN_HUMAN_BASELINE})') # §3 — Exclure les sessions ISP à faible cohérence de fingerprint de la baseline humaine # Ces sessions ISP avec un fingerprint incohérent sont probablement des proxies résidentiels @@ -81,8 +87,8 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): if valid_features is None: return pd.DataFrame(), pd.DataFrame() - if len(human_baseline) < 500: - log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < 500) — cycle ignoré.") + if len(human_baseline) < MIN_HUMAN_BASELINE: + log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < {MIN_HUMAN_BASELINE}) — cycle ignoré.") log_info(f"[{name}] Distribution asn_label dans le trafic à scorer :") if 'asn_label' in unknown_traffic.columns: for label, cnt in unknown_traffic['asn_label'].value_counts().head(8).items(): @@ -115,17 +121,38 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): log_info(f'[{name}] Scoring EIF : {len(X_test)} sessions scorées (min={raw_scores.min():.4f}, max={raw_scores.max():.4f}, mean={raw_scores.mean():.4f})') - # Combinaison EIF + Normalizing Flow si disponible + # Combinaison EIF + NFEnsemble (Deep Ensemble M=5) si disponible # Score final = (1-α) * eif_norm + α * nf_norm où α = AE_WEIGHT + # Incertitude = variance inter-modèles → détection adversariale + unknown_traffic['nf_uncertainty'] = 0.0 + unknown_traffic['is_adversarial_drift'] = False if ae_model is not None and AE_WEIGHT > 0: try: - nf_neg_ll = ae_model.score_samples(X_test.values) # -log p(x) + if isinstance(ae_model, NFEnsemble): + nf_neg_ll, nf_uncertainty = ae_model.predict_anomalies(X_test.values) + else: + nf_neg_ll = ae_model.score_samples(X_test.values) + nf_uncertainty = np.zeros(len(nf_neg_ll)) nf_norm = normalize_scores(-nf_neg_ll) # plus élevé = plus anomal eif_norm = normalize_scores(raw_scores) combined_norm = (1 - AE_WEIGHT) * eif_norm + AE_WEIGHT * nf_norm unknown_traffic['ae_recon_error'] = nf_neg_ll # nom conservé pour rétro-compatibilité + unknown_traffic['nf_uncertainty'] = nf_uncertainty + adversarial_mask = nf_uncertainty > NF_UNCERTAINTY_THRESHOLD + unknown_traffic['is_adversarial_drift'] = adversarial_mask + n_adversarial = int(adversarial_mask.sum()) unknown_traffic['anomaly_score'] = combined_norm - log_info(f"[{name}] Score combiné EIF+NF (α={AE_WEIGHT}): nf_mean={nf_neg_ll.mean():.6f}") + log_info( + f"[{name}] Score combiné EIF+NF (α={AE_WEIGHT}): nf_mean={nf_neg_ll.mean():.6f}, " + f"uncertainty_mean={nf_uncertainty.mean():.6f}, adversarial={n_adversarial}" + ) + if n_adversarial > 0: + log_decision('ADVERSARIAL_DRIFT_NF', cycle_id, name, { + 'n_adversarial': n_adversarial, + 'uncertainty_threshold': NF_UNCERTAINTY_THRESHOLD, + 'uncertainty_mean': round(float(nf_uncertainty.mean()), 6), + 'uncertainty_max': round(float(nf_uncertainty.max()), 6), + }) except Exception as exc: log_info(f"[{name}] NF scoring échoué : {exc} — utilisation EIF seul.") unknown_traffic['ae_recon_error'] = 0.0 @@ -435,12 +462,13 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): 'effective_threshold': round(effective_threshold, 4), 'reason': row.get('reason', '') }) - threats = pd.concat([df for df in [ + _threat_dfs = [df for df in [ anomalies if not anomalies.empty else None, known_bots if not known_bots.empty else None, anubis_allow if not anubis_allow.empty else None, anubis_deny if not anubis_deny.empty else None, - ] if df is not None], ignore_index=True) + ] if df is not None] + threats = pd.concat(_threat_dfs, ignore_index=True) if _threat_dfs else pd.DataFrame() # Propager campaign_id des anomalies clusterisées vers all_scored # (all_scored a été capturé avant clustering, ses campaign_id sont tous -1) diff --git a/services/bot-detector/bot_detector/scoring.py b/services/bot-detector/bot_detector/scoring.py index 60a9d4a..18bc07a 100644 --- a/services/bot-detector/bot_detector/scoring.py +++ b/services/bot-detector/bot_detector/scoring.py @@ -12,6 +12,7 @@ Regroupe les fonctions de scoring utilisées par le pipeline de détection : """ import numpy as np import pandas as pd +import torch from .config import ( ANOMALY_THRESHOLD, ANOMALY_PERCENTILE, @@ -107,7 +108,7 @@ class ADWINDriftMonitor: for feat, value in feature_means.items(): if feat in self._detectors: self._detectors[feat].update(value) - detected = self._detectors[feat].detected_change() + detected = self._detectors[feat].drift_detected changes[feat] = detected if detected: self._last_changes[feat] = True diff --git a/services/dashboard/backend/database.py b/services/dashboard/backend/database.py index e510869..28efbae 100644 --- a/services/dashboard/backend/database.py +++ b/services/dashboard/backend/database.py @@ -6,17 +6,43 @@ from typing import Any import clickhouse_connect from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.exceptions import DatabaseError from backend.config import CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD logger = logging.getLogger(__name__) _client: Client | None = None +_available: bool | None = None # None = not tested yet + + +class ClickHouseUnavailable(Exception): + """Raised when ClickHouse is not reachable.""" + + +def is_available() -> bool: + """Check ClickHouse connectivity (retries on every call if previously failed).""" + global _client, _available + try: + # Force re-creation if previously marked unavailable + if _available is False: + _client = None + get_client() + _available = True + return True + except Exception: + _available = False + _client = None + logger.warning("ClickHouse unavailable at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT) + return False def get_client() -> Client: - """Return a lazily-initialised ClickHouse client (singleton).""" - global _client + """Return a lazily-initialised ClickHouse client (singleton). + + Resets the singleton on connection failure so the next call retries. + """ + global _client, _available if _client is None: _client = clickhouse_connect.get_client( host=CLICKHOUSE_HOST, @@ -25,9 +51,17 @@ def get_client() -> Client: password=CLICKHOUSE_PASSWORD, ) logger.info("Connected to ClickHouse at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT) + _available = True return _client +def _mark_unavailable() -> None: + """Reset client and mark ClickHouse as unavailable.""" + global _client, _available + _client = None + _available = False + + def _normalise_value(v: Any) -> Any: """Convert ClickHouse-specific types to JSON-friendly Python types.""" if isinstance(v, (ipaddress.IPv4Address, ipaddress.IPv6Address)): @@ -41,26 +75,47 @@ def _normalise_value(v: Any) -> Any: def query(sql: str, params: dict | None = None) -> list[dict[str, Any]]: - """Execute *sql* and return a list of row-dicts.""" - client = get_client() - result = client.query(sql, parameters=params or {}) - columns = result.column_names - rows: list[dict[str, Any]] = [] - for row in result.result_rows: - rows.append({col: _normalise_value(val) for col, val in zip(columns, row)}) - return rows + """Execute *sql* and return a list of row-dicts. + + Raises ClickHouseUnavailable if the server is not reachable. + """ + try: + client = get_client() + result = client.query(sql, parameters=params or {}) + columns = result.column_names + rows: list[dict[str, Any]] = [] + for row in result.result_rows: + rows.append({col: _normalise_value(val) for col, val in zip(columns, row)}) + return rows + except (DatabaseError, ConnectionError, OSError) as exc: + _mark_unavailable() + raise ClickHouseUnavailable(str(exc)) from exc def query_scalar(sql: str, params: dict | None = None) -> Any: - """Execute *sql* and return the single scalar value.""" - client = get_client() - result = client.query(sql, parameters=params or {}) - if result.result_rows: - return _normalise_value(result.result_rows[0][0]) - return None + """Execute *sql* and return the single scalar value. + + Raises ClickHouseUnavailable if the server is not reachable. + """ + try: + client = get_client() + result = client.query(sql, parameters=params or {}) + if result.result_rows: + return _normalise_value(result.result_rows[0][0]) + return None + except (DatabaseError, ConnectionError, OSError) as exc: + _mark_unavailable() + raise ClickHouseUnavailable(str(exc)) from exc def execute(sql: str, params: dict | None = None) -> None: - """Execute a DDL / DML statement that returns no rows.""" - client = get_client() - client.command(sql, parameters=params or {}) + """Execute a DDL / DML statement that returns no rows. + + Raises ClickHouseUnavailable if the server is not reachable. + """ + try: + client = get_client() + client.command(sql, parameters=params or {}) + except (DatabaseError, ConnectionError, OSError) as exc: + _mark_unavailable() + raise ClickHouseUnavailable(str(exc)) from exc diff --git a/services/dashboard/backend/main.py b/services/dashboard/backend/main.py index 502763b..ccb62fa 100644 --- a/services/dashboard/backend/main.py +++ b/services/dashboard/backend/main.py @@ -4,15 +4,28 @@ from __future__ import annotations import logging -from fastapi import FastAPI +from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from backend.database import ClickHouseUnavailable, is_available from backend.routes.api import router as api_router from backend.routes.pages import router as pages_router logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") +_templates = Jinja2Templates(directory="backend/templates") + +_PAGE_MAP = { + "/": "overview", "/detections": "detections", "/scores": "scores", + "/traffic": "traffic", "/classify": "classify", "/features": "features", + "/models": "models", "/network": "network", "/campaigns": "campaigns", + "/tactics": "tactics", "/reflists": "reflists", "/fleet": "fleet", + "/health": "health", "/browsers": "browsers", "/fingerprints": "fingerprints", +} + app = FastAPI(title="JA4 SOC Dashboard", version="1.0.0") # CORS — allow all origins for dashboard access @@ -24,6 +37,29 @@ app.add_middleware( allow_headers=["*"], ) + +@app.exception_handler(ClickHouseUnavailable) +async def ch_unavailable_handler(request: Request, exc: ClickHouseUnavailable): + """Return 503 for API calls, render degraded pages for HTML requests.""" + accept = request.headers.get("accept", "") + path = request.url.path + + # If the client expects JSON (API call), return 503 JSON + if "application/json" in accept or path.startswith("/api/"): + return JSONResponse( + status_code=503, + content={"detail": "ClickHouse unavailable", "error": str(exc)}, + ) + + # For HTML pages, render the template with ch_available=False + page_name = _PAGE_MAP.get(path, "overview") + return _templates.TemplateResponse( + f"{page_name}.html", + {"request": request, "active_page": page_name, "ch_available": False}, + status_code=503, + ) + + # Static assets app.mount("/static", StaticFiles(directory="backend/static"), name="static") @@ -32,6 +68,7 @@ app.include_router(api_router) app.include_router(pages_router) -@app.get("/health") -async def health(): - return {"status": "ok"} +@app.get("/api/healthcheck") +async def healthcheck(): + ch = is_available() + return {"status": "ok" if ch else "degraded", "clickhouse": "up" if ch else "down"} diff --git a/services/dashboard/backend/routes/api.py b/services/dashboard/backend/routes/api.py index ed5e039..a02d1d8 100644 --- a/services/dashboard/backend/routes/api.py +++ b/services/dashboard/backend/routes/api.py @@ -13,6 +13,8 @@ from typing import Any from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel +from backend.database import ClickHouseUnavailable + from backend.config import DB_PROCESSING, DB_LOGS, safe_identifier from backend.database import query, query_scalar, execute @@ -29,6 +31,17 @@ _SHAP_RE = re.compile(r"(?:SHAP|ExIFFI):\s*(.+?)(?:\s*\|\s*Threat|$)") _FEAT_RE = re.compile(r"(\w+)\(([+-]?\d+\.\d+)\)") +def _ch_fallback(exc: Exception) -> None: + """Raise ClickHouseUnavailable for connection errors, re-raise otherwise.""" + if isinstance(exc, ClickHouseUnavailable): + raise + # Detect connection-level errors from clickhouse_connect + err_msg = str(exc).lower() + if "connection" in err_msg or "refused" in err_msg or "unavailable" in err_msg: + raise ClickHouseUnavailable(str(exc)) from exc + raise HTTPException(status_code=500, detail=str(exc)) from exc + + def _aggregate_shap_importance(reasons: list[str]) -> list[dict]: """Agrège les valeurs SHAP/ExIFFI extraites des champs reason.""" totals: dict[str, float] = defaultdict(float) @@ -171,7 +184,7 @@ async def overview() -> dict[str, Any]: } except Exception as exc: logger.exception("overview query failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -253,7 +266,7 @@ async def detections( } except Exception as exc: logger.exception("detections query failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -330,7 +343,7 @@ async def scores( } except Exception as exc: logger.exception("scores query failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -401,7 +414,7 @@ async def traffic( } except Exception as exc: logger.exception("traffic query failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -470,7 +483,7 @@ async def ip_detail(ip: str) -> dict[str, Any]: } except Exception as exc: logger.exception("ip detail query failed for %s", ip) - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -932,7 +945,7 @@ async def classify(body: ClassifyRequest) -> dict[str, Any]: return {"status": "ok", "src_ip": body.src_ip, "classification": body.classification} except Exception as exc: logger.exception("classify insert failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -1403,7 +1416,7 @@ async def ja4_detail(fingerprint: str) -> dict[str, Any]: } except Exception as exc: logger.exception("ja4 detail query failed for %s", fingerprint) - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -1526,7 +1539,7 @@ async def cluster_detail(cid: int) -> dict[str, Any]: } except Exception as exc: logger.exception("cluster detail query failed for %s", cid) - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # ═══════════════════════════════════════════════════════════════════════════════ @@ -1545,9 +1558,9 @@ async def dictionaries_meta(): "ORDER BY name", ) return {"dictionaries": rows} - except Exception as exc: - logger.exception("dictionaries meta query failed") - raise HTTPException(status_code=500, detail=str(exc)) + except Exception: + logger.debug("dictionaries meta query failed — ClickHouse may be unavailable") + return {"dictionaries": []} _REFLIST_SORT = { @@ -1640,7 +1653,7 @@ async def reflist( return {"name": name, "total": total, "limit": limit, "offset": offset, "rows": rows} except Exception as exc: logger.exception("reflist query failed for %s", name) - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) @router.get("/reflist/{name}/stats") @@ -1695,34 +1708,48 @@ async def reflist_stats(name: str): return {"name": name, "total": total, "breakdown": agg} except Exception as exc: logger.exception("reflist stats query failed for %s", name) - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) @router.get("/fleet") async def fleet() -> dict[str, Any]: """Détections de flottes JA4×ASN (§5.2).""" - rows = query( - f"SELECT detected_at, community_id, fleet_score, n_ips, ja4_set, asn_set, ip_sample " - f"FROM {_DB}.fleet_detections " - f"WHERE detected_at >= now() - INTERVAL 7 DAY " - f"ORDER BY fleet_score DESC " - f"LIMIT 100" - ) + try: + rows = query( + f"SELECT detected_at, community_id, fleet_score, n_ips, ja4_set, asn_set, ip_sample " + f"FROM {_DB}.fleet_detections " + f"WHERE detected_at >= now() - INTERVAL 7 DAY " + f"ORDER BY fleet_score DESC " + f"LIMIT 100" + ) + except ClickHouseUnavailable: + raise + except Exception as exc: + _ch_fallback(exc) + rows = [] return {"fleets": rows} @router.get("/health") async def health_metrics() -> dict[str, Any]: """Métriques de santé du pipeline ML (Étape 9).""" - rows = query( - f"SELECT cycle_at, model_name, total_sessions, correlated_rate, anomaly_rate, " - f" critical_count, high_count, drift_rate, drift_alert, cycle_latency_ms, " - f" features_valid, features_total, baseline_size, meta_learner_active " - f"FROM {_DB}.ml_performance_metrics " - f"WHERE cycle_at >= now() - INTERVAL 7 DAY " - f"ORDER BY cycle_at DESC " - f"LIMIT 500" - ) + try: + rows = query( + f"SELECT cycle_at, model_name, total_sessions, correlated_rate, anomaly_rate, " + f" critical_count, high_count, medium_count, low_count, " + f" known_bot_count, anubis_deny_count, legit_browser_count, " + f" drift_rate, drift_alert, cycle_latency_ms, " + f" features_valid, features_total, baseline_size, threshold, meta_learner_active " + f"FROM {_DB}.ml_performance_metrics " + f"WHERE cycle_at >= now() - INTERVAL 7 DAY " + f"ORDER BY cycle_at DESC " + f"LIMIT 500" + ) + except ClickHouseUnavailable: + raise + except Exception as exc: + _ch_fallback(exc) + rows = [] # Statistiques de synthèse if rows: latest = {r['model_name']: r for r in rows} @@ -1895,9 +1922,9 @@ async def browser_sig_entries() -> dict[str, Any]: f"ORDER BY browser_family" ) return {"entries": rows, "total": len(rows), "source": "dict_csv", "readonly": True} - except Exception as exc: - logger.exception("browser_h2 entries fallback failed") - raise HTTPException(status_code=500, detail=str(exc)) + except Exception: + logger.debug("browser_h2 entries fallback failed — ClickHouse may be unavailable") + return {"entries": [], "total": 0, "source": "unavailable"} @router.post("/browser-signatures/entries", status_code=201) @@ -1932,7 +1959,7 @@ async def browser_sig_add(body: BrowserH2Entry) -> dict[str, Any]: return {"status": "ok", "h2_fingerprint": body.h2_fingerprint.strip()} except Exception as exc: logger.exception("browser_h2_signatures insert failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) @router.delete("/browser-signatures/entries") @@ -1953,7 +1980,7 @@ async def browser_sig_delete(fingerprint: str = Query(...)) -> dict[str, Any]: return {"status": "ok", "deleted": fingerprint.strip()} except Exception as exc: logger.exception("browser_h2_signatures delete failed") - raise HTTPException(status_code=500, detail=str(exc)) + _ch_fallback(exc) # --------------------------------------------------------------------------- @@ -2042,8 +2069,8 @@ async def fingerprint_discovery( {"days": days, "min_hits": min_hits, "lim": limit}, ) except Exception as exc: - logger.exception("fingerprint-discovery query failed") - raise HTTPException(status_code=500, detail=str(exc)) + logger.debug("fingerprint-discovery query failed — ClickHouse may be unavailable") + return {"profiles": [], "groups": [], "meta": {"total_ja4": 0, "total_groups": 0, "days": days, "min_hits": min_hits}} # ── Regroupement par famille navigateur côté Python ── groups: dict[str, dict[str, Any]] = {} diff --git a/services/dashboard/backend/routes/pages.py b/services/dashboard/backend/routes/pages.py index 632f370..06a17ec 100644 --- a/services/dashboard/backend/routes/pages.py +++ b/services/dashboard/backend/routes/pages.py @@ -5,12 +5,14 @@ from __future__ import annotations from fastapi import APIRouter, Request from fastapi.templating import Jinja2Templates +from backend.database import is_available + router = APIRouter() templates = Jinja2Templates(directory="backend/templates") def _ctx(request: Request, page: str, **extra) -> dict: - return {"request": request, "active_page": page, **extra} + return {"request": request, "active_page": page, "ch_available": is_available(), **extra} @router.get("/") diff --git a/services/dashboard/backend/templates/base.html b/services/dashboard/backend/templates/base.html index 6261ef0..4a1c269 100644 --- a/services/dashboard/backend/templates/base.html +++ b/services/dashboard/backend/templates/base.html @@ -219,6 +219,12 @@ {% block header_actions %}{% endblock %} + {% if not ch_available %} +
+ {% endif %}