feat(h2): direct per-parameter SETTINGS comparison in browser_matcher

- Rewrote _d1_h2_settings() with 3-signal weighted formula:
  direct_score×0.60 + dict_match×0.30 + ja4_coherence×0.10
  when individual SETTINGS cols are available in the DataFrame
- Added _H2_SETTINGS_COLS dict (IDs 1,2,3,4,5,6,8 → column names)
- Fallback to dict_match×0.80 + ja4_coherence×0.20 for backward compat
- Fix view_ai_features_1h: pass 7 individual SETTINGS columns through
  base_data CTE (h2_header_table_size, h2_enable_push,
  h2_max_concurrent_streams, h2_initial_window_size, h2_max_frame_size,
  h2_max_header_list_size, h2_enable_connect_protocol)
- Remove non-existent h2_dict_confidence reference from view SQL
  (dict_browser_h2 only exposes browser_family attribute)
- Add 7 new pytest cases: exact match, one wrong setting, forbidden key
  penalty, unknown fingerprint with correct settings, fallback path,
  CDN proxy neutralisation, full Chrome simulation
- 53/53 bot-detector tests pass
- Update thesis §3.9.2: document direct comparison algorithm + fallback

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-11 03:05:36 +02:00
parent 95e87149aa
commit f704541f83
4 changed files with 259 additions and 45 deletions

View File

@ -30,6 +30,17 @@ BROWSER_MATCHER_REPLACE: bool = os.getenv("BROWSER_MATCHER_REPLACE", "false").lo
# Familles Chrome-like dans le dictionnaire dict_browser_ja4.
_CHROME_JA4_FAMILIES = {"Chromium", "Chrome", "Edge"}
# Correspondance SETTINGS ID → nom de colonne dans view_ai_features_1h.
_H2_SETTINGS_COLS: dict[int, str] = {
1: "h2_header_table_size",
2: "h2_enable_push",
3: "h2_max_concurrent_streams",
4: "h2_initial_window_size",
5: "h2_max_frame_size",
6: "h2_max_header_list_size",
8: "h2_enable_connect_protocol",
}
def _col(df: pd.DataFrame, name: str, default=0) -> pd.Series:
"""Retourne une colonne du DataFrame ou une série de valeurs par défaut."""
@ -45,24 +56,62 @@ def _col(df: pd.DataFrame, name: str, default=0) -> pd.Series:
def _d1_h2_settings(df: pd.DataFrame, family: str) -> pd.Series:
"""Dimension 1 — correspondance SETTINGS H2 (poids 0.30).
Utilise h2_dict_family issu du dictionnaire dict_browser_h2.
Score 1.0 si famille correspond, 0.0 sinon.
Désactivé (score neutre 0.5) quand le trafic transite par un proxy CDN.
Deux niveaux de signal selon les données disponibles :
1. Comparaison directe par paramètre (colonnes individuelles) :
- Pour chaque clé attendue dans h2_settings_exact : exact match → 1, écart → 0
- Pour chaque clé interdite dans h2_settings_forbidden_keys : absente (-1) → 1
- Score = (correct / total_vérifications)
Pondération : direct_score × 0.60 + dict_match × 0.30 + ja4_coherence × 0.10
2. Fallback (colonnes indisponibles) : lookup dict_browser_h2 uniquement
Pondération : dict_match × 0.80 + ja4_coherence × 0.20
Neutre (0.5) quand le trafic transite par un proxy CDN (has_xff > 0).
"""
sig = BROWSER_SIGNATURES[family]
has_xff = _col(df, "has_xff")
h2_dict_family = _col(df, "h2_dict_family", "")
# Correspondance exacte de la famille dans le dictionnaire H2
match = (h2_dict_family.astype(str) == family).astype(float)
expected: dict = sig["h2_settings_exact"]
forbidden: list = sig.get("h2_settings_forbidden_keys", [])
# Signal de cohérence JA4↔H2 comme signal de renforcement
h2_ja4_coherence = _col(df, "h2_ja4_coherence")
# Vérifie si les colonnes individuelles sont disponibles
needed_cols = [_H2_SETTINGS_COLS[k] for k in expected if k in _H2_SETTINGS_COLS]
individual_available = bool(needed_cols) and all(c in df.columns for c in needed_cols)
base = match * 0.8 + h2_ja4_coherence * 0.2
if individual_available:
checks: list[pd.Series] = []
# Clés attendues : valeur exacte ET non absente
for settings_id, expected_val in expected.items():
col = _H2_SETTINGS_COLS.get(settings_id)
if col and col in df.columns:
v = _col(df, col, -1)
checks.append(((v == expected_val) & (v >= 0)).astype(float))
# Clés interdites : doivent être absentes (valeur -1)
for settings_id in forbidden:
col = _H2_SETTINGS_COLS.get(settings_id)
if col and col in df.columns:
checks.append((_col(df, col, -1) < 0).astype(float))
n = len(checks)
direct_score = sum(checks) / n if n > 0 else pd.Series(0.5, index=df.index)
h2_dict_family = _col(df, "h2_dict_family", "")
dict_match = (h2_dict_family.astype(str) == family).astype(float)
h2_ja4_coherence = _col(df, "h2_ja4_coherence")
base = direct_score * 0.60 + dict_match * 0.30 + h2_ja4_coherence * 0.10
else:
# Fallback : lookup dictionnaire uniquement
h2_dict_family = _col(df, "h2_dict_family", "")
dict_match = (h2_dict_family.astype(str) == family).astype(float)
h2_ja4_coherence = _col(df, "h2_ja4_coherence")
base = dict_match * 0.80 + h2_ja4_coherence * 0.20
# Neutre (0.5) si proxy CDN car le fingerprint H2 client est perdu
return pd.Series(
np.where(has_xff > 0, 0.5, base.values),
np.where(has_xff > 0, 0.5, base.values if hasattr(base, "values") else base),
index=df.index,
)

View File

@ -8,6 +8,13 @@ Vérifie les 6 cas de base :
5. httpcloak partial → score < seuil, PARTIAL
6. TLS↔H2 mismatch → tls_h2_family_mismatch calculé correctement
Et les cas de comparaison directe SETTINGS (colonnes individuelles) :
7. Colonnes individuelles Chrome exactes → score D1 élevé
8. Une clé SETTINGS erronée → score D1 réduit proportionnellement
9. Clé interdite présente → pénalité
10. Fingerprint inconnu du dict mais SETTINGS exacts → score D1 élevé (avantage vs dict-only)
11. Fallback dict-only quand colonnes individuelles absentes
Les tests sont auto-contenus : ils construisent des DataFrames pandas
représentant des sessions fictives sans interagir avec ClickHouse.
"""
@ -21,6 +28,8 @@ from bot_detector.browser_matcher import (
run_browser_matcher,
_compute_family_score,
_matches_non_browser,
_d1_h2_settings,
_H2_SETTINGS_COLS,
)
@ -315,3 +324,129 @@ def test_non_browser_go_net_http():
)
result = run_browser_matcher(df)
assert result.loc[0, "bm_non_browser"] is True or result.loc[0, "bm_non_browser"] == True
# ─────────────────────────────────────────────────────────────────────────────
# Tests comparaison directe SETTINGS (colonnes individuelles)
# ─────────────────────────────────────────────────────────────────────────────
def _chrome_session_with_settings(**overrides) -> pd.DataFrame:
"""Session Chrome avec colonnes SETTINGS individuelles (valeurs exactes Chrome)."""
base = {
"src_ip": "1.2.3.4",
"ja4": "t13d1517h2_8daaf6152771_e5627efa2ab1",
"browser_family": "Chromium",
"h2_dict_family": "Chrome",
"h2_dict_confidence": 1.0,
"h2_window_update_value": 15663105,
"h2_order_chromesafari": 1,
"h2_order_firefox": 0,
"h2_priority_present": 0,
"h2_settings_known": 1,
"h2_ja4_coherence": 1,
"has_accept_language": 1,
"has_sec_ch_ua": 1,
"sec_fetch_absence_rate": 0.0,
"has_xff": 0,
"tls12_ratio": 0.0,
# Colonnes SETTINGS individuelles — Chrome exact
"h2_header_table_size": 65536, # ID 1 ✓
"h2_enable_push": 0, # ID 2 ✓
"h2_max_concurrent_streams": -1, # ID 3 absent ✓ (forbidden)
"h2_initial_window_size": 6291456, # ID 4 ✓
"h2_max_frame_size": -1, # ID 5 absent ✓ (forbidden)
"h2_max_header_list_size": 262144, # ID 6 ✓
"h2_enable_connect_protocol": -1, # ID 8 absent (pas dans les expected ni forbidden)
}
base.update(overrides)
return pd.DataFrame([base])
def test_d1_direct_chrome_exact_settings():
"""Colonnes individuelles Chrome exactes → score D1 ≥ 0.90."""
df = _chrome_session_with_settings()
d1 = _d1_h2_settings(df, "Chrome")
# 4 expected keys toutes exactes + 2 forbidden absentes = 6/6 direct_score=1.0
# base = 1.0×0.60 + 1.0×0.30 + 1.0×0.10 = 1.0
assert d1.iloc[0] >= 0.90, f"Score D1 direct Chrome attendu ≥0.90, obtenu {d1.iloc[0]:.3f}"
def test_d1_direct_one_wrong_setting():
"""Une clé SETTINGS incorrecte réduit D1 proportionnellement."""
# ENABLE_PUSH=1 au lieu de 0 → 1 clé fausse sur 6 vérifications
df = _chrome_session_with_settings(h2_enable_push=1)
d1 = _d1_h2_settings(df, "Chrome")
d1_perfect = _d1_h2_settings(_chrome_session_with_settings(), "Chrome")
assert d1.iloc[0] < d1_perfect.iloc[0], "Erreur SETTINGS doit réduire D1"
# 5/6 correct → direct_score ≈ 0.833 → base ≈ 0.833×0.60 + 1.0×0.30 + 1.0×0.10 ≈ 0.90
assert d1.iloc[0] >= 0.80, f"Score D1 avec 1 erreur attendu ≥0.80, obtenu {d1.iloc[0]:.3f}"
def test_d1_forbidden_key_present_penalizes():
"""Clé interdite présente (MAX_CONCURRENT_STREAMS) → pénalité D1."""
# MAX_CONCURRENT_STREAMS=100 alors qu'il devrait être absent
df = _chrome_session_with_settings(h2_max_concurrent_streams=100)
d1 = _d1_h2_settings(df, "Chrome")
d1_perfect = _d1_h2_settings(_chrome_session_with_settings(), "Chrome")
assert d1.iloc[0] < d1_perfect.iloc[0], "Clé interdite présente doit pénaliser D1"
def test_d1_unknown_fingerprint_but_correct_settings():
"""Fingerprint inconnu du dict (h2_dict_family='') mais SETTINGS exacts → D1 élevé."""
# httpcloak scenario : fingerprint légèrement modifié → non reconnu par dict
# mais tous les SETTINGS individuels sont corrects
df = _chrome_session_with_settings(
h2_dict_family="", # dict lookup rate
h2_settings_known=0, # non reconnu
h2_ja4_coherence=0,
)
d1 = _d1_h2_settings(df, "Chrome")
# direct_score = 1.0 (SETTINGS exacts)
# dict_match = 0 (non reconnu)
# ja4_coherence = 0
# base = 1.0×0.60 + 0.0×0.30 + 0.0×0.10 = 0.60
assert d1.iloc[0] >= 0.55, (
f"SETTINGS exacts mais dict inconnu : D1 attendu ≥0.55 (0.60), obtenu {d1.iloc[0]:.3f}"
)
# Et sans colonnes individuelles, ce même cas donnerait 0.0 (dict=0, coherence=0)
df_no_cols = _chrome_session_with_settings(
h2_dict_family="",
h2_settings_known=0,
h2_ja4_coherence=0,
).drop(columns=["h2_header_table_size", "h2_enable_push", "h2_max_concurrent_streams",
"h2_initial_window_size", "h2_max_frame_size", "h2_max_header_list_size",
"h2_enable_connect_protocol"])
d1_fallback = _d1_h2_settings(df_no_cols, "Chrome")
assert d1_fallback.iloc[0] < 0.10, (
f"Fallback dict-only sans colonnes : D1 attendu ≈0.0, obtenu {d1_fallback.iloc[0]:.3f}"
)
# Avantage direct vs fallback
assert d1.iloc[0] > d1_fallback.iloc[0], "Comparaison directe doit surpasser le fallback dict-only"
def test_d1_fallback_when_no_individual_cols():
"""Sans colonnes individuelles, fallback sur dict lookup (comportement original)."""
df = _chrome_session_with_settings().drop(
columns=["h2_header_table_size", "h2_enable_push", "h2_max_concurrent_streams",
"h2_initial_window_size", "h2_max_frame_size", "h2_max_header_list_size",
"h2_enable_connect_protocol"]
)
d1 = _d1_h2_settings(df, "Chrome")
# dict=Chrome(1.0) × 0.80 + ja4_coherence(1.0) × 0.20 = 1.0
assert d1.iloc[0] >= 0.90, f"Fallback dict-only Chrome parfait attendu ≥0.90, obtenu {d1.iloc[0]:.3f}"
def test_cdn_proxy_neutralizes_individual_settings():
"""has_xff=1 : score D1 neutre (0.5) même avec SETTINGS exacts."""
df = _chrome_session_with_settings(has_xff=1)
d1 = _d1_h2_settings(df, "Chrome")
assert abs(d1.iloc[0] - 0.5) < 0.01, f"CDN proxy : D1 attendu 0.5, obtenu {d1.iloc[0]:.3f}"
def test_chrome_full_match_with_individual_settings():
"""Chrome avec colonnes individuelles exactes : score final ≥ 0.72 → LEGITIMATE_BROWSER."""
df = _chrome_session_with_settings()
result = run_browser_matcher(df)
assert result.loc[0, "bm_decision"] == "LEGITIMATE_BROWSER"
assert result.loc[0, "bm_family"] == "Chrome"
assert result.loc[0, "bm_score"] >= 0.72