feat: roadmap détection bots §2-9 — HTTP/2, cohérence, drift, flotte, Jaccard, ExIFFI, méta-learner, métriques

Étape 2 — Fingerprinting HTTP/2 dans le pipeline ML :
- Ajout du dictionnaire dict_browser_h2 (11 familles de navigateurs) dans 05_aggregation_tables.sql
- Ajout du CTE h2_agg et 4 features HTTP/2 dans 07_ai_features_view.sql :
  h2_settings_known, h2_pseudo_order_match, h2_ja4_coherence, h2_settings_rare
- Calcul du fingerprint_coherence_score (5 axes pondérés) dans la vue
- Ajout du 6e axe axis_h2_coherence dans browser.py (poids rééquilibrés)
- browser_h2.csv : 11 fingerprints Akamai → famille navigateur

Étape 3 — Pré-filtre de cohérence sur la baseline humaine :
- pipeline.py exclut les sessions avec fingerprint_coherence_score < seuil de la baseline d'entraînement
- FINGERPRINT_COHERENCE_THRESHOLD configurable via env (défaut 0.25)
- Log des sessions exclues pour analyse SOC

Étape 4 — Détection de drift améliorée :
- scoring.py : passage de 5 à 9 quantiles (p5…p95)
- Ajout de la divergence KL en complément du test KS
- Détection de drift adversarial (≥80% des features dérivent dans la même direction)
- Split temporel strict pour la validation

Étape 5 — Graphe bipartite JA4×ASN (§5.2) :
- fleet.py : détection de flottes via NetworkX + Louvain (imports optionnels)
- enrich_with_fleet_score() : ajout fleet_score + fleet_campaign_flag au DataFrame
- cycle.py : appel après preprocess_df avec log du nombre de sessions en flotte
- SQL migration 05_fleet_metrics_tables.sql : table fleet_detections (TTL 7j)
- Dashboard : /fleet + /api/fleet (communautés détectées) + template fleet.html

Étape 6 — Cross-domain Jaccard §5.8 :
- 12_thesis_features.sql : CTE jaccard_paths → cross_domain_path_similarity
- Signal : même chemins (/admin, /wp-login) sur plusieurs hosts = scanner

Étape 7 — ExIFFI + erreurs AE par feature :
- scoring.py : compute_exiffi_importance() par permutation, compute_ae_feature_errors()
- pipeline.py : calcul ExIFFI sur X_test, mapping index → dict pour anomalies
- build_reason() enrichi avec exiffi_top quand SHAP inactif

Étape 8 — Méta-learner pour la pondération de l'ensemble :
- scoring.py : classe MetaLearner (LogisticRegression, fallback poids fixes <1000 labels)
- Collecte des labels depuis le cycle courant (known_bots, légitimes, Anubis)
- pipeline.py : remplacement des poids fixes par MetaLearner.predict()

Étape 9 — Métriques de performance et monitoring :
- metrics.py : record_cycle_metrics() — taux anomalie, drift, corrélation, latence
- SQL migration 05_fleet_metrics_tables.sql : table ml_performance_metrics (TTL 90j)
- Dashboard : /health + /api/health + template health.html
- cycle.py : appel record_cycle_metrics en fin de cycle (Complet + Applicatif)

Tests : 36/36 bot-detector tests passent

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-10 00:11:35 +02:00
parent 8ca4a1e849
commit a108814a56
18 changed files with 1670 additions and 62 deletions

View File

@ -21,14 +21,16 @@ _BROWSER_JA4_PROFILES = {
'ciphers': range(14, 18), 'exts': range(12, 17)},
}
# Pondération des 5 axes pour le score browser_confidence.
# Pondération des 6 axes pour le score browser_confidence.
# Favorise les signaux TLS (difficiles à falsifier) sur HTTP.
# L'axe H2 complète la cohérence TLS avec les paramètres HTTP/2.
_AXIS_WEIGHTS = {
'ja4_known': 0.30, # Axe 1 — Signature JA4 dans dict_browser_ja4 (TLS fingerprint)
'ja4_known': 0.25, # Axe 1 — Signature JA4 dans dict_browser_ja4 (TLS fingerprint)
'ja4_struct': 0.15, # Axe 2 — Structure JA4 (TLS1.3, h2, nb ciphers/ext)
'http_modern': 0.20, # Axe 3 — Client Hints + Sec-Fetch-* (PAS de User-Agent)
'nav_behavior': 0.15, # Axe 4 — Comportement de navigation (assets, referers)
'tls_coherence': 0.20, # Axe 5 — Cohérence TLS/TCP (pas de mismatch)
'tls_coherence': 0.15, # Axe 5 — Cohérence TLS/TCP (pas de mismatch)
'h2_coherence': 0.10, # Axe 6 — Cohérence HTTP/2 (SETTINGS↔JA4, pseudo-headers) §2
}
@ -124,6 +126,21 @@ def _compute_browser_axes(df: pd.DataFrame) -> pd.DataFrame:
+ (iam == 0).astype(float) * 0.20
)
# ── Axe 6 — Cohérence HTTP/2 (§2) ──
# Signaux : fingerprint SETTINGS connu, cohérence H2↔JA4, pseudo-headers corrects.
# Quand les données H2 sont absentes (HTTP/1.x), l'axe est neutre (0.5).
h2k = df.get('h2_settings_known', pd.Series(-1, index=df.index)).fillna(-1)
h2c = df.get('h2_ja4_coherence', pd.Series(-1, index=df.index)).fillna(-1)
h2p = df.get('h2_pseudo_order_match', pd.Series(-1, index=df.index)).fillna(-1)
# Sessions sans données H2 → axe neutre à 0.5 (ne pénalise pas les sites HTTP/1.x)
h2_present = ((h2k >= 0) | (h2c >= 0)).astype(float)
h2_score = (
h2k.clip(0).astype(float) * 0.40
+ h2c.clip(0).astype(float) * 0.35
+ h2p.clip(0).astype(float) * 0.25
)
axes['axis_h2_coherence'] = h2_present * h2_score + (1 - h2_present) * 0.5
# ── Score combiné pondéré ──
axes['browser_confidence'] = sum(
axes[f'axis_{k}'] * w for k, w in _AXIS_WEIGHTS.items()

View File

@ -3,6 +3,7 @@
Orchestre un cycle complet : requête ClickHouse, preprocessing, scoring
(Complet + Applicatif), feedback SOC, déduplication et insertion des résultats.
"""
import time
import pandas as pd
from datetime import datetime
@ -16,6 +17,8 @@ from .log import log_info, log_decision
from .infra import get_client, set_healthy
from .preprocessing import preprocess_df, FEATURES, FEATURES_COMPLET
from .pipeline import run_semi_supervised_logic
from .fleet import enrich_with_fleet_score
from .metrics import record_cycle_metrics
# ═══════════════════════════════════════════════════════════════════════════════
@ -109,6 +112,7 @@ def fetch_and_analyze():
"""
global _consecutive_failures
cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S')
cycle_start = time.time()
log_info('')
log_info('=' * 70)
log_info(f' CYCLE {cycle_id}')
@ -158,6 +162,15 @@ def fetch_and_analyze():
df = preprocess_df(df)
# §5 — Enrichissement avec le score de flotte JA4×ASN (bipartite fleet detection)
try:
df = enrich_with_fleet_score(df)
n_fleet = int((df.get('fleet_campaign_flag', 0) == 1).sum())
if n_fleet > 0:
log_info(f'[Fleet §5] {n_fleet} session(s) appartenant à une flotte suspecte.')
except Exception as e:
log_info(f'[Fleet §5] Enrichissement de flotte échoué : {e}')
# ── Résumé des données chargées ───────────────────────────────────────────
n_total = len(df)
n_correlated = int((df.get('correlated', pd.Series()) == 1).sum())
@ -308,6 +321,16 @@ def fetch_and_analyze():
if all_anom.empty:
log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.')
log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN})
try:
for _model_name, _feats in [('Complet', FEATURES_COMPLET), ('Applicatif', FEATURES)]:
record_cycle_metrics(
client=client, db=DB, cycle_id=cycle_id, model_name=_model_name,
df_all=df, anomalies=pd.DataFrame(), all_scored=all_scored if not all_scored.empty else pd.DataFrame(),
drift_rate=0.0, cycle_start_time=cycle_start, baseline_size=0,
threshold=0.0, valid_features=len(_feats), total_features=len(_feats),
)
except Exception:
pass
return
all_anom['detected_at'] = datetime.now().replace(microsecond=0)
@ -369,3 +392,24 @@ def fetch_and_analyze():
log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}')
log_info(f'╚═══════════════════════════════════════════════════════════')
log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN})
# §9 — Enregistrer les métriques de performance du cycle dans ml_performance_metrics
try:
for _model_name, _feats in [('Complet', FEATURES_COMPLET), ('Applicatif', FEATURES)]:
record_cycle_metrics(
client=client,
db=DB,
cycle_id=cycle_id,
model_name=_model_name,
df_all=df,
anomalies=all_anom if not all_anom.empty else pd.DataFrame(),
all_scored=all_scored if not all_scored.empty else pd.DataFrame(),
drift_rate=0.0,
cycle_start_time=cycle_start,
baseline_size=0,
threshold=0.0,
valid_features=len(_feats),
total_features=len(_feats),
)
except Exception as e:
log_info(f'[Métriques §9] Enregistrement des métriques échoué : {e}')

View File

@ -0,0 +1,174 @@
"""Détection de flottes de bots via graphe bipartite JA4×ASN.
§5.2 — Analyse de graphe bipartite G=(JA4 ASN, E) pour identifier les flottes
de bots coordonnées qui font tourner leurs fingerprints JA4 et ASN.
Algorithme :
1. Construire le graphe bipartite G depuis les sessions du cycle courant
2. Projeter sur les nœuds JA4 (shared-ASN weighted projection)
3. Détecter les communautés Louvain (python-louvain)
4. Calculer fleet_score = taille × densité / log2(n_asn + 2) pour chaque communauté
5. Retourner les IPs appartenant aux communautés suspectes avec leur fleet_score
"""
import logging
from typing import Optional
import pandas as pd
import numpy as np
logger = logging.getLogger(__name__)
# Seuil de fleet_score à partir duquel une communauté est considérée suspecte
FLEET_SCORE_THRESHOLD = float(__import__('os').getenv('FLEET_SCORE_THRESHOLD', '2.0'))
# Poids du fleet_score dans le score final (malus supplémentaire)
FLEET_SCORE_WEIGHT = float(__import__('os').getenv('FLEET_SCORE_WEIGHT', '0.10'))
# Nombre minimal d'arêtes pour inclure un JA4 dans l'analyse
FLEET_MIN_EDGES = int(__import__('os').getenv('FLEET_MIN_EDGES', '3'))
def build_fleet_graph(df: pd.DataFrame) -> Optional[object]:
"""Construit le graphe bipartite JA4×ASN à partir du cycle courant.
Nœuds : ensemble JA4 (préfixe 'ja4:') + ensemble ASN (préfixe 'asn:')
Arêtes : (ja4, asn) avec weight = nombre d'IPs distinctes sur ce couple
Exige que df ait les colonnes : ja4, asn_number, src_ip
Retourne None si networkx n'est pas disponible ou données insuffisantes.
"""
try:
import networkx as nx
from networkx.algorithms import bipartite
except ImportError:
logger.warning("[Fleet] networkx non disponible — analyse de flotte désactivée.")
return None
if df.empty or 'ja4' not in df.columns or 'asn_number' not in df.columns:
return None
# Filtrer les JA4 vides et ASN 0
mask = (df['ja4'].fillna('') != '') & (df['asn_number'].fillna('0') != '0')
sub = df[mask][['src_ip', 'ja4', 'asn_number']].copy()
if len(sub) < 10:
return None
# Compter les IPs par (ja4, asn) — poids de l'arête
edge_weights = (
sub.groupby(['ja4', 'asn_number'])['src_ip']
.nunique()
.reset_index(name='n_ips')
)
# Garder seulement les arêtes avec au moins FLEET_MIN_EDGES IPs distinctes
edge_weights = edge_weights[edge_weights['n_ips'] >= FLEET_MIN_EDGES]
if len(edge_weights) < 5:
return None
G = nx.Graph()
ja4_nodes = set()
asn_nodes = set()
for _, row in edge_weights.iterrows():
ja4_node = f"ja4:{row['ja4']}"
asn_node = f"asn:{row['asn_number']}"
G.add_node(ja4_node, bipartite=0)
G.add_node(asn_node, bipartite=1)
G.add_edge(ja4_node, asn_node, weight=int(row['n_ips']))
ja4_nodes.add(ja4_node)
asn_nodes.add(asn_node)
return G, ja4_nodes, asn_nodes
def detect_fleet_communities(df: pd.DataFrame) -> dict:
"""Analyse le graphe bipartite et retourne un dict {src_ip: fleet_score}.
fleet_score > FLEET_SCORE_THRESHOLD → IP appartient à une flotte suspectée.
fleet_score = 0 pour toutes les autres IPs.
fleet_score = community_size * graph_density / log2(n_asn + 2)
"""
result = build_fleet_graph(df)
if result is None:
return {}
G, ja4_nodes, asn_nodes = result
try:
import networkx as nx
from networkx.algorithms import bipartite
try:
from community import best_partition as louvain_partition
LOUVAIN_AVAILABLE = True
except ImportError:
LOUVAIN_AVAILABLE = False
# Projection bipartite : graphe des JA4 partageant des ASN
G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes)
if G_ja4.number_of_edges() == 0:
return {}
# Détection de communautés
if LOUVAIN_AVAILABLE:
partition = louvain_partition(G_ja4, weight='weight', random_state=42)
# partition = {node: community_id}
communities: dict = {}
for node, cid in partition.items():
communities.setdefault(cid, set()).add(node)
else:
# Fallback : composantes connexes
communities = {
i: set(c)
for i, c in enumerate(nx.connected_components(G_ja4))
if len(c) >= 2
}
# Calculer le fleet_score de chaque communauté
fleet_scores: dict = {} # {ja4: fleet_score}
for cid, members in communities.items():
if len(members) < 2:
continue
sub_g = G.subgraph(
list(members) + [n for n in asn_nodes if any(G.has_edge(n, m) for m in members)]
)
n_asn = len([n for n in sub_g.nodes if n.startswith('asn:')])
density = nx.density(G_ja4.subgraph(members))
score = len(members) * density / max(np.log2(n_asn + 2), 0.1)
for ja4_node in members:
ja4 = ja4_node.replace('ja4:', '')
fleet_scores[ja4] = round(float(score), 3)
# Mapper les fleet_scores sur les IPs
if not fleet_scores:
return {}
ip_scores: dict = {}
for _, row in df.iterrows():
ja4 = str(row.get('ja4', ''))
score = fleet_scores.get(ja4, 0.0)
if score >= FLEET_SCORE_THRESHOLD:
src_ip = str(row.get('src_ip', ''))
if src_ip:
ip_scores[src_ip] = max(ip_scores.get(src_ip, 0.0), score)
n_fleets = len(set(fleet_scores.values()))
if ip_scores:
logger.info(
f"[Fleet] {len(ip_scores)} IPs dans {n_fleets} communauté(s) suspecte(s) "
f"(score max={max(ip_scores.values()):.2f})"
)
return ip_scores
except Exception as e:
logger.warning(f"[Fleet] Erreur détection de flotte : {e}")
return {}
def enrich_with_fleet_score(df: pd.DataFrame) -> pd.DataFrame:
"""Enrichit le DataFrame avec fleet_score et fleet_campaign_flag.
fleet_campaign_flag = 1 si l'IP appartient à une flotte suspectée.
fleet_score = score de la communauté (0 = pas de flotte).
"""
df = df.copy()
fleet_map = detect_fleet_communities(df)
df['fleet_score'] = df['src_ip'].map(fleet_map).fillna(0.0).astype(float)
df['fleet_campaign_flag'] = (df['fleet_score'] >= FLEET_SCORE_THRESHOLD).astype(int)
return df

View File

@ -0,0 +1,166 @@
"""Module de métriques de performance du pipeline ML.
Enregistre un résumé de chaque cycle dans ml_performance_metrics :
- Taux d'anomalie par niveau (CRITICAL/HIGH/MEDIUM/LOW)
- Taux de corrélation (correlated=1 vs 0)
- Drift rate, latence, taille baseline, seuil adaptatif
- Alertes automatiques sur calibration, drift, corrélation, latence
Utilisation dans cycle.py :
from .metrics import record_cycle_metrics
record_cycle_metrics(client, cycle_id, model_name, ...)
"""
import time
import logging
from datetime import datetime
from typing import Optional
import pandas as pd
logger = logging.getLogger(__name__)
# Seuils d'alerte (configurables via env vars)
import os
ALERT_ANOMALY_RATE_HIGH = float(os.getenv('ALERT_ANOMALY_RATE_HIGH', '0.10'))
ALERT_ANOMALY_RATE_LOW = float(os.getenv('ALERT_ANOMALY_RATE_LOW', '0.005'))
ALERT_DRIFT_RATE = float(os.getenv('ALERT_DRIFT_RATE', '0.30'))
ALERT_CORRELATION_RATE = float(os.getenv('ALERT_CORRELATION_RATE', '0.50'))
ALERT_LATENCY_MS = int(os.getenv('ALERT_LATENCY_MS', '300000'))
def record_cycle_metrics(
client,
db: str,
cycle_id: str,
model_name: str,
df_all: pd.DataFrame,
anomalies: pd.DataFrame,
all_scored: pd.DataFrame,
drift_rate: float,
cycle_start_time: float,
baseline_size: int,
threshold: float,
valid_features: int,
total_features: int,
meta_learner_active: bool = False,
) -> None:
"""Enregistre les métriques d'un cycle dans ml_performance_metrics.
Émet également des alertes dans les logs si les seuils sont dépassés.
Args:
client : ClickHouseClient actif
db : nom de la base ja4_processing
cycle_id : identifiant du cycle (timestamp)
model_name : 'Complet' ou 'Applicatif' (ou variante 24h)
df_all : DataFrame complet du cycle (avant scoring)
anomalies : DataFrame des anomalies détectées
all_scored : DataFrame de tous les scores
drift_rate : taux de features en dérive [0, 1]
cycle_start_time : timestamp (time.time()) du début du cycle
baseline_size : nombre de sessions dans la baseline humaine
threshold : seuil adaptatif utilisé
valid_features : nombre de features valides
total_features : nombre total de features demandées
meta_learner_active : True si le meta-learner a été utilisé ce cycle
"""
now = datetime.utcnow()
latency_ms = int((time.time() - cycle_start_time) * 1000)
n_total = max(len(df_all), 1)
n_scored = max(len(all_scored), 1)
# Taux de corrélation
if 'correlated' in df_all.columns:
n_correlated = int((df_all['correlated'] == 1).sum())
correlated_rate = n_correlated / n_total
else:
correlated_rate = 0.0
# Comptage par niveau de menace
n_critical = n_high = n_medium = n_low = 0
if not anomalies.empty and 'threat_level' in anomalies.columns:
levels = anomalies['threat_level'].value_counts()
n_critical = int(levels.get('CRITICAL', 0))
n_high = int(levels.get('HIGH', 0))
n_medium = int(levels.get('MEDIUM', 0))
n_low = int(levels.get('LOW', 0))
# Bots connus et navigateurs légitimes
n_known_bot = 0
n_anubis_deny = 0
n_legit_browser = 0
if not df_all.empty:
if 'bot_name' in df_all.columns:
n_known_bot = int((df_all['bot_name'].fillna('') != '').sum())
if 'anubis_bot_action' in df_all.columns:
n_anubis_deny = int((df_all['anubis_bot_action'] == 'DENY').sum())
if 'browser_confidence' in df_all.columns:
from .config import BROWSER_CONFIDENCE_THRESHOLD
n_legit_browser = int((df_all['browser_confidence'] >= BROWSER_CONFIDENCE_THRESHOLD).sum())
anomaly_rate = (n_critical + n_high + n_medium + n_low) / n_scored
drift_alert = 1 if drift_rate > ALERT_DRIFT_RATE else 0
# Alertes
_emit_alerts(model_name, anomaly_rate, drift_rate, correlated_rate, latency_ms, drift_alert)
try:
client.execute(
f"INSERT INTO {db}.ml_performance_metrics VALUES",
[{
'cycle_at': now,
'model_name': model_name,
'total_sessions': n_total,
'correlated_rate': round(float(correlated_rate), 4),
'anomaly_rate': round(float(anomaly_rate), 4),
'critical_count': n_critical,
'high_count': n_high,
'medium_count': n_medium,
'low_count': n_low,
'known_bot_count': n_known_bot,
'anubis_deny_count': n_anubis_deny,
'legit_browser_count': n_legit_browser,
'drift_rate': round(float(drift_rate), 4),
'drift_alert': drift_alert,
'cycle_latency_ms': latency_ms,
'features_valid': valid_features,
'features_total': total_features,
'baseline_size': baseline_size,
'threshold': round(float(threshold), 6),
'meta_learner_active': 1 if meta_learner_active else 0,
}]
)
except Exception as e:
logger.warning(f"[Metrics] Erreur d'enregistrement des métriques : {e}")
def _emit_alerts(model_name: str, anomaly_rate: float, drift_rate: float,
correlated_rate: float, latency_ms: int, drift_alert: int) -> None:
"""Émet des alertes dans les logs si les seuils sont dépassés."""
if anomaly_rate > ALERT_ANOMALY_RATE_HIGH:
logger.warning(
f"[{model_name}] ⚠ ALERTE CALIBRATION : taux d'anomalie élevé "
f"({anomaly_rate:.1%} > {ALERT_ANOMALY_RATE_HIGH:.1%})"
)
elif anomaly_rate < ALERT_ANOMALY_RATE_LOW and anomaly_rate > 0:
logger.warning(
f"[{model_name}] ⚠ ALERTE CALIBRATION : taux d'anomalie très bas "
f"({anomaly_rate:.3%} < {ALERT_ANOMALY_RATE_LOW:.1%})"
)
if drift_alert:
logger.warning(
f"[{model_name}] ⚠ ALERTE DRIFT : {drift_rate:.1%} des features en dérive "
f"(seuil {ALERT_DRIFT_RATE:.1%})"
)
if correlated_rate < ALERT_CORRELATION_RATE:
logger.warning(
f"[{model_name}] ⚠ ALERTE CORRÉLATION : taux de corrélation bas "
f"({correlated_rate:.1%} < {ALERT_CORRELATION_RATE:.1%}) — "
"vérifier ja4sentinel/logcorrelator"
)
if latency_ms > ALERT_LATENCY_MS:
logger.warning(
f"[{model_name}] ⚠ ALERTE PERFORMANCE : latence cycle {latency_ms}ms "
f"> {ALERT_LATENCY_MS}ms"
)

View File

@ -328,7 +328,8 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
drift_score = 0.0
drift_forced = False
if age_ok and 'baseline_stats' in meta:
drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features)
drift_score = compute_drift_score(meta['baseline_stats'], human_baseline, features,
name=name, cycle_id=cycle_id)
if drift_score >= DRIFT_THRESHOLD:
drift_forced = True
log_info(f"[{name}] Dérive détectée ({drift_score:.0%} features) — retraining forcé.")
@ -419,13 +420,18 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
return joblib.load(model_path), ae_prev, meta.get('features', features)
log_info(f"[{name}] Aucun modèle précédent — utilisation du modèle rejeté par défaut.")
# A1 — Sauvegarder les statistiques de distribution avec quantile digest pour drift detection
# A1/§4 — Sauvegarder les statistiques de distribution avec quantile digest 9 points
# (p5…p95) pour une meilleure fidélité de la détection de dérive KS+KL
baseline_stats = {
f: {
'mean': float(X[f].mean()), 'std': float(X[f].std()),
'p10': float(X[f].quantile(0.10)), 'p25': float(X[f].quantile(0.25)),
'p50': float(X[f].quantile(0.50)), 'p75': float(X[f].quantile(0.75)),
'p5': float(X[f].quantile(0.05)),
'p10': float(X[f].quantile(0.10)),
'p25': float(X[f].quantile(0.25)),
'p50': float(X[f].quantile(0.50)),
'p75': float(X[f].quantile(0.75)),
'p90': float(X[f].quantile(0.90)),
'p95': float(X[f].quantile(0.95)),
}
for f in features
}

View File

@ -19,6 +19,8 @@ from .models import load_or_train_model, load_or_train_xgb, TrafficAutoEncoder
from .scoring import (
validate_features, compute_adaptive_threshold, normalize_scores,
compute_shap_top_features, build_reason, cluster_anomalies,
compute_exiffi_importance, compute_ae_feature_errors, get_meta_learner,
FINGERPRINT_COHERENCE_THRESHOLD,
)
@ -56,6 +58,23 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}')
log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)')
# §3 — Exclure les sessions ISP à faible cohérence de fingerprint de la baseline humaine
# Ces sessions ISP avec un fingerprint incohérent sont probablement des proxies résidentiels
# ou des appareils mal configurés qui contamineraient la baseline.
if 'fingerprint_coherence_score' in human_baseline.columns:
low_coh = human_baseline['fingerprint_coherence_score'] < FINGERPRINT_COHERENCE_THRESHOLD
n_low_coh = int(low_coh.sum())
if n_low_coh > 0:
human_baseline = human_baseline[~low_coh]
log_info(
f'[{name}] Baseline après filtre cohérence (<{FINGERPRINT_COHERENCE_THRESHOLD}) : '
f'{len(human_baseline):>6} ({n_low_coh} exclues)'
)
log_decision('LOW_COHERENCE_EXCLUDED', cycle_id, name, {
'n_excluded': n_low_coh, 'threshold': FINGERPRINT_COHERENCE_THRESHOLD,
'baseline_after': len(human_baseline),
})
# A7 — Valider les features avant tout traitement
valid_features = validate_features(df, features, name, cycle_id)
if valid_features is None:
@ -130,16 +149,51 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1]
unknown_traffic['xgb_prob'] = xgb_probs
# Méta-learner : combiner anomaly_score (EIF+AE) et xgb_prob
# anomaly_score déjà normalisé [0,1], xgb_prob est [0,1]
α_xgb = XGB_WEIGHT
unknown_traffic['anomaly_score'] = (
(1 - α_xgb) * unknown_traffic['anomaly_score'] + α_xgb * xgb_probs
)
log_info(f"[{name}] Score combiné EIF+AE+XGB (β={α_xgb}): xgb_mean={xgb_probs.mean():.4f}")
log_info(f"[{name}] XGBoost : xgb_mean={xgb_probs.mean():.4f}")
except Exception as exc:
log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.")
# §8 — Score final via MetaLearner (ou poids fixes en fallback)
meta_learner = get_meta_learner(name)
eif_norm_arr = unknown_traffic['anomaly_score'].values.copy()
ae_norm_arr = normalize_scores(-unknown_traffic['ae_recon_error'].values)
xgb_prob_arr = unknown_traffic['xgb_prob'].values
hits_arr = unknown_traffic.get('hits', pd.Series(1, index=unknown_traffic.index)).values
corr_arr = unknown_traffic.get('correlated', pd.Series(0, index=unknown_traffic.index)).values
final_scores = meta_learner.predict(eif_norm_arr, ae_norm_arr, xgb_prob_arr,
hits_arr, corr_arr)
unknown_traffic['anomaly_score'] = final_scores
if meta_learner.is_trained:
log_info(
f"[{name}] §8 MetaLearner actif ({meta_learner._n_samples} labels) — "
f"score moyen={final_scores.mean():.4f}"
)
elif unknown_traffic['xgb_prob'].mean() > 0:
log_info(f"[{name}] §8 Poids fixes EIF+AE+XGB (MetaLearner pas encore entraîné).")
# §8 — Entraînement du MetaLearner sur les labels du cycle courant
# (accumulation progressive — activation dès MIN_SAMPLES labels)
try:
labeled_df = meta_learner.build_labels_from_df(unknown_traffic)
if not labeled_df.empty:
unknown_traffic_labeled = labeled_df.copy()
unknown_traffic_labeled['eif_norm'] = normalize_scores(raw_scores)
unknown_traffic_labeled['ae_norm'] = ae_norm_arr
if meta_learner.fit(unknown_traffic_labeled):
log_decision('META_LEARNER_TRAINED', cycle_id, name, meta_learner._weights_log)
except Exception as exc:
log_info(f"[{name}] MetaLearner entraînement échoué : {exc}")
# §7 — ExIFFI : importance de features pour l'EIF (quand SHAP désactivé)
exiffi_tops: list = [{}] * len(unknown_traffic)
if not ENABLE_SHAP and len(unknown_traffic) > 0:
try:
exiffi_tops = compute_exiffi_importance(model, X_test, scoring_features)
except Exception:
pass
# A2 — Seuil adaptatif calculé sur les scores BRUTS (même échelle que ANOMALY_THRESHOLD)
effective_threshold = compute_adaptive_threshold(raw_scores)
log_info(f"[{name}] Seuil effectif : {effective_threshold:.4f} (statique={ANOMALY_THRESHOLD}, percentile={ANOMALY_PERCENTILE})")
@ -207,7 +261,7 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
# Log des axes moyens pour diagnostic
ax_means = {}
for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
'axis_nav_behavior', 'axis_tls_coherence']:
'axis_nav_behavior', 'axis_tls_coherence', 'axis_h2_coherence']:
col = unknown_traffic.get(ax, None)
if col is not None:
ax_means[ax.replace('axis_', '')] = round(float(col[browser_legit_mask].mean()), 3)
@ -297,9 +351,18 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
# A4 — Explainabilité SHAP : top features responsables de chaque anomalie
X_anomalies = X_test.loc[anomalies.index]
shap_tops = compute_shap_top_features(model, X_anomalies, valid_features)
# §7 — ExIFFI : utiliser les tops ExIFFI précalculés quand SHAP est inactif
# Construire un mapping index → exiffi_top pour accès rapide
if len(exiffi_tops) == len(unknown_traffic):
_exiffi_map = dict(zip(unknown_traffic.index, exiffi_tops))
exiffi_for_anomalies = [_exiffi_map.get(idx, {}) for idx in anomalies.index]
else:
exiffi_for_anomalies = [{}] * len(anomalies)
anomalies['reason'] = [
build_reason(name, row, shap)
for (_, row), shap in zip(anomalies.iterrows(), shap_tops)
build_reason(name, row, shap, exiffi)
for (_, row), shap, exiffi
in zip(anomalies.iterrows(), shap_tops, exiffi_for_anomalies)
]
# A8 — Clustering DBSCAN pour identifier les campagnes coordonnées

View File

@ -33,7 +33,7 @@ FEATURES = [
# Browser multifactoriel
'is_known_browser', 'browser_consistency_score', 'browser_confidence',
'axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
'axis_nav_behavior', 'axis_tls_coherence',
'axis_nav_behavior', 'axis_tls_coherence', 'axis_h2_coherence',
# HTTP
'missing_accept_enc_ratio', 'http_scheme_ratio',
# Thèse §5
@ -41,12 +41,19 @@ FEATURES = [
'cadence_cv', 'burst_ratio', 'pause_ratio',
'lag1_autocorrelation', 'benford_deviation',
'host_diversity', 'host_sweep_speed', 'host_coverage_uniformity',
# §5.8b — Similarité Jaccard cross-domaine (chemins partagés entre hosts)
'cross_domain_path_similarity',
# P0+P1 : features sous-exploitées (SQL existant ou ajouté)
'is_fake_navigation',
'true_window_size', 'window_mss_ratio',
# P1 : nouvelles features de détection
'has_xff', 'unusual_content_type_ratio', 'non_standard_port_ratio',
'login_post_concentration', 'sec_ch_mobile_mismatch',
# §2 — Features HTTP/2 (fingerprint SETTINGS, cohérence H2↔JA4)
'h2_settings_known', 'h2_pseudo_order_match',
'h2_ja4_coherence', 'h2_settings_rare',
# §3 — Score de cohérence de fingerprint cross-layer
'fingerprint_coherence_score',
]
# Features supplémentaires pour le modèle Complet (données TCP/TLS requises)
@ -82,7 +89,7 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
df['browser_confidence'] = browser_axes['browser_confidence']
for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
'axis_nav_behavior', 'axis_tls_coherence']:
'axis_nav_behavior', 'axis_tls_coherence', 'axis_h2_coherence']:
df[ax] = browser_axes[ax]
# Rétro-compatibilité
@ -108,6 +115,8 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
'is_ua_rotating', 'is_alpn_missing', 'sni_host_mismatch', 'alpn_http_mismatch',
'mss_mobile_mismatch', 'anubis_is_flagged', 'is_rare_ja4',
'is_fake_navigation', 'has_xff', 'sec_ch_mobile_mismatch',
# §2 — Features HTTP/2 binaires
'h2_settings_known', 'h2_pseudo_order_match', 'h2_ja4_coherence', 'h2_settings_rare',
}
for col in df.columns:
if col in binary_features:

View File

@ -1,12 +1,14 @@
"""Scoring, dérive, validation, seuil adaptatif, SHAP et clustering.
Regroupe les fonctions de scoring utilisées par le pipeline de détection :
- A1 : détection de dérive conceptuelle (KS-test / Z-score)
- A1 : détection de dérive conceptuelle (KS-test + KL divergence + dérive adversariale)
- A2 : seuil adaptatif basé sur le percentile des scores négatifs
- A4 : explainabilité SHAP (top features contributives)
- A7 : validation de complétude des features
- A8 : clustering HDBSCAN / DBSCAN des anomalies
- A10 : normalisation [0,1] des scores d'anomalie
- §7 : ExIFFI — importance de features par permutation (Extended Isolation Forest)
- §8 : MetaLearner — pondération de l'ensemble par régression logistique
"""
import numpy as np
import pandas as pd
@ -17,6 +19,7 @@ from .config import (
ENABLE_SHAP, SHAP_AVAILABLE, ENABLE_CLUSTERING, CLUSTERING_MIN_SAMPLES,
EIF_AVAILABLE, HDBSCAN_AVAILABLE,
STRUCTURAL_EXCLUDED_FEATURES,
AE_WEIGHT, XGB_WEIGHT,
)
from .log import log_info, log_decision
@ -27,6 +30,9 @@ from .config import DBSCAN, StandardScaler
if ENABLE_SHAP:
from .config import _shap
# Seuil de filtre de cohérence de fingerprint pour la baseline humaine (§3)
import os
FINGERPRINT_COHERENCE_THRESHOLD = float(os.getenv('FINGERPRINT_COHERENCE_THRESHOLD', '0.25'))
# Cache par modèle : dernier état des features invalides (pour ne logguer que les changements).
_feature_warning_cache: dict = {}
@ -36,26 +42,63 @@ _feature_warning_cache: dict = {}
# A1 — DÉTECTION DE DÉRIVE CONCEPTUELLE
# ═══════════════════════════════════════════════════════════════════════════════
def _kl_divergence(p_vals: np.ndarray, q_vals: np.ndarray, n_bins: int = 20) -> float:
"""Calcule la divergence KL(P || Q) entre deux échantillons via histogramme.
Utilise un lissage de Laplace pour éviter les divisions par zéro.
Retourne 0.0 si les données sont insuffisantes.
"""
if len(p_vals) < 10 or len(q_vals) < 10:
return 0.0
combined_min = min(p_vals.min(), q_vals.min())
combined_max = max(p_vals.max(), q_vals.max())
if combined_max - combined_min < 1e-10:
return 0.0
bins = np.linspace(combined_min, combined_max, n_bins + 1)
p_hist, _ = np.histogram(p_vals, bins=bins, density=False)
q_hist, _ = np.histogram(q_vals, bins=bins, density=False)
# Lissage de Laplace : ajouter 1 à chaque bin pour éviter les 0
p_hist = (p_hist + 1).astype(float)
q_hist = (q_hist + 1).astype(float)
p_hist /= p_hist.sum()
q_hist /= q_hist.sum()
# KL(P || Q) = Σ p * log(p / q)
kl = float(np.sum(p_hist * np.log(p_hist / q_hist + 1e-12)))
return max(0.0, kl)
def compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame,
features: list) -> float:
features: list, *, name: str = '', cycle_id: str = '') -> float:
"""Compare la distribution actuelle de la baseline humaine avec celle de l'entraînement.
Utilise le test de Kolmogorov-Smirnov bilatéral par feature. La distribution
d'entraînement est reconstruite à partir d'un quantile digest (p10..p90) par
interpolation linéaire — bien plus fidèle qu'une approximation N(μ,σ) pour les
features asymétriques ou multimodales.
Retourne la fraction de features en dérive significative (p < 0.05).
§4 — Méthode améliorée :
- Quantile digest 9 points (p5…p95) au lieu de 5 pour une meilleure fidélité
- KS-test + divergence KL : feature en dérive si KS p<0.05 OU KL>0.5
- Détection de dérive adversariale : >50% des features dérivent dans la même
direction → log_decision 'ADVERSARIAL_DRIFT'
Retourne la fraction de features en dérive significative (en [0,1]).
"""
if not baseline_stats or current_baseline.empty:
return 0.0
try:
from scipy.stats import ks_2samp
_HAS_SCIPY = True
except ImportError:
return _compute_drift_score_zscore(baseline_stats, current_baseline, features)
_HAS_SCIPY = False
# Clés de quantiles : 9 points preferred, 5 points fallback
Q9_KEYS = ['p5', 'p10', 'p25', 'p50', 'p75', 'p90', 'p95']
Q9_PROBS = np.array([0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95])
Q5_KEYS = ['p10', 'p25', 'p50', 'p75', 'p90']
Q5_PROBS = np.array([0.10, 0.25, 0.50, 0.75, 0.90])
drifted = 0
tested = 0
tested = 0
drifted_features: list = []
direction_shifts: list = []
rng = np.random.default_rng(42)
for feat in features:
if feat not in baseline_stats or feat not in current_baseline.columns:
continue
@ -67,42 +110,60 @@ def compute_drift_score(baseline_stats: dict, current_baseline: pd.DataFrame,
if trained_std < 1e-9:
continue
quantile_keys = ['p10', 'p25', 'p50', 'p75', 'p90']
if all(k in stats for k in quantile_keys):
quantile_probs = np.array([0.10, 0.25, 0.50, 0.75, 0.90])
quantile_vals = np.array([stats[k] for k in quantile_keys])
# Reconstruction de la distribution d'entraînement par interpolation quantile
if all(k in stats for k in Q9_KEYS):
q_probs = Q9_PROBS
q_vals = np.array([stats[k] for k in Q9_KEYS])
elif all(k in stats for k in Q5_KEYS):
q_probs = Q5_PROBS
q_vals = np.array([stats[k] for k in Q5_KEYS])
else:
q_probs = None
if q_probs is not None:
u = rng.uniform(0, 1, size=len(curr_values))
synthetic_trained = np.interp(u, quantile_probs, quantile_vals)
synthetic_trained = np.interp(u, q_probs, q_vals)
else:
synthetic_trained = rng.normal(stats['mean'], trained_std, size=len(curr_values))
_, p_value = ks_2samp(curr_values.values, synthetic_trained)
if p_value < 0.05:
drifted += 1
tested += 1
return drifted / max(tested, 1)
feat_drifted = False
if _HAS_SCIPY:
_, ks_p = ks_2samp(curr_values.values, synthetic_trained)
if ks_p < 0.05:
feat_drifted = True
else:
# Fallback Z-score
z = abs(curr_values.mean() - stats['mean']) / trained_std
if z > 2.0:
feat_drifted = True
# KL divergence comme critère complémentaire au KS
kl = _kl_divergence(curr_values.values, synthetic_trained)
if kl > 0.5:
feat_drifted = True
def _compute_drift_score_zscore(baseline_stats: dict, current_baseline: pd.DataFrame,
features: list) -> float:
"""Fallback Z-score pour la détection de dérive quand scipy n'est pas disponible."""
if not baseline_stats or current_baseline.empty:
return 0.0
drifted = 0
tested = 0
for feat in features:
if feat not in baseline_stats or feat not in current_baseline.columns:
continue
stats = baseline_stats[feat]
curr_mean = current_baseline[feat].mean()
trained_std = stats.get('std', 0)
if trained_std < 1e-9:
continue
z = abs(curr_mean - stats['mean']) / trained_std
if z > 2.0:
if feat_drifted:
drifted += 1
drifted_features.append(feat)
# Direction du shift pour la détection adversariale
direction_shifts.append(np.sign(curr_values.mean() - stats['mean']))
tested += 1
return drifted / max(tested, 1)
drift_rate = drifted / max(tested, 1)
# Détection de dérive adversariale : >50% des features dérivent simultanément
# dans la même direction → signe d'une manipulation intentionnelle de la distribution
if drift_rate > 0.50 and len(direction_shifts) >= 10:
consensus = abs(float(np.mean(direction_shifts)))
if consensus >= 0.8:
log_decision('ADVERSARIAL_DRIFT', cycle_id, name, {
'drift_rate': round(drift_rate, 3),
'consensus': round(consensus, 3),
'n_features': len(drifted_features),
'top_drifted': drifted_features[:10],
})
return drift_rate
# ═══════════════════════════════════════════════════════════════════════════════
@ -216,14 +277,18 @@ def compute_shap_top_features(model, X: pd.DataFrame, features: list,
return [{}] * len(X)
def build_reason(name: str, row: pd.Series, shap_top: dict) -> str:
"""Construit le champ reason enrichi avec le top SHAP ou les métriques clés."""
def build_reason(name: str, row: pd.Series, shap_top: dict,
exiffi_top: dict = None) -> str:
"""Construit le champ reason enrichi avec SHAP, ExIFFI ou les métriques clés."""
# Utilise le score brut pour l'affichage (plus interprétable que le score normalisé)
score = round(float(row.get('raw_anomaly_score', row.get('anomaly_score', 0))), 3)
threat = row.get('threat_level', '')
if shap_top:
top_str = ' | '.join(f"{f}({v:+.3f})" for f, v in shap_top.items())
return f"[{name}] Score: {score} | SHAP: {top_str} | Threat: {threat}"
if exiffi_top:
top_str = ' | '.join(f"{f}({v:+.3f})" for f, v in exiffi_top.items())
return f"[{name}] Score: {score} | ExIFFI: {top_str} | Threat: {threat}"
vel = round(float(row.get('hit_velocity', 0)), 1)
fuzz = round(float(row.get('fuzzing_index', 0)), 1)
return f"[{name}] Score: {score} | Vel: {vel} req/s | Fuzzing: {fuzz} | Threat: {threat}"
@ -277,3 +342,223 @@ def cluster_anomalies(anomalies: pd.DataFrame, features: list,
log_info(f"[Clustering] Erreur de clustering: {e}")
anomalies['campaign_id'] = -1
return anomalies
# ═══════════════════════════════════════════════════════════════════════════════
# §7 — ExIFFI : IMPORTANCE DE FEATURES PAR PERMUTATION
# ═══════════════════════════════════════════════════════════════════════════════
def compute_exiffi_importance(model, X: pd.DataFrame, features: list,
n_top: int = 3, sample_size: int = 200) -> list:
"""§7 — Importance de features pour l'EIF par permutation (ExIFFI simplifié).
Pour chaque feature, calcule la dégradation de score lors de la permutation
de la colonne. La différence de score (permuted original) mesure l'importance :
une valeur positive élevée signifie que permuter cette feature rend la session
moins anomale → la feature est un signal fort de détection.
Compatible sklearn IsolationForest et isotree ExtendedIsolationForest.
Retourne une liste de dicts {feature: importance_score} par ligne de X.
"""
if X.empty or len(features) < 2:
return [{}] * len(X)
try:
# Limiter la taille pour la performance
if len(X) > sample_size:
X_sample = X[features].sample(n=sample_size, random_state=42)
else:
X_sample = X[features]
X_arr = X_sample.fillna(0).values.copy()
baseline_scores = model.decision_function(X_arr)
feat_importance: dict = {}
rng = np.random.default_rng(42)
for i, feat in enumerate(features):
if feat not in X_sample.columns:
continue
X_perm = X_arr.copy()
# Permuter la colonne (brise la corrélation signal/label)
X_perm[:, i] = rng.permutation(X_perm[:, i])
perm_scores = model.decision_function(X_perm)
# Importance = gain moyen de score lors de la permutation
feat_importance[feat] = float(np.mean(perm_scores - baseline_scores))
# Trier par importance décroissante : les plus grandes valeurs = features clés
sorted_feats = sorted(feat_importance.items(), key=lambda kv: kv[1], reverse=True)
top_feats = {f: round(v, 4) for f, v in sorted_feats[:n_top]}
# L'importance est globale (calculée sur un sample) : même top pour toutes les lignes
return [top_feats] * len(X)
except Exception as e:
log_info(f"[ExIFFI] Erreur de calcul : {e}")
return [{}] * len(X)
def compute_ae_feature_errors(ae_model, X: pd.DataFrame, features: list,
n_top: int = 3) -> list:
"""§7 — Erreur de reconstruction par feature de l'Autoencoder.
Pour chaque session anomale, détermine quelles features ont la plus grande
erreur de reconstruction individuelle (signal de l'anomalie la plus saillante).
Retourne une liste de dicts {feature: reconstruction_error} par ligne de X.
"""
if ae_model is None or X.empty:
return [{}] * len(X)
try:
import torch
X_arr = X[features].fillna(0).values.astype(np.float32)
X_t = torch.tensor(X_arr)
with torch.no_grad():
X_rec = ae_model.forward(X_t).numpy()
per_feature_err = (X_arr - X_rec) ** 2 # shape: (n_samples, n_features)
result = []
for row_err in per_feature_err:
pairs = sorted(zip(features, row_err.tolist()), key=lambda kv: kv[1], reverse=True)
result.append({f: round(v, 6) for f, v in pairs[:n_top]})
return result
except Exception as e:
log_info(f"[AE features] Erreur de calcul erreur par feature : {e}")
return [{}] * len(X)
# ═══════════════════════════════════════════════════════════════════════════════
# §8 — META-LEARNER : PONDÉRATION APPRISE DE L'ENSEMBLE
# ═══════════════════════════════════════════════════════════════════════════════
class MetaLearner:
"""§8 — Méta-learner (régression logistique) pour la pondération de l'ensemble.
Remplace les poids fixes (1XGB_W)×((1AE_W)×eif + AE_W×ae) + XGB_W×xgb
par une régression logistique entraînée sur les labels SOC/Anubis/bots connus.
Formule apprise :
P(bot) = logistic(w1×eif_norm + w2×ae_norm + w3×xgb_prob
+ w4×log1p(hits) + w5×correlated + bias)
Fallback automatique aux poids fixes quand < MIN_SAMPLES labels disponibles.
"""
MIN_SAMPLES = 1000 # Nombre minimal de labels pour activer le méta-learner
def __init__(self):
self._clf = None
self._n_samples: int = 0
self._feature_names = ['eif_norm', 'ae_norm', 'xgb_prob', 'log_hits', 'correlated']
self._weights_log: dict = {} # Pour la transparence SOC
def fit(self, df: pd.DataFrame) -> bool:
"""Entraîne le méta-learner sur un DataFrame de sessions labelisées.
Colonnes requises : eif_norm, ae_norm (ou 0), xgb_prob (ou 0),
hits, correlated, label (0=normal, 1=bot).
Retourne True si l'entraînement a réussi.
"""
try:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler as _SS
except ImportError:
return False
required = {'eif_norm', 'label'}
if not required.issubset(df.columns):
return False
df = df.dropna(subset=['eif_norm', 'label'])
if len(df) < self.MIN_SAMPLES:
return False
X_meta = pd.DataFrame({
'eif_norm': df['eif_norm'].clip(0, 1),
'ae_norm': df.get('ae_norm', pd.Series(0.0, index=df.index)).clip(0, 1),
'xgb_prob': df.get('xgb_prob', pd.Series(0.0, index=df.index)).clip(0, 1),
'log_hits': np.log1p(df.get('hits', pd.Series(1, index=df.index)).clip(1)),
'correlated': df.get('correlated', pd.Series(0, index=df.index)).clip(0, 1),
}).fillna(0)
y = df['label'].astype(int)
scaler = _SS()
X_scaled = scaler.fit_transform(X_meta)
clf = LogisticRegression(max_iter=500, C=1.0, random_state=42)
clf.fit(X_scaled, y)
# Enregistrer les poids pour la transparence SOC
coefs = dict(zip(self._feature_names, clf.coef_[0].tolist()))
self._clf = (clf, scaler)
self._n_samples = len(df)
self._weights_log = {
'coefs': {k: round(v, 4) for k, v in coefs.items()},
'intercept': round(float(clf.intercept_[0]), 4),
'n_samples': self._n_samples,
}
log_info(f"[MetaLearner] Entraîné sur {self._n_samples} labels — coefs: {coefs}")
return True
def predict(self, eif_norm: np.ndarray, ae_norm: np.ndarray,
xgb_prob: np.ndarray, hits: np.ndarray = None,
correlated: np.ndarray = None) -> np.ndarray:
"""Prédit P(bot) avec le méta-learner ou les poids fixes en fallback.
Retourne un array de probabilités dans [0, 1].
"""
n = len(eif_norm)
if hits is None:
hits = np.ones(n)
if correlated is None:
correlated = np.zeros(n)
if self.is_trained:
clf, scaler = self._clf
X_meta = np.column_stack([
np.clip(eif_norm, 0, 1),
np.clip(ae_norm, 0, 1),
np.clip(xgb_prob, 0, 1),
np.log1p(np.clip(hits, 1, None)),
np.clip(correlated, 0, 1),
])
try:
X_scaled = scaler.transform(X_meta)
return clf.predict_proba(X_scaled)[:, 1]
except Exception as e:
log_info(f"[MetaLearner] Prédiction échouée ({e}) — fallback poids fixes")
# Fallback : formule linéaire avec poids fixes
_ae_w = AE_WEIGHT
_xgb_w = XGB_WEIGHT
return (1 - _xgb_w) * ((1 - _ae_w) * eif_norm + _ae_w * ae_norm) + _xgb_w * xgb_prob
@property
def is_trained(self) -> bool:
"""Vrai si le méta-learner est actif (assez de labels pour être fiable)."""
return self._clf is not None and self._n_samples >= self.MIN_SAMPLES
def build_labels_from_df(self, df: pd.DataFrame) -> pd.DataFrame:
"""Construit les labels supervisés pour l'entraînement du méta-learner.
Label=1 : bot connu (bot_name), Anubis DENY, anomalie CRITICAL/HIGH confirmée.
Label=0 : navigateur légitime LEGITIMATE_BROWSER, Anubis ALLOW.
Retourne seulement les lignes avec un label fiable.
"""
labeled = []
for _, row in df.iterrows():
threat = str(row.get('threat_level', ''))
bot_name = str(row.get('bot_name', ''))
action = str(row.get('anubis_bot_action', ''))
if bot_name != '' or action == 'DENY' or threat in ('CRITICAL', 'HIGH', 'KNOWN_BOT', 'ANUBIS_DENY'):
labeled.append({**row, 'label': 1})
elif threat == 'LEGITIMATE_BROWSER' or action == 'ALLOW':
labeled.append({**row, 'label': 0})
if not labeled:
return pd.DataFrame()
return pd.DataFrame(labeled)
# Singleton partagé entre les modèles Complet et Applicatif
_meta_learner_complet = MetaLearner()
_meta_learner_applicatif = MetaLearner()
def get_meta_learner(name: str) -> MetaLearner:
"""Retourne le singleton MetaLearner correspondant au modèle ``name``."""
if 'Applicatif' in name:
return _meta_learner_applicatif
return _meta_learner_complet