feat(bot-detector): add dynamic browser profiling engine with HDBSCAN clustering

Implement offline profile building (profile_builder.py) and real-time
dynamic scoring (browser_matcher_dynamic.py) using HDBSCAN-based browser
fingerprint clustering. Add ClickHouse materialized view (13_h2_profiling.sql)
for h2_profile_stats aggregation. Update thesis and project documentation
to cover the new dynamic profiling architecture.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jacquin Antoine
2026-04-13 02:06:00 +02:00
parent 64dada980f
commit c60ce97f23
9 changed files with 1325 additions and 23 deletions

View File

@ -0,0 +1,387 @@
"""Moteur de scoring dynamique des sessions H2 (browser_matcher_dynamic).
Remplace le dictionnaire statique browser_signatures.py par un scoring
basé sur les profils auto-appris (auto_browser_profiles).
Ce module est appelé pour chaque session lors du cycle ML de 300s.
Au démarrage, il charge les profils en mémoire et les rafraîchit toutes
les 24h (même cadence que le profile_builder).
Architecture :
auto_browser_profiles (ClickHouse)
↓ (chargement initial + refresh 24h)
_DynamicMatcher (singleton en mémoire)
↓ (score_session par session)
(family, score) → pipeline ML
Calcul du score :
Score = Σ (poids_i × similarité_i) × confiance_volumétrique
Similarité continue (window_update, initial_window_size) :
sim = 1 - |x - centroid| / range
où range = max(|centroid|, 1) pour la normalisation
Similarité catégorielle (pseudo_order, has_priority) :
sim = 1.0 si match exact, 0.0 sinon (forte discriminabilité)
Confiance volumétrique :
conf = min(1.0, log10(count_ips + 1) / 4)
Un profil basé sur 10 000 IPs est plus fiable qu'un profil basé sur 100.
Pondération des dimensions :
h2_window_update : 0.40 (le plus discriminant entre familles)
pseudo_order : 0.40 (ordre des pseudo-headers, invariant par version)
h2_initial_window_size : 0.10
h2_has_priority : 0.10
"""
import math
import time
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pandas as pd
from .config import DB
from .log import log_info
# ---------------------------------------------------------------------------
# Constantes
# ---------------------------------------------------------------------------
# Poids des dimensions (somme = 1.0)
_WEIGHT_WINDOW_UPDATE: float = 0.40
_WEIGHT_PSEUDO_ORDER: float = 0.40
_WEIGHT_INITIAL_WINDOW: float = 0.10
_WEIGHT_HAS_PRIORITY: float = 0.10
# Mapping string → id (cohérent avec le multiIf SQL)
_PSEUDO_ORDER_MAP: dict[str, int] = {
"m,a,s,p": 1,
"m,p,a,s": 2,
"m,s,p,a": 3,
"m,p,s,a": 4,
"m,a,p,s": 5,
}
# Intervalle de rafraîchissement des profils (secondes)
_REFRESH_INTERVAL: float = 86400.0 # 24 heures
# ---------------------------------------------------------------------------
# Data class pour un profil chargé en mémoire
# ---------------------------------------------------------------------------
@dataclass
class _Profile:
"""Profil navigateur en mémoire pour le scoring temps réel.
Attributes:
profile_id: Identifiant unique du profil.
family: Famille détectée (Auto_Chrome, Auto_Firefox, etc.).
count_ips: Nombre d'IPs dans le cluster source.
iws_mean: Moyenne de h2_initial_window_size.
wu_mean: Moyenne de h2_window_update.
wu_tol: Tolérance sur window_update (mean + 3σ).
po_mode: Mode de pseudo_order_id (1-5).
prio_mode: Mode de h2_has_priority (0 ou 1).
"""
profile_id: str
family: str
count_ips: int
iws_mean: int
wu_mean: int
wu_tol: int
po_mode: int
prio_mode: int
# ---------------------------------------------------------------------------
# Scorer
# ---------------------------------------------------------------------------
class _DynamicMatcher:
"""Chargeur et scorer des profils dynamiques.
Thread-safety : non requis. Le bot_detector est mono-thread (cycle séquentiel).
"""
def __init__(self):
self._profiles: list[_Profile] = []
self._last_load: float = 0.0
self._loaded: bool = False
# --- Chargement ---
def load_profiles(self, client, force: bool = False) -> bool:
"""Charge les profils depuis auto_browser_profiles en mémoire.
Ne recharge que toutes les 24h sauf si force=True.
Si la table est vide ou n'existe pas, conserve les profils existants.
Args:
client: Client ClickHouse.
force: Forcer le rechargement même si < 24h.
Returns:
True si les profils ont été (re)chargés.
"""
now = time.time()
if not force and self._loaded and (now - self._last_load < _REFRESH_INTERVAL):
return False
try:
df = client.query_df(
f"SELECT * FROM {DB}.auto_browser_profiles FINAL"
)
except Exception as e:
log_info(f"[dynamic_matcher] Erreur chargement profils: {e}")
return False
if df is None or df.empty:
if not self._loaded:
log_info("[dynamic_matcher] Aucun profil dynamique disponible.")
return False
profiles = []
for _, row in df.iterrows():
profiles.append(_Profile(
profile_id=str(row["profile_id"]),
family=str(row["detected_family"]),
count_ips=int(row["count_ips"]),
iws_mean=int(row["h2_initial_window_size_mean"]),
wu_mean=int(row["h2_window_update_mean"]),
wu_tol=int(row["h2_window_update_tol"]),
po_mode=int(row["pseudo_order_mode"]),
prio_mode=int(row["h2_has_priority_mode"]),
))
self._profiles = profiles
self._last_load = now
self._loaded = True
log_info(f"[dynamic_matcher] {len(profiles)} profil(s) chargé(s).")
return True
@property
def profiles(self) -> list[_Profile]:
"""Accès en lecture seule aux profils chargés."""
return self._profiles
@property
def is_loaded(self) -> bool:
"""Indique si les profils ont été chargés au moins une fois."""
return self._loaded
# --- Scoring ---
def score_session(self, session: dict) -> tuple[str, float]:
"""Score une session contre tous les profils chargés.
Pipeline de scoring :
1. Extraction du vecteur H2 de la session
2. Pour chaque profil :
a. Rejet rapide si pseudo_order incompatible
b. Rejet rapide si window_update dépasse la tolérance
c. Calcul de la similarité pondérée par dimension
d. Application de la confiance volumétrique
3. Retourne la famille et le score du meilleur match
Args:
session: dict avec clés h2_initial_window_size, h2_window_update,
h2_pseudo_order (str), h2_has_priority (0/1).
Returns:
(famille, score) : famille = "Auto_*" ou "" si aucun match,
score entre 0.0 et 1.0.
"""
if not self._profiles:
return ("", 0.0)
# Extraction du vecteur session
s_iws = int(session.get("h2_initial_window_size", -1))
s_wu = int(session.get("h2_window_update", 0))
s_po_raw = str(session.get("h2_pseudo_order", ""))
s_po = _PSEUDO_ORDER_MAP.get(s_po_raw, 0)
s_prio = int(session.get("h2_has_priority", 0))
# Si la session n'a pas de données H2 valides, pas de scoring
if s_po == 0 and s_wu == 0:
return ("", 0.0)
best_family = ""
best_score = 0.0
for p in self._profiles:
# --- Rejet rapide 1 : pseudo_order incompatible ---
# Si la session a un pseudo_order et le profil en exige un différent,
# c'est un mismatch immédiat. On saute ce profil.
if s_po != 0 and p.po_mode != 0 and s_po != p.po_mode:
continue
# --- Rejet rapide 2 : window_update hors tolérance ---
# Si |wu_session - wu_profil| > tolérance, la session est trop éloignée
if p.wu_mean > 0 and abs(s_wu - p.wu_mean) > p.wu_tol:
continue
# --- Calcul de la similarité pondérée ---
score = self._compute_weighted_score(s_iws, s_wu, s_po, s_prio, p)
# --- Confiance volumétrique ---
# score *= min(1.0, log10(count_ips + 1) / 4)
# count_ips=10000 → log10(10001)/4 ≈ 1.0 → confiance max
# count_ips=100 → log10(101)/4 ≈ 0.50 → demi-confiance
volumetric = min(1.0, math.log10(p.count_ips + 1) / 4.0)
score *= volumetric
if score > best_score:
best_score = score
best_family = p.family
# Plafonnement à [0, 1]
return (best_family, min(1.0, max(0.0, best_score)))
def score_sessions_batch(self, df: pd.DataFrame) -> pd.DataFrame:
"""Score un batch de sessions (vectorisé pour la performance).
Args:
df: DataFrame avec colonnes H2 (h2_initial_window_size,
h2_window_update, h2_pseudo_order, h2_has_priority).
Returns:
DataFrame d'origine avec colonnes ajoutées :
- dynamic_family (str)
- dynamic_score (float)
"""
df = df.copy()
families = []
scores = []
# Construction d'un dict par session pour le scoring
for _, row in df.iterrows():
session = {
"h2_initial_window_size": row.get("h2_initial_window_size", -1),
"h2_window_update": row.get("h2_window_update", 0),
"h2_pseudo_order": row.get("h2_pseudo_order", ""),
"h2_has_priority": row.get("h2_has_priority", 0),
}
fam, sc = self.score_session(session)
families.append(fam)
scores.append(sc)
df["dynamic_family"] = families
df["dynamic_score"] = scores
return df
# --- Calcul de distance ---
@staticmethod
def _compute_weighted_score(
s_iws: int, s_wu: int, s_po: int, s_prio: int, profile: _Profile
) -> float:
"""Calcule le score de similarité pondéré entre une session et un profil.
Formule :
score = Σ (w_i × sim_i)
Similarité continue (window_update, initial_window_size) :
sim = max(0, 1 - |x - μ| / max(|μ|, 1))
Normalisation par la valeur absolue du centroïde pour que la
distance soit relative (une déviation de 1000 est négligeable
pour un centroïde de 15M, mais significative pour un de 5000).
Similarité catégorielle (pseudo_order, has_priority) :
sim = 1.0 si match exact, 0.0 sinon
Rationnel : l'ordre des pseudo-headers est un signal binaire fort —
il ne varie pas au sein d'une même version de navigateur.
Returns:
Score brut (non plafonné) dans [0, ~1.5] avant confiance volumétrique.
"""
score = 0.0
# --- Dimension 1 : h2_window_update (poids 0.40) ---
if profile.wu_mean > 0:
delta = abs(s_wu - profile.wu_mean)
# Normalisation par la valeur absolue du centroïde
norm = max(abs(profile.wu_mean), 1)
sim_wu = max(0.0, 1.0 - delta / norm)
elif s_wu == 0 and profile.wu_mean == 0:
sim_wu = 1.0 # Les deux sont à 0 = match parfait
else:
sim_wu = 0.0
score += _WEIGHT_WINDOW_UPDATE * sim_wu
# --- Dimension 2 : pseudo_order (poids 0.40) ---
if s_po != 0 and s_po == profile.po_mode:
sim_po = 1.0
elif s_po == 0 or profile.po_mode == 0:
sim_po = 0.0 # Inconnu = pas de signal
else:
sim_po = 0.0 # Mismatch
score += _WEIGHT_PSEUDO_ORDER * sim_po
# --- Dimension 3 : h2_initial_window_size (poids 0.10) ---
if s_iws > 0 and profile.iws_mean > 0:
delta = abs(s_iws - profile.iws_mean)
norm = max(abs(profile.iws_mean), 1)
sim_iws = max(0.0, 1.0 - delta / norm)
elif s_iws <= 0 and profile.iws_mean <= 0:
sim_iws = 1.0 # Les deux absents
else:
sim_iws = 0.0
score += _WEIGHT_INITIAL_WINDOW * sim_iws
# --- Dimension 4 : h2_has_priority (poids 0.10) ---
sim_prio = 1.0 if s_prio == profile.prio_mode else 0.0
score += _WEIGHT_HAS_PRIORITY * sim_prio
return score
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_matcher: Optional[_DynamicMatcher] = None
def get_dynamic_matcher() -> _DynamicMatcher:
"""Retourne le singleton _DynamicMatcher (lazy initialization)."""
global _matcher
if _matcher is None:
_matcher = _DynamicMatcher()
return _matcher
def load_dynamic_profiles(client=None, force: bool = False) -> bool:
"""Charge les profils dynamiques en mémoire (convenience wrapper).
Appelé au démarrage du bot_detector et une fois par cycle (déduplication
interne par intervalle 24h).
"""
if client is None:
from .infra import get_client
client = get_client()
return get_dynamic_matcher().load_profiles(client, force=force)
def score_session_dynamic(session: dict) -> tuple[str, float]:
"""Score une session H2 contre les profils dynamiques (convenience wrapper).
Args:
session: dict avec clés h2_*. Voir _DynamicMatcher.score_session.
Returns:
(famille, score) — famille "" si aucun match, score ∈ [0.0, 1.0].
"""
return get_dynamic_matcher().score_session(session)
def score_sessions_batch_dynamic(df: pd.DataFrame) -> pd.DataFrame:
"""Score un batch de sessions (convenience wrapper).
Ajoute les colonnes dynamic_family et dynamic_score au DataFrame.
"""
return get_dynamic_matcher().score_sessions_batch(df)

View File

@ -0,0 +1,614 @@
"""Moteur de profiling dynamique automatique des navigateurs (profile_builder).
Lancé via cron quotidiennement, ce script :
1. Lit la vue view_h2_profiling_raw (sessions H2 filtrées)
2. Clusterise les sessions similaires via HDBSCAN (min_cluster_size=1000)
3. Calcule les centroïdes (moyenne + 3σ pour les variables continues,
mode pour les catégorielles)
4. Labelise les clusters par famille (Auto_Chrome, Auto_Firefox, etc.)
5. Fusionne les clusters redondants (même famille + pseudo_order + window_update proches)
6. Écrit les profils dans ja4_processing.auto_browser_profiles
7. Gère le cycle de vie : mise à jour last_seen_date, purge > 14 jours
Usage CLI :
python -m bot_detector.profile_builder
Architecture :
http_logs → view_h2_profiling_raw → HDBSCAN → auto_browser_profiles
browser_matcher_dynamic.py (scoring)
"""
import hashlib
import re
import uuid
from collections import Counter
from datetime import date, datetime, timedelta
import numpy as np
import pandas as pd
from .config import HDBSCAN_AVAILABLE, DB
from .log import log_info
# ---------------------------------------------------------------------------
# Constantes
# ---------------------------------------------------------------------------
# Taille minimale de cluster pour HDBSCAN (évite le bruit statistique)
_MIN_CLUSTER_SIZE: int = 1000
# Poids des variables dans l'espace de clustering.
# Les variables continues sont normalisées avant HDBSCAN,
# les catégorielles sont encodées en one-hot pour la distance.
_CONTINUOUS_COLS = ["h2_initial_window_size", "h2_window_update"]
_CATEGORICAL_COLS = ["pseudo_order_id", "h2_has_priority"]
_EMBEDDING_COLS = _CONTINUOUS_COLS + _CATEGORICAL_COLS
# Nombre max de jours de lookback pour la requête de profiling
_LOOKBACK_DAYS: int = 7
# Seuil de similarité pour la fusion des clusters : si deux profils de même
# famille ont des h2_window_update à moins de _MERGE_TOLERANCE_RATIO (5%),
# ils sont fusionnés en un seul profil (moyenne des centroïdes).
_MERGE_TOLERANCE_RATIO: float = 0.05
# Durée de rétention des profils inactifs (en jours)
_PROFILE_TTL_DAYS: int = 14
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _label_family(ua_series: pd.Series) -> str:
"""Attribue une famille navigateur à un cluster en analysant les User-Agents.
Logique :
- Si > 50% des UAs contiennent "Chrome" ou "Chromium" → Auto_Chrome
- Si > 50% des UAs contiennent "Firefox" → Auto_Firefox
- Si > 50% des UAs contiennent "Safari" (mais pas Chrome) → Auto_Safari
- Sinon → Auto_Unknown
Le test Safari exclut "Chrome" car Chrome sur iOS se déclare "Safari"
dans son UA mais avec un moteur Blink, ce qui fausserait le label.
"""
uas = ua_series.fillna("").astype(str).str.lower()
n = len(uas)
if n == 0:
return "Auto_Unknown"
chrome_ratio = uas.str.contains("chrome|chromium|edg|opr", regex=True).sum() / n
firefox_ratio = uas.str.contains("firefox", regex=False).sum() / n
# Safari "pur" : contient Safari mais PAS Chrome (pour exclure Chrome iOS)
safari_mask = uas.str.contains("safari", regex=False) & ~uas.str.contains("chrome", regex=False)
safari_ratio = safari_mask.sum() / n
if chrome_ratio > 0.50:
return "Auto_Chrome"
if firefox_ratio > 0.50:
return "Auto_Firefox"
if safari_ratio > 0.50:
return "Auto_Safari"
return "Auto_Unknown"
def _mode(series: pd.Series):
"""Retourne le mode (valeur la plus fréquente) d'une série, ou 0 si vide."""
if series.empty:
return 0
return int(series.mode().iloc[0])
def _compute_tolerance(values: pd.Series, mean: float) -> float:
"""Calcule la tolérance = |mean| + 3 * std, avec garde-fous.
Tolérance = intervalle de confiance à 99.7% (règle des 3 sigma).
Si std = 0 (toutes les valeurs identiques), tolérance = 5% de la moyenne
pour laisser une marge de manoeuvre aux variations mineures de version.
"""
std = float(values.std())
if std == 0:
# Pas de variance : on garde 5% de marge
return max(abs(mean) * 0.05, 100.0)
return abs(mean) + 3.0 * std
def _generate_profile_id(family: str, pseudo_order: int,
window_update_mean: float) -> str:
"""Génère un identifiant de profil déterministe basé sur les caractéristiques.
Utilise SHA-256 tronqué pour éviter les collisions tout en restant lisible.
"""
raw = f"{family}|{pseudo_order}|{int(window_update_mean)}"
h = hashlib.sha256(raw.encode()).hexdigest()[:12]
return f"bp_{family.lower()}_{h}"
# ---------------------------------------------------------------------------
# Core pipeline
# ---------------------------------------------------------------------------
def _fetch_profiling_data(client) -> pd.DataFrame:
"""Récupère les sessions H2 depuis view_h2_profiling_raw.
Requiert le client ClickHouse partagé (ja4_common.clickhouse).
Utilise un lookback de _LOOKBACK_DAYS pour avoir suffisamment de données.
Échantillonne si le volume dépasse 2M lignes (pour la performance HDBSCAN).
Returns:
DataFrame avec colonnes : src_ip, h2_initial_window_size,
h2_window_update, pseudo_order_id, h2_has_priority, header_user_agent
"""
query = f"""
SELECT
src_ip,
h2_initial_window_size,
h2_window_update,
pseudo_order_id,
h2_has_priority,
header_user_agent
FROM {DB}.view_h2_profiling_raw
WHERE log_date >= today() - {_LOOKBACK_DAYS}
LIMIT 2000000
"""
df = client.query_df(query)
if df is None or df.empty:
log_info("[profile_builder] Aucune donnée dans view_h2_profiling_raw.")
return pd.DataFrame()
# Déduplication par IP + vecteur H2 : on garde une seule observation
# par (IP, pseudo_order_id, h2_window_update) pour éviter qu'une IP
# unique avec 10k requêtes ne fausse le clustering.
df = df.drop_duplicates(
subset=["src_ip", "pseudo_order_id", "h2_window_update", "h2_has_priority"]
)
log_info(f"[profile_builder] {len(df)} sessions H2 uniques chargées.")
return df
def _cluster_sessions(df: pd.DataFrame) -> pd.DataFrame:
"""Applique HDBSCAN sur les features normalisées.
Pipeline de clustering :
1. StandardScaler sur les variables continues
2. Concaténation avec les variables catégorielles (encodage brut,
car ce sont des entiers de faible cardinalité : 0-5 et 0-1)
3. HDBSCAN avec min_cluster_size=1000, cluster_selection_method='eom'
(Excess of Mass : sélectionne les clusters stables dans la hiérarchie)
4. Le cluster -1 = bruit (sessions atypiques non rattachées à un cluster)
Returns:
DataFrame d'origine avec colonne 'cluster_id' ajoutée.
"""
from sklearn.preprocessing import StandardScaler
df = df.copy()
if len(df) < _MIN_CLUSTER_SIZE:
log_info(
f"[profile_builder] Volume insuffisant ({len(df)}) pour le clustering "
f"(min={_MIN_CLUSTER_SIZE})."
)
df["cluster_id"] = -1
return df
# Normalisation des variables continues
cont = df[_CONTINUOUS_COLS].replace([np.inf, -np.inf], np.nan).fillna(0)
cont_scaled = StandardScaler().fit_transform(cont)
# Les catégorielles sont des petits entiers : on les conserve telles quelles
# après StandardScaler sur les continues (elles restent sur leur échelle 0-5)
cat = df[_CATEGORICAL_COLS].fillna(0).values.astype(np.float64)
X = np.hstack([cont_scaled, cat])
if HDBSCAN_AVAILABLE:
import hdbscan
clusterer = hdbscan.HDBSCAN(
min_cluster_size=_MIN_CLUSTER_SIZE,
min_samples=max(2, _MIN_CLUSTER_SIZE // 10),
cluster_selection_method="eom",
)
labels = clusterer.fit_predict(X)
else:
from sklearn.cluster import DBSCAN
labels = DBSCAN(eps=0.5, min_samples=_MIN_CLUSTER_SIZE).fit_predict(X)
df["cluster_id"] = labels
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
algo = "HDBSCAN" if HDBSCAN_AVAILABLE else "DBSCAN"
log_info(f"[profile_builder] {algo}: {n_clusters} cluster(s), {len(df)} sessions.")
return df
def _compute_centroids(df: pd.DataFrame) -> list[dict]:
"""Calcule le centroïde de chaque cluster (ignore le bruit cluster_id=-1).
Pour chaque cluster :
- Variables continues → moyenne et écart-type
Tolérance = |moyenne| + 3 * σ (intervalle de confiance 99.7%)
- Variables catégorielles → mode (valeur la plus fréquente)
- Label de famille → analyse des User-Agents du cluster
Returns:
Liste de dicts, chaque dict = un profil prêt à être inséré.
"""
profiles = []
for cid, group in df.groupby("cluster_id"):
if cid == -1:
continue # Bruit HDBSCAN
# --- Variables continues ---
iws_mean = float(group["h2_initial_window_size"].mean())
wu_series = group["h2_window_update"]
wu_mean = float(wu_series.mean())
wu_tol = _compute_tolerance(wu_series, wu_mean)
# --- Variables catégorielles ---
po_mode = _mode(group["pseudo_order_id"])
prio_mode = _mode(group["h2_has_priority"])
# --- Labeling ---
family = _label_family(group["header_user_agent"])
# --- IPs uniques ---
count_ips = int(group["src_ip"].nunique())
profiles.append({
"profile_id": _generate_profile_id(family, po_mode, wu_mean),
"detected_family": family,
"count_ips": count_ips,
"last_seen_date": date.today(),
"h2_initial_window_size_mean": int(round(iws_mean)),
"h2_window_update_mean": int(round(wu_mean)),
"h2_window_update_tol": int(round(wu_tol)),
"pseudo_order_mode": po_mode,
"h2_has_priority_mode": prio_mode,
})
return profiles
def _merge_profiles(profiles: list[dict]) -> list[dict]:
"""Fusionne les clusters redondants.
Critères de fusion (deux profils sont fusionnés si TOUS sont vrais) :
1. Même detected_family (ex: Auto_Chrome)
2. Même pseudo_order_mode (même ordre des pseudo-headers)
3. h2_window_update_mean proches : |moyenne_A - moyenne_B| / max(A, B) < 5%
La fusion calcule la moyenne pondérée par count_ips :
nouvelle_moyenne = (mean_A * count_A + mean_B * count_B) / (count_A + count_B)
Returns:
Liste de profils dédupliqués et fusionnés.
"""
if not profiles:
return profiles
merged = []
used = set()
# Grouper par (famille, pseudo_order) — prérequis pour la fusion
buckets: dict[tuple, list[int]] = {}
for i, p in enumerate(profiles):
key = (p["detected_family"], p["pseudo_order_mode"])
buckets.setdefault(key, []).append(i)
for key, indices in buckets.items():
# Parcourir les profils du bucket et fusionner les proches
cluster_groups: list[list[int]] = []
for idx in indices:
if idx in used:
continue
# Nouveau groupe de fusion
group = [idx]
used.add(idx)
# Chercher d'autres profils fusionnables dans ce bucket
for other_idx in indices:
if other_idx in used:
continue
# Vérifier la proximité de window_update
wu_a = profiles[idx]["h2_window_update_mean"]
wu_b = profiles[other_idx]["h2_window_update_mean"]
if wu_a == 0 and wu_b == 0:
# Les deux sont à 0 : fusionnable
group.append(other_idx)
used.add(other_idx)
elif max(wu_a, wu_b) > 0:
ratio = abs(wu_a - wu_b) / max(wu_a, wu_b)
if ratio < _MERGE_TOLERANCE_RATIO:
group.append(other_idx)
used.add(other_idx)
cluster_groups.append(group)
# Construire le profil fusionné pour chaque groupe
for group in cluster_groups:
if len(group) == 1:
merged.append(profiles[group[0]])
continue
# Moyenne pondérée par count_ips
total_count = sum(profiles[i]["count_ips"] for i in group)
w_iws = sum(
profiles[i]["h2_initial_window_size_mean"] * profiles[i]["count_ips"]
for i in group
) / total_count
w_wu = sum(
profiles[i]["h2_window_update_mean"] * profiles[i]["count_ips"]
for i in group
) / total_count
# Tolérance = max des tolérances du groupe (on élargit le seuil)
w_tol = max(profiles[i]["h2_window_update_tol"] for i in group)
# Mode majoritaire (le plus fréquent parmi les membres)
prio_modes = Counter(profiles[i]["h2_has_priority_mode"] for i in group)
merged.append({
"profile_id": _generate_profile_id(
key[0], key[1], w_wu
),
"detected_family": key[0],
"count_ips": total_count,
"last_seen_date": date.today(),
"h2_initial_window_size_mean": int(round(w_iws)),
"h2_window_update_mean": int(round(w_wu)),
"h2_window_update_tol": int(round(w_tol)),
"pseudo_order_mode": key[1],
"h2_has_priority_mode": prio_modes.most_common(1)[0][0],
})
return merged
def _persist_profiles(client, profiles: list[dict]) -> int:
"""Insère les profils dans auto_browser_profiles via ReplacingMergeTree.
Le moteur ReplacingMergeTree(created_at) déduplique automatiquement
sur ORDER BY (detected_family, profile_id) : si un profil avec le même
profile_id existe déjà, seule la version la plus récente est conservée.
Returns:
Nombre de profils insérés.
"""
if not profiles:
return 0
columns = [
"profile_id", "detected_family", "count_ips", "last_seen_date",
"h2_initial_window_size_mean", "h2_window_update_mean",
"h2_window_update_tol", "pseudo_order_mode", "h2_has_priority_mode",
"created_at",
]
now = datetime.now()
rows = [
(
p["profile_id"],
p["detected_family"],
p["count_ips"],
p["last_seen_date"],
p["h2_initial_window_size_mean"],
p["h2_window_update_mean"],
p["h2_window_update_tol"],
p["pseudo_order_mode"],
p["h2_has_priority_mode"],
now,
)
for p in profiles
]
client.insert(f"{DB}.auto_browser_profiles", rows, column_names=columns)
return len(rows)
# ---------------------------------------------------------------------------
# Partie 4 : Cycle de vie des profils
# ---------------------------------------------------------------------------
def _update_last_seen(client) -> int:
"""Met à jour last_seen_date = today() pour les profils dont des IPs
ont été vues dans les logs des dernières 24h.
Pour chaque profil actif, on vérifie si le vecteur (pseudo_order_mode,
h2_window_update_mean) correspond à des sessions récentes dans
view_h2_profiling_raw. Si oui, last_seen_date est mis à jour.
Implémentation : INSERT INTO avec les profils existants mis à jour
(ReplacingMergeTree garantit la déduplication).
Returns:
Nombre de profils rafraîchis.
"""
today_str = date.today().isoformat()
# Récupérer les profils existants
existing = client.query_df(
f"SELECT * FROM {DB}.auto_browser_profiles FINAL"
)
if existing is None or existing.empty:
return 0
# Récupérer les vecteurs H2 observés dans les dernières 24h
recent = client.query_df(f"""
SELECT
pseudo_order_id,
avg(toUInt64(h2_window_update)) AS avg_wu
FROM {DB}.view_h2_profiling_raw
WHERE log_date >= today() - 1
GROUP BY pseudo_order_id
""")
if recent is None or recent.empty:
return 0
# Construire un set des (pseudo_order, window_update) observés récemment
# avec une tolérance de ±5% sur window_update
recent_set = set()
for _, r in recent.iterrows():
recent_set.add((int(r["pseudo_order_id"]), float(r["avg_wu"])))
# Mettre à jour les profils dont le vecteur correspond
refreshed = 0
profiles_to_update = []
for _, profile in existing.iterrows():
po = int(profile["pseudo_order_mode"])
wu_mean = float(profile["h2_window_update_mean"])
wu_tol = float(profile["h2_window_update_tol"])
# Vérifier si un vecteur récent correspond
for (r_po, r_wu) in recent_set:
if r_po != po:
continue
if wu_mean == 0:
continue
if abs(r_wu - wu_mean) <= wu_tol:
profiles_to_update.append(profile)
refreshed += 1
break
# Ré-insérer avec last_seen_date = today
if profiles_to_update:
columns = [
"profile_id", "detected_family", "count_ips", "last_seen_date",
"h2_initial_window_size_mean", "h2_window_update_mean",
"h2_window_update_tol", "pseudo_order_mode",
"h2_has_priority_mode", "created_at",
]
now = datetime.now()
rows = [
(
str(p["profile_id"]),
str(p["detected_family"]),
int(p["count_ips"]),
date.today(),
int(p["h2_initial_window_size_mean"]),
int(p["h2_window_update_mean"]),
int(p["h2_window_update_tol"]),
int(p["pseudo_order_mode"]),
int(p["h2_has_priority_mode"]),
now,
)
for p in profiles_to_update
]
client.insert(
f"{DB}.auto_browser_profiles", rows, column_names=columns
)
log_info(f"[profile_builder] {refreshed} profil(s) rafraîchi(s) (last_seen = {today_str}).")
return refreshed
def _purge_stale_profiles(client) -> int:
"""Supprime les profils dont last_seen_date < today() - 14 jours.
Utilise ALTER TABLE DELETE (mutation ClickHouse) car ReplacingMergeTree
ne supporte pas les DELETE directs. La suppression est asynchrone
(effectuée lors du prochain merge de partitions).
Returns:
Nombre de profils supprimés.
"""
cutoff = (date.today() - timedelta(days=_PROFILE_TTL_DAYS)).isoformat()
# Compter d'abord
count_result = client.query(
f"SELECT count() AS n FROM {DB}.auto_browser_profiles "
f"WHERE last_seen_date < '{cutoff}'"
)
n_stale = count_result.result_rows[0][0] if count_result.result_rows else 0
if n_stale > 0:
client.command(
f"ALTER TABLE {DB}.auto_browser_profiles DELETE "
f"WHERE last_seen_date < '{cutoff}'"
)
log_info(
f"[profile_builder] {n_stale} profil(s) obsolète(s) supprimé(s) "
f"(last_seen < {cutoff})."
)
return n_stale
# ---------------------------------------------------------------------------
# Pipeline principal
# ---------------------------------------------------------------------------
def run_profile_builder(client=None):
"""Point d'entrée principal du cron quotidien de profiling.
Pipeline :
1. Chargement des sessions H2 (view_h2_profiling_raw)
2. Clustering HDBSCAN
3. Calcul des centroïdes par cluster
4. Fusion des clusters redondants
5. Persistance dans auto_browser_profiles
6. Mise à jour des last_seen_date
7. Purge des profils obsolètes (> 14 jours)
Args:
client: Client ClickHouse (si None, utilise le singleton partagé).
"""
if client is None:
from .infra import get_client
client = get_client()
log_info("[profile_builder] Début du cycle de profiling quotidien.")
# --- Étape 1 : Chargement ---
df = _fetch_profiling_data(client)
if df.empty:
log_info("[profile_builder] Arrêt : aucune donnée à profiler.")
return
# --- Étape 2 : Clustering ---
df = _cluster_sessions(df)
# --- Étape 3 : Centroïdes ---
profiles = _compute_centroids(df)
if not profiles:
log_info("[profile_builder] Aucun cluster significatif trouvé.")
return
log_info(f"[profile_builder] {len(profiles)} profil(s) candidat(s) avant fusion.")
# --- Étape 4 : Fusion ---
profiles = _merge_profiles(profiles)
log_info(f"[profile_builder] {len(profiles)} profil(s) après fusion.")
# --- Étape 5 : Persistance ---
n_inserted = _persist_profiles(client, profiles)
log_info(f"[profile_builder] {n_inserted} profil(s) inséré(s) dans auto_browser_profiles.")
# --- Étape 6 : Cycle de vie — rafraîchissement ---
_update_last_seen(client)
# --- Étape 7 : Cycle de vie — purge ---
_purge_stale_profiles(client)
log_info("[profile_builder] Cycle de profiling terminé.")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
run_profile_builder()