ja4ebpf: - Refactor BPF TC capture with improved SYN offset handling and TCP option parsing - Enhance TLS uprobe SSL hooking for better key extraction - Add ClickHouse writer improvements for HTTP log materialized views - Update RPM spec for Rocky Linux 8/9/10, fix systemd service - Simplify loader with cleaner bpf2go integration bot-detector: - Add H2 SETTINGS per-parameter comparison in browser_matcher - Enhance browser signatures and scoring pipeline - Improve preprocessing and cycle detection infra: - Multi-distro Vagrantfile (centos8, rocky9, rocky10) with per-distro provisioning - New Makefile targets: vm-up-all, test-vm-matrix, test-vm-centos8/rocky10 - Add debug helpers and run-test-from-host.sh for host-driven VM testing - Update run-tests-vm.sh for cross-distro compatibility - Remove accidental binary blob (\004) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
461 lines
26 KiB
Python
461 lines
26 KiB
Python
"""Cycle principal d'analyse : récupération, scoring et insertion des résultats.
|
||
|
||
Orchestre un cycle complet : requête ClickHouse, preprocessing, scoring
|
||
(Complet + Applicatif), feedback SOC, déduplication et insertion des résultats.
|
||
"""
|
||
import time
|
||
import pandas as pd
|
||
from datetime import datetime
|
||
|
||
from .config import (
|
||
DB, DB_LOGS, CYCLE_INTERVAL, DEDUP_TTL_MIN,
|
||
ENABLE_MULTIWINDOW, MULTIWINDOW_VIEW,
|
||
ENABLE_FEEDBACK, FEEDBACK_WINDOW_DAYS, MAX_FAILURES,
|
||
BROWSER_CONFIDENCE_THRESHOLD, BROWSER_COHORT_RATIO,
|
||
)
|
||
from .log import log_info, log_decision
|
||
from .infra import get_client, set_healthy
|
||
from .preprocessing import preprocess_df, FEATURES, FEATURES_COMPLET
|
||
from .pipeline import run_semi_supervised_logic
|
||
from .fleet import enrich_with_fleet_score
|
||
from .browser_signatures import reload_signatures_from_clickhouse
|
||
from .metrics import record_cycle_metrics
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# FEEDBACK LOOP — Intégration des classifications SOC dans la baseline
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def _load_soc_feedback(client) -> dict:
    """Fetch the latest SOC classification of every IP inside the feedback window.

    Reads the ``create_classification`` entries (entity_type ``ip``) of
    ``{DB}.audit_logs`` from the last ``FEEDBACK_WINDOW_DAYS`` days and keeps,
    for each IP, the classification carried by its most recent entry
    (``argMax`` on ``timestamp``).

    The caller uses false positives to treat the IP as human and true
    positives to keep it out of the human baseline.

    Args:
        client: ClickHouse client exposing ``query_df``.

    Returns:
        ``{src_ip: classification}`` (e.g. ``'true_positive'``,
        ``'false_positive'``, ``'suspicious'``). Empty when feedback is
        disabled, when nothing matches, or when the query fails; failures are
        logged, never raised.
    """
    # Feature flag off: no feedback at all.
    if not ENABLE_FEEDBACK:
        return {}

    try:
        query = (
            f"SELECT entity_id AS src_ip, "
            f" argMax(JSONExtractString(details, 'classification'), timestamp) AS classification "
            f"FROM {DB}.audit_logs "
            f"WHERE action = 'create_classification' "
            f" AND entity_type = 'ip' "
            f" AND timestamp >= now() - INTERVAL {FEEDBACK_WINDOW_DAYS} DAY "
            f"GROUP BY entity_id"
        )
        rows = client.query_df(query)
        if rows is None or rows.empty:
            return {}

        feedback = {
            ip: label
            for ip, label in zip(rows['src_ip'], rows['classification'])
        }
        log_info(f"[Feedback] {len(feedback)} classification(s) SOC chargées ({FEEDBACK_WINDOW_DAYS}j).")
        return feedback
    except Exception as exc:
        # Best effort: a missing/unreachable audit table must not break the cycle.
        log_info(f"[Feedback] Impossible de charger les classifications SOC : {exc}")
        return {}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# A5 — DÉDUPLICATION INTER-CYCLES AVEC TTL
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def _filter_recent_detections(client, all_anom: pd.DataFrame, min_worsening: float = 0.05) -> pd.DataFrame:
    """A5: inter-cycle deduplication of anomalies with a TTL.

    Drops every anomaly whose ``src_ip`` was already inserted into
    ``{DB}.ml_detected_anomalies`` during the last ``DEDUP_TTL_MIN`` minutes.
    The exception is a row whose new raw score is strictly more than
    ``min_worsening`` below the lowest raw score recorded for that IP in the
    window (lower score = worse). Such a row is re-inserted.

    Args:
        client: ClickHouse client exposing ``query_df``.
        all_anom: candidate anomalies. Needs ``src_ip`` and
            ``raw_anomaly_score``, falling back to ``anomaly_score`` when the
            raw column is absent.
        min_worsening: strict margin the score must drop by for a recently
            detected IP to be re-inserted. Defaults to 0.05, the historical value.

    Returns:
        A filtered copy of ``all_anom``. ``all_anom`` itself is returned
        unchanged when dedup is disabled (``DEDUP_TTL_MIN <= 0``), when the
        input is empty, when nothing was detected recently, or when the query
        fails. This is best effort: errors are logged, never raised.
    """
    if DEDUP_TTL_MIN <= 0 or all_anom.empty:
        return all_anom

    try:
        # New rows are judged on their *raw* score, so fetch the raw score that
        # was stored previously. Comparing against anomaly_score would mix two
        # different metrics.
        recent_df = client.query_df(
            f"SELECT src_ip, min(raw_anomaly_score) AS best_score "
            f"FROM {DB}.ml_detected_anomalies "
            f"WHERE detected_at > now() - INTERVAL {DEDUP_TTL_MIN} MINUTE "
            f"GROUP BY src_ip"
        )
        if recent_df is None or recent_df.empty:
            return all_anom

        recent_map = dict(zip(recent_df['src_ip'], recent_df['best_score']))

        score_col = 'raw_anomaly_score' if 'raw_anomaly_score' in all_anom.columns else 'anomaly_score'
        new_scores = all_anom[score_col].astype(float)
        # NaN when the IP was not detected recently (or its stored score is NULL):
        # such rows are always kept.
        prev_scores = pd.to_numeric(all_anom['src_ip'].map(recent_map), errors='coerce')

        # Re-insert only when the score has significantly worsened.
        mask = prev_scores.isna() | (new_scores < prev_scores - min_worsening)
        # Copy so callers can add columns without touching a slice of all_anom.
        filtered = all_anom[mask].copy()

        skipped = len(all_anom) - len(filtered)
        if skipped > 0:
            log_info(f"[Dedup TTL={DEDUP_TTL_MIN}min] {skipped} IP(s) filtrée(s) (déjà détectées récemment).")
        return filtered
    except Exception as e:
        log_info(f"[Dedup] Erreur lors de la déduplication TTL : {e}")
        return all_anom
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# CYCLE PRINCIPAL
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# Number of consecutive cycles whose query on view_ai_features_1h failed.
# fetch_and_analyze increments it on each failure, marks the service unhealthy
# once it reaches MAX_FAILURES, and resets it to 0 after a successful query.
_consecutive_failures = 0
|
||
|
||
# Columns persisted to ml_all_scores (only those present in the scored frame are sent).
_ALL_SCORES_COLUMNS = [
    'detected_at', 'window_start', 'src_ip', 'ja4', 'host', 'bot_name',
    'browser_family',
    'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category',
    'anomaly_score', 'raw_anomaly_score', 'threat_level', 'model_name',
    'correlated', 'asn_number', 'asn_org', 'country_code', 'asn_label',
    'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'campaign_id',
    'ae_recon_error', 'xgb_prob',
]

# Columns persisted to ml_detected_anomalies (only those present in the frame are sent).
_ANOMALY_COLUMNS = [
    'detected_at', 'src_ip', 'ja4', 'host', 'bot_name', 'browser_family', 'anomaly_score',
    'raw_anomaly_score', 'campaign_id',
    'threat_level', 'model_name', 'recurrence',
    'asn_number', 'asn_org', 'asn_detail', 'asn_domain', 'country_code', 'asn_label',
    'hits', 'hit_velocity', 'fuzzing_index', 'post_ratio', 'port_exhaustion_ratio', 'max_keepalives', 'orphan_ratio',
    'tcp_jitter_variance', 'tcp_shared_count', 'true_window_size', 'window_mss_ratio',
    'alpn_http_mismatch', 'is_alpn_missing', 'sni_host_mismatch',
    'header_count', 'has_accept_language', 'has_cookie', 'has_referer',
    'modern_browser_score', 'is_headless', 'ua_ch_mismatch',
    'header_order_shared_count', 'ip_id_zero_ratio', 'request_size_variance',
    'multiplexing_efficiency', 'mss_mobile_mismatch',
    'correlated', 'reason', 'asset_ratio', 'direct_access_ratio', 'is_ua_rotating',
    'distinct_ja4_count', 'src_port_density', 'ja4_asn_concentration',
    'ja4_country_concentration', 'is_rare_ja4',
    'header_order_confidence', 'distinct_header_orders', 'temporal_entropy',
    'path_diversity_ratio', 'url_depth_variance', 'anomalous_payload_ratio',
    'anubis_bot_name', 'anubis_bot_action', 'anubis_bot_category',
]

# DataFrame column -> unknown_h2_fingerprints column (observed_at is added at insert time).
_UNKNOWN_H2_COLUMNS = {
    'src_ip': 'src_ip',
    'ja4': 'ja4',
    'h2_fingerprint': 'h2_fingerprint',
    'h2_settings_fp': 'h2_settings_fp',
    'h2_window_update': 'h2_window_update',
    'h2_pseudo_order': 'h2_pseudo_order',
    'h2_has_priority': 'h2_has_priority',
    'browser_confidence': 'browser_confidence_score',
    'header_user_agent': 'header_user_agent',
    'tls_version': 'tls_version',
}


def _reload_h2_signatures(client) -> None:
    """§3.9.5 — Best-effort periodic reload of the H2 signatures from ClickHouse.

    A failure is logged and never aborts the cycle.
    """
    try:
        if reload_signatures_from_clickhouse(client):
            log_info('[Signatures] Signatures H2 rechargées depuis browser_h2_signatures.')
    except Exception as e:
        log_info(f'[Signatures] Rechargement des signatures H2 échoué : {e}')


def _enrich_with_thesis_features(client, df: pd.DataFrame) -> pd.DataFrame:
    """§5 — Left-join the advanced features of view_thesis_features_1h onto *df*.

    A possible ``table.`` prefix is stripped from the column names of both
    frames, which are then merged on (window_start, src_ip, ja4, host). NaNs
    in each thesis column are filled with that column's median, or 0 when the
    column is entirely NaN. This is best effort: on error the frame obtained
    so far is returned and the error is logged.
    """
    join_keys = ['window_start', 'src_ip', 'ja4', 'host']
    try:
        df_thesis = client.query_df(f'SELECT * FROM {DB}.view_thesis_features_1h')
        if df_thesis is None or df_thesis.empty:
            log_info('[Thèse §5] view_thesis_features_1h vide — features avancées indisponibles.')
            return df

        df_thesis.columns = [c.split('.')[-1] for c in df_thesis.columns]
        df.columns = [c.split('.')[-1] for c in df.columns]
        thesis_cols = [c for c in df_thesis.columns if c not in join_keys]
        df = df.merge(df_thesis, on=join_keys, how='left', suffixes=('', '_thesis'))
        # Sessions without thesis data get the population median.
        for col in thesis_cols:
            if col in df.columns:
                df[col] = df[col].fillna(df[col].median() if df[col].notna().any() else 0)
        log_info(f'[Thèse §5] {len(df_thesis)} sessions enrichies avec {len(thesis_cols)} features avancées.')
    except Exception as e:
        log_info(f'[Thèse §5] view_thesis_features_1h inaccessible : {e} — features avancées ignorées.')
    return df


def _queue_unknown_h2_fingerprints(client, df: pd.DataFrame) -> None:
    """§3.9.5 — Queue H2 sessions that are unknown but browser-like for review.

    A session is queued when all of the following hold:

    - its H2 SETTINGS fingerprint is unknown (``h2_settings_known == 0``);
    - browser_matcher does not recognise it (``bm_score < 0.45``);
    - ``browser_confidence >= 0.55`` (the "browser-like" condition), checked
      only when that column exists;
    - it negotiated TLS 1.3, checked only when ``tls_version`` exists.

    Nothing is done without ``bm_score`` and ``h2_settings_known``. The
    matching rows are inserted with ``insert_df``. The former
    ``INSERT … SELECT … FROM input`` command never sent the selected rows to
    the server. This is best effort: errors are logged, never raised.
    """
    if 'bm_score' not in df.columns or 'h2_settings_known' not in df.columns:
        return
    try:
        not_recognised = df['bm_score'] < 0.45
        if 'browser_confidence' in df.columns:
            # Unrecognised by the matcher BUT browser-like: both must hold.
            candidate = not_recognised & (df['browser_confidence'] >= 0.55)
        else:
            candidate = not_recognised
        unknown_h2_mask = (df['h2_settings_known'] == 0) & candidate
        if 'tls_version' in df.columns:
            unknown_h2_mask = unknown_h2_mask & df['tls_version'].astype(str).str.startswith('TLSv1.3')

        unknown_h2 = df[unknown_h2_mask]
        if unknown_h2.empty:
            return

        present = [c for c in _UNKNOWN_H2_COLUMNS if c in unknown_h2.columns]
        queue_df = unknown_h2[present].rename(columns=_UNKNOWN_H2_COLUMNS)
        queue_df.insert(0, 'observed_at', datetime.now().replace(microsecond=0))
        # NOTE(review): the database name is hard-coded here while the rest of
        # the module uses DB; kept as-is. Confirm it is intentional.
        client.insert_df('ja4_processing.unknown_h2_fingerprints', queue_df)
        log_info(f'[H2 Queue] {len(queue_df)} fingerprint(s) H2 inconnu(s) mis en file d\'examen.')
    except Exception as e:
        log_info(f'[H2 Queue] Erreur insertion unknown_h2_fingerprints : {e}')


def _log_data_summary(df: pd.DataFrame) -> dict:
    """Log the post-preprocessing traffic summary.

    Returns the counts reused by the CYCLE_START decision, under the keys
    ``total``, ``correlated``, ``isp``, ``known_bot``, ``anubis_allow``,
    ``anubis_deny`` and ``anubis_weigh``.
    """
    def column(name):
        # A missing column behaves as an empty Series, so its counts are 0.
        return df.get(name, pd.Series(dtype=object))

    n_total = len(df)
    n_correlated = int((column('correlated') == 1).sum())
    n_uncorrelated = n_total - n_correlated
    asn_label = column('asn_label')
    n_isp = int((asn_label == 'isp').sum())
    n_datacenter = int((asn_label == 'datacenter').sum())
    n_cdn = int((asn_label == 'cdn').sum())
    n_known_bot = int((column('bot_name') != '').sum())
    anubis_action = column('anubis_bot_action')
    n_anubis_allow = int((anubis_action == 'ALLOW').sum())
    n_anubis_deny = int((anubis_action == 'DENY').sum())
    n_anubis_weigh = int((anubis_action == 'WEIGH').sum())
    n_unique_ips = int(df['src_ip'].nunique()) if 'src_ip' in df.columns else 0
    n_unique_ja4 = int(df['ja4'].nunique()) if 'ja4' in df.columns else 0

    log_info(f'[Données] Après preprocessing : {n_total} sessions, {n_unique_ips} IP uniques, {n_unique_ja4} JA4 uniques.')
    log_info(f'[Données] Corrélées (L3-L7) : {n_correlated:>6} | Non-corrélées (L7) : {n_uncorrelated:>6}')
    log_info(f'[Données] ASN ISP : {n_isp:>6} | Datacenter : {n_datacenter:>6} | CDN : {n_cdn}')
    log_info(f'[Données] Bots connus (dict) : {n_known_bot:>6} | Anubis ALLOW : {n_anubis_allow:>6}')
    log_info(f'[Données] Anubis DENY : {n_anubis_deny:>6} | Anubis WEIGH : {n_anubis_weigh:>6}')

    # Browser distribution: dict_browser_ja4 (known) + inferred (structural).
    if 'inferred_browser_family' in df.columns:
        ibf_counts = df['inferred_browser_family'].value_counts()
        ibf_known = ibf_counts[ibf_counts.index != '']
        if not ibf_known.empty:
            ibf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in ibf_known.head(7).items())
            n_dict = int((df.get('browser_family', pd.Series('')).fillna('').astype(str) != '').sum())
            n_inferred = int(ibf_known.sum()) - n_dict
            log_info(f'[Données] Navigateurs : {ibf_known.sum():>6} sessions ({len(ibf_known)} familles : {ibf_summary})')
            log_info(f'[Données] Dict JA4 connu : {n_dict:>6} | Inféré structurel : {max(0, n_inferred):>6}')
    elif 'browser_family' in df.columns:
        bf_counts = df['browser_family'].value_counts()
        bf_known = bf_counts[bf_counts.index != '']
        if not bf_known.empty:
            bf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in bf_known.head(5).items())
            log_info(f'[Données] Navigateurs JA4 : {bf_known.sum():>6} sessions ({len(bf_known)} familles : {bf_summary})')

    # browser_confidence distribution.
    if 'browser_confidence' in df.columns:
        bc = df['browser_confidence']
        n_high = int((bc >= BROWSER_CONFIDENCE_THRESHOLD).sum())
        log_info(f'[Données] browser_confidence: mean={bc.mean():.3f}, ≥seuil({BROWSER_CONFIDENCE_THRESHOLD})={n_high}')

    return {
        'total': n_total,
        'correlated': n_correlated,
        'isp': n_isp,
        'known_bot': n_known_bot,
        'anubis_allow': n_anubis_allow,
        'anubis_deny': n_anubis_deny,
        'anubis_weigh': n_anubis_weigh,
    }


def _apply_soc_feedback(client, df: pd.DataFrame, cycle_id: str) -> pd.DataFrame:
    """Adjust ``asn_label`` (which drives the human baseline) using SOC classifications.

    IPs confirmed as false positives (``false_positive`` / ``legitimate``) are
    relabelled ``'isp'``, so they join the human pool. IPs confirmed as true
    positives (``true_positive`` / ``malicious`` / ``bot``) that are currently
    ``'isp'`` become ``'soc_confirmed_bot'``, which removes them from the human
    baseline. *df* is modified in place and returned.
    """
    soc_feedback = _load_soc_feedback(client)
    if not soc_feedback:
        return df

    fp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('false_positive', 'legitimate')}
    tp_ips = {ip for ip, cls in soc_feedback.items() if cls in ('true_positive', 'malicious', 'bot')}
    if fp_ips:
        mask_fp = df['src_ip'].isin(fp_ips) & (df.get('asn_label', pd.Series(dtype=str)) != 'isp')
        df.loc[mask_fp, 'asn_label'] = 'isp'
        log_info(f"[Feedback] {mask_fp.sum()} lignes reclassées 'isp' (FP confirmés).")
    if tp_ips:
        mask_tp = df['src_ip'].isin(tp_ips) & (df.get('asn_label', pd.Series(dtype=str)) == 'isp')
        df.loc[mask_tp, 'asn_label'] = 'soc_confirmed_bot'
        log_info(f"[Feedback] {mask_tp.sum()} lignes exclues de la baseline humaine (TP confirmés).")
    log_decision('SOC_FEEDBACK', cycle_id, '', {
        'fp_ips': len(fp_ips), 'tp_ips': len(tp_ips),
        'total_classifications': len(soc_feedback),
    })
    return df


def _insert_all_scores(client, all_scored: pd.DataFrame) -> None:
    """Insert every scored session into ml_all_scores (best effort).

    *all_scored* is modified in place before the insert, and later
    consumers see these changes:

    - ``detected_at`` is set to the current time;
    - empty ``ja4`` values become ``'HTTP_CLEAR_TEXT'``;
    - ``browser_family`` is replaced by ``inferred_browser_family`` when available.
    """
    if all_scored.empty:
        return
    try:
        all_scored['detected_at'] = datetime.now().replace(microsecond=0)
        all_scored['ja4'] = all_scored['ja4'].replace({'': 'HTTP_CLEAR_TEXT'})
        # Prefer the multi-factor inferred family for browser_family.
        if 'inferred_browser_family' in all_scored.columns:
            all_scored['browser_family'] = all_scored['inferred_browser_family']
        scores_df = all_scored[[c for c in _ALL_SCORES_COLUMNS if c in all_scored.columns]]
        client.insert_df(f'{DB}.ml_all_scores', scores_df)
        log_info(f'[ml_all_scores] {len(scores_df)} sessions scorées enregistrées.')
    except Exception as e:
        log_info(f'[ml_all_scores] ERREUR INSERTION: {e}')


def _insert_anomalies(client, all_anom: pd.DataFrame, all_scored: pd.DataFrame, cycle_id: str) -> None:
    """Insert the deduplicated anomalies into ml_detected_anomalies.

    Also logs the cycle result banner and the CYCLE_END decision. Errors are
    logged, never raised.
    """
    try:
        final_df = all_anom[[c for c in _ANOMALY_COLUMNS if c in all_anom.columns]]
        client.insert_df(f'{DB}.ml_detected_anomalies', final_df)
        threat_level = final_df.get('threat_level', pd.Series(dtype=object))
        bot_name = final_df.get('bot_name', pd.Series(dtype=object))
        n_critical = int((threat_level == 'CRITICAL').sum())
        n_high = int((threat_level == 'HIGH').sum())
        n_medium = int((threat_level == 'MEDIUM').sum())
        n_known = int((bot_name != '').sum())
        log_info('')
        log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════')
        log_info(f'║ {len(final_df)} menaces insérées dans ml_detected_anomalies')
        log_info(f'║ CRITICAL={n_critical} HIGH={n_high} MEDIUM={n_medium} KNOWN_BOT={n_known}')
        if not all_scored.empty:
            log_info(f'║ {len(all_scored)} sessions scorées dans ml_all_scores')
        log_info('╚═══════════════════════════════════════════════════════════')
        log_decision('CYCLE_END', cycle_id, '', {
            'inserted': len(final_df),
            'anomalies': int((bot_name == '').sum()),
            'known_bots': n_known,
            'critical': n_critical,
            'high': n_high,
            'dedup_ttl_min': DEDUP_TTL_MIN,
        })
    except Exception as e:
        log_info(f'ERREUR INSERTION: {e}')


def _log_no_threats(cycle_id: str, all_scored: pd.DataFrame) -> None:
    """Log the result banner and CYCLE_END decision of a cycle without anomalies."""
    log_info('')
    log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════')
    log_info('║ Aucune menace détectée ce cycle.')
    if not all_scored.empty:
        log_info(f'║ {len(all_scored)} sessions scorées dans ml_all_scores')
    tl_dist = all_scored['threat_level'].value_counts().to_dict() if not all_scored.empty and 'threat_level' in all_scored.columns else {}
    if tl_dist:
        log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}')
    log_info('╚═══════════════════════════════════════════════════════════')
    log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN})


def _record_all_cycle_metrics(client, cycle_id, df, all_anom, all_scored, cycle_start) -> None:
    """§9 — Record the cycle metrics of both models in ml_performance_metrics (best effort)."""
    try:
        for model_name, model_feats in (('Complet', FEATURES_COMPLET), ('Applicatif', FEATURES)):
            record_cycle_metrics(
                client=client,
                db=DB,
                cycle_id=cycle_id,
                model_name=model_name,
                df_all=df,
                anomalies=all_anom if not all_anom.empty else pd.DataFrame(),
                all_scored=all_scored if not all_scored.empty else pd.DataFrame(),
                drift_rate=0.0,
                cycle_start_time=cycle_start,
                baseline_size=0,
                threshold=0.0,
                valid_features=len(model_feats),
                total_features=len(model_feats),
            )
    except Exception as e:
        log_info(f'[Métriques §9] Enregistrement des métriques échoué : {e}')


def fetch_and_analyze():
    """Run one full detection cycle: ClickHouse query, scoring, then result insertion.

    Steps:

    1. Load traffic from view_ai_features_1h and enrich it with
       view_thesis_features_1h and the fleet score.
    2. Queue unknown H2 fingerprints and apply SOC feedback to the baseline.
    3. Score the correlated and uncorrelated sessions with
       run_semi_supervised_logic (Complet / Applicatif models), optionally
       also on the 24h view.
    4. Insert every score into ml_all_scores.
    5. Deduplicate the anomalies (intra-cycle, then inter-cycle TTL) and
       insert them into ml_detected_anomalies.

    When the 1h query fails, ``_consecutive_failures`` is incremented and the
    service is marked unhealthy once it reaches ``MAX_FAILURES``. A successful
    query resets the counter and marks the service healthy.
    """
    global _consecutive_failures

    cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S')
    cycle_start = time.time()
    log_info('')
    log_info('=' * 70)
    log_info(f'  CYCLE {cycle_id}')
    log_info('=' * 70)

    client = get_client()
    _reload_h2_signatures(client)

    # ── Traffic retrieval (1h window) ─────────────────────────────────────
    try:
        df = client.query_df(f'SELECT * FROM {DB}.view_ai_features_1h')
    except Exception as e:
        log_info(f'ERREUR REQUETE: {e}')
        _consecutive_failures += 1
        if _consecutive_failures >= MAX_FAILURES:
            set_healthy(False)
            log_decision('CONSECUTIVE_FAILURES', cycle_id, '', {'count': _consecutive_failures, 'error': str(e)})
        return

    _consecutive_failures = 0
    set_healthy(True)

    if df is None or df.empty:
        log_info('[Données] Aucun trafic trouvé dans view_ai_features_1h.')
        return

    log_info(f'[Données] {len(df)} sessions chargées depuis view_ai_features_1h ({len(df.columns)} colonnes).')

    df = _enrich_with_thesis_features(client, df)
    df = preprocess_df(df)

    # §5 — JA4×ASN fleet score (bipartite fleet detection).
    try:
        df = enrich_with_fleet_score(df)
        # Default to an empty Series: a bare scalar default has no .sum().
        fleet_flag = df.get('fleet_campaign_flag', pd.Series(dtype=int))
        n_fleet = int((fleet_flag == 1).sum())
        if n_fleet > 0:
            log_info(f'[Fleet §5] {n_fleet} session(s) appartenant à une flotte suspecte.')
    except Exception as e:
        log_info(f'[Fleet §5] Enrichissement de flotte échoué : {e}')

    _queue_unknown_h2_fingerprints(client, df)

    summary = _log_data_summary(df)
    log_decision('CYCLE_START', cycle_id, '', {
        'total_rows': summary['total'],
        'human_rows': summary['isp'],
        'known_bot_rows': summary['known_bot'],
        'correlated_rows': summary['correlated'],
        'anubis_allow_rows': summary['anubis_allow'],
        'anubis_deny_rows': summary['anubis_deny'],
        'anubis_weigh_rows': summary['anubis_weigh'],
        'multiwindow': ENABLE_MULTIWINDOW,
    })

    try:
        rec_df = client.query_df(f'SELECT src_ip, recurrence FROM {DB}.view_ip_recurrence')
        recurrence_map = dict(zip(rec_df['src_ip'], rec_df['recurrence']))
    except Exception:
        # Recurrence is optional context for scoring.
        recurrence_map = {}

    df = _apply_soc_feedback(client, df, cycle_id)

    # ── Per-model features (see DOCUMENTATION.md §4) ──────────────────────
    feats = FEATURES
    feats_complet = FEATURES_COMPLET

    # ── 1h window analysis ────────────────────────────────────────────────
    df_corr = df[df['correlated'] == 1].copy()
    df_uncorr = df[df['correlated'] == 0].copy()
    log_info('')
    log_info(f'── Modèle Complet (L3→L7, corrélé) : {len(df_corr)} sessions, {len(feats_complet)} features ──')
    anom_a, scored_a = run_semi_supervised_logic(df_corr, feats_complet, 'Complet', cycle_id, recurrence_map)
    log_info('')
    log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──')
    anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map)
    all_anom = pd.concat([anom_a, anom_b], ignore_index=True)
    all_scored = pd.concat([scored_a, scored_b], ignore_index=True)

    # ── A3: 24h window analysis (optional) ────────────────────────────────
    if ENABLE_MULTIWINDOW:
        try:
            df_24h = client.query_df(f'SELECT * FROM {DB}.{MULTIWINDOW_VIEW}')
            if df_24h is not None and not df_24h.empty:
                df_24h = preprocess_df(df_24h)
                log_info(f"[24h] {len(df_24h)} sessions dans la fenêtre 24h.")
                anom_c, scored_c = run_semi_supervised_logic(
                    df_24h[df_24h['correlated'] == 1].copy(), feats_complet, 'Complet_24h', cycle_id, recurrence_map)
                anom_d, scored_d = run_semi_supervised_logic(
                    df_24h[df_24h['correlated'] == 0].copy(), feats, 'Applicatif_24h', cycle_id, recurrence_map)
                all_anom_24h = pd.concat([anom_c, anom_d], ignore_index=True)
                all_scored_24h = pd.concat([scored_c, scored_d], ignore_index=True)
                # Merge 1h+24h; the intra-cycle dedup below keeps the lowest score per IP.
                if not all_anom_24h.empty:
                    all_anom = pd.concat([all_anom, all_anom_24h], ignore_index=True)
                    log_info(f"[24h] Fusion 1h+24h : {len(all_anom)} entrées avant déduplication.")
                all_scored = pd.concat([all_scored, all_scored_24h], ignore_index=True)
            else:
                log_info(f"[24h] Vue {MULTIWINDOW_VIEW} vide — analyse mono-fenêtre.")
        except Exception as e:
            log_info(f"[24h] Vue {MULTIWINDOW_VIEW} inaccessible : {e} — analyse mono-fenêtre.")

    # ── Every classification goes to ml_all_scores ────────────────────────
    _insert_all_scores(client, all_scored)

    if all_anom.empty:
        _log_no_threats(cycle_id, all_scored)
    else:
        # Intra-cycle dedup: one row per IP, the worst (lowest) score across models/windows.
        sort_col = 'raw_anomaly_score' if 'raw_anomaly_score' in all_anom.columns else 'anomaly_score'
        all_anom = all_anom.sort_values(sort_col, ascending=True).drop_duplicates(subset=['src_ip'], keep='first')
        log_info(f'[Dédup] Intra-cycle : {len(all_anom)} IP uniques après déduplication.')

        # A5 — inter-cycle dedup with TTL.
        all_anom = _filter_recent_detections(client, all_anom)
        if all_anom.empty:
            log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.')
            log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN})
            _record_all_cycle_metrics(client, cycle_id, df, all_anom, all_scored, cycle_start)
            return

        # Own the frame before adding columns (it may be a slice of a larger frame).
        all_anom = all_anom.copy()
        all_anom['detected_at'] = datetime.now().replace(microsecond=0)
        fake_nav_col = 'is_fake_navigation'
        all_anom['is_headless'] = all_anom[fake_nav_col].astype(int) if fake_nav_col in all_anom.columns else 0
        _insert_anomalies(client, all_anom, all_scored, cycle_id)

    _record_all_cycle_metrics(client, cycle_id, df, all_anom, all_scored, cycle_start)
|