diff --git a/Makefile b/Makefile index 004f445..d26b273 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ BINARY_NAME=logcorrelator DIST_DIR=dist # Package version -PKG_VERSION ?= 1.1.6 +PKG_VERSION ?= 1.1.7 ## build: Build the logcorrelator binary locally build: diff --git a/architecture.yml b/architecture.yml index f36952b..d41d28c 100644 --- a/architecture.yml +++ b/architecture.yml @@ -175,8 +175,9 @@ config: correlation: # Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend # au plus cette durée (sauf éviction du cache HTTP). + # Augmentée à 10s pour supporter le Keep-Alive HTTP. time_window: - value: 1 + value: 10 unit: s orphan_policy: @@ -192,9 +193,10 @@ config: max_network_items: 20000 ttl: - # Durée de vie standard d’un log réseau (B) en mémoire. Chaque corrélation + # Durée de vie standard d'un log réseau (B) en mémoire. Chaque corrélation # réussie avec un A réinitialise ce TTL. - network_ttl_s: 30 + # Augmenté à 120s pour supporter les sessions HTTP Keep-Alive longues. + network_ttl_s: 120 inputs: description: > @@ -267,16 +269,16 @@ outputs: correlation: description: > Corrélation stricte basée sur src_ip + src_port et une fenêtre temporelle - configurable. Aucun autre champ n’est utilisé pour la décision de corrélation. + configurable. Aucun autre champ n'est utilisé pour la décision de corrélation. key: - src_ip - src_port time_window: - value: 1 + value: 10 unit: s description: > - Fenêtre de temps appliquée aux timestamps de A et B. Si B n’arrive pas dans - ce délai, A est émis comme orphelin. + Fenêtre de temps appliquée aux timestamps de A et B. Si B n'arrive pas dans + ce délai, A est émis comme orphelin. Augmentée à 10s pour le Keep-Alive. retention_limits: max_http_items: 10000 max_network_items: 20000 @@ -285,10 +287,10 @@ correlation: évincé et émis orphelin. Si max_network_items est atteint, le plus ancien B est supprimé silencieusement. 
ttl_management: - network_ttl_s: 30 + network_ttl_s: 120 description: > - TTL des logs réseau. Chaque fois qu’un B est corrélé à un A (Keep‑Alive), - son TTL est remis à cette valeur. + TTL des logs réseau. Chaque fois qu'un B est corrélé à un A (Keep-Alive), + son TTL est remis à cette valeur. Augmenté à 120s pour les sessions longues. timestamp_source: apache: field_timestamp network: reception_time diff --git a/config.example.yml b/config.example.yml index 4d1ff72..7dfb002 100644 --- a/config.example.yml +++ b/config.example.yml @@ -40,25 +40,27 @@ outputs: correlation: # Time window for correlation (A and B must be within this window) + # Increased to 10s to support HTTP Keep-Alive scenarios time_window: - value: 1 + value: 10 unit: s - + # Orphan policy: what to do when no match is found orphan_policy: apache_always_emit: true # Always emit A events, even without B match network_emit: false # Never emit B events alone - + # Matching mode: one_to_one or one_to_many (Keep-Alive) matching: mode: one_to_many - + # Buffer limits (max events in memory) buffers: max_http_items: 10000 max_network_items: 20000 - - # TTL for network events (source B) - ttl: - network_ttl_s: 30 + + # TTL for network events (source B) + # Increased to 120s to support long-lived HTTP Keep-Alive sessions + ttl: + network_ttl_s: 120 diff --git a/internal/adapters/inbound/unixsocket/source.go b/internal/adapters/inbound/unixsocket/source.go index 4062b09..522b40c 100644 --- a/internal/adapters/inbound/unixsocket/source.go +++ b/internal/adapters/inbound/unixsocket/source.go @@ -259,8 +259,23 @@ func parseJSONEvent(data []byte, sourceType string) (*domain.NormalizedEvent, er // Assume nanoseconds event.Timestamp = time.Unix(0, ts) case domain.SourceB: - // For network source, always use local reception time - event.Timestamp = time.Now() + // For network source, try to use event timestamp if available, + // fallback to reception time. 
This improves correlation accuracy + // when network logs include their own timestamp (e.g., from packet capture). + if ts, ok := getInt64(raw, "timestamp"); ok { + event.Timestamp = time.Unix(0, ts) + } else if timeStr, ok := getString(raw, "time"); ok { + // Try RFC3339 format + if t, err := time.Parse(time.RFC3339, timeStr); err == nil { + event.Timestamp = t + } else if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil { + event.Timestamp = t + } else { + event.Timestamp = time.Now() + } + } else { + event.Timestamp = time.Now() + } default: return nil, fmt.Errorf("unsupported source type: %s", event.Source) } diff --git a/internal/adapters/inbound/unixsocket/source_test.go b/internal/adapters/inbound/unixsocket/source_test.go index f83e745..7d1a119 100644 --- a/internal/adapters/inbound/unixsocket/source_test.go +++ b/internal/adapters/inbound/unixsocket/source_test.go @@ -62,9 +62,7 @@ func TestParseJSONEvent_Network(t *testing.T) { "tcp_meta_flags": "SYN" }`) - before := time.Now() event, err := parseJSONEvent(data, "B") - after := time.Now() if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -78,8 +76,10 @@ func TestParseJSONEvent_Network(t *testing.T) { if event.Source != domain.SourceB { t.Errorf("expected source B, got %s", event.Source) } - if event.Timestamp.Before(before.Add(-2*time.Second)) || event.Timestamp.After(after.Add(2*time.Second)) { - t.Errorf("expected network timestamp near now, got %v", event.Timestamp) + // Network source now uses payload timestamp if available + expectedTs := time.Unix(0, 1704110400000000000) + if !event.Timestamp.Equal(expectedTs) { + t.Errorf("expected network timestamp %v, got %v", expectedTs, event.Timestamp) } } @@ -114,11 +114,47 @@ func TestParseJSONEvent_SourceARequiresNumericTimestamp(t *testing.T) { } } -func TestParseJSONEvent_SourceBIgnoresPayloadTimestamp(t *testing.T) { +func TestParseJSONEvent_SourceBUsesPayloadTimestamp(t *testing.T) { + expectedTs := int64(1704110400000000000) data := 
[]byte(`{ "src_ip": "192.168.1.1", "src_port": 8080, - "timestamp": 1 + "timestamp": 1704110400000000000 + }`) + + event, err := parseJSONEvent(data, "B") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expectedTime := time.Unix(0, expectedTs) + if !event.Timestamp.Equal(expectedTime) { + t.Errorf("expected source B to use payload timestamp %v, got %v", expectedTime, event.Timestamp) + } +} + +func TestParseJSONEvent_SourceBUsesTimeField(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "time": "2024-01-01T12:00:00Z" + }`) + + event, err := parseJSONEvent(data, "B") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expectedTime := time.Unix(0, 1704110400000000000) + if !event.Timestamp.Equal(expectedTime) { + t.Errorf("expected source B to use time field %v, got %v", expectedTime, event.Timestamp) + } +} + +func TestParseJSONEvent_SourceBFallbackToNow(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080 }`) before := time.Now() diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 8e8f365..3fbf0dc 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -719,8 +719,8 @@ func TestCorrelationConfig_GetNetworkTTLS_Default(t *testing.T) { } result := cfg.GetNetworkTTLS() - if result != 30 { - t.Errorf("expected default 30, got %d", result) + if result != 120 { + t.Errorf("expected default 120, got %d", result) } } diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go index a5f0d16..67bd43c 100644 --- a/internal/domain/correlation_service.go +++ b/internal/domain/correlation_service.go @@ -14,9 +14,14 @@ const ( // DefaultMaxNetworkBufferSize is the default maximum number of network events (source B) DefaultMaxNetworkBufferSize = 20000 // DefaultTimeWindow is used when no valid time window is provided - DefaultTimeWindow = time.Second + // Increased to 10s to support HTTP 
Keep-Alive scenarios where multiple + // HTTP requests arrive within a short time window on the same connection. + DefaultTimeWindow = 10 * time.Second // DefaultNetworkTTLS is the default TTL for network events in seconds - DefaultNetworkTTLS = 30 + // Increased to 120s to support long-lived HTTP Keep-Alive sessions. + // The TTL is reset on each correlation (Keep-Alive mode), so the network + // event stays in buffer as long as the connection is active. + DefaultNetworkTTLS = 120 // MatchingModeOneToOne indicates single correlation (consume B after match) MatchingModeOneToOne = "one_to_one" // MatchingModeOneToMany indicates Keep-Alive mode (B can match multiple A) @@ -262,48 +267,54 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) { func (s *CorrelationService) cleanExpired() { now := s.timeProvider.Now() - + // Clean expired A events (based on time window) aCutoff := now.Add(-s.config.TimeWindow) - s.cleanBuffer(s.bufferA, s.pendingA, aCutoff, nil) - - // Clean expired B events (based on TTL) - bCutoff := now.Add(-time.Duration(s.config.NetworkTTLS) * time.Second) - s.cleanBuffer(s.bufferB, s.pendingB, bCutoff, s.networkTTLs) + s.cleanBuffer(s.bufferA, s.pendingA, aCutoff) + + // Clean expired B events - use TTL map only (not event timestamp) + // This is critical for Keep-Alive: TTL is reset on each correlation, + // so we must check the reset TTL, not the original event timestamp. + s.cleanNetworkBufferByTTL() } -// cleanBuffer removes expired events from a buffer. -func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time, networkTTLs map[*list.Element]time.Time) { +// cleanBuffer removes expired events from buffer A (based on event timestamp). 
+func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) { for elem := buffer.events.Front(); elem != nil; { next := elem.Next() event := elem.Value.(*NormalizedEvent) - // Check if event is expired - isExpired := event.Timestamp.Before(cutoff) - - // For B events, also check TTL - if !isExpired && networkTTLs != nil { - if ttl, exists := networkTTLs[elem]; exists { - isExpired = s.timeProvider.Now().After(ttl) - } - } - - if isExpired { + if event.Timestamp.Before(cutoff) { key := event.CorrelationKey() buffer.events.Remove(elem) pending[key] = removeElementFromSlice(pending[key], elem) if len(pending[key]) == 0 { delete(pending, key) } - // Remove from TTL map - if networkTTLs != nil { - delete(networkTTLs, elem) - } } elem = next } } +// cleanNetworkBufferByTTL removes expired B events based on their reset TTL. +// For Keep-Alive support, we check the TTL map (which is reset on each correlation) +// rather than the original event timestamp. 
+func (s *CorrelationService) cleanNetworkBufferByTTL() { + now := s.timeProvider.Now() + for elem, ttl := range s.networkTTLs { + if now.After(ttl) { + event := elem.Value.(*NormalizedEvent) + key := event.CorrelationKey() + s.bufferB.events.Remove(elem) + s.pendingB[key] = removeElementFromSlice(s.pendingB[key], elem) + if len(s.pendingB[key]) == 0 { + delete(s.pendingB, key) + } + delete(s.networkTTLs, elem) + } + } +} + func (s *CorrelationService) findAndPopFirstMatch( buffer *eventBuffer, pending map[string][]*list.Element, diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go index 029f851..b93bb69 100644 --- a/internal/domain/correlation_service_test.go +++ b/internal/domain/correlation_service_test.go @@ -1,6 +1,7 @@ package domain import ( + "fmt" "testing" "time" ) @@ -613,3 +614,152 @@ func TestCorrelationService_NetworkTTL_ResetOnMatch(t *testing.T) { t.Errorf("expected B expired after reset TTL, got %d", b) } } + +// TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp tests the critical bug fix: +// B events must NOT be evicted based on their original timestamp when TTL is reset. +// This is essential for HTTP Keep-Alive where multiple A events correlate with the same B. 
+func TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: 10, // 10 seconds TTL + } + + svc := NewCorrelationService(config, timeProvider) + + // T0: B event arrives (network log with old timestamp) + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, // t=0 + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + svc.ProcessEvent(networkEvent) + + // T0+1s: First A event - correlates with B, TTL reset to t=11s + timeProvider.now = now.Add(1 * time.Second) + apacheEvent1 := &NormalizedEvent{ + Source: SourceA, + Timestamp: timeProvider.now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + results := svc.ProcessEvent(apacheEvent1) + if len(results) != 1 || !results[0].Correlated { + t.Fatalf("expected 1 correlated result, got %d", len(results)) + } + + // T0+2s: Second A event - should also correlate (Keep-Alive), TTL reset to t=12s + timeProvider.now = now.Add(2 * time.Second) + apacheEvent2 := &NormalizedEvent{ + Source: SourceA, + Timestamp: timeProvider.now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + results = svc.ProcessEvent(apacheEvent2) + if len(results) != 1 || !results[0].Correlated { + t.Fatalf("expected 1 correlated result (Keep-Alive), got %d", len(results)) + } + + // T0+11s: B should still be alive (TTL was reset to t=12s) + // The bug would evict B here because B.timestamp (t=0) < cutoff (t=11s-10s=t=1s) + timeProvider.now = now.Add(11 * time.Second) + svc.cleanExpired() + _, b := svc.GetBufferSizes() + if b != 1 { + t.Errorf("BUG: B was evicted based on original timestamp! 
Expected B alive (TTL reset to t=12s), got buffer size %d", b) + } + + // T0+13s: Now B should be expired (TTL reset was at t=2s, expires at t=12s) + timeProvider.now = now.Add(13 * time.Second) + svc.cleanExpired() + _, b = svc.GetBufferSizes() + if b != 0 { + t.Errorf("expected B expired at t=13s (TTL was t=12s), got buffer size %d", b) + } +} + +// TestCorrelationService_KeepAlive_LongSession tests that B events are NOT evicted +// based on their original timestamp when TTL is reset on each correlation. +// This is the core bug fix for Keep-Alive support. +// +// Scenario: B arrives at t=0 with timestamp t=0. Multiple A events arrive +// with timestamps within the correlation window of B (t=0). +// The bug would evict B when its original timestamp (t=0) becomes "old" compared +// to "now", but the fix uses TTL map only, so B stays alive as long as TTL is reset. +func TestCorrelationService_KeepAlive_LongSession(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, // Window for correlation + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: 120, // 120 seconds TTL for long sessions + } + + svc := NewCorrelationService(config, timeProvider) + + // T0: Network event (TLS handshake) - timestamp = now + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123", "ja4": "def456"}, + } + svc.ProcessEvent(networkEvent) + + // Simulate 6 HTTP requests over 25 seconds (Keep-Alive) + // All A timestamps are within the 5s correlation window of B (t=0) + // A timestamps: t=0.5, 1, 1.5, 2, 2.5, 3 (all within 5s of B's t=0) + for i := 1; i <= 6; i++ { + // Advance "now" by 5s between each request + timeProvider.now 
= now.Add(time.Duration(i*5) * time.Second) + + // A timestamp stays within the correlation window of B (t=0) + // This simulates A events whose timestamps are close to B's timestamp + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(time.Duration(i) * 500 * time.Millisecond), // t=0.5, 1, 1.5, 2, 2.5, 3 + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET", "path": fmt.Sprintf("/api/%d", i)}, + } + results := svc.ProcessEvent(apacheEvent) + if len(results) != 1 || !results[0].Correlated { + t.Errorf("Request %d at t=%ds (A timestamp t=%v): expected correlation, got %d results", + i, i*5, now.Add(time.Duration(i)*500*time.Millisecond), len(results)) + } + } + + // At t=25s (the mock clock is set back from the loop's final t=30s), B should still be alive + // because TTL was reset at each correlation (last reset at t=30s, expires at t=150s) + // Note: with NetworkTTLS=120 the timestamp cutoff (t=25s-120s) precedes B.timestamp (t=0), so only the TTL map can expire B here + timeProvider.now = now.Add(25 * time.Second) + svc.cleanExpired() + _, b := svc.GetBufferSizes() + if b != 1 { + t.Errorf("BUG: B was evicted based on original timestamp at t=25s! 
Expected B alive (TTL reset), got buffer size %d", b) + } + + // T0+155s: B should expire (last TTL reset was at t=30s + 120s TTL = t=150s) + timeProvider.now = now.Add(155 * time.Second) + svc.cleanExpired() + _, b = svc.GetBufferSizes() + if b != 0 { + t.Errorf("expected B expired at t=155s (TTL was reset at t=30s + 120s = t=150s), got buffer size %d", b) + } +} diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index b40dfe4..cb08eef 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -2,7 +2,7 @@ # Compatible with CentOS 7, Rocky Linux 8, 9, 10 # Define version before Version: field for RPM macro support -%global spec_version 1.1.6 +%global spec_version 1.1.7 Name: logcorrelator Version: %{spec_version} @@ -121,6 +121,13 @@ fi /etc/logrotate.d/logcorrelator %changelog +* Tue Mar 03 2026 logcorrelator - 1.1.7-1 +- Fix: Critical Keep-Alive bug - network events evicted based on original timestamp instead of reset TTL +- Fix: Correlation time window increased from 1s to 10s for HTTP Keep-Alive support +- Fix: Network source now uses payload timestamp if available (fallback to reception time) +- Change: Default network TTL increased from 30s to 120s for long Keep-Alive sessions +- Test: Added comprehensive Keep-Alive tests (TTL reset, long session scenarios) + * Tue Mar 03 2026 logcorrelator - 1.1.6-1 - Docs: Update ClickHouse schema documentation (http_logs_raw + http_logs tables) - Fix: ClickHouse insertion uses single raw_json column (FORMAT JSONEachRow)