Files
Jacquin Antoine f88b739992 feat(e2e): add distributed E2E test framework with parametric traffic generation
Add run-e2e-test.sh with CLI parameters (--hits, --http-ratio, --dns, --tls,
--src-ips, --keep-analysis, --up) for configurable traffic generation. Traffic
runs from VM endpoints with multiple source IPs (alias IPs on eth0) to produce
distinct sessions for the ML pipeline. Fix curl TLS flags (--tlsv1.2 instead
of --tls-v1-2), skip redundant local verification in distributed mode, and
fix dashboard is_available() cache that never retried after ClickHouse recovery.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 00:09:32 +02:00

656 lines
30 KiB
Python

"""Gestion des modèles : chargement, entraînement, versionnement.
IsolationForest (EIF), Normalizing Flow (PyTorch/FrEIA), Hoeffding Adaptive Tree (River).
"""
import os
import json
import glob
import pickle
import joblib
import numpy as np
import pandas as pd
from datetime import datetime
from .config import (
MODEL_DIR, MODEL_HISTORY_COUNT, RETRAIN_INTERVAL_H, DRIFT_THRESHOLD,
N_ESTIMATORS, CONTAMINATION, ANOMALY_THRESHOLD, AE_WEIGHT, AE_EPOCHS, AE_LATENT_DIM,
AE_LEARNING_RATE, XGB_WEIGHT, XGB_MIN_LABELS, XGB_RETRAIN_INTERVAL_H,
EIF_AVAILABLE, TORCH_AVAILABLE, XGB_AVAILABLE, DB,
IsolationForest, StandardScaler,
)
from .log import log_info, log_decision, append_training_history
from .scoring import ADWINDriftMonitor
# Imports conditionnels depuis config (déjà importés une seule fois)
if EIF_AVAILABLE:
from .config import ExtendedIsolationForest
if TORCH_AVAILABLE:
from .config import torch, nn
try:
from river import forest as river_forest
RIVER_AVAILABLE = True
except ImportError:
RIVER_AVAILABLE = False
if XGB_AVAILABLE:
import xgboost as xgb
from sklearn.model_selection import cross_val_predict
try:
from cleanlab.filter import find_label_issues
CLEANLAB_AVAILABLE = True
except ImportError:
CLEANLAB_AVAILABLE = False
else:
CLEANLAB_AVAILABLE = False
# ─── Model caches ──────────────────────────────────────────────────────────
# Module-level state shared by every caller in the process.
# NOTE(review): _model_cache and _xgb_cache are neither read nor written anywhere
# in the visible part of this module — presumably used elsewhere or legacy; confirm.
_model_cache: dict = {}
_xgb_cache: dict = {}
# One ADWINDriftMonitor per model name; entries are created lazily by
# load_or_train_model() the first time a model with that name is checked for drift.
_drift_monitors: dict[str, ADWINDriftMonitor] = {}
# ═══════════════════════════════════════════════════════════════════════════════
# GESTION DES MODÈLES
# ═══════════════════════════════════════════════════════════════════════════════
def _current_pointer_path(name: str) -> str:
    """Return the path of the pointer file that records the current version of model ``name``."""
    pointer_filename = f'model_{name}.current'
    return os.path.join(MODEL_DIR, pointer_filename)
def _get_current_version(name: str):
    """Resolve the current on-disk version of model ``name``.

    The version id is read from the pointer file; the serialized model and its
    JSON metadata must both exist for that version.

    Returns:
        ``(model_path, meta)`` where ``meta`` is the decoded metadata dict, or
        ``(None, None)`` when the pointer file, the model file or the metadata
        file is missing.
    """
    pointer_file = _current_pointer_path(name)
    if not os.path.exists(pointer_file):
        return None, None
    with open(pointer_file) as handle:
        version_id = handle.read().strip()

    # Both artefacts of a version share the same stem and differ only by extension.
    stem = os.path.join(MODEL_DIR, f'model_{name}_{version_id}')
    model_path = stem + '.joblib'
    meta_path = stem + '.meta.json'
    if not (os.path.exists(model_path) and os.path.exists(meta_path)):
        return None, None

    with open(meta_path) as handle:
        meta = json.load(handle)
    return model_path, meta
def _purge_old_versions(name: str):
    """Delete surplus on-disk versions of model ``name``, keeping only the newest ones.

    Versions are identified by the ``%Y%m%d_%H%M%S`` timestamp that
    ``load_or_train_model`` uses as version id, so lexicographic order is
    chronological order. For every purged version the serialized model, its
    metadata file and the companion NF ensemble file (``ae_<name>_<version>.pt``)
    are removed.

    Differences from the previous implementation:
      * the companion ``.pt`` file is removed too (it used to be left behind forever);
      * only files whose version part is a timestamp are considered, so a model
        named e.g. ``http`` no longer purges the versions of ``http_v2``;
      * at least one version is always kept, even if MODEL_HISTORY_COUNT <= 0
        (``versions[:-0]`` used to silently disable purging).
    """
    import re  # local import: the module does not import ``re`` at top level

    keep = max(1, MODEL_HISTORY_COUNT)
    prefix = f'model_{name}_'
    suffix = '.joblib'
    version_id_re = re.compile(r'\d{8}_\d{6}')

    # Escape the literal parts so glob metacharacters in MODEL_DIR or ``name``
    # are not interpreted as wildcards.
    pattern = os.path.join(glob.escape(MODEL_DIR), glob.escape(prefix) + '*' + suffix)
    candidate_ids = (
        os.path.basename(path)[len(prefix):-len(suffix)]
        for path in glob.glob(pattern)
    )
    version_ids = sorted(vid for vid in candidate_ids if version_id_re.fullmatch(vid))

    for version_id in version_ids[:-keep]:
        os.remove(os.path.join(MODEL_DIR, f'{prefix}{version_id}{suffix}'))
        companions = (
            os.path.join(MODEL_DIR, f'{prefix}{version_id}.meta.json'),
            _ae_model_path(name, version_id),
        )
        for companion in companions:
            if os.path.exists(companion):
                os.remove(companion)
        log_info(f"[{name}] Version purgée : {version_id} (limite={MODEL_HISTORY_COUNT})")
# ═══════════════════════════════════════════════════════════════════════════════
# NORMALIZING FLOW — second parallel scorer (likelihood-based anomaly detection; the legacy "AE" / "ae_" names are kept for backward compatibility)
# ═══════════════════════════════════════════════════════════════════════════════
class TrafficNormalizingFlow:
    """Normalizing flow used as a likelihood-based anomaly scorer.

    Architecture (see ``_build_model`` / ``_subnet_fc``): 4 chained FrEIA
    ``AllInOneBlock`` coupling blocks, each driven by an MLP sub-network with two
    hidden layers of 64 units and ReLU activations. The flow maps an input to a
    latent vector of the same dimensionality (no bottleneck).

    Anomaly score = -log p(x), computed with the change-of-variables formula
    under a standard-normal latent prior. ``encode`` exposes the latent
    representation z = f(x) (intended by the original author as input features
    for HDBSCAN clustering).
    """

    def __init__(self, n_features: int, latent_dim: int = 0):
        """Build an untrained flow for ``n_features``-dimensional inputs.

        ``latent_dim`` is accepted for signature compatibility only and is ignored.

        Raises:
            RuntimeError: if PyTorch or FrEIA is not available.
        """
        if not TORCH_AVAILABLE:
            raise RuntimeError("PyTorch non disponible — Normalizing Flow désactivé.")
        try:
            # Availability probe only; _build_model() performs the real imports.
            import FrEIA.framework  # noqa: F401
            import FrEIA.modules  # noqa: F401
        except ImportError as exc:
            # Chain the cause so the underlying import failure stays visible.
            raise RuntimeError("FrEIA non disponible — installer : pip install FrEIA") from exc
        self.n_features = n_features
        self.device = torch.device('cpu')
        self._build_model()
        # Min-max scaler parameters, learned by fit(); None until then.
        self._scaler_min = None
        self._scaler_range = None

    def _subnet_fc(self, c_in, c_out):
        """Build the MLP sub-network of a coupling block (two hidden layers of 64 units, ReLU)."""
        return nn.Sequential(
            nn.Linear(c_in, 64), nn.ReLU(),
            nn.Linear(64, 64), nn.ReLU(),
            nn.Linear(64, c_out),
        )

    def _build_model(self):
        """Assemble the FrEIA graph (input → 4 AllInOneBlock nodes → output) into ``self.flow``."""
        import FrEIA.framework as Ff
        import FrEIA.modules as Fm

        nodes = [Ff.InputNode(self.n_features, name='input')]
        for i in range(4):
            nodes.append(Ff.Node(
                nodes[-1],
                Fm.AllInOneBlock,
                {'subnet_constructor': self._subnet_fc, 'affine_clamping': 2.0},
                name=f'coupling_{i}',
            ))
        nodes.append(Ff.OutputNode(nodes[-1], name='output'))
        self.flow = Ff.GraphINN(nodes, verbose=False).to(self.device)

    def _to_tensor(self, X: np.ndarray) -> 'torch.Tensor':
        """Min-max scale ``X`` with the fitted parameters, clip to [0, 1] and convert to a float32 tensor.

        Before fit() has run (no scaler parameters yet) the values are only clipped.
        """
        if self._scaler_min is not None:
            # 1e-9 avoids a division by zero on constant features.
            X_norm = (X - self._scaler_min) / (self._scaler_range + 1e-9)
        else:
            X_norm = X
        return torch.tensor(np.clip(X_norm, 0, 1), dtype=torch.float32, device=self.device)

    def log_likelihood(self, x: 'torch.Tensor') -> 'torch.Tensor':
        """Return log p(x) = log p_z(f(x)) + log|det J_f(x)| for each row of ``x``."""
        z, log_det = self.flow(x)
        # Log-density of a standard normal in n_features dimensions.
        log_pz = -0.5 * (z ** 2).sum(dim=1) - 0.5 * self.n_features * np.log(2 * np.pi)
        return log_pz + log_det

    def fit(self, X: np.ndarray, epochs: int = AE_EPOCHS, lr: float = AE_LEARNING_RATE,
            batch_size: int = 256) -> dict:
        """Train the flow on ``X`` by minimising the negative log-likelihood.

        ``X`` is expected to be the (normal-traffic) human baseline. The min-max
        scaler parameters are (re)learned from ``X`` before training.

        Returns:
            ``{'final_loss', 'epochs', 'n_samples'}`` where ``final_loss`` is the
            sample-weighted mean NLL of the last epoch.

        Raises:
            ValueError: if ``epochs`` < 1 (previously an opaque IndexError on ``losses[-1]``).
        """
        if epochs < 1:
            raise ValueError(f"epochs doit être >= 1 (reçu : {epochs})")
        self._scaler_min = X.min(axis=0)
        self._scaler_range = X.max(axis=0) - self._scaler_min
        X_t = self._to_tensor(X)
        dataset = torch.utils.data.TensorDataset(X_t)
        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.Adam(self.flow.parameters(), lr=lr, weight_decay=1e-5)
        self.flow.train()
        losses = []
        for _ in range(epochs):
            epoch_loss = 0.0
            for (batch,) in loader:
                loss = -self.log_likelihood(batch).mean()  # NLL
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                # Weight by batch size so the epoch average is per-sample.
                epoch_loss += loss.item() * len(batch)
            losses.append(epoch_loss / len(X_t))
        return {'final_loss': losses[-1], 'epochs': epochs, 'n_samples': len(X)}

    def score_samples(self, X: np.ndarray) -> np.ndarray:
        """Return -log p(x) per sample (higher = more anomalous)."""
        self.flow.eval()
        X_t = self._to_tensor(X)
        with torch.no_grad():
            # .cpu() is a no-op on the CPU device and keeps this correct if the device changes.
            return -self.log_likelihood(X_t).cpu().numpy()

    def encode(self, X: np.ndarray) -> np.ndarray:
        """Return the latent representation z = f(x) of each sample."""
        self.flow.eval()
        X_t = self._to_tensor(X)
        with torch.no_grad():
            z, _ = self.flow(X_t)
        return z.cpu().numpy()

    def state_dict(self) -> dict:
        """Return everything needed to rebuild this flow: weights, scaler parameters and input size."""
        return {
            'flow': self.flow.state_dict(),
            'scaler_min': self._scaler_min,
            'scaler_range': self._scaler_range,
            'n_features': self.n_features,
        }

    @classmethod
    def load_state_dict(cls, state: dict) -> 'TrafficNormalizingFlow':
        """Rebuild a flow from a dict produced by ``state_dict()``."""
        nf = cls(state['n_features'])
        nf._scaler_min = state['scaler_min']
        nf._scaler_range = state['scaler_range']
        nf.flow.load_state_dict(state['flow'])
        return nf
class NFEnsemble:
    """Deep ensemble of normalizing flows for uncertainty estimation.

    Each member is an independent ``TrafficNormalizingFlow`` trained on a
    bootstrap resample (with replacement) of the baseline. The variance of the
    members' scores is exposed as an uncertainty signal; the original author's
    intent is to tell organic drift (members agree, low variance) from
    adversarial drift (members disagree, high variance).

    Reference: Lakshminarayanan et al., 2017 — "Simple and Scalable Predictive
    Uncertainty Estimation using Deep Ensembles" (NeurIPS).
    """

    # Number of members created for a new ensemble.
    ENSEMBLE_SIZE = 5

    def __init__(self, n_features: int):
        """Create ``ENSEMBLE_SIZE`` untrained flows for ``n_features``-dimensional inputs.

        Raises:
            RuntimeError: if PyTorch is not available.
        """
        if not TORCH_AVAILABLE:
            raise RuntimeError("PyTorch non disponible — NFEnsemble désactivé.")
        self.n_features = n_features
        self.models = [TrafficNormalizingFlow(n_features) for _ in range(self.ENSEMBLE_SIZE)]

    def fit(self, X: np.ndarray, epochs: int = AE_EPOCHS, lr: float = AE_LEARNING_RATE,
            batch_size: int = 256) -> dict:
        """Train every member on its own bootstrap resample of ``X``.

        Returns:
            ``{'final_losses', 'mean_loss', 'ensemble_size', 'n_samples'}``.
        """
        n = len(X)
        all_losses = []
        for nf in self.models:
            # Bootstrap: n draws with replacement (uses NumPy's global RNG).
            idx = np.random.choice(n, size=n, replace=True)
            stats = nf.fit(X[idx], epochs=epochs, lr=lr, batch_size=batch_size)
            all_losses.append(stats['final_loss'])
        return {
            'final_losses': all_losses,
            'mean_loss': float(np.mean(all_losses)),
            'ensemble_size': len(self.models),
            'n_samples': n,
        }

    def predict_anomalies(self, X: np.ndarray) -> tuple:
        """Return ``(mean_score, uncertainty_score)`` as two arrays with one value per sample.

        mean_score: mean of -log p(x) over the members.
        uncertainty_score: variance of -log p(x) over the members.
        """
        scores = np.stack([nf.score_samples(X) for nf in self.models], axis=0)
        return scores.mean(axis=0), scores.var(axis=0)

    def score_samples(self, X: np.ndarray) -> np.ndarray:
        """Return the mean score only (same interface as ``TrafficNormalizingFlow.score_samples``)."""
        mean, _ = self.predict_anomalies(X)
        return mean

    def encode(self, X: np.ndarray) -> np.ndarray:
        """Return the latent representation averaged over the members."""
        latents = np.stack([nf.encode(X) for nf in self.models], axis=0)
        return latents.mean(axis=0)

    def state_dict(self) -> dict:
        """Return a serializable dict holding the state of every member."""
        return {
            'ensemble_size': len(self.models),
            'n_features': self.n_features,
            'members': [nf.state_dict() for nf in self.models],
        }

    @classmethod
    def load_state_dict(cls, state: dict) -> 'NFEnsemble':
        """Rebuild an ensemble from a dict produced by ``state_dict()``.

        The member list is replaced wholesale by the saved members. The previous
        index-by-index overwrite left freshly initialised (untrained) flows in the
        ensemble when fewer than ENSEMBLE_SIZE members were saved, and raised
        IndexError when more were saved.

        Raises:
            ValueError: if ``state`` contains no member.
        """
        members = state['members']
        if not members:
            raise ValueError("NFEnsemble state contains no members")
        ens = cls(state['n_features'])
        ens.models = [TrafficNormalizingFlow.load_state_dict(member) for member in members]
        return ens
def _ae_model_path(name: str, version_id: str) -> str:
    """Return the path of the serialized NF ensemble (legacy ``ae_`` prefix) for one model version."""
    filename = f'ae_{name}_{version_id}.pt'
    return os.path.join(MODEL_DIR, filename)
# ═══════════════════════════════════════════════════════════════════════════════
# SUPERVISED MODEL — third, supervised voice (historical labels + SOC feedback): River Hoeffding Adaptive Tree, with a batch XGBoost fallback
# ═══════════════════════════════════════════════════════════════════════════════
def _xgb_model_path(name: str) -> str:
    """Return the path of the batch XGBoost model file for model ``name``."""
    filename = f'xgb_{name}.json'
    return os.path.join(MODEL_DIR, filename)
def _xgb_meta_path(name: str) -> str:
    """Return the path of the supervised model's JSON metadata file for model ``name``."""
    filename = f'xgb_{name}.meta.json'
    return os.path.join(MODEL_DIR, filename)
def _load_xgb_labels(client, features: list, min_labels: int = XGB_MIN_LABELS) -> tuple:
    """Load a labelled training set from ``ml_all_scores`` joined with ``view_ai_features_1h``.

    Labels come from ``ml_all_scores.threat_level``; feature values come from
    ``view_ai_features_1h`` through a join on (src_ip, ja4, host), restricted to
    the last 7 days and randomly sampled down to at most 50 000 rows. Requested
    features that are not columns of the view are dropped.

    Label mapping:
        'NORMAL', 'LEGITIMATE_BROWSER'                  → 0
        'HIGH', 'CRITICAL', 'ANUBIS_DENY', 'KNOWN_BOT'  → 1

    Returns:
        ``(X, y, usable_features)``, or ``(None, None, None)`` when fewer than 10
        features are usable, the query returns nothing, there are fewer than 10
        positives or fewer than ``min_labels`` rows, or any error occurs (the
        error is logged).
    """
    nothing = (None, None, None)
    benign_levels = ['NORMAL', 'LEGITIMATE_BROWSER']
    try:
        # Discover which columns the feature view actually has.
        columns_sql = (
            f"SELECT name FROM system.columns "
            f"WHERE database = '{DB}' AND table = 'view_ai_features_1h'"
        )
        columns_reply = client.query(columns_sql)
        view_columns = set()
        if columns_reply.result_rows:
            view_columns = {record[0] for record in columns_reply.result_rows}

        usable_features = [feat for feat in features if feat in view_columns]
        if len(usable_features) < 10:
            log_info(f"[XGB] Seulement {len(usable_features)} features disponibles dans view_ai_features_1h — insuffisant.")
            return nothing

        feature_cols = ', '.join(f'f.{c}' for c in usable_features)
        labels_sql = (
            f"SELECT {feature_cols}, s.threat_level "
            f"FROM {DB}.ml_all_scores AS s "
            f"INNER JOIN {DB}.view_ai_features_1h AS f "
            f" ON s.src_ip = f.src_ip AND s.ja4 = f.ja4 AND s.host = f.host "
            f"WHERE s.threat_level IN ('NORMAL', 'LEGITIMATE_BROWSER', 'HIGH', 'CRITICAL', 'ANUBIS_DENY', 'KNOWN_BOT') "
            f"AND s.window_start >= now() - INTERVAL 7 DAY "
            f"ORDER BY rand() LIMIT 50000"
        )
        labels_reply = client.query(labels_sql)
        if not labels_reply.result_rows:
            return nothing

        frame = pd.DataFrame(labels_reply.result_rows, columns=usable_features + ['threat_level'])
        # Coerce to numeric, then drop every row with a missing or infinite feature value.
        frame[usable_features] = frame[usable_features].apply(pd.to_numeric, errors='coerce')
        frame = frame.replace([np.inf, -np.inf], np.nan)
        frame = frame.dropna(subset=usable_features)

        is_benign = frame['threat_level'].isin(benign_levels)
        y = (~is_benign).astype(int)
        if not (y.sum() >= 10 and len(y) >= min_labels):
            return nothing
        return frame[usable_features].values, y.values, usable_features
    except Exception as exc:
        log_info(f"[XGB] Erreur chargement labels : {exc}")
        return nothing
def load_or_train_xgb(name: str, client, features: list, cycle_id: str):
    """Load or incrementally update the online supervised model (River Hoeffding Adaptive Tree).

    The tree is reloaded from its pickle when one exists, otherwise created; it
    is then updated with ``learn_one()`` on the labels returned by
    ``_load_xgb_labels`` and persisted together with its metadata. If the River
    classifier cannot be created, the batch XGBoost fallback is used instead.

    Returns:
        ``(model, features)``, or ``(None, None)`` when the supervised voice is
        disabled (no backend available or XGB_WEIGHT <= 0) or when no label has
        ever been seen.

    NOTE(review): the original docstring stated that the returned model exposes
    ``predict_proba_many(df)``; nothing in this module establishes that — confirm
    against the scoring code.
    """
    if not (XGB_AVAILABLE or RIVER_AVAILABLE) or XGB_WEIGHT <= 0:
        return None, None
    model_path = _river_model_path(name)
    meta_path = _xgb_meta_path(name)

    # Reload the existing River model, if any.
    model = None
    xgb_features = features
    n_seen = 0
    if os.path.exists(model_path):
        try:
            # SECURITY: pickle.load executes arbitrary code from the file. This is
            # only acceptable because MODEL_DIR is written by this service itself;
            # never point it at untrusted data.
            with open(model_path, 'rb') as f:
                model = pickle.load(f)
            with open(meta_path) as f:
                meta = json.load(f)
            xgb_features = meta.get('features', features)
            n_seen = meta.get('n_total_labels', 0)
            log_info(f"[River][{name}] HAT rechargé ({n_seen} labels cumulés, {len(xgb_features)} features).")
        except Exception as exc:
            log_info(f"[River][{name}] Erreur chargement : {exc} — nouveau modèle.")
            model = None

    # Create a new model when none could be reloaded.
    if model is None:
        try:
            # HoeffdingAdaptiveTreeClassifier is documented under river.tree, not
            # river.forest: the previous lookup on river.forest raised
            # AttributeError, which was swallowed, so the batch fallback was
            # always taken.
            from river import tree as river_tree
            model = river_tree.HoeffdingAdaptiveTreeClassifier(
                grace_period=50, max_depth=12, seed=42,
            )
        except Exception as exc:
            # River missing or incompatible: fall back to batch XGBoost, but say why.
            log_info(f"[River][{name}] HAT indisponible ({exc}) — fallback XGBoost batch.")
            return _load_or_train_xgb_batch(name, client, features, cycle_id)

    # ── Incremental learning on this cycle's labels ────────────────────
    X, y, usable_features = _load_xgb_labels(client, features)
    if X is not None and usable_features is not None:
        xgb_features = usable_features
        n_new = 0
        n_failed = 0
        # Iterate the NumPy rows directly: per-cell DataFrame.iloc access is
        # orders of magnitude slower on the up-to-50 000 rows returned.
        for row, label in zip(X, y):
            try:
                model.learn_one(dict(zip(xgb_features, map(float, row))), int(label))
            except Exception:
                # Best effort: a bad sample must not abort the whole update.
                n_failed += 1
                continue
            n_new += 1
        if n_failed:
            log_info(f"[River][{name}] {n_failed} échantillon(s) ignoré(s) pendant learn_one.")
        n_seen += n_new

        # Persist the updated model and its metadata.
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        with open(model_path, 'wb') as f:
            pickle.dump(model, f)
        meta = {
            'trained_at': datetime.now().isoformat(),
            'n_total_labels': n_seen,
            'n_new_labels': n_new,
            'n_features': len(xgb_features),
            'features': xgb_features,
            'model_name': name,
            'algorithm': 'HoeffdingAdaptiveTreeClassifier',
        }
        with open(meta_path, 'w') as f:
            json.dump(meta, f, indent=2)
        log_info(f"[River][{name}] +{n_new} labels incrémentaux ({n_seen} total) — HAT mis à jour.")
        log_decision('RIVER_UPDATED', cycle_id, name, meta)
    else:
        if n_seen == 0:
            log_info(f"[River][{name}] Pas de labels — modèle supervisé désactivé ce cycle.")
            return None, None
        log_info(f"[River][{name}] Pas de nouveaux labels — HAT existant réutilisé ({n_seen} labels).")
    return model, xgb_features
def _river_model_path(name: str) -> str:
    """Return the path of the pickled River Hoeffding Adaptive Tree for model ``name``."""
    filename = f'river_hat_{name}.pkl'
    return os.path.join(MODEL_DIR, filename)
def _load_or_train_xgb_batch(name, client, features, cycle_id):
    """Fallback: load or train a classic batch XGBoost classifier when River is unusable.

    A model saved on disk is reused while it is younger than
    XGB_RETRAIN_INTERVAL_H hours; past that age a retraining is attempted.
    Previously a saved model was reused forever and the interval was never
    applied. If retraining is not possible (not enough labels, or training
    fails) the stale model is kept rather than disabling the supervised voice.

    Returns:
        ``(XGBClassifier, features)`` or ``(None, None)``.
    """
    if not XGB_AVAILABLE or XGB_WEIGHT <= 0:
        return None, None
    model_path = _xgb_model_path(name)
    meta_path = _xgb_meta_path(name)

    cached = None        # (model, features) loaded from disk, if any
    cache_fresh = False  # True when the cached model is younger than the retrain interval
    if os.path.exists(model_path) and os.path.exists(meta_path):
        try:
            with open(meta_path) as f:
                meta = json.load(f)
            loaded = xgb.XGBClassifier()
            loaded.load_model(model_path)
            cached = (loaded, meta.get('features', features))
            trained_at = meta.get('trained_at')
        except Exception as exc:
            cached = None
            log_info(f"[XGB][{name}] Erreur chargement : {exc} — réentraînement.")
        else:
            try:
                age_h = (datetime.now() - datetime.fromisoformat(trained_at)).total_seconds() / 3600
                cache_fresh = age_h < XGB_RETRAIN_INTERVAL_H
            except (TypeError, ValueError):
                # Missing or unparsable timestamp: treat the model as expired.
                cache_fresh = False
    if cached is not None and cache_fresh:
        return cached

    X, y, xgb_features = _load_xgb_labels(client, features)
    if X is None:
        if cached is not None:
            log_info(f"[XGB][{name}] Labels insuffisants — modèle existant réutilisé.")
            return cached
        return None, None

    # Compensate class imbalance: ratio of negatives to positives, at least 1.
    scale_pos = max(1, int((y == 0).sum() / max((y == 1).sum(), 1)))
    model = xgb.XGBClassifier(
        n_estimators=200, max_depth=6, learning_rate=0.1,
        scale_pos_weight=scale_pos, eval_metric='logloss',
        random_state=42, n_jobs=-1, tree_method='hist',
    )
    try:
        model.fit(X, y, verbose=False)
        model_dir = os.path.dirname(model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        model.save_model(model_path)
    except Exception as exc:
        if cached is None:
            raise
        # A usable (stale) model exists: keep serving it instead of failing the cycle.
        log_info(f"[XGB][{name}] Réentraînement échoué : {exc} — modèle existant réutilisé.")
        return cached

    meta = {
        'trained_at': datetime.now().isoformat(),
        'n_labels': len(y), 'n_positive': int(y.sum()),
        'n_negative': int((y == 0).sum()), 'n_features': len(xgb_features),
        'features': xgb_features, 'model_name': name,
    }
    with open(meta_path, 'w') as f:
        json.dump(meta, f, indent=2)
    log_decision('XGB_TRAINED', cycle_id, name, meta)
    return model, xgb_features
def _load_nf_ensemble(name: str, version_id: str):
    """Best-effort reload of the NF ensemble saved with version ``version_id`` of model ``name``.

    Returns the ensemble, or None when the NF scorer is disabled (PyTorch missing
    or AE_WEIGHT <= 0), when no file exists for that version, or when loading
    fails (the failure is logged, never raised).
    """
    if not (TORCH_AVAILABLE and AE_WEIGHT > 0):
        return None
    ae_path = _ae_model_path(name, version_id)
    if not os.path.exists(ae_path):
        return None
    try:
        # SECURITY: weights_only=False unpickles arbitrary objects. The saved state
        # holds NumPy arrays (scaler parameters) next to the tensors, which is
        # presumably why it is needed; only load files written by this service.
        ensemble = NFEnsemble.load_state_dict(torch.load(ae_path, weights_only=False))
    except Exception as exc:
        log_info(f"[{name}] Erreur chargement AE : {exc} — AE désactivé ce cycle.")
        return None
    log_info(f"[{name}] NFEnsemble v{version_id} rechargé (M={NFEnsemble.ENSEMBLE_SIZE}).")
    return ensemble


def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list, cycle_id: str):
    """Load the current Isolation Forest for ``name`` or train a new one when needed.

    The saved model is reused while it is younger than RETRAIN_INTERVAL_H hours
    and the ADWIN monitor reports a drift rate below DRIFT_THRESHOLD. Otherwise a
    new model is trained on ``human_baseline``: near-constant features are
    pruned, the last 20% of rows are held out for validation, and the model is
    rejected in favour of the previous one when the validation anomaly rate
    exceeds the VAL_ANOMALY_GATE environment setting (default 0.20). An accepted
    model is serialized, the pointer file is updated and old versions are purged.

    Returns:
        ``(model, nf_ensemble_or_None, features)`` where ``features`` is the list
        the returned model was trained on.
    """
    model_path, meta = _get_current_version(name)
    if model_path and meta:
        trained_at = datetime.fromisoformat(meta['trained_at'])
        age_h = (datetime.now() - trained_at).total_seconds() / 3600
        age_ok = age_h < RETRAIN_INTERVAL_H

        # A1 — concept drift through ADWIN (adaptive sliding window).
        drift_score = 0.0
        drift_forced = False
        if name not in _drift_monitors:
            _drift_monitors[name] = ADWINDriftMonitor(features)
        drift_monitor = _drift_monitors[name]
        if drift_monitor.available:
            # Feed ADWIN with this cycle's per-feature means.
            feature_means = {
                f: float(human_baseline[f].mean())
                for f in features if f in human_baseline.columns
            }
            drift_score = drift_monitor.check_drift(feature_means, name=name, cycle_id=cycle_id)
            if drift_score >= DRIFT_THRESHOLD:
                drift_forced = True
                log_info(f"[{name}] Dérive ADWIN détectée ({drift_score:.0%} features) — retraining forcé.")
                log_decision('DRIFT_DETECTED', cycle_id, name, {
                    'version_id': meta['version_id'], 'drift_rate': round(drift_score, 3),
                    'drift_threshold': DRIFT_THRESHOLD, 'model_age_hours': round(age_h, 2)
                })

        if age_ok and not drift_forced:
            log_info(f"[{name}] Modèle v{meta['version_id']} valide ({age_h:.1f}h / {RETRAIN_INTERVAL_H}h, drift={drift_score:.0%}) — réutilisation.")
            log_decision('MODEL_LOADED', cycle_id, name, {
                'version_id': meta['version_id'], 'model_age_hours': round(age_h, 2),
                'trained_at': meta['trained_at'], 'human_samples': meta.get('human_samples', '?'),
                'retrain_in_hours': round(RETRAIN_INTERVAL_H - age_h, 1), 'drift_score': round(drift_score, 3)
            })
            ae_loaded = _load_nf_ensemble(name, meta['version_id'])
            return joblib.load(model_path), ae_loaded, meta.get('features', features)
        if not drift_forced:
            log_info(f"[{name}] Modèle v{meta['version_id']} expiré ({age_h:.1f}h ≥ {RETRAIN_INTERVAL_H}h) — retraining.")

    # ── Training path ──────────────────────────────────────────────────
    version_id = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Fix: the version id and the session count used to be printed glued together.
    log_info(f"[{name}] Entraînement EIF v{version_id} — {len(human_baseline)} sessions ISP, {len(features)} features, contamination={CONTAMINATION}")
    X = human_baseline[features].replace([np.inf, -np.inf], np.nan)
    X = X.fillna(X.median())

    # Feature pruning: drop near-zero-variance features (useless for the trees).
    PRUNE_VARIANCE_THRESHOLD = float(os.getenv('PRUNE_VARIANCE_THRESHOLD', '1e-6'))
    feature_variances = X.var()
    low_var_features = feature_variances[feature_variances < PRUNE_VARIANCE_THRESHOLD].index.tolist()
    if low_var_features:
        log_info(f"[{name}] Élagage : {len(low_var_features)} feature(s) à variance < {PRUNE_VARIANCE_THRESHOLD} retirées : {low_var_features}")
        X = X.drop(columns=low_var_features)
        features = [f for f in features if f not in low_var_features]
        log_decision('FEATURE_PRUNED', cycle_id, name, {'pruned': low_var_features, 'remaining': len(features)})

    # Validation split: hold out the last 20% of rows (no shuffling) for offline evaluation.
    val_size = max(1, int(len(X) * 0.2))
    X_train = X.iloc[:-val_size]
    X_val = X.iloc[-val_size:]

    if EIF_AVAILABLE:
        model = ExtendedIsolationForest(
            ntrees=300, ndim=min(3, len(features)),
            sample_size='auto', missing_action='impute',
            random_seed=42, nthreads=-1
        )
    else:
        model = IsolationForest(n_estimators=300, contamination=CONTAMINATION, random_state=42, n_jobs=-1)
    model.fit(X_train)

    # Offline evaluation on the held-out rows.
    val_scores = model.decision_function(X_val)
    # Unify the sign convention so that negative = anomalous in both backends
    # (the EIF backend's score is mapped with 0.5 - score).
    if EIF_AVAILABLE:
        val_scores = 0.5 - val_scores
    val_mean_score = float(np.mean(val_scores))
    val_anomaly_rate = float(np.mean(val_scores < 0))
    log_info(f"[{name}] Validation : score moyen={val_mean_score:.4f}, taux anomalie={val_anomaly_rate:.2%} ({len(X_val)} échantillons)")

    # Gate: reject the new model when the baseline looks contaminated.
    VAL_ANOMALY_GATE = float(os.getenv('VAL_ANOMALY_GATE', '0.20'))
    if val_anomaly_rate > VAL_ANOMALY_GATE:
        log_info(f"[{name}] ⚠ REJET : val_anomaly_rate={val_anomaly_rate:.2%} > gate={VAL_ANOMALY_GATE:.0%} — baseline probablement contaminée.")
        log_decision('MODEL_REJECTED', cycle_id, name, {
            'val_anomaly_rate': round(val_anomaly_rate, 4), 'gate': VAL_ANOMALY_GATE,
            'val_mean_score': round(val_mean_score, 4), 'version_id': version_id,
        })
        # Keep serving the previous model when there is one.
        if model_path and os.path.exists(model_path):
            log_info(f"[{name}] Conservation du modèle précédent v{meta.get('version_id', '?')}.")
            # NF load failures are now logged by the helper instead of being swallowed.
            ae_prev = _load_nf_ensemble(name, meta.get('version_id', ''))
            return joblib.load(model_path), ae_prev, meta.get('features', features)
        log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.")

    # A1 — reference statistics of the baseline (mean/std only; drift detection
    # itself is handled by ADWIN).
    baseline_stats = {
        f: {
            'mean': float(X[f].mean()),
            'std': float(X[f].std()),
        }
        for f in features
    }
    new_model_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.joblib')
    new_meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json')
    joblib.dump(model, new_model_path)

    # Train the NF ensemble as well (only if PyTorch is available and AE_WEIGHT > 0).
    ae_model = None
    if TORCH_AVAILABLE and AE_WEIGHT > 0:
        try:
            ae_model = NFEnsemble(n_features=len(features))
            ae_stats = ae_model.fit(X_train.values)
            ae_path = _ae_model_path(name, version_id)
            torch.save(ae_model.state_dict(), ae_path)
            log_info(f"[{name}] NFEnsemble entraîné (M={NFEnsemble.ENSEMBLE_SIZE}) : NLL moyen={ae_stats['mean_loss']:.6f}")
        except Exception as exc:
            log_info(f"[{name}] NFEnsemble training échoué : {exc} — NF désactivé.")
            ae_model = None

    previous_version = meta.get('version_id', None) if meta else None
    new_meta = {
        'version_id': version_id, 'trained_at': datetime.now().isoformat(),
        'human_samples': len(human_baseline), 'contamination': CONTAMINATION,
        'threshold': ANOMALY_THRESHOLD, 'features': features,
        'model_name': name, 'previous_version': previous_version,
        'retrain_interval': RETRAIN_INTERVAL_H, 'baseline_stats': baseline_stats,
        'algorithm': 'ExtendedIsolationForest' if EIF_AVAILABLE else 'IsolationForest',
        'autoencoder': ae_model is not None,  # actually an NF; key kept for backward compatibility
        'ae_weight': AE_WEIGHT if ae_model is not None else 0.0,
        'validation': {
            'val_size': len(X_val), 'train_size': len(X_train),
            'val_mean_score': round(val_mean_score, 4),
            'val_anomaly_rate': round(val_anomaly_rate, 4),
        }
    }
    with open(new_meta_path, 'w') as f:
        json.dump(new_meta, f, indent=2)
    # Switching the pointer is what makes the new version current.
    with open(_current_pointer_path(name), 'w') as f:
        f.write(version_id)
    append_training_history({k: v for k, v in new_meta.items() if k != 'baseline_stats'})
    _purge_old_versions(name)
    log_info(f"[{name}] Modèle v{version_id} sauvegardé → {new_model_path} (NF={'oui' if ae_model is not None else 'non'})")
    log_decision('MODEL_TRAINED', cycle_id, name, {
        'version_id': version_id, 'previous_version': previous_version,
        'human_samples': len(human_baseline), 'next_retrain_in_h': RETRAIN_INTERVAL_H,
        'history_kept': MODEL_HISTORY_COUNT
    })
    return model, ae_model, features