feat(e2e): add distributed E2E test framework with parametric traffic generation

Add run-e2e-test.sh with CLI parameters (--hits, --http-ratio, --dns, --tls,
--src-ips, --keep-analysis, --up) for configurable traffic generation. Traffic
runs from VM endpoints with multiple source IPs (alias IPs on eth0) to produce
distinct sessions for the ML pipeline. Fix curl TLS flags (--tlsv1.2 instead
of --tls-v1-2), skip redundant local verification in distributed mode, and
fix dashboard is_available() cache that never retried after ClickHouse recovery.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jacquin Antoine
2026-04-15 00:09:32 +02:00
parent 7894d39f1c
commit f88b739992
40 changed files with 2154 additions and 337 deletions

View File

@ -72,6 +72,11 @@ HEALTH_PORT = int(os.getenv('HEALTH_PORT', '8080'))
DEDUP_TTL_MIN = int(os.getenv('DEDUP_TTL_MIN', '60'))
RECURRENCE_WEIGHT = _require_float('RECURRENCE_WEIGHT', 0.005)
# ─── Baseline minimum — nombre minimum de sessions humaines pour l'IF ─────
MIN_HUMAN_BASELINE = int(os.getenv('MIN_HUMAN_BASELINE', '500'))
# En mode test, les IPs privées n'ont pas d'ASN 'isp' — utiliser 'unknown' comme fallback
BASELINE_ACCEPT_UNKNOWN = os.getenv('BASELINE_ACCEPT_UNKNOWN', 'false').lower() == 'true'
# ─── Autoencoder (AE) — second scorer parallèle ────────────────────────────
AE_WEIGHT = _require_float('AE_WEIGHT', 0.30, 0, 1)
@ -79,6 +84,9 @@ AE_EPOCHS = int(os.getenv('AE_EPOCHS', '50'))
AE_LATENT_DIM = int(os.getenv('AE_LATENT_DIM', '16'))
AE_LEARNING_RATE = float(os.getenv('AE_LEARNING_RATE', '1e-3'))
# ─── NFEnsemble — Deep Ensemble (M=5) incertitude ──────────────────────────
NF_UNCERTAINTY_THRESHOLD = float(os.getenv('NF_UNCERTAINTY_THRESHOLD', '1.0'))
SESSION_TRANSFORMER_PATH = os.getenv(
'SESSION_TRANSFORMER_PATH',
os.path.join(MODEL_DIR, 'session_transformer.pt')

View File

@ -218,16 +218,28 @@ def fetch_and_analyze():
if not unknown_h2.empty:
n_unknown = len(unknown_h2)
# Insérer les fingerprints inconnus dans la table ClickHouse
client.command(
"INSERT INTO ja4_processing.unknown_h2_fingerprints "
"(observed_at, src_ip, ja4, h2_fingerprint, h2_settings_fp, "
"h2_window_update, h2_pseudo_order, h2_has_priority, "
"browser_confidence_score, header_user_agent, tls_version) "
"SELECT now(), src_ip, ja4, h2_fingerprint, h2_settings_fp, "
"h2_window_update, h2_pseudo_order, h2_has_priority, "
"browser_confidence, header_user_agent, tls_version "
"FROM input"
)
cols = [
'observed_at', 'src_ip', 'ja4', 'h2_fingerprint', 'h2_settings_fp',
'h2_window_update', 'h2_pseudo_order', 'h2_has_priority',
'browser_confidence_score', 'header_user_agent', 'tls_version',
]
rows = []
for _, row in unknown_h2.iterrows():
rows.append({
'observed_at': row.get('time', ''),
'src_ip': row.get('src_ip', ''),
'ja4': row.get('ja4', ''),
'h2_fingerprint': row.get('h2_fingerprint', ''),
'h2_settings_fp': row.get('h2_settings_fp', ''),
'h2_window_update': int(row.get('h2_window_update', 0)),
'h2_pseudo_order': row.get('h2_pseudo_order', ''),
'h2_has_priority': int(row.get('h2_has_priority', 0)),
'browser_confidence_score': float(row.get('browser_confidence', 0.0)),
'header_user_agent': row.get('header_user_agent', ''),
'tls_version': row.get('tls_version', ''),
})
client.insert('ja4_processing.unknown_h2_fingerprints', rows,
column_names=cols)
log_info(f'[H2 Queue] {n_unknown} fingerprint(s) H2 inconnu(s) mis en file d\'examen.')
except Exception as e:
log_info(f'[H2 Queue] Erreur insertion unknown_h2_fingerprints : {e}')
@ -324,8 +336,12 @@ def fetch_and_analyze():
log_info('')
log_info(f'── Modèle Applicatif (L7 seul, non-corrélé) : {len(df_uncorr)} sessions, {len(feats)} features ──')
anom_b, scored_b = run_semi_supervised_logic(df_uncorr, feats, 'Applicatif', cycle_id, recurrence_map)
all_anom = pd.concat([anom_a, anom_b], ignore_index=True)
all_scored = pd.concat([scored_a, scored_b], ignore_index=True)
_anom_dfs = [df for df in [anom_a, anom_b]
if df is not None and not df.empty]
all_anom = pd.concat(_anom_dfs, ignore_index=True) if _anom_dfs else pd.DataFrame()
_scored_dfs = [df for df in [scored_a, scored_b]
if df is not None and not df.empty]
all_scored = pd.concat(_scored_dfs, ignore_index=True) if _scored_dfs else pd.DataFrame()
# ── A3 : Analyse fenêtre 24h (optionnelle) ────────────────────────────────
if ENABLE_MULTIWINDOW:
@ -336,8 +352,12 @@ def fetch_and_analyze():
log_info(f"[24h] {len(df_24h)} sessions dans la fenêtre 24h.")
anom_c, scored_c = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 1].copy(), feats_complet, 'Complet_24h', cycle_id, recurrence_map)
anom_d, scored_d = run_semi_supervised_logic(df_24h[df_24h['correlated'] == 0].copy(), feats, 'Applicatif_24h', cycle_id, recurrence_map)
all_anom_24h = pd.concat([anom_c, anom_d], ignore_index=True)
all_scored_24h = pd.concat([scored_c, scored_d], ignore_index=True)
_anom_24h_dfs = [df for df in [anom_c, anom_d]
if df is not None and not df.empty]
all_anom_24h = pd.concat(_anom_24h_dfs, ignore_index=True) if _anom_24h_dfs else pd.DataFrame()
_scored_24h_dfs = [df for df in [scored_c, scored_d]
if df is not None and not df.empty]
all_scored_24h = pd.concat(_scored_24h_dfs, ignore_index=True) if _scored_24h_dfs else pd.DataFrame()
# Fusion : pour les IPs présentes dans les deux fenêtres, conserver le score le plus bas
if not all_anom_24h.empty:
all_anom = pd.concat([all_anom, all_anom_24h], ignore_index=True)

View File

@ -106,31 +106,40 @@ def record_cycle_metrics(
_emit_alerts(model_name, anomaly_rate, drift_rate, correlated_rate, latency_ms, drift_alert)
try:
client.execute(
f"INSERT INTO {db}.ml_performance_metrics VALUES",
[{
'cycle_at': now,
'model_name': model_name,
'total_sessions': n_total,
'correlated_rate': round(float(correlated_rate), 4),
'anomaly_rate': round(float(anomaly_rate), 4),
'critical_count': n_critical,
'high_count': n_high,
'medium_count': n_medium,
'low_count': n_low,
'known_bot_count': n_known_bot,
'anubis_deny_count': n_anubis_deny,
'legit_browser_count': n_legit_browser,
'drift_rate': round(float(drift_rate), 4),
'drift_alert': drift_alert,
'cycle_latency_ms': latency_ms,
'features_valid': valid_features,
'features_total': total_features,
'baseline_size': baseline_size,
'threshold': round(float(threshold), 6),
'meta_learner_active': 1 if meta_learner_active else 0,
}]
# Vérifier que la table existe avant d'insérer (optionnelle)
table_check = client.query(
f"SELECT name FROM system.tables WHERE database = '{db}' AND name = 'ml_performance_metrics'"
)
if not table_check.result_rows:
logger.debug("[Metrics] Table ml_performance_metrics absente — métriques non enregistrées")
return
client.insert(
f"{db}.ml_performance_metrics",
[[
now,
model_name,
n_total,
round(float(correlated_rate), 4),
round(float(anomaly_rate), 4),
n_critical,
n_high,
n_medium,
n_low,
n_known_bot,
n_anubis_deny,
n_legit_browser,
round(float(drift_rate), 4),
drift_alert,
latency_ms,
valid_features,
total_features,
baseline_size,
round(float(threshold), 6),
1 if meta_learner_active else 0,
]]
)
logger.debug(f"[Metrics] Cycle {cycle_id} enregistré ({latency_ms}ms)")
except Exception as e:
logger.warning(f"[Metrics] Erreur d'enregistrement des métriques : {e}")

View File

@ -203,6 +203,78 @@ class TrafficNormalizingFlow:
return nf
class NFEnsemble:
"""Deep Ensemble de M=5 Normalizing Flows pour quantification d'incertitude.
Chaque membre est un TrafficNormalizingFlow indépendant, entraîné sur un
échantillon bootstrap (avec remise) de la baseline humaine. L'incertitude
(variance inter-modèles) discrimine la dérive organique (variance faible,
les modèles s'accordent) de la dérive adversariale (variance élevée, les
modèles ne s'accordent pas sur la nouveauté).
Référence : Lakshminarayanan et al., 2017 — "Simple and Scalable Predictive
Uncertainty Estimation using Deep Ensembles" (NeurIPS).
"""
ENSEMBLE_SIZE = 5
def __init__(self, n_features: int):
if not TORCH_AVAILABLE:
raise RuntimeError("PyTorch non disponible — NFEnsemble désactivé.")
self.n_features = n_features
self.models = [TrafficNormalizingFlow(n_features) for _ in range(self.ENSEMBLE_SIZE)]
def fit(self, X: np.ndarray, epochs: int = AE_EPOCHS, lr: float = AE_LEARNING_RATE,
batch_size: int = 256) -> dict:
"""Entraîne les M modèles sur des échantillons bootstrapés (avec remise)."""
n = len(X)
all_losses = []
for i, nf in enumerate(self.models):
idx = np.random.choice(n, size=n, replace=True)
X_boot = X[idx]
stats = nf.fit(X_boot, epochs=epochs, lr=lr, batch_size=batch_size)
all_losses.append(stats['final_loss'])
return {
'final_losses': all_losses,
'mean_loss': float(np.mean(all_losses)),
'ensemble_size': self.ENSEMBLE_SIZE,
'n_samples': n,
}
def predict_anomalies(self, X: np.ndarray) -> tuple:
"""Retourne (mean_score, uncertainty_score) — tuple de np.ndarray.
mean_score : moyenne des -log p(x) sur les M modèles.
uncertainty_score : variance des -log p(x) sur les M modèles.
"""
scores = np.stack([nf.score_samples(X) for nf in self.models], axis=0)
return scores.mean(axis=0), scores.var(axis=0)
def score_samples(self, X: np.ndarray) -> np.ndarray:
"""Compatibilité : retourne mean_score seul (comme TrafficNormalizingFlow)."""
mean, _ = self.predict_anomalies(X)
return mean
def encode(self, X: np.ndarray) -> np.ndarray:
"""Espace latent moyen sur l'ensemble."""
latents = np.stack([nf.encode(X) for nf in self.models], axis=0)
return latents.mean(axis=0)
def state_dict(self) -> dict:
return {
'ensemble_size': self.ENSEMBLE_SIZE,
'n_features': self.n_features,
'members': [nf.state_dict() for nf in self.models],
}
@classmethod
def load_state_dict(cls, state: dict) -> 'NFEnsemble':
ens = cls(state['n_features'])
for i, member_state in enumerate(state['members']):
ens.models[i] = TrafficNormalizingFlow.load_state_dict(member_state)
return ens
def _ae_model_path(name: str, version_id: str) -> str:
return os.path.join(MODEL_DIR, f'ae_{name}_{version_id}.pt')
@ -411,7 +483,7 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
dérive, entraîne un nouveau modèle sur ``human_baseline``, le sérialise sur disque,
met à jour le fichier pointeur et purge les anciennes versions.
Retourne (IsolationForest, TrafficNormalizingFlow|None, list[str] features).
Retourne (IsolationForest, NFEnsemble|None, list[str] features).
"""
model_path, meta = _get_current_version(name)
if model_path and meta:
@ -455,8 +527,8 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
ae_path = _ae_model_path(name, meta['version_id'])
if os.path.exists(ae_path):
try:
ae_loaded = TrafficNormalizingFlow.load_state_dict(torch.load(ae_path, weights_only=False))
log_info(f"[{name}] Normalizing Flow v{meta['version_id']} rechargé.")
ae_loaded = NFEnsemble.load_state_dict(torch.load(ae_path, weights_only=False))
log_info(f"[{name}] NFEnsemble v{meta['version_id']} rechargé (M={NFEnsemble.ENSEMBLE_SIZE}).")
except Exception as exc:
log_info(f"[{name}] Erreur chargement AE : {exc} — AE désactivé ce cycle.")
return joblib.load(model_path), ae_loaded, meta.get('features', features)
@ -519,7 +591,7 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
ae_prev_path = _ae_model_path(name, meta.get('version_id', ''))
if os.path.exists(ae_prev_path):
try:
ae_prev = TrafficNormalizingFlow.load_state_dict(torch.load(ae_prev_path, weights_only=False))
ae_prev = NFEnsemble.load_state_dict(torch.load(ae_prev_path, weights_only=False))
except Exception:
pass
return joblib.load(model_path), ae_prev, meta.get('features', features)
@ -539,17 +611,17 @@ def load_or_train_model(name: str, human_baseline: pd.DataFrame, features: list,
new_meta_path = os.path.join(MODEL_DIR, f'model_{name}_{version_id}.meta.json')
joblib.dump(model, new_model_path)
# Entraînement du Normalizing Flow en parallèle (si PyTorch disponible et AE_WEIGHT > 0)
# Entraînement du NFEnsemble (M=5) en parallèle (si PyTorch disponible et AE_WEIGHT > 0)
ae_model = None
if TORCH_AVAILABLE and AE_WEIGHT > 0:
try:
ae_model = TrafficNormalizingFlow(n_features=len(features))
ae_model = NFEnsemble(n_features=len(features))
ae_stats = ae_model.fit(X_train.values)
ae_path = _ae_model_path(name, version_id)
torch.save(ae_model.state_dict(), ae_path)
log_info(f"[{name}] Normalizing Flow entraîné : NLL={ae_stats['final_loss']:.6f}, epochs={ae_stats['epochs']}")
log_info(f"[{name}] NFEnsemble entraîné (M={NFEnsemble.ENSEMBLE_SIZE}) : NLL moyen={ae_stats['mean_loss']:.6f}")
except Exception as exc:
log_info(f"[{name}] Normalizing Flow training échoué : {exc} — NF désactivé.")
log_info(f"[{name}] NFEnsemble training échoué : {exc} — NF désactivé.")
ae_model = None
previous_version = meta.get('version_id', None) if meta else None

View File

@ -11,11 +11,12 @@ from .config import (
ANOMALY_THRESHOLD, ANOMALY_PERCENTILE, ENABLE_CLUSTERING,
ENABLE_SHAP, EIF_AVAILABLE, TORCH_AVAILABLE, XGB_AVAILABLE,
BROWSER_CONFIDENCE_THRESHOLD, BROWSER_COHORT_RATIO,
MIN_VALID_FEATURE_RATIO, STRUCTURAL_EXCLUDED_FEATURES,
MIN_VALID_FEATURE_RATIO, MIN_HUMAN_BASELINE, BASELINE_ACCEPT_UNKNOWN, STRUCTURAL_EXCLUDED_FEATURES,
NF_UNCERTAINTY_THRESHOLD,
)
from .log import log_info, log_decision
from .infra import score_to_threat_level, get_client
from .models import load_or_train_model, load_or_train_xgb, TrafficNormalizingFlow
from .models import load_or_train_model, load_or_train_xgb, TrafficNormalizingFlow, NFEnsemble
from .scoring import (
validate_features, compute_adaptive_threshold, normalize_scores,
compute_shap_top_features, build_reason, cluster_anomalies,
@ -51,13 +52,18 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
# Les DENY sont TOUJOURS inclus dans les threats, indépendamment du seuil IF.
unknown_traffic = rest[rest['anubis_bot_action'] != 'ALLOW'].copy()
human_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'isp']
# En mode test (BASELINE_ACCEPT_UNKNOWN), les IPs sans ASN 'isp' utilisent 'unknown' comme fallback
if len(human_baseline) < MIN_HUMAN_BASELINE and BASELINE_ACCEPT_UNKNOWN:
unknown_baseline = unknown_traffic[unknown_traffic['asn_label'] == 'unknown']
if len(unknown_baseline) > len(human_baseline):
human_baseline = unknown_baseline
log_info(f'[{name}] ── Triage ──────────────────────────────────────')
log_info(f'[{name}] Total sessions : {len(df):>6}')
log_info(f'[{name}] Bots connus (dict) : {len(known_bots):>6}')
log_info(f'[{name}] Anubis ALLOW : {len(anubis_allow):>6}')
log_info(f'[{name}] Trafic à scorer (IF) : {len(unknown_traffic):>6}')
log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min=500)')
log_info(f'[{name}] Baseline ISP (human) : {len(human_baseline):>6} (seuil min={MIN_HUMAN_BASELINE})')
# §3 — Exclure les sessions ISP à faible cohérence de fingerprint de la baseline humaine
# Ces sessions ISP avec un fingerprint incohérent sont probablement des proxies résidentiels
@ -81,8 +87,8 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
if valid_features is None:
return pd.DataFrame(), pd.DataFrame()
if len(human_baseline) < 500:
log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < 500) — cycle ignoré.")
if len(human_baseline) < MIN_HUMAN_BASELINE:
log_info(f"[{name}] ⚠ Données humaines insuffisantes ({len(human_baseline)} < {MIN_HUMAN_BASELINE}) — cycle ignoré.")
log_info(f"[{name}] Distribution asn_label dans le trafic à scorer :")
if 'asn_label' in unknown_traffic.columns:
for label, cnt in unknown_traffic['asn_label'].value_counts().head(8).items():
@ -115,17 +121,38 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
log_info(f'[{name}] Scoring EIF : {len(X_test)} sessions scorées (min={raw_scores.min():.4f}, max={raw_scores.max():.4f}, mean={raw_scores.mean():.4f})')
# Combinaison EIF + Normalizing Flow si disponible
# Combinaison EIF + NFEnsemble (Deep Ensemble M=5) si disponible
# Score final = (1-α) * eif_norm + α * nf_norm où α = AE_WEIGHT
# Incertitude = variance inter-modèles → détection adversariale
unknown_traffic['nf_uncertainty'] = 0.0
unknown_traffic['is_adversarial_drift'] = False
if ae_model is not None and AE_WEIGHT > 0:
try:
nf_neg_ll = ae_model.score_samples(X_test.values) # -log p(x)
if isinstance(ae_model, NFEnsemble):
nf_neg_ll, nf_uncertainty = ae_model.predict_anomalies(X_test.values)
else:
nf_neg_ll = ae_model.score_samples(X_test.values)
nf_uncertainty = np.zeros(len(nf_neg_ll))
nf_norm = normalize_scores(-nf_neg_ll) # plus élevé = plus anomal
eif_norm = normalize_scores(raw_scores)
combined_norm = (1 - AE_WEIGHT) * eif_norm + AE_WEIGHT * nf_norm
unknown_traffic['ae_recon_error'] = nf_neg_ll # nom conservé pour rétro-compatibilité
unknown_traffic['nf_uncertainty'] = nf_uncertainty
adversarial_mask = nf_uncertainty > NF_UNCERTAINTY_THRESHOLD
unknown_traffic['is_adversarial_drift'] = adversarial_mask
n_adversarial = int(adversarial_mask.sum())
unknown_traffic['anomaly_score'] = combined_norm
log_info(f"[{name}] Score combiné EIF+NF (α={AE_WEIGHT}): nf_mean={nf_neg_ll.mean():.6f}")
log_info(
f"[{name}] Score combiné EIF+NF (α={AE_WEIGHT}): nf_mean={nf_neg_ll.mean():.6f}, "
f"uncertainty_mean={nf_uncertainty.mean():.6f}, adversarial={n_adversarial}"
)
if n_adversarial > 0:
log_decision('ADVERSARIAL_DRIFT_NF', cycle_id, name, {
'n_adversarial': n_adversarial,
'uncertainty_threshold': NF_UNCERTAINTY_THRESHOLD,
'uncertainty_mean': round(float(nf_uncertainty.mean()), 6),
'uncertainty_max': round(float(nf_uncertainty.max()), 6),
})
except Exception as exc:
log_info(f"[{name}] NF scoring échoué : {exc} — utilisation EIF seul.")
unknown_traffic['ae_recon_error'] = 0.0
@ -435,12 +462,13 @@ def run_semi_supervised_logic(df, features, name, cycle_id, recurrence_map):
'effective_threshold': round(effective_threshold, 4), 'reason': row.get('reason', '')
})
threats = pd.concat([df for df in [
_threat_dfs = [df for df in [
anomalies if not anomalies.empty else None,
known_bots if not known_bots.empty else None,
anubis_allow if not anubis_allow.empty else None,
anubis_deny if not anubis_deny.empty else None,
] if df is not None], ignore_index=True)
] if df is not None]
threats = pd.concat(_threat_dfs, ignore_index=True) if _threat_dfs else pd.DataFrame()
# Propager campaign_id des anomalies clusterisées vers all_scored
# (all_scored a été capturé avant clustering, ses campaign_id sont tous -1)

View File

@ -12,6 +12,7 @@ Regroupe les fonctions de scoring utilisées par le pipeline de détection :
"""
import numpy as np
import pandas as pd
import torch
from .config import (
ANOMALY_THRESHOLD, ANOMALY_PERCENTILE,
@ -107,7 +108,7 @@ class ADWINDriftMonitor:
for feat, value in feature_means.items():
if feat in self._detectors:
self._detectors[feat].update(value)
detected = self._detectors[feat].detected_change()
detected = self._detectors[feat].drift_detected
changes[feat] = detected
if detected:
self._last_changes[feat] = True

View File

@ -6,17 +6,43 @@ from typing import Any
import clickhouse_connect
from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.exceptions import DatabaseError
from backend.config import CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD
logger = logging.getLogger(__name__)
_client: Client | None = None
_available: bool | None = None # None = not tested yet
class ClickHouseUnavailable(Exception):
"""Raised when ClickHouse is not reachable."""
def is_available() -> bool:
"""Check ClickHouse connectivity (retries on every call if previously failed)."""
global _client, _available
try:
# Force re-creation if previously marked unavailable
if _available is False:
_client = None
get_client()
_available = True
return True
except Exception:
_available = False
_client = None
logger.warning("ClickHouse unavailable at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
return False
def get_client() -> Client:
"""Return a lazily-initialised ClickHouse client (singleton)."""
global _client
"""Return a lazily-initialised ClickHouse client (singleton).
Resets the singleton on connection failure so the next call retries.
"""
global _client, _available
if _client is None:
_client = clickhouse_connect.get_client(
host=CLICKHOUSE_HOST,
@ -25,9 +51,17 @@ def get_client() -> Client:
password=CLICKHOUSE_PASSWORD,
)
logger.info("Connected to ClickHouse at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
_available = True
return _client
def _mark_unavailable() -> None:
"""Reset client and mark ClickHouse as unavailable."""
global _client, _available
_client = None
_available = False
def _normalise_value(v: Any) -> Any:
"""Convert ClickHouse-specific types to JSON-friendly Python types."""
if isinstance(v, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
@ -41,26 +75,47 @@ def _normalise_value(v: Any) -> Any:
def query(sql: str, params: dict | None = None) -> list[dict[str, Any]]:
"""Execute *sql* and return a list of row-dicts."""
client = get_client()
result = client.query(sql, parameters=params or {})
columns = result.column_names
rows: list[dict[str, Any]] = []
for row in result.result_rows:
rows.append({col: _normalise_value(val) for col, val in zip(columns, row)})
return rows
"""Execute *sql* and return a list of row-dicts.
Raises ClickHouseUnavailable if the server is not reachable.
"""
try:
client = get_client()
result = client.query(sql, parameters=params or {})
columns = result.column_names
rows: list[dict[str, Any]] = []
for row in result.result_rows:
rows.append({col: _normalise_value(val) for col, val in zip(columns, row)})
return rows
except (DatabaseError, ConnectionError, OSError) as exc:
_mark_unavailable()
raise ClickHouseUnavailable(str(exc)) from exc
def query_scalar(sql: str, params: dict | None = None) -> Any:
"""Execute *sql* and return the single scalar value."""
client = get_client()
result = client.query(sql, parameters=params or {})
if result.result_rows:
return _normalise_value(result.result_rows[0][0])
return None
"""Execute *sql* and return the single scalar value.
Raises ClickHouseUnavailable if the server is not reachable.
"""
try:
client = get_client()
result = client.query(sql, parameters=params or {})
if result.result_rows:
return _normalise_value(result.result_rows[0][0])
return None
except (DatabaseError, ConnectionError, OSError) as exc:
_mark_unavailable()
raise ClickHouseUnavailable(str(exc)) from exc
def execute(sql: str, params: dict | None = None) -> None:
"""Execute a DDL / DML statement that returns no rows."""
client = get_client()
client.command(sql, parameters=params or {})
"""Execute a DDL / DML statement that returns no rows.
Raises ClickHouseUnavailable if the server is not reachable.
"""
try:
client = get_client()
client.command(sql, parameters=params or {})
except (DatabaseError, ConnectionError, OSError) as exc:
_mark_unavailable()
raise ClickHouseUnavailable(str(exc)) from exc

View File

@ -4,15 +4,28 @@ from __future__ import annotations
import logging
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from backend.database import ClickHouseUnavailable, is_available
from backend.routes.api import router as api_router
from backend.routes.pages import router as pages_router
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
_templates = Jinja2Templates(directory="backend/templates")
_PAGE_MAP = {
"/": "overview", "/detections": "detections", "/scores": "scores",
"/traffic": "traffic", "/classify": "classify", "/features": "features",
"/models": "models", "/network": "network", "/campaigns": "campaigns",
"/tactics": "tactics", "/reflists": "reflists", "/fleet": "fleet",
"/health": "health", "/browsers": "browsers", "/fingerprints": "fingerprints",
}
app = FastAPI(title="JA4 SOC Dashboard", version="1.0.0")
# CORS — allow all origins for dashboard access
@ -24,6 +37,29 @@ app.add_middleware(
allow_headers=["*"],
)
@app.exception_handler(ClickHouseUnavailable)
async def ch_unavailable_handler(request: Request, exc: ClickHouseUnavailable):
"""Return 503 for API calls, render degraded pages for HTML requests."""
accept = request.headers.get("accept", "")
path = request.url.path
# If the client expects JSON (API call), return 503 JSON
if "application/json" in accept or path.startswith("/api/"):
return JSONResponse(
status_code=503,
content={"detail": "ClickHouse unavailable", "error": str(exc)},
)
# For HTML pages, render the template with ch_available=False
page_name = _PAGE_MAP.get(path, "overview")
return _templates.TemplateResponse(
f"{page_name}.html",
{"request": request, "active_page": page_name, "ch_available": False},
status_code=503,
)
# Static assets
app.mount("/static", StaticFiles(directory="backend/static"), name="static")
@ -32,6 +68,7 @@ app.include_router(api_router)
app.include_router(pages_router)
@app.get("/health")
async def health():
return {"status": "ok"}
@app.get("/api/healthcheck")
async def healthcheck():
ch = is_available()
return {"status": "ok" if ch else "degraded", "clickhouse": "up" if ch else "down"}

View File

@ -13,6 +13,8 @@ from typing import Any
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from backend.database import ClickHouseUnavailable
from backend.config import DB_PROCESSING, DB_LOGS, safe_identifier
from backend.database import query, query_scalar, execute
@ -29,6 +31,17 @@ _SHAP_RE = re.compile(r"(?:SHAP|ExIFFI):\s*(.+?)(?:\s*\|\s*Threat|$)")
_FEAT_RE = re.compile(r"(\w+)\(([+-]?\d+\.\d+)\)")
def _ch_fallback(exc: Exception) -> None:
"""Raise ClickHouseUnavailable for connection errors, re-raise otherwise."""
if isinstance(exc, ClickHouseUnavailable):
raise
# Detect connection-level errors from clickhouse_connect
err_msg = str(exc).lower()
if "connection" in err_msg or "refused" in err_msg or "unavailable" in err_msg:
raise ClickHouseUnavailable(str(exc)) from exc
raise HTTPException(status_code=500, detail=str(exc)) from exc
def _aggregate_shap_importance(reasons: list[str]) -> list[dict]:
"""Agrège les valeurs SHAP/ExIFFI extraites des champs reason."""
totals: dict[str, float] = defaultdict(float)
@ -171,7 +184,7 @@ async def overview() -> dict[str, Any]:
}
except Exception as exc:
logger.exception("overview query failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -253,7 +266,7 @@ async def detections(
}
except Exception as exc:
logger.exception("detections query failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -330,7 +343,7 @@ async def scores(
}
except Exception as exc:
logger.exception("scores query failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -401,7 +414,7 @@ async def traffic(
}
except Exception as exc:
logger.exception("traffic query failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -470,7 +483,7 @@ async def ip_detail(ip: str) -> dict[str, Any]:
}
except Exception as exc:
logger.exception("ip detail query failed for %s", ip)
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -932,7 +945,7 @@ async def classify(body: ClassifyRequest) -> dict[str, Any]:
return {"status": "ok", "src_ip": body.src_ip, "classification": body.classification}
except Exception as exc:
logger.exception("classify insert failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -1403,7 +1416,7 @@ async def ja4_detail(fingerprint: str) -> dict[str, Any]:
}
except Exception as exc:
logger.exception("ja4 detail query failed for %s", fingerprint)
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -1526,7 +1539,7 @@ async def cluster_detail(cid: int) -> dict[str, Any]:
}
except Exception as exc:
logger.exception("cluster detail query failed for %s", cid)
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ═══════════════════════════════════════════════════════════════════════════════
@ -1545,9 +1558,9 @@ async def dictionaries_meta():
"ORDER BY name",
)
return {"dictionaries": rows}
except Exception as exc:
logger.exception("dictionaries meta query failed")
raise HTTPException(status_code=500, detail=str(exc))
except Exception:
logger.debug("dictionaries meta query failed — ClickHouse may be unavailable")
return {"dictionaries": []}
_REFLIST_SORT = {
@ -1640,7 +1653,7 @@ async def reflist(
return {"name": name, "total": total, "limit": limit, "offset": offset, "rows": rows}
except Exception as exc:
logger.exception("reflist query failed for %s", name)
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
@router.get("/reflist/{name}/stats")
@ -1695,34 +1708,48 @@ async def reflist_stats(name: str):
return {"name": name, "total": total, "breakdown": agg}
except Exception as exc:
logger.exception("reflist stats query failed for %s", name)
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
@router.get("/fleet")
async def fleet() -> dict[str, Any]:
"""Détections de flottes JA4×ASN (§5.2)."""
rows = query(
f"SELECT detected_at, community_id, fleet_score, n_ips, ja4_set, asn_set, ip_sample "
f"FROM {_DB}.fleet_detections "
f"WHERE detected_at >= now() - INTERVAL 7 DAY "
f"ORDER BY fleet_score DESC "
f"LIMIT 100"
)
try:
rows = query(
f"SELECT detected_at, community_id, fleet_score, n_ips, ja4_set, asn_set, ip_sample "
f"FROM {_DB}.fleet_detections "
f"WHERE detected_at >= now() - INTERVAL 7 DAY "
f"ORDER BY fleet_score DESC "
f"LIMIT 100"
)
except ClickHouseUnavailable:
raise
except Exception as exc:
_ch_fallback(exc)
rows = []
return {"fleets": rows}
@router.get("/health")
async def health_metrics() -> dict[str, Any]:
"""Métriques de santé du pipeline ML (Étape 9)."""
rows = query(
f"SELECT cycle_at, model_name, total_sessions, correlated_rate, anomaly_rate, "
f" critical_count, high_count, drift_rate, drift_alert, cycle_latency_ms, "
f" features_valid, features_total, baseline_size, meta_learner_active "
f"FROM {_DB}.ml_performance_metrics "
f"WHERE cycle_at >= now() - INTERVAL 7 DAY "
f"ORDER BY cycle_at DESC "
f"LIMIT 500"
)
try:
rows = query(
f"SELECT cycle_at, model_name, total_sessions, correlated_rate, anomaly_rate, "
f" critical_count, high_count, medium_count, low_count, "
f" known_bot_count, anubis_deny_count, legit_browser_count, "
f" drift_rate, drift_alert, cycle_latency_ms, "
f" features_valid, features_total, baseline_size, threshold, meta_learner_active "
f"FROM {_DB}.ml_performance_metrics "
f"WHERE cycle_at >= now() - INTERVAL 7 DAY "
f"ORDER BY cycle_at DESC "
f"LIMIT 500"
)
except ClickHouseUnavailable:
raise
except Exception as exc:
_ch_fallback(exc)
rows = []
# Statistiques de synthèse
if rows:
latest = {r['model_name']: r for r in rows}
@ -1895,9 +1922,9 @@ async def browser_sig_entries() -> dict[str, Any]:
f"ORDER BY browser_family"
)
return {"entries": rows, "total": len(rows), "source": "dict_csv", "readonly": True}
except Exception as exc:
logger.exception("browser_h2 entries fallback failed")
raise HTTPException(status_code=500, detail=str(exc))
except Exception:
logger.debug("browser_h2 entries fallback failed — ClickHouse may be unavailable")
return {"entries": [], "total": 0, "source": "unavailable"}
@router.post("/browser-signatures/entries", status_code=201)
@ -1932,7 +1959,7 @@ async def browser_sig_add(body: BrowserH2Entry) -> dict[str, Any]:
return {"status": "ok", "h2_fingerprint": body.h2_fingerprint.strip()}
except Exception as exc:
logger.exception("browser_h2_signatures insert failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
@router.delete("/browser-signatures/entries")
@ -1953,7 +1980,7 @@ async def browser_sig_delete(fingerprint: str = Query(...)) -> dict[str, Any]:
return {"status": "ok", "deleted": fingerprint.strip()}
except Exception as exc:
logger.exception("browser_h2_signatures delete failed")
raise HTTPException(status_code=500, detail=str(exc))
_ch_fallback(exc)
# ---------------------------------------------------------------------------
@ -2042,8 +2069,8 @@ async def fingerprint_discovery(
{"days": days, "min_hits": min_hits, "lim": limit},
)
except Exception as exc:
logger.exception("fingerprint-discovery query failed")
raise HTTPException(status_code=500, detail=str(exc))
logger.debug("fingerprint-discovery query failed — ClickHouse may be unavailable")
return {"profiles": [], "groups": [], "meta": {"total_ja4": 0, "total_groups": 0, "days": days, "min_hits": min_hits}}
# ── Regroupement par famille navigateur côté Python ──
groups: dict[str, dict[str, Any]] = {}

View File

@ -5,12 +5,14 @@ from __future__ import annotations
from fastapi import APIRouter, Request
from fastapi.templating import Jinja2Templates
from backend.database import is_available
router = APIRouter()
templates = Jinja2Templates(directory="backend/templates")
def _ctx(request: Request, page: str, **extra) -> dict:
return {"request": request, "active_page": page, **extra}
return {"request": request, "active_page": page, "ch_available": is_available(), **extra}
@router.get("/")

View File

@ -219,6 +219,12 @@
{% block header_actions %}{% endblock %}
</div>
</header>
{% if not ch_available %}
<div id="ch-banner" class="bg-amber-900/60 border-b border-amber-600 px-4 py-2 text-amber-200 text-sm flex items-center gap-2">
<svg class="w-4 h-4 shrink-0" fill="none" stroke="currentColor" stroke-width="2" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" d="M12 9v3.75m-9.303 3.376c-.866 1.5.217 3.374 1.948 3.374h14.71c1.73 0 2.813-1.874 1.948-3.374L13.949 3.378c-.866-1.5-3.032-1.5-3.898 0L2.697 16.126zM12 15.75h.008v.008H12v-.008z"/></svg>
<span><strong>ClickHouse indisponible</strong> — Les données ne peuvent pas être chargées. Vérifiez la connexion au serveur.</span>
</div>
{% endif %}
<main class="px-3 py-4 lg:px-5 lg:py-5 xl:px-6">
{% block content %}{% endblock %}
</main>

View File

@ -215,13 +215,23 @@ int capture_tc(struct __sk_buff *ctx)
tls_evt->src_ip = bpf_ntohl(src_ip);
tls_evt->src_port = src_port;
tls_evt->timestamp_ns = bpf_ktime_get_ns();
tls_evt->payload_len = (__u16)avail;
/* Copie via bpf_skb_load_bytes avec taille constante 256.
/* Copie via bpf_skb_load_bytes avec tailles constantes en cascade.
* Kernel 4.18 ne supporte pas les tailles variables vers map values.
* 256 octets capture le ClientHello dans la majorité des cas. */
if (bpf_skb_load_bytes(ctx, payload_off, tls_evt, 256))
* On essaie 512 puis 256 puis 128 pour capturer SNI et extensions.
* La taille réellement copiée est stockée dans payload_len. */
if (payload_off + 512 <= pkt_len) {
bpf_skb_load_bytes(ctx, payload_off, tls_evt, 512);
tls_evt->payload_len = 512;
} else if (payload_off + 256 <= pkt_len) {
bpf_skb_load_bytes(ctx, payload_off, tls_evt, 256);
tls_evt->payload_len = 256;
} else if (payload_off + 128 <= pkt_len) {
bpf_skb_load_bytes(ctx, payload_off, tls_evt, 128);
tls_evt->payload_len = 128;
} else {
return TC_ACT_OK;
}
bpf_perf_event_output(ctx, &pb_tls_hello, BPF_F_CURRENT_CPU,
tls_evt, sizeof(*tls_evt));
@ -269,11 +279,21 @@ int capture_tc(struct __sk_buff *ctx)
h_evt->src_port = src_port;
h_evt->dst_port = dst_port;
h_evt->timestamp_ns = bpf_ktime_get_ns();
h_evt->payload_len = (__u16)avail;
/* Taille constante 256 pour compatibilité vérificateur kernel 4.18 */
if (bpf_skb_load_bytes(ctx, payload_off, h_evt, 256))
/* Copie via bpf_skb_load_bytes avec tailles constantes en cascade.
* Les requêtes HTTP sont souvent < 256 octets, on descend à 128 puis 64. */
if (payload_off + 256 <= pkt_len) {
bpf_skb_load_bytes(ctx, payload_off, h_evt, 256);
h_evt->payload_len = 256;
} else if (payload_off + 128 <= pkt_len) {
bpf_skb_load_bytes(ctx, payload_off, h_evt, 128);
h_evt->payload_len = 128;
} else if (payload_off + 64 <= pkt_len) {
bpf_skb_load_bytes(ctx, payload_off, h_evt, 64);
h_evt->payload_len = 64;
} else {
return TC_ACT_OK;
}
bpf_perf_event_output(ctx, &pb_http_plain, BPF_F_CURRENT_CPU,
h_evt, sizeof(*h_evt));

View File

@ -315,7 +315,9 @@ func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
// src_ip et src_port stockés en host byte order (bpf_ntohl/bpf_ntohs dans BPF C).
srcIPRaw := binary.LittleEndian.Uint32(data[0:4])
dstIPRaw := binary.LittleEndian.Uint32(data[4:8])
srcPort := binary.LittleEndian.Uint16(data[8:10])
dstPort := binary.LittleEndian.Uint16(data[10:12])
var key correlation.SessionKey
key.SrcIP[0] = byte(srcIPRaw >> 24)
@ -324,6 +326,12 @@ func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
var dstIP [4]byte
dstIP[0] = byte(dstIPRaw >> 24)
dstIP[1] = byte(dstIPRaw >> 16)
dstIP[2] = byte(dstIPRaw >> 8)
dstIP[3] = byte(dstIPRaw)
// Champs IP/TCP aux offsets corrects (dst_ip occupe les octets 4-7)
ttl := data[12]
dfBit := data[13] != 0
@ -342,6 +350,8 @@ func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
mgr.Update(key, func(s *correlation.SessionState) {
s.L3L4 = &correlation.L3L4{
DstIP: dstIP,
DstPort: dstPort,
TTL: ttl,
DFBit: dfBit,
IPID: ipID,
@ -420,6 +430,17 @@ func consumeTLSEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
ciphers = ch.CipherSuites
alpn = ch.ALPN
// Déterminer la version TLS la plus haute (comme ComputeJA4)
var tlsVer uint16
for _, v := range ch.SupportedVersions {
if !parser.IsGREASE(v) && v > tlsVer {
tlsVer = v
}
}
if tlsVer == 0 {
tlsVer = ch.HandshakeVersion
}
mgr.Update(key, func(s *correlation.SessionState) {
s.TLS = &correlation.TLSInfo{
ClientHelloRaw: payload,
@ -428,6 +449,7 @@ func consumeTLSEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
ALPN: alpn,
CipherSuites: ciphers,
Extensions: extensions,
TLSVersion: tlsVer,
Timestamp: time.Now(),
}
// Corréler si L3/L4 est déjà présent
@ -540,6 +562,7 @@ func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
}
}
if len(s.Requests) == 0 {
req.HTTPVersion = "HTTP/2"
s.Requests = append(s.Requests, req)
}
_ = s.TLS // corrélation implicite
@ -559,8 +582,11 @@ func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
Method: req.Method,
Path: req.Path,
QueryString: req.Query,
Host: req.HeaderKV["Host"],
HeaderOrder: req.Headers,
HeaderOrderSig: req.HeaderSig,
HeaderKV: req.HeaderKV,
HTTPVersion: req.Protocol,
})
_ = s.TLS // corrélation implicite
})
@ -699,8 +725,11 @@ func consumeHTTPPlainEvents(ctx context.Context, rd *perf.Reader, mgr *correlati
Method: req.Method,
Path: req.Path,
QueryString: req.Query,
Host: req.HeaderKV["Host"],
HeaderOrder: req.Headers,
HeaderOrderSig: req.HeaderSig,
HeaderKV: req.HeaderKV,
HTTPVersion: req.Protocol,
})
// Corréler si L3/L4 est déjà présent (TCP SYN capturé)
_ = s.L3L4 // corrélation implicite

View File

@ -15,6 +15,8 @@ type SessionKey struct {
// L3L4 contient les caractéristiques réseau et transport de la connexion.
type L3L4 struct {
DstIP [4]byte // adresse IP destination
DstPort uint16 // port destination
TTL uint8 // TTL IP observé dans le SYN
DFBit bool // bit Don't Fragment actif
IPID uint16 // champ identification IP
@ -52,16 +54,19 @@ type HTTP2Settings struct {
// HTTPRequest représente une requête HTTP observée dans la session.
type HTTPRequest struct {
Method string // méthode HTTP (GET, POST, etc.)
Path string // chemin de la requête
QueryString string // paramètres de requête
StatusCode int // code de statut de la réponse
ResponseSize int64 // taille de la réponse en octets
DurationMS float64 // durée de traitement en millisecondes
HeaderOrder []string // ordre exact des en-têtes HTTP bruts
HeaderOrderSig string // signature de l'ordre des en-têtes (hash)
HTTP2Settings *HTTP2Settings // non nil uniquement pour HTTP/2
Timestamp time.Time // horodatage de la requête
Method string // méthode HTTP (GET, POST, etc.)
Path string // chemin de la requête
QueryString string // paramètres de requête
Host string // en-tête Host (ou :authority pour H2)
StatusCode int // code de statut de la réponse
ResponseSize int64 // taille de la réponse en octets
DurationMS float64 // durée de traitement en millisecondes
HeaderOrder []string // ordre exact des en-têtes HTTP bruts
HeaderOrderSig string // signature de l'ordre des en-têtes (hash)
HeaderKV map[string]string // valeurs des en-têtes capturés (User-Agent, etc.)
HTTPVersion string // "HTTP/1.1", "HTTP/2", etc.
HTTP2Settings *HTTP2Settings // non nil uniquement pour HTTP/2
Timestamp time.Time // horodatage de la requête
}
// SessionState représente l'état complet d'une connexion TCP corrélée.

View File

@ -25,18 +25,47 @@ type Ja4SslAcceptKey struct {
Fd uint32
}
type Ja4SslHttpPlainEvent struct {
Payload [4096]uint8
SrcIp uint32
DstIp uint32
SrcPort uint16
DstPort uint16
PayloadLen uint16
TimestampNs uint64
}
type Ja4SslSslConnInfo struct {
Fd uint32
SrcIp uint32
SrcPort uint16
}
type Ja4SslSslDataEvent struct {
PidTgid uint64
Fd uint32
SrcIp uint32
SrcPort uint16
Data [4096]uint8
DataLen uint32
TimestampNs uint64
Direction uint8
}
type Ja4SslSslReadArgs struct {
SslPtr uint64
BufPtr uint64
Num uint32
}
type Ja4SslTlsHelloEvent struct {
Payload [2048]uint8
SrcIp uint32
SrcPort uint16
PayloadLen uint16
TimestampNs uint64
}
// LoadJa4Ssl returns the embedded CollectionSpec for Ja4Ssl.
func LoadJa4Ssl() (*ebpf.CollectionSpec, error) {
reader := bytes.NewReader(_Ja4SslBytes)
@ -89,14 +118,17 @@ type Ja4SslProgramSpecs struct {
//
// It can be passed ebpf.CollectionSpec.Assign.
type Ja4SslMapSpecs struct {
HttpBuf *ebpf.MapSpec `ebpf:"__http_buf"`
SslBuf *ebpf.MapSpec `ebpf:"__ssl_buf"`
TlsBuf *ebpf.MapSpec `ebpf:"__tls_buf"`
AcceptArgsMap *ebpf.MapSpec `ebpf:"accept_args_map"`
AcceptMap *ebpf.MapSpec `ebpf:"accept_map"`
FdConnMap *ebpf.MapSpec `ebpf:"fd_conn_map"`
RbAccept *ebpf.MapSpec `ebpf:"rb_accept"`
RbHttpPlain *ebpf.MapSpec `ebpf:"rb_http_plain"`
RbSslData *ebpf.MapSpec `ebpf:"rb_ssl_data"`
RbTcpSyn *ebpf.MapSpec `ebpf:"rb_tcp_syn"`
RbTlsHello *ebpf.MapSpec `ebpf:"rb_tls_hello"`
PbAccept *ebpf.MapSpec `ebpf:"pb_accept"`
PbHttpPlain *ebpf.MapSpec `ebpf:"pb_http_plain"`
PbSslData *ebpf.MapSpec `ebpf:"pb_ssl_data"`
PbTcpSyn *ebpf.MapSpec `ebpf:"pb_tcp_syn"`
PbTlsHello *ebpf.MapSpec `ebpf:"pb_tls_hello"`
SslArgsMap *ebpf.MapSpec `ebpf:"ssl_args_map"`
SslConnMap *ebpf.MapSpec `ebpf:"ssl_conn_map"`
}
@ -120,28 +152,34 @@ func (o *Ja4SslObjects) Close() error {
//
// It can be passed to LoadJa4SslObjects or ebpf.CollectionSpec.LoadAndAssign.
type Ja4SslMaps struct {
HttpBuf *ebpf.Map `ebpf:"__http_buf"`
SslBuf *ebpf.Map `ebpf:"__ssl_buf"`
TlsBuf *ebpf.Map `ebpf:"__tls_buf"`
AcceptArgsMap *ebpf.Map `ebpf:"accept_args_map"`
AcceptMap *ebpf.Map `ebpf:"accept_map"`
FdConnMap *ebpf.Map `ebpf:"fd_conn_map"`
RbAccept *ebpf.Map `ebpf:"rb_accept"`
RbHttpPlain *ebpf.Map `ebpf:"rb_http_plain"`
RbSslData *ebpf.Map `ebpf:"rb_ssl_data"`
RbTcpSyn *ebpf.Map `ebpf:"rb_tcp_syn"`
RbTlsHello *ebpf.Map `ebpf:"rb_tls_hello"`
PbAccept *ebpf.Map `ebpf:"pb_accept"`
PbHttpPlain *ebpf.Map `ebpf:"pb_http_plain"`
PbSslData *ebpf.Map `ebpf:"pb_ssl_data"`
PbTcpSyn *ebpf.Map `ebpf:"pb_tcp_syn"`
PbTlsHello *ebpf.Map `ebpf:"pb_tls_hello"`
SslArgsMap *ebpf.Map `ebpf:"ssl_args_map"`
SslConnMap *ebpf.Map `ebpf:"ssl_conn_map"`
}
func (m *Ja4SslMaps) Close() error {
return _Ja4SslClose(
m.HttpBuf,
m.SslBuf,
m.TlsBuf,
m.AcceptArgsMap,
m.AcceptMap,
m.FdConnMap,
m.RbAccept,
m.RbHttpPlain,
m.RbSslData,
m.RbTcpSyn,
m.RbTlsHello,
m.PbAccept,
m.PbHttpPlain,
m.PbSslData,
m.PbTcpSyn,
m.PbTlsHello,
m.SslArgsMap,
m.SslConnMap,
)

View File

@ -25,18 +25,47 @@ type Ja4TcAcceptKey struct {
Fd uint32
}
type Ja4TcHttpPlainEvent struct {
Payload [4096]uint8
SrcIp uint32
DstIp uint32
SrcPort uint16
DstPort uint16
PayloadLen uint16
TimestampNs uint64
}
type Ja4TcSslConnInfo struct {
Fd uint32
SrcIp uint32
SrcPort uint16
}
type Ja4TcSslDataEvent struct {
PidTgid uint64
Fd uint32
SrcIp uint32
SrcPort uint16
Data [4096]uint8
DataLen uint32
TimestampNs uint64
Direction uint8
}
type Ja4TcSslReadArgs struct {
SslPtr uint64
BufPtr uint64
Num uint32
}
type Ja4TcTlsHelloEvent struct {
Payload [2048]uint8
SrcIp uint32
SrcPort uint16
PayloadLen uint16
TimestampNs uint64
}
// LoadJa4Tc returns the embedded CollectionSpec for Ja4Tc.
func LoadJa4Tc() (*ebpf.CollectionSpec, error) {
reader := bytes.NewReader(_Ja4TcBytes)
@ -78,22 +107,26 @@ type Ja4TcSpecs struct {
//
// It can be passed ebpf.CollectionSpec.Assign.
type Ja4TcProgramSpecs struct {
CaptureXdp *ebpf.ProgramSpec `ebpf:"capture_xdp"`
CaptureTc *ebpf.ProgramSpec `ebpf:"capture_tc"`
}
// Ja4TcMapSpecs contains maps before they are loaded into the kernel.
//
// It can be passed ebpf.CollectionSpec.Assign.
type Ja4TcMapSpecs struct {
HttpBuf *ebpf.MapSpec `ebpf:"__http_buf"`
SslBuf *ebpf.MapSpec `ebpf:"__ssl_buf"`
TlsBuf *ebpf.MapSpec `ebpf:"__tls_buf"`
AcceptMap *ebpf.MapSpec `ebpf:"accept_map"`
FdConnMap *ebpf.MapSpec `ebpf:"fd_conn_map"`
RbAccept *ebpf.MapSpec `ebpf:"rb_accept"`
RbHttpPlain *ebpf.MapSpec `ebpf:"rb_http_plain"`
RbSslData *ebpf.MapSpec `ebpf:"rb_ssl_data"`
RbTcpSyn *ebpf.MapSpec `ebpf:"rb_tcp_syn"`
RbTlsHello *ebpf.MapSpec `ebpf:"rb_tls_hello"`
PbAccept *ebpf.MapSpec `ebpf:"pb_accept"`
PbHttpPlain *ebpf.MapSpec `ebpf:"pb_http_plain"`
PbSslData *ebpf.MapSpec `ebpf:"pb_ssl_data"`
PbTcpSyn *ebpf.MapSpec `ebpf:"pb_tcp_syn"`
PbTlsHello *ebpf.MapSpec `ebpf:"pb_tls_hello"`
SslArgsMap *ebpf.MapSpec `ebpf:"ssl_args_map"`
SslConnMap *ebpf.MapSpec `ebpf:"ssl_conn_map"`
TcStats *ebpf.MapSpec `ebpf:"tc_stats"`
}
// Ja4TcObjects contains all objects after they have been loaded into the kernel.
@ -115,28 +148,36 @@ func (o *Ja4TcObjects) Close() error {
//
// It can be passed to LoadJa4TcObjects or ebpf.CollectionSpec.LoadAndAssign.
type Ja4TcMaps struct {
HttpBuf *ebpf.Map `ebpf:"__http_buf"`
SslBuf *ebpf.Map `ebpf:"__ssl_buf"`
TlsBuf *ebpf.Map `ebpf:"__tls_buf"`
AcceptMap *ebpf.Map `ebpf:"accept_map"`
FdConnMap *ebpf.Map `ebpf:"fd_conn_map"`
RbAccept *ebpf.Map `ebpf:"rb_accept"`
RbHttpPlain *ebpf.Map `ebpf:"rb_http_plain"`
RbSslData *ebpf.Map `ebpf:"rb_ssl_data"`
RbTcpSyn *ebpf.Map `ebpf:"rb_tcp_syn"`
RbTlsHello *ebpf.Map `ebpf:"rb_tls_hello"`
PbAccept *ebpf.Map `ebpf:"pb_accept"`
PbHttpPlain *ebpf.Map `ebpf:"pb_http_plain"`
PbSslData *ebpf.Map `ebpf:"pb_ssl_data"`
PbTcpSyn *ebpf.Map `ebpf:"pb_tcp_syn"`
PbTlsHello *ebpf.Map `ebpf:"pb_tls_hello"`
SslArgsMap *ebpf.Map `ebpf:"ssl_args_map"`
SslConnMap *ebpf.Map `ebpf:"ssl_conn_map"`
TcStats *ebpf.Map `ebpf:"tc_stats"`
}
func (m *Ja4TcMaps) Close() error {
return _Ja4TcClose(
m.HttpBuf,
m.SslBuf,
m.TlsBuf,
m.AcceptMap,
m.FdConnMap,
m.RbAccept,
m.RbHttpPlain,
m.RbSslData,
m.RbTcpSyn,
m.RbTlsHello,
m.PbAccept,
m.PbHttpPlain,
m.PbSslData,
m.PbTcpSyn,
m.PbTlsHello,
m.SslArgsMap,
m.SslConnMap,
m.TcStats,
)
}
@ -144,12 +185,12 @@ func (m *Ja4TcMaps) Close() error {
//
// It can be passed to LoadJa4TcObjects or ebpf.CollectionSpec.LoadAndAssign.
type Ja4TcPrograms struct {
CaptureXdp *ebpf.Program `ebpf:"capture_xdp"`
CaptureTc *ebpf.Program `ebpf:"capture_tc"`
}
func (p *Ja4TcPrograms) Close() error {
return _Ja4TcClose(
p.CaptureXdp,
p.CaptureTc,
)
}

View File

@ -8,12 +8,13 @@ import (
// HTTP1Request représente une requête HTTP/1.x parsée depuis le flux déchiffré.
type HTTP1Request struct {
Method string // méthode HTTP (GET, POST, …)
Path string // chemin (sans query string)
Query string // query string (sans le '?')
Protocol string // "HTTP/1.0" ou "HTTP/1.1"
Headers []string // noms des en-têtes dans l'ordre exact d'arrivée
HeaderSig string // signature : noms joints par ";"
Method string // méthode HTTP (GET, POST, …)
Path string // chemin (sans query string)
Query string // query string (sans le '?')
Protocol string // "HTTP/1.0" ou "HTTP/1.1"
Headers []string // noms des en-têtes dans l'ordre exact d'arrivée
HeaderSig string // signature : noms joints par ";"
HeaderKV map[string]string // valeurs des en-têtes clés (Host, User-Agent, etc.)
}
// HTTP1Response représente le début d'une réponse HTTP/1.x (status line).
@ -27,6 +28,14 @@ var knownMethods = []string{
"OPTIONS", "PATCH", "CONNECT", "TRACE",
}
// capturedHeaders est la liste des en-têtes dont on capture la valeur.
var capturedHeaders = []string{
"Host", "User-Agent", "Accept", "Accept-Encoding", "Accept-Language",
"Content-Type", "X-Request-Id", "X-Trace-Id", "X-Forwarded-For",
"Sec-CH-UA", "Sec-CH-UA-Mobile", "Sec-CH-UA-Platform",
"Sec-Fetch-Dest", "Sec-Fetch-Mode", "Sec-Fetch-Site",
}
// IsHTTP1Request retourne true si les premiers octets ressemblent à une
// requête HTTP/1.x (commence par une méthode reconnue suivi d'un espace).
func IsHTTP1Request(data []byte) bool {
@ -91,8 +100,9 @@ func ParseHTTP1Request(data []byte) *HTTP1Request {
query = rawPath[idx+1:]
}
// Extraire les noms d'en-têtes dans l'ordre
// Extraire les noms d'en-têtes dans l'ordre + capturer les valeurs clés
headers := make([]string, 0, len(lines)-1)
headerKV := make(map[string]string, len(capturedHeaders))
for _, line := range lines[1:] {
if line == "" {
break
@ -101,6 +111,13 @@ func ParseHTTP1Request(data []byte) *HTTP1Request {
name := strings.TrimSpace(line[:colon])
if name != "" {
headers = append(headers, name)
// Capturer la valeur si c'est un header d'intérêt
for _, key := range capturedHeaders {
if strings.EqualFold(name, key) {
headerKV[key] = strings.TrimSpace(line[colon+1:])
break
}
}
}
}
}
@ -114,6 +131,7 @@ func ParseHTTP1Request(data []byte) *HTTP1Request {
Protocol: protocol,
Headers: headers,
HeaderSig: sig,
HeaderKV: headerKV,
}
}
@ -143,4 +161,4 @@ func ParseHTTP1Response(data []byte) *HTTP1Response {
return nil
}
return &HTTP1Response{StatusCode: code}
}
}

View File

@ -247,9 +247,9 @@ func parseSupportedVersions(data []byte) []uint16 {
return versions
}
// isGREASE vérifie si une valeur est une valeur GREASE (RFC 8701).
// IsGREASE vérifie si une valeur est une valeur GREASE (RFC 8701).
// Les valeurs GREASE suivent le motif 0x?A?A (ex: 0x0A0A, 0x1A1A, ...).
func isGREASE(v uint16) bool {
func IsGREASE(v uint16) bool {
return v&0x0F0F == 0x0A0A && v>>8 == v&0xFF
}
@ -279,7 +279,7 @@ func ComputeJA4(ch *ClientHello) string {
// --- Version TLS : version la plus haute annoncée ---
var tlsVer uint16
for _, v := range ch.SupportedVersions {
if !isGREASE(v) && v > tlsVer {
if !IsGREASE(v) && v > tlsVer {
tlsVer = v
}
}
@ -298,7 +298,7 @@ func ComputeJA4(ch *ClientHello) string {
// --- Comptage des cipher suites (sans GREASE) ---
var ciphers []uint16
for _, cs := range ch.CipherSuites {
if !isGREASE(cs) {
if !IsGREASE(cs) {
ciphers = append(ciphers, cs)
}
}
@ -307,7 +307,7 @@ func ComputeJA4(ch *ClientHello) string {
// --- Comptage des extensions (sans GREASE, sans SNI 0x0000) ---
var extensions []uint16
for _, ext := range ch.Extensions {
if isGREASE(ext.Type) {
if IsGREASE(ext.Type) {
continue
}
if ext.Type == 0x0000 { // SNI exclue du comptage

View File

@ -52,14 +52,32 @@ type sessionRecord struct {
TLSVersion string `json:"tls_version,omitempty"`
// HTTP
Method string `json:"method,omitempty"`
Path string `json:"path,omitempty"`
QueryString string `json:"query_string,omitempty"`
StatusCode *int `json:"status_code,omitempty"`
ResponseSize *int64 `json:"response_size,omitempty"`
DurationMS *float64 `json:"duration_ms,omitempty"`
KeepAlives int `json:"keepalives,omitempty"`
HeaderOrderSig string `json:"header_order_signature,omitempty"`
Method string `json:"method,omitempty"`
Path string `json:"path,omitempty"`
Host string `json:"host,omitempty"`
QueryString string `json:"query_string,omitempty"`
Scheme string `json:"scheme,omitempty"`
HTTPVersion string `json:"http_version,omitempty"`
StatusCode *int `json:"status_code,omitempty"`
ResponseSize *int64 `json:"response_size,omitempty"`
DurationMS *float64 `json:"duration_ms,omitempty"`
KeepAlives int `json:"keepalives,omitempty"`
HeaderOrderSig string `json:"header_order_signature,omitempty"`
HeadersRaw string `json:"headers_raw,omitempty"`
HeaderUserAgent string `json:"header_User-Agent,omitempty"`
HeaderAccept string `json:"header_Accept,omitempty"`
HeaderAcceptEnc string `json:"header_Accept-Encoding,omitempty"`
HeaderAcceptLang string `json:"header_Accept-Language,omitempty"`
HeaderContentType string `json:"header_Content-Type,omitempty"`
HeaderXReqID string `json:"header_X-Request-Id,omitempty"`
HeaderXTraceID string `json:"header_X-Trace-Id,omitempty"`
HeaderXForwarded string `json:"header_X-Forwarded-For,omitempty"`
HeaderSecCHUA string `json:"header_Sec-CH-UA,omitempty"`
HeaderSecCHUAMobile string `json:"header_Sec-CH-UA-Mobile,omitempty"`
HeaderSecCHUAPlat string `json:"header_Sec-CH-UA-Platform,omitempty"`
HeaderSecFetchDest string `json:"header_Sec-Fetch-Dest,omitempty"`
HeaderSecFetchMode string `json:"header_Sec-Fetch-Mode,omitempty"`
HeaderSecFetchSite string `json:"header_Sec-Fetch-Site,omitempty"`
// HTTP/2 fingerprinting passif
H2Fingerprint string `json:"h2_fingerprint,omitempty"`
@ -67,13 +85,13 @@ type sessionRecord struct {
H2WindowUpdate uint32 `json:"h2_window_update,omitempty"`
H2PseudoOrder string `json:"h2_pseudo_order,omitempty"`
H2HasPriority uint8 `json:"h2_has_priority,omitempty"`
H2HeaderTableSize int32 `json:"h2_header_table_size"`
H2EnablePush int32 `json:"h2_enable_push"`
H2MaxConcurrentStreams int32 `json:"h2_max_concurrent_streams"`
H2InitialWindowSize int64 `json:"h2_initial_window_size"`
H2MaxFrameSize int32 `json:"h2_max_frame_size"`
H2MaxHeaderListSize int32 `json:"h2_max_header_list_size"`
H2EnableConnectProtocol int32 `json:"h2_enable_connect_protocol"`
H2HeaderTableSize *int32 `json:"h2_header_table_size,omitempty"`
H2EnablePush *int32 `json:"h2_enable_push,omitempty"`
H2MaxConcurrentStreams *int32 `json:"h2_max_concurrent_streams,omitempty"`
H2InitialWindowSize *int64 `json:"h2_initial_window_size,omitempty"`
H2MaxFrameSize *int32 `json:"h2_max_frame_size,omitempty"`
H2MaxHeaderListSize *int32 `json:"h2_max_header_list_size,omitempty"`
H2EnableConnectProtocol *int32 `json:"h2_enable_connect_protocol,omitempty"`
}
// NewClickHouseWriter crée un writer et établit la connexion ClickHouse.
@ -199,19 +217,26 @@ func sessionToRecord(s *correlation.SessionState) sessionRecord {
Time: s.FirstSeen,
SrcIP: srcIP,
SrcPort: int(s.Key.SrcPort),
DstIP: "0.0.0.0", // destination non capturée par les sondes eBPF actuelles
DstPort: 0,
KeepAlives: len(s.Requests),
}
// Champs métadonnées IP/TCP
if s.L3L4 != nil {
rec.DstIP = fmt.Sprintf("%d.%d.%d.%d",
s.L3L4.DstIP[0], s.L3L4.DstIP[1], s.L3L4.DstIP[2], s.L3L4.DstIP[3])
rec.DstPort = int(s.L3L4.DstPort)
rec.IPMetaDF = &s.L3L4.DFBit
rec.IPMetaID = &s.L3L4.IPID
rec.IPMetaTTL = &s.L3L4.TTL
rec.TCPMetaWindowSize = &s.L3L4.WindowSize
rec.TCPMetaWindowScale = &s.L3L4.WindowScale
rec.TCPMetaMSS = &s.L3L4.MSS
// WindowScale 0xFF = absent (convention C), ne pas inclure
if s.L3L4.WindowScale != 0xFF {
rec.TCPMetaWindowScale = &s.L3L4.WindowScale
}
// MSS 0 = absent, ne pas inclure
if s.L3L4.MSS > 0 {
rec.TCPMetaMSS = &s.L3L4.MSS
}
}
// Champs TLS
@ -220,6 +245,14 @@ func sessionToRecord(s *correlation.SessionState) sessionRecord {
rec.TLSSNI = s.TLS.SNI
rec.TLSALPN = strings.Join(s.TLS.ALPN, ",")
rec.TLSVersion = formatTLSVersion(s.TLS.TLSVersion)
// Fallback : si pas de Host HTTP, utiliser TLS SNI
if rec.Host == "" && s.TLS.SNI != "" {
rec.Host = s.TLS.SNI
}
// Scheme déduit de la présence TLS
if s.TLS.SNI != "" {
rec.Scheme = "https"
}
}
// Champs HTTP (dernière requête)
@ -228,11 +261,37 @@ func sessionToRecord(s *correlation.SessionState) sessionRecord {
rec.Method = last.Method
rec.Path = last.Path
rec.QueryString = last.QueryString
rec.Host = last.Host
rec.Scheme = "" // sera rempli par le dispatcher si TLS
rec.HTTPVersion = last.HTTPVersion
rec.StatusCode = &last.StatusCode
rec.ResponseSize = &last.ResponseSize
rec.DurationMS = &last.DurationMS
rec.HeaderOrderSig = last.HeaderOrderSig
// En-têtes HTTP individuels
if last.HeaderKV != nil {
rec.HeaderUserAgent = last.HeaderKV["User-Agent"]
rec.HeaderAccept = last.HeaderKV["Accept"]
rec.HeaderAcceptEnc = last.HeaderKV["Accept-Encoding"]
rec.HeaderAcceptLang = last.HeaderKV["Accept-Language"]
rec.HeaderContentType = last.HeaderKV["Content-Type"]
rec.HeaderXReqID = last.HeaderKV["X-Request-Id"]
rec.HeaderXTraceID = last.HeaderKV["X-Trace-Id"]
rec.HeaderXForwarded = last.HeaderKV["X-Forwarded-For"]
rec.HeaderSecCHUA = last.HeaderKV["Sec-CH-UA"]
rec.HeaderSecCHUAMobile = last.HeaderKV["Sec-CH-UA-Mobile"]
rec.HeaderSecCHUAPlat = last.HeaderKV["Sec-CH-UA-Platform"]
rec.HeaderSecFetchDest = last.HeaderKV["Sec-Fetch-Dest"]
rec.HeaderSecFetchMode = last.HeaderKV["Sec-Fetch-Mode"]
rec.HeaderSecFetchSite = last.HeaderKV["Sec-Fetch-Site"]
}
// Construire headers_raw : ordre des noms joints par ";"
if len(last.HeaderOrder) > 0 {
rec.HeadersRaw = strings.Join(last.HeaderOrder, ";")
}
// Champs HTTP/2 passifs
if last.HTTP2Settings != nil {
h2 := last.HTTP2Settings
@ -243,13 +302,14 @@ func sessionToRecord(s *correlation.SessionState) sessionRecord {
rec.H2PseudoOrder = pseudoOrderToShort(h2.PseudoHeaderOrder)
}
// Paramètres SETTINGS individuels (-1 = absent)
rec.H2HeaderTableSize = h2.HeaderTableSize
rec.H2EnablePush = h2.EnablePush
rec.H2MaxConcurrentStreams = h2.MaxConcurrentStreams
rec.H2InitialWindowSize = int64(h2.InitialWindowSize)
rec.H2MaxFrameSize = h2.MaxFrameSize
rec.H2MaxHeaderListSize = h2.MaxHeaderListSize
// Paramètres SETTINGS individuels (pointeurs : nil = absent du preface)
rec.H2HeaderTableSize = &h2.HeaderTableSize
rec.H2EnablePush = &h2.EnablePush
rec.H2MaxConcurrentStreams = &h2.MaxConcurrentStreams
h2InitWin := int64(h2.InitialWindowSize)
rec.H2InitialWindowSize = &h2InitWin
rec.H2MaxFrameSize = &h2.MaxFrameSize
rec.H2MaxHeaderListSize = &h2.MaxHeaderListSize
// Fingerprints composites Akamai
rec.H2Fingerprint = buildH2Fingerprint(h2)