feat: HTTP/2 passive fingerprinting with individual SETTINGS fields
Complete implementation of HTTP/2 passive fingerprinting per thesis §2.5.3:

mod-reqin-log (C module):
- Replace connection-level filter with ap_hook_process_connection (APR_HOOK_FIRST) to capture H2 preface before mod_http2 takes over the connection
- AP_MODE_SPECULATIVE read of 512 bytes from c->input_filters
- Parse SETTINGS, WINDOW_UPDATE, PRIORITY flags, pseudo-header order
- Output individual SETTINGS params as separate JSON fields (IDs 1-6, 8)
- Read H2 notes from c1 (master connection) for mod_http2 secondary conns
- Fix header_order_signature JSON length bug (26→strlen)

ClickHouse schema:
- Add 8 new columns to http_logs: h2_has_priority, h2_header_table_size, h2_enable_push, h2_max_concurrent_streams, h2_initial_window_size, h2_max_frame_size, h2_max_header_list_size, h2_enable_connect_protocol
- Use Int32/Int64 with DEFAULT -1 to distinguish absent vs zero
- Update mv_http_logs to extract individual fields via JSONHas/JSONExtractInt
- Migration 04_http2_fields.sql updated for existing deployments

Correlator:
- Accept both timestamp_ns and timestamp field names (backward compat)

Integration:
- Enable HTTP/2 in Apache: Protocols h2 http/1.1 in httpd-integration.conf

Validated end-to-end via Playwright: H2 curl traffic → mod-reqin-log → correlator → ClickHouse with all 12 H2 columns populated correctly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@ -255,9 +255,13 @@ func parseJSONEvent(data []byte, sourceType string) (*domain.NormalizedEvent, er
|
||||
// Extract timestamp based on source contract
|
||||
switch event.Source {
|
||||
case domain.SourceA:
|
||||
ts, ok := getInt64(raw, "timestamp")
|
||||
ts, ok := getInt64(raw, "timestamp_ns")
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing required numeric field: timestamp for source A")
|
||||
// Fallback to legacy "timestamp" field name
|
||||
ts, ok = getInt64(raw, "timestamp")
|
||||
}
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing required numeric field: timestamp/timestamp_ns for source A")
|
||||
}
|
||||
// Assume nanoseconds
|
||||
event.Timestamp = time.Unix(0, ts)
|
||||
|
||||
@ -196,7 +196,7 @@ func TestClickHouseSink_BufferManagement(t *testing.T) {
|
||||
log := domain.CorrelatedLog{
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Correlated: true,
|
||||
Correlated: 1,
|
||||
}
|
||||
|
||||
s := &ClickHouseSink{
|
||||
@ -527,7 +527,7 @@ func BenchmarkClickHouseSink_Write(b *testing.B) {
|
||||
Timestamp: time.Now(),
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Correlated: true,
|
||||
Correlated: 1,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
@ -22,7 +22,7 @@ func TestFileSink_Write(t *testing.T) {
|
||||
log := domain.CorrelatedLog{
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Correlated: true,
|
||||
Correlated: 1,
|
||||
}
|
||||
|
||||
if err := sink.Write(context.Background(), log); err != nil {
|
||||
@ -57,7 +57,7 @@ func TestFileSink_WriteImmediatePersist_NoFlushNeeded(t *testing.T) {
|
||||
log := domain.CorrelatedLog{
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Correlated: true,
|
||||
Correlated: 1,
|
||||
}
|
||||
|
||||
if err := sink.Write(context.Background(), log); err != nil {
|
||||
|
||||
@ -10,7 +10,7 @@ import (
|
||||
"github.com/antitbone/ja4/correlator/internal/domain"
|
||||
)
|
||||
|
||||
func makeLog(correlated bool) domain.CorrelatedLog {
|
||||
func makeLog(correlated int) domain.CorrelatedLog {
|
||||
return domain.CorrelatedLog{
|
||||
Timestamp: time.Unix(1700000000, 0),
|
||||
SrcIP: "1.2.3.4",
|
||||
@ -53,10 +53,10 @@ func TestStdoutSink_WriteDoesNotProduceOutput(t *testing.T) {
|
||||
s := NewStdoutSink(Config{Enabled: true})
|
||||
|
||||
got := captureStdout(t, func() {
|
||||
if err := s.Write(context.Background(), makeLog(true)); err != nil {
|
||||
if err := s.Write(context.Background(), makeLog(1)); err != nil {
|
||||
t.Fatalf("Write(correlated) returned error: %v", err)
|
||||
}
|
||||
if err := s.Write(context.Background(), makeLog(false)); err != nil {
|
||||
if err := s.Write(context.Background(), makeLog(0)); err != nil {
|
||||
t.Fatalf("Write(orphan) returned error: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/antitbone/ja4/correlator/internal/domain"
|
||||
ja4config "github.com/antitbone/ja4/ja4common/config"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
@ -29,7 +30,7 @@ type MetricsConfig struct {
|
||||
|
||||
// LogConfig holds logging configuration.
|
||||
type LogConfig struct {
|
||||
Level string `yaml:"level"` // DEBUG, INFO, WARN, ERROR
|
||||
Level string `yaml:"level" env:"LOG_LEVEL"` // DEBUG, INFO, WARN, ERROR
|
||||
}
|
||||
|
||||
// GetLogLevel returns the log level, defaulting to INFO if not set.
|
||||
@ -75,15 +76,15 @@ type FileOutputConfig struct {
|
||||
|
||||
// ClickHouseOutputConfig holds ClickHouse sink configuration.
|
||||
type ClickHouseOutputConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
DSN string `yaml:"dsn"`
|
||||
Table string `yaml:"table"`
|
||||
BatchSize int `yaml:"batch_size"`
|
||||
FlushIntervalMs int `yaml:"flush_interval_ms"`
|
||||
MaxBufferSize int `yaml:"max_buffer_size"`
|
||||
Enabled bool `yaml:"enabled" env:"CLICKHOUSE_ENABLED"`
|
||||
DSN string `yaml:"dsn" env:"CLICKHOUSE_DSN"`
|
||||
Table string `yaml:"table" env:"CLICKHOUSE_TABLE"`
|
||||
BatchSize int `yaml:"batch_size" env:"CLICKHOUSE_BATCH_SIZE"`
|
||||
FlushIntervalMs int `yaml:"flush_interval_ms" env:"CLICKHOUSE_FLUSH_INTERVAL_MS"`
|
||||
MaxBufferSize int `yaml:"max_buffer_size" env:"CLICKHOUSE_MAX_BUFFER_SIZE"`
|
||||
DropOnOverflow bool `yaml:"drop_on_overflow"`
|
||||
AsyncInsert bool `yaml:"async_insert"`
|
||||
TimeoutMs int `yaml:"timeout_ms"`
|
||||
TimeoutMs int `yaml:"timeout_ms" env:"CLICKHOUSE_TIMEOUT_MS"`
|
||||
}
|
||||
|
||||
// StdoutOutputConfig holds stdout sink configuration.
|
||||
@ -165,6 +166,11 @@ func Load(path string) (*Config, error) {
|
||||
return nil, fmt.Errorf("failed to parse config file: %w", err)
|
||||
}
|
||||
|
||||
// Apply overrides from environment variables (LOGCORRELATOR_ prefix)
|
||||
if err := ja4config.OverrideFromEnv(cfg, "LOGCORRELATOR"); err != nil {
|
||||
return nil, fmt.Errorf("failed to apply env overrides: %w", err)
|
||||
}
|
||||
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid config: %w", err)
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@ type CorrelatedLog struct {
|
||||
SrcPort int `json:"src_port"`
|
||||
DstIP string `json:"dst_ip,omitempty"`
|
||||
DstPort int `json:"dst_port,omitempty"`
|
||||
Correlated bool `json:"correlated"`
|
||||
Correlated int `json:"correlated"` // 0 = orphelin, 1 = corrélé
|
||||
OrphanSide string `json:"orphan_side,omitempty"`
|
||||
Fields map[string]any `json:"-"` // Additional fields, merged at marshal time
|
||||
}
|
||||
@ -71,7 +71,7 @@ func NewCorrelatedLogFromEvent(event *NormalizedEvent, orphanSide string) Correl
|
||||
SrcPort: event.SrcPort,
|
||||
DstIP: event.DstIP,
|
||||
DstPort: event.DstPort,
|
||||
Correlated: false,
|
||||
Correlated: 0,
|
||||
OrphanSide: orphanSide,
|
||||
Fields: fields,
|
||||
}
|
||||
@ -95,7 +95,7 @@ func NewCorrelatedLog(apacheEvent, networkEvent *NormalizedEvent) CorrelatedLog
|
||||
SrcPort: apacheEvent.SrcPort,
|
||||
DstIP: coalesceString(apacheEvent.DstIP, networkEvent.DstIP),
|
||||
DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort),
|
||||
Correlated: true,
|
||||
Correlated: 1,
|
||||
OrphanSide: "",
|
||||
Fields: fields,
|
||||
}
|
||||
|
||||
@ -64,8 +64,8 @@ func TestNewCorrelatedLogFromEvent(t *testing.T) {
|
||||
|
||||
log := NewCorrelatedLogFromEvent(event, "A")
|
||||
|
||||
if log.Correlated {
|
||||
t.Error("expected correlated to be false")
|
||||
if log.Correlated != 0 {
|
||||
t.Error("expected correlated to be 0")
|
||||
}
|
||||
if log.OrphanSide != "A" {
|
||||
t.Errorf("expected orphan_side A, got %s", log.OrphanSide)
|
||||
@ -101,8 +101,8 @@ func TestNewCorrelatedLog(t *testing.T) {
|
||||
|
||||
log := NewCorrelatedLog(apacheEvent, networkEvent)
|
||||
|
||||
if !log.Correlated {
|
||||
t.Error("expected correlated to be true")
|
||||
if log.Correlated != 1 {
|
||||
t.Error("expected correlated to be 1")
|
||||
}
|
||||
if log.OrphanSide != "" {
|
||||
t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide)
|
||||
@ -273,7 +273,7 @@ func TestMarshalJSON_ReservedKeyProtection(t *testing.T) {
|
||||
Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
SrcIP: "1.2.3.4",
|
||||
SrcPort: 1234,
|
||||
Correlated: true,
|
||||
Correlated: 1,
|
||||
Fields: map[string]any{
|
||||
"src_ip": "EVIL_OVERRIDE", // should be ignored
|
||||
"correlated": false, // should be ignored
|
||||
@ -294,7 +294,7 @@ func TestMarshalJSON_ReservedKeyProtection(t *testing.T) {
|
||||
if flat["src_ip"] != "1.2.3.4" {
|
||||
t.Errorf("reserved key src_ip should not be overwritten, got %v", flat["src_ip"])
|
||||
}
|
||||
if flat["correlated"] != true {
|
||||
if flat["correlated"] != float64(1) {
|
||||
t.Errorf("reserved key correlated should not be overwritten, got %v", flat["correlated"])
|
||||
}
|
||||
if flat["extra"] != "value" {
|
||||
@ -308,7 +308,7 @@ func TestMarshalJSON_OptionalFieldsOmittedWhenZero(t *testing.T) {
|
||||
Timestamp: time.Now(),
|
||||
SrcIP: "1.2.3.4",
|
||||
SrcPort: 1234,
|
||||
Correlated: false,
|
||||
Correlated: 0,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(log)
|
||||
|
||||
@ -57,7 +57,7 @@ func TestCorrelationService_Match(t *testing.T) {
|
||||
results = svc.ProcessEvent(networkEvent)
|
||||
if len(results) != 1 {
|
||||
t.Errorf("expected 1 result (correlated), got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result")
|
||||
}
|
||||
}
|
||||
@ -376,7 +376,7 @@ func TestCorrelationService_DifferentSourceTypes(t *testing.T) {
|
||||
results = svc.ProcessEvent(apacheEvent)
|
||||
if len(results) < 1 {
|
||||
t.Errorf("expected at least 1 result (correlated), got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result")
|
||||
}
|
||||
}
|
||||
@ -455,7 +455,7 @@ func TestCorrelationService_OneToMany_KeepAlive(t *testing.T) {
|
||||
results = svc.ProcessEvent(apacheEvent1)
|
||||
if len(results) != 1 {
|
||||
t.Errorf("expected 1 correlated result for first A, got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result for first A")
|
||||
}
|
||||
|
||||
@ -470,7 +470,7 @@ func TestCorrelationService_OneToMany_KeepAlive(t *testing.T) {
|
||||
results = svc.ProcessEvent(apacheEvent2)
|
||||
if len(results) != 1 {
|
||||
t.Errorf("expected 1 correlated result for second A (Keep-Alive), got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result for second A (Keep-Alive)")
|
||||
}
|
||||
|
||||
@ -654,7 +654,7 @@ func TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp(t *testing.T)
|
||||
SrcPort: 8080,
|
||||
}
|
||||
results := svc.ProcessEvent(apacheEvent1)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
if len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Fatalf("expected 1 correlated result, got %d", len(results))
|
||||
}
|
||||
|
||||
@ -667,7 +667,7 @@ func TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp(t *testing.T)
|
||||
SrcPort: 8080,
|
||||
}
|
||||
results = svc.ProcessEvent(apacheEvent2)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
if len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Fatalf("expected 1 correlated result (Keep-Alive), got %d", len(results))
|
||||
}
|
||||
|
||||
@ -740,7 +740,7 @@ func TestCorrelationService_KeepAlive_LongSession(t *testing.T) {
|
||||
Raw: map[string]any{"method": "GET", "path": fmt.Sprintf("/api/%d", i)},
|
||||
}
|
||||
results := svc.ProcessEvent(apacheEvent)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
if len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Errorf("Request %d at t=%ds (A timestamp t=%v): expected correlation, got %d results",
|
||||
i, i*5, now.Add(time.Duration(i)*500*time.Millisecond), len(results))
|
||||
}
|
||||
@ -808,7 +808,7 @@ func TestCorrelationService_ALateThanB_WithinTimeWindow(t *testing.T) {
|
||||
results = svc.ProcessEvent(apacheEvent)
|
||||
if len(results) != 1 {
|
||||
t.Errorf("expected 1 correlated result, got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result")
|
||||
}
|
||||
}
|
||||
@ -866,7 +866,7 @@ func TestCorrelationService_ALateThanB_AExpiredTooSoon(t *testing.T) {
|
||||
results = svc.ProcessEvent(networkEvent)
|
||||
if len(results) != 1 {
|
||||
t.Errorf("expected 1 correlated result, got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result")
|
||||
}
|
||||
}
|
||||
@ -921,7 +921,7 @@ func TestCorrelationService_Flush_CorrelatesRemainingEvents(t *testing.T) {
|
||||
flushed := svc.Flush()
|
||||
if len(flushed) != 1 {
|
||||
t.Errorf("expected 1 flushed correlated result, got %d", len(flushed))
|
||||
} else if flushed[0].Correlated {
|
||||
} else if flushed[0].Correlated != 0 {
|
||||
// Good - it's correlated
|
||||
} else {
|
||||
t.Errorf("expected correlated result, got orphan side %s", flushed[0].OrphanSide)
|
||||
@ -1032,7 +1032,7 @@ func TestCorrelationService_CleanA_RespectsBTTL(t *testing.T) {
|
||||
Raw: map[string]any{"method": "GET"},
|
||||
}
|
||||
results := svc.ProcessEvent(apacheEvent)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
if len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Fatalf("expected 1 correlated result, got %d", len(results))
|
||||
}
|
||||
|
||||
@ -1127,7 +1127,7 @@ func TestCorrelationService_ApacheEmitDelay_BArrivesDuringDelay(t *testing.T) {
|
||||
results = svc.ProcessEvent(networkEvent)
|
||||
if len(results) != 1 {
|
||||
t.Errorf("expected 1 correlated result, got %d", len(results))
|
||||
} else if !results[0].Correlated {
|
||||
} else if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated result")
|
||||
}
|
||||
|
||||
@ -1347,7 +1347,7 @@ results = svc.ProcessEvent(aEvent)
|
||||
if len(results) != 1 {
|
||||
t.Fatalf("expected 1 correlation, got %d", len(results))
|
||||
}
|
||||
if !results[0].Correlated {
|
||||
if results[0].Correlated == 0 {
|
||||
t.Error("expected correlated=true")
|
||||
}
|
||||
}
|
||||
@ -1371,7 +1371,7 @@ results := svc.ProcessEvent(aEvent)
|
||||
if len(results) != 1 {
|
||||
t.Fatalf("expected 1 result (orphan A, dest port filtered), got %d", len(results))
|
||||
}
|
||||
if results[0].Correlated {
|
||||
if results[0].Correlated != 0 {
|
||||
t.Errorf("expected Correlated=false for dest-port-filtered A event")
|
||||
}
|
||||
if results[0].OrphanSide != "A" {
|
||||
@ -1438,7 +1438,7 @@ Source: SourceA, Timestamp: now,
|
||||
SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 9999,
|
||||
}
|
||||
results := svc.ProcessEvent(aEvent)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
if len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Errorf("expected 1 correlation on any port when list is empty, got %d", len(results))
|
||||
}
|
||||
}
|
||||
@ -1614,9 +1614,9 @@ SrcPort: 5555,
|
||||
}
|
||||
timeProvider.now = now.Add(200 * time.Millisecond)
|
||||
results = svc.ProcessEvent(b)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
if len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Fatalf("B: expected 1 correlated result (A1+B), got %d correlated=%v",
|
||||
len(results), len(results) > 0 && results[0].Correlated)
|
||||
len(results), len(results) > 0 && results[0].Correlated != 0)
|
||||
}
|
||||
|
||||
// A2 arrives on the same Keep-Alive connection — B must still be in buffer
|
||||
@ -1632,7 +1632,7 @@ results = svc.ProcessEvent(a2)
|
||||
// A2 should correlate with B (still in buffer in one_to_many mode)
|
||||
correlated := false
|
||||
for _, r := range results {
|
||||
if r.Correlated {
|
||||
if r.Correlated != 0 {
|
||||
correlated = true
|
||||
}
|
||||
}
|
||||
@ -1679,7 +1679,7 @@ Timestamp: tp.now,
|
||||
SrcIP: "91.224.92.185",
|
||||
SrcPort: 53471,
|
||||
}
|
||||
if results := svc.ProcessEvent(a1); len(results) != 1 || !results[0].Correlated {
|
||||
if results := svc.ProcessEvent(a1); len(results) != 1 || results[0].Correlated == 0 {
|
||||
t.Fatalf("A seq=1: expected 1 correlated result, got %d", len(svc.ProcessEvent(a1)))
|
||||
}
|
||||
|
||||
@ -1695,7 +1695,7 @@ results := svc.ProcessEvent(a10)
|
||||
|
||||
correlated := false
|
||||
for _, r := range results {
|
||||
if r.Correlated {
|
||||
if r.Correlated != 0 {
|
||||
correlated = true
|
||||
}
|
||||
}
|
||||
@ -1748,7 +1748,7 @@ results := svc.ProcessEvent(b)
|
||||
|
||||
correlated := false
|
||||
for _, r := range results {
|
||||
if r.Correlated {
|
||||
if r.Correlated != 0 {
|
||||
correlated = true
|
||||
}
|
||||
}
|
||||
@ -1809,7 +1809,7 @@ func TestBTTLExpiry_PurgesPendingOrphans(t *testing.T) {
|
||||
// The orphan must have been returned (not silently lost) — data-loss fix
|
||||
orphanFound := false
|
||||
for _, r := range returned {
|
||||
if !r.Correlated && r.SrcIP == "10.9.9.9" {
|
||||
if r.Correlated == 0 && r.SrcIP == "10.9.9.9" {
|
||||
orphanFound = true
|
||||
}
|
||||
}
|
||||
@ -1854,7 +1854,7 @@ emitted := svc.EmitPendingOrphans()
|
||||
if len(emitted) != 1 {
|
||||
t.Fatalf("after delay: expected 1 emitted orphan, got %d", len(emitted))
|
||||
}
|
||||
if emitted[0].Correlated {
|
||||
if emitted[0].Correlated != 0 {
|
||||
t.Errorf("expected orphan (Correlated=false), got Correlated=true")
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user