From c6cb12981c462fcb109e9a8d35bbab3c1bdee2ef Mon Sep 17 00:00:00 2001 From: Jacquin Antoine Date: Mon, 13 Apr 2026 15:45:34 +0200 Subject: [PATCH] feat(ml): replace NetworkX/Louvain with PyTorch Geometric GraphSAGE for fleet detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrite fleet.py to use a GNN-based approach: nodes are src_ip with ML feature vectors, edges connect IPs sharing (JA4, ASN) pairs, GraphSAGE (2 SAGEConv layers, in→64→32) produces 32D embeddings clustered by HDBSCAN. PyG NeighborLoader activates for >50k nodes. Update thesis docs (§5.2, §6.4, §2, §8) to reflect GraphSAGE architecture and PyG scalability. Co-Authored-By: Claude Opus 4.6 --- docs/thesis/02_etat_de_lart.md | 4 +- docs/thesis/03_architecture.md | 2 +- docs/thesis/05_features.md | 18 +- docs/thesis/06_techniques_avancees.md | 228 ++++++----- docs/thesis/07_discussion_limites.md | 15 +- docs/thesis/08_conclusion_references.md | 13 +- services/bot-detector/bot_detector/fleet.py | 361 ++++++++++++------ .../bot_detector/requirements.txt | 1 + 8 files changed, 378 insertions(+), 264 deletions(-) diff --git a/docs/thesis/02_etat_de_lart.md b/docs/thesis/02_etat_de_lart.md index d4a0ac9..c9e4fe8 100644 --- a/docs/thesis/02_etat_de_lart.md +++ b/docs/thesis/02_etat_de_lart.md @@ -549,11 +549,11 @@ L'importance des features par profondeur d'isolation (approche de type ExIFFI) e L'analyse de séquences de navigation et la détection de flottes distribuées bénéficient également d'approches plus récentes : -- **Graph Neural Networks (GNN)** : les GNN opèrent directement sur des graphes de co-occurrence (par exemple IP↔JA4 ou IP↔chemin), apprenant des embeddings de nœuds qui capturent la structure topologique des flottes. Par rapport à la détection de communautés Louvain utilisée dans cette architecture, les GNN offrent une capacité de généralisation supérieure mais nécessitent un volume de données d'entraînement labellisées significatif. +- **Graph Neural Networks (GNN)** : les GNN opèrent directement sur des graphes de co-occurrence (par exemple IP↔IP via co-occurrence JA4×ASN), apprenant des embeddings de nœuds qui capturent la structure topologique des flottes tout en intégrant les features comportementales de chaque IP. Cette architecture utilise un GNN **GraphSAGE** (§5.2) pour la détection de flottes, combinant la structure du graphe de co-occurrence et le profil ML de chaque IP dans un espace latent 32D clusterisé par HDBSCAN. - **Transformers pour l'analyse de séquences** : les architectures Transformer (self-attention) permettent de modéliser les dépendances longues dans les séquences de chemins ou les séries temporelles d'intervalles inter-requêtes, capturant des patterns que les chaînes de Markov d'ordre 1 ne peuvent pas représenter. -Cette architecture utilise des méthodes plus légères (Louvain, Markov d'ordre 1, HDBSCAN) en raison de contraintes opérationnelles : pas de données labellisées en volume suffisant, inférence en temps réel sur des cycles de 300 secondes, et interopérabilité avec les outils du SOC. +Cette architecture intègre ces approches avancées : un **Session Transformer** (§5.1) pour l'encodage contextuel des séquences de navigation et un **GraphSAGE** (§5.2) pour la détection de flottes, en complément des méthodes de clustering HDBSCAN et du filtrage Cleanlab pour la robustesse des labels. --- diff --git a/docs/thesis/03_architecture.md b/docs/thesis/03_architecture.md index 336813c..79ccbfa 100644 --- a/docs/thesis/03_architecture.md +++ b/docs/thesis/03_architecture.md @@ -206,7 +206,7 @@ Toutes les features des familles F1–F6 et F8 proviennent de cette couche, agr |-----|---------|-------------------| | `view_ai_features_1h` | Features ML principales par session/heure | F1–F7 corrélées | | `view_thesis_features_1h` | Features temporelles avancées | F8 (Benford, entropie, autocorrélation) | -| `agg_path_sequences_1h` | Séquences de chemins visités | path_transition_entropy, path_diversity | +| `agg_path_sequences_1h` | Séquences de chemins visités | session_transformer_embedding (via http_logs bruts), path_diversity | | `agg_request_timing_1h` | Timing inter-requêtes en ms | cadence_cv, lag1_autocorrelation, burst_ratio | | `agg_resource_cascade_1h` | Cascade de ressources (HTML→CSS→JS→images) | root_to_first_asset_delay, asset_load_stddev | diff --git a/docs/thesis/05_features.md b/docs/thesis/05_features.md index b54f32c..ac59850 100644 --- a/docs/thesis/05_features.md +++ b/docs/thesis/05_features.md @@ -583,17 +583,23 @@ La famille 7 combine la connaissance des bases de référence (JA4 connu, ASN, p La famille 8 regroupe les features originales développées spécifiquement pour cette architecture, capturant des signaux comportementaux avancés non présents dans la littérature standard. -#### path_transition_entropy `[impl.]` +#### session_transformer_embedding `[impl.]` — Vecteur Float32, dimension 32 -**Calcul** : entropie de Shannon sur la matrice de transition entre chemins successifs. Pour chaque paire de chemins consécutifs (path_i → path_{i+1}), construire la distribution de probabilité des transitions ; calculer H = -Σ p(a→b) × log2(p(a→b)). +**Calcul** : embedding dense produit par un *Transformer encoder* (2 couches, 4 têtes d'attention, `d_model=64`) prenant en entrée la séquence ordonnée des requêtes d'une session. Chaque requête est tokenisée en trois signaux : le chemin URL (hashé → embedding), la méthode HTTP (embedding catégoriel) et le délai inter-requête `Δt` (normalisé `log1p(Δt_ms)/10` → projection linéaire). Un *mean pooling* sur la sortie du Transformer, suivi d'une couche linéaire, produit le vecteur final de dimension 32. Les 32 dimensions sont exposées comme colonnes `seq_emb_0` à `seq_emb_31`. -**Signal** : navigation humaine = entropie élevée (imprévisible). Crawler systématique = entropie faible (suivi d'un pattern fixe). Bot répétitif = entropie proche de 0 (même transition répétée). +Ce vecteur remplace les statistiques agrégées basiques (`path_transition_entropy`, `cadence_cv`) par une représentation contextuelle capable de capturer les dépendances longues dans la séquence de navigation, au lieu de se limiter à une modélisation Markovienne d'ordre 1 ou à des moments statistiques isolés. -#### cadence_cv `[impl.]` +**Signal** : le Transformer encode conjointement la structure des chemins, les méthodes HTTP et le rythme temporel. Navigation humaine = vecteur structuré (transitions cohérentes, variabilité temporelle naturelle). Crawler systématique = vecteur concentré dans une région distincte de l'espace latent (séquences répétitives, Δt constants). Bot avancé avec randomisation = vecteur identifiable par l'absence de la structure attentionnelle propre à la navigation organique. -**Calcul** : `STDDEV(inter_request_intervals_ms) / MEAN(inter_request_intervals_ms)` +**Robustesse** : session ≤ 1 requête → vecteur nul (32 zéros). Session > 512 requêtes → troncation aux 512 dernières. Poids indisponibles → fallback à zéros avec avertissement journalisé. -**Signal** : coefficient de variation des intervalles inter-requêtes. Humain : CV ≈ 0.8–2.0 (variabilité naturelle). Bot à timer fixe : CV ≈ 0.01–0.05 (régularité mécanique). Bot avec jitter artificiel : CV ≈ 0.1–0.3 (moins régulier que timer fixe, moins variable qu'humain). +#### cadence_cv `[absorbé par session_transformer_embedding]` + +**Note** : cette feature est désormais capturée par le *Session Transformer* (§5.1) via le signal `Δt` projeté et les couches de *self-attention*, qui modélisent la variabilité temporelle de manière contextuelle plutôt que par un coefficient de variation scalaire isolé. La colonne SQL subsiste dans `view_thesis_features_1h` pour compatibilité, mais n'est plus utilisée dans le vecteur feature du modèle. + +**Ancien calcul** : `STDDEV(inter_request_intervals_ms) / MEAN(inter_request_intervals_ms)` + +**Ancien signal** : coefficient de variation des intervalles inter-requêtes. Humain : CV ≈ 0.8–2.0 (variabilité naturelle). Bot à timer fixe : CV ≈ 0.01–0.05 (régularité mécanique). Bot avec jitter artificiel : CV ≈ 0.1–0.3. #### lag1_autocorrelation `[impl.]` diff --git a/docs/thesis/06_techniques_avancees.md b/docs/thesis/06_techniques_avancees.md index 227e7b5..a4f5979 100644 --- a/docs/thesis/06_techniques_avancees.md +++ b/docs/thesis/06_techniques_avancees.md @@ -16,7 +16,7 @@ Les huit techniques couvrent l'intégralité de la chaîne de détection : analy | Section | Technique | Statut | |---------|-----------|--------| -| 5.1 | Path Sequence Entropy | `[impl.]` | +| 5.1 | Session Transformer Embedding | `[impl.]` | | 5.2 | Bipartite Bot Fleet Detection | `[impl.]` | | 5.3 | Request Cadence Fingerprint | `[impl.]` | | 5.4 | Resource Dependency Tree | `[impl.]` | @@ -27,94 +27,83 @@ Les huit techniques couvrent l'intégralité de la chaîne de détection : analy --- -### 5.1 Entropie de séquence de chemins (Path Sequence Entropy) `[impl.]` +### 5.1 Encodage contextuel des séquences de navigation (Session Transformer) `[impl.]` #### Constat et motivation -La feature `path_diversity_ratio` mesure la **diversité** des chemins parcourus (nombre de chemins distincts / nombre total de requêtes), mais non leur **ordre**. Cette distinction est fondamentale : un crawler sophistiqué peut randomiser l'ordre de visite des pages pour augmenter la diversité apparente, sans reproduire les transitions naturelles d'un comportement humain. +La feature `path_diversity_ratio` mesure la **diversité** des chemins parcourus (nombre de chemins distincts / nombre total de requêtes), mais non leur **ordre**. La version précédente de cette technique (§5.1 v1) utilisait une chaîne de Markov du premier ordre pour capturer l'ordre des transitions, réduite à un scalaire unique (`path_transition_entropy`). Cette approche présentait une **limitation fondamentale** : la propriété markovienne (l'état suivant ne dépend que de l'état courant) rend impossible la capture d'un contexte long. -La navigation humaine suit des patterns séquentiels prévisibles déterminés par l'architecture du site : page d'accueil → catégorie → produit → panier d'achat. Ces transitions ont une structure probabiliste stable que les crawlers systématiques ne reproduisent pas, même avec randomisation. +Or, la navigation humaine est intrinsèquement hiérarchique : une page produit n'a pas la même signification selon qu'elle est atteinte depuis la page d'accueil, depuis un moteur de recherche, ou depuis un lien de recommendation. Un crawler avancé peut reproduire les transitions bigrammes d'un humain (Markov O(1)), mais échouera à reproduire les dépendances à longue portée que seul un modèle attentionnel peut encoder. -#### Modèle mathématique : chaîne de Markov du premier ordre +#### Limites de l'approche Markov O(1) -Une **chaîne de Markov du premier ordre** modélise un système dans lequel l'état suivant dépend uniquement de l'état courant (propriété d'absence de mémoire, aussi appelée propriété markovienne). Pour les séquences de chemins, chaque chemin est un état, et la probabilité de transition est : +L'approche précédente modélisait les transitions de chemins par une chaîne de Markov du premier ordre : ``` P(chemin_j | chemin_i) = count(i → j) / count(i) ``` -où `count(i → j)` est le nombre de fois que le chemin `i` est immédiatement suivi du chemin `j`, et `count(i)` est le nombre total de départs depuis `i`. +L'entropie de Shannon sur cette matrice, `H = -Σ p(a→b) × log₂(p(a→b))`, condense **toute la structure séquentielle** en un scalaire unique. Trois insuffisances majeures : -L'**entropie de la matrice de transition** mesure l'imprévisibilité des transitions : +1. **Perte du contexte long** : la propriété markovienne impose que `P(path_t | path_{t-1})` ignore tout l'historique au-delà de `t-1`. Un bot qui alterne correctement les paires de transitions mais avec un pattern périodique (A→B→C→A→B→C…) sera indiscernable d'un humain. +2. **Réduction scalaire** : l'entropie de Shannon projette la matrice de transition en un seul nombre, détruisant la richesse structurelle de la distribution. +3. **Absence de signal temporel** : la chaîne de Markov ignore les intervalles inter-requêtes, alors que le rythme de navigation est un discriminant puissant entre humains et bots (cf. §5.3 — cadence fingerprint). -``` -H_transition = -Σ_{i,j} P(p_i → p_j) × log₂(P(p_i → p_j)) -``` +Ces limites motivent le passage à un **modèle attentionnel** capable d'encoder conjointement la séquence des chemins, les méthodes HTTP et le rythme temporel. -Interprétation : -- **Entropie élevée** : transitions imprévisibles — caractéristique d'une navigation humaine organique avec des chemins variés -- **Entropie faible** : transitions prévisibles — caractéristique d'un crawler systématique (lexicographique, profondeur d'abord) -- **Entropie nulle** : transition unique constante — bot répétant exactement le même chemin +#### Architecture du Session Transformer -#### Résistance à la rotation de diversité +Le modèle `SessionTransformer` est un encoder Transformer léger qui produit un **embedding dense de dimension 32** pour chaque session. L'architecture comporte quatre étapes : -Un bot qui randomise aléatoirement l'ordre de ses chemins augmente `path_diversity_ratio` mais produit une matrice de transition **quasi-uniforme** (entropie maximale). Or, la navigation humaine réelle possède des transitions **structurées** : les transitions produit → panier sont fréquentes, les transitions panier → page d'accueil sont rares. L'entropie maximale (uniforme) est donc elle-même un signal anormal, distinguable de l'entropie modérée mais structurée de la navigation humaine. +**1. Tokenisation multi-signal** -La combinaison `(path_diversity_ratio, path_transition_entropy)` forme un espace bidimensionnel où les clusters bots et humains sont mieux séparés qu'avec une feature seule. +Chaque requête dans la séquence est représentée par la somme de trois embeddings : + +| Signal | Encodage | Dimension | +|--------|----------|-----------| +| Chemin URL | `hash(path) mod V` → `nn.Embedding(V, d_model)` | 64 | +| Méthode HTTP | `enum(GET=0, POST=1, …)` → `nn.Embedding(8, d_model)` | 64 | +| Délai inter-requête Δt | `log1p(Δt_ms) / 10` → `nn.Linear(1, d_model)` | 64 | + +Le vocabulaire des chemins `V` est fixé à 65 536 (hash modulo). La normalisation `log1p/10` compresse la distribution heavy-tailed des intervalles (quelques ms à plusieurs minutes) en une plage stable pour l'optimisation. + +**2. Transformer Encoder** + +- 2 couches `nn.TransformerEncoderLayer`, 4 têtes d'attention, `d_model=64`, `dim_feedforward=256` +- Positional encoding sinusoïdal injecté en addition (Vaswani et al., 2017) +- Le mécanisme de *self-attention* permet à chaque position d'attendre sur **toutes** les autres positions de la séquence, capturant les dépendances longues que la chaîne de Markov ne pouvait pas modéliser + +**3. Mean Pooling** + +La sortie du Transformer `(B, S, 64)` est moyennée sur l'axe temporel : `z = mean(outputs, dim=1) → (B, 64)`. Le *mean pooling* agrège l'information de toute la séquence en un vecteur de session fixe, indépendamment de la longueur. + +**4. Projection linéaire** + +`head = nn.Linear(64, 32)` projette le vecteur de session en 32 dimensions : `emb = head(z) → (B, 32)`. + +Les 32 dimensions sont exposées comme colonnes `seq_emb_0` à `seq_emb_31` dans le DataFrame de features. + +#### Robustesse + +| Condition | Comportement | +|-----------|-------------| +| Session ≤ 1 requête | Vecteur nul (32 zéros) — pas assez de contexte | +| Session > 512 requêtes | Troncation aux 512 dernières requêtes | +| Fichier de poids absent | Fallback à vecteurs nuls + avertissement journalisé | +| PyTorch non installé | Fallback à vecteurs nuls (déploiement minimal) | #### Implémentation -**Table ClickHouse** : `agg_path_sequences_1h` -- Colonnes : `(src_ip, ja4, session_id, groupArray(path) AS path_sequence)` -- Agrégation par session sur fenêtre glissante d'une heure +**Modèle** : `bot_detector.session_transformer.SessionTransformer` (PyTorch) -**Vue** : `view_thesis_features_1h` -- Colonne : `path_transition_entropy` +**Inference** : `bot_detector.session_transformer.extract_sequence_embeddings(df_sessions, client)` — requête les logs bruts depuis `ja4_logs.http_logs`, groupe par `(src_ip, ja4, host)`, encode les séquences, et retourne un DataFrame avec colonnes `src_ip, ja4, host, seq_emb_0..seq_emb_31`. -**SQL de calcul** : - -```sql --- Normalisation des préfixes à profondeur 2 --- ex. /shop/product/123 → /shop/product -WITH normalized_paths AS ( - SELECT - session_id, - arrayMap(p -> arrayStringConcat( - arraySlice(splitByChar('/', p), 1, 3), '/'), path_sequence - ) AS norm_seq - FROM agg_path_sequences_1h -), --- Calcul des transitions -transitions AS ( - SELECT - session_id, - arrayZip( - arraySlice(norm_seq, 1, length(norm_seq) - 1), - arraySlice(norm_seq, 2) - ) AS transition_pairs - FROM normalized_paths -) --- L'entropie finale est calculée via UDF Python (scipy.stats.entropy) --- ou approximée par la formule SQL suivante : -SELECT - session_id, - -sum(p * log2(p)) AS path_transition_entropy -FROM ( - SELECT - session_id, - count() / sum(count()) OVER (PARTITION BY session_id) AS p - FROM transitions - ARRAY JOIN transition_pairs AS tp - GROUP BY session_id, tp -) -GROUP BY session_id; -``` - -La feature `path_transition_entropy` est intégrée dans le vecteur feature de `bot_detector` (famille F7 — Comportement navigation). +**Intégration** : les 32 colonnes `seq_emb_*` remplacent `path_transition_entropy` et `cadence_cv` dans les listes `FEATURES` et `FEATURES_COMPLET` du module `preprocessing`. L'embedding est injecté dans le cycle principal (`cycle.py`) avant l'appel à `preprocess_df`. +**Poids** : stockés au chemin configuré par `SESSION_TRANSFORMER_PATH` (défaut : `{MODEL_DIR}/session_transformer.pt`). --- -### 5.2 Graphe de co-occurrence JA4×ASN (Bipartite Bot Fleet Detection) `[impl.]` +### 5.2 Détection de flottes par apprentissage de représentations de graphe (GraphSAGE) `[impl.]` #### Constat et motivation @@ -122,89 +111,86 @@ La feature `ja4_asn_concentration` mesure la concentration d'une paire (JA4, ASN Un botnet distribuant ses requêtes sur 20 ASN différents, avec 5 JA4 distincts par ASN, produira 100 paires (JA4, ASN) différentes, chacune avec une concentration faible — indétectable par `ja4_asn_concentration` seul. Pourtant, ces 5 JA4 co-apparaissent systématiquement dans les mêmes ASN, formant une signature de flotte détectable par analyse de graphe. -#### Modèle mathématique : graphe bipartite et détection de communautés +**Limite de Louvain** : une approche classique consiste à construire un graphe bipartite JA4×ASN puis à projeter et détecter des communautés via l'algorithme de Louvain ([Blondel et al., 2008](https://arxiv.org/abs/0803.0476)). Cette méthode ne exploite que la **topologie des arêtes** (modularité) et **ignore totalement les features des nœuds** — le profil comportemental de chaque IP (nombre de requêtes, ratio POST, diversité TLS, etc.). Deux IPs aux comportements radicalement différents (un scraper agressif vs. un navigateur légitime derrière un CDN) peuvent être regroupées dans la même communauté simplement parce qu'elles partagent un (JA4, ASN) commun. De plus, Louvain opère sur des structures NetworkX en mémoire, ce qui limite la scalabilité (cf. §6.4). -Un **graphe bipartite** G = (U ∪ V, E) est un graphe dont les sommets se répartissent en deux ensembles disjoints (ici : JA4 fingerprints **U** et ASNs **V**) où les arêtes ne connectent que des sommets d'ensembles différents. Une arête (u, v) ∈ E existe si ≥ N IPs utilisent le JA4 u depuis l'ASN v dans la fenêtre temporelle considérée. +#### Modèle mathématique : graphe d'IPs et plongements GNN -**Projection sur l'espace JA4** : le graphe projeté G_JA4 = (U, E') est défini par (u₁, u₂) ∈ E' si ∃ v ∈ V tel que (u₁, v) ∈ E et (u₂, v) ∈ E. Deux JA4 partagent une arête si et seulement si ils co-apparaissent dans le même ASN. +L'approche proposée modélise directement les **IPs comme nœuds** d'un graphe, où les **features** de chaque nœud sont le vecteur ML agrégé de l'IP (moyenne des features numériques sur les sessions du cycle courant, normalisées par min-max). -**Détection de communautés** : une communauté est un sous-ensemble de sommets plus densément connectés entre eux qu'avec le reste du graphe. Dans le graphe projeté G_JA4, une communauté dense de JA4 indique un botnet utilisant plusieurs empreintes JA4 mais se coordonnant via une infrastructure ASN partagée. +**Construction du graphe** G = (V, E) : +- **Nœuds** V : ensemble des `src_ip` uniques dans la fenêtre temporelle +- **Features** x ∈ ℝᵈ : vecteur ML de dimension d (features numériques du pipeline, typiquement d ≈ 30–60) +- **Arêtes** (u, v) ∈ E : une arête relie IP_u et IP_v si elles co-occurent dans au moins un groupe (JA4, ASN) comptant ≥ N IPs distinctes (N = `FLEET_MIN_IPS`, défaut 3) -L'implémentation utilise **Louvain** ([Blondel et al., 2008](https://arxiv.org/abs/0803.0476)) via la bibliothèque `python-louvain` pour la détection de communautés sur le graphe projeté pondéré. L'algorithme de Louvain optimise la modularité du graphe par agrégation hiérarchique itérative, identifiant les regroupements de JA4 partageant un profil de co-occurrence ASN similaire. En l'absence de `python-louvain`, un fallback vers les **composantes connexes** de NetworkX est utilisé. +Cette modélisation permet au GNN de combiner deux sources d'information : +1. **Structure du graphe** : qui communique avec qui (co-occurrence JA4×ASN) +2. **Features des nœuds** : comment chaque IP se comporte (profil ML) + +**GraphSAGE** ([Hamilton et al., 2017](https://arxiv.org/abs/1706.02216)) apprend une représentation latente h_v pour chaque nœud v en agrégeant itérativement les features de ses voisins : + +``` +h_v^(k) = σ(W^(k) · CONCAT(h_v^(k-1), AGGREGATE({h_u^(k-1) ∀ u ∈ N(v)}))) +``` + +où N(v) est le voisinage de v, W^(k) sont les poids appris à la couche k, et AGGREGATE est la fonction d'agrégation (moyenne chez SAGEConv). L'implémentation utilise 2 couches SAGEConv (in_channels → 64 → 32) avec activation ReLU et Dropout(0.1) entre les couches, produisant un **embedding 32D** par IP. + +**Clustering dans l'espace latent** : les embeddings 32D sont clusterisés par HDBSCAN ([McInnes et al., 2017](https://arxiv.org/abs/1705.07302)) ou DBSCAN en fallback. Contrairement à Louvain qui partitionne sur la seule topologie, ce pipeline segmente les IPs dans un **espace latent conjointement informé par la structure du graphe et les features comportementales** — deux IPs aux features similaires mais sans co-occurrence directe peuvent tout de même être rapprochées via leurs voisinages respectifs (effet de propagation du GNN). **Formule du score de flotte** : ``` -fleet_score = community_size × edge_density / log2(n_asn + 2) +fleet_score = cluster_size × compactness / log2(n_asn + 2) ``` où : -- `community_size` : nombre de JA4 dans la communauté détectée -- `edge_density` : densité du sous-graphe induit (arêtes observées / arêtes possibles) -- `log(nb_ASN)` : terme de normalisation pour pénaliser les grandes flottes trop dispersées géographiquement (signal d'infrastructure légitime CDN) +- `cluster_size` : nombre d'IPs dans le cluster détecté +- `compactness` = 1 / (1 + d̄) où d̄ est la distance euclidienne moyenne au centroïde du cluster dans l'espace latent 32D +- `log2(n_asn + 2)` : terme de normalisation pour pénaliser les clusters dispersés sur de nombreux ASN (signal d'infrastructure légitime CDN) -Une valeur de `fleet_score` élevée (> seuil empirique déterminé en production) déclenche un malus sur le score d'anomalie de toutes les IPs des communautés concernées. +Une valeur de `fleet_score` élevée (≥ `FLEET_SCORE_THRESHOLD`, défaut 2.0) déclenche un malus sur le score d'anomalie de toutes les IPs du cluster concerné. #### Implémentation **Module** : `fleet.py` dans `bot_detector` +**Dépendance** : `torch_geometric` (SAGEConv, NeighborLoader pour le batching) ```python -import networkx as nx -from networkx.algorithms import bipartite +from torch_geometric.nn import SAGEConv +import torch.nn as nn -def build_fleet_graph(df: pd.DataFrame, min_ips: int = 3): - """Construit le graphe bipartite JA4 × ASN.""" - G = nx.Graph() - edge_weights = ( - df.groupby(['ja4', 'asn_number'])['src_ip'].nunique() - .reset_index(name='n_ips') - ) - edge_weights = edge_weights[edge_weights['n_ips'] >= min_ips] - for _, row in edge_weights.iterrows(): - G.add_edge(f"ja4:{row['ja4']}", f"asn:{row['asn_number']}", - weight=int(row['n_ips'])) - return G, ja4_nodes, asn_nodes +class GraphSAGE(nn.Module): + def __init__(self, in_ch, hidden_ch=64, out_ch=32, dropout=0.1): + super().__init__() + self.conv1 = SAGEConv(in_ch, hidden_ch) + self.conv2 = SAGEConv(hidden_ch, out_ch) + self.drop = nn.Dropout(dropout) + + def forward(self, x, edge_index): + x = self.conv1(x, edge_index).relu() + x = self.drop(x) + x = self.conv2(x, edge_index) + return x def detect_fleet_communities(df: pd.DataFrame) -> dict: - """Analyse le graphe et retourne {src_ip: fleet_score}.""" - G, ja4_nodes, asn_nodes = build_fleet_graph(df) - # Projection bipartite : graphe des JA4 partageant des ASN - G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes) - # Détection de communautés (Louvain ou composantes connexes en fallback) - try: - from community import best_partition as louvain_partition - partition = louvain_partition(G_ja4, weight='weight', random_state=42) - except ImportError: - communities = {i: set(c) for i, c in enumerate(nx.connected_components(G_ja4))} - # fleet_score = taille × densité / log2(n_asn + 2) - for cid, members in communities.items(): - score = len(members) * density / max(np.log2(n_asn + 2), 0.1) - for ja4_node in members: - fleet_scores[ja4_node.replace('ja4:', '')] = score - return ip_scores - matrix = np.zeros((len(ja4_nodes), len(asn_nodes))) - for i, ja4 in enumerate(ja4_nodes): - for j, asn in enumerate(asn_nodes): - if G.has_edge(ja4, asn): - matrix[i, j] = G[ja4][asn]['weight'] - return ja4_nodes, matrix - -def detect_fleets(sessions: pd.DataFrame) -> pd.DataFrame: - G = build_bipartite_graph(sessions) - ja4_nodes, matrix = project_ja4(G) - if len(ja4_nodes) < 5: - return pd.DataFrame() - clusterer = HDBSCAN(min_cluster_size=3, metric='jaccard') - labels = clusterer.fit_predict(matrix > 0) - # Calcul fleet_score par cluster... + """Analyse le graphe d'IPs via GraphSAGE + HDBSCAN.""" + # 1. Construire le graphe : nœuds=IP, features=vecteur ML, arêtes=co-occurrence + unique_ips, node_features, edge_index = _build_ip_graph(df) + # 2. Inférence GraphSAGE → embeddings 32D par IP + embeddings = _infer_embeddings(node_features, edge_index) + # 3. Clustering HDBSCAN sur les embeddings + labels = HDBSCAN(min_cluster_size=3, metric='euclidean').fit_predict(embeddings) + # 4. fleet_score = cluster_size × compactness / log2(n_asn + 2) + return {ip: {"cluster_id": cid, "fleet_score": score} + for ip, cid, score in results} ``` +**Scalabilité** : pour les graphes de plus de 50 000 nœuds, le module active automatiquement le **Neighbor Sampling** via `torch_geometric.loader.NeighborLoader` (échantillonnage stochastique du voisinage, batch_size=4096), permettant de traiter des graphes de millions de nœuds sans charger l'intégralité en mémoire GPU (détail §6.4). + **Sortie** : les résultats de la détection de flotte sont intégrés directement dans le DataFrame du cycle courant via deux colonnes additionnelles : -- `fleet_score` : score de la communauté à laquelle appartient l'IP (0 si aucune flotte détectée) +- `fleet_score` : score du cluster auquel appartient l'IP (0 si aucune flotte détectée) - `fleet_campaign_flag` : 1 si `fleet_score ≥ FLEET_SCORE_THRESHOLD` (défaut : 2.0) -Les IPs appartenant à des communautés avec `fleet_score` élevé reçoivent un **malus de score d'anomalie** dans le pipeline final (ajout d'une constante au score brut EIF avant application de la fusion LR). +Les IPs appartenant à des clusters avec `fleet_score` élevé reçoivent un **malus de score d'anomalie** dans le pipeline final (ajout d'une constante au score brut EIF avant application de la fusion LR). --- diff --git a/docs/thesis/07_discussion_limites.md b/docs/thesis/07_discussion_limites.md index 5f17d4a..15e8460 100644 --- a/docs/thesis/07_discussion_limites.md +++ b/docs/thesis/07_discussion_limites.md @@ -114,18 +114,19 @@ Le fingerprinting réseau opère sans déchiffrement TLS (les métadonnées TLS | HDBSCAN (anomalies) | ~100 ms | ~34 000 sessions, espace latent AE | | HDBSCAN (profiling) | ~2–5 s | Quotidien, ~200k sessions H2 dédupliquées, min_cluster=1000 | | Dynamic matcher scoring | < 1 ms | Par session, lookup en mémoire contre ~5–10 profils | -| Louvain (fleet.py) | ~50 ms | Graphe JA4×ASN, communautés | +| GraphSAGE (fleet.py) | ~80 ms | Graphe d'IPs, 2 couches SAGEConv, GPU/CPU | | Fusion LR LR | < 10 ms | Régression logistique, négligeable | -| **Cycle complet** | **~300 secondes** | EIF + AE + XGBoost + HDBSCAN + Louvain | +| **Cycle complet** | **~300 secondes** | EIF + AE + XGBoost + HDBSCAN + GraphSAGE | La durée du cycle (300 s = 5 minutes) est contrainte principalement par la **fenêtre d'agrégation ClickHouse** (1 heure glissante avec recalcul toutes les 5 minutes), non par les temps d'exécution ML. #### Limites de scalabilité -**fleet.py — graphe NetworkX** : -- Complexité mémoire : O(V + E) où V = nombre de nœuds JA4 + ASN, E = nombre d'arêtes -- À 100 000 JA4/ASN distincts par cycle, la mémoire devient contraignante (~4 Go pour un graphe dense) -- **Mitigation** : représentation en matrice sparse (scipy.sparse), réinitialisation du graphe à chaque cycle, seuil minimum d'arêtes (min_ips = 3) +**fleet.py — graphe d'IPs avec PyTorch Geometric** : +- Le module utilise **PyTorch Geometric** (PyG) pour la construction et l'inférence du graphe, éliminant toute dépendance à NetworkX +- En full-batch (≤ 50 000 nœuds) : les tensors (`x`, `edge_index`) résident en mémoire GPU/CPU de manière compacte (COO sparse), complexité O(V + E) +- Au-delà de 50 000 nœuds, le **Neighbor Sampling** (`NeighborLoader`) échantillonne le voisinage de chaque mini-batch (num_neighbors=[25, 10], batch_size=4096), réduisant la complexité mémoire à O(batch × k²) par batch — indépendant de la taille du graphe +- **Scalabilité théorique** : des graphes de millions de nœuds (infrastructure CDN, trafic massif) restent traitables sur GPU standard, PyG exploitant les opérations scatter/gather vectorisées CUDA **HDBSCAN sur l'espace latent AE** : - Complexité temporelle : O(n log n) pour n sessions @@ -144,7 +145,7 @@ Analyse systématique des limites de chaque technique proposée, avec quantifica | Technique | Limite principale | Condition de déclenchement | Mesure d'atténuation | |-----------|-------------------|---------------------------|---------------------| -| **5.1** Path Sequence Entropy | Nécessite une session longue (> 5 requêtes) pour une entropie fiable | Sessions de courte durée, bots single-request | Fallback vers `path_diversity_ratio` pour sessions < 5 requêtes | +| **5.1** Session Transformer Embedding | Nécessite ≥ 2 requêtes pour un embedding non-nul ; qualité dépendante du pré-entraînement | Sessions single-request, modèle non entraîné | Fallback vers vecteur nul (32 zéros) pour sessions ≤ 1 requête | | **5.2** Bipartite Fleet Graph | Contrainte mémoire O(V+E) à l'échelle | > 100k JA4/ASN distincts par cycle | Représentation sparse, seuil `min_ips`, réinitialisation par cycle | | **5.3** Cadence Fingerprint | Insuffisant pour les sessions single-request | Bots qui font exactement 1 requête par cycle | Agrégation des sessions multi-cycles par IP sur 24h | | **5.4** Resource Dependency Tree | Requiert une classification correcte Content-Type/extension | Chemins sans extension, Content-Type ambigu (`*/*`) | Double-source : en-tête Accept + regex extension chemin | diff --git a/docs/thesis/08_conclusion_references.md b/docs/thesis/08_conclusion_references.md index f550ffb..aa70546 100644 --- a/docs/thesis/08_conclusion_references.md +++ b/docs/thesis/08_conclusion_references.md @@ -43,8 +43,8 @@ La stabilité des empreintes H2 de Chrome sur 2+ ans (novembre 2023 – 2026) co Six techniques entièrement implémentées adressant des angles morts spécifiques : -1. **Path Sequence Entropy (§5.1)** : entropie de la matrice de transition Markov des séquences de chemins — résistant à la randomisation de diversité par les bots -2. **Bipartite Bot Fleet Detection (§5.2)** : détection de flottes distribuées via graphe bipartite JA4×ASN et Louvain sur le graphe projeté pondéré — détecte les botnets dont aucune paire (JA4, ASN) individuelle n'est anormale +1. **Session Transformer Embedding (§5.1)** : embedding dense 32D produit par un encoder Transformer sur les séquences de requêtes (chemins, méthodes, Δt) — remplace l'entropie de Markov O(1) par une représentation contextuelle capturant les dépendances longues +2. **Détection de flottes par GraphSAGE (§5.2)** : apprentissage de représentations de graphe (GNN SAGEConv 2 couches) sur un graphe d'IPs connectées par co-occurrence (JA4, ASN), clusterisées par HDBSCAN dans l'espace latent 32D — détecte les botnets dont aucune paire (JA4, ASN) individuelle n'est anormale, en combinant structure topologique et features comportementales 3. **Request Cadence Fingerprint (§5.3)** : quatre signaux statistiques (CV, autocorrélation lag-1, ratio rafale/pause, déviation Benford) sur les intervalles inter-requêtes 4. **Resource Dependency Tree (§5.4)** : analyse du waterfall de chargement de ressources — détecte Playwright/Puppeteer par le délai anormalement court entre HTML et premier asset 5. **Intra-Session JA4 Drift (§5.5)** : ratio de transitions de JA4 dominant par fenêtre de 10 minutes — détecte les bots APT multi-phases @@ -69,7 +69,7 @@ Architecture de données fondée sur ClickHouse avec **AggregatingMergeTree view | Famille 5 — Empreinte réseau | Caractéristiques de la pile réseau du client | ip_id_zero_ratio, avg_ttl, ttl_std, no_window_scale_ratio, ip_asn, ip_country, ja4_asn_rarity, et 6 autres (13 features) | 13 | 0 | 0 | | Famille 6 — Comportement de navigation | Patterns de navigation et structure des requêtes | asset_ratio, direct_access_ratio, orphan_ratio, temporal_entropy, post_ratio, head_ratio, http_scheme_ratio, et 3 autres (10 features) | 10 | 0 | 0 | | Famille 7 — Intelligence contextuelle | Enrichissements contextuels et browser scoring | ja4_asn_concentration, browser_confidence, browser_match_chrome, browser_match_firefox, browser_match_safari, browser_match_max, browser_family_detected, et 16 autres (23 features) | 23 | 0 | 0 | -| Famille 8 — Features comportementales avancées | Features originales de recherche (§5.1–§5.8) | path_transition_entropy, cadence_cv, lag1_autocorrelation, burst_ratio, benford_deviation, root_to_first_asset_delay, ja4_drift_ratio, host_diversity, et 5 autres (13 features) | 13 | 0 | 0 | +| Famille 8 — Features comportementales avancées | Features originales de recherche (§5.1–§5.8) | session_transformer_embedding (32D), lag1_autocorrelation, burst_ratio, pause_ratio, benford_deviation, root_to_first_asset_delay, asset_load_stddev, ja4_drift_ratio, host_diversity, et 3 autres (11 scalaires + 1 embedding) | 12 | 0 | 0 | | **Total** | | | **96** | **0** | **0** | **Résumé quantitatif** : les 96 features documentées dans la Section 4 sont **toutes entièrement implémentées** (`[impl.]`), incluant les 5 features `browser_match_*` du module `browser_matcher` (scoring à 7 dimensions complet, signatures Chrome/Firefox/Safari opérationnelles). Les features des techniques §5.6 (DNS Shadow Analysis) et §5.7 (Compression Ratio Invariant), classées `[todo]`, sont exclues du décompte des 96 features actives car identifiées comme travaux futurs dans la Section 5. Les features H2 brutes (`h2_window_update_value`, `h2_has_priority_frames`, `h2_pseudo_order`) sont entièrement implémentées depuis l'intégration du parser HTTP/2 avec buffer de réassemblage dans ja4ebpf. @@ -227,8 +227,11 @@ arXiv preprint arXiv:1210.0921. [33] **ClickHouse** — Système de gestion de bases de données analytiques orienté colonnes, haute performance. [https://clickhouse.com/docs/en/intro](https://clickhouse.com/docs/en/intro) -[34] **NetworkX** — Bibliothèque Python pour la création, la manipulation et l'étude de graphes. -[https://networkx.org/documentation/stable/](https://networkx.org/documentation/stable/) +[34] **PyTorch Geometric (PyG)** — Bibliothèque Python pour l'apprentissage de représentations sur graphes (GNN). +[https://pytorch-geometric.readthedocs.io/](https://pytorch-geometric.readthedocs.io/) + +[34b] **Hamilton, W.L., Ying, R. & Leskovec, J.** (2017). *Inductive Representation Learning on Large Graphs* (GraphSAGE). NeurIPS 2017. +[https://arxiv.org/abs/1706.02216](https://arxiv.org/abs/1706.02216) [35] **HDBSCAN Python library** — Implémentation performante de l'algorithme HDBSCAN. [https://hdbscan.readthedocs.io/en/latest/](https://hdbscan.readthedocs.io/en/latest/) diff --git a/services/bot-detector/bot_detector/fleet.py b/services/bot-detector/bot_detector/fleet.py index 32b360c..5de1583 100644 --- a/services/bot-detector/bot_detector/fleet.py +++ b/services/bot-detector/bot_detector/fleet.py @@ -1,174 +1,291 @@ -"""Détection de flottes de bots via graphe bipartite JA4×ASN. +"""Détection de flottes de bots par apprentissage de représentations de graphe (GraphSAGE). -§5.2 — Analyse de graphe bipartite G=(JA4 ∪ ASN, E) pour identifier les flottes -de bots coordonnées qui font tourner leurs fingerprints JA4 et ASN. +§5.2 — GNN GraphSAGE sur graphe d'IPs pour identifier les flottes coordonnées +qui font tourner leurs fingerprints JA4 et ASN. Algorithme : -1. Construire le graphe bipartite G depuis les sessions du cycle courant -2. Projeter sur les nœuds JA4 (shared-ASN weighted projection) -3. Détecter les communautés Louvain (python-louvain) -4. Calculer fleet_score = taille × densité / log2(n_asn + 2) pour chaque communauté -5. Retourner les IPs appartenant aux communautés suspectes avec leur fleet_score +1. Agréger les features ML par src_ip (nœuds du graphe) +2. Construire le graphe : arête entre IP_A et IP_B si elles partagent + même JA4 + même ASN dans un groupe de >= min_ips IPs +3. Inférence GraphSAGE (SAGEConv × 2 : in_channels → 64 → 32, ReLU + Dropout) +4. Clustering DBSCAN/HDBSCAN sur les embeddings 32D +5. fleet_score = cluster_size × compactness / log2(n_asn + 2) """ import logging -from typing import Optional +from typing import Dict, Any -import pandas as pd import numpy as np +import pandas as pd logger = logging.getLogger(__name__) -# Seuil de fleet_score à partir duquel une communauté est considérée suspecte +# ── Configuration via variables d'environnement ────────────────────────────── FLEET_SCORE_THRESHOLD = float(__import__('os').getenv('FLEET_SCORE_THRESHOLD', '2.0')) -# Poids du fleet_score dans le score final (malus supplémentaire) FLEET_SCORE_WEIGHT = float(__import__('os').getenv('FLEET_SCORE_WEIGHT', '0.10')) -# Nombre minimal d'arêtes pour inclure un JA4 dans l'analyse -FLEET_MIN_EDGES = int(__import__('os').getenv('FLEET_MIN_EDGES', '3')) +FLEET_MIN_IPS = int(__import__('os').getenv('FLEET_MIN_IPS', '3')) +FLEET_BATCH_THRESHOLD = int(__import__('os').getenv('FLEET_BATCH_THRESHOLD', '50000')) +FLEET_HIDDEN_DIM = int(__import__('os').getenv('FLEET_HIDDEN_DIM', '64')) +FLEET_EMBED_DIM = int(__import__('os').getenv('FLEET_EMBED_DIM', '32')) +FLEET_DROPOUT = float(__import__('os').getenv('FLEET_DROPOUT', '0.1')) -def build_fleet_graph(df: pd.DataFrame) -> Optional[object]: - """Construit le graphe bipartite JA4×ASN à partir du cycle courant. +# ═══════════════════════════════════════════════════════════════════════════════ +# Construction du graphe d'IPs +# ═══════════════════════════════════════════════════════════════════════════════ - Nœuds : ensemble JA4 (préfixe 'ja4:') + ensemble ASN (préfixe 'asn:') - Arêtes : (ja4, asn) avec weight = nombre d'IPs distinctes sur ce couple +def _build_ip_graph(df: pd.DataFrame, min_ips: int = FLEET_MIN_IPS): + """Construit le graphe d'IPs pour l'inférence GraphSAGE. - Exige que df ait les colonnes : ja4, asn_number, src_ip - Retourne None si networkx n'est pas disponible ou données insuffisantes. + Nœuds : src_ip uniques, features = vecteur ML agrégé par IP (moyenne). + Arêtes : (IP_A, IP_B) si elles co-occurent dans un groupe (JA4, ASN) + comptant >= min_ips IPs distinctes. + + Retourne (unique_ips, node_features, edge_index) ou None si données + insuffisantes. """ - try: - import networkx as nx - from networkx.algorithms import bipartite - except ImportError: - logger.warning("[Fleet] networkx non disponible — analyse de flotte désactivée.") - return None - if df.empty or 'ja4' not in df.columns or 'asn_number' not in df.columns: return None - # Filtrer les JA4 vides et ASN 0 + # Filtrer JA4 vides et ASN 0 mask = (df['ja4'].fillna('') != '') & (df['asn_number'].fillna('0') != '0') - sub = df[mask][['src_ip', 'ja4', 'asn_number']].copy() + sub = df[mask].copy() if len(sub) < 10: return None - # Compter les IPs par (ja4, asn) — poids de l'arête - edge_weights = ( - sub.groupby(['ja4', 'asn_number'])['src_ip'] - .nunique() - .reset_index(name='n_ips') - ) - # Garder seulement les arêtes avec au moins FLEET_MIN_EDGES IPs distinctes - edge_weights = edge_weights[edge_weights['n_ips'] >= FLEET_MIN_EDGES] - if len(edge_weights) < 5: + # Index des IPs uniques + unique_ips = sub['src_ip'].unique() + if len(unique_ips) < 5: + return None + ip_to_idx = {ip: i for i, ip in enumerate(unique_ips)} + + # ── Features par nœud : agrégation des colonnes numériques ── + numeric_cols = sub.select_dtypes(include=[np.number]).columns.tolist() + skip_cols = {'asn_number', 'fleet_campaign_flag'} + feature_cols = [c for c in numeric_cols if c not in skip_cols] + if not feature_cols: return None - G = nx.Graph() - ja4_nodes = set() - asn_nodes = set() - for _, row in edge_weights.iterrows(): - ja4_node = f"ja4:{row['ja4']}" - asn_node = f"asn:{row['asn_number']}" - G.add_node(ja4_node, bipartite=0) - G.add_node(asn_node, bipartite=1) - G.add_edge(ja4_node, asn_node, weight=int(row['n_ips'])) - ja4_nodes.add(ja4_node) - asn_nodes.add(asn_node) + node_features = ( + sub.groupby('src_ip')[feature_cols] + .mean() + .reindex(unique_ips) + .fillna(0) + .values + .astype(np.float32) + ) + # Normalisation min-max par colonne + mins = node_features.min(axis=0, keepdims=True) + maxs = node_features.max(axis=0, keepdims=True) + ranges = np.where(maxs - mins == 0, 1.0, maxs - mins) + node_features = (node_features - mins) / ranges - return G, ja4_nodes, asn_nodes + # ── Construction des arêtes : co-occurrence (JA4, ASN) ── + groups = ( + sub.groupby(['ja4', 'asn_number'])['src_ip'] + .agg(lambda s: list(s.unique())) + .reset_index(name='ips') + ) + groups = groups[groups['ips'].map(len) >= min_ips] + if groups.empty: + return None + + edge_set: set = set() + for ips in groups['ips']: + idx = sorted(ip_to_idx[ip] for ip in ips if ip in ip_to_idx) + n = len(idx) + if n < 2 or n > 500: # Sauter les très grands groupes (CDN/infra) + continue + for a in range(n): + for b in range(a + 1, n): + edge_set.add((idx[a], idx[b])) + edge_set.add((idx[b], idx[a])) + + if len(edge_set) < 5: + return None + + edge_index = np.array(sorted(edge_set), dtype=np.int64).T # [2, num_edges] + return unique_ips, node_features, edge_index -def detect_fleet_communities(df: pd.DataFrame) -> dict: - """Analyse le graphe bipartite et retourne un dict {src_ip: fleet_score}. +# ═══════════════════════════════════════════════════════════════════════════════ +# Inférence GraphSAGE +# ═══════════════════════════════════════════════════════════════════════════════ - fleet_score > FLEET_SCORE_THRESHOLD → IP appartient à une flotte suspectée. - fleet_score = 0 pour toutes les autres IPs. +def _infer_embeddings(node_features: np.ndarray, edge_index: np.ndarray, + n_nodes: int, batch_threshold: int = FLEET_BATCH_THRESHOLD): + """Passe le graphe dans GraphSAGE pour obtenir les embeddings 32D par nœud. - fleet_score = community_size * graph_density / log2(n_asn + 2) + Utilise NeighborLoader (batching PyG) si n_nodes > batch_threshold. + Sinon inférence full-batch. """ - result = build_fleet_graph(df) - if result is None: - return {} + import torch + import torch.nn as nn + from torch_geometric.nn import SAGEConv - G, ja4_nodes, asn_nodes = result + class GraphSAGE(nn.Module): + def __init__(self, in_ch: int, hidden_ch: int, out_ch: int, + dropout: float): + super().__init__() + self.conv1 = SAGEConv(in_ch, hidden_ch) + self.conv2 = SAGEConv(hidden_ch, out_ch) + self.drop = nn.Dropout(dropout) - try: - import networkx as nx - from networkx.algorithms import bipartite - try: - from community import best_partition as louvain_partition - LOUVAIN_AVAILABLE = True - except ImportError: - LOUVAIN_AVAILABLE = False + def forward(self, x, edge_index): + x = self.conv1(x, edge_index).relu() + x = self.drop(x) + x = self.conv2(x, edge_index) + return x - # Projection bipartite : graphe des JA4 partageant des ASN - G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes) - if G_ja4.number_of_edges() == 0: - return {} + in_dim = node_features.shape[1] + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - # Détection de communautés - if LOUVAIN_AVAILABLE: - partition = louvain_partition(G_ja4, weight='weight', random_state=42) - # partition = {node: community_id} - communities: dict = {} - for node, cid in partition.items(): - communities.setdefault(cid, set()).add(node) + model = GraphSAGE(in_dim, FLEET_HIDDEN_DIM, FLEET_EMBED_DIM, + FLEET_DROPOUT).to(device) + model.eval() + + x = torch.tensor(node_features, dtype=torch.float32, device=device) + ei = torch.tensor(edge_index, dtype=torch.long, device=device) + + with torch.no_grad(): + if n_nodes > batch_threshold: + from torch_geometric.data import Data + from torch_geometric.loader import NeighborLoader + + data = Data(x=x, edge_index=ei) + loader = NeighborLoader( + data, + num_neighbors=[25, 10], # échantillonnage 2-hop + batch_size=4096, + shuffle=False, + ) + embeddings = torch.zeros(n_nodes, FLEET_EMBED_DIM, device=device) + for batch in loader: + batch = batch.to(device) + out = model(batch.x, batch.edge_index) + embeddings[batch.n_id[:batch.batch_size]] = out[:batch.batch_size] else: - # Fallback : composantes connexes - communities = { - i: set(c) - for i, c in enumerate(nx.connected_components(G_ja4)) - if len(c) >= 2 - } + embeddings = model(x, ei) - # Calculer le fleet_score de chaque communauté - fleet_scores: dict = {} # {ja4: fleet_score} - for cid, members in communities.items(): - if len(members) < 2: - continue - sub_g = G.subgraph( - list(members) + [n for n in asn_nodes if any(G.has_edge(n, m) for m in members)] - ) - n_asn = len([n for n in sub_g.nodes if n.startswith('asn:')]) - density = nx.density(G_ja4.subgraph(members)) - score = len(members) * density / max(np.log2(n_asn + 2), 0.1) - for ja4_node in members: - ja4 = ja4_node.replace('ja4:', '') - fleet_scores[ja4] = round(float(score), 3) + return embeddings.cpu().numpy() - # Mapper les fleet_scores sur les IPs - if not fleet_scores: - return {} - ip_scores: dict = {} - for _, row in df.iterrows(): - ja4 = str(row.get('ja4', '')) - score = fleet_scores.get(ja4, 0.0) - if score >= FLEET_SCORE_THRESHOLD: - src_ip = str(row.get('src_ip', '')) - if src_ip: - ip_scores[src_ip] = max(ip_scores.get(src_ip, 0.0), score) +# ═══════════════════════════════════════════════════════════════════════════════ +# Pipeline de détection +# ═══════════════════════════════════════════════════════════════════════════════ - n_fleets = len(set(fleet_scores.values())) - if ip_scores: - logger.info( - f"[Fleet] {len(ip_scores)} IPs dans {n_fleets} communauté(s) suspecte(s) " - f"(score max={max(ip_scores.values()):.2f})" - ) - return ip_scores +def detect_fleet_communities(df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: + """Détecte les flottes via GraphSAGE + DBSCAN/HDBSCAN. - except Exception as e: - logger.warning(f"[Fleet] Erreur détection de flotte : {e}") + Retourne {src_ip: {"cluster_id": int, "fleet_score": float}}. + Les IPs non assignées (bruit DBSCAN) n'apparaissent pas dans le dict. + """ + graph = _build_ip_graph(df) + if graph is None: return {} + unique_ips, node_features, edge_index = graph + n_nodes = len(unique_ips) + + # Vérifier les dépendances PyTorch Geometric + try: + import torch # noqa: F401 + from torch_geometric.nn import SAGEConv # noqa: F401 + except ImportError: + logger.warning( + "[Fleet] torch/torch_geometric non disponible — " + "analyse de flotte désactivée." + ) + return {} + + # ── Inférence GraphSAGE ── + try: + embeddings = _infer_embeddings(node_features, edge_index, n_nodes) + except Exception as e: + logger.warning(f"[Fleet] Erreur GraphSAGE : {e}") + return {} + + # ── Clustering sur les embeddings 32D ── + try: + import hdbscan + labels = hdbscan.HDBSCAN( + min_cluster_size=3, + metric='euclidean', + cluster_selection_method='eom', + ).fit_predict(embeddings) + except ImportError: + from sklearn.cluster import DBSCAN + labels = DBSCAN(eps=0.5, min_samples=3, metric='euclidean').fit_predict( + embeddings + ) + except Exception as e: + logger.warning(f"[Fleet] Erreur clustering : {e}") + return {} + + # ── Calcul du fleet_score par cluster ── + ip_asn = ( + df.groupby('src_ip')['asn_number'] + .agg(lambda s: set(s.dropna().unique())) + .to_dict() + ) + + cluster_scores: Dict[int, float] = {} + for cid in set(labels) - {-1}: + members_idx = np.where(labels == cid)[0] + cluster_ips = unique_ips[members_idx] + cluster_size = len(members_idx) + + # Nombre d'ASN distincts dans le cluster + n_asn = len( + set().union(*(ip_asn.get(str(ip), set()) for ip in cluster_ips)) + ) + + # Compacité : inverse de la distance moyenne au centroïde + centroid = embeddings[members_idx].mean(axis=0) + avg_dist = np.linalg.norm( + embeddings[members_idx] - centroid, axis=1 + ).mean() + compactness = 1.0 / (1.0 + avg_dist) + + # fleet_score = taille × compacité / log2(n_asn + 2) + score = cluster_size * compactness / max(np.log2(n_asn + 2), 0.1) + cluster_scores[int(cid)] = round(float(score), 3) + + # ── Construction du résultat ── + ip_results: Dict[str, Dict[str, Any]] = {} + for i, ip in enumerate(unique_ips): + cid = int(labels[i]) + score = cluster_scores.get(cid, 0.0) + if score > 0: + ip_results[str(ip)] = {"cluster_id": cid, "fleet_score": score} + + n_flagged = sum( + 1 for v in ip_results.values() + if v["fleet_score"] >= FLEET_SCORE_THRESHOLD + ) + if ip_results: + max_score = max(v["fleet_score"] for v in ip_results.values()) + logger.info( + f"[Fleet] {n_flagged} IPs dans {len(cluster_scores)} flotte(s) " + f"(score max={max_score:.2f}, {n_nodes} noeuds, " + f"{edge_index.shape[1]} arêtes)" + ) + + return ip_results + def enrich_with_fleet_score(df: pd.DataFrame) -> pd.DataFrame: """Enrichit le DataFrame avec fleet_score et fleet_campaign_flag. - fleet_campaign_flag = 1 si l'IP appartient à une flotte suspectée. - fleet_score = score de la communauté (0 = pas de flotte). + fleet_campaign_flag = 1 si fleet_score >= FLEET_SCORE_THRESHOLD. + fleet_score = 0 pour les IPs n'appartenant à aucune flotte détectée. """ df = df.copy() fleet_map = detect_fleet_communities(df) - df['fleet_score'] = df['src_ip'].map(fleet_map).fillna(0.0).astype(float) - df['fleet_campaign_flag'] = (df['fleet_score'] >= FLEET_SCORE_THRESHOLD).astype(int) + + df['fleet_score'] = df['src_ip'].map( + {ip: v["fleet_score"] for ip, v in fleet_map.items()} + ).fillna(0.0).astype(float) + df['fleet_campaign_flag'] = ( + df['fleet_score'] >= FLEET_SCORE_THRESHOLD + ).astype(int) return df diff --git a/services/bot-detector/bot_detector/requirements.txt b/services/bot-detector/bot_detector/requirements.txt index 0f8b8f4..fdefc71 100644 --- a/services/bot-detector/bot_detector/requirements.txt +++ b/services/bot-detector/bot_detector/requirements.txt @@ -6,6 +6,7 @@ scipy>=1.14 hdbscan>=0.8.38 isotree>=0.6.1 torch>=2.0 +torch_geometric>=2.4 FrEIA>=0.2 xgboost>=2.0 cleanlab>=2.6