feat(ml): replace logistic regression with MLP fusion and KS drift with ADWIN online learning

Replace the LogisticRegression meta-learner with a PyTorch MetaFusionMLP
(Linear(3,16)->BN->ReLU->Dropout->Linear(16,1)->Sigmoid) for non-linear
fusion of EIF, NF, and XGBoost scores. Replace KS-test + quantile digest
drift detection with ADWIN (adaptive sliding window, Hoeffding bound).
Replace weekly XGBoost batch retraining with River HoeffdingAdaptiveTree
for incremental online learning (learn_one per cycle). Update all thesis
documentation sections (2.4.2c, 2.4.3, 3.8, discussion, conclusion).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jacquin Antoine
2026-04-13 16:32:34 +02:00
parent c6cb12981c
commit 7894d39f1c
12 changed files with 502 additions and 306 deletions

View File

@ -11,7 +11,7 @@
## Résumé
Ce document présente une architecture opérationnelle de détection et classification du trafic HTTP malveillant, s'inscrivant dans la continuité des approches de génération 3 (fingerprinting multi-protocole et ML comportemental). Le système exploite 96 features organisées en 8 familles couvrant les couches réseau L3 à L7, corrélant des signaux TCP, TLS et HTTP en un vecteur unifié par session. La détection repose sur un ensemble triple-voix combinant un Extended Isolation Forest (EIF), un autoencodeur (AE) et XGBoost, fusionnés par une régression logistique calibrée activée à partir de 1 000 étiquettes accumulées. L'explicabilité est assurée par l'importance des features par profondeur d'isolation (EIF) et SHAP TreeExplainer (XGBoost). Le clustering de campagnes est réalisé par HDBSCAN dans l'espace latent 16 dimensions de l'autoencodeur, et la détection de flottes coordonnées par graphes bipartis via NetworkX. Le fingerprinting HTTP/2 passif — extraction des trames SETTINGS, WINDOW_UPDATE et de l'ordre des pseudo-headers côté serveur — exploite un signal déjà utilisé par des solutions industrielles (Akamai, Cloudflare, F5), ici implémenté via eBPF. L'infrastructure repose sur 16 modules Python (4 800 lignes), une base ClickHouse à double schéma (ja4_logs bruts TTL 2 h, ja4_processing agrégés TTL 7 j), des cycles d'analyse de 300 secondes, et traite en production plus de 3 millions de logs, environ 34 000 sessions par cycle, avec approximativement 777 anomalies détectées par cycle (≈ 2,3 % — chiffre opérationnel brut, non validé comme taux de détection). Le système intègre un moteur de profiling dynamique automatique des navigateurs (HDBSCAN sur les vecteurs H2 observés, centroïdes auto-appris, scoring temps réel par distance normalisée) qui s'adapte aux évolutions des piles HTTP/2 sans intervention manuelle.
Ce document présente une architecture opérationnelle de détection et classification du trafic HTTP malveillant, s'inscrivant dans la continuité des approches de génération 3 (fingerprinting multi-protocole et ML comportemental). Le système exploite 96 features organisées en 8 familles couvrant les couches réseau L3 à L7, corrélant des signaux TCP, TLS et HTTP en un vecteur unifié par session. La détection repose sur un ensemble triple-voix combinant un Extended Isolation Forest (EIF), un Normalizing Flow (NF) et XGBoost, fusionnés par un méta-modèle MLP (Multi-Layer Perceptron) non-linéaire calibré sur les étiquettes accumulées. L'explicabilité est assurée par l'importance des features par profondeur d'isolation (EIF) et SHAP TreeExplainer (XGBoost). Le clustering de campagnes est réalisé par HDBSCAN dans l'espace latent 16 dimensions de l'autoencodeur, et la détection de flottes coordonnées par graphes bipartis via NetworkX. Le fingerprinting HTTP/2 passif — extraction des trames SETTINGS, WINDOW_UPDATE et de l'ordre des pseudo-headers côté serveur — exploite un signal déjà utilisé par des solutions industrielles (Akamai, Cloudflare, F5), ici implémenté via eBPF. L'infrastructure repose sur 16 modules Python (4 800 lignes), une base ClickHouse à double schéma (ja4_logs bruts TTL 2 h, ja4_processing agrégés TTL 7 j), des cycles d'analyse de 300 secondes, et traite en production plus de 3 millions de logs, environ 34 000 sessions par cycle, avec approximativement 777 anomalies détectées par cycle (≈ 2,3 % — chiffre opérationnel brut, non validé comme taux de détection). Le système intègre un moteur de profiling dynamique automatique des navigateurs (HDBSCAN sur les vecteurs H2 observés, centroïdes auto-appris, scoring temps réel par distance normalisée) qui s'adapte aux évolutions des piles HTTP/2 sans intervention manuelle.
**Mots-clés** : fingerprinting réseau, JA4+, HTTP/2 fingerprinting, détection de bots, Extended Isolation Forest, autoencodeurs, ensemble hybride, corrélation TCP/TLS/HTTP, WAF, classification de trafic, apprentissage semi-supervisé, clustering HDBSCAN

View File

@ -43,7 +43,7 @@ Ce document décrit une architecture opérationnelle s'inscrivant dans la contin
1. **Corrélation TCP/TLS/HTTP** en temps réel via ja4ebpf (clé : `src_ip:src_port`, 256 shards, timeout orphelin 500 ms)
2. **Fingerprinting HTTP/2 passif** : extraction des trames SETTINGS, WINDOW_UPDATE, PRIORITY et de l'ordre des pseudo-headers directement depuis le stream TCP — approche déjà exploitée par des solutions industrielles (Akamai, Cloudflare, F5), ici implémentée via eBPF
3. **Architecture EIF bifurquée** : modèle complet (≈ 45 features L3→L7) et modèle applicatif (≈ 35 features L7 uniquement), évitant le biais de zérotage sur le trafic non corrélé — choix pragmatique de gestion des données manquantes
4. **Ensemble triple-voix avec fusion par régression logistique** : combinaison EIF + AE + XGBoost avec régression logistique apprise sur étiquettes accumulées
4. **Ensemble triple-voix avec fusion par MLP non-linéaire** : combinaison EIF + NF + XGBoost avec méta-modèle MLP apprenant les interactions non-linéaires entre les trois voix
5. **HDBSCAN dans l'espace latent AE** : clustering de campagnes par similarité de comportement compressé en 16 dimensions
6. **Détection de dérive adversariale** : distinction entre dérive organique (mises à jour navigateur) et manipulation directionnelle coordonnée
7. **8 features comportementales avancées** : application de statistiques standard (déviation de Benford, entropie de transition markovienne, autocorrélation lag-1, délai root-to-first-asset, diversité de hosts, uniformité de couverture cross-host) au domaine de la détection de bots

View File

@ -413,20 +413,20 @@ Le système de détection combine trois « voix » complémentaires :
┌──────────────┼──────────────┐
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌────▼──────┐
│ EIF │ │ AE │ │ XGBoost │
│ (semi- │ │ (semi- │ │(supervisé)│
│supervisé) │ │supervisé) │ │ │
│ EIF │ │ NF │ │ XGBoost │
│ (semi- │ │ (Normal- │ │(supervisé)│
│supervisé) │ │ izing │ │ │
│ │ │ Flow) │ │ │
└─────┬─────┘ └─────┬─────┘ └────┬──────┘
│ │ │
eif_norm ae_norm xgb_prob
eif_norm nf_norm xgb_prob
│ │ │
└──────────────┼──────────────┘
┌────────▼────────┐
Fusion LR
(régression
logistique,
│ ≥1000 labels) │
Meta-Model
Stacking MLP
(non-linéaire)
└────────┬────────┘
┌────────▼────────┐
@ -435,33 +435,47 @@ Le système de détection combine trois « voix » complémentaires :
└─────────────────┘
```
**Formule de combinaison** :
**Limites de la fusion linéaire**
```python
final = (1 - XGB_WEIGHT) × ((1 - AE_WEIGHT) × eif_norm + AE_WEIGHT × ae_norm) \
+ XGB_WEIGHT × xgb_prob
```
Valeurs par défaut : `AE_WEIGHT=0.30`, `XGB_WEIGHT=0.20`. Configurable via variables d'environnement pour ajustement en production sans modification du code.
**Fusion LR**
Le Fusion LR est une régression logistique activée lorsqu'au moins 1 000 étiquettes ont été accumulées (sessions DENY Anubis + annotations manuelles + seuillage de confiance).
Régression logistique : modèle linéaire probabiliste qui apprend des poids w pour chaque signal intermédiaire en minimisant l'entropie croisée binaire sur les étiquettes accumulées :
Une fusion linéaire — combinaison convexe pondérée ou régression logistique — ne peut capturer que des frontières de décision linéaires dans l'espace des scores intermédiaires. Or les signaux EIF, NF et XGBoost peuvent exhiber des interactions non-linéaires impossibles à modéliser par une combinaison linéaire :
```
P(bot) = σ(w1×eif + w2×ae + w3×xgb + w4×volume + bias)
Problème XOR des scores :
NF élevé
┌────────┐
EIF bas │ BOT ✓ │ ← combinaison inhabituelle = bot confirmé
├────────┤
EIF haut │normal ✗│ ← faux positif fréquent de l'EIF seul
└────────┘
NF bas
```
σ est la fonction sigmoïde : σ(z) = 1 / (1 + e^{-z})
Exemple concret : un bot utilisant un outilHeadless avec un JA4 fingerprint légitime (NF bas) mais un comportement de navigation atypique (EIF élevé). Le XGBoost peut compenser, mais la fusion linéaire ne peut apprendre la relation *« EIF élevé ET XGB élevé MAIS NF bas = bot »* — elle ne fait que sommer les contributions indépendantes.
Les poids w1w4 sont appris, permettant au système de calibrer automatiquement l'importance relative de chaque voix en fonction du type de trafic en production. En dessous de 1 000 étiquettes, le système revient aux poids fixes : `(eif: 0.50, ae: 0.30, xgb: 0.20)`.
**Stacking OOF (Out-of-Fold) et MLP méta-modèle**
Pour résoudre cette limitation, le système utilise un méta-modèle non-linéaire de type MLP (*Multi-Layer Perceptron*) entraîné via stacking Out-of-Fold :
1. **Prédictions OOF** : les modèles de base (EIF, NF, XGBoost) produisent des prédictions sur des plis de validation croisée temporelle, garantissant que le méta-modèle n'a jamais vu les données d'entraînement des modèles de base — évitant le surapprentissage (*information leakage*).
2. **Méta-modèle MLP** : un réseau de neurones à 2 couches apprend la fonction de fusion optimale :
```
MetaFusionMLP : [eif, nf, xgb] → Linear(3,16) → BatchNorm → ReLU → Dropout(0.2)
→ Linear(16,1) → Sigmoid → P(bot)
```
- **BatchNorm** : normalise les activations intermédiaires, stabilise l'apprentissage et régularise implicitement — crucial avec peu de données labellisées.
- **Dropout(0.2)** : désactive aléatoirement 20% des neurones pendant l'entraînement, prévenant la co-adaptation des poids.
- **Early stopping** (patience = 5 epochs) : arrête l'entraînement dès que la loss de validation ne s'améliore plus, évitant le surapprentissage.
- **Weight decay** ($\lambda = 10^{-4}$) : pénalité L2 sur les poids du MLP pour une régularisation supplémentaire.
Le MLP apprend des frontières de décision non-linéaires dans l'espace 3D `[eif_norm, nf_norm, xgb_prob]`, capable de résoudre les patterns XOR et les interactions conditionnelles entre les trois voix. Le système de détection peut ainsi combiner automatiquement les signaux de manière optimale en fonction du type de trafic observé en production.
**Calendrier de retraining** :
- XGBoost : hebdomadaire sur les étiquettes accumulées, après filtrage Cleanlab des labels SOC bruyants (voir ci-dessous)
- EIF : toutes les 24 heures
- AE : continu avec arrêt précoce sur la loss de validation
- HAT (supervisé) : apprentissage incrémental à chaque cycle (300s) sur les étiquettes accumulées, après filtrage Cleanlab des labels SOC bruyants (voir ci-dessous)
- EIF : toutes les 24 heures ou sur détection ADWIN
- NF : continu avec arrêt précoce sur la loss de validation
**Filtrage des labels SOC bruyants (Cleanlab)** :
@ -481,7 +495,7 @@ X_clean, y_clean = X[~noisy_mask], y[~noisy_mask]
Ce mécanisme protège le modèle contre l'empoisonnement par des faux positifs mal corrigés ou des biais de confirmation des analystes. Le taux de labels filtrés est loggé pour surveillance. En cas d'échec de Cleanlab (erreur mémoire, dépendance manquante), le pipeline revient aux données brutes sans interruption.
#### 2.4.3 Concept Drift et retraining adaptatif
#### 2.4.3 Concept Drift : ADWIN et Online Learning
**Définition du concept drift**
@ -494,28 +508,74 @@ En apprentissage automatique, le concept drift désigne le changement des propri
En détection de bots, le drift adversarial est particulièrement critique : les attaquants adaptent délibérément leurs outils pour contourner les modèles déployés.
**Test de Kolmogorov-Smirnov**
**Limites de l'approche KS + quantile digest**
Le test KS (Kolmogorov-Smirnov) mesure la différence maximale entre deux fonctions de distribution empiriques cumulées (ECDF) :
L'approche précédente détectait la dérive en sauvegardant 5 quantiles (p10, p25, p50, p75, p90) par feature à l'entraînement, puis en reconstruisant une distribution synthétique par interpolation de la CDF inverse et en appliquant un test de Kolmogorov-Smirnov entre cette distribution et la distribution courante. Cette méthode souffre de trois lacunes fondamentales :
1. **5 quantiles ne captent pas les distributions bimodales** : les features de trafic web comme `asset_ratio`, `post_ratio` et `orphan_ratio` ont souvent des distributions bimodales (deux populations de trafic distinctes). Cinq points ne suffisent pas à reconstruire fidèlement ces distributions — une dérive dans un mode peut être masquée par la stabilité de l'autre mode.
2. **Reconstruction par interpolation linéaire** : interpoler entre 5 quantiles suppose une distribution unimodale et lisse. Pour les distributions skewed à queue lourde (timing inter-requêtes, taille des payloads), l'interpolation sous-estime systématiquement les valeurs extrêmes, rendant le test KS peu fiable.
3. **Seuil de 30% arbitraire** : le seuil `DRIFT_THRESHOLD = 0.30` (30% de features en dérive) est un hyperparamètre non fondé statistiquement. Il est trop sensible pour les périodes de forte activité (faux positifs) et trop conservateur pour les attaques furtives ne touchant que quelques features.
**ADWIN : fenêtre glissante adaptative**
[ADWIN (ADaptive WINdowing, Bifet & Gavalda, 2007)](https://dl.acm.org/doi/10.1145/1242572.1242660) résout ces problèmes en maintenant une fenêtre de longueur variable sur le flux de données. Le principe :
1. **Fenêtre adaptative** : ADWIN maintient une fenêtre $W$ de valeurs récentes. La taille de $W$ s'ajuste automatiquement — elle grandit quand la distribution est stable et rétrécit quand un changement est détecté.
2. **Test de Hoeffding** : pour chaque coupe possible $W = W_0 \cup W_1$, ADWIN compare les moyennes $\hat{\mu}_0$ et $\hat{\mu}_1$ des deux sous-fenêtres. Si la différence dépasse la borne de Hoeffding :
$$|\hat{\mu}_0 - \hat{\mu}_1| \geq \sqrt{\frac{1}{2m} \ln\frac{4}{\delta}}$$
où $m = \frac{1}{1/|W_0| + 1/|W_1|}$ et $\delta$ est le paramètre de confiance, alors un changement est détecté et la sous-fenêtre la plus ancienne est supprimée.
3. **Pas de seuil arbitraire** : la détection repose uniquement sur la borne de Hoeffding (garantie probabiliste), paramétrée par $\delta$ (défaut : 0.002). Aucun seuil de « 30% de features en dérive » n'est nécessaire au niveau de chaque feature — seul le nombre de features driftant simultanément déclenche le retraining global.
**Architecture de monitoring ADWIN** :
```
D = max|F1(x) - F2(x)|
Cycle n (300s)
├── Calculer μ_f pour chaque feature f sur le trafic baseline
├── ADWIN_f.update(μ_f) pour chaque feature
│ └── Fenêtre interne W_f ajustée automatiquement
├── ADWIN_f.detected_change() ?
│ └── Si oui → feature f marquée « en dérive » ce cycle
└── Si > 30% features en dérive → flag retraining EIF/NF
Si > 50% features en dérive → alerte ADVERSARIAL_DRIFT
```
où F1 et F2 sont les ECDFs de la distribution courante et de la distribution de référence (baseline). Si D dépasse la valeur critique (déterminée par les tables de la distribution KS pour un niveau de confiance α), les deux échantillons sont considérés comme provenant de distributions différentes. Avantage : test non-paramétrique, aucune hypothèse sur la forme de la distribution.
**Online Learning : Hoeffding Adaptive Tree**
**Méthode de détection de dérive** :
Le retraining hebdomadaire par batch (`XGBClassifier.fit()` sur l'ensemble des labels accumulés) est remplacé par un apprentissage incrémental via [River](https://riverml.xyz/), une bibliothèque spécialisée en stream mining. Le modèle utilisé est le `HoeffdingAdaptiveTreeClassifier` (HAT) :
1. Sauvegarde avec chaque modèle sérialisé de l'approximation 5-quantiles par feature : (p10, p25, p50, p75, p90)
2. Génération d'échantillons synthétiques par interpolation de la CDF inverse (quantile function)
3. Test KS + divergence KL sur chaque feature entre la distribution courante et la baseline
4. Feature marquée comme « en dérive » si le test KS OU la divergence KL dépasse le seuil configuré
5. Retraining forcé si > 30 % des features dérivent simultanément
6. **Détection de dérive adversariale** : si de nombreuses features dérivent simultanément dans la même direction (score de corrélation directionnelle élevé), génération d'une alerte spécifique distinguant la manipulation intentionnelle (dérive adversariale coordonnée) de l'évolution organique (mises à jour navigateur ou changements de comportement naturels)
- **Arbre de décision incrémental** : construit l'arbre de décision progressivement, un exemple à la fois via `learn_one(x, y)`. À chaque split, le test de Hoeffding garantit que le split choisi est (probablement) le même que celui qu'un arbre batch aurait choisi avec les mêmes données.
- **Adaptatif** : utilise des estimateurs de fenêtre ADWIN à chaque nœud pour remplacer les statistiques obsolètes — le modèle s'adapte automatiquement au concept drift sans retraining explicite.
- **Apprentissage par cycle** : à chaque cycle de 300s, les nouveaux labels accumulés sont injectés un par un via `learn_one()`. Le modèle s'améliore continuellement, rendant le lourd retraining hebdomadaire obsolète.
**Validation gate** : si le taux d'anomalie sur le jeu de validation dépasse 20 % après retraining, le nouveau modèle est rejeté et le modèle précédent est conservé, avec génération d'une alerte (baseline contaminée).
```
Cycle 300s
├── Charger HAT sérialisé (pickle)
├── Pour chaque label accumulé ce cycle :
│ model.learn_one({feature: value, ...}, label)
├── Persister HAT mis à jour
└── Prédiction : model.predict_proba_many(df) → P(bot)
```
**Limites de l'approximation 5-quantiles** : adéquate pour les distributions unimodales, mais peut manquer les dérives bimodales dans `asset_ratio`, `post_ratio`, `orphan_ratio`. Extension à p5/p95 ou à t-digest identifiée comme travail futur.
Le passage au stream mining élimine trois problématiques majeures du batch training :
- **Latence de mise à jour** : de hebdomadaire à chaque cycle (300s)
- **Coût mémoire** : plus besoin de charger 50 000 labels en RAM
- **Stale model** : le modèle est toujours à jour par rapport au concept courant
**Validation gate** : conservée — si le taux d'anomalie sur le jeu de validation dépasse 20% après retraining EIF/NF, le nouveau modèle est rejeté et le modèle précédent conservé.
#### 2.4.4 Modélisation des phases d'attaque

View File

@ -70,7 +70,7 @@
│ │ 4. EIF bifurqué (complet/appli) │ │
│ │ 5. NF log-likelihood scoring │ │
│ │ 6. XGBoost probabilité │ │
│ │ 7. Fusion LR fusion │ │
│ │ 7. Meta-Model MLP fusion │ │
│ │ 8. HDBSCAN clustering (NF latent) │ │
│ │ 9. Écriture résultats ClickHouse │ │
│ └──────────────────────────────────┘ │
@ -240,7 +240,7 @@ Session entrante
├── asn_label == 'human' ?
│ ── OUI → baseline EIF training (sans étiquette bot)
└── Sinon → Triple-voix : EIF + NF + XGBoost + Fusion LR
└── Sinon → Triple-voix : EIF + NF + XGBoost + Meta-Model Stacking (MLP non-linéaire)
```
#### Seuil adaptatif

View File

@ -115,7 +115,7 @@ Le fingerprinting réseau opère sans déchiffrement TLS (les métadonnées TLS
| HDBSCAN (profiling) | ~25 s | Quotidien, ~200k sessions H2 dédupliquées, min_cluster=1000 |
| Dynamic matcher scoring | < 1 ms | Par session, lookup en mémoire contre ~510 profils |
| GraphSAGE (fleet.py) | ~80 ms | Graphe d'IPs, 2 couches SAGEConv, GPU/CPU |
| Fusion LR LR | < 10 ms | Régression logistique, négligeable |
| Fusion MLP | < 10 ms | MLP 2 couches, négligeable |
| **Cycle complet** | **~300 secondes** | EIF + AE + XGBoost + HDBSCAN + GraphSAGE |
La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fenêtre d'agrégation ClickHouse** (1 heure glissante avec recalcul toutes les 5 minutes), non par les temps d'exécution ML.
@ -133,9 +133,9 @@ La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fe
- À 34 000 sessions/cycle : ~100 ms acceptable
- À 500 000 sessions/cycle (scaling ×15) : ~2 s encore tolérable
**Fusion LR logistic regression** : O(n × d) entraînement, d = 3 features d'entrée (scores EIF, AE, XGBoost). Temps négligeable quelle que soit la taille.
**Fusion MLP** : O(n × d) inférence avec d = 3 features d'entrée (scores EIF, NF, XGBoost), MLP 2 couches (16 neurones). Temps négligeable quelle que soit la taille.
**Limite architecturale principale** : le modèle XGBoost hebdomadaire nécessite un jeu de labels accumulés via le feedback SOC. À faible volume de labels (< 500 sessions étiquetées par semaine), XGBoost ne converge pas correctement. Ce problème est partiellement atténué par le filtrage Cleanlab qui élimine les labels douteux (détail §3.8), mais reste identifié comme axe d'amélioration futur 6.6 online learning).
**Limite architecturale principale** : le modèle supervisé (Hoeffding Adaptive Tree) s'améliore incrémentalement à chaque cycle via `learn_one()`, mais nécessite un flux continu de labels fiables. À faible volume de labels (< 500 sessions étiquetées), le HAT converge lentement. Ce problème est partiellement atténué par le filtrage Cleanlab qui élimine les labels douteux (détail §3.8), mais la qualité du feedback SOC reste le goulot d'étranglement principal.
**Overhead de l'uprobe SSL_read** : un uprobe attaché à `SSL_read` se déclenche à *chaque* appel de lecture TLS, y compris pour les gros transferts de fichiers (images, vidéos, scripts JS volumineux), une seule requête peut générer des dizaines d'appels `SSL_read` successifs transportant des frames HTTP/2 DATA sans intérêt pour le fingerprinting. Sous forte charge (> 10 000 connexions TLS actives simultanées), cet overhead peut dégrader les performances du serveur web de manière mesurable. Les mitigations recommandées sont : (1) filtrer côté eBPF les invocations dont le buffer ne contient pas les magic bytes HTTP/2 ou HTTP/1.x (`GET `, `POST `, etc.) avant de soumettre au ring buffer ; (2) ignorer les frames HTTP/2 de type DATA de grande taille (longueur payload > 16 384 octets) qui ne contiennent pas d'en-têtes de requête ; (3) appliquer du sampling probabiliste (ex. 1 appel sur 10) pour les connexions déjà identifiées par leur JA4 comme des navigateurs légitimes connus.
@ -208,10 +208,10 @@ Un Variational Autoencoder bêta ([β-VAE, Higgins et al., 2017](https://openrev
Des modèles d'état-espace pour la modélisation des phases d'attaque — permet de détecter explicitement les transitions de phase (reconnaissance → exploitation) au lieu de seulement scorer chaque session isolément. Complémentaire au signal JA4 Drift (§5.5).
**t-digest pour la dérive conceptuelle** :
Remplacement de l'approximation à 5 quantiles actuellement utilisée pour la détection de drift ([quantile_drift_score]) par la structure **t-digest** , qui supporte les distributions bimodales et les queues longues avec précision adaptative. Critique pour les features à distribution bimodale comme `hit_velocity` (distribution séparée bots/humains).
Remplacement effectué — ADWIN (fenêtre glissante adaptative avec borne de Hoeffding) remplace l'approximation à 5 quantiles + KS. ADWIN gère nativement les distributions bimodales et les queues longues sans reconstruction de CDF.
**XGBoost → online learning** :
Remplacement du ré-entraînement hebdomadaire XGBoost par un apprentissage incrémental (gradient boosting online, par exemple [XGBoost Federated](https://xgboost.readthedocs.io/en/stable/tutorials/federated_learning.html) ou RIVER framework) permettant des mises à jour par cycle au lieu d'attendre l'accumulation d'une semaine de labels.
**Supervisé → online learning** :
Remplacement effectué — le `HoeffdingAdaptiveTreeClassifier` de River remplace le XGBoost batch hebdomadaire. Le modèle s'améliore incrémentalement à chaque cycle (300s) via `learn_one()`, éliminant la latence de mise à jour hebdomadaire.
#### Priorité 3 — Infrastructure et protocoles émergents

View File

@ -21,7 +21,7 @@ Un pipeline ML combinant :
- **Isolation Forest Étendu (EIF)** ([Hariri et al., 2021](https://ieeexplore.ieee.org/document/8888179)) : modèle non-supervisé fondé sur l'isolation aléatoire d'instances anormales dans des espaces de features basse-dimension
- **Autoencodeur variationnel (AE)** ([Mirsky et al., NDSS 2018](https://www.ndss-symposium.org/ndss-paper/kitsune-an-ensemble-of-autoencoders-for-online-network-intrusion-detection/)) : détection d'anomalies par reconstruction, capturant les corrélations entre features
- **XGBoost supervisé** : correction des erreurs systématiques des modèles non-supervisés via labels SOC accumulés
- **Fusion par régression logistique** : fusion des trois scores en un score final calibré
- **Fusion par MLP méta-modèle** : fusion non-linéaire des trois scores en un score final calibré
Le pipeline intègre un mécanisme de **détection de dérive conceptuelle** (basé sur le percentile 5 des scores négatifs) distinguant la dérive organique (évolution naturelle du trafic) de la dérive adversariale (manipulation intentionnelle de la distribution).

View File

@ -690,18 +690,15 @@ Clustering des anomalies pour identifier les campagnes coordonnées :
Résultat : colonne `campaign_id` (-1 = isolé, 0 = membre d'un cluster).
#### `_compute_drift_score(baseline_stats, current_baseline, features)`
#### `ADWINDriftMonitor(features, delta=0.002)`
Détection de dérive conceptuelle entre la baseline d'entraînement et la baseline
courante. Deux méthodes :
Détection de dérive conceptuelle par ADWIN (fenêtre glissante adaptative).
Maintient un détecteur ADWIN par feature, mis à jour à chaque cycle avec la
moyenne de la feature sur le trafic baseline.
1. **Méthode principale** : Comparaison par quantiles interpolés (KS-like). Fraction
de features avec p < 0.05.
2. **Fallback z-score** (`_compute_drift_score_zscore`) :
`z = |mean_current - mean_trained| / std_trained`. Fraction de features avec z > 2.0.
Si la fraction dépasse `DRIFT_THRESHOLD` (0.30), le modèle est ré-entraîné et
l'événement `DRIFT_DETECTED` est journalisé.
Si la fraction de features en dérive dépasse `DRIFT_THRESHOLD` (0.30), le
modèle EIF/NF est ré-entraîné. Si > 50% des features dérivent, une alerte
`ADVERSARIAL_DRIFT` est générée.
---

View File

@ -23,19 +23,19 @@
## A1 — Détection de dérive conceptuelle
### ✅ IMPLÉMENTÉ
### ✅ IMPLÉMENTÉ (ADWIN)
**Module** : `scoring.py`fonctions `_compute_drift_score()` et `_compute_drift_score_zscore()`
**Module** : `scoring.py`classe `ADWINDriftMonitor` (remplace `_compute_drift_score`)
**Différences avec la proposition initiale** :
| Proposition | Implémentation |
|-------------|----------------|
| KS-test (scipy `ks_2samp`) | Méthode principale : comparaison par quantiles interpolés (KS-like sans scipy). Fallback : z-score `\|μ_current - μ_trained\| / σ_trained > 2.0` |
| Seuil par `p_value < 0.05` | Même seuil p < 0.05 pour la méthode quantile ; z > 2.0 pour le fallback |
| KS-test + quantile digest | ADWIN (fenêtre glissante adaptative, borne de Hoeffding) |
| Seuil par `p_value < 0.05` | Borne de Hoeffding automatique (delta = 0.002) |
| `DRIFT_THRESHOLD` (30%) | ✅ Identique : `DRIFT_THRESHOLD = 0.30` (fraction de features en dérive) |
| Sauvegarde dans `.meta.json` | `baseline_stats` avec `{mean, std, p25, p75}` par feature |
| Événement `DRIFT_DETECTED` | ✅ Journalisé avec la liste des features déroutantes |
| Sauvegarde dans `.meta.json` | ADWIN stateful — pas de sauvegarde nécessaire (fenêtre adaptative) |
| Événement `DRIFT_DETECTED` | ✅ Journalisé avec la liste des features dérivées |
**Appelé depuis** : `models.py``load_or_train_model()`, avant la décision de chargement vs retrain.

View File

@ -1,10 +1,11 @@
"""Gestion des modèles : chargement, entraînement, versionnement.
IsolationForest (EIF), Normalizing Flow (PyTorch/FrEIA), XGBoost supervisé.
IsolationForest (EIF), Normalizing Flow (PyTorch/FrEIA), Hoeffding Adaptive Tree (River).
"""
import os
import json
import glob
import pickle
import joblib
import numpy as np
import pandas as pd
@ -18,7 +19,7 @@ from .config import (
IsolationForest, StandardScaler,
)
from .log import log_info, log_decision, append_training_history
from .scoring import compute_drift_score
from .scoring import ADWINDriftMonitor
# Imports conditionnels depuis config (déjà importés une seule fois)
if EIF_AVAILABLE:
@ -27,6 +28,12 @@ if EIF_AVAILABLE:
if TORCH_AVAILABLE:
from .config import torch, nn
try:
from river import forest as river_forest
RIVER_AVAILABLE = True
except ImportError:
RIVER_AVAILABLE = False
if XGB_AVAILABLE:
import xgboost as xgb
from sklearn.model_selection import cross_val_predict
@ -36,11 +43,14 @@ if XGB_AVAILABLE:
CLEANLAB_AVAILABLE = True
except ImportError:
CLEANLAB_AVAILABLE = False
else:
CLEANLAB_AVAILABLE = False
# ─── Caches de modèles ─────────────────────────────────────────────────────
_model_cache: dict = {}
_xgb_cache: dict = {}
_drift_monitors: dict[str, ADWINDriftMonitor] = {}
# ═══════════════════════════════════════════════════════════════════════════════
@ -258,9 +268,100 @@ def _load_xgb_labels(client, features: list, min_labels: int = XGB_MIN_LABELS) -
def load_or_train_xgb(name: str, client, features: list, cycle_id: str):
"""Charge ou entraîne le modèle XGBoost supervisé.
"""Charge ou met à jour le modèle supervisé en ligne (Hoeffding Adaptive Tree).
Retourne (XGBClassifier, list[str] features) ou (None, None) si indisponible.
Remplace le XGBClassifier hebdomadaire par un HoeffdingAdaptiveTreeClassifier
de River, mis à jour incrémentalement à chaque cycle via learn_one().
Retourne (model, list[str] features) ou (None, None) si indisponible.
Le model retourné expose predict_proba_many(df) → DataFrame.
"""
if not (XGB_AVAILABLE or RIVER_AVAILABLE) or XGB_WEIGHT <= 0:
return None, None
model_path = _river_model_path(name)
meta_path = _xgb_meta_path(name)
# Charger le modèle River existant
model = None
xgb_features = features
n_seen = 0
if os.path.exists(model_path):
try:
with open(model_path, 'rb') as f:
model = pickle.load(f)
with open(meta_path) as f:
meta = json.load(f)
xgb_features = meta.get('features', features)
n_seen = meta.get('n_total_labels', 0)
log_info(f"[River][{name}] HAT rechargé ({n_seen} labels cumulés, {len(xgb_features)} features).")
except Exception as exc:
log_info(f"[River][{name}] Erreur chargement : {exc} — nouveau modèle.")
model = None
# Créer un nouveau modèle si nécessaire
if model is None:
try:
model = river_forest.HoeffdingAdaptiveTreeClassifier(
grace_period=50, max_depth=12, seed=42,
)
except Exception:
# Fallback vers XGBoost batch si River indisponible
return _load_or_train_xgb_batch(name, client, features, cycle_id)
# ── Apprentissage incrémental sur les labels du cycle ──────────────
X, y, usable_features = _load_xgb_labels(client, features)
if X is not None and usable_features is not None:
xgb_features = usable_features
X_df = pd.DataFrame(X, columns=xgb_features)
n_new = 0
for i in range(len(X_df)):
try:
x_dict = {col: float(X_df.iloc[i][col]) for col in xgb_features}
model.learn_one(x_dict, int(y[i]))
n_new += 1
except Exception:
continue
n_seen += n_new
# Persister le modèle mis à jour
os.makedirs(os.path.dirname(model_path), exist_ok=True)
with open(model_path, 'wb') as f:
pickle.dump(model, f)
meta = {
'trained_at': datetime.now().isoformat(),
'n_total_labels': n_seen,
'n_new_labels': n_new,
'n_features': len(xgb_features),
'features': xgb_features,
'model_name': name,
'algorithm': 'HoeffdingAdaptiveTreeClassifier',
}
with open(meta_path, 'w') as f:
json.dump(meta, f, indent=2)
log_info(f"[River][{name}] +{n_new} labels incrémentaux ({n_seen} total) — HAT mis à jour.")
log_decision('RIVER_UPDATED', cycle_id, name, meta)
else:
if n_seen == 0:
log_info(f"[River][{name}] Pas de labels — modèle supervisé désactivé ce cycle.")
return None, None
log_info(f"[River][{name}] Pas de nouveaux labels — HAT existant réutilisé ({n_seen} labels).")
return model, xgb_features
def _river_model_path(name: str) -> str:
"""Chemin du modèle River sérialisé."""
return os.path.join(MODEL_DIR, f'river_hat_{name}.pkl')
def _load_or_train_xgb_batch(name, client, features, cycle_id):
"""Fallback : entraîne un XGBoost classique si River est indisponible.
Conservé pour la compatibilité si river n'est pas installé.
Retourne (XGBClassifier, list[str] features) ou (None, None).
"""
if not XGB_AVAILABLE or XGB_WEIGHT <= 0:
return None, None
@ -268,87 +369,36 @@ def load_or_train_xgb(name: str, client, features: list, cycle_id: str):
model_path = _xgb_model_path(name)
meta_path = _xgb_meta_path(name)
# Charger le modèle existant si récent
if os.path.exists(model_path) and os.path.exists(meta_path):
try:
with open(meta_path) as f:
meta = json.load(f)
trained_at = datetime.fromisoformat(meta['trained_at'])
age_h = (datetime.now() - trained_at).total_seconds() / 3600
if age_h < XGB_RETRAIN_INTERVAL_H:
model = xgb.XGBClassifier()
model.load_model(model_path)
log_info(f"[XGB][{name}] Modèle rechargé ({age_h:.1f}h / {XGB_RETRAIN_INTERVAL_H}h, {meta.get('n_labels', '?')} labels).")
return model, meta.get('features', features)
except Exception as exc:
log_info(f"[XGB][{name}] Erreur chargement : {exc}")
model = xgb.XGBClassifier()
model.load_model(model_path)
return model, meta.get('features', features)
except Exception:
pass
# Entraîner un nouveau modèle
X, y, xgb_features = _load_xgb_labels(client, features)
if X is None:
log_info(f"[XGB][{name}] Labels insuffisants (< {XGB_MIN_LABELS}) — XGBoost désactivé ce cycle.")
# Tenter de réutiliser un modèle ancien
if os.path.exists(model_path) and os.path.exists(meta_path):
try:
model = xgb.XGBClassifier()
model.load_model(model_path)
with open(meta_path) as f:
meta = json.load(f)
return model, meta.get('features', features)
except Exception:
pass
return None, None
scale_pos = max(1, int((y == 0).sum() / max((y == 1).sum(), 1)))
# ── Cleanlab : filtrage des labels SOC bruyants ─────────────────────
if CLEANLAB_AVAILABLE:
try:
quick_model = xgb.XGBClassifier(
n_estimators=80, max_depth=4, learning_rate=0.15,
eval_metric='logloss', random_state=42, n_jobs=-1,
tree_method='hist',
)
pred_probs = cross_val_predict(
quick_model, X, y, cv=3, method='predict_proba',
)
issues = find_label_issues(
labels=y, pred_probs=pred_probs,
)
noisy_idx = issues[issues['is_label_issue'] == True].index.to_numpy()
if len(noisy_idx) > 0:
keep = np.ones(len(y), dtype=bool)
keep[noisy_idx] = False
X, y = X[keep], y[keep]
pct = len(noisy_idx) / (len(keep)) * 100
log_info(
f"[XGB][{name}] Cleanlab : {len(noisy_idx)}/{len(keep)} "
f"labels bruyants supprimés ({pct:.1f}%)"
)
scale_pos = max(1, int((y == 0).sum() / max((y == 1).sum(), 1)))
except Exception as exc:
log_info(f"[XGB][{name}] Cleanlab échoué, labels bruts conservés : {exc}")
model = xgb.XGBClassifier(
n_estimators=200, max_depth=6, learning_rate=0.1,
scale_pos_weight=scale_pos, eval_metric='logloss',
random_state=42, n_jobs=-1,
tree_method='hist',
random_state=42, n_jobs=-1, tree_method='hist',
)
model.fit(X, y, verbose=False)
model.save_model(model_path)
meta = {
'trained_at': datetime.now().isoformat(),
'n_labels': len(y), 'n_positive': int(y.sum()),
'n_negative': int((y == 0).sum()), 'n_features': len(xgb_features),
'features': xgb_features,
'scale_pos_weight': scale_pos, 'model_name': name,
'features': xgb_features, 'model_name': name,
}
with open(meta_path, 'w') as f:
json.dump(meta, f, indent=2)
log_info(f"[XGB][{name}] Modèle entraîné : {len(y)} labels ({y.sum()} positifs), scale_pos_weight={scale_pos}")
log_decision('XGB_TRAINED', cycle_id, name, meta)
return model, xgb_features
@ -369,17 +419,27 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
age_h = (datetime.now() - trained_at).total_seconds() / 3600
age_ok = age_h < RETRAIN_INTERVAL_H
# A1 — Dérive conceptuelle : comparer la distribution actuelle avec celle de l'entraînement
# A1 — Dérive conceptuelle via ADWIN (fenêtre glissante adaptative)
drift_score = 0.0
drift_forced = False
if age_ok and 'baseline_stats' in meta:
drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features,
name=name, cycle_id=cycle_id)
# Obtenir ou créer le moniteur ADWIN pour ce modèle
if name not in _drift_monitors:
_drift_monitors[name] = ADWINDriftMonitor(features)
drift_monitor = _drift_monitors[name]
if drift_monitor.available:
# Alimenter ADWIN avec les moyennes de features du cycle courant
feature_means = {}
for f in features:
if f in human_baseline.columns:
feature_means[f] = float(human_baseline[f].mean())
drift_score = drift_monitor.check_drift(feature_means, name=name, cycle_id=cycle_id)
if drift_score >= DRIFT_THRESHOLD:
drift_forced = True
log_info(f"[{name}] Dérive détectée ({drift_score:.0%} features) — retraining forcé.")
log_info(f"[{name}] Dérive ADWIN détectée ({drift_score:.0%} features) — retraining forcé.")
log_decision('DRIFT_DETECTED', cycle_id, name, {
'version_id': meta['version_id'], 'drift_score': round(drift_score, 3),
'version_id': meta['version_id'], 'drift_rate': round(drift_score, 3),
'drift_threshold': DRIFT_THRESHOLD, 'model_age_hours': round(age_h, 2)
})
@ -465,18 +525,12 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
return joblib.load(model_path), ae_prev, meta.get('features', features)
log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.")
# A1/§4 — Sauvegarder les statistiques de distribution avec quantile digest 9 points
# (p5…p95) pour une meilleure fidélité de la détection de dérive KS+KL
# A1 — Statistiques de référence pour la baseline (mean/std uniquement,
# la détection de dérive est assurée par ADWIN en temps réel)
baseline_stats = {
f: {
'mean': float(X[f].mean()), 'std': float(X[f].std()),
'p5': float(X[f].quantile(0.05)),
'p10': float(X[f].quantile(0.10)),
'p25': float(X[f].quantile(0.25)),
'p50': float(X[f].quantile(0.50)),
'p75': float(X[f].quantile(0.75)),
'p90': float(X[f].quantile(0.90)),
'p95': float(X[f].quantile(0.95)),
'mean': float(X[f].mean()),
'std': float(X[f].std()),
}
for f in features
}

View File

@ -138,7 +138,7 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
unknown_traffic['raw_anomaly_score'] = raw_scores
unknown_traffic['model_name'] = name
# XGBoost supervisé — troisième voix (si labels historiques disponibles)
# Modèle supervisé — troisième voix (Hoeffding Adaptive Tree ou XGBoost fallback)
unknown_traffic['xgb_prob'] = 0.0
xgb_model_ref = None # Référence pour SHAP TreeExplainer (§2.4.5)
if XGB_AVAILABLE and XGB_WEIGHT > 0:
@ -146,35 +146,36 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
xgb_client = get_client()
xgb_model, xgb_feats = load_or_train_xgb(name, xgb_client, scoring_features, cycle_id)
if xgb_model is not None and xgb_feats is not None:
# XGB peut utiliser un sous-ensemble de features (celles disponibles dans la vue)
xgb_cols = [f for f in xgb_feats if f in unknown_traffic.columns]
X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1]
# River HAT utilise predict_proba_many(DataFrame), XGBoost utilise predict_proba(ndarray)
if hasattr(xgb_model, 'predict_proba_many'):
proba_df = xgb_model.predict_proba_many(X_xgb[xgb_cols])
xgb_probs = proba_df[1].values if 1 in proba_df.columns else np.zeros(len(X_xgb))
else:
xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1]
unknown_traffic['xgb_prob'] = xgb_probs
xgb_model_ref = xgb_model
log_info(f"[{name}] XGBoost : xgb_mean={xgb_probs.mean():.4f}")
log_info(f"[{name}] Supervisé : score moyen={xgb_probs.mean():.4f}")
except Exception as exc:
log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.")
log_info(f"[{name}] Supervisé scoring échoué : {exc} — EIF+NF seuls.")
# §8 — Score final via MetaLearner (ou poids fixes en fallback)
# §8 — Score final via MetaLearner MLP (ou poids fixes en fallback)
meta_learner = get_meta_learner(name)
eif_norm_arr = unknown_traffic['anomaly_score'].values.copy()
ae_norm_arr = normalize_scores(-unknown_traffic['ae_recon_error'].values)
xgb_prob_arr = unknown_traffic['xgb_prob'].values
hits_arr = unknown_traffic.get('hits', pd.Series(1, index=unknown_traffic.index)).values
corr_arr = unknown_traffic.get('correlated', pd.Series(0, index=unknown_traffic.index)).values
final_scores = meta_learner.predict(eif_norm_arr, ae_norm_arr, xgb_prob_arr,
hits_arr, corr_arr)
final_scores = meta_learner.predict(eif_norm_arr, ae_norm_arr, xgb_prob_arr)
unknown_traffic['anomaly_score'] = final_scores
if meta_learner.is_trained:
log_info(
f"[{name}] §8 MetaLearner actif ({meta_learner._n_samples} labels) — "
f"[{name}] §8 MetaFusionMLP actif ({meta_learner._n_samples} labels) — "
f"score moyen={final_scores.mean():.4f}"
)
elif unknown_traffic['xgb_prob'].mean() > 0:
log_info(f"[{name}] §8 Poids fixes EIF+AE+XGB (MetaLearner pas encore entraîné).")
log_info(f"[{name}] §8 Poids fixes EIF+NF+XGB (MetaFusionMLP pas encore entraîné).")
# §8 — Entraînement du MetaLearner sur les labels du cycle courant
# (accumulation progressive — activation dès MIN_SAMPLES labels)

View File

@ -10,5 +10,6 @@ torch_geometric>=2.4
FrEIA>=0.2
xgboost>=2.0
cleanlab>=2.6
river>=0.19
pyyaml>=6.0
ja4-common @ file:///app/shared/ja4_common

View File

@ -1,14 +1,14 @@
"""Scoring, dérive, validation, seuil adaptatif, SHAP et clustering.
Regroupe les fonctions de scoring utilisées par le pipeline de détection :
- A1 : détection de dérive conceptuelle (KS-test + KL divergence + dérive adversariale)
- A1 : détection de dérive conceptuelle (ADWIN — fenêtre glissante adaptative)
- A2 : seuil adaptatif basé sur le percentile des scores négatifs
- A4 : explainabilité SHAP (top features contributives)
- A7 : validation de complétude des features
- A8 : clustering HDBSCAN / DBSCAN des anomalies
- A10 : normalisation [0,1] des scores d'anomalie
- §7 : ExIFFI — importance de features par permutation (Extended Isolation Forest)
- §8 : MetaLearner — pondération de l'ensemble par régression logistique
- §8 : MetaLearner — fusion non-linéaire par MLP
"""
import numpy as np
import pandas as pd
@ -67,103 +67,78 @@ def _kl_divergence(p_vals: np.ndarray, q_vals: np.ndarray, n_bins: int = 20) ->
return max(0.0, kl)
def compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame,
features: list, *, name: str = '', cycle_id: str = '') -> float:
"""Compare la distribution actuelle de la baseline humaine avec celle de l'entraînement.
class ADWINDriftMonitor:
"""A1 — Détection de dérive conceptuelle par ADWIN (ADaptive WINdowing).
§4 — Méthode améliorée :
- Quantile digest 9 points (p5…p95) au lieu de 5 pour une meilleure fidélité
- KS-test + divergence KL : feature en dérive si KS p<0.05 OU KL>0.5
- Détection de dérive adversariale : >50% des features dérivent dans la même
direction → log_decision 'ADVERSARIAL_DRIFT'
Maintient un détecteur ADWIN par feature. Chaque cycle, la moyenne de
la feature sur le trafic baseline est alimentée au détecteur. ADWIN
ajuste automatiquement la taille de sa fenêtre et signale un changement
lorsque la différence entre sous-fenêtres dépasse la borne de Hoeffding.
Retourne la fraction de features en dérive significative (en [0,1]).
Avantages sur KS + quantile digest :
- Pas de seuil arbitraire (borne de Hoeffding, contrôle par delta)
- Fenêtre adaptative : s'ajuste automatiquement à la vitesse du drift
- Détection en temps réel à chaque cycle, pas de reconstruction de CDF
"""
if not baseline_stats or current_baseline.empty:
return 0.0
try:
from scipy.stats import ks_2samp
_HAS_SCIPY = True
except ImportError:
_HAS_SCIPY = False
# Clés de quantiles : 9 points preferred, 5 points fallback
Q9_KEYS = ['p5', 'p10', 'p25', 'p50', 'p75', 'p90', 'p95']
Q9_PROBS = np.array([0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95])
Q5_KEYS = ['p10', 'p25', 'p50', 'p75', 'p90']
Q5_PROBS = np.array([0.10, 0.25, 0.50, 0.75, 0.90])
def __init__(self, features: list, delta: float = 0.002):
try:
from river.drift import ADWIN
self._detectors = {f: ADWIN(delta=delta) for f in features}
self._available = True
except ImportError:
self._detectors = {}
self._available = False
self._last_changes: dict[str, bool] = {}
drifted = 0
tested = 0
drifted_features: list = []
direction_shifts: list = []
rng = np.random.default_rng(42)
@property
def available(self) -> bool:
return self._available
for feat in features:
if feat not in baseline_stats or feat not in current_baseline.columns:
continue
stats = baseline_stats[feat]
curr_values = current_baseline[feat].dropna()
if len(curr_values) < 30:
continue
trained_std = stats.get('std', 0)
if trained_std < 1e-9:
continue
def update(self, feature_means: dict[str, float]) -> dict[str, bool]:
"""Alimente chaque ADWIN avec la moyenne courante de sa feature.
# Reconstruction de la distribution d'entraînement par interpolation quantile
if all(k in stats for k in Q9_KEYS):
q_probs = Q9_PROBS
q_vals = np.array([stats[k] for k in Q9_KEYS])
elif all(k in stats for k in Q5_KEYS):
q_probs = Q5_PROBS
q_vals = np.array([stats[k] for k in Q5_KEYS])
else:
q_probs = None
Retourne un dict {feature: detected_change} indiquant quelles
features ont dérivé ce cycle.
"""
if not self._available:
return {}
changes = {}
for feat, value in feature_means.items():
if feat in self._detectors:
self._detectors[feat].update(value)
detected = self._detectors[feat].detected_change()
changes[feat] = detected
if detected:
self._last_changes[feat] = True
self._last_changes.update({k: False for k in self._last_changes if k not in changes})
return changes
if q_probs is not None:
u = rng.uniform(0, 1, size=len(curr_values))
synthetic_trained = np.interp(u, q_probs, q_vals)
else:
synthetic_trained = rng.normal(stats['mean'], trained_std, size=len(curr_values))
def check_drift(self, feature_means: dict[str, float],
*, name: str = '', cycle_id: str = '') -> float:
"""Met à jour les ADWIN et retourne le taux de features en dérive.
feat_drifted = False
if _HAS_SCIPY:
_, ks_p = ks_2samp(curr_values.values, synthetic_trained)
if ks_p < 0.05:
feat_drifted = True
else:
# Fallback Z-score
z = abs(curr_values.mean() - stats['mean']) / trained_std
if z > 2.0:
feat_drifted = True
Retourne la fraction de features monitorées ayant dérivé ce cycle,
dans [0, 1]. Si le taux dépasse le seuil de dérive, un retraining
est recommandé.
"""
changes = self.update(feature_means)
if not changes:
return 0.0
# KL divergence comme critère complémentaire au KS
kl = _kl_divergence(curr_values.values, synthetic_trained)
if kl > 0.5:
feat_drifted = True
drifted_features = [f for f, changed in changes.items() if changed]
drift_rate = len(drifted_features) / len(changes)
if feat_drifted:
drifted += 1
drifted_features.append(feat)
# Direction du shift pour la détection adversariale
direction_shifts.append(np.sign(curr_values.mean() - stats['mean']))
tested += 1
drift_rate = drifted / max(tested, 1)
# Détection de dérive adversariale : >50% des features dérivent simultanément
# dans la même direction → signe d'une manipulation intentionnelle de la distribution
if drift_rate > 0.50 and len(direction_shifts) >= 10:
consensus = abs(float(np.mean(direction_shifts)))
if consensus >= 0.8:
# Détection adversariale : dérive massive simultanée
if drift_rate > 0.50 and len(drifted_features) >= 10:
log_decision('ADVERSARIAL_DRIFT', cycle_id, name, {
'drift_rate': round(drift_rate, 3),
'consensus': round(consensus, 3),
'n_features': len(drifted_features),
'n_drifted': len(drifted_features),
'n_monitored': len(changes),
'top_drifted': drifted_features[:10],
})
return drift_rate
return drift_rate
# ═══════════════════════════════════════════════════════════════════════════════
@ -445,42 +420,101 @@ def compute_ae_feature_errors(ae_model, X: pd.DataFrame, features: list,
# ═══════════════════════════════════════════════════════════════════════════════
# §8 — META-LEARNER : PONDÉRATION APPRISE DE L'ENSEMBLE
# §8 — META-LEARNER : FUSION NON-LINÉAIRE PAR MLP
# ═══════════════════════════════════════════════════════════════════════════════
class MetaLearner:
"""§8 — Méta-learner (régression logistique) pour la pondération de l'ensemble.
Remplace les poids fixes (1XGB_W)×((1AE_W)×eif + AE_W×ae) + XGB_W×xgb
par une régression logistique entraînée sur les labels SOC/Anubis/bots connus.
class MetaFusionMLP(torch.nn.Module):
"""MLP de fusion pour le méta-modèle de stacking.
Formule apprise :
P(bot) = logistic(w1×eif_norm + w2×ae_norm + w3×xgb_prob
+ w4×log1p(hits) + w5×correlated + bias)
Fallback automatique aux poids fixes quand < MIN_SAMPLES labels disponibles.
Architecture : Linear(3, 16) → BatchNorm → ReLU → Dropout → Linear(16, 1) → Sigmoid
Input : [eif_norm, nf_norm, xgb_prob] — 3 scores intermédiaires dans [0, 1].
Output : P(bot) ∈ [0, 1].
"""
MIN_SAMPLES = 1000 # Nombre minimal de labels pour activer le méta-learner
def __init__(self):
self._clf = None
super().__init__()
self.net = torch.nn.Sequential(
torch.nn.Linear(3, 16),
torch.nn.BatchNorm1d(16),
torch.nn.ReLU(),
torch.nn.Dropout(0.2),
torch.nn.Linear(16, 1),
torch.nn.Sigmoid(),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.net(x).squeeze(-1)
class MetaLearner:
"""§8 — Méta-learner (MLP) pour la fusion non-linéaire de l'ensemble.
Remplace les poids fixes et la régression logistique par un MLP PyTorch
capable de capturer des interactions non-linéaires entre les trois voix
(EIF, Normalizing Flow, XGBoost).
Stacking OOF (Out-of-Fold) : le MLP est entraîné sur les prédictions
des modèles de base via validation croisée temporelle, évitant le
surapprentissage sur les mêmes données d'entraînement.
Fallback automatique aux poids fixes quand meta_mlp.pt n'existe pas.
"""
MIN_SAMPLES = 50 # Le MLP régularisé (BN + Dropout) gère le peu de données
PATIENCE = 5 # Early stopping patience
MAX_EPOCHS = 50
def __init__(self, model_suffix: str = ''):
import torch
self._mlp: MetaFusionMLP | None = None
self._n_samples: int = 0
self._feature_names = ['eif_norm', 'ae_norm', 'xgb_prob', 'log_hits', 'correlated']
self._weights_log: dict = {} # Pour la transparence SOC
self._weights_log: dict = {}
self._model_path = os.path.join(
os.getenv('MODEL_DIR', '/var/lib/bot_detector'),
f'meta_mlp{model_suffix}.pt',
)
# Tenter de charger un modèle pré-entraîné existant
self._try_load()
# ── Sauvegarde / Chargement ──────────────────────────────────────────
def _try_load(self) -> None:
"""Charge le MLP depuis le disque si le fichier existe."""
if not os.path.exists(self._model_path):
return
try:
mlp = MetaFusionMLP()
state = torch.load(self._model_path, map_location='cpu', weights_only=True)
mlp.load_state_dict(state)
mlp.eval()
self._mlp = mlp
log_info(f"[MetaLearner] MLP chargé depuis {self._model_path}")
except Exception as e:
log_info(f"[MetaLearner] Échec chargement MLP ({e}) — fallback poids fixes")
self._mlp = None
def _save(self) -> None:
"""Persiste le MLP entraîné sur le disque."""
if self._mlp is None:
return
try:
os.makedirs(os.path.dirname(self._model_path), exist_ok=True)
torch.save(self._mlp.state_dict(), self._model_path)
except Exception as e:
log_info(f"[MetaLearner] Échec sauvegarde MLP ({e})")
# ── Entraînement ─────────────────────────────────────────────────────
def fit(self, df: pd.DataFrame) -> bool:
"""Entraîne le méta-learner sur un DataFrame de sessions labelisées.
"""Entraîne le MLP sur un DataFrame de sessions labelisées.
Colonnes requises : eif_norm, ae_norm (ou 0), xgb_prob (ou 0),
hits, correlated, label (0=normal, 1=bot).
Colonnes requises : eif_norm, ae_norm (ou 0), xgb_prob (ou 0), label.
Boucle PyTorch : Adam, BCELoss, 50 epochs max, early stopping (patience=5).
Retourne True si l'entraînement a réussi.
"""
try:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler as _SS
except ImportError:
return False
import torch
import torch.nn as nn
required = {'eif_norm', 'label'}
if not required.issubset(df.columns):
@ -490,69 +524,118 @@ class MetaLearner:
if len(df) < self.MIN_SAMPLES:
return False
X_meta = pd.DataFrame({
'eif_norm': df['eif_norm'].clip(0, 1),
'ae_norm': df.get('ae_norm', pd.Series(0.0, index=df.index)).clip(0, 1),
'xgb_prob': df.get('xgb_prob', pd.Series(0.0, index=df.index)).clip(0, 1),
'log_hits': np.log1p(df.get('hits', pd.Series(1, index=df.index)).clip(1)),
'correlated': df.get('correlated', pd.Series(0, index=df.index)).clip(0, 1),
}).fillna(0)
y = df['label'].astype(int)
# Construction des 3 features d'entrée
X_np = np.column_stack([
df['eif_norm'].clip(0, 1).values,
df.get('ae_norm', pd.Series(0.0, index=df.index)).clip(0, 1).values,
df.get('xgb_prob', pd.Series(0.0, index=df.index)).clip(0, 1).values,
]).astype(np.float32)
y_np = df['label'].astype(int).values.astype(np.float32)
scaler = _SS()
X_scaled = scaler.fit_transform(X_meta)
clf = LogisticRegression(max_iter=500, C=1.0, random_state=42)
clf.fit(X_scaled, y)
X_t = torch.tensor(X_np)
y_t = torch.tensor(y_np)
# Enregistrer les poids pour la transparence SOC
coefs = dict(zip(self._feature_names, clf.coef_[0].tolist()))
self._clf = (clf, scaler)
# Split train/validation (20% dernier = validation)
n = len(X_t)
split = max(1, int(n * 0.8))
X_train, X_val = X_t[:split], X_t[split:]
y_train, y_val = y_t[:split], y_t[split:]
if len(X_train) < 10 or len(X_val) < 5:
# Pas assez pour un split fiable — entraîner sur tout
X_train, X_val = X_t, X_t
y_train, y_val = y_t, y_t
mlp = MetaFusionMLP()
optimizer = torch.optim.Adam(mlp.parameters(), lr=1e-3, weight_decay=1e-4)
criterion = nn.BCELoss()
best_val_loss = float('inf')
best_state = None
patience_counter = 0
for epoch in range(self.MAX_EPOCHS):
# ── Train step ──
mlp.train()
optimizer.zero_grad()
pred = mlp(X_train)
loss = criterion(pred, y_train)
loss.backward()
optimizer.step()
# ── Validation step ──
mlp.eval()
with torch.no_grad():
val_pred = mlp(X_val)
val_loss = criterion(val_pred, y_val).item()
if val_loss < best_val_loss:
best_val_loss = val_loss
best_state = {k: v.clone() for k, v in mlp.state_dict().items()}
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= self.PATIENCE:
break
# Restaurer les meilleurs poids
if best_state is not None:
mlp.load_state_dict(best_state)
mlp.eval()
self._mlp = mlp
self._n_samples = len(df)
self._save()
# Log pour la transparence SOC
layer1_w = mlp.net[0].weight.data.numpy()
self._weights_log = {
'coefs': {k: round(v, 4) for k, v in coefs.items()},
'intercept': round(float(clf.intercept_[0]), 4),
'type': 'MetaFusionMLP',
'n_samples': self._n_samples,
'epochs_run': epoch + 1,
'best_val_loss': round(best_val_loss, 6),
'layer1_weight_norm': round(float(np.linalg.norm(layer1_w)), 4),
}
log_info(f"[MetaLearner] Entraîné sur {self._n_samples} labels — coefs: {coefs}")
log_info(
f"[MetaLearner] MLP entraîné sur {self._n_samples} labels — "
f"epochs={epoch + 1}, val_loss={best_val_loss:.6f}"
)
return True
# ── Inférence ────────────────────────────────────────────────────────
def predict(self, eif_norm: np.ndarray, ae_norm: np.ndarray,
xgb_prob: np.ndarray, hits: np.ndarray = None,
correlated: np.ndarray = None) -> np.ndarray:
"""Prédit P(bot) avec le méta-learner ou les poids fixes en fallback.
"""Prédit P(bot) avec le MLP ou les poids fixes en fallback.
Seules les 3 features principales (eif, nf, xgb) sont utilisées par le MLP.
Les arguments hits/correlated sont ignorés par le MLP mais conservés
pour la compatibilité du fallback.
Retourne un array de probabilités dans [0, 1].
"""
n = len(eif_norm)
if hits is None:
hits = np.ones(n)
if correlated is None:
correlated = np.zeros(n)
if self.is_trained:
clf, scaler = self._clf
X_meta = np.column_stack([
np.clip(eif_norm, 0, 1),
np.clip(ae_norm, 0, 1),
np.clip(xgb_prob, 0, 1),
np.log1p(np.clip(hits, 1, None)),
np.clip(correlated, 0, 1),
])
try:
X_scaled = scaler.transform(X_meta)
return clf.predict_proba(X_scaled)[:, 1]
X_np = np.column_stack([
np.clip(eif_norm, 0, 1),
np.clip(ae_norm, 0, 1),
np.clip(xgb_prob, 0, 1),
]).astype(np.float32)
with torch.no_grad():
probs = self._mlp(torch.tensor(X_np)).numpy()
return probs
except Exception as e:
log_info(f"[MetaLearner] Prédiction échouée ({e}) — fallback poids fixes")
log_info(f"[MetaLearner] MLP prédiction échouée ({e}) — fallback poids fixes")
# Fallback : formule linéaire avec poids fixes
# Fallback : formule linéaire avec poids fixes (cold start)
_ae_w = AE_WEIGHT
_xgb_w = XGB_WEIGHT
return (1 - _xgb_w) * ((1 - _ae_w) * eif_norm + _ae_w * ae_norm) + _xgb_w * xgb_prob
@property
def is_trained(self) -> bool:
"""Vrai si le méta-learner est actif (assez de labels pour être fiable)."""
return self._clf is not None and self._n_samples >= self.MIN_SAMPLES
"""Vrai si le MLP est actif (modèle chargé ou entraîné)."""
return self._mlp is not None
def build_labels_from_df(self, df: pd.DataFrame) -> pd.DataFrame:
"""Construit les labels supervisés pour l'entraînement du méta-learner.
@ -575,9 +658,9 @@ class MetaLearner:
return pd.DataFrame(labeled)
# Singleton partagé entre les modèles Complet et Applicatif
_meta_learner_complet = MetaLearner()
_meta_learner_applicatif = MetaLearner()
# Singletons partagés entre les modèles Complet et Applicatif
_meta_learner_complet = MetaLearner(model_suffix='_complet')
_meta_learner_applicatif = MetaLearner(model_suffix='_applicatif')
def get_meta_learner(name: str) -> MetaLearner: