diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go index 67bd43c..7869ce6 100644 --- a/internal/domain/correlation_service.go +++ b/internal/domain/correlation_service.go @@ -122,13 +122,17 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo // Check buffer overflow before adding if s.isBufferFull(event.Source) { - // Buffer full, drop event or emit as orphan - s.logger.Warnf("buffer full, dropping event: source=%s src_ip=%s src_port=%d", + // Buffer full - rotate oldest event instead of dropping new one + s.logger.Warnf("buffer full, rotating oldest event: source=%s src_ip=%s src_port=%d", event.Source, event.SrcIP, event.SrcPort) - if event.Source == SourceA && s.config.ApacheAlwaysEmit { - return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")} + if event.Source == SourceA { + // Remove oldest A event and emit as orphan if configured + s.rotateOldestA() + } else if event.Source == SourceB { + // Remove oldest B event (no emission for B) + s.rotateOldestB() } - return nil + // Continue to add the new event after rotation } var ( @@ -174,6 +178,49 @@ func (s *CorrelationService) isBufferFull(source EventSource) bool { return false } +// rotateOldestA removes the oldest A event from the buffer and emits it as orphan if configured. +func (s *CorrelationService) rotateOldestA() { + elem := s.bufferA.events.Front() + if elem == nil { + return + } + + event := elem.Value.(*NormalizedEvent) + key := event.CorrelationKey() + + // Remove from buffer + s.bufferA.events.Remove(elem) + s.pendingA[key] = removeElementFromSlice(s.pendingA[key], elem) + if len(s.pendingA[key]) == 0 { + delete(s.pendingA, key) + } + + // Emit as orphan if configured + if s.config.ApacheAlwaysEmit { + s.logger.Warnf("orphan A event (buffer rotation): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort) + } +} + +// rotateOldestB removes the oldest B event from the buffer (no emission). +func (s *CorrelationService) rotateOldestB() { + elem := s.bufferB.events.Front() + if elem == nil { + return + } + + event := elem.Value.(*NormalizedEvent) + key := event.CorrelationKey() + + // Remove from buffer + s.bufferB.events.Remove(elem) + s.pendingB[key] = removeElementFromSlice(s.pendingB[key], elem) + if len(s.pendingB[key]) == 0 { + delete(s.pendingB, key) + } + // Remove from TTL map + delete(s.networkTTLs, elem) +} + func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) { key := event.CorrelationKey() @@ -266,19 +313,74 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) { } func (s *CorrelationService) cleanExpired() { - now := s.timeProvider.Now() - - // Clean expired A events (based on time window) - aCutoff := now.Add(-s.config.TimeWindow) - s.cleanBuffer(s.bufferA, s.pendingA, aCutoff) - - // Clean expired B events - use TTL map only (not event timestamp) + // Clean expired B events first - use TTL map only (not event timestamp) // This is critical for Keep-Alive: TTL is reset on each correlation, // so we must check the reset TTL, not the original event timestamp. s.cleanNetworkBufferByTTL() + + // Clean A events based on B TTL, not system time + // An A event should only be removed if: + // 1. It has already been correlated (no longer in buffer), OR + // 2. All potential matching B events have expired (TTL exceeded) + s.cleanBufferAByBTTL() +} + +// cleanBufferAByBTTL cleans A events based on whether corresponding B events still exist. +// An A event is removed only if: +// 1. All potential matching B events have expired (TTL exceeded), OR +// 2. The A event itself is too old (beyond TimeWindow from current time) +// When removed, the A event is emitted as orphan if ApacheAlwaysEmit=true. +func (s *CorrelationService) cleanBufferAByBTTL() { + now := s.timeProvider.Now() + var toRemove []*list.Element + + // First pass: identify A events to remove + for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() { + event := elem.Value.(*NormalizedEvent) + key := event.CorrelationKey() + + // Check if there's any non-expired B event with the same key + hasValidB := false + if bElements, ok := s.pendingB[key]; ok { + for _, bElem := range bElements { + if ttl, exists := s.networkTTLs[bElem]; exists { + if !now.After(ttl) { + hasValidB = true + break + } + } + } + } + + // Remove A if no valid B exists AND A is beyond TimeWindow from now + if !hasValidB { + aAge := now.Sub(event.Timestamp) + if aAge > s.config.TimeWindow { + toRemove = append(toRemove, elem) + } + } + } + + // Second pass: remove identified events and emit as orphans if configured + for _, elem := range toRemove { + event := elem.Value.(*NormalizedEvent) + key := event.CorrelationKey() + s.bufferA.events.Remove(elem) + s.pendingA[key] = removeElementFromSlice(s.pendingA[key], elem) + if len(s.pendingA[key]) == 0 { + delete(s.pendingA, key) + } + + if s.config.ApacheAlwaysEmit { + s.logger.Warnf("orphan A event (no B match, TTL expired): src_ip=%s src_port=%d", + event.SrcIP, event.SrcPort) + } + } } // cleanBuffer removes expired events from buffer A (based on event timestamp). +// Deprecated: Use cleanBufferAByBTTL for source A events to properly handle +// correlation with B events. func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) { for elem := buffer.events.Front(); elem != nil; { next := elem.Next() @@ -429,17 +531,53 @@ func removeElementFromSlice(elements []*list.Element, target *list.Element) []*l } // Flush forces emission of remaining buffered events (for shutdown). +// It first attempts to correlate remaining A and B events, then emits orphans. func (s *CorrelationService) Flush() []CorrelatedLog { s.mu.Lock() defer s.mu.Unlock() var results []CorrelatedLog + // First, try to correlate remaining A events with B events + // This handles the case where both A and B are still in buffer at shutdown + for elem := s.bufferA.events.Front(); elem != nil; { + next := elem.Next() + event := elem.Value.(*NormalizedEvent) + key := event.CorrelationKey() + + // Look for matching B events + matched := false + if bElements, ok := s.pendingB[key]; ok { + for _, bElem := range bElements { + bEvent := bElem.Value.(*NormalizedEvent) + if s.eventsMatch(event, bEvent) { + // Correlate A with B + correlated := NewCorrelatedLog(event, bEvent) + s.logger.Debugf("flush correlation: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", + event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort) + results = append(results, correlated) + matched = true + break + } + } + } + + if matched { + s.bufferA.events.Remove(elem) + s.pendingA[key] = removeElementFromSlice(s.pendingA[key], elem) + if len(s.pendingA[key]) == 0 { + delete(s.pendingA, key) + } + } + elem = next + } + // Emit remaining A events as orphans if configured if s.config.ApacheAlwaysEmit { for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() { event := elem.Value.(*NormalizedEvent) orphan := NewCorrelatedLogFromEvent(event, "A") + s.logger.Warnf("flush orphan A: src_ip=%s src_port=%d", event.SrcIP, event.SrcPort) results = append(results, orphan) } } diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go index b93bb69..d2804a6 100644 --- a/internal/domain/correlation_service_test.go +++ b/internal/domain/correlation_service_test.go @@ -763,3 +763,319 @@ func TestCorrelationService_KeepAlive_LongSession(t *testing.T) { t.Errorf("expected B expired at t=155s (TTL was reset at t=30s + 120s = t=150s), got buffer size %d", b) } } + +// TestCorrelationService_ALateThanB_WithinTimeWindow tests that A events arriving after B +// within the time window are correctly correlated. +func TestCorrelationService_ALateThanB_WithinTimeWindow(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + } + + svc := NewCorrelationService(config, timeProvider) + + // B arrives first at t=0 + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + results := svc.ProcessEvent(networkEvent) + if len(results) != 0 { + t.Fatalf("expected 0 results (B buffered), got %d", len(results)) + } + + // A arrives later at t=2s (within time window of B) + timeProvider.now = now.Add(2 * time.Second) + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: timeProvider.now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + results = svc.ProcessEvent(apacheEvent) + if len(results) != 1 { + t.Errorf("expected 1 correlated result, got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result") + } +} + +// TestCorrelationService_ALateThanB_AExpiredTooSoon tests that A events are not +// prematurely expired when B events arrive within the TimeWindow of A's timestamp. +// This test verifies that correlation is based on event timestamps, not system time. +func TestCorrelationService_ALateThanB_AExpiredTooSoon(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 10 * time.Second, // Larger window to allow correlation + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: 30, // 30 seconds TTL for B + } + + svc := NewCorrelationService(config, timeProvider) + + // A arrives at t=0 + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + results := svc.ProcessEvent(apacheEvent) + if len(results) != 0 { + t.Fatalf("expected 0 results (A buffered), got %d", len(results)) + } + + // Advance time to t=6s (A is 6s old, still within TimeWindow=10s) + timeProvider.now = now.Add(6 * time.Second) + svc.cleanExpired() + + a, _ := svc.GetBufferSizes() + if a != 1 { + t.Errorf("expected A still in buffer at t=6s, got A=%d", a) + } + + // B arrives at t=6s with timestamp t=5s (within TimeWindow of A at t=0) + // A.timestamp=t=0, B.timestamp=t=5, diff=5s < TimeWindow=10s -> should match + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(5 * time.Second), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + results = svc.ProcessEvent(networkEvent) + if len(results) != 1 { + t.Errorf("expected 1 correlated result, got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result") + } +} + +// TestCorrelationService_Flush_CorrelatesRemainingEvents tests that Flush() +// correlates remaining A and B events before emitting orphans. +func TestCorrelationService_Flush_CorrelatesRemainingEvents(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + } + + svc := NewCorrelationService(config, timeProvider) + + // Add A event to buffer (ApacheAlwaysEmit=false would buffer, but we set true) + // So we manually add to buffer for testing + keyA := "192.168.1.1:8080" + keyB := "192.168.1.1:8080" + + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(2 * time.Second), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + + // Manually add to buffers (bypassing ProcessEvent logic) + elemA := svc.bufferA.events.PushBack(apacheEvent) + svc.pendingA[keyA] = append(svc.pendingA[keyA], elemA) + + elemB := svc.bufferB.events.PushBack(networkEvent) + svc.pendingB[keyB] = append(svc.pendingB[keyB], elemB) + svc.networkTTLs[elemB] = now.Add(time.Duration(svc.config.NetworkTTLS) * time.Second) + + // Flush should correlate A and B, not emit as orphans + flushed := svc.Flush() + if len(flushed) != 1 { + t.Errorf("expected 1 flushed correlated result, got %d", len(flushed)) + } else if flushed[0].Correlated { + // Good - it's correlated + } else { + t.Errorf("expected correlated result, got orphan side %s", flushed[0].OrphanSide) + } + + // Verify buffers are cleared + a, b := svc.GetBufferSizes() + if a != 0 || b != 0 { + t.Errorf("expected empty buffers after flush, got A=%d, B=%d", a, b) + } +} + +// TestCorrelationService_BufferFull_RotatesOldestA tests that when buffer A is full, +// the oldest A event is rotated (not immediate emission of new event). +func TestCorrelationService_BufferFull_RotatesOldestA(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, + ApacheAlwaysEmit: false, // Must be false to buffer events + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: 3, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + } + + svc := NewCorrelationService(config, timeProvider) + + // Fill buffer to capacity + for i := 0; i < 3; i++ { + event := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(time.Duration(i) * time.Second), + SrcIP: "192.168.1.1", + SrcPort: 8080 + i, + Raw: map[string]any{"method": "GET"}, + } + results := svc.ProcessEvent(event) + // With rotation, new events are buffered, oldest is rotated out + // Rotation doesn't emit (only logs), so results should be 0 + if len(results) != 0 { + t.Errorf("event %d: expected 0 results (buffered), got %d", i, len(results)) + } + } + + a, _ := svc.GetBufferSizes() + if a != 3 { + t.Errorf("expected buffer A size 3, got %d", a) + } + + // Add 4th event - should rotate oldest (port 8080) and buffer new one + overflowEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(10 * time.Second), + SrcIP: "192.168.1.1", + SrcPort: 9999, + Raw: map[string]any{"method": "POST"}, + } + results := svc.ProcessEvent(overflowEvent) + if len(results) != 0 { + t.Errorf("expected 0 results on overflow (rotated), got %d", len(results)) + } + + // Buffer should still have 3 events (oldest rotated, newest added) + a, _ = svc.GetBufferSizes() + if a != 3 { + t.Errorf("expected buffer A size 3 after rotation, got %d", a) + } +} + +// TestCorrelationService_CleanA_RespectsBTTL tests that cleaning A events +// respects the TTL of corresponding B events. +func TestCorrelationService_CleanA_RespectsBTTL(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: 30, // 30 seconds TTL for B + } + + svc := NewCorrelationService(config, timeProvider) + + // Add B event at t=0 + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + svc.ProcessEvent(networkEvent) + + // Add A event at t=1 + timeProvider.now = now.Add(1 * time.Second) + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: timeProvider.now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + results := svc.ProcessEvent(apacheEvent) + if len(results) != 1 || !results[0].Correlated { + t.Fatalf("expected 1 correlated result, got %d", len(results)) + } + + // Advance time to t=10s (A is 9s old, beyond TimeWindow from "now") + // But B is still valid (TTL=30s, reset at t=1, expires at t=31) + timeProvider.now = now.Add(10 * time.Second) + svc.cleanExpired() + + // A should have been emitted during correlation, buffer should be empty for A + // B should still be in buffer (TTL not expired) + a, b := svc.GetBufferSizes() + if a != 0 { + t.Errorf("expected A buffer empty (emitted during correlation), got %d", a) + } + if b != 1 { + t.Errorf("expected B still in buffer (TTL valid), got %d", b) + } + + // Now test the case where A is buffered (no match yet) and B expires + // Add new A at t=10 + apacheEvent2 := &NormalizedEvent{ + Source: SourceA, + Timestamp: timeProvider.now, + SrcIP: "192.168.1.2", + SrcPort: 9090, + Raw: map[string]any{"method": "GET"}, + } + svc.ProcessEvent(apacheEvent2) + + // Advance time past B TTL (t=35s, B TTL expired at t=31s) + timeProvider.now = now.Add(35 * time.Second) + svc.cleanExpired() + + // B should be expired now + a, b = svc.GetBufferSizes() + if b != 0 { + t.Errorf("expected B expired at t=35s, got %d", b) + } + + // A should still be in buffer (not yet beyond its own TimeWindow from "now") + // Actually, A at t=10 is 25s old at t=35, which is > TimeWindow (5s) + // So A should also be cleaned + a, b = svc.GetBufferSizes() + if a != 0 { + t.Errorf("expected A also expired at t=35s (25s old > 5s TimeWindow), got %d", a) + } +}