Files
ja4-platform/services/bot-detector/bot_detector/cycle.py
toto 1f103392ac refactor(bot-detector): extract monolith into modular package
Split bot_detector.py (~1982 lines) into 10 focused modules:
- config.py: all configuration constants and optional imports
- log.py: logging utilities (log_info, log_decision, append_training_history)
- infra.py: ClickHouse client, health check HTTP server, shutdown
- browser.py: multifactorial browser identification (5 axes)
- scoring.py: drift detection, feature validation, SHAP, clustering
- models.py: EIF, Autoencoder, XGBoost model management
- preprocessing.py: data preprocessing and feature list definitions
- pipeline.py: core semi-supervised scoring loop
- cycle.py: main analysis cycle orchestration
- __main__.py: entry point with startup banner

Update Dockerfile to copy package directory and use python -m bot_detector.

All 36 existing tests pass unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 01:02:04 +02:00

372 lines
22 KiB
Python

"""Cycle principal d'analyse : récupération, scoring et insertion des résultats.
Orchestre un cycle complet : requête ClickHouse, preprocessing, scoring
(Complet + Applicatif), feedback SOC, déduplication et insertion des résultats.
"""
import pandas as pd
from datetime import datetime
from .config import (
DB, DB_LOGS, CYCLE_INTERVAL, DEDUP_TTL_MIN,
ENABLE_MULTIWINDOW, MULTIWINDOW_VIEW,
ENABLE_FEEDBACK, FEEDBACK_WINDOW_DAYS, MAX_FAILURES,
BROWSER_CONFIDENCE_THRESHOLD, BROWSER_COHORT_RATIO,
)
from .log import log_info, log_decision
from .infra import get_client, set_healthy
from .preprocessing import preprocess_df, FEATURES_BASE, FEATURES_COMPLET
from .pipeline import run_semi_supervised_logic
# ═══════════════════════════════════════════════════════════════════════════════
# FEEDBACK LOOP — Folding SOC classifications into the baseline
# ═══════════════════════════════════════════════════════════════════════════════
def _load_soc_feedback(client) -> dict:
    """Load recent SOC classifications keyed by source IP.

    Queries ``{DB}.audit_logs`` for ``create_classification`` actions on
    ``ip`` entities recorded within the last ``FEEDBACK_WINDOW_DAYS`` days and
    keeps, per entity, the classification carried by the most recent row
    (``argMax`` over ``timestamp``).

    Args:
        client: Object exposing ``query_df(sql)`` and returning a DataFrame.

    Returns:
        A ``{src_ip: classification}`` dict. An empty dict is returned when
        feedback is disabled (``ENABLE_FEEDBACK`` falsy), when the query
        yields no rows, or when anything raises (the error is logged, not
        propagated).
    """
    if not ENABLE_FEEDBACK:
        return {}

    try:
        query = (
            f"SELECT entity_id AS src_ip, "
            f" argMax(JSONExtractString(details, 'classification'), timestamp) AS classification "
            f"FROM {DB}.audit_logs "
            f"WHERE action = 'create_classification' "
            f" AND entity_type = 'ip' "
            f" AND timestamp >= now() - INTERVAL {FEEDBACK_WINDOW_DAYS} DAY "
            f"GROUP BY entity_id"
        )
        rows = client.query_df(query)
        has_rows = rows is not None and not rows.empty
        if not has_rows:
            return {}

        classifications = {
            ip: label
            for ip, label in zip(rows['src_ip'], rows['classification'])
        }
        log_info(f"[Feedback] {len(classifications)} classification(s) SOC chargées ({FEEDBACK_WINDOW_DAYS}j).")
        return classifications
    except Exception as err:
        # Best effort: a feedback outage must not abort the detection cycle.
        log_info(f"[Feedback] Impossible de charger les classifications SOC : {err}")
        return {}
# ═══════════════════════════════════════════════════════════════════════════════
# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL
# ═══════════════════════════════════════════════════════════════════════════════
def _filter_recent_detections(client, all_anom: pd.DataFrame) -> pd.DataFrame:
    """Drop anomalies whose IP was already inserted within the dedup TTL.

    A5: an IP present in ``{DB}.ml_detected_anomalies`` during the last
    ``DEDUP_TTL_MIN`` minutes is skipped, unless its new score is more than
    0.05 below the lowest score stored in that window (i.e. it got worse).

    The candidate score is read from ``raw_anomaly_score`` when that column
    exists, otherwise from ``anomaly_score``.

    NOTE(review): the stored reference is ``min(anomaly_score)`` while the
    candidate prefers ``raw_anomaly_score`` — confirm both are on the same
    scale.

    Args:
        client: Object exposing ``query_df(sql)`` and returning a DataFrame.
        all_anom: Candidate anomalies; must contain ``src_ip`` and at least
            one of ``raw_anomaly_score`` / ``anomaly_score``.

    Returns:
        The rows to insert. ``all_anom`` itself is returned unchanged when
        dedup is disabled (``DEDUP_TTL_MIN <= 0``), when the input is empty,
        when nothing was detected recently, or when the lookup fails (the
        error is logged). Otherwise an independent copy of the kept rows is
        returned so the caller can add columns to it safely.
    """
    if DEDUP_TTL_MIN <= 0 or all_anom.empty:
        return all_anom
    try:
        recent_df = client.query_df(
            f"SELECT src_ip, min(anomaly_score) AS best_score "
            f"FROM {DB}.ml_detected_anomalies "
            f"WHERE detected_at > now() - INTERVAL {DEDUP_TTL_MIN} MINUTE "
            f"GROUP BY src_ip"
        )
        if recent_df is None or recent_df.empty:
            return all_anom

        recent_map = dict(zip(recent_df['src_ip'], recent_df['best_score']))

        # Pick the score column once instead of evaluating a fallback per row
        # (the per-row fallback raised KeyError when 'anomaly_score' was absent).
        score_col = 'raw_anomaly_score' if 'raw_anomaly_score' in all_anom.columns else 'anomaly_score'
        current = all_anom[score_col].astype(float)
        previous = all_anom['src_ip'].map(recent_map).astype(float)

        never_seen = ~all_anom['src_ip'].isin(recent_df['src_ip'])
        # Lower score = more anomalous; NaN on either side compares False (row skipped).
        worsened = current < previous - 0.05

        filtered = all_anom[never_seen | worsened].copy()
        skipped = len(all_anom) - len(filtered)
        if skipped > 0:
            log_info(f"[Dedup TTL={DEDUP_TTL_MIN}min] {skipped} IP(s) filtrée(s) (déjà détectées récemment).")
        return filtered
    except Exception as e:
        # Best effort: if dedup cannot run, insert everything rather than lose detections.
        log_info(f"[Dedup] Erreur lors de la déduplication TTL : {e}")
        return all_anom
# ═══════════════════════════════════════════════════════════════════════════════
# CYCLE PRINCIPAL
# ═══════════════════════════════════════════════════════════════════════════════
# Number of consecutive cycles whose main 1h query failed; reset to 0 on success
# and compared against MAX_FAILURES to flip the health flag.
_consecutive_failures = 0

# Columns sent to ml_all_scores (only those present in the scored frame are kept).
_ALL_SCORES_COLUMNS = [
    'detected_at', 'window_start', 'src_ip', 'ja4', 'host', 'bot_name',
    'browser_family',
    'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category',
    'anomaly_score', 'raw_anomaly_score', 'threat_level', 'model_name',
    'correlated', 'asn_number', 'asn_org', 'country_code', 'asn_label',
    'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'campaign_id',
    'ae_recon_error', 'xgb_prob',
]

# Columns sent to ml_detected_anomalies (only those present in the frame are kept).
_ANOMALY_COLUMNS = [
    'detected_at', 'src_ip', 'ja4', 'host', 'bot_name', 'browser_family', 'anomaly_score',
    'raw_anomaly_score', 'campaign_id',
    'threat_level', 'model_name', 'recurrence',
    'asn_number', 'asn_org', 'asn_detail', 'asn_domain', 'country_code', 'asn_label',
    'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'port_exhaustion_ratio', 'max_keepalives', 'orphan_ratio',
    'tcp_jitter_variance', 'tcp_shared_count', 'true_window_size', 'window_mss_ratio',
    'alpn_http_mismatch', 'is_alpn_missing', 'sni_host_mismatch',
    'header_count', 'has_accept_language', 'has_cookie', 'has_referer',
    'modern_browser_score', 'is_headless', 'ua_ch_mismatch',
    'header_order_shared_count', 'ip_id_zero_ratio', 'request_size_variance',
    'multiplexing_efficiency', 'mss_mobile_mismatch',
    'correlated', 'reason', 'asset_ratio', 'direct_access_ratio', 'is_ua_rotating',
    'distinct_ja4_count', 'src_port_density', 'ja4_asn_concentration',
    'ja4_country_concentration', 'is_rare_ja4',
    'header_order_confidence', 'distinct_header_orders', 'temporal_entropy',
    'path_diversity_ratio', 'url_depth_variance', 'anomalous_payload_ratio',
    'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category',
]


def _count_eq(frame: pd.DataFrame, column: str, value) -> int:
    """Count rows where ``frame[column] == value``; 0 when the column is absent."""
    if column not in frame.columns:
        return 0
    return int((frame[column] == value).sum())


def _count_ne(frame: pd.DataFrame, column: str, value) -> int:
    """Count rows where ``frame[column] != value``; 0 when the column is absent."""
    if column not in frame.columns:
        return 0
    return int((frame[column] != value).sum())


def _empty_cycle_summary() -> dict:
    """Return the CYCLE_END payload used when nothing was inserted."""
    return {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN}


def _enrich_with_thesis_features(client, df: pd.DataFrame) -> pd.DataFrame:
    """Left-join the rows of view_thesis_features_1h onto ``df``.

    Column names of both frames are reduced to the part after the last dot
    before merging on (window_start, src_ip, ja4, host). Missing values in the
    thesis columns are filled with the column median, or 0 when the column is
    entirely null. Any failure is logged and the frame is returned as it
    stands at that point.
    """
    try:
        df_thesis = client.query_df(f'SELECT * FROM {DB}.view_thesis_features_1h')
        if df_thesis is None or df_thesis.empty:
            log_info('[Thèse §5] view_thesis_features_1h vide — features avancées indisponibles.')
            return df
        df_thesis.columns = [c.split('.')[-1] for c in df_thesis.columns]
        df.columns = [c.split('.')[-1] for c in df.columns]
        thesis_cols = [c for c in df_thesis.columns if c not in ('window_start', 'src_ip', 'ja4', 'host')]
        df = df.merge(
            df_thesis, on=['window_start', 'src_ip', 'ja4', 'host'],
            how='left', suffixes=('', '_thesis')
        )
        for col in thesis_cols:
            if col in df.columns:
                df[col] = df[col].fillna(df[col].median() if df[col].notna().any() else 0)
        log_info(f'[Thèse §5] {len(df_thesis)} sessions enrichies avec {len(thesis_cols)} features avancées.')
    except Exception as e:
        # Best effort: advanced features are optional for the cycle.
        log_info(f'[Thèse §5] view_thesis_features_1h inaccessible : {e} — features avancées ignorées.')
    return df


def _log_data_summary(df: pd.DataFrame, cycle_id: str) -> None:
    """Log a breakdown of the preprocessed frame and emit the CYCLE_START decision."""
    n_total = len(df)
    n_correlated = _count_eq(df, 'correlated', 1)
    n_uncorrelated = n_total - n_correlated
    n_isp = _count_eq(df, 'asn_label', 'isp')
    n_datacenter = _count_eq(df, 'asn_label', 'datacenter')
    n_cdn = _count_eq(df, 'asn_label', 'cdn')
    n_known_bot = _count_ne(df, 'bot_name', '')
    n_anubis_allow = _count_eq(df, 'anubis_bot_action', 'ALLOW')
    n_anubis_deny = _count_eq(df, 'anubis_bot_action', 'DENY')
    n_anubis_weigh = _count_eq(df, 'anubis_bot_action', 'WEIGH')
    n_unique_ips = int(df['src_ip'].nunique()) if 'src_ip' in df.columns else 0
    n_unique_ja4 = int(df['ja4'].nunique()) if 'ja4' in df.columns else 0

    log_info(f'[Données] Après preprocessing : {n_total} sessions, {n_unique_ips} IP uniques, {n_unique_ja4} JA4 uniques.')
    log_info(f'[Données] Corrélées (L3-L7) : {n_correlated:>6} | Non-corrélées (L7) : {n_uncorrelated:>6}')
    log_info(f'[Données] ASN ISP : {n_isp:>6} | Datacenter : {n_datacenter:>6} | CDN : {n_cdn}')
    log_info(f'[Données] Bots connus (dict) : {n_known_bot:>6} | Anubis ALLOW : {n_anubis_allow:>6}')
    log_info(f'[Données] Anubis DENY : {n_anubis_deny:>6} | Anubis WEIGH : {n_anubis_weigh:>6}')

    # Browser distribution: prefer the inferred family, fall back to the JA4 dictionary one.
    if 'inferred_browser_family' in df.columns:
        ibf_counts = df['inferred_browser_family'].value_counts()
        ibf_known = ibf_counts[ibf_counts.index != '']
        if not ibf_known.empty:
            ibf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in ibf_known.head(7).items())
            if 'browser_family' in df.columns:
                n_dict = int((df['browser_family'].fillna('').astype(str) != '').sum())
            else:
                n_dict = 0
            n_inferred = int(ibf_known.sum()) - n_dict
            log_info(f'[Données] Navigateurs : {ibf_known.sum():>6} sessions ({len(ibf_known)} familles : {ibf_summary})')
            log_info(f'[Données] Dict JA4 connu : {n_dict:>6} | Inféré structurel : {max(0, n_inferred):>6}')
    elif 'browser_family' in df.columns:
        bf_counts = df['browser_family'].value_counts()
        bf_known = bf_counts[bf_counts.index != '']
        if not bf_known.empty:
            bf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in bf_known.head(5).items())
            log_info(f'[Données] Navigateurs JA4 : {bf_known.sum():>6} sessions ({len(bf_known)} familles : {bf_summary})')

    if 'browser_confidence' in df.columns:
        bc = df['browser_confidence']
        n_high = int((bc >= BROWSER_CONFIDENCE_THRESHOLD).sum())
        log_info(f'[Données] browser_confidence: mean={bc.mean():.3f}, ≥seuil({BROWSER_CONFIDENCE_THRESHOLD})={n_high}')

    log_decision('CYCLE_START', cycle_id, '', {
        'total_rows': n_total,
        'human_rows': n_isp,
        'known_bot_rows': n_known_bot,
        'correlated_rows': n_correlated,
        'anubis_allow_rows': n_anubis_allow,
        'anubis_deny_rows': n_anubis_deny,
        'anubis_weigh_rows': n_anubis_weigh,
        'multiwindow': ENABLE_MULTIWINDOW,
    })


def _load_recurrence_map(client) -> dict:
    """Return ``{src_ip: recurrence}`` from view_ip_recurrence, or ``{}`` on failure."""
    try:
        rec_df = client.query_df(f'SELECT src_ip, recurrence FROM {DB}.view_ip_recurrence')
        return dict(zip(rec_df['src_ip'], rec_df['recurrence']))
    except Exception as e:
        # Recurrence is optional, but a failure should be visible in the logs.
        log_info(f'[Récurrence] view_ip_recurrence inaccessible : {e} — récurrence ignorée.')
        return {}


def _apply_soc_feedback(df: pd.DataFrame, soc_feedback: dict, cycle_id: str) -> None:
    """Rewrite ``asn_label`` in place according to SOC classifications.

    IPs classified 'false_positive' / 'legitimate' are relabelled 'isp';
    IPs classified 'true_positive' / 'malicious' / 'bot' that currently carry
    'isp' are relabelled 'soc_confirmed_bot'. Does nothing when
    ``soc_feedback`` is empty.
    """
    if not soc_feedback:
        return
    fp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('false_positive', 'legitimate')}
    tp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('true_positive', 'malicious', 'bot')}
    if fp_ips:
        mask_fp = df['src_ip'].isin(fp_ips) & (df.get('asn_label', pd.Series(dtype=str)) != 'isp')
        df.loc[mask_fp, 'asn_label'] = 'isp'
        log_info(f"[Feedback] {mask_fp.sum()} lignes reclassées 'isp' (FP confirmés).")
    if tp_ips:
        mask_tp = df['src_ip'].isin(tp_ips) & (df.get('asn_label', pd.Series(dtype=str)) == 'isp')
        df.loc[mask_tp, 'asn_label'] = 'soc_confirmed_bot'
        log_info(f"[Feedback] {mask_tp.sum()} lignes exclues de la baseline humaine (TP confirmés).")
    log_decision('SOC_FEEDBACK', cycle_id, '', {
        'fp_ips': len(fp_ips), 'tp_ips': len(tp_ips),
        'total_classifications': len(soc_feedback),
    })


def _insert_all_scores(client, all_scored: pd.DataFrame) -> None:
    """Stamp and insert every scored session into ml_all_scores.

    Mutates ``all_scored``: adds ``detected_at``, replaces empty ``ja4`` values
    with 'HTTP_CLEAR_TEXT' and, when available, overwrites ``browser_family``
    with ``inferred_browser_family``. Errors are logged, not raised.
    """
    try:
        now = datetime.now().replace(microsecond=0)
        all_scored['detected_at'] = now
        all_scored['ja4'] = all_scored['ja4'].replace({'': 'HTTP_CLEAR_TEXT'})
        if 'inferred_browser_family' in all_scored.columns:
            all_scored['browser_family'] = all_scored['inferred_browser_family']
        scores_df = all_scored[[c for c in _ALL_SCORES_COLUMNS if c in all_scored.columns]]
        client.insert_df(f'{DB}.ml_all_scores', scores_df)
        log_info(f'[ml_all_scores] {len(scores_df)} sessions scorées enregistrées.')
    except Exception as e:
        log_info(f'[ml_all_scores] ERREUR INSERTION: {e}')


def _insert_anomalies(client, all_anom: pd.DataFrame, all_scored: pd.DataFrame, cycle_id: str) -> None:
    """Deduplicate anomalies, insert them into ml_detected_anomalies and log the outcome.

    Keeps one row per ``src_ip`` (the lowest score), applies the cross-cycle
    TTL filter, then inserts the surviving rows. Emits CYCLE_END on success or
    when everything was filtered out; an insertion error is only logged.
    """
    # Same fallback as the TTL filter, so a frame without the raw score still sorts.
    sort_col = 'raw_anomaly_score' if 'raw_anomaly_score' in all_anom.columns else 'anomaly_score'
    all_anom = all_anom.sort_values(sort_col, ascending=True).drop_duplicates(subset=['src_ip'], keep='first')
    log_info(f'[Dédup] Intra-cycle : {len(all_anom)} IP uniques après déduplication.')

    # A5 — cross-cycle TTL dedup. Copy so the column assignments below do not
    # write into a slice of another frame.
    all_anom = _filter_recent_detections(client, all_anom).copy()
    if all_anom.empty:
        log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.')
        log_decision('CYCLE_END', cycle_id, '', _empty_cycle_summary())
        return

    all_anom['detected_at'] = datetime.now().replace(microsecond=0)
    fake_nav_col = 'is_fake_navigation'
    # The is_headless output column is fed from is_fake_navigation (0 when absent).
    all_anom['is_headless'] = all_anom[fake_nav_col].astype(int) if fake_nav_col in all_anom.columns else 0

    try:
        final_df = all_anom[[c for c in _ANOMALY_COLUMNS if c in all_anom.columns]]
        client.insert_df(f'{DB}.ml_detected_anomalies', final_df)
        n_critical = _count_eq(final_df, 'threat_level', 'CRITICAL')
        n_high = _count_eq(final_df, 'threat_level', 'HIGH')
        n_medium = _count_eq(final_df, 'threat_level', 'MEDIUM')
        n_known = _count_ne(final_df, 'bot_name', '')
        log_info('')
        log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════')
        log_info(f'{len(final_df)} menaces insérées dans ml_detected_anomalies')
        log_info(f'║ CRITICAL={n_critical} HIGH={n_high} MEDIUM={n_medium} KNOWN_BOT={n_known}')
        if not all_scored.empty:
            log_info(f'{len(all_scored)} sessions scorées dans ml_all_scores')
        log_info('╚═══════════════════════════════════════════════════════════')
        log_decision('CYCLE_END', cycle_id, '', {
            'inserted': len(final_df),
            'anomalies': _count_eq(final_df, 'bot_name', ''),
            'known_bots': n_known,
            'critical': n_critical,
            'high': n_high,
            'dedup_ttl_min': DEDUP_TTL_MIN,
        })
    except Exception as e:
        log_info(f'ERREUR INSERTION: {e}')


def _log_no_threat(all_scored: pd.DataFrame, cycle_id: str) -> None:
    """Log the end-of-cycle box and CYCLE_END decision when no anomaly was produced."""
    log_info('')
    log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════')
    log_info('║ Aucune menace détectée ce cycle.')
    if not all_scored.empty:
        log_info(f'{len(all_scored)} sessions scorées dans ml_all_scores')
    tl_dist = all_scored['threat_level'].value_counts().to_dict() if not all_scored.empty and 'threat_level' in all_scored.columns else {}
    if tl_dist:
        log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}')
    log_info('╚═══════════════════════════════════════════════════════════')
    log_decision('CYCLE_END', cycle_id, '', _empty_cycle_summary())


def fetch_and_analyze():
    """Run one full detection cycle: fetch, score, deduplicate and insert.

    Steps, in order:
      1. Load view_ai_features_1h. On failure, bump ``_consecutive_failures``,
         mark the service unhealthy once MAX_FAILURES is reached, and return.
      2. Optionally enrich with view_thesis_features_1h, then preprocess.
      3. Log a data summary (CYCLE_START) and load the IP recurrence map.
      4. Apply SOC feedback to ``asn_label``.
      5. Score correlated rows with the 'Complet' model and uncorrelated rows
         with the 'Applicatif' model; when ENABLE_MULTIWINDOW is set, do the
         same on MULTIWINDOW_VIEW and append the results.
      6. Insert all scored sessions into ml_all_scores.
      7. Deduplicate anomalies (per IP, then TTL) and insert them into
         ml_detected_anomalies, emitting CYCLE_END.

    Returns:
        None. Results are written to ClickHouse and to the logs.
    """
    global _consecutive_failures
    cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_info('')
    log_info('=' * 70)
    log_info(f' CYCLE {cycle_id}')
    log_info('=' * 70)
    client = get_client()

    # ── Traffic fetch (1h window) ─────────────────────────────────────────────
    try:
        df = client.query_df(f'SELECT * FROM {DB}.view_ai_features_1h')
    except Exception as e:
        log_info(f'ERREUR REQUETE: {e}')
        _consecutive_failures += 1
        if _consecutive_failures >= MAX_FAILURES:
            set_healthy(False)
            # NOTE(review): decision is emitted only once the threshold is reached — confirm.
            log_decision('CONSECUTIVE_FAILURES', cycle_id, '', {'count': _consecutive_failures, 'error': str(e)})
        return
    _consecutive_failures = 0
    set_healthy(True)
    if df is None or df.empty:
        log_info('[Données] Aucun trafic trouvé dans view_ai_features_1h.')
        return
    log_info(f'[Données] {len(df)} sessions chargées depuis view_ai_features_1h ({len(df.columns)} colonnes).')

    # ── Enrichment, preprocessing, summary ────────────────────────────────────
    df = _enrich_with_thesis_features(client, df)
    df = preprocess_df(df)
    _log_data_summary(df, cycle_id)
    recurrence_map = _load_recurrence_map(client)

    # ── SOC feedback: adjust the baseline from analyst classifications ───────
    _apply_soc_feedback(df, _load_soc_feedback(client), cycle_id)

    # ── Feature sets per model ────────────────────────────────────────────────
    feats = FEATURES_BASE
    feats_complet = FEATURES_COMPLET

    # ── 1h window scoring ─────────────────────────────────────────────────────
    df_corr = df[df['correlated'] == 1].copy()
    df_uncorr = df[df['correlated'] == 0].copy()
    log_info('')
    log_info(f'── Modèle Complet (L3→L7, corrélé) : {len(df_corr)} sessions, {len(feats_complet)} features ──')
    anom_a, scored_a = run_semi_supervised_logic(df_corr, feats_complet, 'Complet', cycle_id, recurrence_map)
    log_info('')
    log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──')
    anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map)
    all_anom = pd.concat([anom_a, anom_b], ignore_index=True)
    all_scored = pd.concat([scored_a, scored_b], ignore_index=True)

    # ── A3: optional 24h window scoring ───────────────────────────────────────
    if ENABLE_MULTIWINDOW:
        try:
            df_24h = client.query_df(f'SELECT * FROM {DB}.{MULTIWINDOW_VIEW}')
            if df_24h is not None and not df_24h.empty:
                df_24h = preprocess_df(df_24h)
                log_info(f"[24h] {len(df_24h)} sessions dans la fenêtre 24h.")
                anom_c, scored_c = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 1].copy(), feats_complet, 'Complet_24h', cycle_id, recurrence_map)
                anom_d, scored_d = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 0].copy(), feats, 'Applicatif_24h', cycle_id, recurrence_map)
                all_anom_24h = pd.concat([anom_c, anom_d], ignore_index=True)
                all_scored_24h = pd.concat([scored_c, scored_d], ignore_index=True)
                # Both windows are appended; the per-IP dedup later keeps the lowest score.
                if not all_anom_24h.empty:
                    all_anom = pd.concat([all_anom, all_anom_24h], ignore_index=True)
                    log_info(f"[24h] Fusion 1h+24h : {len(all_anom)} entrées avant déduplication.")
                # NOTE(review): 24h scores are appended whether or not the 24h
                # window produced anomalies — confirm intended.
                all_scored = pd.concat([all_scored, all_scored_24h], ignore_index=True)
            else:
                log_info(f"[24h] Vue {MULTIWINDOW_VIEW} vide — analyse mono-fenêtre.")
        except Exception as e:
            log_info(f"[24h] Vue {MULTIWINDOW_VIEW} inaccessible : {e} — analyse mono-fenêtre.")

    # ── Persist every classification into ml_all_scores ──────────────────────
    if not all_scored.empty:
        _insert_all_scores(client, all_scored)

    # ── Persist anomalies (or report that there were none) ───────────────────
    if not all_anom.empty:
        _insert_anomalies(client, all_anom, all_scored, cycle_id)
    else:
        _log_no_threat(all_scored, cycle_id)