Files
ja4-platform/services/bot-detector/bot_detector/pipeline.py
toto 039086a0b3 feat: nouvelles techniques de détection et page tactiques SOC
SQL:
- Ajout 5 colonnes d'agrégation (count_xff, count_unusual_ct,
  count_non_std_port, count_login_post, sec_ch_mobile_mismatch)
- Exposition de 5 features calculées dans view_ai_features_1h
- Migration ALTER TABLE pour déploiements existants

Bot-detector:
- 7 nouvelles features ML (has_xff, unusual_content_type_ratio,
  non_standard_port_ratio, login_post_concentration,
  sec_ch_mobile_mismatch, true_window_size, window_mss_ratio)
- Propagation campaign_id vers ml_all_scores (était toujours -1)
- Escalade campagne : HIGH→CRITICAL si cluster ≥5 membres

Dashboard:
- Page Tactiques SOC : brute-force, rotation JA4, récurrence,
  alertes temps réel — 4 KPIs + 4 panneaux + infobulles doc
- Ajout fmtDate() helper global
- Navigation sidebar mise à jour

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 14:29:18 +02:00

379 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Pipeline de détection semi-supervisée.
Fonction principale run_semi_supervised_logic() : triage, scoring EIF, AE, XGB,
classification navigateur, détection d'anomalies, SHAP et clustering.
"""
import numpy as np
import pandas as pd
from .config import (
DB, CONTAMINATION, AE_WEIGHT, XGB_WEIGHT, RECURRENCE_WEIGHT,
ANOMALY_THRESHOLD, ANOMALY_PERCENTILE, ENABLE_CLUSTERING,
ENABLE_SHAP, EIF_AVAILABLE, TORCH_AVAILABLE, XGB_AVAILABLE,
BROWSER_CONFIDENCE_THRESHOLD, BROWSER_COHORT_RATIO,
MIN_VALID_FEATURE_RATIO, STRUCTURAL_EXCLUDED_FEATURES,
)
from .log import log_info, log_decision
from .infra import score_to_threat_level, get_client
from .models import load_or_train_model, load_or_train_xgb, TrafficAutoEncoder
from .scoring import (
validate_features, compute_adaptive_threshold, normalize_scores,
compute_shap_top_features, build_reason, cluster_anomalies,
)
# ═══════════════════════════════════════════════════════════════════════════════
# ANALYSE SEMI-SUPERVISÉE
# ═══════════════════════════════════════════════════════════════════════════════
def _recurrence_counts(frame, recurrence_map):
    """Return the 1-based recurrence counter for every row of *frame*.

    Each ``src_ip`` is looked up in *recurrence_map*; IPs absent from the map
    are treated as 0, then 1 is added to every value.
    """
    return frame['src_ip'].map(recurrence_map).fillna(0).astype(int) + 1


def _concat_non_empty(frames):
    """Concatenate the non-empty DataFrames of *frames* with a fresh index.

    Returns an empty DataFrame when nothing is left to concatenate, because
    ``pd.concat`` raises ``ValueError`` on an empty list.
    """
    kept = [frame for frame in frames if frame is not None and not frame.empty]
    if not kept:
        return pd.DataFrame()
    return pd.concat(kept, ignore_index=True)


def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
    """Run the semi-supervised detection pipeline on one slice of traffic.

    The traffic is split into known bots, Anubis ALLOW bots and the remaining
    (unknown) traffic. The anomaly model is loaded or trained on the human
    baseline (rows of the unknown traffic whose ``asn_label`` is ``'isp'``),
    the unknown traffic is scored (EIF/IF, optionally combined with the
    autoencoder and XGBoost), then the adaptive threshold (A2), SHAP reasons
    (A4), recurrence penalty (A6), clustering (A8) and legitimate-browser
    classification (A9) are applied.

    Args:
        df: Sessions to analyse. The code reads at least the columns
            ``bot_name``, ``anubis_bot_action``, ``anubis_bot_name``,
            ``asn_label``, ``src_ip`` and ``ja4``.
        features: Candidate feature column names.
        name: Model name, used in logs and stored in ``model_name``.
        cycle_id: Identifier forwarded to the decision log and model loaders.
        recurrence_map: Mapping used to look up a recurrence count per
            ``src_ip`` (missing IPs count as 0).

    Returns:
        A ``(threats, all_scored)`` tuple of DataFrames. ``threats`` holds the
        anomalies, known bots, Anubis ALLOW and Anubis DENY rows;
        ``all_scored`` holds every scored session plus the Anubis ALLOW rows.
        Both are empty DataFrames when feature validation fails or when the
        human baseline has fewer than 500 rows. ``threats`` is an empty
        DataFrame when no category produced any row.

    Side effects:
        Writes to the info log and to the decision log (``log_decision``).
    """
    # 1. Known bots (dict_bot_ip / dict_bot_ja4) -> excluded from IF scoring.
    known_bots = df[df['bot_name'] != ''].copy()
    rest = df[df['bot_name'] == ''].copy()

    # 2. Anubis ALLOW bots -> legitimate bots, excluded from IF scoring.
    anubis_allow = rest[rest['anubis_bot_action'] == 'ALLOW'].copy()

    # 3. Everything else goes through the IsolationForest to get a real score:
    #    - DENY: threats flagged by Anubis rules -> IF provides the severity.
    #    - WEIGH / unknown: scored normally.
    #    DENY rows are ALWAYS part of the threats, whatever the IF threshold.
    unknown_traffic = rest[rest['anubis_bot_action'] != 'ALLOW'].copy()
    human_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'isp']

    log_info(f'[{name}] ── Triage ──────────────────────────────────────')
    log_info(f'[{name}] Total sessions : {len(df):>6}')
    log_info(f'[{name}] Bots connus (dict) : {len(known_bots):>6}')
    log_info(f'[{name}] Anubis ALLOW : {len(anubis_allow):>6}')
    log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}')
    log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)')

    # A7 — Validate the features before any processing.
    valid_features = validate_features(df, features, name, cycle_id)
    if valid_features is None:
        return pd.DataFrame(), pd.DataFrame()

    # Not enough human traffic to build a baseline: skip this cycle.
    if len(human_baseline) < 500:
        log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < 500) — cycle ignoré.")
        log_info(f"[{name}] Distribution asn_label dans le trafic à scorer :")
        if 'asn_label' in unknown_traffic.columns:
            for label, cnt in unknown_traffic['asn_label'].value_counts().head(8).items():
                log_info(f"[{name}] {label:>15} : {cnt}")
        log_decision('SKIPPED_LOW_DATA', cycle_id, name, {
            'human_count': len(human_baseline), 'unknown_count': len(unknown_traffic)
        })
        return pd.DataFrame(), pd.DataFrame()

    log_info(f'[{name}] ── Modèle EIF ─────────────────────────────────')
    # Show at most five feature names; flag the truncation when there are more.
    shown_features = ", ".join(valid_features[:5])
    truncation_mark = "…" if len(valid_features) > 5 else ""
    log_info(f'[{name}] Features validées : {len(valid_features)}/{len(features)} ({shown_features}{truncation_mark})')

    # A1 — Concept-drift handling lives inside load_or_train_model.
    model, ae_model, model_features = load_or_train_model(name, human_baseline, valid_features, cycle_id)

    # Use the model's own feature list (it may differ after pruning/loading).
    scoring_features = [f for f in model_features if f in unknown_traffic.columns]
    X_test = unknown_traffic[scoring_features].replace([np.inf, -np.inf], np.nan)
    # NOTE(review): a column that is entirely NaN keeps its NaNs here, since
    # its median is NaN too — confirm the model tolerates that.
    X_test = X_test.fillna(X_test.median())
    raw_scores = model.decision_function(X_test)

    # isotree returns scores in [0, 1]: 0.5 = boundary, >0.5 = anomalous.
    # sklearn returns scores centred on 0: <0 = anomalous, >0 = normal.
    # Conversion: sklearn_equiv = 0.5 - isotree_score
    #   isotree 0.8 -> -0.3 (CRITICAL) | isotree 0.5 -> 0.0 (boundary)
    #   isotree 0.3 -> +0.2 (NORMAL)
    if EIF_AVAILABLE:
        raw_scores = 0.5 - raw_scores
    log_info(f'[{name}] Scoring EIF : {len(X_test)} sessions scorées (min={raw_scores.min():.4f}, max={raw_scores.max():.4f}, mean={raw_scores.mean():.4f})')

    # Combine EIF + autoencoder when available.
    # Final score = (1-α) * eif_norm + α * ae_norm, with α = AE_WEIGHT.
    if ae_model is not None and AE_WEIGHT > 0:
        try:
            ae_recon_errors = ae_model.score_samples(X_test.values)
            ae_norm = normalize_scores(-ae_recon_errors)  # higher = more anomalous
            eif_norm = normalize_scores(raw_scores)
            combined_norm = (1 - AE_WEIGHT) * eif_norm + AE_WEIGHT * ae_norm
            unknown_traffic['ae_recon_error'] = ae_recon_errors
            unknown_traffic['anomaly_score'] = combined_norm
            log_info(f"[{name}] Score combiné EIF+AE (α={AE_WEIGHT}): ae_mean={ae_recon_errors.mean():.6f}")
        except Exception as exc:
            # Best effort: an autoencoder failure must not abort the cycle.
            log_info(f"[{name}] AE scoring échoué : {exc} — utilisation EIF seul.")
            unknown_traffic['ae_recon_error'] = 0.0
            unknown_traffic['anomaly_score'] = normalize_scores(raw_scores)
    else:
        unknown_traffic['ae_recon_error'] = 0.0
        unknown_traffic['anomaly_score'] = normalize_scores(raw_scores)

    # raw_anomaly_score: raw IF score, compared to the threshold and used to
    # assign the threat_level.
    unknown_traffic['raw_anomaly_score'] = raw_scores
    unknown_traffic['model_name'] = name

    # Supervised XGBoost — third voice (when historical labels are available).
    unknown_traffic['xgb_prob'] = 0.0
    if XGB_AVAILABLE and XGB_WEIGHT > 0:
        try:
            xgb_client = get_client()
            xgb_model, xgb_feats = load_or_train_xgb(name, xgb_client, scoring_features, cycle_id)
            if xgb_model is not None and xgb_feats is not None:
                # XGB may use a subset of features (those available in the view).
                xgb_cols = [f for f in xgb_feats if f in unknown_traffic.columns]
                X_xgb = unknown_traffic[xgb_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
                xgb_probs = xgb_model.predict_proba(X_xgb.values)[:, 1]
                unknown_traffic['xgb_prob'] = xgb_probs
                # Meta-learner: blend anomaly_score (EIF+AE) with xgb_prob.
                xgb_weight = XGB_WEIGHT
                unknown_traffic['anomaly_score'] = (
                    (1 - xgb_weight) * unknown_traffic['anomaly_score'] + xgb_weight * xgb_probs
                )
                log_info(f"[{name}] Score combiné EIF+AE+XGB (β={xgb_weight}): xgb_mean={xgb_probs.mean():.4f}")
        except Exception as exc:
            # Best effort: fall back to the EIF+AE score already computed.
            log_info(f"[{name}] XGBoost scoring échoué : {exc} — EIF+AE seuls.")

    # A2 — Adaptive threshold computed on the RAW scores (same scale as
    # ANOMALY_THRESHOLD).
    effective_threshold = compute_adaptive_threshold(raw_scores)
    log_info(f"[{name}] Seuil effectif : {effective_threshold:.4f} (statique={ANOMALY_THRESHOLD}, percentile={ANOMALY_PERCENTILE})")

    # A6 — Penalise recurring IPs on the RAW score before the threshold test.
    if RECURRENCE_WEIGHT > 0:
        recurrences = unknown_traffic['src_ip'].map(recurrence_map).fillna(0)
        penalty = np.log1p(recurrences.values) * RECURRENCE_WEIGHT
        unknown_traffic['raw_anomaly_score'] = unknown_traffic['raw_anomaly_score'] - penalty

    # Assign a threat_level to EVERY scored session (for ml_all_scores).
    unknown_traffic['threat_level'] = unknown_traffic['raw_anomaly_score'].apply(score_to_threat_level)
    unknown_traffic['recurrence'] = _recurrence_counts(unknown_traffic, recurrence_map)
    unknown_traffic['campaign_id'] = -1

    # DENY rows keep their real IF score but get a forced threat_level.
    deny_mask = unknown_traffic['anubis_bot_action'] == 'DENY'
    unknown_traffic.loc[deny_mask, 'threat_level'] = 'ANUBIS_DENY'

    # ── A9 — Multi-factor classification of legitimate browsers ─────────────
    # browser_confidence [0..1] and inferred_browser_family are read from the
    # input when present; missing columns default to 0 / ''.
    # A session is classified as a browser when its confidence is high enough,
    # a family is identified, the IF sees no threat and Anubis did not DENY it.
    browser_conf = unknown_traffic.get('browser_confidence', pd.Series(0, index=unknown_traffic.index)).fillna(0)
    browser_family = unknown_traffic.get('inferred_browser_family', pd.Series('', index=unknown_traffic.index)).fillna('').astype(str)
    browser_legit_mask = (
        (browser_conf >= BROWSER_CONFIDENCE_THRESHOLD) &              # multi-factor confidence
        (browser_family != '') &                                      # family identified
        (unknown_traffic['threat_level'].isin(['NORMAL', 'LOW'])) &   # no IF threat
        (~deny_mask)                                                  # not an Anubis DENY
    )

    # JA4 cohort propagation: when at least BROWSER_COHORT_RATIO of a JA4's
    # sessions are already classified as browser, extend the classification to
    # the remaining low-threat sessions sharing that JA4.
    if browser_legit_mask.any():
        ja4_col = unknown_traffic['ja4']
        legit_per_ja4 = ja4_col[browser_legit_mask].value_counts()
        total_per_ja4 = ja4_col.value_counts()
        ratio_per_ja4 = (legit_per_ja4 / total_per_ja4).dropna()
        cohort_ja4s = set(ratio_per_ja4[ratio_per_ja4 >= BROWSER_COHORT_RATIO].index)
        if cohort_ja4s:
            cohort_mask = (
                ja4_col.isin(cohort_ja4s) &
                (~browser_legit_mask) &  # not classified yet
                (unknown_traffic['threat_level'].isin(['NORMAL', 'LOW'])) &
                (~deny_mask)
            )
            browser_legit_mask = browser_legit_mask | cohort_mask
            n_cohort = cohort_mask.sum()
            if n_cohort > 0:
                log_info(f"[{name}] Propagation cohorte JA4 : {n_cohort} sessions supplémentaires ({len(cohort_ja4s)} JA4)")

    if browser_legit_mask.any():
        unknown_traffic.loc[browser_legit_mask, 'threat_level'] = 'LEGITIMATE_BROWSER'
        # Cohort-propagated rows may have no family: display 'Unknown' for them.
        legit_families = browser_family[browser_legit_mask]
        family_display = legit_families.where(legit_families != '', 'Unknown')
        unknown_traffic.loc[browser_legit_mask, 'reason'] = (
            '[Navigateur légitime] ' + family_display +
            ' (confiance=' + browser_conf[browser_legit_mask].round(2).astype(str) + ')'
        )
        n_legit = browser_legit_mask.sum()
        families = legit_families.value_counts().to_dict()
        # Mean of each axis column (when present) for diagnostics.
        ax_means = {}
        for ax in ['axis_ja4_known', 'axis_ja4_struct', 'axis_http_modern',
                   'axis_nav_behavior', 'axis_tls_coherence']:
            col = unknown_traffic.get(ax, None)
            if col is not None:
                ax_means[ax.replace('axis_', '')] = round(float(col[browser_legit_mask].mean()), 3)
        log_info(f"[{name}] {n_legit} session(s) classée(s) LEGITIMATE_BROWSER : {families}")
        log_info(f"[{name}] Axes moyens : {ax_means}")
        log_decision('LEGITIMATE_BROWSER', cycle_id, name, {
            'count': int(n_legit), 'families': families,
            'mean_confidence': round(float(browser_conf[browser_legit_mask].mean()), 3),
            'axis_means': ax_means,
        })

    # Snapshot every scored session (before threshold filtering) for
    # ml_all_scores.
    all_scored = unknown_traffic.copy()

    # ── Known bots: neutral scores, logged one by one ───────────────────────
    if not known_bots.empty:
        known_bots = known_bots.copy()
        known_bots['anomaly_score'] = 0.0
        known_bots['raw_anomaly_score'] = 0.0
        known_bots['ae_recon_error'] = 0.0
        known_bots['xgb_prob'] = 0.0
        known_bots['threat_level'] = 'KNOWN_BOT'
        known_bots['model_name'] = name
        known_bots['campaign_id'] = -1
        known_bots['reason'] = '[Identification] Bot légitime: ' + known_bots['bot_name']
        known_bots['recurrence'] = _recurrence_counts(known_bots, recurrence_map)
        for _, row in known_bots.iterrows():
            log_decision('KNOWN_BOT', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'bot_name': row.get('bot_name', ''),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''),
                'recurrence': int(row.get('recurrence', 1))
            })

    # ── Anubis ALLOW: legitimate bots identified by Anubis rules ────────────
    if not anubis_allow.empty:
        anubis_allow = anubis_allow.copy()
        anubis_allow['anomaly_score'] = 0.0
        anubis_allow['raw_anomaly_score'] = 0.0
        anubis_allow['ae_recon_error'] = 0.0
        anubis_allow['xgb_prob'] = 0.0
        anubis_allow['threat_level'] = 'KNOWN_BOT'
        anubis_allow['bot_name'] = anubis_allow['anubis_bot_name']
        anubis_allow['model_name'] = name
        anubis_allow['campaign_id'] = -1
        anubis_allow['reason'] = '[Anubis ALLOW] ' + anubis_allow['anubis_bot_name']
        anubis_allow['recurrence'] = _recurrence_counts(anubis_allow, recurrence_map)
        for _, row in anubis_allow.iterrows():
            log_decision('KNOWN_BOT', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'bot_name': row.get('anubis_bot_name', ''),
                'anubis_bot_name': row.get('anubis_bot_name', ''),
                'anubis_bot_action': row.get('anubis_bot_action', ''),
                'anubis_bot_category': row.get('anubis_bot_category', ''),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''),
                'recurrence': int(row.get('recurrence', 1)),
            })

    # ── Anubis DENY: scored by IF, always included in the threats ───────────
    # Taken from unknown_traffic after scoring, so they carry their IF score.
    anubis_deny = unknown_traffic[deny_mask].copy()
    if not anubis_deny.empty:
        anubis_deny['reason'] = '[Anubis DENY] ' + anubis_deny['anubis_bot_name'].fillna('') + \
            ' | ' + anubis_deny['raw_anomaly_score'].apply(lambda s: f'IF={s:.4f}')
        log_info(f"[{name}] Anubis DENY: {len(anubis_deny)} IP(s) scorées par IF "
                 f"(score moyen: {anubis_deny['raw_anomaly_score'].mean():.4f}).")
        for _, row in anubis_deny.iterrows():
            log_decision('ANUBIS_DENY', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'anubis_bot_name': row.get('anubis_bot_name', ''),
                'anubis_bot_action': row.get('anubis_bot_action', ''),
                'anubis_bot_category': row.get('anubis_bot_category', ''),
                'anomaly_score': round(float(row.get('anomaly_score', 0)), 4),
                'raw_anomaly_score': round(float(row.get('raw_anomaly_score', 0)), 4),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_domain': row.get('asn_domain', ''), 'country_code': row.get('country_code', ''),
                'recurrence': int(row.get('recurrence', 1)),
            })

    # Threshold filter on raw_anomaly_score (A6 included), restricted to
    # traffic that is neither DENY (always a threat) nor a confirmed browser.
    non_deny_traffic = unknown_traffic[~deny_mask & (unknown_traffic['threat_level'] != 'LEGITIMATE_BROWSER')]
    anomalies = non_deny_traffic[non_deny_traffic['raw_anomaly_score'] < effective_threshold].copy()

    if not anomalies.empty:
        log_info(f"[{name}] ALERT: {len(anomalies)} anomalies détectées (seuil={effective_threshold:.4f}).")
        anomalies['recurrence'] = _recurrence_counts(anomalies, recurrence_map)

        # A4 — SHAP explainability: top features behind each anomaly.
        # The feature names must be the columns X_test was built from
        # (scoring_features), which can differ from valid_features when the
        # loaded model carries its own feature list.
        X_anomalies = X_test.loc[anomalies.index]
        shap_tops = compute_shap_top_features(model, X_anomalies, scoring_features)
        anomalies['reason'] = [
            build_reason(name, row, shap)
            for (_, row), shap in zip(anomalies.iterrows(), shap_tops)
        ]

        # A8 — DBSCAN clustering to identify coordinated campaigns.
        if ENABLE_CLUSTERING:
            anomalies = cluster_anomalies(anomalies, scoring_features, ae_model=ae_model)
            # P2 — Campaign-size escalation: HIGH -> CRITICAL for members of a
            # cluster with at least 5 rows. Negative ids are skipped (rows
            # start with campaign_id = -1).
            if 'campaign_id' in anomalies.columns:
                cid_counts = anomalies['campaign_id'].value_counts()
                for cid, size in cid_counts.items():
                    if cid < 0:
                        continue
                    if size >= 5:
                        mask = (anomalies['campaign_id'] == cid) & (anomalies['threat_level'] == 'HIGH')
                        n_escalated = mask.sum()
                        if n_escalated > 0:
                            anomalies.loc[mask, 'threat_level'] = 'CRITICAL'
                            anomalies.loc[mask, 'reason'] = anomalies.loc[mask, 'reason'] + \
                                f' [Escalade campagne #{cid}, {size} IPs coordonnées]'
                            log_info(f"[{name}] Escalade campagne #{cid}: {n_escalated} IP(s) HIGH→CRITICAL ({size} membres)")

        # An empty JA4 is reported as clear-text HTTP.
        anomalies['ja4'] = anomalies['ja4'].replace({'': 'HTTP_CLEAR_TEXT'})
        for _, row in anomalies.iterrows():
            log_decision('ANOMALY', cycle_id, name, {
                'src_ip': row.get('src_ip', ''), 'anomaly_score': round(float(row.get('anomaly_score', 0)), 4),
                'raw_anomaly_score': round(float(row.get('raw_anomaly_score', 0)), 4),
                'threat_level': row.get('threat_level', ''), 'recurrence': int(row.get('recurrence', 1)),
                'hit_velocity': round(float(row.get('hit_velocity', 0)), 2),
                'fuzzing_index': round(float(row.get('fuzzing_index', 0)), 2),
                'post_ratio': round(float(row.get('post_ratio', 0)), 3),
                'asn_number': row.get('asn_number', ''), 'asn_org': row.get('asn_org', ''),
                'asn_detail': row.get('asn_detail', ''), 'asn_domain': row.get('asn_domain', ''),
                'country_code': row.get('country_code', ''), 'asn_label': row.get('asn_label', ''),
                'ja4': row.get('ja4', ''), 'host': row.get('host', ''),
                'correlated': int(row.get('correlated', 0)), 'campaign_id': int(row.get('campaign_id', -1)),
                'effective_threshold': round(effective_threshold, 4), 'reason': row.get('reason', '')
            })

    # Merge every threat category; yields an empty DataFrame (instead of a
    # ValueError from pd.concat) when all four categories are empty.
    threats = _concat_non_empty([anomalies, known_bots, anubis_allow, anubis_deny])

    # Propagate the campaign_id of clustered anomalies to all_scored
    # (all_scored was captured before clustering, so its ids are all -1).
    if not anomalies.empty and 'campaign_id' in anomalies.columns:
        cid_map = anomalies['campaign_id']
        matched = all_scored.index.isin(cid_map.index)
        if matched.any():
            # Series assignment aligns on the index labels.
            all_scored.loc[matched, 'campaign_id'] = cid_map

    # Include anubis_allow in all_scored for traceability in ml_all_scores:
    # these rows skip the IF analysis but must appear in the score table with
    # threat_level='KNOWN_BOT' and anomaly_score=0.0.
    if not anubis_allow.empty:
        all_scored = pd.concat([all_scored, anubis_allow], ignore_index=True)

    # ── Model summary ───────────────────────────────────────────────────────
    n_threats = len(threats)
    n_anomalies = len(anomalies)
    n_legit_browser = int(browser_legit_mask.sum())
    n_deny = len(anubis_deny)
    tl_counts = threats['threat_level'].value_counts().to_dict() if not threats.empty else {}
    tl_str = ', '.join(f'{k}={v}' for k, v in sorted(tl_counts.items())) if tl_counts else 'aucune'
    log_info(f'[{name}] ── Résultat ────────────────────────────────────')
    log_info(f'[{name}] Menaces totales : {n_threats:>6} ({tl_str})')
    log_info(f'[{name}] Anomalies IF : {n_anomalies:>6} (seuil={effective_threshold:.4f})')
    log_info(f'[{name}] Navigateurs légit. : {n_legit_browser:>6}')
    log_info(f'[{name}] Anubis DENY (forcé) : {n_deny:>6}')
    log_info(f'[{name}] Sessions scorées : {len(all_scored):>6} (→ ml_all_scores)')
    return threats, all_scored