"""Détection de flottes de bots via graphe bipartite JA4×ASN. §5.2 — Analyse de graphe bipartite G=(JA4 ∪ ASN, E) pour identifier les flottes de bots coordonnées qui font tourner leurs fingerprints JA4 et ASN. Algorithme : 1. Construire le graphe bipartite G depuis les sessions du cycle courant 2. Projeter sur les nœuds JA4 (shared-ASN weighted projection) 3. Détecter les communautés Louvain (python-louvain) 4. Calculer fleet_score = taille × densité / log2(n_asn + 2) pour chaque communauté 5. Retourner les IPs appartenant aux communautés suspectes avec leur fleet_score """ import logging from typing import Optional import pandas as pd import numpy as np logger = logging.getLogger(__name__) # Seuil de fleet_score à partir duquel une communauté est considérée suspecte FLEET_SCORE_THRESHOLD = float(__import__('os').getenv('FLEET_SCORE_THRESHOLD', '2.0')) # Poids du fleet_score dans le score final (malus supplémentaire) FLEET_SCORE_WEIGHT = float(__import__('os').getenv('FLEET_SCORE_WEIGHT', '0.10')) # Nombre minimal d'arêtes pour inclure un JA4 dans l'analyse FLEET_MIN_EDGES = int(__import__('os').getenv('FLEET_MIN_EDGES', '3')) def build_fleet_graph(df: pd.DataFrame) -> Optional[object]: """Construit le graphe bipartite JA4×ASN à partir du cycle courant. Nœuds : ensemble JA4 (préfixe 'ja4:') + ensemble ASN (préfixe 'asn:') Arêtes : (ja4, asn) avec weight = nombre d'IPs distinctes sur ce couple Exige que df ait les colonnes : ja4, asn_number, src_ip Retourne None si networkx n'est pas disponible ou données insuffisantes. """ try: import networkx as nx from networkx.algorithms import bipartite except ImportError: logger.warning("[Fleet] networkx non disponible — analyse de flotte désactivée.") return None if df.empty or 'ja4' not in df.columns or 'asn_number' not in df.columns: return None # Filtrer les JA4 vides et ASN 0 mask = (df['ja4'].fillna('') != '') & (df['asn_number'].fillna('0') != '0') sub = df[mask][['src_ip', 'ja4', 'asn_number']].copy() if len(sub) < 10: return None # Compter les IPs par (ja4, asn) — poids de l'arête edge_weights = ( sub.groupby(['ja4', 'asn_number'])['src_ip'] .nunique() .reset_index(name='n_ips') ) # Garder seulement les arêtes avec au moins FLEET_MIN_EDGES IPs distinctes edge_weights = edge_weights[edge_weights['n_ips'] >= FLEET_MIN_EDGES] if len(edge_weights) < 5: return None G = nx.Graph() ja4_nodes = set() asn_nodes = set() for _, row in edge_weights.iterrows(): ja4_node = f"ja4:{row['ja4']}" asn_node = f"asn:{row['asn_number']}" G.add_node(ja4_node, bipartite=0) G.add_node(asn_node, bipartite=1) G.add_edge(ja4_node, asn_node, weight=int(row['n_ips'])) ja4_nodes.add(ja4_node) asn_nodes.add(asn_node) return G, ja4_nodes, asn_nodes def detect_fleet_communities(df: pd.DataFrame) -> dict: """Analyse le graphe bipartite et retourne un dict {src_ip: fleet_score}. fleet_score > FLEET_SCORE_THRESHOLD → IP appartient à une flotte suspectée. fleet_score = 0 pour toutes les autres IPs. fleet_score = community_size * graph_density / log2(n_asn + 2) """ result = build_fleet_graph(df) if result is None: return {} G, ja4_nodes, asn_nodes = result try: import networkx as nx from networkx.algorithms import bipartite try: from community import best_partition as louvain_partition LOUVAIN_AVAILABLE = True except ImportError: LOUVAIN_AVAILABLE = False # Projection bipartite : graphe des JA4 partageant des ASN G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes) if G_ja4.number_of_edges() == 0: return {} # Détection de communautés if LOUVAIN_AVAILABLE: partition = louvain_partition(G_ja4, weight='weight', random_state=42) # partition = {node: community_id} communities: dict = {} for node, cid in partition.items(): communities.setdefault(cid, set()).add(node) else: # Fallback : composantes connexes communities = { i: set(c) for i, c in enumerate(nx.connected_components(G_ja4)) if len(c) >= 2 } # Calculer le fleet_score de chaque communauté fleet_scores: dict = {} # {ja4: fleet_score} for cid, members in communities.items(): if len(members) < 2: continue sub_g = G.subgraph( list(members) + [n for n in asn_nodes if any(G.has_edge(n, m) for m in members)] ) n_asn = len([n for n in sub_g.nodes if n.startswith('asn:')]) density = nx.density(G_ja4.subgraph(members)) score = len(members) * density / max(np.log2(n_asn + 2), 0.1) for ja4_node in members: ja4 = ja4_node.replace('ja4:', '') fleet_scores[ja4] = round(float(score), 3) # Mapper les fleet_scores sur les IPs if not fleet_scores: return {} ip_scores: dict = {} for _, row in df.iterrows(): ja4 = str(row.get('ja4', '')) score = fleet_scores.get(ja4, 0.0) if score >= FLEET_SCORE_THRESHOLD: src_ip = str(row.get('src_ip', '')) if src_ip: ip_scores[src_ip] = max(ip_scores.get(src_ip, 0.0), score) n_fleets = len(set(fleet_scores.values())) if ip_scores: logger.info( f"[Fleet] {len(ip_scores)} IPs dans {n_fleets} communauté(s) suspecte(s) " f"(score max={max(ip_scores.values()):.2f})" ) return ip_scores except Exception as e: logger.warning(f"[Fleet] Erreur détection de flotte : {e}") return {} def enrich_with_fleet_score(df: pd.DataFrame) -> pd.DataFrame: """Enrichit le DataFrame avec fleet_score et fleet_campaign_flag. fleet_campaign_flag = 1 si l'IP appartient à une flotte suspectée. fleet_score = score de la communauté (0 = pas de flotte). """ df = df.copy() fleet_map = detect_fleet_communities(df) df['fleet_score'] = df['src_ip'].map(fleet_map).fillna(0.0).astype(float) df['fleet_campaign_flag'] = (df['fleet_score'] >= FLEET_SCORE_THRESHOLD).astype(int) return df