Étape 2 — Fingerprinting HTTP/2 dans le pipeline ML : - Ajout du dictionnaire dict_browser_h2 (11 familles de navigateurs) dans 05_aggregation_tables.sql - Ajout du CTE h2_agg et 4 features HTTP/2 dans 07_ai_features_view.sql : h2_settings_known, h2_pseudo_order_match, h2_ja4_coherence, h2_settings_rare - Calcul du fingerprint_coherence_score (5 axes pondérés) dans la vue - Ajout du 6e axe axis_h2_coherence dans browser.py (poids rééquilibrés) - browser_h2.csv : 11 fingerprints Akamai → famille navigateur Étape 3 — Pré-filtre de cohérence sur la baseline humaine : - pipeline.py exclut les sessions avec fingerprint_coherence_score < seuil de la baseline d'entraînement - FINGERPRINT_COHERENCE_THRESHOLD configurable via env (défaut 0.25) - Log des sessions exclues pour analyse SOC Étape 4 — Détection de drift améliorée : - scoring.py : passage de 5 à 9 quantiles (p5…p95) - Ajout de la divergence KL en complément du test KS - Détection de drift adversarial (≥80% des features dérivent dans la même direction) - Split temporel strict pour la validation Étape 5 — Graphe bipartite JA4×ASN (§5.2) : - fleet.py : détection de flottes via NetworkX + Louvain (imports optionnels) - enrich_with_fleet_score() : ajout fleet_score + fleet_campaign_flag au DataFrame - cycle.py : appel après preprocess_df avec log du nombre de sessions en flotte - SQL migration 05_fleet_metrics_tables.sql : table fleet_detections (TTL 7j) - Dashboard : /fleet + /api/fleet (communautés détectées) + template fleet.html Étape 6 — Cross-domain Jaccard §5.8 : - 12_thesis_features.sql : CTE jaccard_paths → cross_domain_path_similarity - Signal : même chemins (/admin, /wp-login) sur plusieurs hosts = scanner Étape 7 — ExIFFI + erreurs AE par feature : - scoring.py : compute_exiffi_importance() par permutation, compute_ae_feature_errors() - pipeline.py : calcul ExIFFI sur X_test, mapping index → dict pour anomalies - build_reason() enrichi avec exiffi_top quand SHAP inactif Étape 8 — Méta-learner pour la pondération de l'ensemble : - scoring.py : classe MetaLearner (LogisticRegression, fallback poids fixes <1000 labels) - Collecte des labels depuis le cycle courant (known_bots, légitimes, Anubis) - pipeline.py : remplacement des poids fixes par MetaLearner.predict() Étape 9 — Métriques de performance et monitoring : - metrics.py : record_cycle_metrics() — taux anomalie, drift, corrélation, latence - SQL migration 05_fleet_metrics_tables.sql : table ml_performance_metrics (TTL 90j) - Dashboard : /health + /api/health + template health.html - cycle.py : appel record_cycle_metrics en fin de cycle (Complet + Applicatif) Tests : 36/36 bot-detector tests passent Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
175 lines
6.4 KiB
Python
175 lines
6.4 KiB
Python
"""Détection de flottes de bots via graphe bipartite JA4×ASN.
|
||
|
||
§5.2 — Analyse de graphe bipartite G=(JA4 ∪ ASN, E) pour identifier les flottes
|
||
de bots coordonnées qui font tourner leurs fingerprints JA4 et ASN.
|
||
|
||
Algorithme :
|
||
1. Construire le graphe bipartite G depuis les sessions du cycle courant
|
||
2. Projeter sur les nœuds JA4 (shared-ASN weighted projection)
|
||
3. Détecter les communautés Louvain (python-louvain)
|
||
4. Calculer fleet_score = taille × densité / log2(n_asn + 2) pour chaque communauté
|
||
5. Retourner les IPs appartenant aux communautés suspectes avec leur fleet_score
|
||
"""
|
||
import logging
|
||
from typing import Optional
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Seuil de fleet_score à partir duquel une communauté est considérée suspecte
|
||
FLEET_SCORE_THRESHOLD = float(__import__('os').getenv('FLEET_SCORE_THRESHOLD', '2.0'))
|
||
# Poids du fleet_score dans le score final (malus supplémentaire)
|
||
FLEET_SCORE_WEIGHT = float(__import__('os').getenv('FLEET_SCORE_WEIGHT', '0.10'))
|
||
# Nombre minimal d'arêtes pour inclure un JA4 dans l'analyse
|
||
FLEET_MIN_EDGES = int(__import__('os').getenv('FLEET_MIN_EDGES', '3'))
|
||
|
||
|
||
def build_fleet_graph(df: pd.DataFrame) -> Optional[object]:
|
||
"""Construit le graphe bipartite JA4×ASN à partir du cycle courant.
|
||
|
||
Nœuds : ensemble JA4 (préfixe 'ja4:') + ensemble ASN (préfixe 'asn:')
|
||
Arêtes : (ja4, asn) avec weight = nombre d'IPs distinctes sur ce couple
|
||
|
||
Exige que df ait les colonnes : ja4, asn_number, src_ip
|
||
Retourne None si networkx n'est pas disponible ou données insuffisantes.
|
||
"""
|
||
try:
|
||
import networkx as nx
|
||
from networkx.algorithms import bipartite
|
||
except ImportError:
|
||
logger.warning("[Fleet] networkx non disponible — analyse de flotte désactivée.")
|
||
return None
|
||
|
||
if df.empty or 'ja4' not in df.columns or 'asn_number' not in df.columns:
|
||
return None
|
||
|
||
# Filtrer les JA4 vides et ASN 0
|
||
mask = (df['ja4'].fillna('') != '') & (df['asn_number'].fillna('0') != '0')
|
||
sub = df[mask][['src_ip', 'ja4', 'asn_number']].copy()
|
||
if len(sub) < 10:
|
||
return None
|
||
|
||
# Compter les IPs par (ja4, asn) — poids de l'arête
|
||
edge_weights = (
|
||
sub.groupby(['ja4', 'asn_number'])['src_ip']
|
||
.nunique()
|
||
.reset_index(name='n_ips')
|
||
)
|
||
# Garder seulement les arêtes avec au moins FLEET_MIN_EDGES IPs distinctes
|
||
edge_weights = edge_weights[edge_weights['n_ips'] >= FLEET_MIN_EDGES]
|
||
if len(edge_weights) < 5:
|
||
return None
|
||
|
||
G = nx.Graph()
|
||
ja4_nodes = set()
|
||
asn_nodes = set()
|
||
for _, row in edge_weights.iterrows():
|
||
ja4_node = f"ja4:{row['ja4']}"
|
||
asn_node = f"asn:{row['asn_number']}"
|
||
G.add_node(ja4_node, bipartite=0)
|
||
G.add_node(asn_node, bipartite=1)
|
||
G.add_edge(ja4_node, asn_node, weight=int(row['n_ips']))
|
||
ja4_nodes.add(ja4_node)
|
||
asn_nodes.add(asn_node)
|
||
|
||
return G, ja4_nodes, asn_nodes
|
||
|
||
|
||
def detect_fleet_communities(df: pd.DataFrame) -> dict:
|
||
"""Analyse le graphe bipartite et retourne un dict {src_ip: fleet_score}.
|
||
|
||
fleet_score > FLEET_SCORE_THRESHOLD → IP appartient à une flotte suspectée.
|
||
fleet_score = 0 pour toutes les autres IPs.
|
||
|
||
fleet_score = community_size * graph_density / log2(n_asn + 2)
|
||
"""
|
||
result = build_fleet_graph(df)
|
||
if result is None:
|
||
return {}
|
||
|
||
G, ja4_nodes, asn_nodes = result
|
||
|
||
try:
|
||
import networkx as nx
|
||
from networkx.algorithms import bipartite
|
||
try:
|
||
from community import best_partition as louvain_partition
|
||
LOUVAIN_AVAILABLE = True
|
||
except ImportError:
|
||
LOUVAIN_AVAILABLE = False
|
||
|
||
# Projection bipartite : graphe des JA4 partageant des ASN
|
||
G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes)
|
||
if G_ja4.number_of_edges() == 0:
|
||
return {}
|
||
|
||
# Détection de communautés
|
||
if LOUVAIN_AVAILABLE:
|
||
partition = louvain_partition(G_ja4, weight='weight', random_state=42)
|
||
# partition = {node: community_id}
|
||
communities: dict = {}
|
||
for node, cid in partition.items():
|
||
communities.setdefault(cid, set()).add(node)
|
||
else:
|
||
# Fallback : composantes connexes
|
||
communities = {
|
||
i: set(c)
|
||
for i, c in enumerate(nx.connected_components(G_ja4))
|
||
if len(c) >= 2
|
||
}
|
||
|
||
# Calculer le fleet_score de chaque communauté
|
||
fleet_scores: dict = {} # {ja4: fleet_score}
|
||
for cid, members in communities.items():
|
||
if len(members) < 2:
|
||
continue
|
||
sub_g = G.subgraph(
|
||
list(members) + [n for n in asn_nodes if any(G.has_edge(n, m) for m in members)]
|
||
)
|
||
n_asn = len([n for n in sub_g.nodes if n.startswith('asn:')])
|
||
density = nx.density(G_ja4.subgraph(members))
|
||
score = len(members) * density / max(np.log2(n_asn + 2), 0.1)
|
||
for ja4_node in members:
|
||
ja4 = ja4_node.replace('ja4:', '')
|
||
fleet_scores[ja4] = round(float(score), 3)
|
||
|
||
# Mapper les fleet_scores sur les IPs
|
||
if not fleet_scores:
|
||
return {}
|
||
|
||
ip_scores: dict = {}
|
||
for _, row in df.iterrows():
|
||
ja4 = str(row.get('ja4', ''))
|
||
score = fleet_scores.get(ja4, 0.0)
|
||
if score >= FLEET_SCORE_THRESHOLD:
|
||
src_ip = str(row.get('src_ip', ''))
|
||
if src_ip:
|
||
ip_scores[src_ip] = max(ip_scores.get(src_ip, 0.0), score)
|
||
|
||
n_fleets = len(set(fleet_scores.values()))
|
||
if ip_scores:
|
||
logger.info(
|
||
f"[Fleet] {len(ip_scores)} IPs dans {n_fleets} communauté(s) suspecte(s) "
|
||
f"(score max={max(ip_scores.values()):.2f})"
|
||
)
|
||
return ip_scores
|
||
|
||
except Exception as e:
|
||
logger.warning(f"[Fleet] Erreur détection de flotte : {e}")
|
||
return {}
|
||
|
||
|
||
def enrich_with_fleet_score(df: pd.DataFrame) -> pd.DataFrame:
|
||
"""Enrichit le DataFrame avec fleet_score et fleet_campaign_flag.
|
||
|
||
fleet_campaign_flag = 1 si l'IP appartient à une flotte suspectée.
|
||
fleet_score = score de la communauté (0 = pas de flotte).
|
||
"""
|
||
df = df.copy()
|
||
fleet_map = detect_fleet_communities(df)
|
||
df['fleet_score'] = df['src_ip'].map(fleet_map).fillna(0.0).astype(float)
|
||
df['fleet_campaign_flag'] = (df['fleet_score'] >= FLEET_SCORE_THRESHOLD).astype(int)
|
||
return df
|