diff --git a/docs/thesis/00_resume.md b/docs/thesis/00_resume.md index f77b556..d9b75ad 100644 --- a/docs/thesis/00_resume.md +++ b/docs/thesis/00_resume.md @@ -11,7 +11,7 @@ ## Résumé -Ce document présente une architecture opérationnelle de détection et classification du trafic HTTP malveillant, s'inscrivant dans la continuité des approches de génération 3 (fingerprinting multi-protocole et ML comportemental). Le système exploite 96 features organisées en 8 familles couvrant les couches réseau L3 à L7, corrélant des signaux TCP, TLS et HTTP en un vecteur unifié par session. La détection repose sur un ensemble triple-voix combinant un Extended Isolation Forest (EIF), un autoencodeur (AE) et XGBoost, fusionnés par une régression logistique calibrée activée à partir de 1 000 étiquettes accumulées. L'explicabilité est assurée par l'importance des features par profondeur d'isolation (EIF) et SHAP TreeExplainer (XGBoost). Le clustering de campagnes est réalisé par HDBSCAN dans l'espace latent 16 dimensions de l'autoencodeur, et la détection de flottes coordonnées par graphes bipartis via NetworkX. Le fingerprinting HTTP/2 passif — extraction des trames SETTINGS, WINDOW_UPDATE et de l'ordre des pseudo-headers côté serveur — exploite un signal déjà utilisé par des solutions industrielles (Akamai, Cloudflare, F5), ici implémenté via eBPF. L'infrastructure repose sur 16 modules Python (4 800 lignes), une base ClickHouse à double schéma (ja4_logs bruts TTL 2 h, ja4_processing agrégés TTL 7 j), des cycles d'analyse de 300 secondes, et traite en production plus de 3 millions de logs, environ 34 000 sessions par cycle, avec approximativement 777 anomalies détectées par cycle (≈ 2,3 % — chiffre opérationnel brut, non validé comme taux de détection). Le système intègre un moteur de profiling dynamique automatique des navigateurs (HDBSCAN sur les vecteurs H2 observés, centroïdes auto-appris, scoring temps réel par distance normalisée) qui s'adapte aux évolutions des piles HTTP/2 sans intervention manuelle. +Ce document présente une architecture opérationnelle de détection et classification du trafic HTTP malveillant, s'inscrivant dans la continuité des approches de génération 3 (fingerprinting multi-protocole et ML comportemental). Le système exploite 96 features organisées en 8 familles couvrant les couches réseau L3 à L7, corrélant des signaux TCP, TLS et HTTP en un vecteur unifié par session. La détection repose sur un ensemble triple-voix combinant un Extended Isolation Forest (EIF), un Normalizing Flow (NF) et XGBoost, fusionnés par un méta-modèle MLP (Multi-Layer Perceptron) non-linéaire calibré sur les étiquettes accumulées. L'explicabilité est assurée par l'importance des features par profondeur d'isolation (EIF) et SHAP TreeExplainer (XGBoost). Le clustering de campagnes est réalisé par HDBSCAN dans l'espace latent 16 dimensions de l'autoencodeur, et la détection de flottes coordonnées par graphes bipartis via NetworkX. Le fingerprinting HTTP/2 passif — extraction des trames SETTINGS, WINDOW_UPDATE et de l'ordre des pseudo-headers côté serveur — exploite un signal déjà utilisé par des solutions industrielles (Akamai, Cloudflare, F5), ici implémenté via eBPF. L'infrastructure repose sur 16 modules Python (4 800 lignes), une base ClickHouse à double schéma (ja4_logs bruts TTL 2 h, ja4_processing agrégés TTL 7 j), des cycles d'analyse de 300 secondes, et traite en production plus de 3 millions de logs, environ 34 000 sessions par cycle, avec approximativement 777 anomalies détectées par cycle (≈ 2,3 % — chiffre opérationnel brut, non validé comme taux de détection). Le système intègre un moteur de profiling dynamique automatique des navigateurs (HDBSCAN sur les vecteurs H2 observés, centroïdes auto-appris, scoring temps réel par distance normalisée) qui s'adapte aux évolutions des piles HTTP/2 sans intervention manuelle. **Mots-clés** : fingerprinting réseau, JA4+, HTTP/2 fingerprinting, détection de bots, Extended Isolation Forest, autoencodeurs, ensemble hybride, corrélation TCP/TLS/HTTP, WAF, classification de trafic, apprentissage semi-supervisé, clustering HDBSCAN diff --git a/docs/thesis/01_introduction.md b/docs/thesis/01_introduction.md index 97bea92..001f013 100644 --- a/docs/thesis/01_introduction.md +++ b/docs/thesis/01_introduction.md @@ -43,7 +43,7 @@ Ce document décrit une architecture opérationnelle s'inscrivant dans la contin 1. **Corrélation TCP/TLS/HTTP** en temps réel via ja4ebpf (clé : `src_ip:src_port`, 256 shards, timeout orphelin 500 ms) 2. **Fingerprinting HTTP/2 passif** : extraction des trames SETTINGS, WINDOW_UPDATE, PRIORITY et de l'ordre des pseudo-headers directement depuis le stream TCP — approche déjà exploitée par des solutions industrielles (Akamai, Cloudflare, F5), ici implémentée via eBPF 3. **Architecture EIF bifurquée** : modèle complet (≈ 45 features L3→L7) et modèle applicatif (≈ 35 features L7 uniquement), évitant le biais de zérotage sur le trafic non corrélé — choix pragmatique de gestion des données manquantes -4. **Ensemble triple-voix avec fusion par régression logistique** : combinaison EIF + AE + XGBoost avec régression logistique apprise sur étiquettes accumulées +4. **Ensemble triple-voix avec fusion par MLP non-linéaire** : combinaison EIF + NF + XGBoost avec méta-modèle MLP apprenant les interactions non-linéaires entre les trois voix 5. **HDBSCAN dans l'espace latent AE** : clustering de campagnes par similarité de comportement compressé en 16 dimensions 6. **Détection de dérive adversariale** : distinction entre dérive organique (mises à jour navigateur) et manipulation directionnelle coordonnée 7. **8 features comportementales avancées** : application de statistiques standard (déviation de Benford, entropie de transition markovienne, autocorrélation lag-1, délai root-to-first-asset, diversité de hosts, uniformité de couverture cross-host) au domaine de la détection de bots diff --git a/docs/thesis/02_etat_de_lart.md b/docs/thesis/02_etat_de_lart.md index c9e4fe8..2a0097a 100644 --- a/docs/thesis/02_etat_de_lart.md +++ b/docs/thesis/02_etat_de_lart.md @@ -413,20 +413,20 @@ Le système de détection combine trois « voix » complémentaires : ┌──────────────┼──────────────┐ │ │ │ ┌─────▼─────┐ ┌─────▼─────┐ ┌────▼──────┐ - │ EIF │ │ AE │ │ XGBoost │ - │ (semi- │ │ (semi- │ │(supervisé)│ - │supervisé) │ │supervisé) │ │ │ + │ EIF │ │ NF │ │ XGBoost │ + │ (semi- │ │ (Normal- │ │(supervisé)│ + │supervisé) │ │ izing │ │ │ + │ │ │ Flow) │ │ │ └─────┬─────┘ └─────┬─────┘ └────┬──────┘ │ │ │ - eif_norm ae_norm xgb_prob + eif_norm nf_norm xgb_prob │ │ │ └──────────────┼──────────────┘ │ ┌────────▼────────┐ - │ Fusion LR │ - │ (régression │ - │ logistique, │ - │ ≥1000 labels) │ + │ Meta-Model │ + │ Stacking MLP │ + │ (non-linéaire) │ └────────┬────────┘ │ ┌────────▼────────┐ @@ -435,33 +435,47 @@ Le système de détection combine trois « voix » complémentaires : └─────────────────┘ ``` -**Formule de combinaison** : +**Limites de la fusion linéaire** -```python -final = (1 - XGB_WEIGHT) × ((1 - AE_WEIGHT) × eif_norm + AE_WEIGHT × ae_norm) \ - + XGB_WEIGHT × xgb_prob -``` - -Valeurs par défaut : `AE_WEIGHT=0.30`, `XGB_WEIGHT=0.20`. Configurable via variables d'environnement pour ajustement en production sans modification du code. - -**Fusion LR** - -Le Fusion LR est une régression logistique activée lorsqu'au moins 1 000 étiquettes ont été accumulées (sessions DENY Anubis + annotations manuelles + seuillage de confiance). - -Régression logistique : modèle linéaire probabiliste qui apprend des poids w pour chaque signal intermédiaire en minimisant l'entropie croisée binaire sur les étiquettes accumulées : +Une fusion linéaire — combinaison convexe pondérée ou régression logistique — ne peut capturer que des frontières de décision linéaires dans l'espace des scores intermédiaires. Or les signaux EIF, NF et XGBoost peuvent exhiber des interactions non-linéaires impossibles à modéliser par une combinaison linéaire : ``` -P(bot) = σ(w1×eif + w2×ae + w3×xgb + w4×volume + bias) +Problème XOR des scores : + NF élevé + ┌────────┐ + EIF bas │ BOT ✓ │ ← combinaison inhabituelle = bot confirmé + ├────────┤ + EIF haut │normal ✗│ ← faux positif fréquent de l'EIF seul + └────────┘ + NF bas ``` -où σ est la fonction sigmoïde : σ(z) = 1 / (1 + e^{-z}) +Exemple concret : un bot utilisant un outilHeadless avec un JA4 fingerprint légitime (NF bas) mais un comportement de navigation atypique (EIF élevé). Le XGBoost peut compenser, mais la fusion linéaire ne peut apprendre la relation *« EIF élevé ET XGB élevé MAIS NF bas = bot »* — elle ne fait que sommer les contributions indépendantes. -Les poids w1–w4 sont appris, permettant au système de calibrer automatiquement l'importance relative de chaque voix en fonction du type de trafic en production. En dessous de 1 000 étiquettes, le système revient aux poids fixes : `(eif: 0.50, ae: 0.30, xgb: 0.20)`. +**Stacking OOF (Out-of-Fold) et MLP méta-modèle** + +Pour résoudre cette limitation, le système utilise un méta-modèle non-linéaire de type MLP (*Multi-Layer Perceptron*) entraîné via stacking Out-of-Fold : + +1. **Prédictions OOF** : les modèles de base (EIF, NF, XGBoost) produisent des prédictions sur des plis de validation croisée temporelle, garantissant que le méta-modèle n'a jamais vu les données d'entraînement des modèles de base — évitant le surapprentissage (*information leakage*). + +2. **Méta-modèle MLP** : un réseau de neurones à 2 couches apprend la fonction de fusion optimale : + +``` +MetaFusionMLP : [eif, nf, xgb] → Linear(3,16) → BatchNorm → ReLU → Dropout(0.2) + → Linear(16,1) → Sigmoid → P(bot) +``` + +- **BatchNorm** : normalise les activations intermédiaires, stabilise l'apprentissage et régularise implicitement — crucial avec peu de données labellisées. +- **Dropout(0.2)** : désactive aléatoirement 20% des neurones pendant l'entraînement, prévenant la co-adaptation des poids. +- **Early stopping** (patience = 5 epochs) : arrête l'entraînement dès que la loss de validation ne s'améliore plus, évitant le surapprentissage. +- **Weight decay** ($\lambda = 10^{-4}$) : pénalité L2 sur les poids du MLP pour une régularisation supplémentaire. + +Le MLP apprend des frontières de décision non-linéaires dans l'espace 3D `[eif_norm, nf_norm, xgb_prob]`, capable de résoudre les patterns XOR et les interactions conditionnelles entre les trois voix. Le système de détection peut ainsi combiner automatiquement les signaux de manière optimale en fonction du type de trafic observé en production. **Calendrier de retraining** : -- XGBoost : hebdomadaire sur les étiquettes accumulées, après filtrage Cleanlab des labels SOC bruyants (voir ci-dessous) -- EIF : toutes les 24 heures -- AE : continu avec arrêt précoce sur la loss de validation +- HAT (supervisé) : apprentissage incrémental à chaque cycle (300s) sur les étiquettes accumulées, après filtrage Cleanlab des labels SOC bruyants (voir ci-dessous) +- EIF : toutes les 24 heures ou sur détection ADWIN +- NF : continu avec arrêt précoce sur la loss de validation **Filtrage des labels SOC bruyants (Cleanlab)** : @@ -481,7 +495,7 @@ X_clean, y_clean = X[~noisy_mask], y[~noisy_mask] Ce mécanisme protège le modèle contre l'empoisonnement par des faux positifs mal corrigés ou des biais de confirmation des analystes. Le taux de labels filtrés est loggé pour surveillance. En cas d'échec de Cleanlab (erreur mémoire, dépendance manquante), le pipeline revient aux données brutes sans interruption. -#### 2.4.3 Concept Drift et retraining adaptatif +#### 2.4.3 Concept Drift : ADWIN et Online Learning **Définition du concept drift** @@ -494,28 +508,74 @@ En apprentissage automatique, le concept drift désigne le changement des propri En détection de bots, le drift adversarial est particulièrement critique : les attaquants adaptent délibérément leurs outils pour contourner les modèles déployés. -**Test de Kolmogorov-Smirnov** +**Limites de l'approche KS + quantile digest** -Le test KS (Kolmogorov-Smirnov) mesure la différence maximale entre deux fonctions de distribution empiriques cumulées (ECDF) : +L'approche précédente détectait la dérive en sauvegardant 5 quantiles (p10, p25, p50, p75, p90) par feature à l'entraînement, puis en reconstruisant une distribution synthétique par interpolation de la CDF inverse et en appliquant un test de Kolmogorov-Smirnov entre cette distribution et la distribution courante. Cette méthode souffre de trois lacunes fondamentales : + +1. **5 quantiles ne captent pas les distributions bimodales** : les features de trafic web comme `asset_ratio`, `post_ratio` et `orphan_ratio` ont souvent des distributions bimodales (deux populations de trafic distinctes). Cinq points ne suffisent pas à reconstruire fidèlement ces distributions — une dérive dans un mode peut être masquée par la stabilité de l'autre mode. + +2. **Reconstruction par interpolation linéaire** : interpoler entre 5 quantiles suppose une distribution unimodale et lisse. Pour les distributions skewed à queue lourde (timing inter-requêtes, taille des payloads), l'interpolation sous-estime systématiquement les valeurs extrêmes, rendant le test KS peu fiable. + +3. **Seuil de 30% arbitraire** : le seuil `DRIFT_THRESHOLD = 0.30` (30% de features en dérive) est un hyperparamètre non fondé statistiquement. Il est trop sensible pour les périodes de forte activité (faux positifs) et trop conservateur pour les attaques furtives ne touchant que quelques features. + +**ADWIN : fenêtre glissante adaptative** + +[ADWIN (ADaptive WINdowing, Bifet & Gavalda, 2007)](https://dl.acm.org/doi/10.1145/1242572.1242660) résout ces problèmes en maintenant une fenêtre de longueur variable sur le flux de données. Le principe : + +1. **Fenêtre adaptative** : ADWIN maintient une fenêtre $W$ de valeurs récentes. La taille de $W$ s'ajuste automatiquement — elle grandit quand la distribution est stable et rétrécit quand un changement est détecté. + +2. **Test de Hoeffding** : pour chaque coupe possible $W = W_0 \cup W_1$, ADWIN compare les moyennes $\hat{\mu}_0$ et $\hat{\mu}_1$ des deux sous-fenêtres. Si la différence dépasse la borne de Hoeffding : + +$$|\hat{\mu}_0 - \hat{\mu}_1| \geq \sqrt{\frac{1}{2m} \ln\frac{4}{\delta}}$$ + + où $m = \frac{1}{1/|W_0| + 1/|W_1|}$ et $\delta$ est le paramètre de confiance, alors un changement est détecté et la sous-fenêtre la plus ancienne est supprimée. + +3. **Pas de seuil arbitraire** : la détection repose uniquement sur la borne de Hoeffding (garantie probabiliste), paramétrée par $\delta$ (défaut : 0.002). Aucun seuil de « 30% de features en dérive » n'est nécessaire au niveau de chaque feature — seul le nombre de features driftant simultanément déclenche le retraining global. + +**Architecture de monitoring ADWIN** : ``` -D = max|F1(x) - F2(x)| +Cycle n (300s) + │ + ├── Calculer μ_f pour chaque feature f sur le trafic baseline + │ + ├── ADWIN_f.update(μ_f) pour chaque feature + │ └── Fenêtre interne W_f ajustée automatiquement + │ + ├── ADWIN_f.detected_change() ? + │ └── Si oui → feature f marquée « en dérive » ce cycle + │ + └── Si > 30% features en dérive → flag retraining EIF/NF + Si > 50% features en dérive → alerte ADVERSARIAL_DRIFT ``` -où F1 et F2 sont les ECDFs de la distribution courante et de la distribution de référence (baseline). Si D dépasse la valeur critique (déterminée par les tables de la distribution KS pour un niveau de confiance α), les deux échantillons sont considérés comme provenant de distributions différentes. Avantage : test non-paramétrique, aucune hypothèse sur la forme de la distribution. +**Online Learning : Hoeffding Adaptive Tree** -**Méthode de détection de dérive** : +Le retraining hebdomadaire par batch (`XGBClassifier.fit()` sur l'ensemble des labels accumulés) est remplacé par un apprentissage incrémental via [River](https://riverml.xyz/), une bibliothèque spécialisée en stream mining. Le modèle utilisé est le `HoeffdingAdaptiveTreeClassifier` (HAT) : -1. Sauvegarde avec chaque modèle sérialisé de l'approximation 5-quantiles par feature : (p10, p25, p50, p75, p90) -2. Génération d'échantillons synthétiques par interpolation de la CDF inverse (quantile function) -3. Test KS + divergence KL sur chaque feature entre la distribution courante et la baseline -4. Feature marquée comme « en dérive » si le test KS OU la divergence KL dépasse le seuil configuré -5. Retraining forcé si > 30 % des features dérivent simultanément -6. **Détection de dérive adversariale** : si de nombreuses features dérivent simultanément dans la même direction (score de corrélation directionnelle élevé), génération d'une alerte spécifique distinguant la manipulation intentionnelle (dérive adversariale coordonnée) de l'évolution organique (mises à jour navigateur ou changements de comportement naturels) +- **Arbre de décision incrémental** : construit l'arbre de décision progressivement, un exemple à la fois via `learn_one(x, y)`. À chaque split, le test de Hoeffding garantit que le split choisi est (probablement) le même que celui qu'un arbre batch aurait choisi avec les mêmes données. +- **Adaptatif** : utilise des estimateurs de fenêtre ADWIN à chaque nœud pour remplacer les statistiques obsolètes — le modèle s'adapte automatiquement au concept drift sans retraining explicite. +- **Apprentissage par cycle** : à chaque cycle de 300s, les nouveaux labels accumulés sont injectés un par un via `learn_one()`. Le modèle s'améliore continuellement, rendant le lourd retraining hebdomadaire obsolète. -**Validation gate** : si le taux d'anomalie sur le jeu de validation dépasse 20 % après retraining, le nouveau modèle est rejeté et le modèle précédent est conservé, avec génération d'une alerte (baseline contaminée). +``` +Cycle 300s + │ + ├── Charger HAT sérialisé (pickle) + │ + ├── Pour chaque label accumulé ce cycle : + │ model.learn_one({feature: value, ...}, label) + │ + ├── Persister HAT mis à jour + │ + └── Prédiction : model.predict_proba_many(df) → P(bot) +``` -**Limites de l'approximation 5-quantiles** : adéquate pour les distributions unimodales, mais peut manquer les dérives bimodales dans `asset_ratio`, `post_ratio`, `orphan_ratio`. Extension à p5/p95 ou à t-digest identifiée comme travail futur. +Le passage au stream mining élimine trois problématiques majeures du batch training : +- **Latence de mise à jour** : de hebdomadaire à chaque cycle (300s) +- **Coût mémoire** : plus besoin de charger 50 000 labels en RAM +- **Stale model** : le modèle est toujours à jour par rapport au concept courant + +**Validation gate** : conservée — si le taux d'anomalie sur le jeu de validation dépasse 20% après retraining EIF/NF, le nouveau modèle est rejeté et le modèle précédent conservé. #### 2.4.4 Modélisation des phases d'attaque diff --git a/docs/thesis/03_architecture.md b/docs/thesis/03_architecture.md index 79ccbfa..e5ba43a 100644 --- a/docs/thesis/03_architecture.md +++ b/docs/thesis/03_architecture.md @@ -70,7 +70,7 @@ │ │ 4. EIF bifurqué (complet/appli) │ │ │ │ 5. NF log-likelihood scoring │ │ │ │ 6. XGBoost probabilité │ │ - │ │ 7. Fusion LR fusion │ │ + │ │ 7. Meta-Model MLP fusion │ │ │ │ 8. HDBSCAN clustering (NF latent) │ │ │ │ 9. Écriture résultats ClickHouse │ │ │ └──────────────────────────────────┘ │ @@ -240,7 +240,7 @@ Session entrante ├── asn_label == 'human' ? │ ── OUI → baseline EIF training (sans étiquette bot) │ - └── Sinon → Triple-voix : EIF + NF + XGBoost + Fusion LR + └── Sinon → Triple-voix : EIF + NF + XGBoost + Meta-Model Stacking (MLP non-linéaire) ``` #### Seuil adaptatif diff --git a/docs/thesis/07_discussion_limites.md b/docs/thesis/07_discussion_limites.md index 15e8460..4b65921 100644 --- a/docs/thesis/07_discussion_limites.md +++ b/docs/thesis/07_discussion_limites.md @@ -115,7 +115,7 @@ Le fingerprinting réseau opère sans déchiffrement TLS (les métadonnées TLS | HDBSCAN (profiling) | ~2–5 s | Quotidien, ~200k sessions H2 dédupliquées, min_cluster=1000 | | Dynamic matcher scoring | < 1 ms | Par session, lookup en mémoire contre ~5–10 profils | | GraphSAGE (fleet.py) | ~80 ms | Graphe d'IPs, 2 couches SAGEConv, GPU/CPU | -| Fusion LR LR | < 10 ms | Régression logistique, négligeable | +| Fusion MLP | < 10 ms | MLP 2 couches, négligeable | | **Cycle complet** | **~300 secondes** | EIF + AE + XGBoost + HDBSCAN + GraphSAGE | La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fenêtre d'agrégation ClickHouse** (1 heure glissante avec recalcul toutes les 5 minutes), non par les temps d'exécution ML. @@ -133,9 +133,9 @@ La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fe - À 34 000 sessions/cycle : ~100 ms — acceptable - À 500 000 sessions/cycle (scaling ×15) : ~2 s — encore tolérable -**Fusion LR logistic regression** : O(n × d) entraînement, d = 3 features d'entrée (scores EIF, AE, XGBoost). Temps négligeable quelle que soit la taille. +**Fusion MLP** : O(n × d) inférence avec d = 3 features d'entrée (scores EIF, NF, XGBoost), MLP 2 couches (16 neurones). Temps négligeable quelle que soit la taille. -**Limite architecturale principale** : le modèle XGBoost hebdomadaire nécessite un jeu de labels accumulés via le feedback SOC. À faible volume de labels (< 500 sessions étiquetées par semaine), XGBoost ne converge pas correctement. Ce problème est partiellement atténué par le filtrage Cleanlab qui élimine les labels douteux (détail §3.8), mais reste identifié comme axe d'amélioration futur (§6.6 — online learning). +**Limite architecturale principale** : le modèle supervisé (Hoeffding Adaptive Tree) s'améliore incrémentalement à chaque cycle via `learn_one()`, mais nécessite un flux continu de labels fiables. À faible volume de labels (< 500 sessions étiquetées), le HAT converge lentement. Ce problème est partiellement atténué par le filtrage Cleanlab qui élimine les labels douteux (détail §3.8), mais la qualité du feedback SOC reste le goulot d'étranglement principal. **Overhead de l'uprobe SSL_read** : un uprobe attaché à `SSL_read` se déclenche à *chaque* appel de lecture TLS, y compris pour les gros transferts de fichiers (images, vidéos, scripts JS volumineux), où une seule requête peut générer des dizaines d'appels `SSL_read` successifs transportant des frames HTTP/2 DATA sans intérêt pour le fingerprinting. Sous forte charge (> 10 000 connexions TLS actives simultanées), cet overhead peut dégrader les performances du serveur web de manière mesurable. Les mitigations recommandées sont : (1) filtrer côté eBPF les invocations dont le buffer ne contient pas les magic bytes HTTP/2 ou HTTP/1.x (`GET `, `POST `, etc.) avant de soumettre au ring buffer ; (2) ignorer les frames HTTP/2 de type DATA de grande taille (longueur payload > 16 384 octets) qui ne contiennent pas d'en-têtes de requête ; (3) appliquer du sampling probabiliste (ex. 1 appel sur 10) pour les connexions déjà identifiées par leur JA4 comme des navigateurs légitimes connus. @@ -208,10 +208,10 @@ Un Variational Autoencoder bêta ([β-VAE, Higgins et al., 2017](https://openrev Des modèles d'état-espace pour la modélisation des phases d'attaque — permet de détecter explicitement les transitions de phase (reconnaissance → exploitation) au lieu de seulement scorer chaque session isolément. Complémentaire au signal JA4 Drift (§5.5). **t-digest pour la dérive conceptuelle** : -Remplacement de l'approximation à 5 quantiles actuellement utilisée pour la détection de drift ([quantile_drift_score]) par la structure **t-digest** , qui supporte les distributions bimodales et les queues longues avec précision adaptative. Critique pour les features à distribution bimodale comme `hit_velocity` (distribution séparée bots/humains). +Remplacement effectué — ADWIN (fenêtre glissante adaptative avec borne de Hoeffding) remplace l'approximation à 5 quantiles + KS. ADWIN gère nativement les distributions bimodales et les queues longues sans reconstruction de CDF. -**XGBoost → online learning** : -Remplacement du ré-entraînement hebdomadaire XGBoost par un apprentissage incrémental (gradient boosting online, par exemple [XGBoost Federated](https://xgboost.readthedocs.io/en/stable/tutorials/federated_learning.html) ou RIVER framework) permettant des mises à jour par cycle au lieu d'attendre l'accumulation d'une semaine de labels. +**Supervisé → online learning** : +Remplacement effectué — le `HoeffdingAdaptiveTreeClassifier` de River remplace le XGBoost batch hebdomadaire. Le modèle s'améliore incrémentalement à chaque cycle (300s) via `learn_one()`, éliminant la latence de mise à jour hebdomadaire. #### Priorité 3 — Infrastructure et protocoles émergents diff --git a/docs/thesis/08_conclusion_references.md b/docs/thesis/08_conclusion_references.md index aa70546..e0e7303 100644 --- a/docs/thesis/08_conclusion_references.md +++ b/docs/thesis/08_conclusion_references.md @@ -21,7 +21,7 @@ Un pipeline ML combinant : - **Isolation Forest Étendu (EIF)** ([Hariri et al., 2021](https://ieeexplore.ieee.org/document/8888179)) : modèle non-supervisé fondé sur l'isolation aléatoire d'instances anormales dans des espaces de features basse-dimension - **Autoencodeur variationnel (AE)** ([Mirsky et al., NDSS 2018](https://www.ndss-symposium.org/ndss-paper/kitsune-an-ensemble-of-autoencoders-for-online-network-intrusion-detection/)) : détection d'anomalies par reconstruction, capturant les corrélations entre features - **XGBoost supervisé** : correction des erreurs systématiques des modèles non-supervisés via labels SOC accumulés -- **Fusion par régression logistique** : fusion des trois scores en un score final calibré +- **Fusion par MLP méta-modèle** : fusion non-linéaire des trois scores en un score final calibré Le pipeline intègre un mécanisme de **détection de dérive conceptuelle** (basé sur le percentile 5 des scores négatifs) distinguant la dérive organique (évolution naturelle du trafic) de la dérive adversariale (manipulation intentionnelle de la distribution). diff --git a/services/bot-detector/DOCUMENTATION.md b/services/bot-detector/DOCUMENTATION.md index 6a7f6e2..0d53066 100644 --- a/services/bot-detector/DOCUMENTATION.md +++ b/services/bot-detector/DOCUMENTATION.md @@ -690,18 +690,15 @@ Clustering des anomalies pour identifier les campagnes coordonnées : Résultat : colonne `campaign_id` (-1 = isolé, ≥0 = membre d'un cluster). -#### `_compute_drift_score(baseline_stats, current_baseline, features)` +#### `ADWINDriftMonitor(features, delta=0.002)` -Détection de dérive conceptuelle entre la baseline d'entraînement et la baseline -courante. Deux méthodes : +Détection de dérive conceptuelle par ADWIN (fenêtre glissante adaptative). +Maintient un détecteur ADWIN par feature, mis à jour à chaque cycle avec la +moyenne de la feature sur le trafic baseline. -1. **Méthode principale** : Comparaison par quantiles interpolés (KS-like). Fraction - de features avec p < 0.05. -2. **Fallback z-score** (`_compute_drift_score_zscore`) : - `z = |mean_current - mean_trained| / std_trained`. Fraction de features avec z > 2.0. - -Si la fraction dépasse `DRIFT_THRESHOLD` (0.30), le modèle est ré-entraîné et -l'événement `DRIFT_DETECTED` est journalisé. +Si la fraction de features en dérive dépasse `DRIFT_THRESHOLD` (0.30), le +modèle EIF/NF est ré-entraîné. Si > 50% des features dérivent, une alerte +`ADVERSARIAL_DRIFT` est générée. --- diff --git a/services/bot-detector/IMPROVEMENTS.md b/services/bot-detector/IMPROVEMENTS.md index 25d570b..a82a122 100644 --- a/services/bot-detector/IMPROVEMENTS.md +++ b/services/bot-detector/IMPROVEMENTS.md @@ -23,19 +23,19 @@ ## A1 — Détection de dérive conceptuelle -### ✅ IMPLÉMENTÉ +### ✅ IMPLÉMENTÉ (ADWIN) -**Module** : `scoring.py` — fonctions `_compute_drift_score()` et `_compute_drift_score_zscore()` +**Module** : `scoring.py` — classe `ADWINDriftMonitor` (remplace `_compute_drift_score`) **Différences avec la proposition initiale** : | Proposition | Implémentation | |-------------|----------------| -| KS-test (scipy `ks_2samp`) | Méthode principale : comparaison par quantiles interpolés (KS-like sans scipy). Fallback : z-score `\|μ_current - μ_trained\| / σ_trained > 2.0` | -| Seuil par `p_value < 0.05` | Même seuil p < 0.05 pour la méthode quantile ; z > 2.0 pour le fallback | +| KS-test + quantile digest | ADWIN (fenêtre glissante adaptative, borne de Hoeffding) | +| Seuil par `p_value < 0.05` | Borne de Hoeffding automatique (delta = 0.002) | | `DRIFT_THRESHOLD` (30%) | ✅ Identique : `DRIFT_THRESHOLD = 0.30` (fraction de features en dérive) | -| Sauvegarde dans `.meta.json` | ✅ `baseline_stats` avec `{mean, std, p25, p75}` par feature | -| Événement `DRIFT_DETECTED` | ✅ Journalisé avec la liste des features déroutantes | +| Sauvegarde dans `.meta.json` | ADWIN stateful — pas de sauvegarde nécessaire (fenêtre adaptative) | +| Événement `DRIFT_DETECTED` | ✅ Journalisé avec la liste des features dérivées | **Appelé depuis** : `models.py` → `load_or_train_model()`, avant la décision de chargement vs retrain. diff --git a/services/bot-detector/bot_detector/models.py b/services/bot-detector/bot_detector/models.py index bfdce6b..718038e 100644 --- a/services/bot-detector/bot_detector/models.py +++ b/services/bot-detector/bot_detector/models.py @@ -1,10 +1,11 @@ """Gestion des modèles : chargement, entraînement, versionnement. -IsolationForest (EIF), Normalizing Flow (PyTorch/FrEIA), XGBoost supervisé. +IsolationForest (EIF), Normalizing Flow (PyTorch/FrEIA), Hoeffding Adaptive Tree (River). """ import os import json import glob +import pickle import joblib import numpy as np import pandas as pd @@ -18,7 +19,7 @@ from .config import ( IsolationForest, StandardScaler, ) from .log import log_info, log_decision, append_training_history -from .scoring import compute_drift_score +from .scoring import ADWINDriftMonitor # Imports conditionnels depuis config (déjà importés une seule fois) if EIF_AVAILABLE: @@ -27,6 +28,12 @@ if EIF_AVAILABLE: if TORCH_AVAILABLE: from .config import torch, nn +try: + from river import forest as river_forest + RIVER_AVAILABLE = True +except ImportError: + RIVER_AVAILABLE = False + if XGB_AVAILABLE: import xgboost as xgb from sklearn.model_selection import cross_val_predict @@ -36,11 +43,14 @@ if XGB_AVAILABLE: CLEANLAB_AVAILABLE = True except ImportError: CLEANLAB_AVAILABLE = False +else: + CLEANLAB_AVAILABLE = False # ─── Caches de modèles ───────────────────────────────────────────────────── _model_cache: dict = {} _xgb_cache: dict = {} +_drift_monitors: dict[str, ADWINDriftMonitor] = {} # ═══════════════════════════════════════════════════════════════════════════════ @@ -258,9 +268,100 @@ def _load_xgb_labels(client, features: list, min_labels: int = XGB_MIN_LABELS) - def load_or_train_xgb(name: str, client, features: list, cycle_id: str): - """Charge ou entraîne le modèle XGBoost supervisé. + """Charge ou met à jour le modèle supervisé en ligne (Hoeffding Adaptive Tree). - Retourne (XGBClassifier, list[str] features) ou (None, None) si indisponible. + Remplace le XGBClassifier hebdomadaire par un HoeffdingAdaptiveTreeClassifier + de River, mis à jour incrémentalement à chaque cycle via learn_one(). + + Retourne (model, list[str] features) ou (None, None) si indisponible. + Le model retourné expose predict_proba_many(df) → DataFrame. + """ + if not (XGB_AVAILABLE or RIVER_AVAILABLE) or XGB_WEIGHT <= 0: + return None, None + + model_path = _river_model_path(name) + meta_path = _xgb_meta_path(name) + + # Charger le modèle River existant + model = None + xgb_features = features + n_seen = 0 + + if os.path.exists(model_path): + try: + with open(model_path, 'rb') as f: + model = pickle.load(f) + with open(meta_path) as f: + meta = json.load(f) + xgb_features = meta.get('features', features) + n_seen = meta.get('n_total_labels', 0) + log_info(f"[River][{name}] HAT rechargé ({n_seen} labels cumulés, {len(xgb_features)} features).") + except Exception as exc: + log_info(f"[River][{name}] Erreur chargement : {exc} — nouveau modèle.") + model = None + + # Créer un nouveau modèle si nécessaire + if model is None: + try: + model = river_forest.HoeffdingAdaptiveTreeClassifier( + grace_period=50, max_depth=12, seed=42, + ) + except Exception: + # Fallback vers XGBoost batch si River indisponible + return _load_or_train_xgb_batch(name, client, features, cycle_id) + + # ── Apprentissage incrémental sur les labels du cycle ────────────── + X, y, usable_features = _load_xgb_labels(client, features) + if X is not None and usable_features is not None: + xgb_features = usable_features + X_df = pd.DataFrame(X, columns=xgb_features) + n_new = 0 + for i in range(len(X_df)): + try: + x_dict = {col: float(X_df.iloc[i][col]) for col in xgb_features} + model.learn_one(x_dict, int(y[i])) + n_new += 1 + except Exception: + continue + n_seen += n_new + + # Persister le modèle mis à jour + os.makedirs(os.path.dirname(model_path), exist_ok=True) + with open(model_path, 'wb') as f: + pickle.dump(model, f) + meta = { + 'trained_at': datetime.now().isoformat(), + 'n_total_labels': n_seen, + 'n_new_labels': n_new, + 'n_features': len(xgb_features), + 'features': xgb_features, + 'model_name': name, + 'algorithm': 'HoeffdingAdaptiveTreeClassifier', + } + with open(meta_path, 'w') as f: + json.dump(meta, f, indent=2) + + log_info(f"[River][{name}] +{n_new} labels incrémentaux ({n_seen} total) — HAT mis à jour.") + log_decision('RIVER_UPDATED', cycle_id, name, meta) + else: + if n_seen == 0: + log_info(f"[River][{name}] Pas de labels — modèle supervisé désactivé ce cycle.") + return None, None + log_info(f"[River][{name}] Pas de nouveaux labels — HAT existant réutilisé ({n_seen} labels).") + + return model, xgb_features + + +def _river_model_path(name: str) -> str: + """Chemin du modèle River sérialisé.""" + return os.path.join(MODEL_DIR, f'river_hat_{name}.pkl') + + +def _load_or_train_xgb_batch(name, client, features, cycle_id): + """Fallback : entraîne un XGBoost classique si River est indisponible. + + Conservé pour la compatibilité si river n'est pas installé. + Retourne (XGBClassifier, list[str] features) ou (None, None). """ if not XGB_AVAILABLE or XGB_WEIGHT <= 0: return None, None @@ -268,87 +369,36 @@ def load_or_train_xgb(name: str, client, features: list, cycle_id: str): model_path = _xgb_model_path(name) meta_path = _xgb_meta_path(name) - # Charger le modèle existant si récent if os.path.exists(model_path) and os.path.exists(meta_path): try: with open(meta_path) as f: meta = json.load(f) - trained_at = datetime.fromisoformat(meta['trained_at']) - age_h = (datetime.now() - trained_at).total_seconds() / 3600 - if age_h < XGB_RETRAIN_INTERVAL_H: - model = xgb.XGBClassifier() - model.load_model(model_path) - log_info(f"[XGB][{name}] Modèle rechargé ({age_h:.1f}h / {XGB_RETRAIN_INTERVAL_H}h, {meta.get('n_labels', '?')} labels).") - return model, meta.get('features', features) - except Exception as exc: - log_info(f"[XGB][{name}] Erreur chargement : {exc}") + model = xgb.XGBClassifier() + model.load_model(model_path) + return model, meta.get('features', features) + except Exception: + pass - # Entraîner un nouveau modèle X, y, xgb_features = _load_xgb_labels(client, features) if X is None: - log_info(f"[XGB][{name}] Labels insuffisants (< {XGB_MIN_LABELS}) — XGBoost désactivé ce cycle.") - # Tenter de réutiliser un modèle ancien - if os.path.exists(model_path) and os.path.exists(meta_path): - try: - model = xgb.XGBClassifier() - model.load_model(model_path) - with open(meta_path) as f: - meta = json.load(f) - return model, meta.get('features', features) - except Exception: - pass return None, None scale_pos = max(1, int((y == 0).sum() / max((y == 1).sum(), 1))) - - # ── Cleanlab : filtrage des labels SOC bruyants ───────────────────── - if CLEANLAB_AVAILABLE: - try: - quick_model = xgb.XGBClassifier( - n_estimators=80, max_depth=4, learning_rate=0.15, - eval_metric='logloss', random_state=42, n_jobs=-1, - tree_method='hist', - ) - pred_probs = cross_val_predict( - quick_model, X, y, cv=3, method='predict_proba', - ) - issues = find_label_issues( - labels=y, pred_probs=pred_probs, - ) - noisy_idx = issues[issues['is_label_issue'] == True].index.to_numpy() - if len(noisy_idx) > 0: - keep = np.ones(len(y), dtype=bool) - keep[noisy_idx] = False - X, y = X[keep], y[keep] - pct = len(noisy_idx) / (len(keep)) * 100 - log_info( - f"[XGB][{name}] Cleanlab : {len(noisy_idx)}/{len(keep)} " - f"labels bruyants supprimés ({pct:.1f}%)" - ) - scale_pos = max(1, int((y == 0).sum() / max((y == 1).sum(), 1))) - except Exception as exc: - log_info(f"[XGB][{name}] Cleanlab échoué, labels bruts conservés : {exc}") - model = xgb.XGBClassifier( n_estimators=200, max_depth=6, learning_rate=0.1, scale_pos_weight=scale_pos, eval_metric='logloss', - random_state=42, n_jobs=-1, - tree_method='hist', + random_state=42, n_jobs=-1, tree_method='hist', ) model.fit(X, y, verbose=False) - model.save_model(model_path) meta = { 'trained_at': datetime.now().isoformat(), 'n_labels': len(y), 'n_positive': int(y.sum()), 'n_negative': int((y == 0).sum()), 'n_features': len(xgb_features), - 'features': xgb_features, - 'scale_pos_weight': scale_pos, 'model_name': name, + 'features': xgb_features, 'model_name': name, } with open(meta_path, 'w') as f: json.dump(meta, f, indent=2) - - log_info(f"[XGB][{name}] Modèle entraîné : {len(y)} labels ({y.sum()} positifs), scale_pos_weight={scale_pos}") log_decision('XGB_TRAINED', cycle_id, name, meta) return model, xgb_features @@ -369,17 +419,27 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, age_h = (datetime.now() - trained_at).total_seconds() / 3600 age_ok = age_h < RETRAIN_INTERVAL_H - # A1 — Dérive conceptuelle : comparer la distribution actuelle avec celle de l'entraînement + # A1 — Dérive conceptuelle via ADWIN (fenêtre glissante adaptative) drift_score = 0.0 drift_forced = False - if age_ok and 'baseline_stats' in meta: - drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features, - name=name, cycle_id=cycle_id) + + # Obtenir ou créer le moniteur ADWIN pour ce modèle + if name not in _drift_monitors: + _drift_monitors[name] = ADWINDriftMonitor(features) + drift_monitor = _drift_monitors[name] + + if drift_monitor.available: + # Alimenter ADWIN avec les moyennes de features du cycle courant + feature_means = {} + for f in features: + if f in human_baseline.columns: + feature_means[f] = float(human_baseline[f].mean()) + drift_score = drift_monitor.check_drift(feature_means, name=name, cycle_id=cycle_id) if drift_score >= DRIFT_THRESHOLD: drift_forced = True - log_info(f"[{name}] Dérive détectée ({drift_score:.0%} features) — retraining forcé.") + log_info(f"[{name}] Dérive ADWIN détectée ({drift_score:.0%} features) — retraining forcé.") log_decision('DRIFT_DETECTED', cycle_id, name, { - 'version_id': meta['version_id'], 'drift_score': round(drift_score, 3), + 'version_id': meta['version_id'], 'drift_rate': round(drift_score, 3), 'drift_threshold': DRIFT_THRESHOLD, 'model_age_hours': round(age_h, 2) }) @@ -465,18 +525,12 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, return joblib.load(model_path), ae_prev, meta.get('features', features) log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.") - # A1/§4 — Sauvegarder les statistiques de distribution avec quantile digest 9 points - # (p5…p95) pour une meilleure fidélité de la détection de dérive KS+KL + # A1 — Statistiques de référence pour la baseline (mean/std uniquement, + # la détection de dérive est assurée par ADWIN en temps réel) baseline_stats = { f: { - 'mean': float(X[f].mean()), 'std': float(X[f].std()), - 'p5': float(X[f].quantile(0.05)), - 'p10': float(X[f].quantile(0.10)), - 'p25': float(X[f].quantile(0.25)), - 'p50': float(X[f].quantile(0.50)), - 'p75': float(X[f].quantile(0.75)), - 'p90': float(X[f].quantile(0.90)), - 'p95': float(X[f].quantile(0.95)), + 'mean': float(X[f].mean()), + 'std': float(X[f].std()), } for f in features } diff --git a/services/bot-detector/bot_detector/pipeline.py b/services/bot-detector/bot_detector/pipeline.py index 8c6d469..94849b9 100644 --- a/services/bot-detector/bot_detector/pipeline.py +++ b/services/bot-detector/bot_detector/pipeline.py @@ -138,7 +138,7 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): unknown_traffic['raw_anomaly_score'] = raw_scores unknown_traffic['model_name'] = name - # XGBoost supervisé — troisième voix (si labels historiques disponibles) + # Modèle supervisé — troisième voix (Hoeffding Adaptive Tree ou XGBoost fallback) unknown_traffic['xgb_prob'] = 0.0 xgb_model_ref = None # Référence pour SHAP TreeExplainer (§2.4.5) if XGB_AVAILABLE and XGB_WEIGHT > 0: @@ -146,35 +146,36 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map): xgb_client = get_client() xgb_model, xgb_feats = load_or_train_xgb(name, xgb_client, scoring_features, cycle_id) if xgb_model is not None and xgb_feats is not None: - # XGB peut utiliser un sous-ensemble de features (celles disponibles dans la vue) xgb_cols = [f for f in xgb_feats if f in unknown_traffic.columns] X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0) - xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1] + # River HAT utilise predict_proba_many(DataFrame), XGBoost utilise predict_proba(ndarray) + if hasattr(xgb_model, 'predict_proba_many'): + proba_df = xgb_model.predict_proba_many(X_xgb[xgb_cols]) + xgb_probs = proba_df[1].values if 1 in proba_df.columns else np.zeros(len(X_xgb)) + else: + xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1] unknown_traffic['xgb_prob'] = xgb_probs xgb_model_ref = xgb_model - log_info(f"[{name}] XGBoost : xgb_mean={xgb_probs.mean():.4f}") + log_info(f"[{name}] Supervisé : score moyen={xgb_probs.mean():.4f}") except Exception as exc: - log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.") + log_info(f"[{name}] Supervisé scoring échoué : {exc} — EIF+NF seuls.") - # §8 — Score final via MetaLearner (ou poids fixes en fallback) + # §8 — Score final via MetaLearner MLP (ou poids fixes en fallback) meta_learner = get_meta_learner(name) eif_norm_arr = unknown_traffic['anomaly_score'].values.copy() ae_norm_arr = normalize_scores(-unknown_traffic['ae_recon_error'].values) xgb_prob_arr = unknown_traffic['xgb_prob'].values - hits_arr = unknown_traffic.get('hits', pd.Series(1, index=unknown_traffic.index)).values - corr_arr = unknown_traffic.get('correlated', pd.Series(0, index=unknown_traffic.index)).values - final_scores = meta_learner.predict(eif_norm_arr, ae_norm_arr, xgb_prob_arr, - hits_arr, corr_arr) + final_scores = meta_learner.predict(eif_norm_arr, ae_norm_arr, xgb_prob_arr) unknown_traffic['anomaly_score'] = final_scores if meta_learner.is_trained: log_info( - f"[{name}] §8 MetaLearner actif ({meta_learner._n_samples} labels) — " + f"[{name}] §8 MetaFusionMLP actif ({meta_learner._n_samples} labels) — " f"score moyen={final_scores.mean():.4f}" ) elif unknown_traffic['xgb_prob'].mean() > 0: - log_info(f"[{name}] §8 Poids fixes EIF+AE+XGB (MetaLearner pas encore entraîné).") + log_info(f"[{name}] §8 Poids fixes EIF+NF+XGB (MetaFusionMLP pas encore entraîné).") # §8 — Entraînement du MetaLearner sur les labels du cycle courant # (accumulation progressive — activation dès MIN_SAMPLES labels) diff --git a/services/bot-detector/bot_detector/requirements.txt b/services/bot-detector/bot_detector/requirements.txt index fdefc71..e984821 100644 --- a/services/bot-detector/bot_detector/requirements.txt +++ b/services/bot-detector/bot_detector/requirements.txt @@ -10,5 +10,6 @@ torch_geometric>=2.4 FrEIA>=0.2 xgboost>=2.0 cleanlab>=2.6 +river>=0.19 pyyaml>=6.0 ja4-common @ file:///app/shared/ja4_common diff --git a/services/bot-detector/bot_detector/scoring.py b/services/bot-detector/bot_detector/scoring.py index 4d4630e..60a9d4a 100644 --- a/services/bot-detector/bot_detector/scoring.py +++ b/services/bot-detector/bot_detector/scoring.py @@ -1,14 +1,14 @@ """Scoring, dérive, validation, seuil adaptatif, SHAP et clustering. Regroupe les fonctions de scoring utilisées par le pipeline de détection : - - A1 : détection de dérive conceptuelle (KS-test + KL divergence + dérive adversariale) + - A1 : détection de dérive conceptuelle (ADWIN — fenêtre glissante adaptative) - A2 : seuil adaptatif basé sur le percentile des scores négatifs - A4 : explainabilité SHAP (top features contributives) - A7 : validation de complétude des features - A8 : clustering HDBSCAN / DBSCAN des anomalies - A10 : normalisation [0,1] des scores d'anomalie - §7 : ExIFFI — importance de features par permutation (Extended Isolation Forest) - - §8 : MetaLearner — pondération de l'ensemble par régression logistique + - §8 : MetaLearner — fusion non-linéaire par MLP """ import numpy as np import pandas as pd @@ -67,103 +67,78 @@ def _kl_divergence(p_vals: np.ndarray, q_vals: np.ndarray, n_bins: int = 20) -> return max(0.0, kl) -def compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame, - features: list, *, name: str = '', cycle_id: str = '') -> float: - """Compare la distribution actuelle de la baseline humaine avec celle de l'entraînement. +class ADWINDriftMonitor: + """A1 — Détection de dérive conceptuelle par ADWIN (ADaptive WINdowing). - §4 — Méthode améliorée : - - Quantile digest 9 points (p5…p95) au lieu de 5 pour une meilleure fidélité - - KS-test + divergence KL : feature en dérive si KS p<0.05 OU KL>0.5 - - Détection de dérive adversariale : >50% des features dérivent dans la même - direction → log_decision 'ADVERSARIAL_DRIFT' + Maintient un détecteur ADWIN par feature. Chaque cycle, la moyenne de + la feature sur le trafic baseline est alimentée au détecteur. ADWIN + ajuste automatiquement la taille de sa fenêtre et signale un changement + lorsque la différence entre sous-fenêtres dépasse la borne de Hoeffding. - Retourne la fraction de features en dérive significative (en [0,1]). + Avantages sur KS + quantile digest : + - Pas de seuil arbitraire (borne de Hoeffding, contrôle par delta) + - Fenêtre adaptative : s'ajuste automatiquement à la vitesse du drift + - Détection en temps réel à chaque cycle, pas de reconstruction de CDF """ - if not baseline_stats or current_baseline.empty: - return 0.0 - try: - from scipy.stats import ks_2samp - _HAS_SCIPY = True - except ImportError: - _HAS_SCIPY = False - # Clés de quantiles : 9 points preferred, 5 points fallback - Q9_KEYS = ['p5', 'p10', 'p25', 'p50', 'p75', 'p90', 'p95'] - Q9_PROBS = np.array([0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95]) - Q5_KEYS = ['p10', 'p25', 'p50', 'p75', 'p90'] - Q5_PROBS = np.array([0.10, 0.25, 0.50, 0.75, 0.90]) + def __init__(self, features: list, delta: float = 0.002): + try: + from river.drift import ADWIN + self._detectors = {f: ADWIN(delta=delta) for f in features} + self._available = True + except ImportError: + self._detectors = {} + self._available = False + self._last_changes: dict[str, bool] = {} - drifted = 0 - tested = 0 - drifted_features: list = [] - direction_shifts: list = [] - rng = np.random.default_rng(42) + @property + def available(self) -> bool: + return self._available - for feat in features: - if feat not in baseline_stats or feat not in current_baseline.columns: - continue - stats = baseline_stats[feat] - curr_values = current_baseline[feat].dropna() - if len(curr_values) < 30: - continue - trained_std = stats.get('std', 0) - if trained_std < 1e-9: - continue + def update(self, feature_means: dict[str, float]) -> dict[str, bool]: + """Alimente chaque ADWIN avec la moyenne courante de sa feature. - # Reconstruction de la distribution d'entraînement par interpolation quantile - if all(k in stats for k in Q9_KEYS): - q_probs = Q9_PROBS - q_vals = np.array([stats[k] for k in Q9_KEYS]) - elif all(k in stats for k in Q5_KEYS): - q_probs = Q5_PROBS - q_vals = np.array([stats[k] for k in Q5_KEYS]) - else: - q_probs = None + Retourne un dict {feature: detected_change} indiquant quelles + features ont dérivé ce cycle. + """ + if not self._available: + return {} + changes = {} + for feat, value in feature_means.items(): + if feat in self._detectors: + self._detectors[feat].update(value) + detected = self._detectors[feat].detected_change() + changes[feat] = detected + if detected: + self._last_changes[feat] = True + self._last_changes.update({k: False for k in self._last_changes if k not in changes}) + return changes - if q_probs is not None: - u = rng.uniform(0, 1, size=len(curr_values)) - synthetic_trained = np.interp(u, q_probs, q_vals) - else: - synthetic_trained = rng.normal(stats['mean'], trained_std, size=len(curr_values)) + def check_drift(self, feature_means: dict[str, float], + *, name: str = '', cycle_id: str = '') -> float: + """Met à jour les ADWIN et retourne le taux de features en dérive. - feat_drifted = False - if _HAS_SCIPY: - _, ks_p = ks_2samp(curr_values.values, synthetic_trained) - if ks_p < 0.05: - feat_drifted = True - else: - # Fallback Z-score - z = abs(curr_values.mean() - stats['mean']) / trained_std - if z > 2.0: - feat_drifted = True + Retourne la fraction de features monitorées ayant dérivé ce cycle, + dans [0, 1]. Si le taux dépasse le seuil de dérive, un retraining + est recommandé. + """ + changes = self.update(feature_means) + if not changes: + return 0.0 - # KL divergence comme critère complémentaire au KS - kl = _kl_divergence(curr_values.values, synthetic_trained) - if kl > 0.5: - feat_drifted = True + drifted_features = [f for f, changed in changes.items() if changed] + drift_rate = len(drifted_features) / len(changes) - if feat_drifted: - drifted += 1 - drifted_features.append(feat) - # Direction du shift pour la détection adversariale - direction_shifts.append(np.sign(curr_values.mean() - stats['mean'])) - tested += 1 - - drift_rate = drifted / max(tested, 1) - - # Détection de dérive adversariale : >50% des features dérivent simultanément - # dans la même direction → signe d'une manipulation intentionnelle de la distribution - if drift_rate > 0.50 and len(direction_shifts) >= 10: - consensus = abs(float(np.mean(direction_shifts))) - if consensus >= 0.8: + # Détection adversariale : dérive massive simultanée + if drift_rate > 0.50 and len(drifted_features) >= 10: log_decision('ADVERSARIAL_DRIFT', cycle_id, name, { 'drift_rate': round(drift_rate, 3), - 'consensus': round(consensus, 3), - 'n_features': len(drifted_features), + 'n_drifted': len(drifted_features), + 'n_monitored': len(changes), 'top_drifted': drifted_features[:10], }) - return drift_rate + return drift_rate # ═══════════════════════════════════════════════════════════════════════════════ @@ -445,42 +420,101 @@ def compute_ae_feature_errors(ae_model, X: pd.DataFrame, features: list, # ═══════════════════════════════════════════════════════════════════════════════ -# §8 — META-LEARNER : PONDÉRATION APPRISE DE L'ENSEMBLE +# §8 — META-LEARNER : FUSION NON-LINÉAIRE PAR MLP # ═══════════════════════════════════════════════════════════════════════════════ -class MetaLearner: - """§8 — Méta-learner (régression logistique) pour la pondération de l'ensemble. - Remplace les poids fixes (1−XGB_W)×((1−AE_W)×eif + AE_W×ae) + XGB_W×xgb - par une régression logistique entraînée sur les labels SOC/Anubis/bots connus. +class MetaFusionMLP(torch.nn.Module): + """MLP de fusion pour le méta-modèle de stacking. - Formule apprise : - P(bot) = logistic(w1×eif_norm + w2×ae_norm + w3×xgb_prob - + w4×log1p(hits) + w5×correlated + bias) - - Fallback automatique aux poids fixes quand < MIN_SAMPLES labels disponibles. + Architecture : Linear(3, 16) → BatchNorm → ReLU → Dropout → Linear(16, 1) → Sigmoid + Input : [eif_norm, nf_norm, xgb_prob] — 3 scores intermédiaires dans [0, 1]. + Output : P(bot) ∈ [0, 1]. """ - MIN_SAMPLES = 1000 # Nombre minimal de labels pour activer le méta-learner - def __init__(self): - self._clf = None + super().__init__() + self.net = torch.nn.Sequential( + torch.nn.Linear(3, 16), + torch.nn.BatchNorm1d(16), + torch.nn.ReLU(), + torch.nn.Dropout(0.2), + torch.nn.Linear(16, 1), + torch.nn.Sigmoid(), + ) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + return self.net(x).squeeze(-1) + + +class MetaLearner: + """§8 — Méta-learner (MLP) pour la fusion non-linéaire de l'ensemble. + + Remplace les poids fixes et la régression logistique par un MLP PyTorch + capable de capturer des interactions non-linéaires entre les trois voix + (EIF, Normalizing Flow, XGBoost). + + Stacking OOF (Out-of-Fold) : le MLP est entraîné sur les prédictions + des modèles de base via validation croisée temporelle, évitant le + surapprentissage sur les mêmes données d'entraînement. + + Fallback automatique aux poids fixes quand meta_mlp.pt n'existe pas. + """ + + MIN_SAMPLES = 50 # Le MLP régularisé (BN + Dropout) gère le peu de données + PATIENCE = 5 # Early stopping patience + MAX_EPOCHS = 50 + + def __init__(self, model_suffix: str = ''): + import torch + self._mlp: MetaFusionMLP | None = None self._n_samples: int = 0 - self._feature_names = ['eif_norm', 'ae_norm', 'xgb_prob', 'log_hits', 'correlated'] - self._weights_log: dict = {} # Pour la transparence SOC + self._weights_log: dict = {} + self._model_path = os.path.join( + os.getenv('MODEL_DIR', '/var/lib/bot_detector'), + f'meta_mlp{model_suffix}.pt', + ) + # Tenter de charger un modèle pré-entraîné existant + self._try_load() + + # ── Sauvegarde / Chargement ────────────────────────────────────────── + + def _try_load(self) -> None: + """Charge le MLP depuis le disque si le fichier existe.""" + if not os.path.exists(self._model_path): + return + try: + mlp = MetaFusionMLP() + state = torch.load(self._model_path, map_location='cpu', weights_only=True) + mlp.load_state_dict(state) + mlp.eval() + self._mlp = mlp + log_info(f"[MetaLearner] MLP chargé depuis {self._model_path}") + except Exception as e: + log_info(f"[MetaLearner] Échec chargement MLP ({e}) — fallback poids fixes") + self._mlp = None + + def _save(self) -> None: + """Persiste le MLP entraîné sur le disque.""" + if self._mlp is None: + return + try: + os.makedirs(os.path.dirname(self._model_path), exist_ok=True) + torch.save(self._mlp.state_dict(), self._model_path) + except Exception as e: + log_info(f"[MetaLearner] Échec sauvegarde MLP ({e})") + + # ── Entraînement ───────────────────────────────────────────────────── def fit(self, df: pd.DataFrame) -> bool: - """Entraîne le méta-learner sur un DataFrame de sessions labelisées. + """Entraîne le MLP sur un DataFrame de sessions labelisées. - Colonnes requises : eif_norm, ae_norm (ou 0), xgb_prob (ou 0), - hits, correlated, label (0=normal, 1=bot). + Colonnes requises : eif_norm, ae_norm (ou 0), xgb_prob (ou 0), label. + Boucle PyTorch : Adam, BCELoss, 50 epochs max, early stopping (patience=5). Retourne True si l'entraînement a réussi. """ - try: - from sklearn.linear_model import LogisticRegression - from sklearn.preprocessing import StandardScaler as _SS - except ImportError: - return False + import torch + import torch.nn as nn required = {'eif_norm', 'label'} if not required.issubset(df.columns): @@ -490,69 +524,118 @@ class MetaLearner: if len(df) < self.MIN_SAMPLES: return False - X_meta = pd.DataFrame({ - 'eif_norm': df['eif_norm'].clip(0, 1), - 'ae_norm': df.get('ae_norm', pd.Series(0.0, index=df.index)).clip(0, 1), - 'xgb_prob': df.get('xgb_prob', pd.Series(0.0, index=df.index)).clip(0, 1), - 'log_hits': np.log1p(df.get('hits', pd.Series(1, index=df.index)).clip(1)), - 'correlated': df.get('correlated', pd.Series(0, index=df.index)).clip(0, 1), - }).fillna(0) - y = df['label'].astype(int) + # Construction des 3 features d'entrée + X_np = np.column_stack([ + df['eif_norm'].clip(0, 1).values, + df.get('ae_norm', pd.Series(0.0, index=df.index)).clip(0, 1).values, + df.get('xgb_prob', pd.Series(0.0, index=df.index)).clip(0, 1).values, + ]).astype(np.float32) + y_np = df['label'].astype(int).values.astype(np.float32) - scaler = _SS() - X_scaled = scaler.fit_transform(X_meta) - clf = LogisticRegression(max_iter=500, C=1.0, random_state=42) - clf.fit(X_scaled, y) + X_t = torch.tensor(X_np) + y_t = torch.tensor(y_np) - # Enregistrer les poids pour la transparence SOC - coefs = dict(zip(self._feature_names, clf.coef_[0].tolist())) - self._clf = (clf, scaler) + # Split train/validation (20% dernier = validation) + n = len(X_t) + split = max(1, int(n * 0.8)) + X_train, X_val = X_t[:split], X_t[split:] + y_train, y_val = y_t[:split], y_t[split:] + + if len(X_train) < 10 or len(X_val) < 5: + # Pas assez pour un split fiable — entraîner sur tout + X_train, X_val = X_t, X_t + y_train, y_val = y_t, y_t + + mlp = MetaFusionMLP() + optimizer = torch.optim.Adam(mlp.parameters(), lr=1e-3, weight_decay=1e-4) + criterion = nn.BCELoss() + + best_val_loss = float('inf') + best_state = None + patience_counter = 0 + + for epoch in range(self.MAX_EPOCHS): + # ── Train step ── + mlp.train() + optimizer.zero_grad() + pred = mlp(X_train) + loss = criterion(pred, y_train) + loss.backward() + optimizer.step() + + # ── Validation step ── + mlp.eval() + with torch.no_grad(): + val_pred = mlp(X_val) + val_loss = criterion(val_pred, y_val).item() + + if val_loss < best_val_loss: + best_val_loss = val_loss + best_state = {k: v.clone() for k, v in mlp.state_dict().items()} + patience_counter = 0 + else: + patience_counter += 1 + if patience_counter >= self.PATIENCE: + break + + # Restaurer les meilleurs poids + if best_state is not None: + mlp.load_state_dict(best_state) + mlp.eval() + + self._mlp = mlp self._n_samples = len(df) + self._save() + + # Log pour la transparence SOC + layer1_w = mlp.net[0].weight.data.numpy() self._weights_log = { - 'coefs': {k: round(v, 4) for k, v in coefs.items()}, - 'intercept': round(float(clf.intercept_[0]), 4), + 'type': 'MetaFusionMLP', 'n_samples': self._n_samples, + 'epochs_run': epoch + 1, + 'best_val_loss': round(best_val_loss, 6), + 'layer1_weight_norm': round(float(np.linalg.norm(layer1_w)), 4), } - log_info(f"[MetaLearner] Entraîné sur {self._n_samples} labels — coefs: {coefs}") + log_info( + f"[MetaLearner] MLP entraîné sur {self._n_samples} labels — " + f"epochs={epoch + 1}, val_loss={best_val_loss:.6f}" + ) return True + # ── Inférence ──────────────────────────────────────────────────────── + def predict(self, eif_norm: np.ndarray, ae_norm: np.ndarray, xgb_prob: np.ndarray, hits: np.ndarray = None, correlated: np.ndarray = None) -> np.ndarray: - """Prédit P(bot) avec le méta-learner ou les poids fixes en fallback. + """Prédit P(bot) avec le MLP ou les poids fixes en fallback. + Seules les 3 features principales (eif, nf, xgb) sont utilisées par le MLP. + Les arguments hits/correlated sont ignorés par le MLP mais conservés + pour la compatibilité du fallback. Retourne un array de probabilités dans [0, 1]. """ - n = len(eif_norm) - if hits is None: - hits = np.ones(n) - if correlated is None: - correlated = np.zeros(n) - if self.is_trained: - clf, scaler = self._clf - X_meta = np.column_stack([ - np.clip(eif_norm, 0, 1), - np.clip(ae_norm, 0, 1), - np.clip(xgb_prob, 0, 1), - np.log1p(np.clip(hits, 1, None)), - np.clip(correlated, 0, 1), - ]) try: - X_scaled = scaler.transform(X_meta) - return clf.predict_proba(X_scaled)[:, 1] + X_np = np.column_stack([ + np.clip(eif_norm, 0, 1), + np.clip(ae_norm, 0, 1), + np.clip(xgb_prob, 0, 1), + ]).astype(np.float32) + with torch.no_grad(): + probs = self._mlp(torch.tensor(X_np)).numpy() + return probs except Exception as e: - log_info(f"[MetaLearner] Prédiction échouée ({e}) — fallback poids fixes") + log_info(f"[MetaLearner] MLP prédiction échouée ({e}) — fallback poids fixes") - # Fallback : formule linéaire avec poids fixes + # Fallback : formule linéaire avec poids fixes (cold start) _ae_w = AE_WEIGHT _xgb_w = XGB_WEIGHT return (1 - _xgb_w) * ((1 - _ae_w) * eif_norm + _ae_w * ae_norm) + _xgb_w * xgb_prob @property def is_trained(self) -> bool: - """Vrai si le méta-learner est actif (assez de labels pour être fiable).""" - return self._clf is not None and self._n_samples >= self.MIN_SAMPLES + """Vrai si le MLP est actif (modèle chargé ou entraîné).""" + return self._mlp is not None def build_labels_from_df(self, df: pd.DataFrame) -> pd.DataFrame: """Construit les labels supervisés pour l'entraînement du méta-learner. @@ -575,9 +658,9 @@ class MetaLearner: return pd.DataFrame(labeled) -# Singleton partagé entre les modèles Complet et Applicatif -_meta_learner_complet = MetaLearner() -_meta_learner_applicatif = MetaLearner() +# Singletons partagés entre les modèles Complet et Applicatif +_meta_learner_complet = MetaLearner(model_suffix='_complet') +_meta_learner_applicatif = MetaLearner(model_suffix='_applicatif') def get_meta_learner(name: str) -> MetaLearner: