fix: ASN dictionary pipeline + verbose bot-detector logging

- Fix dict_iplocate_asn: remove non-existent org/domain columns (6→4 cols)
- Add CSV header to iplocate-ip-to-asn.csv (CSVWithNames format)
- Replace org/domain dictGet calls with empty string literals in MV
- Add full 714K-CIDR stub for complete ASN resolution in tests
- Add header generation to generate_asn_data.py
- Verbose bot-detector stdout: data summary, triage breakdown, model
  training details, scoring stats, browser classification, boxed results
- Fix IPv6 filter in traffic seeder (_ips_from_cidrs)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-08 17:43:55 +02:00
parent 7b7b69dee3
commit 98289ccf04
9 changed files with 709482 additions and 45 deletions

View File

@ -246,8 +246,8 @@ AS SELECT
toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'name', toIPv6(src_ip), '') AS src_as_name,
dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'org', toIPv6(src_ip), '') AS src_org,
dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'domain', toIPv6(src_ip), '') AS src_domain,
'' AS src_org,
'' AS src_domain,
coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
coalesce(JSONExtractString(raw_json, 'host'), '') AS host,

View File

@ -16,8 +16,8 @@ SELECT
toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'name', _ip, '') AS src_as_name,
dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'org', _ip, '') AS src_org,
dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'domain', _ip, '') AS src_domain,
'' AS src_org,
'' AS src_domain,
coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
coalesce(JSONExtractString(raw_json, 'host'), '') AS host,

View File

@ -580,7 +580,7 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
log_info(f"[{name}] Modèle v{meta['version_id']} expiré ({age_h:.1f}h ≥ {RETRAIN_INTERVAL_H}h) — retraining.")
version_id = datetime.now().strftime('%Y%m%d_%H%M%S')
log_info(f"[{name}] Entraînement version {version_id} sur {len(human_baseline)} sessions humaines... (contamination={CONTAMINATION})")
log_info(f"[{name}] Entraînement EIF v{version_id} {len(human_baseline)} sessions ISP, {len(features)} features, contamination={CONTAMINATION}")
X = human_baseline[features].replace([np.inf, -np.inf], np.nan)
X = X.fillna(X.median())
@ -688,7 +688,7 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
_append_training_history({k: v for k, v in new_meta.items() if k != 'baseline_stats'})
_purge_old_versions(name)
log_info(f"[{name}] Modèle v{version_id} sauvegardé → {new_model_path}")
log_info(f"[{name}] Modèle v{version_id} sauvegardé → {new_model_path} (AE={'oui' if ae_model is not None else 'non'})")
log_decision('MODEL_TRAINED', cycle_id, name, {
'version_id': version_id, 'previous_version': previous_version,
'human_samples': len(human_baseline), 'next_retrain_in_h': RETRAIN_INTERVAL_H,
@ -979,18 +979,32 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
unknown_traffic = rest[rest['anubis_bot_action'] != 'ALLOW'].copy()
human_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'isp']
log_info(f'[{name}] ── Triage ──────────────────────────────────────')
log_info(f'[{name}] Total sessions : {len(df):>6}')
log_info(f'[{name}] Bots connus (dict) : {len(known_bots):>6}')
log_info(f'[{name}] Anubis ALLOW : {len(anubis_allow):>6}')
log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}')
log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)')
# A7 — Valider les features avant tout traitement
valid_features = validate_features(df, features, name, cycle_id)
if valid_features is None:
return pd.DataFrame(), pd.DataFrame()
if len(human_baseline) < 500:
log_info(f"[{name}] Données humaines insuffisantes ({len(human_baseline)} < 500).")
log_info(f"[{name}] Données humaines insuffisantes ({len(human_baseline)} < 500) — cycle ignoré.")
log_info(f"[{name}] Distribution asn_label dans le trafic à scorer :")
if 'asn_label' in unknown_traffic.columns:
for label, cnt in unknown_traffic['asn_label'].value_counts().head(8).items():
log_info(f"[{name}] {label:>15} : {cnt}")
log_decision('SKIPPED_LOW_DATA', cycle_id, name, {
'human_count': len(human_baseline), 'unknown_count': len(unknown_traffic)
})
return pd.DataFrame(), pd.DataFrame()
log_info(f'[{name}] ── Modèle EIF ─────────────────────────────────')
log_info(f'[{name}] Features validées : {len(valid_features)}/{len(features)} ({", ".join(valid_features[:5])}{"" if len(valid_features) > 5 else ""})')
# A1 — Dérive conceptuelle intégrée dans load_or_train_model
model, ae_model = load_or_train_model(name, human_baseline, valid_features, cycle_id)
unknown_traffic = unknown_traffic.copy()
@ -998,6 +1012,7 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
X_test = unknown_traffic[valid_features].replace([np.inf, -np.inf], np.nan)
X_test = X_test.fillna(X_test.median())
raw_scores = model.decision_function(X_test)
log_info(f'[{name}] Scoring EIF : {len(X_test)} sessions scorées (min={raw_scores.min():.4f}, max={raw_scores.max():.4f}, mean={raw_scores.mean():.4f})')
# Combinaison EIF + Autoencoder si disponible
# Score final = (1-α) * eif_norm + α * ae_norm où α = AE_WEIGHT
@ -1206,6 +1221,20 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
if not anubis_allow.empty:
all_scored = pd.concat([all_scored, anubis_allow], ignore_index=True)
# ── Résumé du modèle ─────────────────────────────────────────────────────
n_threats = len(threats) if not threats.empty else 0
n_anomalies = len(anomalies) if not anomalies.empty else 0
n_legit_browser = int(browser_legit_mask.sum()) if browser_legit_mask is not None else 0
n_deny = len(anubis_deny) if not anubis_deny.empty else 0
tl_counts = threats['threat_level'].value_counts().to_dict() if not threats.empty else {}
tl_str = ', '.join(f'{k}={v}' for k, v in sorted(tl_counts.items())) if tl_counts else 'aucune'
log_info(f'[{name}] ── Résultat ────────────────────────────────────')
log_info(f'[{name}] Menaces totales : {n_threats:>6} ({tl_str})')
log_info(f'[{name}] Anomalies IF : {n_anomalies:>6} (seuil={effective_threshold:.4f})')
log_info(f'[{name}] Navigateurs légit. : {n_legit_browser:>6}')
log_info(f'[{name}] Anubis DENY (forcé) : {n_deny:>6}')
log_info(f'[{name}] Sessions scorées : {len(all_scored):>6} (→ ml_all_scores)')
return threats, all_scored
# ═══════════════════════════════════════════════════════════════════════════════
@ -1366,7 +1395,10 @@ def fetch_and_analyze():
"""
global _consecutive_failures, _service_healthy
cycle_id = datetime.now().strftime('%Y%m%d_%H%M%S')
log_info('=== Lancement cycle IA ===')
log_info('')
log_info('=' * 70)
log_info(f' CYCLE {cycle_id}')
log_info('=' * 70)
client = get_client()
@ -1387,9 +1419,11 @@ def fetch_and_analyze():
_service_healthy = True
if df is None or df.empty:
log_info('Aucun trafic trouvé.')
log_info('[Données] Aucun trafic trouvé dans view_ai_features_1h.')
return
log_info(f'[Données] {len(df)} sessions chargées depuis view_ai_features_1h ({len(df.columns)} colonnes).')
# ── Enrichissement avec les features avancées de la thèse §5 ─────────────
try:
df_thesis = client.query_df(f'SELECT * FROM {DB}.view_thesis_features_1h')
@ -1412,14 +1446,42 @@ def fetch_and_analyze():
df = _preprocess_df(df)
# ── Résumé des données chargées ───────────────────────────────────────────
n_total = len(df)
n_correlated = int((df.get('correlated', pd.Series()) == 1).sum())
n_uncorrelated = n_total - n_correlated
n_isp = int((df.get('asn_label', pd.Series()) == 'isp').sum())
n_datacenter = int((df.get('asn_label', pd.Series()) == 'datacenter').sum())
n_cdn = int((df.get('asn_label', pd.Series()) == 'cdn').sum())
n_known_bot = int((df.get('bot_name', pd.Series()) != '').sum())
n_anubis_allow = int((df.get('anubis_bot_action', pd.Series()) == 'ALLOW').sum())
n_anubis_deny = int((df.get('anubis_bot_action', pd.Series()) == 'DENY').sum())
n_anubis_weigh = int((df.get('anubis_bot_action', pd.Series()) == 'WEIGH').sum())
n_unique_ips = int(df['src_ip'].nunique()) if 'src_ip' in df.columns else 0
n_unique_ja4 = int(df['ja4'].nunique()) if 'ja4' in df.columns else 0
log_info(f'[Données] Après preprocessing : {n_total} sessions, {n_unique_ips} IP uniques, {n_unique_ja4} JA4 uniques.')
log_info(f'[Données] Corrélées (L3-L7) : {n_correlated:>6} | Non-corrélées (L7) : {n_uncorrelated:>6}')
log_info(f'[Données] ASN ISP : {n_isp:>6} | Datacenter : {n_datacenter:>6} | CDN : {n_cdn}')
log_info(f'[Données] Bots connus (dict) : {n_known_bot:>6} | Anubis ALLOW : {n_anubis_allow:>6}')
log_info(f'[Données] Anubis DENY : {n_anubis_deny:>6} | Anubis WEIGH : {n_anubis_weigh:>6}')
# Distribution browser_family
if 'browser_family' in df.columns:
bf_counts = df['browser_family'].value_counts()
bf_known = bf_counts[bf_counts.index != '']
if not bf_known.empty:
bf_summary = ', '.join(f'{fam}={cnt}' for fam, cnt in bf_known.head(5).items())
log_info(f'[Données] Navigateurs JA4 : {bf_known.sum():>6} sessions ({len(bf_known)} familles : {bf_summary})')
log_decision('CYCLE_START', cycle_id, '', {
'total_rows': len(df),
'human_rows': int((df.get('asn_label', pd.Series()) == 'isp').sum()),
'known_bot_rows': int((df.get('bot_name', pd.Series()) != '').sum()),
'correlated_rows': int((df.get('correlated', pd.Series()) == 1).sum()),
'anubis_allow_rows': int((df.get('anubis_bot_action', pd.Series()) == 'ALLOW').sum()),
'anubis_deny_rows': int((df.get('anubis_bot_action', pd.Series()) == 'DENY').sum()),
'anubis_weigh_rows': int((df.get('anubis_bot_action', pd.Series()) == 'WEIGH').sum()),
'total_rows': n_total,
'human_rows': n_isp,
'known_bot_rows': n_known_bot,
'correlated_rows': n_correlated,
'anubis_allow_rows': n_anubis_allow,
'anubis_deny_rows': n_anubis_deny,
'anubis_weigh_rows': n_anubis_weigh,
'multiwindow': ENABLE_MULTIWINDOW,
})
@ -1490,8 +1552,14 @@ def fetch_and_analyze():
]
# ── Analyse fenêtre 1h ────────────────────────────────────────────────────
anom_a, scored_a = run_semi_supervised_logic(df[df['correlated'] == 1].copy(), feats_complet, 'Complet', cycle_id, recurrence_map)
anom_b, scored_b = run_semi_supervised_logic(df[df['correlated'] == 0].copy(), feats, 'Applicatif', cycle_id, recurrence_map)
df_corr = df[df['correlated'] == 1].copy()
df_uncorr = df[df['correlated'] == 0].copy()
log_info('')
log_info(f'── Modèle Complet (L3→L7, corrélé) : {len(df_corr)} sessions, {len(feats_complet)} features ──')
anom_a, scored_a = run_semi_supervised_logic(df_corr, feats_complet, 'Complet', cycle_id, recurrence_map)
log_info('')
log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──')
anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map)
all_anom = pd.concat([anom_a, anom_b], ignore_index=True)
all_scored = pd.concat([scored_a, scored_b], ignore_index=True)
@ -1539,13 +1607,13 @@ def fetch_and_analyze():
if not all_anom.empty:
all_anom = all_anom.sort_values('raw_anomaly_score', ascending=True).drop_duplicates(subset=['src_ip'], keep='first')
log_info(f'Après déduplication intra-cycle : {len(all_anom)} IP uniques.')
log_info(f'[Dédup] Intra-cycle : {len(all_anom)} IP uniques après déduplication.')
# A5 — Déduplication inter-cycles avec TTL
all_anom = _filter_recent_detections(client, all_anom)
if all_anom.empty:
log_info('Toutes les anomalies filtrées par déduplication TTL.')
log_info('[Dédup] Toutes les anomalies filtrées par TTL — rien à insérer.')
log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN})
return
@ -1575,40 +1643,67 @@ def fetch_and_analyze():
try:
final_df = all_anom[[c for c in cols if c in all_anom.columns]]
client.insert_df(f'{DB}.ml_detected_anomalies', final_df)
log_info(f'Succès: {len(final_df)} menaces enregistrées.')
n_critical = int((final_df.get('threat_level', pd.Series()) == 'CRITICAL').sum())
n_high = int((final_df.get('threat_level', pd.Series()) == 'HIGH').sum())
n_medium = int((final_df.get('threat_level', pd.Series()) == 'MEDIUM').sum())
n_known = int((final_df.get('bot_name', pd.Series()) != '').sum())
log_info('')
log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════')
log_info(f'{len(final_df)} menaces insérées dans ml_detected_anomalies')
log_info(f'║ CRITICAL={n_critical} HIGH={n_high} MEDIUM={n_medium} KNOWN_BOT={n_known}')
if not all_scored.empty:
log_info(f'{len(all_scored)} sessions scorées dans ml_all_scores')
log_info(f'╚═══════════════════════════════════════════════════════════')
log_decision('CYCLE_END', cycle_id, '', {
'inserted': len(final_df),
'anomalies': int((final_df.get('bot_name', pd.Series()) == '').sum()),
'known_bots': int((final_df.get('bot_name', pd.Series()) != '').sum()),
'critical': int((final_df.get('threat_level', pd.Series()) == 'CRITICAL').sum()),
'high': int((final_df.get('threat_level', pd.Series()) == 'HIGH').sum()),
'known_bots': n_known,
'critical': n_critical,
'high': n_high,
'dedup_ttl_min': DEDUP_TTL_MIN,
})
except Exception as e:
log_info(f'ERREUR INSERTION: {e}')
else:
log_info('Aucune menace détectée.')
log_info('')
log_info(f'╔══ RÉSULTAT CYCLE {cycle_id} ══════════════════════════════════')
log_info(f'║ Aucune menace détectée ce cycle.')
if not all_scored.empty:
log_info(f'{len(all_scored)} sessions scorées dans ml_all_scores')
tl_dist = all_scored['threat_level'].value_counts().to_dict() if not all_scored.empty and 'threat_level' in all_scored.columns else {}
if tl_dist:
log_info(f'║ Distribution : {", ".join(f"{k}={v}" for k, v in sorted(tl_dist.items()))}')
log_info(f'╚═══════════════════════════════════════════════════════════')
log_decision('CYCLE_END', cycle_id, '', {'inserted': 0, 'anomalies': 0, 'known_bots': 0, 'critical': 0, 'high': 0, 'dedup_ttl_min': DEDUP_TTL_MIN})
if __name__ == '__main__':
log_info('*' * 65)
log_info(' DÉMARRAGE DU SERVICE BOT DETECTOR IA v12 (+ Anubis)')
log_info(f' DB : {DB}')
log_info(f' Contamination : {CONTAMINATION}')
log_info(f' Seuil anomalie : {ANOMALY_THRESHOLD} (adaptatif percentile={ANOMALY_PERCENTILE})')
log_info(f' Cycle : {CYCLE_INTERVAL}s | Fenêtre 1h | Multi-fenêtres : {ENABLE_MULTIWINDOW}')
log_info(f' Retraining : toutes les {RETRAIN_INTERVAL_H}h | Drift threshold : {DRIFT_THRESHOLD:.0%}')
log_info(f' Modèles : {MODEL_DIR}')
log_info(f' SHAP : {"activé" if ENABLE_SHAP else "désactivé (shap non installé)" if not SHAP_AVAILABLE else "désactivé"}')
log_info(f' Clustering : {"activé" if ENABLE_CLUSTERING else "désactivé"} | Dedup TTL : {DEDUP_TTL_MIN}min')
log_info(f' Récurrence weight : {RECURRENCE_WEIGHT} | Min features ratio : {MIN_VALID_FEATURE_RATIO:.0%}')
log_info(f' Anubis : ALLOW→KNOWN_BOT (score=0), DENY→ANUBIS_DENY (score IF réel)')
log_info(f' Browser : LEGITIMATE_BROWSER si consistency>={BROWSER_LEGIT_MIN_CONSISTENCY}/5 + JA4 reconnu')
log_info('*' * 65)
log_info('')
log_info('╔═══════════════════════════════════════════════════════════════╗')
log_info('║ BOT DETECTOR IA v12 — Pipeline de détection semi-supervisé ║')
log_info('╚═══════════════════════════════════════════════════════════════╝')
log_info(f' Base de données : {DB}')
log_info(f' Contamination EIF : {CONTAMINATION}')
log_info(f' Seuil anomalie : {ANOMALY_THRESHOLD} (adaptatif p{ANOMALY_PERCENTILE})')
log_info(f' Cycle d\'analyse : toutes les {CYCLE_INTERVAL}s')
log_info(f' Retraining : toutes les {RETRAIN_INTERVAL_H}h (drift seuil={DRIFT_THRESHOLD:.0%})')
log_info(f' Modèles : {MODEL_DIR}')
log_info(f' Autoencoder (AE) : poids={AE_WEIGHT} {"(PyTorch disponible)" if TORCH_AVAILABLE else "(PyTorch absent — désactivé)"}')
log_info(f' XGBoost : poids={XGB_WEIGHT} (min labels={XGB_MIN_LABELS})')
log_info(f' SHAP explainabilité : {"activé" if ENABLE_SHAP else "désactivé (shap non installé)" if not SHAP_AVAILABLE else "désactivé"}')
log_info(f' Clustering HDBSCAN : {"activé" if ENABLE_CLUSTERING else "désactivé"}')
log_info(f' Dédup inter-cycles : TTL={DEDUP_TTL_MIN}min')
log_info(f' Récurrence : weight={RECURRENCE_WEIGHT}')
log_info(f' Multi-fenêtres (24h) : {"activé" if ENABLE_MULTIWINDOW else "désactivé"}')
log_info(f' Feature ratio minimum : {MIN_VALID_FEATURE_RATIO:.0%}')
log_info(f' Anubis : ALLOW→KNOWN_BOT, DENY→forcé menace (score IF réel)')
log_info(f' Browser légitime : consistency≥{BROWSER_LEGIT_MIN_CONSISTENCY}/5 + JA4 reconnu')
log_info(f' Feedback SOC : {"activé" if ENABLE_FEEDBACK else "désactivé"} (fenêtre={FEEDBACK_WINDOW_DAYS}j)')
log_info('')
log_decision('SERVICE_START', 'boot', '', {
'db': DB, 'contamination': CONTAMINATION, 'anomaly_threshold': ANOMALY_THRESHOLD,
'cycle_interval': CYCLE_INTERVAL, 'retrain_interval_h': RETRAIN_INTERVAL_H
})
log_info(f'En attente du premier cycle dans {CYCLE_INTERVAL}s…')
while True:
try: fetch_and_analyze()
except Exception as e: log_info(f"Erreur globale : {e}")