diff --git a/docs/services/dashboard.md b/docs/services/dashboard.md index ac5584a..cc7837c 100644 --- a/docs/services/dashboard.md +++ b/docs/services/dashboard.md @@ -1,6 +1,6 @@ # Dashboard -Application web SOC (Security Operations Center) construite avec **FastAPI + Jinja2 + htmx**, +Application web SOC (Security Operations Center) construite avec **FastAPI + Jinja2 + ECharts**, offrant la visualisation en temps réel, l'investigation et l'analyse des détections de bots générées par le [bot-detector](bot-detector.md). Interroge ClickHouse sur deux bases de données (`ja4_processing` et `ja4_logs`). @@ -13,8 +13,8 @@ générées par le [bot-detector](bot-detector.md). Interroge ClickHouse sur deu |-----------|-------------| | Backend | Python 3.11 + FastAPI | | Templates | Jinja2 (rendu côté serveur) | -| Interactions dynamiques | htmx (mises à jour partielles via JSON API) | -| Graphiques | Chart.js + ECharts | +| Interactions dynamiques | Vanilla `fetch()` (appels JSON API avec rechargement partiel côté JS) | +| Graphiques | ECharts 5.5 (CDN) | | Style | Tailwind CSS (CDN) | | Base de données | ClickHouse via `clickhouse-connect` (client propre, **PAS** `ja4_common`) | | Documentation API | Swagger UI (`/docs`) + OpenAPI JSON (`/openapi.json`) | diff --git a/services/bot-detector/bot_detector/models.py b/services/bot-detector/bot_detector/models.py index 681bc38..59f891e 100644 --- a/services/bot-detector/bot_detector/models.py +++ b/services/bot-detector/bot_detector/models.py @@ -89,8 +89,9 @@ class TrafficAutoEncoder: self._scaler_range = None def _build_model(self): - dim1 = min(64, max(self.n_features, self.latent_dim + 4)) - dim2 = min(32, max(dim1 // 2, self.latent_dim + 2)) + # Architecture fixe n→64→32→16→32→64→n (§2.4.3 thèse) + dim1 = 64 + dim2 = 32 self.encoder = nn.Sequential( nn.Linear(self.n_features, dim1), nn.BatchNorm1d(dim1), nn.ReLU(), nn.Linear(dim1, dim2), nn.BatchNorm1d(dim2), nn.ReLU(), diff --git a/services/correlator/internal/adapters/inbound/unixsocket/source.go b/services/correlator/internal/adapters/inbound/unixsocket/source.go index 55f663a..7afc667 100644 --- a/services/correlator/internal/adapters/inbound/unixsocket/source.go +++ b/services/correlator/internal/adapters/inbound/unixsocket/source.go @@ -255,9 +255,13 @@ func parseJSONEvent(data []byte, sourceType string) (*domain.NormalizedEvent, er // Extract timestamp based on source contract switch event.Source { case domain.SourceA: - ts, ok := getInt64(raw, "timestamp") + ts, ok := getInt64(raw, "timestamp_ns") if !ok { - return nil, fmt.Errorf("missing required numeric field: timestamp for source A") + // Fallback to legacy "timestamp" field name + ts, ok = getInt64(raw, "timestamp") + } + if !ok { + return nil, fmt.Errorf("missing required numeric field: timestamp/timestamp_ns for source A") } // Assume nanoseconds event.Timestamp = time.Unix(0, ts) diff --git a/services/correlator/internal/adapters/outbound/clickhouse/sink_test.go b/services/correlator/internal/adapters/outbound/clickhouse/sink_test.go index e4872c8..56e8f6f 100644 --- a/services/correlator/internal/adapters/outbound/clickhouse/sink_test.go +++ b/services/correlator/internal/adapters/outbound/clickhouse/sink_test.go @@ -196,7 +196,7 @@ func TestClickHouseSink_BufferManagement(t *testing.T) { log := domain.CorrelatedLog{ SrcIP: "192.168.1.1", SrcPort: 8080, - Correlated: true, + Correlated: 1, } s := &ClickHouseSink{ @@ -527,7 +527,7 @@ func BenchmarkClickHouseSink_Write(b *testing.B) { Timestamp: time.Now(), SrcIP: "192.168.1.1", SrcPort: 8080, - Correlated: true, + Correlated: 1, } ctx := context.Background() diff --git a/services/correlator/internal/adapters/outbound/file/sink_test.go b/services/correlator/internal/adapters/outbound/file/sink_test.go index 9168d45..827f657 100644 --- a/services/correlator/internal/adapters/outbound/file/sink_test.go +++ b/services/correlator/internal/adapters/outbound/file/sink_test.go @@ -22,7 +22,7 @@ func TestFileSink_Write(t *testing.T) { log := domain.CorrelatedLog{ SrcIP: "192.168.1.1", SrcPort: 8080, - Correlated: true, + Correlated: 1, } if err := sink.Write(context.Background(), log); err != nil { @@ -57,7 +57,7 @@ func TestFileSink_WriteImmediatePersist_NoFlushNeeded(t *testing.T) { log := domain.CorrelatedLog{ SrcIP: "192.168.1.1", SrcPort: 8080, - Correlated: true, + Correlated: 1, } if err := sink.Write(context.Background(), log); err != nil { diff --git a/services/correlator/internal/adapters/outbound/stdout/sink_test.go b/services/correlator/internal/adapters/outbound/stdout/sink_test.go index 1b5ccd2..ced00e4 100644 --- a/services/correlator/internal/adapters/outbound/stdout/sink_test.go +++ b/services/correlator/internal/adapters/outbound/stdout/sink_test.go @@ -10,7 +10,7 @@ import ( "github.com/antitbone/ja4/correlator/internal/domain" ) -func makeLog(correlated bool) domain.CorrelatedLog { +func makeLog(correlated int) domain.CorrelatedLog { return domain.CorrelatedLog{ Timestamp: time.Unix(1700000000, 0), SrcIP: "1.2.3.4", @@ -53,10 +53,10 @@ func TestStdoutSink_WriteDoesNotProduceOutput(t *testing.T) { s := NewStdoutSink(Config{Enabled: true}) got := captureStdout(t, func() { - if err := s.Write(context.Background(), makeLog(true)); err != nil { + if err := s.Write(context.Background(), makeLog(1)); err != nil { t.Fatalf("Write(correlated) returned error: %v", err) } - if err := s.Write(context.Background(), makeLog(false)); err != nil { + if err := s.Write(context.Background(), makeLog(0)); err != nil { t.Fatalf("Write(orphan) returned error: %v", err) } }) diff --git a/services/correlator/internal/config/config.go b/services/correlator/internal/config/config.go index 2402c99..c3701de 100644 --- a/services/correlator/internal/config/config.go +++ b/services/correlator/internal/config/config.go @@ -9,6 +9,7 @@ import ( "time" "github.com/antitbone/ja4/correlator/internal/domain" + ja4config "github.com/antitbone/ja4/ja4common/config" "gopkg.in/yaml.v3" ) @@ -29,7 +30,7 @@ type MetricsConfig struct { // LogConfig holds logging configuration. type LogConfig struct { - Level string `yaml:"level"` // DEBUG, INFO, WARN, ERROR + Level string `yaml:"level" env:"LOG_LEVEL"` // DEBUG, INFO, WARN, ERROR } // GetLogLevel returns the log level, defaulting to INFO if not set. @@ -75,15 +76,15 @@ type FileOutputConfig struct { // ClickHouseOutputConfig holds ClickHouse sink configuration. type ClickHouseOutputConfig struct { - Enabled bool `yaml:"enabled"` - DSN string `yaml:"dsn"` - Table string `yaml:"table"` - BatchSize int `yaml:"batch_size"` - FlushIntervalMs int `yaml:"flush_interval_ms"` - MaxBufferSize int `yaml:"max_buffer_size"` + Enabled bool `yaml:"enabled" env:"CLICKHOUSE_ENABLED"` + DSN string `yaml:"dsn" env:"CLICKHOUSE_DSN"` + Table string `yaml:"table" env:"CLICKHOUSE_TABLE"` + BatchSize int `yaml:"batch_size" env:"CLICKHOUSE_BATCH_SIZE"` + FlushIntervalMs int `yaml:"flush_interval_ms" env:"CLICKHOUSE_FLUSH_INTERVAL_MS"` + MaxBufferSize int `yaml:"max_buffer_size" env:"CLICKHOUSE_MAX_BUFFER_SIZE"` DropOnOverflow bool `yaml:"drop_on_overflow"` AsyncInsert bool `yaml:"async_insert"` - TimeoutMs int `yaml:"timeout_ms"` + TimeoutMs int `yaml:"timeout_ms" env:"CLICKHOUSE_TIMEOUT_MS"` } // StdoutOutputConfig holds stdout sink configuration. @@ -165,6 +166,11 @@ func Load(path string) (*Config, error) { return nil, fmt.Errorf("failed to parse config file: %w", err) } + // Surcharge par variables d'environnement (préfixe LOGCORRELATOR_) + if err := ja4config.OverrideFromEnv(cfg, "LOGCORRELATOR"); err != nil { + return nil, fmt.Errorf("failed to apply env overrides: %w", err) + } + if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) } diff --git a/services/correlator/internal/domain/correlated_log.go b/services/correlator/internal/domain/correlated_log.go index 0a94476..12b06b8 100644 --- a/services/correlator/internal/domain/correlated_log.go +++ b/services/correlator/internal/domain/correlated_log.go @@ -14,7 +14,7 @@ type CorrelatedLog struct { SrcPort int `json:"src_port"` DstIP string `json:"dst_ip,omitempty"` DstPort int `json:"dst_port,omitempty"` - Correlated bool `json:"correlated"` + Correlated int `json:"correlated"` // 0 = orphelin, 1 = corrélé OrphanSide string `json:"orphan_side,omitempty"` Fields map[string]any `json:"-"` // Additional fields, merged at marshal time } @@ -71,7 +71,7 @@ func NewCorrelatedLogFromEvent(event *NormalizedEvent, orphanSide string) Correl SrcPort: event.SrcPort, DstIP: event.DstIP, DstPort: event.DstPort, - Correlated: false, + Correlated: 0, OrphanSide: orphanSide, Fields: fields, } @@ -95,7 +95,7 @@ func NewCorrelatedLog(apacheEvent, networkEvent *NormalizedEvent) CorrelatedLog SrcPort: apacheEvent.SrcPort, DstIP: coalesceString(apacheEvent.DstIP, networkEvent.DstIP), DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort), - Correlated: true, + Correlated: 1, OrphanSide: "", Fields: fields, } diff --git a/services/correlator/internal/domain/correlated_log_test.go b/services/correlator/internal/domain/correlated_log_test.go index e11879f..ace732a 100644 --- a/services/correlator/internal/domain/correlated_log_test.go +++ b/services/correlator/internal/domain/correlated_log_test.go @@ -64,8 +64,8 @@ func TestNewCorrelatedLogFromEvent(t *testing.T) { log := NewCorrelatedLogFromEvent(event, "A") - if log.Correlated { - t.Error("expected correlated to be false") + if log.Correlated != 0 { + t.Error("expected correlated to be 0") } if log.OrphanSide != "A" { t.Errorf("expected orphan_side A, got %s", log.OrphanSide) @@ -101,8 +101,8 @@ func TestNewCorrelatedLog(t *testing.T) { log := NewCorrelatedLog(apacheEvent, networkEvent) - if !log.Correlated { - t.Error("expected correlated to be true") + if log.Correlated != 1 { + t.Error("expected correlated to be 1") } if log.OrphanSide != "" { t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide) @@ -273,7 +273,7 @@ func TestMarshalJSON_ReservedKeyProtection(t *testing.T) { Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), SrcIP: "1.2.3.4", SrcPort: 1234, - Correlated: true, + Correlated: 1, Fields: map[string]any{ "src_ip": "EVIL_OVERRIDE", // should be ignored "correlated": false, // should be ignored @@ -294,7 +294,7 @@ func TestMarshalJSON_ReservedKeyProtection(t *testing.T) { if flat["src_ip"] != "1.2.3.4" { t.Errorf("reserved key src_ip should not be overwritten, got %v", flat["src_ip"]) } - if flat["correlated"] != true { + if flat["correlated"] != float64(1) { t.Errorf("reserved key correlated should not be overwritten, got %v", flat["correlated"]) } if flat["extra"] != "value" { @@ -308,7 +308,7 @@ func TestMarshalJSON_OptionalFieldsOmittedWhenZero(t *testing.T) { Timestamp: time.Now(), SrcIP: "1.2.3.4", SrcPort: 1234, - Correlated: false, + Correlated: 0, } data, err := json.Marshal(log) diff --git a/services/correlator/internal/domain/correlation_service_test.go b/services/correlator/internal/domain/correlation_service_test.go index a98ed92..05b57a1 100644 --- a/services/correlator/internal/domain/correlation_service_test.go +++ b/services/correlator/internal/domain/correlation_service_test.go @@ -57,7 +57,7 @@ func TestCorrelationService_Match(t *testing.T) { results = svc.ProcessEvent(networkEvent) if len(results) != 1 { t.Errorf("expected 1 result (correlated), got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result") } } @@ -376,7 +376,7 @@ func TestCorrelationService_DifferentSourceTypes(t *testing.T) { results = svc.ProcessEvent(apacheEvent) if len(results) < 1 { t.Errorf("expected at least 1 result (correlated), got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result") } } @@ -455,7 +455,7 @@ func TestCorrelationService_OneToMany_KeepAlive(t *testing.T) { results = svc.ProcessEvent(apacheEvent1) if len(results) != 1 { t.Errorf("expected 1 correlated result for first A, got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result for first A") } @@ -470,7 +470,7 @@ func TestCorrelationService_OneToMany_KeepAlive(t *testing.T) { results = svc.ProcessEvent(apacheEvent2) if len(results) != 1 { t.Errorf("expected 1 correlated result for second A (Keep-Alive), got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result for second A (Keep-Alive)") } @@ -654,7 +654,7 @@ func TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp(t *testing.T) SrcPort: 8080, } results := svc.ProcessEvent(apacheEvent1) - if len(results) != 1 || !results[0].Correlated { + if len(results) != 1 || results[0].Correlated == 0 { t.Fatalf("expected 1 correlated result, got %d", len(results)) } @@ -667,7 +667,7 @@ func TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp(t *testing.T) SrcPort: 8080, } results = svc.ProcessEvent(apacheEvent2) - if len(results) != 1 || !results[0].Correlated { + if len(results) != 1 || results[0].Correlated == 0 { t.Fatalf("expected 1 correlated result (Keep-Alive), got %d", len(results)) } @@ -740,7 +740,7 @@ func TestCorrelationService_KeepAlive_LongSession(t *testing.T) { Raw: map[string]any{"method": "GET", "path": fmt.Sprintf("/api/%d", i)}, } results := svc.ProcessEvent(apacheEvent) - if len(results) != 1 || !results[0].Correlated { + if len(results) != 1 || results[0].Correlated == 0 { t.Errorf("Request %d at t=%ds (A timestamp t=%v): expected correlation, got %d results", i, i*5, now.Add(time.Duration(i)*500*time.Millisecond), len(results)) } @@ -808,7 +808,7 @@ func TestCorrelationService_ALateThanB_WithinTimeWindow(t *testing.T) { results = svc.ProcessEvent(apacheEvent) if len(results) != 1 { t.Errorf("expected 1 correlated result, got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result") } } @@ -866,7 +866,7 @@ func TestCorrelationService_ALateThanB_AExpiredTooSoon(t *testing.T) { results = svc.ProcessEvent(networkEvent) if len(results) != 1 { t.Errorf("expected 1 correlated result, got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result") } } @@ -921,7 +921,7 @@ func TestCorrelationService_Flush_CorrelatesRemainingEvents(t *testing.T) { flushed := svc.Flush() if len(flushed) != 1 { t.Errorf("expected 1 flushed correlated result, got %d", len(flushed)) - } else if flushed[0].Correlated { + } else if flushed[0].Correlated != 0 { // Good - it's correlated } else { t.Errorf("expected correlated result, got orphan side %s", flushed[0].OrphanSide) @@ -1032,7 +1032,7 @@ func TestCorrelationService_CleanA_RespectsBTTL(t *testing.T) { Raw: map[string]any{"method": "GET"}, } results := svc.ProcessEvent(apacheEvent) - if len(results) != 1 || !results[0].Correlated { + if len(results) != 1 || results[0].Correlated == 0 { t.Fatalf("expected 1 correlated result, got %d", len(results)) } @@ -1127,7 +1127,7 @@ func TestCorrelationService_ApacheEmitDelay_BArrivesDuringDelay(t *testing.T) { results = svc.ProcessEvent(networkEvent) if len(results) != 1 { t.Errorf("expected 1 correlated result, got %d", len(results)) - } else if !results[0].Correlated { + } else if results[0].Correlated == 0 { t.Error("expected correlated result") } @@ -1347,7 +1347,7 @@ results = svc.ProcessEvent(aEvent) if len(results) != 1 { t.Fatalf("expected 1 correlation, got %d", len(results)) } -if !results[0].Correlated { +if results[0].Correlated == 0 { t.Error("expected correlated=true") } } @@ -1371,7 +1371,7 @@ results := svc.ProcessEvent(aEvent) if len(results) != 1 { t.Fatalf("expected 1 result (orphan A, dest port filtered), got %d", len(results)) } -if results[0].Correlated { +if results[0].Correlated != 0 { t.Errorf("expected Correlated=false for dest-port-filtered A event") } if results[0].OrphanSide != "A" { @@ -1438,7 +1438,7 @@ Source: SourceA, Timestamp: now, SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 9999, } results := svc.ProcessEvent(aEvent) -if len(results) != 1 || !results[0].Correlated { +if len(results) != 1 || results[0].Correlated == 0 { t.Errorf("expected 1 correlation on any port when list is empty, got %d", len(results)) } } @@ -1614,9 +1614,9 @@ SrcPort: 5555, } timeProvider.now = now.Add(200 * time.Millisecond) results = svc.ProcessEvent(b) -if len(results) != 1 || !results[0].Correlated { +if len(results) != 1 || results[0].Correlated == 0 { t.Fatalf("B: expected 1 correlated result (A1+B), got %d correlated=%v", -len(results), len(results) > 0 && results[0].Correlated) +len(results), len(results) > 0 && results[0].Correlated != 0) } // A2 arrives on the same Keep-Alive connection — B must still be in buffer @@ -1632,7 +1632,7 @@ results = svc.ProcessEvent(a2) // A2 should correlate with B (still in buffer in one_to_many mode) correlated := false for _, r := range results { -if r.Correlated { +if r.Correlated != 0 { correlated = true } } @@ -1679,7 +1679,7 @@ Timestamp: tp.now, SrcIP: "91.224.92.185", SrcPort: 53471, } -if results := svc.ProcessEvent(a1); len(results) != 1 || !results[0].Correlated { +if results := svc.ProcessEvent(a1); len(results) != 1 || results[0].Correlated == 0 { t.Fatalf("A seq=1: expected 1 correlated result, got %d", len(svc.ProcessEvent(a1))) } @@ -1695,7 +1695,7 @@ results := svc.ProcessEvent(a10) correlated := false for _, r := range results { -if r.Correlated { +if r.Correlated != 0 { correlated = true } } @@ -1748,7 +1748,7 @@ results := svc.ProcessEvent(b) correlated := false for _, r := range results { -if r.Correlated { +if r.Correlated != 0 { correlated = true } } @@ -1809,7 +1809,7 @@ func TestBTTLExpiry_PurgesPendingOrphans(t *testing.T) { // The orphan must have been returned (not silently lost) — data-loss fix orphanFound := false for _, r := range returned { - if !r.Correlated && r.SrcIP == "10.9.9.9" { + if r.Correlated == 0 && r.SrcIP == "10.9.9.9" { orphanFound = true } } @@ -1854,7 +1854,7 @@ emitted := svc.EmitPendingOrphans() if len(emitted) != 1 { t.Fatalf("after delay: expected 1 emitted orphan, got %d", len(emitted)) } -if emitted[0].Correlated { +if emitted[0].Correlated != 0 { t.Errorf("expected orphan (Correlated=false), got Correlated=true") } diff --git a/services/correlator/sql/migrations/04_http2_fields.sql b/services/correlator/sql/migrations/04_http2_fields.sql index b002c45..4054ce0 100644 --- a/services/correlator/sql/migrations/04_http2_fields.sql +++ b/services/correlator/sql/migrations/04_http2_fields.sql @@ -1,8 +1,8 @@ -- === 04_http2_fields.sql — Ajout des colonnes HTTP/2 à http_logs === -- --- Migration pour les déploiements existants : ajoute les 4 colonnes de --- fingerprint HTTP/2 passif extraites par mod_reqin_log via son filtre --- de connexion (APR_HOOK_LAST, AP_FTYPE_CONNECTION). +-- Migration pour les déploiements existants : ajoute les colonnes de +-- fingerprint HTTP/2 passif extraites par mod_reqin_log via son hook +-- process_connection (APR_HOOK_FIRST, AP_MODE_SPECULATIVE). -- -- Format du fingerprint Akamai (h2_fingerprint) : -- Chrome : "1:65536,2:0,4:6291456,6:262144|15663105|0|m,a,s,p" @@ -12,6 +12,7 @@ -- Appliquer avec : -- clickhouse-client --multiquery < 04_http2_fields.sql +-- Champs composites (fingerprint global + valeurs agrégées) ALTER TABLE ja4_logs.http_logs ADD COLUMN IF NOT EXISTS `h2_fingerprint` String DEFAULT '' CODEC(ZSTD(3)); @@ -23,3 +24,29 @@ ALTER TABLE ja4_logs.http_logs ALTER TABLE ja4_logs.http_logs ADD COLUMN IF NOT EXISTS `h2_pseudo_order` LowCardinality(String) DEFAULT ''; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_has_priority` UInt8 DEFAULT 0; + +-- Paramètres SETTINGS individuels (RFC 9113 §6.5.2) +-- Valeur -1 = paramètre absent du preface client (non envoyé) +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_header_table_size` Int32 DEFAULT -1; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_enable_push` Int32 DEFAULT -1; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_max_concurrent_streams` Int32 DEFAULT -1; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_initial_window_size` Int64 DEFAULT -1; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_max_frame_size` Int32 DEFAULT -1; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_max_header_list_size` Int32 DEFAULT -1; + +ALTER TABLE ja4_logs.http_logs + ADD COLUMN IF NOT EXISTS `h2_enable_connect_protocol` Int32 DEFAULT -1; diff --git a/services/dashboard/backend/routes/api.py b/services/dashboard/backend/routes/api.py index 9d7049c..c585170 100644 --- a/services/dashboard/backend/routes/api.py +++ b/services/dashboard/backend/routes/api.py @@ -5,6 +5,8 @@ from __future__ import annotations import json import logging import os +import re +from collections import defaultdict from pathlib import Path from typing import Any @@ -21,6 +23,34 @@ router = APIRouter(prefix="/api") _DB = safe_identifier(DB_PROCESSING) _DB_LOGS = safe_identifier(DB_LOGS) +# Regex pour extraire les features SHAP/ExIFFI depuis le champ reason +# Format: "SHAP: feat1(+0.123) | feat2(-0.456)" ou "ExIFFI: ..." +_SHAP_RE = re.compile(r"(?:SHAP|ExIFFI):\s*(.+?)(?:\s*\|\s*Threat|$)") +_FEAT_RE = re.compile(r"(\w+)\(([+-]?\d+\.\d+)\)") + + +def _aggregate_shap_importance(reasons: list[str]) -> list[dict]: + """Agrège les valeurs SHAP/ExIFFI extraites des champs reason.""" + totals: dict[str, float] = defaultdict(float) + counts: dict[str, int] = defaultdict(int) + for reason in reasons: + m = _SHAP_RE.search(reason or "") + if not m: + continue + for feat_match in _FEAT_RE.finditer(m.group(1)): + name = feat_match.group(1) + val = abs(float(feat_match.group(2))) + totals[name] += val + counts[name] += 1 + if not totals: + return [] + return sorted( + [{"name": k, "importance": round(totals[k] / counts[k], 4), "occurrences": counts[k]} + for k in totals], + key=lambda x: -x["importance"], + ) + + # Whitelists for sort/order to prevent SQL injection _DETECTION_SORT_COLS = { "detected_at", "src_ip", "ja4", "host", "anomaly_score", @@ -500,7 +530,7 @@ async def features() -> dict[str, Any]: except Exception: pass - # Feature variance (importance proxy) + # Feature variance (importance proxy — fallback si SHAP indisponible) try: variance_rows = query( f"SELECT " @@ -523,6 +553,22 @@ async def features() -> dict[str, Any]: except Exception: pass + # SHAP/ExIFFI — importance réelle extraite des anomalies détectées + try: + reason_rows = query( + f"SELECT reason FROM {_DB}.ml_detected_anomalies " + "WHERE reason LIKE '%SHAP:%' OR reason LIKE '%ExIFFI:%' " + "ORDER BY detected_at DESC LIMIT 500" + ) + if reason_rows: + shap_importance = _aggregate_shap_importance( + [r["reason"] for r in reason_rows] + ) + if shap_importance: + result["shap_importance"] = shap_importance + except Exception: + logger.debug("SHAP importance extraction unavailable") + return result @@ -846,11 +892,11 @@ async def classify_suggested() -> dict[str, Any]: # --------------------------------------------------------------------------- class ClassifyRequest(BaseModel): src_ip: str - classification: str # bot | legitimate | suspicious + classification: str # true_positive | false_positive | suspicious comment: str = "" -_VALID_CLASSIFICATIONS = {"bot", "legitimate", "suspicious"} +_VALID_CLASSIFICATIONS = {"true_positive", "false_positive", "suspicious"} _feedback_table_ensured = False diff --git a/services/dashboard/backend/templates/classify.html b/services/dashboard/backend/templates/classify.html index 0606a74..858b427 100644 --- a/services/dashboard/backend/templates/classify.html +++ b/services/dashboard/backend/templates/classify.html @@ -6,7 +6,7 @@

Feedback analyste SOC

Classifiez les IPs pour entraîner le modèle XGBoost supervisé. Les labels sont utilisés au prochain cycle ML.

Workflow : 1. Consultez les IPs suggérées (non classifiées). 2. Classifiez-les. 3. Les labels alimentent XGBoost au prochain cycle.

-

Bot : Confirme une IP malveillante. Légitime : Faux positif. Suspect : À surveiller.

+

Vrai positif : Confirme un bot détecté. Faux positif : Trafic légitime mal détecté. Suspect : À surveiller.

Source : soc_feedback → XGBoost training

{% endblock %} @@ -15,8 +15,8 @@
Total classifiées
0
-
🤖 Bots confirmés
0
-
✅ Légitimes
0
+
✅ Vrais positifs
0
+
❌ Faux positifs
0
⚠️ Suspects
0
@@ -38,8 +38,8 @@
- - + +
@@ -114,7 +114,7 @@ document.querySelectorAll('.cls-type-btn').forEach(btn => { selectedCls = btn.dataset.cls; const sub = document.getElementById('cls-submit'); sub.disabled = false; - sub.textContent = {bot:'🤖 Classifier comme Bot',legitimate:'✅ Classifier comme Légitime',suspicious:'⚠️ Classifier comme Suspect'}[selectedCls]; + sub.textContent = {true_positive:'✅ Classifier comme Vrai positif',false_positive:'❌ Classifier comme Faux positif',suspicious:'⚠️ Classifier comme Suspect'}[selectedCls]; }; }); @@ -158,13 +158,13 @@ async function loadAll() { const byType = {}; (stats.stats||[]).forEach(r => { byType[r.classification] = r.cnt; }); document.getElementById('kpi-total').textContent = fmtNum(stats.total||0); - document.getElementById('kpi-bots').textContent = fmtNum(byType.bot||0); - document.getElementById('kpi-legit').textContent = fmtNum(byType.legitimate||0); + document.getElementById('kpi-tp').textContent = fmtNum(byType.true_positive||0); + document.getElementById('kpi-fp').textContent = fmtNum(byType.false_positive||0); document.getElementById('kpi-suspect').textContent = fmtNum(byType.suspicious||0); // ── Distribution chart ── - const CLS_COLORS = {bot:'#ef4444',legitimate:'#22c55e',suspicious:'#eab308'}; - const CLS_LABELS = {bot:'🤖 Bot',legitimate:'✅ Légitime',suspicious:'⚠️ Suspect'}; + const CLS_COLORS = {true_positive:'#ef4444',false_positive:'#22c55e',suspicious:'#eab308'}; + const CLS_LABELS = {true_positive:'✅ Vrai positif',false_positive:'❌ Faux positif',suspicious:'⚠️ Suspect'}; if (stats.total > 0) { const el = document.getElementById('dist-chart'); const ch = echarts.init(el); @@ -188,8 +188,8 @@ async function loadAll() { ${row.asn_org ? fmtASN(row.asn_org) : ''} ${fmtCountry(row.country_code)} - - + + 🔍 `).join('') || 'Toutes les IPs ont été classifiées 🎉'; @@ -198,7 +198,7 @@ async function loadAll() { document.getElementById('cls-history').innerHTML = (history.data||[]).map(row => ` ${(row.created_at||'').substring(0,16)} ${fmtIP(row.src_ip)} - ${escapeHtml(row.classification)} + ${escapeHtml(row.classification)} ${escapeHtml(row.comment||'')} `).join('') || 'Aucune classification'; diff --git a/services/dashboard/backend/templates/features.html b/services/dashboard/backend/templates/features.html index 53307b7..cb37e4e 100644 --- a/services/dashboard/backend/templates/features.html +++ b/services/dashboard/backend/templates/features.html @@ -26,12 +26,12 @@
-
Importance des features (Variance) +
Importance des features (SHAP/ExIFFI)

Feature importance

-

Variance inter-classe (ISP vs datacenter) de chaque feature. Les features à haute variance discriminent le mieux bots et humains.

-

Usage : Les features en tête sont les plus utiles pour le modèle EIF. Celles à variance nulle sont élaguées automatiquement.

-

Source : view_ai_features_1h

+

Importance moyenne des features issue de SHAP (XGBoost) ou ExIFFI (EIF). Chaque barre représente la contribution absolue moyenne d'une feature aux décisions d'anomalie récentes.

+

Fallback : Si aucune donnée SHAP/ExIFFI n'est disponible, la variance inter-classe (proxy statistique) est affichée à la place.

+

Source : ml_detected_anomalies.reason (SHAP/ExIFFI) ou view_ai_features_1h (variance)

@@ -158,8 +158,16 @@ async function loadAll() { })); } - // ── Feature Importance (horizontal bar) ── - const fi = (feat.feature_importance || []).sort((a,b) => a.variance - b.variance); + // ── Feature Importance (horizontal bar) — SHAP/ExIFFI si disponible, variance sinon ── + const shapData = feat.shap_importance || []; + const varianceData = (feat.feature_importance || []).sort((a,b) => a.variance - b.variance); + const useShap = shapData.length > 0; + const fi = useShap + ? shapData.slice().sort((a,b) => a.importance - b.importance) + : varianceData; + const impLabel = useShap ? 'SHAP/ExIFFI (|valeur| moyenne)' : 'Variance'; + document.getElementById('importance-title').childNodes[0].textContent = + useShap ? 'Importance des features (SHAP/ExIFFI) ' : 'Importance des features (Variance) '; const impChart = initChart('chart-importance'); if (impChart && fi.length) { impChart.setOption(ecBase({ @@ -175,12 +183,13 @@ async function loadAll() { type:'value', splitLine:{lineStyle:{color:EC_GRID, type:'dashed'}}, axisLabel:{color:EC_TEXT}, - name:'Variance', nameTextStyle:{color:EC_TEXT}, + name: impLabel, nameTextStyle:{color:EC_TEXT}, }, series:[{ - type:'bar', data: fi.map(f => f.variance), barWidth:'60%', + type:'bar', data: fi.map(f => useShap ? f.importance : f.variance), barWidth:'60%', itemStyle:{color: new echarts.graphic.LinearGradient(0,0,1,0,[ - {offset:0, color:'#6366f1'}, {offset:1, color:'#8b5cf6'} + {offset:0, color: useShap ? '#f59e0b' : '#6366f1'}, + {offset:1, color: useShap ? '#ef4444' : '#8b5cf6'} ])}, label:{show:true, position:'right', color:EC_TEXT, fontSize:10, formatter:p => p.value.toFixed(4)}, }] diff --git a/services/mod-reqin-log/src/mod_reqin_log.c b/services/mod-reqin-log/src/mod_reqin_log.c index 2a46b73..55c4446 100644 --- a/services/mod-reqin-log/src/mod_reqin_log.c +++ b/services/mod-reqin-log/src/mod_reqin_log.c @@ -130,15 +130,13 @@ static const char *cmd_set_log_level(cmd_parms *cmd, void *dummy, const char *ar /* Forward declarations for hooks */ static int reqin_log_post_read_request(request_rec *r); +static int reqin_log_log_transaction(request_rec *r); static void reqin_log_child_init(apr_pool_t *p, server_rec *s); static int reqin_log_post_config(apr_pool_t *pconf, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *s); static void reqin_log_register_hooks(apr_pool_t *p); -/* Forward declarations for le filtre HTTP/2 */ -static apr_status_t reqin_h2_filter(ap_filter_t *f, apr_bucket_brigade *bb, - ap_input_mode_t mode, apr_read_type_e block, - apr_off_t readbytes); -static void reqin_h2_add_filter(conn_rec *c, void *csd); +/* Forward declarations for la capture HTTP/2 */ +static int reqin_h2_process_connection(conn_rec *c, void *csd); /* Command table */ static const command_rec reqin_log_cmds[] = { @@ -934,12 +932,16 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child format_iso8601(&buf, r->request_time); dynbuf_append(&buf, "\",", 2); - /* timestamp (nanoseconds since epoch, from request reception time) */ + /* timestamp_ns (nanoseconds since epoch, via clock_gettime CLOCK_REALTIME) */ { - apr_uint64_t ns = ((apr_uint64_t)r->request_time) * APR_UINT64_C(1000); + struct timespec ts_now; + apr_uint64_t ns; char ts_buf[32]; + clock_gettime(CLOCK_REALTIME, &ts_now); + ns = (apr_uint64_t)ts_now.tv_sec * APR_UINT64_C(1000000000) + + (apr_uint64_t)ts_now.tv_nsec; snprintf(ts_buf, sizeof(ts_buf), "%" APR_UINT64_T_FMT, ns); - dynbuf_append(&buf, "\"timestamp\":", 12); + dynbuf_append(&buf, "\"timestamp_ns\":", 15); dynbuf_append(&buf, ts_buf, -1); dynbuf_append(&buf, ",", 1); } @@ -989,8 +991,8 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child append_json_string(&buf, path); dynbuf_append(&buf, "\",", 2); - /* query */ - dynbuf_append(&buf, "\"query\":\"", 9); + /* query_string */ + dynbuf_append(&buf, "\"query_string\":\"", 16); append_json_string(&buf, query); dynbuf_append(&buf, "\",", 2); @@ -1013,11 +1015,15 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child } /* client_headers - ordered list of all header names as received from the client, - * preserving original order and case */ + * preserving original order and case. + * headers_raw - all headers concatenated "Name: Value\r\n" preserving order. + * header_order_signature - FNV-1a 64-bit hash of the ordered header names. */ { const apr_array_header_t *arr = apr_table_elts(r->headers_in); const apr_table_entry_t *elts = (const apr_table_entry_t *)arr->elts; int first = 1; + apr_uint64_t fnv_hash = APR_UINT64_C(14695981039346656037); + char hash_buf[24]; dynbuf_append(&buf, ",\"client_headers\":[", 19); for (int i = 0; i < arr->nelts; i++) { @@ -1029,9 +1035,35 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child append_json_string(&buf, elts[i].key); dynbuf_append(&buf, "\"", 1); first = 0; + /* FNV-1a sur chaque octet du nom de header */ + for (const char *p = elts[i].key; *p; p++) { + fnv_hash ^= (apr_uint64_t)(unsigned char)*p; + fnv_hash *= APR_UINT64_C(1099511628211); + } + /* Séparateur entre noms */ + fnv_hash ^= (apr_uint64_t)'\n'; + fnv_hash *= APR_UINT64_C(1099511628211); } } dynbuf_append(&buf, "]", 1); + + /* headers_raw — en-têtes bruts dans leur ordre d'émission */ + dynbuf_append(&buf, ",\"headers_raw\":\"", 16); + for (int i = 0; i < arr->nelts; i++) { + if (elts[i].key != NULL) { + append_json_string(&buf, elts[i].key); + dynbuf_append(&buf, ": ", 2); + append_json_string(&buf, elts[i].val ? elts[i].val : ""); + dynbuf_append(&buf, "\\r\\n", 4); + } + } + dynbuf_append(&buf, "\"", 1); + + /* header_order_signature — FNV-1a 64-bit hash de l'ordre des noms */ + snprintf(hash_buf, sizeof(hash_buf), "%" APR_UINT64_T_FMT, fnv_hash); + dynbuf_append(&buf, ",\"header_order_signature\":\"", (apr_size_t)-1); + dynbuf_append(&buf, hash_buf, -1); + dynbuf_append(&buf, "\"", 1); } /* Check buffer size before adding headers to prevent memory exhaustion */ @@ -1096,14 +1128,20 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child } } - /* Champs HTTP/2 passif depuis les notes de connexion (vides si HTTP/1.x) */ + /* Champs HTTP/2 passif depuis les notes de connexion (vides si HTTP/1.x). + * Pour les connexions HTTP/2, mod_http2 crée des connexions secondaires (c2) + * par stream. Le preface H2 est stocké dans les notes de la connexion + * primaire (c1), accessible via r->connection->master. */ { - const char *h2_fp = apr_table_get(r->connection->notes, H2_NOTE_FINGERPRINT); - const char *h2_set = apr_table_get(r->connection->notes, H2_NOTE_SETTINGS); - const char *h2_wu = apr_table_get(r->connection->notes, H2_NOTE_WUPDATE); - const char *h2_ps = apr_table_get(r->connection->notes, H2_NOTE_PSEUDO_ORDER); + conn_rec *c1 = r->connection->master ? r->connection->master : r->connection; + const char *h2_fp = apr_table_get(c1->notes, H2_NOTE_FINGERPRINT); + const char *h2_set = apr_table_get(c1->notes, H2_NOTE_SETTINGS); + const char *h2_wu = apr_table_get(c1->notes, H2_NOTE_WUPDATE); + const char *h2_ps = apr_table_get(c1->notes, H2_NOTE_PSEUDO_ORDER); + const char *h2_pri = apr_table_get(c1->notes, H2_NOTE_HAS_PRIORITY); if (h2_set && h2_set[0] != '\0') { + /* Champs composites (rétrocompatibilité + fingerprint matching) */ dynbuf_append(&buf, ",\"h2_fingerprint\":\"", (apr_size_t)-1); append_json_string(&buf, h2_fp ? h2_fp : ""); dynbuf_append(&buf, "\",\"h2_settings_fp\":\"", (apr_size_t)-1); @@ -1113,11 +1151,35 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child dynbuf_append(&buf, ",\"h2_pseudo_order\":\"", (apr_size_t)-1); append_json_string(&buf, h2_ps ? h2_ps : ""); dynbuf_append(&buf, "\"", 1); + dynbuf_append(&buf, ",\"h2_has_priority\":", (apr_size_t)-1); + dynbuf_append(&buf, (h2_pri && h2_pri[0] == '1') ? "1" : "0", 1); + + /* Champs SETTINGS individuels (RFC 9113 §6.5.2). + * Émis uniquement si le client a envoyé le paramètre + * (-1 / absent = non émis → le champ JSON est absent). */ + static const struct { const char *note; const char *json; } sfields[] = { + {H2_NOTE_SET_HEADER_TABLE_SIZE, ",\"h2_header_table_size\":"}, + {H2_NOTE_SET_ENABLE_PUSH, ",\"h2_enable_push\":"}, + {H2_NOTE_SET_MAX_CONCURRENT_STREAMS, ",\"h2_max_concurrent_streams\":"}, + {H2_NOTE_SET_INITIAL_WINDOW_SIZE, ",\"h2_initial_window_size\":"}, + {H2_NOTE_SET_MAX_FRAME_SIZE, ",\"h2_max_frame_size\":"}, + {H2_NOTE_SET_MAX_HEADER_LIST_SIZE, ",\"h2_max_header_list_size\":"}, + {H2_NOTE_SET_ENABLE_CONNECT, ",\"h2_enable_connect_protocol\":"}, + }; + int si; + for (si = 0; si < (int)(sizeof(sfields) / sizeof(sfields[0])); si++) { + const char *v = apr_table_get(c1->notes, sfields[si].note); + if (v) { + dynbuf_append(&buf, sfields[si].json, (apr_size_t)-1); + dynbuf_append(&buf, v, (apr_size_t)-1); + } + } } } - dynbuf_append(&buf, "}\n", 2); - + /* Ne pas fermer le JSON ici — les champs de réponse (status_code, + * response_size, duration_ms) seront ajoutés par le hook log_transaction + * qui s'exécute après le traitement complet de la requête. */ if (buf.len > MAX_JSON_SIZE) { apr_time_t now = apr_time_now(); apr_time_t error_interval = apr_time_from_sec(cfg->error_report_interval); @@ -1137,7 +1199,11 @@ static void log_request(request_rec *r, reqin_log_config_t *cfg, reqin_log_child return; } - write_to_socket(buf.data, buf.len, s, cfg, state); + /* Stocker le JSON partiel dans les notes de la requête pour log_transaction */ + { + char *partial = apr_pstrmemdup(r->pool, buf.data, buf.len); + apr_table_setn(r->notes, "reqin_partial_json", partial); + } } /* ====== Fingerprinting HTTP/2 passif ====== */ @@ -1307,6 +1373,13 @@ static void h2_parse_preface(conn_rec *c, const char *buf, apr_size_t len) int has_priority = 0; int settings_pos_out = 0; + /* Valeurs individuelles des paramètres SETTINGS (RFC 9113 §6.5.2). + * -1 signifie « absent du preface client » (distinction importante : + * un paramètre absent ≠ un paramètre à 0). */ + int64_t setting_vals[9]; + int i; + for (i = 0; i < 9; i++) setting_vals[i] = -1; + /* Vérification du magic HTTP/2 */ if (len < MAGIC_LEN || memcmp(buf, H2_MAGIC, MAGIC_LEN) != 0) return; @@ -1346,6 +1419,12 @@ static void h2_parse_preface(conn_rec *c, const char *buf, apr_size_t len) settings_pos_out += snprintf(settings_buf + settings_pos_out, (int)sizeof(settings_buf) - settings_pos_out, "%u:%u", id, val); + + /* Stocker la valeur individuelle (IDs 1-6 et 8) */ + if (id >= 1 && id <= 6) + setting_vals[id] = (int64_t)val; + else if (id == 8) + setting_vals[8] = (int64_t)val; } } else if (type == 0x08u && stream_id == 0) { @@ -1412,6 +1491,26 @@ static void h2_parse_preface(conn_rec *c, const char *buf, apr_size_t len) apr_table_setn(c->notes, H2_NOTE_SETTINGS, apr_pstrdup(c->pool, settings_buf)); apr_table_setn(c->notes, H2_NOTE_WUPDATE, apr_pstrdup(c->pool, wupdate_buf)); apr_table_setn(c->notes, H2_NOTE_PSEUDO_ORDER, apr_pstrdup(c->pool, pseudo_buf)); + apr_table_setn(c->notes, H2_NOTE_HAS_PRIORITY, has_priority ? "1" : "0"); + + /* Stocker chaque paramètre SETTINGS individuel (absent = note absente) */ + static const struct { int id; const char *note; } smap[] = { + {1, H2_NOTE_SET_HEADER_TABLE_SIZE}, + {2, H2_NOTE_SET_ENABLE_PUSH}, + {3, H2_NOTE_SET_MAX_CONCURRENT_STREAMS}, + {4, H2_NOTE_SET_INITIAL_WINDOW_SIZE}, + {5, H2_NOTE_SET_MAX_FRAME_SIZE}, + {6, H2_NOTE_SET_MAX_HEADER_LIST_SIZE}, + {8, H2_NOTE_SET_ENABLE_CONNECT}, + }; + for (i = 0; i < (int)(sizeof(smap) / sizeof(smap[0])); i++) { + int64_t v = setting_vals[smap[i].id]; + if (v >= 0) { + char tmp[16]; + snprintf(tmp, sizeof(tmp), "%u", (uint32_t)v); + apr_table_setn(c->notes, smap[i].note, apr_pstrdup(c->pool, tmp)); + } + } } /** @@ -1419,9 +1518,13 @@ static void h2_parse_preface(conn_rec *c, const char *buf, apr_size_t len) * * S'injecte entre le filtre SSL (déchiffrement) et mod_http2 grâce à sa * priorité AP_FTYPE_CONNECTION et à l'inscription via APR_HOOK_LAST. - * À la première invocation, effectue une lecture spéculative non-destructive - * (AP_MODE_SPECULATIVE) de H2_PEEK_SIZE octets, parse le preface HTTP/2, - * stocke les résultats dans c->notes, puis se retire de la chaîne. + * + * Stratégie : au lieu d'une lecture spéculative séparée (qui interfère avec + * le handshake SSL et le traitement mod_http2), ce filtre se greffe sur les + * lectures réelles. Il laisse passer les lectures spéculatives (utilisées par + * mod_http2 pour détecter le magic H2) sans intervenir, puis sur la première + * lecture non-spéculative, il inspecte les données déjà lues (via + * apr_brigade_flatten, qui copie sans consommer) pour parser le preface H2. * * @param f Filtre courant. * @param bb Brigade cible pour la lecture réelle. @@ -1430,51 +1533,58 @@ static void h2_parse_preface(conn_rec *c, const char *buf, apr_size_t len) * @param readbytes Nombre d'octets demandés. * @return Statut APR de la lecture réelle. */ -static apr_status_t reqin_h2_filter(ap_filter_t *f, apr_bucket_brigade *bb, - ap_input_mode_t mode, apr_read_type_e block, - apr_off_t readbytes) -{ - conn_rec *c = f->c; - - if (!apr_table_get(c->notes, H2_NOTE_PARSED)) { - /* Lecture spéculative : ne consomme pas les données du flux */ - apr_bucket_brigade *peek = apr_brigade_create(c->pool, c->bucket_alloc); - apr_status_t rv = ap_get_brigade(f->next, peek, - AP_MODE_SPECULATIVE, APR_BLOCK_READ, - H2_PEEK_SIZE); - if (rv == APR_SUCCESS) { - char peek_buf[H2_PEEK_SIZE]; - apr_size_t peek_len = sizeof(peek_buf); - if (apr_brigade_flatten(peek, peek_buf, &peek_len) == APR_SUCCESS - && peek_len > 0) { - h2_parse_preface(c, peek_buf, peek_len); - } - } - apr_brigade_cleanup(peek); - apr_table_setn(c->notes, H2_NOTE_PARSED, "1"); - } - - /* Le filtre n'est nécessaire qu'une seule fois par connexion */ - ap_remove_input_filter(f); - - return ap_get_brigade(f->next, bb, mode, block, readbytes); -} - /** - * @brief Hook pre_connection — enregistre le filtre HTTP/2 sur chaque connexion. + * @brief Filtre d'entrée de connexion pour la capture passive du preface HTTP/2. * - * Appelé à l'établissement de chaque connexion. Inscrit reqin_h2_filter dans - * la chaîne d'entrée avec APR_HOOK_LAST, ce qui garantit son positionnement - * après le filtre SSL (qui s'inscrit avec APR_HOOK_MIDDLE) et donc son accès - * au flux HTTP/2 en clair. + * S'injecte entre le filtre SSL (déchiffrement) et mod_http2 grâce à sa + * priorité AP_FTYPE_CONNECTION et à l'inscription via APR_HOOK_LAST. + * + * Stratégie : au lieu d'une lecture spéculative séparée (qui interfère avec + * le traitement mod_http2), ce filtre se greffe sur les lectures réelles. + * Il laisse passer les lectures spéculatives (utilisées par mod_http2 pour + * détecter le magic H2) sans intervenir, puis sur la première lecture + * non-spéculative, il inspecte les données déjà lues (via apr_brigade_flatten, + * qui copie sans consommer) pour parser le preface H2. + * + * @param f Filtre courant. + * @param bb Brigade cible pour la lecture réelle. + * @param mode Mode de lecture demandé (transmis à f->next). + * @param block Type de blocage (transmis à f->next). + * @param readbytes Nombre d'octets demandés. + * @return Statut APR de la lecture réelle. + */ +/** + * @brief Hook process_connection — capture passive du preface HTTP/2. + * + * S'exécute AVANT mod_http2 (APR_HOOK_FIRST) et effectue une lecture + * spéculative non-destructive de H2_PEEK_SIZE octets sur la connexion. + * Si le preface HTTP/2 (RFC 9113 §3.4) est détecté, parse les frames + * SETTINGS, WINDOW_UPDATE et le premier HEADERS, puis stocke les + * résultats dans c->notes. Retourne DECLINED pour laisser mod_http2 + * (ou le handler HTTP/1.x) prendre le relais. * * @param c Connexion Apache. * @param csd Socket descriptor (non utilisé). + * @return DECLINED — ne gère pas la connexion, laisse les hooks suivants. */ -static void reqin_h2_add_filter(conn_rec *c, void *csd) +static int reqin_h2_process_connection(conn_rec *c, void *csd) { (void)csd; - ap_add_input_filter(H2_FILTER_NAME, NULL, NULL, c); + + apr_bucket_brigade *bb = apr_brigade_create(c->pool, c->bucket_alloc); + apr_status_t rv = ap_get_brigade(c->input_filters, bb, + AP_MODE_SPECULATIVE, APR_BLOCK_READ, + H2_PEEK_SIZE); + if (rv == APR_SUCCESS) { + char buf[H2_PEEK_SIZE]; + apr_size_t len = sizeof(buf); + if (apr_brigade_flatten(bb, buf, &len) == APR_SUCCESS && len >= 24) { + h2_parse_preface(c, buf, len); + } + } + apr_brigade_destroy(bb); + + return DECLINED; } /* ====== Hooks Apache ====== */ @@ -1506,6 +1616,73 @@ static int reqin_log_post_read_request(request_rec *r) return DECLINED; } +/** + * @brief Hook log_transaction — complète le JSON avec les champs de réponse et envoie. + * + * Récupère le JSON partiel stocké par post_read_request dans r->notes, + * ajoute status_code, response_size et duration_ms, puis envoie le JSON + * complet via le socket Unix. + * + * @param r request_rec — la requête traitée. + * @return DECLINED pour permettre aux autres modules de logger. + */ +static int reqin_log_log_transaction(request_rec *r) +{ + reqin_log_server_conf_t *srv_conf; + reqin_log_config_t *cfg; + reqin_log_child_state_t *state; + const char *partial; + dynbuf_t buf; + char num_buf[32]; + apr_time_t duration_us; + + if (r->main != NULL || r->prev != NULL) { + return DECLINED; + } + + srv_conf = get_server_conf(r->server); + if (srv_conf == NULL || srv_conf->config == NULL || + !srv_conf->config->enabled || srv_conf->config->socket_path == NULL) { + return DECLINED; + } + + partial = apr_table_get(r->notes, "reqin_partial_json"); + if (partial == NULL) { + return DECLINED; + } + + cfg = srv_conf->config; + state = &srv_conf->child_state; + + dynbuf_init(&buf, r->pool, 4096); + dynbuf_append(&buf, partial, -1); + + /* status_code */ + snprintf(num_buf, sizeof(num_buf), "%d", r->status); + dynbuf_append(&buf, ",\"status_code\":", 15); + dynbuf_append(&buf, num_buf, -1); + + /* response_size (bytes sent to client) */ + snprintf(num_buf, sizeof(num_buf), "%" APR_INT64_T_FMT, (apr_int64_t)r->bytes_sent); + dynbuf_append(&buf, ",\"response_size\":", 17); + dynbuf_append(&buf, num_buf, -1); + + /* duration_ms (request processing time in milliseconds) */ + duration_us = apr_time_now() - r->request_time; + snprintf(num_buf, sizeof(num_buf), "%" APR_INT64_T_FMT, (apr_int64_t)(duration_us / 1000)); + dynbuf_append(&buf, ",\"duration_ms\":", 15); + dynbuf_append(&buf, num_buf, -1); + + /* Fermer le JSON */ + dynbuf_append(&buf, "}\n", 2); + + if (buf.len <= MAX_JSON_SIZE) { + write_to_socket(buf.data, buf.len, r->server, cfg, state); + } + + return DECLINED; +} + /** * @brief Hook child_init — initialise l'état du processus enfant et établit la connexion socket. * @@ -1627,11 +1804,11 @@ static int reqin_log_post_config(apr_pool_t *pconf, apr_pool_t *plog, apr_pool_t static void reqin_log_register_hooks(apr_pool_t *p) { (void)p; - /* Enregistrement du filtre de connexion HTTP/2 (avant les hooks de requête) */ - ap_register_input_filter(H2_FILTER_NAME, reqin_h2_filter, NULL, AP_FTYPE_CONNECTION); - ap_hook_pre_connection(reqin_h2_add_filter, NULL, NULL, APR_HOOK_LAST); + /* Hook process_connection AVANT mod_http2 pour capturer le preface H2 */ + ap_hook_process_connection(reqin_h2_process_connection, NULL, NULL, APR_HOOK_FIRST); ap_hook_post_config(reqin_log_post_config, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_post_read_request(reqin_log_post_read_request, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_log_transaction(reqin_log_log_transaction, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_child_init(reqin_log_child_init, NULL, NULL, APR_HOOK_MIDDLE); } diff --git a/services/mod-reqin-log/src/mod_reqin_log.h b/services/mod-reqin-log/src/mod_reqin_log.h index 6a693c9..938c370 100644 --- a/services/mod-reqin-log/src/mod_reqin_log.h +++ b/services/mod-reqin-log/src/mod_reqin_log.h @@ -36,14 +36,21 @@ extern module AP_MODULE_DECLARE_DATA reqin_log_module; /* ====== Fingerprinting HTTP/2 passif ====== */ -/* Nom du filtre d'entrée de connexion pour la capture du preface HTTP/2 */ -#define H2_FILTER_NAME "REQIN_H2_PEEK" - /* Clés des notes de connexion stockant le fingerprint HTTP/2 parsé */ #define H2_NOTE_FINGERPRINT "reqin_h2_fp" /* Fingerprint Akamai complet */ #define H2_NOTE_SETTINGS "reqin_h2_set" /* Entrées SETTINGS brutes */ #define H2_NOTE_WUPDATE "reqin_h2_wu" /* Incrément WINDOW_UPDATE */ #define H2_NOTE_PSEUDO_ORDER "reqin_h2_ps" /* Ordre pseudo-headers */ +#define H2_NOTE_HAS_PRIORITY "reqin_h2_pri" /* Flag PRIORITY présent */ #define H2_NOTE_PARSED "reqin_h2_done" /* Marqueur "déjà parsé" */ +/* Clés des notes pour chaque paramètre SETTINGS individuel (RFC 9113 §6.5.2) */ +#define H2_NOTE_SET_HEADER_TABLE_SIZE "reqin_h2_s1" /* ID 1 */ +#define H2_NOTE_SET_ENABLE_PUSH "reqin_h2_s2" /* ID 2 */ +#define H2_NOTE_SET_MAX_CONCURRENT_STREAMS "reqin_h2_s3" /* ID 3 */ +#define H2_NOTE_SET_INITIAL_WINDOW_SIZE "reqin_h2_s4" /* ID 4 */ +#define H2_NOTE_SET_MAX_FRAME_SIZE "reqin_h2_s5" /* ID 5 */ +#define H2_NOTE_SET_MAX_HEADER_LIST_SIZE "reqin_h2_s6" /* ID 6 */ +#define H2_NOTE_SET_ENABLE_CONNECT "reqin_h2_s8" /* ID 8 */ + #endif /* MOD_REQIN_LOG_H */ diff --git a/services/sentinel/api/types.go b/services/sentinel/api/types.go index d75d4f3..c929287 100644 --- a/services/sentinel/api/types.go +++ b/services/sentinel/api/types.go @@ -42,6 +42,7 @@ type TCPMeta struct { MSS uint16 `json:"mss,omitempty"` WindowScale uint8 `json:"window_scale,omitempty"` Options []string `json:"options"` + OptionKinds []uint8 `json:"-"` // Raw TCP option kind numbers for JA4T } // RawPacket represents a raw packet captured from the network @@ -73,6 +74,7 @@ type TLSClientHello struct { type Fingerprints struct { JA4 string `json:"ja4"` JA4Hash string `json:"ja4_hash,omitempty"` // Internal use, not serialized to LogRecord + JA4T string `json:"ja4t,omitempty"` JA3 string `json:"ja3,omitempty"` JA3Hash string `json:"ja3_hash,omitempty"` } @@ -111,6 +113,7 @@ type LogRecord struct { // Fingerprints // Note: ja4_hash is NOT included - the JA4 format already includes its own hash portions JA4 string `json:"ja4"` + JA4T string `json:"ja4t,omitempty"` JA3 string `json:"ja3,omitempty"` JA3Hash string `json:"ja3_hash,omitempty"` @@ -265,6 +268,7 @@ func NewLogRecord(ch TLSClientHello, fp *Fingerprints) LogRecord { if fp != nil { rec.JA4 = fp.JA4 + rec.JA4T = fp.JA4T rec.JA3 = fp.JA3 rec.JA3Hash = fp.JA3Hash } diff --git a/services/sentinel/internal/fingerprint/engine.go b/services/sentinel/internal/fingerprint/engine.go index d82d7a5..5cc5417 100644 --- a/services/sentinel/internal/fingerprint/engine.go +++ b/services/sentinel/internal/fingerprint/engine.go @@ -4,6 +4,8 @@ package fingerprint import ( "encoding/binary" "fmt" + "strconv" + "strings" "github.com/antitbone/ja4/sentinel/api" @@ -59,14 +61,33 @@ func (e *EngineImpl) FromClientHello(ch api.TLSClientHello) (*api.Fingerprints, // This is kept for internal use but NOT serialized to LogRecord ja4Hash := extractJA4Hash(ja4) + // Generate JA4T fingerprint from TCP SYN parameters + ja4t := computeJA4T(ch.TCPMeta) + return &api.Fingerprints{ JA4: ja4, JA4Hash: ja4Hash, // Internal use only - not serialized to LogRecord + JA4T: ja4t, JA3: ja3, JA3Hash: ja3Hash, }, nil } +// computeJA4T génère l'empreinte JA4T à partir des métadonnées TCP SYN. +// Format : {WindowSize}_{OptionKinds}_{WindowScale}_{MSS} +func computeJA4T(tcp api.TCPMeta) string { + optStr := "" + if len(tcp.OptionKinds) > 0 { + parts := make([]string, len(tcp.OptionKinds)) + for i, k := range tcp.OptionKinds { + parts[i] = strconv.Itoa(int(k)) + } + optStr = strings.Join(parts, "-") + } + + return fmt.Sprintf("%d_%s_%d_%d", tcp.WindowSize, optStr, tcp.WindowScale, tcp.MSS) +} + // extractJA4Hash extracts the hash portion from a JA4 string // JA4 format: __ -> returns "_" func extractJA4Hash(ja4 string) string { diff --git a/services/sentinel/internal/fingerprint/engine_test.go b/services/sentinel/internal/fingerprint/engine_test.go index 519157d..b6bbd62 100644 --- a/services/sentinel/internal/fingerprint/engine_test.go +++ b/services/sentinel/internal/fingerprint/engine_test.go @@ -487,3 +487,99 @@ t.Errorf("expected 'somehash', got %q", hash) var _ interface { FromClientHello(api.TLSClientHello) (*api.Fingerprints, error) } = (*EngineImpl)(nil) + +// TestComputeJA4T tests the JA4T fingerprint generation. +func TestComputeJA4T(t *testing.T) { + tests := []struct { + name string + tcp api.TCPMeta + want string + }{ + { + name: "linux_5x_typical", + tcp: api.TCPMeta{ + WindowSize: 64240, + OptionKinds: []uint8{2, 4, 8, 1, 3}, + WindowScale: 7, + MSS: 1460, + }, + want: "64240_2-4-8-1-3_7_1460", + }, + { + name: "windows_11_typical", + tcp: api.TCPMeta{ + WindowSize: 64240, + OptionKinds: []uint8{2, 4, 8, 1, 3}, + WindowScale: 8, + MSS: 1460, + }, + want: "64240_2-4-8-1-3_8_1460", + }, + { + name: "macos_14_typical", + tcp: api.TCPMeta{ + WindowSize: 65535, + OptionKinds: []uint8{2, 4, 8, 1, 3}, + WindowScale: 6, + MSS: 1460, + }, + want: "65535_2-4-8-1-3_6_1460", + }, + { + name: "no_options", + tcp: api.TCPMeta{ + WindowSize: 8192, + OptionKinds: nil, + WindowScale: 0, + MSS: 0, + }, + want: "8192__0_0", + }, + { + name: "windows_no_ts", + tcp: api.TCPMeta{ + WindowSize: 8192, + OptionKinds: []uint8{2, 4, 1, 3}, + WindowScale: 2, + MSS: 1460, + }, + want: "8192_2-4-1-3_2_1460", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := computeJA4T(tt.tcp) + if got != tt.want { + t.Errorf("computeJA4T() = %q, want %q", got, tt.want) + } + }) + } +} + +// TestFromClientHello_JA4T_Populated tests that JA4T is populated in FromClientHello. +func TestFromClientHello_JA4T_Populated(t *testing.T) { + clientHello := buildMinimalClientHelloForTest() + + ch := api.TLSClientHello{ + Payload: clientHello, + TCPMeta: api.TCPMeta{ + WindowSize: 64240, + MSS: 1460, + WindowScale: 7, + OptionKinds: []uint8{2, 4, 8, 1, 3}, + Options: []string{"MSS", "SACK", "TS", "NOP", "WS"}, + }, + } + + engine := NewEngine() + fp, err := engine.FromClientHello(ch) + if err != nil { + t.Fatalf("FromClientHello() error = %v", err) + } + + expected := "64240_2-4-8-1-3_7_1460" + if fp.JA4T != expected { + t.Errorf("JA4T = %q, want %q", fp.JA4T, expected) + } +} diff --git a/services/sentinel/internal/tlsparse/parser.go b/services/sentinel/internal/tlsparse/parser.go index 33733c1..48815d6 100644 --- a/services/sentinel/internal/tlsparse/parser.go +++ b/services/sentinel/internal/tlsparse/parser.go @@ -615,8 +615,9 @@ func extractIPMeta(ipLayer gopacket.Layer) api.IPMeta { // extractTCPMeta extracts TCP metadata from the TCP layer func extractTCPMeta(tcp *layers.TCP) api.TCPMeta { meta := api.TCPMeta{ - WindowSize: tcp.Window, - Options: make([]string, 0, len(tcp.Options)), + WindowSize: tcp.Window, + Options: make([]string, 0, len(tcp.Options)), + OptionKinds: make([]uint8, 0, len(tcp.Options)), } // Parse TCP options @@ -635,20 +636,26 @@ func extractTCPMeta(tcp *layers.TCP) api.TCPMeta { } else { meta.Options = append(meta.Options, "MSS_INVALID") } + meta.OptionKinds = append(meta.OptionKinds, uint8(opt.OptionType)) case layers.TCPOptionKindWindowScale: if len(opt.OptionData) > 0 { meta.WindowScale = opt.OptionData[0] } meta.Options = append(meta.Options, "WS") + meta.OptionKinds = append(meta.OptionKinds, uint8(opt.OptionType)) case layers.TCPOptionKindSACKPermitted: meta.Options = append(meta.Options, "SACK") + meta.OptionKinds = append(meta.OptionKinds, uint8(opt.OptionType)) case layers.TCPOptionKindSACK: // SACK blocks (actual SACK data, not just permitted) meta.Options = append(meta.Options, "SACK") + meta.OptionKinds = append(meta.OptionKinds, uint8(opt.OptionType)) case layers.TCPOptionKindTimestamps: meta.Options = append(meta.Options, "TS") + meta.OptionKinds = append(meta.OptionKinds, uint8(opt.OptionType)) default: meta.Options = append(meta.Options, fmt.Sprintf("OPT%d", opt.OptionType)) + meta.OptionKinds = append(meta.OptionKinds, uint8(opt.OptionType)) } } diff --git a/shared/clickhouse/01_raw_tables.sql b/shared/clickhouse/01_raw_tables.sql index d2c0fc6..3c41207 100644 --- a/shared/clickhouse/01_raw_tables.sql +++ b/shared/clickhouse/01_raw_tables.sql @@ -10,7 +10,7 @@ CREATE TABLE IF NOT EXISTS ja4_logs.http_logs_raw ENGINE = MergeTree PARTITION BY toDate(ingest_time) ORDER BY ingest_time -TTL ingest_time + INTERVAL 1 DAY +TTL ingest_time + INTERVAL 2 HOUR SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/shared/clickhouse/04_mv_http_logs.sql b/shared/clickhouse/04_mv_http_logs.sql index 16d0f1e..f07d2a6 100644 --- a/shared/clickhouse/04_mv_http_logs.sql +++ b/shared/clickhouse/04_mv_http_logs.sql @@ -46,6 +46,15 @@ CREATE TABLE IF NOT EXISTS ja4_logs.http_logs `b_timestamp` UInt64, `conn_id` String CODEC(ZSTD(3)), + -- Response metadata (captured at log_transaction phase) + `status_code` UInt16 DEFAULT 0, + `response_size` UInt64 DEFAULT 0, + `duration_ms` UInt64 DEFAULT 0, + + -- Header fingerprinting + `headers_raw` String DEFAULT '' CODEC(ZSTD(3)), + `header_order_signature` String DEFAULT '' CODEC(ZSTD(3)), + -- IP metadata `ip_meta_df` UInt8, `ip_meta_id` UInt16, @@ -94,6 +103,17 @@ CREATE TABLE IF NOT EXISTS ja4_logs.http_logs `h2_settings_fp` String DEFAULT '' CODEC(ZSTD(3)), `h2_window_update` UInt32 DEFAULT 0, `h2_pseudo_order` LowCardinality(String) DEFAULT '', + `h2_has_priority` UInt8 DEFAULT 0, + + -- Paramètres SETTINGS HTTP/2 individuels (RFC 9113 §6.5.2) + -- -1 = absent du preface client (le client n'a pas envoyé ce paramètre) + `h2_header_table_size` Int32 DEFAULT -1, + `h2_enable_push` Int32 DEFAULT -1, + `h2_max_concurrent_streams` Int32 DEFAULT -1, + `h2_initial_window_size` Int64 DEFAULT -1, + `h2_max_frame_size` Int32 DEFAULT -1, + `h2_max_header_list_size` Int32 DEFAULT -1, + `h2_enable_connect_protocol` Int32 DEFAULT -1, -- Index bloom_filter sur src_ip : les requêtes WHERE src_ip = X sautent -- les granules qui ne contiennent pas cette IP (~90% des granules en pratique). @@ -104,7 +124,7 @@ CREATE TABLE IF NOT EXISTS ja4_logs.http_logs ENGINE = MergeTree PARTITION BY log_date ORDER BY (time, src_ip, dst_ip, ja4) -TTL log_date + INTERVAL 7 DAY +TTL log_date + INTERVAL 30 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; @@ -142,14 +162,19 @@ SELECT coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme, coalesce(JSONExtractString(raw_json, 'host'), '') AS host, coalesce(JSONExtractString(raw_json, 'path'), '') AS path, - coalesce(JSONExtractString(raw_json, 'query'), '') AS query, + coalesce(JSONExtractString(raw_json, 'query_string'), JSONExtractString(raw_json, 'query'), '') AS query, coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version, coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side, - toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated, + toUInt8(coalesce(JSONExtractUInt(raw_json, 'correlated'), 0)) AS correlated, toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives, coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp, coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp, coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id, + toUInt16(coalesce(JSONExtractUInt(raw_json, 'status_code'), 0)) AS status_code, + coalesce(JSONExtractUInt(raw_json, 'response_size'), 0) AS response_size, + coalesce(JSONExtractUInt(raw_json, 'duration_ms'), 0) AS duration_ms, + coalesce(JSONExtractString(raw_json, 'headers_raw'), '') AS headers_raw, + coalesce(JSONExtractString(raw_json, 'header_order_signature'), '') AS header_order_signature, toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df, toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0)) AS ip_meta_id, toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0)) AS ip_meta_total_length, @@ -204,6 +229,16 @@ SELECT coalesce(JSONExtractString(raw_json, 'h2_fingerprint'), '') AS h2_fingerprint, coalesce(JSONExtractString(raw_json, 'h2_settings_fp'), '') AS h2_settings_fp, toUInt32(coalesce(JSONExtractUInt(raw_json, 'h2_window_update'), 0)) AS h2_window_update, - coalesce(JSONExtractString(raw_json, 'h2_pseudo_order'), '') AS h2_pseudo_order + coalesce(JSONExtractString(raw_json, 'h2_pseudo_order'), '') AS h2_pseudo_order, + toUInt8(coalesce(JSONExtractUInt(raw_json, 'h2_has_priority'), 0)) AS h2_has_priority, + + -- Paramètres SETTINGS HTTP/2 individuels (-1 = absent du preface client) + toInt32(if(JSONHas(raw_json, 'h2_header_table_size'), JSONExtractInt(raw_json, 'h2_header_table_size'), -1)) AS h2_header_table_size, + toInt32(if(JSONHas(raw_json, 'h2_enable_push'), JSONExtractInt(raw_json, 'h2_enable_push'), -1)) AS h2_enable_push, + toInt32(if(JSONHas(raw_json, 'h2_max_concurrent_streams'), JSONExtractInt(raw_json, 'h2_max_concurrent_streams'), -1)) AS h2_max_concurrent_streams, + toInt64(if(JSONHas(raw_json, 'h2_initial_window_size'), JSONExtractInt(raw_json, 'h2_initial_window_size'), -1)) AS h2_initial_window_size, + toInt32(if(JSONHas(raw_json, 'h2_max_frame_size'), JSONExtractInt(raw_json, 'h2_max_frame_size'), -1)) AS h2_max_frame_size, + toInt32(if(JSONHas(raw_json, 'h2_max_header_list_size'), JSONExtractInt(raw_json, 'h2_max_header_list_size'), -1)) AS h2_max_header_list_size, + toInt32(if(JSONHas(raw_json, 'h2_enable_connect_protocol'), JSONExtractInt(raw_json, 'h2_enable_connect_protocol'), -1)) AS h2_enable_connect_protocol FROM ja4_logs.http_logs_raw; diff --git a/shared/clickhouse/05_aggregation_tables.sql b/shared/clickhouse/05_aggregation_tables.sql index 4334ffc..1aa0629 100644 --- a/shared/clickhouse/05_aggregation_tables.sql +++ b/shared/clickhouse/05_aggregation_tables.sql @@ -144,7 +144,10 @@ CREATE TABLE IF NOT EXISTS ja4_processing.agg_host_ip_ja4_1h ) ENGINE = AggregatingMergeTree() ORDER BY (window_start, src_ip, ja4, host) -SETTINGS deduplicate_merge_projection_mode = 'drop'; +TTL window_start + INTERVAL 7 DAY +SETTINGS + deduplicate_merge_projection_mode = 'drop', + ttl_only_drop_parts = 1; -- ----------------------------------------------------------------------------- @@ -177,7 +180,15 @@ SELECT sum(IF(match(src.path, '(?i)\.(png|jpg|jpeg|gif|css|js|ico|woff2|svg|eot)$'), 1, 0)) AS count_assets, sum(IF(position(src.client_headers, 'Referer') = 0, 1, 0)) AS count_no_referer, uniqState(src.header_user_agent) AS uniq_ua, - 0 AS max_requests_per_sec, -- TODO(P0): calculer via sous-requête par seconde (impossible dans un seul GROUP BY) + toUInt32(if(count() > 0, + arrayMax( + arrayMap( + s -> toUInt64(countEqual(groupArray(toStartOfSecond(src.time)), s)), + arrayDistinct(groupArray(toStartOfSecond(src.time))) + ) + ), + 0 + )) AS max_requests_per_sec, varPopState(toFloat64(length(replaceAll(src.path, '/', '//')) - length(src.path))) AS url_depth_variance, sum(IF(src.ip_meta_total_length < 60 OR src.ip_meta_total_length > 1500, 1, 0)) AS count_anomalous_payload, uniqState(src.ja3) AS uniq_ja3, @@ -224,7 +235,9 @@ CREATE TABLE IF NOT EXISTS ja4_processing.agg_header_fingerprint_1h sec_fetch_dest SimpleAggregateFunction(any, String) ) ENGINE = AggregatingMergeTree() -ORDER BY (window_start, src_ip); +ORDER BY (window_start, src_ip) +TTL window_start + INTERVAL 7 DAY +SETTINGS ttl_only_drop_parts = 1; DROP VIEW IF EXISTS ja4_processing.mv_agg_header_fingerprint_1h; @@ -249,3 +262,36 @@ SELECT any(src.header_sec_fetch_dest) AS sec_fetch_dest FROM ja4_logs.http_logs AS src GROUP BY window_start, src.src_ip; + + +-- ----------------------------------------------------------------------------- +-- unknown_h2_fingerprints — file d'examen pour signatures H2 inconnues (§3.9.5) +-- +-- Sessions dont le fingerprint H2 ne correspond à aucune famille connue +-- (browser_match_max < 0.45) mais qui présentent un comportement navigateur +-- (browser_confidence ≥ 0.55, Sec-Fetch-* présent, TLS 1.3). +-- Utilisée pour enrichir progressivement browser_signatures. +-- ----------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS ja4_processing.unknown_h2_fingerprints +( + observed_at DateTime DEFAULT now(), + src_ip IPv6, + ja4 String CODEC(ZSTD(3)), + h2_fingerprint String CODEC(ZSTD(3)), + h2_settings_fp String CODEC(ZSTD(3)), + h2_window_update UInt32, + h2_pseudo_order LowCardinality(String), + h2_has_priority UInt8, + browser_confidence_score Float32, + header_user_agent String CODEC(ZSTD(3)), + tls_version LowCardinality(String), + hit_count UInt64 DEFAULT 1, + + INDEX idx_observed_at observed_at TYPE minmax GRANULARITY 4 +) +ENGINE = ReplacingMergeTree(observed_at) +ORDER BY (h2_fingerprint, ja4, src_ip) +TTL observed_at + INTERVAL 30 DAY +SETTINGS + index_granularity = 8192, + ttl_only_drop_parts = 1; diff --git a/shared/clickhouse/06_ml_tables.sql b/shared/clickhouse/06_ml_tables.sql index 6df5cdb..3e1d77d 100644 --- a/shared/clickhouse/06_ml_tables.sql +++ b/shared/clickhouse/06_ml_tables.sql @@ -73,7 +73,7 @@ SETTINGS -- ----------------------------------------------------------------------------- -- ml_all_scores — all classifications (no threshold, for observability) -- --- PARTITION BY date : TTL de 3 jours → les partitions expirées sont supprimées +-- PARTITION BY date : TTL de 7 jours → les partitions expirées sont supprimées -- entièrement sans avoir à lire chaque granule (ttl_only_drop_parts). -- INDEX idx_detected_at : idem ml_detected_anomalies. -- ----------------------------------------------------------------------------- @@ -115,7 +115,7 @@ CREATE TABLE IF NOT EXISTS ja4_processing.ml_all_scores ENGINE = ReplacingMergeTree(detected_at) PARTITION BY toYYYYMMDD(window_start) ORDER BY (window_start, src_ip, ja4, host, model_name) -TTL window_start + INTERVAL 3 DAY +TTL window_start + INTERVAL 7 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/tests/integration/platform/httpd-integration.conf b/tests/integration/platform/httpd-integration.conf index cbaf910..dd7a833 100644 --- a/tests/integration/platform/httpd-integration.conf +++ b/tests/integration/platform/httpd-integration.conf @@ -3,6 +3,9 @@ # Load mod-reqin-log LoadModule reqin_log_module modules/mod_reqin_log.so +# Enable HTTP/2 negotiation (mod_http2 loaded by default on Rocky 9) +Protocols h2 http/1.1 + # mod_remoteip: trust X-Forwarded-For from Docker internal subnets. # mod_reqin_log reads r->useragent_ip which mod_remoteip updates, # so the XFF IP appears as src_ip in the correlated logs.