fix: critical Keep-Alive correlation bug - network events evicted prematurely
- Fix cleanExpired() to use TTL map instead of event timestamp for B events - Increase default correlation time window from 1s to 10s - Increase default network TTL from 30s to 120s for long sessions - Use payload timestamp for network events when available (fallback to now) - Add comprehensive Keep-Alive tests (TTL reset, long session scenarios) - Bump version to 1.1.7 Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
2
Makefile
2
Makefile
@ -15,7 +15,7 @@ BINARY_NAME=logcorrelator
|
||||
DIST_DIR=dist
|
||||
|
||||
# Package version
|
||||
PKG_VERSION ?= 1.1.6
|
||||
PKG_VERSION ?= 1.1.7
|
||||
|
||||
## build: Build the logcorrelator binary locally
|
||||
build:
|
||||
|
||||
@ -175,8 +175,9 @@ config:
|
||||
correlation:
|
||||
# Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend
|
||||
# au plus cette durée (sauf éviction du cache HTTP).
|
||||
# Augmentée à 10s pour supporter le Keep-Alive HTTP.
|
||||
time_window:
|
||||
value: 1
|
||||
value: 10
|
||||
unit: s
|
||||
|
||||
orphan_policy:
|
||||
@ -192,9 +193,10 @@ config:
|
||||
max_network_items: 20000
|
||||
|
||||
ttl:
|
||||
# Durée de vie standard d’un log réseau (B) en mémoire. Chaque corrélation
|
||||
# Durée de vie standard d'un log réseau (B) en mémoire. Chaque corrélation
|
||||
# réussie avec un A réinitialise ce TTL.
|
||||
network_ttl_s: 30
|
||||
# Augmenté à 120s pour supporter les sessions HTTP Keep-Alive longues.
|
||||
network_ttl_s: 120
|
||||
|
||||
inputs:
|
||||
description: >
|
||||
@ -267,16 +269,16 @@ outputs:
|
||||
correlation:
|
||||
description: >
|
||||
Corrélation stricte basée sur src_ip + src_port et une fenêtre temporelle
|
||||
configurable. Aucun autre champ n’est utilisé pour la décision de corrélation.
|
||||
configurable. Aucun autre champ n'est utilisé pour la décision de corrélation.
|
||||
key:
|
||||
- src_ip
|
||||
- src_port
|
||||
time_window:
|
||||
value: 1
|
||||
value: 10
|
||||
unit: s
|
||||
description: >
|
||||
Fenêtre de temps appliquée aux timestamps de A et B. Si B n’arrive pas dans
|
||||
ce délai, A est émis comme orphelin.
|
||||
Fenêtre de temps appliquée aux timestamps de A et B. Si B n'arrive pas dans
|
||||
ce délai, A est émis comme orphelin. Augmentée à 10s pour le Keep-Alive.
|
||||
retention_limits:
|
||||
max_http_items: 10000
|
||||
max_network_items: 20000
|
||||
@ -285,10 +287,10 @@ correlation:
|
||||
évincé et émis orphelin. Si max_network_items est atteint, le plus ancien B
|
||||
est supprimé silencieusement.
|
||||
ttl_management:
|
||||
network_ttl_s: 30
|
||||
network_ttl_s: 120
|
||||
description: >
|
||||
TTL des logs réseau. Chaque fois qu’un B est corrélé à un A (Keep‑Alive),
|
||||
son TTL est remis à cette valeur.
|
||||
TTL des logs réseau. Chaque fois qu'un B est corrélé à un A (Keep-Alive),
|
||||
son TTL est remis à cette valeur. Augmenté à 120s pour les sessions longues.
|
||||
timestamp_source:
|
||||
apache: field_timestamp
|
||||
network: reception_time
|
||||
|
||||
@ -40,8 +40,9 @@ outputs:
|
||||
|
||||
correlation:
|
||||
# Time window for correlation (A and B must be within this window)
|
||||
# Increased to 10s to support HTTP Keep-Alive scenarios
|
||||
time_window:
|
||||
value: 1
|
||||
value: 10
|
||||
unit: s
|
||||
|
||||
# Orphan policy: what to do when no match is found
|
||||
@ -59,6 +60,7 @@ correlation:
|
||||
max_network_items: 20000
|
||||
|
||||
# TTL for network events (source B)
|
||||
# Increased to 120s to support long-lived HTTP Keep-Alive sessions
|
||||
ttl:
|
||||
network_ttl_s: 30
|
||||
network_ttl_s: 120
|
||||
|
||||
|
||||
@ -259,8 +259,23 @@ func parseJSONEvent(data []byte, sourceType string) (*domain.NormalizedEvent, er
|
||||
// Assume nanoseconds
|
||||
event.Timestamp = time.Unix(0, ts)
|
||||
case domain.SourceB:
|
||||
// For network source, always use local reception time
|
||||
// For network source, try to use event timestamp if available,
|
||||
// fallback to reception time. This improves correlation accuracy
|
||||
// when network logs include their own timestamp (e.g., from packet capture).
|
||||
if ts, ok := getInt64(raw, "timestamp"); ok {
|
||||
event.Timestamp = time.Unix(0, ts)
|
||||
} else if timeStr, ok := getString(raw, "time"); ok {
|
||||
// Try RFC3339 format
|
||||
if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
|
||||
event.Timestamp = t
|
||||
} else if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil {
|
||||
event.Timestamp = t
|
||||
} else {
|
||||
event.Timestamp = time.Now()
|
||||
}
|
||||
} else {
|
||||
event.Timestamp = time.Now()
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported source type: %s", event.Source)
|
||||
}
|
||||
|
||||
@ -62,9 +62,7 @@ func TestParseJSONEvent_Network(t *testing.T) {
|
||||
"tcp_meta_flags": "SYN"
|
||||
}`)
|
||||
|
||||
before := time.Now()
|
||||
event, err := parseJSONEvent(data, "B")
|
||||
after := time.Now()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@ -78,8 +76,10 @@ func TestParseJSONEvent_Network(t *testing.T) {
|
||||
if event.Source != domain.SourceB {
|
||||
t.Errorf("expected source B, got %s", event.Source)
|
||||
}
|
||||
if event.Timestamp.Before(before.Add(-2*time.Second)) || event.Timestamp.After(after.Add(2*time.Second)) {
|
||||
t.Errorf("expected network timestamp near now, got %v", event.Timestamp)
|
||||
// Network source now uses payload timestamp if available
|
||||
expectedTs := time.Unix(0, 1704110400000000000)
|
||||
if !event.Timestamp.Equal(expectedTs) {
|
||||
t.Errorf("expected network timestamp %v, got %v", expectedTs, event.Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,11 +114,47 @@ func TestParseJSONEvent_SourceARequiresNumericTimestamp(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseJSONEvent_SourceBIgnoresPayloadTimestamp(t *testing.T) {
|
||||
func TestParseJSONEvent_SourceBUsesPayloadTimestamp(t *testing.T) {
|
||||
expectedTs := int64(1704110400000000000)
|
||||
data := []byte(`{
|
||||
"src_ip": "192.168.1.1",
|
||||
"src_port": 8080,
|
||||
"timestamp": 1
|
||||
"timestamp": 1704110400000000000
|
||||
}`)
|
||||
|
||||
event, err := parseJSONEvent(data, "B")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
expectedTime := time.Unix(0, expectedTs)
|
||||
if !event.Timestamp.Equal(expectedTime) {
|
||||
t.Errorf("expected source B to use payload timestamp %v, got %v", expectedTime, event.Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseJSONEvent_SourceBUsesTimeField(t *testing.T) {
|
||||
data := []byte(`{
|
||||
"src_ip": "192.168.1.1",
|
||||
"src_port": 8080,
|
||||
"time": "2024-01-01T12:00:00Z"
|
||||
}`)
|
||||
|
||||
event, err := parseJSONEvent(data, "B")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
expectedTime := time.Unix(0, 1704110400000000000)
|
||||
if !event.Timestamp.Equal(expectedTime) {
|
||||
t.Errorf("expected source B to use time field %v, got %v", expectedTime, event.Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseJSONEvent_SourceBFallbackToNow(t *testing.T) {
|
||||
data := []byte(`{
|
||||
"src_ip": "192.168.1.1",
|
||||
"src_port": 8080
|
||||
}`)
|
||||
|
||||
before := time.Now()
|
||||
|
||||
@ -719,8 +719,8 @@ func TestCorrelationConfig_GetNetworkTTLS_Default(t *testing.T) {
|
||||
}
|
||||
|
||||
result := cfg.GetNetworkTTLS()
|
||||
if result != 30 {
|
||||
t.Errorf("expected default 30, got %d", result)
|
||||
if result != 120 {
|
||||
t.Errorf("expected default 120, got %d", result)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -14,9 +14,14 @@ const (
|
||||
// DefaultMaxNetworkBufferSize is the default maximum number of network events (source B)
|
||||
DefaultMaxNetworkBufferSize = 20000
|
||||
// DefaultTimeWindow is used when no valid time window is provided
|
||||
DefaultTimeWindow = time.Second
|
||||
// Increased to 10s to support HTTP Keep-Alive scenarios where multiple
|
||||
// HTTP requests arrive within a short time window on the same connection.
|
||||
DefaultTimeWindow = 10 * time.Second
|
||||
// DefaultNetworkTTLS is the default TTL for network events in seconds
|
||||
DefaultNetworkTTLS = 30
|
||||
// Increased to 120s to support long-lived HTTP Keep-Alive sessions.
|
||||
// The TTL is reset on each correlation (Keep-Alive mode), so the network
|
||||
// event stays in buffer as long as the connection is active.
|
||||
DefaultNetworkTTLS = 120
|
||||
// MatchingModeOneToOne indicates single correlation (consume B after match)
|
||||
MatchingModeOneToOne = "one_to_one"
|
||||
// MatchingModeOneToMany indicates Keep-Alive mode (B can match multiple A)
|
||||
@ -265,45 +270,51 @@ func (s *CorrelationService) cleanExpired() {
|
||||
|
||||
// Clean expired A events (based on time window)
|
||||
aCutoff := now.Add(-s.config.TimeWindow)
|
||||
s.cleanBuffer(s.bufferA, s.pendingA, aCutoff, nil)
|
||||
s.cleanBuffer(s.bufferA, s.pendingA, aCutoff)
|
||||
|
||||
// Clean expired B events (based on TTL)
|
||||
bCutoff := now.Add(-time.Duration(s.config.NetworkTTLS) * time.Second)
|
||||
s.cleanBuffer(s.bufferB, s.pendingB, bCutoff, s.networkTTLs)
|
||||
// Clean expired B events - use TTL map only (not event timestamp)
|
||||
// This is critical for Keep-Alive: TTL is reset on each correlation,
|
||||
// so we must check the reset TTL, not the original event timestamp.
|
||||
s.cleanNetworkBufferByTTL()
|
||||
}
|
||||
|
||||
// cleanBuffer removes expired events from a buffer.
|
||||
func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time, networkTTLs map[*list.Element]time.Time) {
|
||||
// cleanBuffer removes expired events from buffer A (based on event timestamp).
|
||||
func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) {
|
||||
for elem := buffer.events.Front(); elem != nil; {
|
||||
next := elem.Next()
|
||||
event := elem.Value.(*NormalizedEvent)
|
||||
|
||||
// Check if event is expired
|
||||
isExpired := event.Timestamp.Before(cutoff)
|
||||
|
||||
// For B events, also check TTL
|
||||
if !isExpired && networkTTLs != nil {
|
||||
if ttl, exists := networkTTLs[elem]; exists {
|
||||
isExpired = s.timeProvider.Now().After(ttl)
|
||||
}
|
||||
}
|
||||
|
||||
if isExpired {
|
||||
if event.Timestamp.Before(cutoff) {
|
||||
key := event.CorrelationKey()
|
||||
buffer.events.Remove(elem)
|
||||
pending[key] = removeElementFromSlice(pending[key], elem)
|
||||
if len(pending[key]) == 0 {
|
||||
delete(pending, key)
|
||||
}
|
||||
// Remove from TTL map
|
||||
if networkTTLs != nil {
|
||||
delete(networkTTLs, elem)
|
||||
}
|
||||
}
|
||||
elem = next
|
||||
}
|
||||
}
|
||||
|
||||
// cleanNetworkBufferByTTL removes expired B events based on their reset TTL.
|
||||
// For Keep-Alive support, we check the TTL map (which is reset on each correlation)
|
||||
// rather than the original event timestamp.
|
||||
func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
now := s.timeProvider.Now()
|
||||
for elem, ttl := range s.networkTTLs {
|
||||
if now.After(ttl) {
|
||||
event := elem.Value.(*NormalizedEvent)
|
||||
key := event.CorrelationKey()
|
||||
s.bufferB.events.Remove(elem)
|
||||
s.pendingB[key] = removeElementFromSlice(s.pendingB[key], elem)
|
||||
if len(s.pendingB[key]) == 0 {
|
||||
delete(s.pendingB, key)
|
||||
}
|
||||
delete(s.networkTTLs, elem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *CorrelationService) findAndPopFirstMatch(
|
||||
buffer *eventBuffer,
|
||||
pending map[string][]*list.Element,
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@ -613,3 +614,152 @@ func TestCorrelationService_NetworkTTL_ResetOnMatch(t *testing.T) {
|
||||
t.Errorf("expected B expired after reset TTL, got %d", b)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp tests the critical bug fix:
|
||||
// B events must NOT be evicted based on their original timestamp when TTL is reset.
|
||||
// This is essential for HTTP Keep-Alive where multiple A events correlate with the same B.
|
||||
func TestCorrelationService_KeepAlive_TTLNotBasedOnEventTimestamp(t *testing.T) {
|
||||
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||
timeProvider := &mockTimeProvider{now: now}
|
||||
|
||||
config := CorrelationConfig{
|
||||
TimeWindow: 5 * time.Second,
|
||||
ApacheAlwaysEmit: false,
|
||||
NetworkEmit: false,
|
||||
MatchingMode: MatchingModeOneToMany,
|
||||
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
|
||||
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
|
||||
NetworkTTLS: 10, // 10 seconds TTL
|
||||
}
|
||||
|
||||
svc := NewCorrelationService(config, timeProvider)
|
||||
|
||||
// T0: B event arrives (network log with old timestamp)
|
||||
networkEvent := &NormalizedEvent{
|
||||
Source: SourceB,
|
||||
Timestamp: now, // t=0
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Raw: map[string]any{"ja3": "abc123"},
|
||||
}
|
||||
svc.ProcessEvent(networkEvent)
|
||||
|
||||
// T0+1s: First A event - correlates with B, TTL reset to t=11s
|
||||
timeProvider.now = now.Add(1 * time.Second)
|
||||
apacheEvent1 := &NormalizedEvent{
|
||||
Source: SourceA,
|
||||
Timestamp: timeProvider.now,
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
}
|
||||
results := svc.ProcessEvent(apacheEvent1)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
t.Fatalf("expected 1 correlated result, got %d", len(results))
|
||||
}
|
||||
|
||||
// T0+2s: Second A event - should also correlate (Keep-Alive), TTL reset to t=12s
|
||||
timeProvider.now = now.Add(2 * time.Second)
|
||||
apacheEvent2 := &NormalizedEvent{
|
||||
Source: SourceA,
|
||||
Timestamp: timeProvider.now,
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
}
|
||||
results = svc.ProcessEvent(apacheEvent2)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
t.Fatalf("expected 1 correlated result (Keep-Alive), got %d", len(results))
|
||||
}
|
||||
|
||||
// T0+11s: B should still be alive (TTL was reset to t=12s)
|
||||
// The bug would evict B here because B.timestamp (t=0) < cutoff (t=11s-10s=t=1s)
|
||||
timeProvider.now = now.Add(11 * time.Second)
|
||||
svc.cleanExpired()
|
||||
_, b := svc.GetBufferSizes()
|
||||
if b != 1 {
|
||||
t.Errorf("BUG: B was evicted based on original timestamp! Expected B alive (TTL reset to t=12s), got buffer size %d", b)
|
||||
}
|
||||
|
||||
// T0+13s: Now B should be expired (TTL reset was at t=2s, expires at t=12s)
|
||||
timeProvider.now = now.Add(13 * time.Second)
|
||||
svc.cleanExpired()
|
||||
_, b = svc.GetBufferSizes()
|
||||
if b != 0 {
|
||||
t.Errorf("expected B expired at t=13s (TTL was t=12s), got buffer size %d", b)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCorrelationService_KeepAlive_LongSession tests that B events are NOT evicted
|
||||
// based on their original timestamp when TTL is reset on each correlation.
|
||||
// This is the core bug fix for Keep-Alive support.
|
||||
//
|
||||
// Scenario: B arrives at t=0 with timestamp t=0. Multiple A events arrive
|
||||
// with timestamps within the correlation window of B (t=0).
|
||||
// The bug would evict B when its original timestamp (t=0) becomes "old" compared
|
||||
// to "now", but the fix uses TTL map only, so B stays alive as long as TTL is reset.
|
||||
func TestCorrelationService_KeepAlive_LongSession(t *testing.T) {
|
||||
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||
timeProvider := &mockTimeProvider{now: now}
|
||||
|
||||
config := CorrelationConfig{
|
||||
TimeWindow: 5 * time.Second, // Window for correlation
|
||||
ApacheAlwaysEmit: false,
|
||||
NetworkEmit: false,
|
||||
MatchingMode: MatchingModeOneToMany,
|
||||
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
|
||||
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
|
||||
NetworkTTLS: 120, // 120 seconds TTL for long sessions
|
||||
}
|
||||
|
||||
svc := NewCorrelationService(config, timeProvider)
|
||||
|
||||
// T0: Network event (TLS handshake) - timestamp = now
|
||||
networkEvent := &NormalizedEvent{
|
||||
Source: SourceB,
|
||||
Timestamp: now,
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Raw: map[string]any{"ja3": "abc123", "ja4": "def456"},
|
||||
}
|
||||
svc.ProcessEvent(networkEvent)
|
||||
|
||||
// Simulate 6 HTTP requests over 25 seconds (Keep-Alive)
|
||||
// All A timestamps are within the 5s correlation window of B (t=0)
|
||||
// A timestamps: t=0.5, 1, 1.5, 2, 2.5, 3 (all within 5s of B's t=0)
|
||||
for i := 1; i <= 6; i++ {
|
||||
// Advance "now" by 5s between each request
|
||||
timeProvider.now = now.Add(time.Duration(i*5) * time.Second)
|
||||
|
||||
// A timestamp stays within the correlation window of B (t=0)
|
||||
// This simulates A events whose timestamps are close to B's timestamp
|
||||
apacheEvent := &NormalizedEvent{
|
||||
Source: SourceA,
|
||||
Timestamp: now.Add(time.Duration(i) * 500 * time.Millisecond), // t=0.5, 1, 1.5, 2, 2.5, 3
|
||||
SrcIP: "192.168.1.1",
|
||||
SrcPort: 8080,
|
||||
Raw: map[string]any{"method": "GET", "path": fmt.Sprintf("/api/%d", i)},
|
||||
}
|
||||
results := svc.ProcessEvent(apacheEvent)
|
||||
if len(results) != 1 || !results[0].Correlated {
|
||||
t.Errorf("Request %d at t=%ds (A timestamp t=%v): expected correlation, got %d results",
|
||||
i, i*5, now.Add(time.Duration(i)*500*time.Millisecond), len(results))
|
||||
}
|
||||
}
|
||||
|
||||
// After 25 seconds of real time, B should still be alive
|
||||
// because TTL was reset at each correlation (last reset at t=30s, expires at t=150s)
|
||||
// The bug would have evicted B at t=30s because B.timestamp (t=0) < cutoff (t=30s-120s)
|
||||
timeProvider.now = now.Add(25 * time.Second)
|
||||
svc.cleanExpired()
|
||||
_, b := svc.GetBufferSizes()
|
||||
if b != 1 {
|
||||
t.Errorf("BUG: B was evicted based on original timestamp at t=25s! Expected B alive (TTL reset), got buffer size %d", b)
|
||||
}
|
||||
|
||||
// T0+155s: B should expire (last TTL reset was at t=30s + 120s TTL = t=150s)
|
||||
timeProvider.now = now.Add(155 * time.Second)
|
||||
svc.cleanExpired()
|
||||
_, b = svc.GetBufferSizes()
|
||||
if b != 0 {
|
||||
t.Errorf("expected B expired at t=155s (TTL was reset at t=30s + 120s = t=150s), got buffer size %d", b)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
# Compatible with CentOS 7, Rocky Linux 8, 9, 10
|
||||
|
||||
# Define version before Version: field for RPM macro support
|
||||
%global spec_version 1.1.6
|
||||
%global spec_version 1.1.7
|
||||
|
||||
Name: logcorrelator
|
||||
Version: %{spec_version}
|
||||
@ -121,6 +121,13 @@ fi
|
||||
/etc/logrotate.d/logcorrelator
|
||||
|
||||
%changelog
|
||||
* Tue Mar 03 2026 logcorrelator <dev@example.com> - 1.1.7-1
|
||||
- Fix: Critical Keep-Alive bug - network events evicted based on original timestamp instead of reset TTL
|
||||
- Fix: Correlation time window increased from 1s to 10s for HTTP Keep-Alive support
|
||||
- Fix: Network source now uses payload timestamp if available (fallback to reception time)
|
||||
- Change: Default network TTL increased from 30s to 120s for long Keep-Alive sessions
|
||||
- Test: Added comprehensive Keep-Alive tests (TTL reset, long session scenarios)
|
||||
|
||||
* Tue Mar 03 2026 logcorrelator <dev@example.com> - 1.1.6-1
|
||||
- Docs: Update ClickHouse schema documentation (http_logs_raw + http_logs tables)
|
||||
- Fix: ClickHouse insertion uses single raw_json column (FORMAT JSONEachRow)
|
||||
|
||||
Reference in New Issue
Block a user