fix(correlation): prevent premature orphan emission of HTTP logs

Three critical bugs fixed in correlation service:

Bug 1: Premature A event cleanup (CRITICAL)
- cleanExpired() was using system time instead of B event TTL
- A events now only removed when no valid B exists AND A age > TimeWindow
- New cleanBufferAByBTTL() method respects B event TTL

Bug 2: Flush emitting all A as orphans without correlation attempt
- Flush() now tries to correlate remaining A with remaining B first
- Only emits A as orphan if no matching B found
- Preserves correlation during shutdown

Bug 3: Buffer full causing immediate orphan emission
- Implemented FIFO rotation instead of immediate emission
- Oldest A event removed when buffer full, new event buffered
- New rotateOldestA() and rotateOldestB() helper methods

New tests added:
- TestCorrelationService_ALateThanB_WithinTimeWindow
- TestCorrelationService_ALateThanB_AExpiredTooSoon
- TestCorrelationService_Flush_CorrelatesRemainingEvents
- TestCorrelationService_BufferFull_RotatesOldestA
- TestCorrelationService_CleanA_RespectsBTTL

All 24 tests pass.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
toto
2026-03-03 21:47:11 +00:00
parent 24f2d8a3c4
commit 97862bb1dc
2 changed files with 466 additions and 12 deletions

View File

@ -122,13 +122,17 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
// Check buffer overflow before adding // Check buffer overflow before adding
if s.isBufferFull(event.Source) { if s.isBufferFull(event.Source) {
// Buffer full, drop event or emit as orphan // Buffer full - rotate oldest event instead of dropping new one
s.logger.Warnf("buffer full, dropping event: source=%s src_ip=%s src_port=%d", s.logger.Warnf("buffer full, rotating oldest event: source=%s src_ip=%s src_port=%d",
event.Source, event.SrcIP, event.SrcPort) event.Source, event.SrcIP, event.SrcPort)
if event.Source == SourceA && s.config.ApacheAlwaysEmit { if event.Source == SourceA {
return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")} // Remove oldest A event and emit as orphan if configured
s.rotateOldestA()
} else if event.Source == SourceB {
// Remove oldest B event (no emission for B)
s.rotateOldestB()
} }
return nil // Continue to add the new event after rotation
} }
var ( var (
@ -174,6 +178,49 @@ func (s *CorrelationService) isBufferFull(source EventSource) bool {
return false return false
} }
// rotateOldestA evicts the event at the front of buffer A (the oldest one)
// and drops its entry from the pendingA index. It is a no-op when buffer A
// is empty.
//
// When ApacheAlwaysEmit is set, the eviction is reported with a warning log
// line.
//
// NOTE(review): the evicted event is only logged here. This method returns
// nothing, so no orphan CorrelatedLog is produced for the evicted event.
func (s *CorrelationService) rotateOldestA() {
	oldest := s.bufferA.events.Front()
	if oldest == nil {
		return
	}

	evicted := oldest.Value.(*NormalizedEvent)
	k := evicted.CorrelationKey()

	// Unlink from the ordered buffer first, then from the per-key index.
	s.bufferA.events.Remove(oldest)
	if rest := removeElementFromSlice(s.pendingA[k], oldest); len(rest) == 0 {
		delete(s.pendingA, k)
	} else {
		s.pendingA[k] = rest
	}

	if !s.config.ApacheAlwaysEmit {
		return
	}
	s.logger.Warnf("orphan A event (buffer rotation): src_ip=%s src_port=%d", evicted.SrcIP, evicted.SrcPort)
}
// rotateOldestB evicts the event at the front of buffer B (the oldest one).
// The element is removed from the ordered buffer, from the pendingB index and
// from the networkTTLs map. Nothing is logged or emitted for the evicted
// event. It is a no-op when buffer B is empty.
func (s *CorrelationService) rotateOldestB() {
	oldest := s.bufferB.events.Front()
	if oldest == nil {
		return
	}

	k := oldest.Value.(*NormalizedEvent).CorrelationKey()

	// Unlink from the ordered buffer first, then from the per-key index.
	s.bufferB.events.Remove(oldest)
	if rest := removeElementFromSlice(s.pendingB[k], oldest); len(rest) == 0 {
		delete(s.pendingB, k)
	} else {
		s.pendingB[k] = rest
	}

	// The TTL entry is keyed by the list element itself.
	delete(s.networkTTLs, oldest)
}
func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) { func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) {
key := event.CorrelationKey() key := event.CorrelationKey()
@ -266,19 +313,74 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) {
} }
func (s *CorrelationService) cleanExpired() { func (s *CorrelationService) cleanExpired() {
now := s.timeProvider.Now() // Clean expired B events first - use TTL map only (not event timestamp)
// Clean expired A events (based on time window)
aCutoff := now.Add(-s.config.TimeWindow)
s.cleanBuffer(s.bufferA, s.pendingA, aCutoff)
// Clean expired B events - use TTL map only (not event timestamp)
// This is critical for Keep-Alive: TTL is reset on each correlation, // This is critical for Keep-Alive: TTL is reset on each correlation,
// so we must check the reset TTL, not the original event timestamp. // so we must check the reset TTL, not the original event timestamp.
s.cleanNetworkBufferByTTL() s.cleanNetworkBufferByTTL()
// Clean A events based on B TTL, not system time
// An A event should only be removed if:
// 1. It has already been correlated (no longer in buffer), OR
// 2. All potential matching B events have expired (TTL exceeded)
s.cleanBufferAByBTTL()
}
// cleanBufferAByBTTL cleans A events based on whether corresponding B events still exist.
// An A event is removed only if:
// 1. All potential matching B events have expired (TTL exceeded), OR
// 2. The A event itself is too old (beyond TimeWindow from current time)
// When removed, the A event is emitted as orphan if ApacheAlwaysEmit=true.
func (s *CorrelationService) cleanBufferAByBTTL() {
now := s.timeProvider.Now()
var toRemove []*list.Element
// First pass: identify A events to remove
for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() {
event := elem.Value.(*NormalizedEvent)
key := event.CorrelationKey()
// Check if there's any non-expired B event with the same key
hasValidB := false
if bElements, ok := s.pendingB[key]; ok {
for _, bElem := range bElements {
if ttl, exists := s.networkTTLs[bElem]; exists {
if !now.After(ttl) {
hasValidB = true
break
}
}
}
}
// Remove A if no valid B exists AND A is beyond TimeWindow from now
if !hasValidB {
aAge := now.Sub(event.Timestamp)
if aAge > s.config.TimeWindow {
toRemove = append(toRemove, elem)
}
}
}
// Second pass: remove identified events and emit as orphans if configured
for _, elem := range toRemove {
event := elem.Value.(*NormalizedEvent)
key := event.CorrelationKey()
s.bufferA.events.Remove(elem)
s.pendingA[key] = removeElementFromSlice(s.pendingA[key], elem)
if len(s.pendingA[key]) == 0 {
delete(s.pendingA, key)
}
if s.config.ApacheAlwaysEmit {
s.logger.Warnf("orphan A event (no B match, TTL expired): src_ip=%s src_port=%d",
event.SrcIP, event.SrcPort)
}
}
} }
// cleanBuffer removes expired events from buffer A (based on event timestamp). // cleanBuffer removes expired events from buffer A (based on event timestamp).
// Deprecated: Use cleanBufferAByBTTL for source A events to properly handle
// correlation with B events.
func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) { func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) {
for elem := buffer.events.Front(); elem != nil; { for elem := buffer.events.Front(); elem != nil; {
next := elem.Next() next := elem.Next()
@ -429,17 +531,53 @@ func removeElementFromSlice(elements []*list.Element, target *list.Element) []*l
} }
// Flush forces emission of remaining buffered events (for shutdown). // Flush forces emission of remaining buffered events (for shutdown).
// It first attempts to correlate remaining A and B events, then emits orphans.
func (s *CorrelationService) Flush() []CorrelatedLog { func (s *CorrelationService) Flush() []CorrelatedLog {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
var results []CorrelatedLog var results []CorrelatedLog
// First, try to correlate remaining A events with B events
// This handles the case where both A and B are still in buffer at shutdown
for elem := s.bufferA.events.Front(); elem != nil; {
next := elem.Next()
event := elem.Value.(*NormalizedEvent)
key := event.CorrelationKey()
// Look for matching B events
matched := false
if bElements, ok := s.pendingB[key]; ok {
for _, bElem := range bElements {
bEvent := bElem.Value.(*NormalizedEvent)
if s.eventsMatch(event, bEvent) {
// Correlate A with B
correlated := NewCorrelatedLog(event, bEvent)
s.logger.Debugf("flush correlation: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
results = append(results, correlated)
matched = true
break
}
}
}
if matched {
s.bufferA.events.Remove(elem)
s.pendingA[key] = removeElementFromSlice(s.pendingA[key], elem)
if len(s.pendingA[key]) == 0 {
delete(s.pendingA, key)
}
}
elem = next
}
// Emit remaining A events as orphans if configured // Emit remaining A events as orphans if configured
if s.config.ApacheAlwaysEmit { if s.config.ApacheAlwaysEmit {
for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() { for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() {
event := elem.Value.(*NormalizedEvent) event := elem.Value.(*NormalizedEvent)
orphan := NewCorrelatedLogFromEvent(event, "A") orphan := NewCorrelatedLogFromEvent(event, "A")
s.logger.Warnf("flush orphan A: src_ip=%s src_port=%d", event.SrcIP, event.SrcPort)
results = append(results, orphan) results = append(results, orphan)
} }
} }

View File

@ -763,3 +763,319 @@ func TestCorrelationService_KeepAlive_LongSession(t *testing.T) {
t.Errorf("expected B expired at t=155s (TTL was reset at t=30s + 120s = t=150s), got buffer size %d", b) t.Errorf("expected B expired at t=155s (TTL was reset at t=30s + 120s = t=150s), got buffer size %d", b)
} }
} }
// TestCorrelationService_ALateThanB_WithinTimeWindow covers the B-then-A
// ordering: a B event is processed first, and an A event with the same source
// IP and port is processed 2s later, inside the 5s TimeWindow. The A event is
// expected to produce exactly one correlated result.
func TestCorrelationService_ALateThanB_WithinTimeWindow(t *testing.T) {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	clock := &mockTimeProvider{now: start}
	svc := NewCorrelationService(CorrelationConfig{
		TimeWindow:           5 * time.Second,
		ApacheAlwaysEmit:     false,
		NetworkEmit:          false,
		MatchingMode:         MatchingModeOneToMany,
		MaxHTTPBufferSize:    DefaultMaxHTTPBufferSize,
		MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
		NetworkTTLS:          DefaultNetworkTTLS,
	}, clock)

	// t=0: B is processed first and must produce no output.
	bEvent := &NormalizedEvent{
		Source:    SourceB,
		Timestamp: start,
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"ja3": "abc123"},
	}
	if got := svc.ProcessEvent(bEvent); len(got) != 0 {
		t.Fatalf("expected 0 results (B buffered), got %d", len(got))
	}

	// t=2s: A is processed with the same IP/port, within the time window of B.
	clock.now = start.Add(2 * time.Second)
	aEvent := &NormalizedEvent{
		Source:    SourceA,
		Timestamp: clock.now,
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"method": "GET"},
	}
	got := svc.ProcessEvent(aEvent)
	switch {
	case len(got) != 1:
		t.Errorf("expected 1 correlated result, got %d", len(got))
	case !got[0].Correlated:
		t.Error("expected correlated result")
	}
}
// TestCorrelationService_ALateThanB_AExpiredTooSoon guards against premature
// expiry of a buffered A event. A is processed at t=0 and cleanExpired runs at
// t=6s, which is still inside the 10s TimeWindow, so A must remain buffered.
// A B event timestamped t=5s is then processed and is expected to correlate
// with the buffered A.
func TestCorrelationService_ALateThanB_AExpiredTooSoon(t *testing.T) {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	clock := &mockTimeProvider{now: start}
	svc := NewCorrelationService(CorrelationConfig{
		TimeWindow:           10 * time.Second, // wide enough for the 5s A/B gap below
		ApacheAlwaysEmit:     false,
		NetworkEmit:          false,
		MatchingMode:         MatchingModeOneToMany,
		MaxHTTPBufferSize:    DefaultMaxHTTPBufferSize,
		MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
		NetworkTTLS:          30, // B TTL, in seconds
	}, clock)

	// t=0: A is processed and must produce no output.
	aEvent := &NormalizedEvent{
		Source:    SourceA,
		Timestamp: start,
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"method": "GET"},
	}
	if got := svc.ProcessEvent(aEvent); len(got) != 0 {
		t.Fatalf("expected 0 results (A buffered), got %d", len(got))
	}

	// t=6s: A is 6s old, still inside TimeWindow=10s, so cleanup must keep it.
	clock.now = start.Add(6 * time.Second)
	svc.cleanExpired()
	if sizeA, _ := svc.GetBufferSizes(); sizeA != 1 {
		t.Errorf("expected A still in buffer at t=6s, got A=%d", sizeA)
	}

	// Still t=6s on the clock: B carries timestamp t=5s, i.e. 5s after A's
	// timestamp, which is less than TimeWindow=10s, so the pair should match.
	bEvent := &NormalizedEvent{
		Source:    SourceB,
		Timestamp: start.Add(5 * time.Second),
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"ja3": "abc123"},
	}
	got := svc.ProcessEvent(bEvent)
	switch {
	case len(got) != 1:
		t.Errorf("expected 1 correlated result, got %d", len(got))
	case !got[0].Correlated:
		t.Error("expected correlated result")
	}
}
// TestCorrelationService_Flush_CorrelatesRemainingEvents verifies that Flush
// pairs an A event and a B event that are both still buffered, returning one
// correlated result instead of an orphan, and that both buffers are empty
// afterwards.
func TestCorrelationService_Flush_CorrelatesRemainingEvents(t *testing.T) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	timeProvider := &mockTimeProvider{now: now}

	config := CorrelationConfig{
		TimeWindow:           5 * time.Second,
		ApacheAlwaysEmit:     true,
		NetworkEmit:          false,
		MatchingMode:         MatchingModeOneToMany,
		MaxHTTPBufferSize:    DefaultMaxHTTPBufferSize,
		MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
		NetworkTTLS:          DefaultNetworkTTLS,
	}

	svc := NewCorrelationService(config, timeProvider)

	apacheEvent := &NormalizedEvent{
		Source:    SourceA,
		Timestamp: now,
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"method": "GET"},
	}
	networkEvent := &NormalizedEvent{
		Source:    SourceB,
		Timestamp: now.Add(2 * time.Second),
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"ja3": "abc123"},
	}

	// Seed both buffers directly instead of going through ProcessEvent, so that
	// an uncorrelated A and B with the same key are both buffered when Flush
	// runs. The index keys come from CorrelationKey, the same lookup key Flush
	// uses, rather than from a hard-coded copy of its format.
	keyA := apacheEvent.CorrelationKey()
	keyB := networkEvent.CorrelationKey()

	elemA := svc.bufferA.events.PushBack(apacheEvent)
	svc.pendingA[keyA] = append(svc.pendingA[keyA], elemA)

	elemB := svc.bufferB.events.PushBack(networkEvent)
	svc.pendingB[keyB] = append(svc.pendingB[keyB], elemB)
	svc.networkTTLs[elemB] = now.Add(time.Duration(svc.config.NetworkTTLS) * time.Second)

	// Flush should correlate A and B, not emit A as an orphan.
	flushed := svc.Flush()
	if len(flushed) != 1 {
		t.Errorf("expected 1 flushed correlated result, got %d", len(flushed))
	} else if !flushed[0].Correlated {
		t.Errorf("expected correlated result, got orphan side %s", flushed[0].OrphanSide)
	}

	// Verify buffers are cleared.
	a, b := svc.GetBufferSizes()
	if a != 0 || b != 0 {
		t.Errorf("expected empty buffers after flush, got A=%d, B=%d", a, b)
	}
}
// TestCorrelationService_BufferFull_RotatesOldestA verifies the overflow
// behaviour of buffer A: when a new A event is processed while the buffer is
// at MaxHTTPBufferSize, the oldest buffered A event is evicted, the new event
// is buffered in its place, and nothing is returned to the caller.
func TestCorrelationService_BufferFull_RotatesOldestA(t *testing.T) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	timeProvider := &mockTimeProvider{now: now}

	config := CorrelationConfig{
		TimeWindow:           5 * time.Second,
		ApacheAlwaysEmit:     false, // Must be false to buffer events
		NetworkEmit:          false,
		MatchingMode:         MatchingModeOneToMany,
		MaxHTTPBufferSize:    3,
		MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
		NetworkTTLS:          DefaultNetworkTTLS,
	}

	svc := NewCorrelationService(config, timeProvider)

	// Fill buffer to capacity. No rotation happens yet: each event has a
	// distinct port, no B event exists, so every event is simply buffered.
	var oldestKey string
	for i := 0; i < 3; i++ {
		event := &NormalizedEvent{
			Source:    SourceA,
			Timestamp: now.Add(time.Duration(i) * time.Second),
			SrcIP:     "192.168.1.1",
			SrcPort:   8080 + i,
			Raw:       map[string]any{"method": "GET"},
		}
		if i == 0 {
			// Remember the first event's key: it is the one that must be evicted.
			oldestKey = event.CorrelationKey()
		}
		results := svc.ProcessEvent(event)
		if len(results) != 0 {
			t.Errorf("event %d: expected 0 results (buffered), got %d", i, len(results))
		}
	}

	a, _ := svc.GetBufferSizes()
	if a != 3 {
		t.Errorf("expected buffer A size 3, got %d", a)
	}

	// Add 4th event - should rotate oldest (port 8080) and buffer new one.
	overflowEvent := &NormalizedEvent{
		Source:    SourceA,
		Timestamp: now.Add(10 * time.Second),
		SrcIP:     "192.168.1.1",
		SrcPort:   9999,
		Raw:       map[string]any{"method": "POST"},
	}
	results := svc.ProcessEvent(overflowEvent)
	// Rotation only logs the evicted event, so nothing is returned here.
	if len(results) != 0 {
		t.Errorf("expected 0 results on overflow (rotated), got %d", len(results))
	}

	// Buffer should still have 3 events (oldest rotated, newest added).
	a, _ = svc.GetBufferSizes()
	if a != 3 {
		t.Errorf("expected buffer A size 3 after rotation, got %d", a)
	}

	// The size check alone cannot tell which event was evicted, so inspect
	// the pending index: the oldest key must be gone and the new one present.
	if _, ok := svc.pendingA[oldestKey]; ok {
		t.Errorf("expected oldest A event (key %s) to be rotated out of the buffer", oldestKey)
	}
	if newKey := overflowEvent.CorrelationKey(); len(svc.pendingA[newKey]) != 1 {
		t.Errorf("expected new A event (key %s) to be buffered after rotation, got %d entries",
			newKey, len(svc.pendingA[newKey]))
	}
}
// TestCorrelationService_CleanA_RespectsBTTL exercises cleanExpired in two
// phases:
//
//  1. After A and B have correlated, a cleanup at t=10s must leave buffer A
//     empty and must keep B, whose 30s TTL has not yet elapsed.
//  2. A second A event with a different IP/port (so no B shares its key) is
//     buffered at t=10s. A cleanup at t=35s must drop both the B event (TTL
//     elapsed) and that A event (25s old, beyond the 5s TimeWindow).
func TestCorrelationService_CleanA_RespectsBTTL(t *testing.T) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	timeProvider := &mockTimeProvider{now: now}

	config := CorrelationConfig{
		TimeWindow:           5 * time.Second,
		ApacheAlwaysEmit:     false,
		NetworkEmit:          false,
		MatchingMode:         MatchingModeOneToMany,
		MaxHTTPBufferSize:    DefaultMaxHTTPBufferSize,
		MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
		NetworkTTLS:          30, // 30 seconds TTL for B
	}

	svc := NewCorrelationService(config, timeProvider)

	// Add B event at t=0.
	networkEvent := &NormalizedEvent{
		Source:    SourceB,
		Timestamp: now,
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"ja3": "abc123"},
	}
	svc.ProcessEvent(networkEvent)

	// Add A event at t=1s; it must correlate with the buffered B.
	timeProvider.now = now.Add(1 * time.Second)
	apacheEvent := &NormalizedEvent{
		Source:    SourceA,
		Timestamp: timeProvider.now,
		SrcIP:     "192.168.1.1",
		SrcPort:   8080,
		Raw:       map[string]any{"method": "GET"},
	}
	results := svc.ProcessEvent(apacheEvent)
	if len(results) != 1 || !results[0].Correlated {
		t.Fatalf("expected 1 correlated result, got %d", len(results))
	}

	// Phase 1: at t=10s the B TTL (30s) has not elapsed, so B must survive
	// cleanup. A was consumed by the correlation above, so buffer A is empty.
	timeProvider.now = now.Add(10 * time.Second)
	svc.cleanExpired()

	a, b := svc.GetBufferSizes()
	if a != 0 {
		t.Errorf("expected A buffer empty (emitted during correlation), got %d", a)
	}
	if b != 1 {
		t.Errorf("expected B still in buffer (TTL valid), got %d", b)
	}

	// Phase 2: buffer a new A at t=10s under a different IP/port, so no B
	// event shares its correlation key.
	apacheEvent2 := &NormalizedEvent{
		Source:    SourceA,
		Timestamp: timeProvider.now,
		SrcIP:     "192.168.1.2",
		SrcPort:   9090,
		Raw:       map[string]any{"method": "GET"},
	}
	svc.ProcessEvent(apacheEvent2)

	// Advance to t=35s, past the B TTL, and clean up again.
	timeProvider.now = now.Add(35 * time.Second)
	svc.cleanExpired()

	a, b = svc.GetBufferSizes()
	// B's TTL has elapsed, so it must be gone.
	if b != 0 {
		t.Errorf("expected B expired at t=35s, got %d", b)
	}
	// The second A is 25s old (> TimeWindow of 5s) and has no live B under
	// its key, so it must be cleaned as well.
	if a != 0 {
		t.Errorf("expected A also expired at t=35s (25s old > 5s TimeWindow), got %d", a)
	}
}