Files
ja4-platform/services/bot-detector/bot_detector/browser.py
toto 1f103392ac refactor(bot-detector): extract monolith into modular package
Split bot_detector.py (~1982 lines) into 10 focused modules:
- config.py: all configuration constants and optional imports
- log.py: logging utilities (log_info, log_decision, append_training_history)
- infra.py: ClickHouse client, health check HTTP server, shutdown
- browser.py: multifactorial browser identification (5 axes)
- scoring.py: drift detection, feature validation, SHAP, clustering
- models.py: EIF, Autoencoder, XGBoost model management
- preprocessing.py: data preprocessing and feature list definitions
- pipeline.py: core semi-supervised scoring loop
- cycle.py: main analysis cycle orchestration
- __main__.py: entry point with startup banner

Update Dockerfile to copy package directory and use python -m bot_detector.

All 36 existing tests pass unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-09 01:02:04 +02:00

171 lines
7.5 KiB
Python

"""A9 — Identification multifactorielle des navigateurs.
5 axes indépendants combinés pour identifier les navigateurs légitimes.
Chaque axe produit un score [0,1]. La combinaison pondérée produit
browser_confidence [0,1], remplaçant l'ancien système binaire JA4-only.
"""
import pandas as pd
import numpy as np
# Profils structurels JA4 des navigateurs connus (TCP).
# Plages de cipher/extension count caractéristiques par famille TLS.
# Sources : FoxIO ja4plus-mapping.csv, ja4db.com, captures réelles.
_BROWSER_JA4_PROFILES = {
'Chromium': {'tls': '13', 'alpn': ('h2', 'h3'),
'ciphers': range(13, 19), 'exts': range(13, 20)},
'Firefox': {'tls': '13', 'alpn': ('h2', 'h3'),
'ciphers': range(15, 20), 'exts': range(13, 18)},
'Safari': {'tls': '13', 'alpn': ('h2', 'h3'),
'ciphers': range(17, 25), 'exts': range(11, 17)},
'Tor_Browser': {'tls': '13', 'alpn': ('h2',),
'ciphers': range(14, 18), 'exts': range(12, 17)},
}
# Pondération des 5 axes pour le score browser_confidence.
_AXIS_WEIGHTS = {
'ja4_known': 0.25, # Axe 1 — Signature JA4 dans dict_browser_ja4
'ja4_struct': 0.15, # Axe 2 — Structure JA4 (TLS1.3, h2, nb ciphers/ext)
'http_modern': 0.25, # Axe 3 — Headers HTTP modernes (sec-ch-ua, Sec-Fetch-*)
'nav_behavior': 0.15, # Axe 4 — Comportement de navigation (assets, referers)
'tls_coherence': 0.20, # Axe 5 — Cohérence TLS/TCP (pas de mismatch)
}
def _parse_ja4_columns(ja4_series: pd.Series) -> pd.DataFrame:
"""Parse la partie structurelle du JA4 (10 premiers caractères) vectorisé.
Format JA4 : {proto}{tls_ver}{sni}{cipher_cnt}{ext_cnt}{alpn}_hash_hash
Exemple : t13d1516h2_8daaf6152771_02713d6af862
Retourne un DataFrame avec : ja4_tls, ja4_ciphers, ja4_exts, ja4_alpn.
"""
s = ja4_series.fillna('').astype(str)
valid = s.str.len() >= 10
result = pd.DataFrame(index=ja4_series.index)
result['ja4_tls'] = s.str[1:3].where(valid, '00')
result['ja4_ciphers'] = pd.to_numeric(s.str[4:6].where(valid, '0'), errors='coerce').fillna(0).astype(int)
result['ja4_exts'] = pd.to_numeric(s.str[6:8].where(valid, '0'), errors='coerce').fillna(0).astype(int)
result['ja4_alpn'] = s.str[8:10].where(valid, '00')
return result
def _compute_browser_axes(df: pd.DataFrame) -> pd.DataFrame:
"""Calcule les 5 axes d'identification navigateur pour chaque session.
Axe 1 (ja4_known) : JA4 présent dans dict_browser_ja4 (0 ou 1)
Axe 2 (ja4_struct) : Structure JA4 compatible navigateur (TLS1.3 + h2/h3 + ciphers/exts)
Axe 3 (http_modern) : Headers HTTP modernes (sec-ch-ua, Accept-Language, Sec-Fetch-*)
Axe 4 (nav_behavior) : Comportement navigation (cookies, referer, assets, accès direct)
Axe 5 (tls_coherence) : Cohérence TLS/TCP (pas de mismatch ALPN, window scaling, TLS1.3)
Retourne un DataFrame avec les colonnes axis_* et browser_confidence.
"""
n = len(df)
axes = pd.DataFrame(index=df.index)
# ── Axe 1 — Signature JA4 connue ──
bf = df.get('browser_family', pd.Series('', index=df.index)).fillna('').astype(str)
axes['axis_ja4_known'] = (bf != '').astype(float)
# ── Axe 2 — Structure JA4 ──
ja4_parsed = _parse_ja4_columns(df.get('ja4', pd.Series('', index=df.index)))
is_tls13 = (ja4_parsed['ja4_tls'] == '13').astype(float)
is_h2h3 = ja4_parsed['ja4_alpn'].isin(['h2', 'h3']).astype(float)
# Plage de ciphers et extensions typique d'un navigateur (10-25)
c = ja4_parsed['ja4_ciphers']
e = ja4_parsed['ja4_exts']
ciphers_ok = ((c >= 10) & (c <= 25)).astype(float)
exts_ok = ((e >= 10) & (e <= 25)).astype(float)
axes['axis_ja4_struct'] = (
is_tls13 * 0.35 + is_h2h3 * 0.25 + ciphers_ok * 0.20 + exts_ok * 0.20
)
# ── Axe 3 — Headers HTTP modernes ──
mbs = df.get('modern_browser_score', pd.Series(0, index=df.index)).fillna(0)
hal = df.get('has_accept_language', pd.Series(0, index=df.index)).fillna(0)
sfa = df.get('sec_fetch_absence_rate', pd.Series(1, index=df.index)).fillna(1)
gar = df.get('generic_accept_ratio', pd.Series(1, index=df.index)).fillna(1)
uam = df.get('ua_ch_mismatch', pd.Series(0, index=df.index)).fillna(0)
axes['axis_http_modern'] = (
(mbs >= 50).astype(float) * 0.35
+ (hal > 0).astype(float) * 0.20
+ (sfa < 0.3).astype(float) * 0.25
+ (gar < 0.3).astype(float) * 0.10
+ (uam == 0).astype(float) * 0.10
)
# ── Axe 4 — Comportement de navigation ──
hck = df.get('has_cookie', pd.Series(0, index=df.index)).fillna(0)
hrf = df.get('has_referer', pd.Series(0, index=df.index)).fillna(0)
asr = df.get('asset_ratio', pd.Series(0, index=df.index)).fillna(0)
dar = df.get('direct_access_ratio', pd.Series(1, index=df.index)).fillna(1)
axes['axis_nav_behavior'] = (
(hck > 0).astype(float) * 0.25
+ (hrf > 0).astype(float) * 0.25
+ (asr > 0.15).astype(float) * 0.25
+ (dar < 0.5).astype(float) * 0.25
)
# ── Axe 5 — Cohérence TLS/TCP ──
alm = df.get('alpn_http_mismatch', pd.Series(0, index=df.index)).fillna(0)
nws = df.get('no_window_scale_ratio', pd.Series(0, index=df.index)).fillna(0)
t12 = df.get('tls12_ratio', pd.Series(0, index=df.index)).fillna(0)
h10 = df.get('http10_ratio', pd.Series(0, index=df.index)).fillna(0)
iam = df.get('is_alpn_missing', pd.Series(0, index=df.index)).fillna(0)
axes['axis_tls_coherence'] = (
(alm == 0).astype(float) * 0.25
+ (nws == 0).astype(float) * 0.20
+ (t12 < 0.1).astype(float) * 0.20
+ (h10 == 0).astype(float) * 0.15
+ (iam == 0).astype(float) * 0.20
)
# ── Score combiné pondéré ──
axes['browser_confidence'] = sum(
axes[f'axis_{k}'] * w for k, w in _AXIS_WEIGHTS.items()
)
return axes
def _infer_browser_family(df: pd.DataFrame, ja4_parsed: pd.DataFrame,
axes: pd.DataFrame) -> pd.Series:
"""Infère la famille navigateur par analyse structurelle quand dict_browser_ja4 manque.
Priorité :
1. browser_family connu (dict_browser_ja4) → conservé
2. Profil structurel JA4 (cipher/ext count) → famille probable
3. Sinon → '' (inconnu)
La famille inférée nécessite browser_confidence ≥ 0.45 pour éviter les faux positifs
(un bot avec un JA4 structurellement similaire mais sans comportement navigateur).
"""
bf = df.get('browser_family', pd.Series('', index=df.index)).fillna('').astype(str)
result = bf.copy()
unknown_mask = (result == '')
if not unknown_mask.any():
return result
# Inférence structurelle sur les sessions sans famille connue
c = ja4_parsed.loc[unknown_mask, 'ja4_ciphers']
e = ja4_parsed.loc[unknown_mask, 'ja4_exts']
tls = ja4_parsed.loc[unknown_mask, 'ja4_tls']
alpn = ja4_parsed.loc[unknown_mask, 'ja4_alpn']
conf = axes.loc[unknown_mask, 'browser_confidence']
# Seuil minimal : le comportement global doit être suffisamment navigateur
eligible = conf >= 0.45
for family, profile in _BROWSER_JA4_PROFILES.items():
match = (
eligible
& (tls == profile['tls'])
& alpn.isin(profile['alpn'])
& c.isin(profile['ciphers'])
& e.isin(profile['exts'])
)
# Affecter seulement si pas encore attribué
match = match & (result.loc[unknown_mask] == '')
result.loc[match[match].index] = family
return result