diff --git a/cmd/logcorrelator/main.go b/cmd/logcorrelator/main.go index f6c753b..fb648d0 100644 --- a/cmd/logcorrelator/main.go +++ b/cmd/logcorrelator/main.go @@ -108,6 +108,7 @@ func main() { correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{ TimeWindow: cfg.Correlation.GetTimeWindow(), ApacheAlwaysEmit: cfg.Correlation.GetApacheAlwaysEmit(), + ApacheEmitDelayMs: cfg.Correlation.GetApacheEmitDelayMs(), NetworkEmit: false, MaxHTTPBufferSize: cfg.Correlation.GetMaxHTTPBufferSize(), MaxNetworkBufferSize: cfg.Correlation.GetMaxNetworkBufferSize(), @@ -118,9 +119,10 @@ func main() { // Set logger for correlation service correlationSvc.SetLogger(logger.WithFields(map[string]any{"component": "correlation"})) - logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, emit_orphans=%v", + logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, emit_orphans=%v, emit_delay_ms=%d", cfg.Correlation.GetTimeWindow().String(), - cfg.Correlation.EmitOrphans)) + cfg.Correlation.GetApacheAlwaysEmit(), + cfg.Correlation.GetApacheEmitDelayMs())) // Create orchestrator orchestrator := app.NewOrchestrator(app.OrchestratorConfig{ diff --git a/config.example.yml b/config.example.yml index 7dfb002..80176ce 100644 --- a/config.example.yml +++ b/config.example.yml @@ -48,6 +48,7 @@ correlation: # Orphan policy: what to do when no match is found orphan_policy: apache_always_emit: true # Always emit A events, even without B match + apache_emit_delay_ms: 500 # Wait 500ms before emitting as orphan (allows B to arrive) network_emit: false # Never emit B events alone # Matching mode: one_to_one or one_to_many (Keep-Alive) diff --git a/internal/config/config.go b/internal/config/config.go index 672d351..4078f41 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -120,8 +120,9 @@ func (c *TimeWindowConfig) GetDuration() time.Duration { // OrphanPolicyConfig holds orphan event policy configuration. 
type OrphanPolicyConfig struct { - ApacheAlwaysEmit bool `yaml:"apache_always_emit"` - NetworkEmit bool `yaml:"network_emit"` + ApacheAlwaysEmit bool `yaml:"apache_always_emit"` + ApacheEmitDelayMs int `yaml:"apache_emit_delay_ms"` // Delay in ms before emitting orphan A + NetworkEmit bool `yaml:"network_emit"` } // MatchingConfig holds matching mode configuration. @@ -284,6 +285,14 @@ func (c *CorrelationConfig) GetApacheAlwaysEmit() bool { return c.EmitOrphans } +// GetApacheEmitDelayMs returns the delay in milliseconds before emitting orphan A events. +func (c *CorrelationConfig) GetApacheEmitDelayMs() int { + if c.OrphanPolicy.ApacheEmitDelayMs > 0 { + return c.OrphanPolicy.ApacheEmitDelayMs + } + return domain.DefaultApacheEmitDelayMs // Default: 500ms +} + // GetMatchingMode returns the matching mode. func (c *CorrelationConfig) GetMatchingMode() string { if c.Matching.Mode != "" { diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go index 7869ce6..435842a 100644 --- a/internal/domain/correlation_service.go +++ b/internal/domain/correlation_service.go @@ -22,6 +22,9 @@ const ( // The TTL is reset on each correlation (Keep-Alive mode), so the network // event stays in buffer as long as the connection is active. 
DefaultNetworkTTLS = 120 + // DefaultApacheEmitDelayMs is the default delay before emitting an orphan A event + // This allows B events to arrive slightly after A and still correlate + DefaultApacheEmitDelayMs = 500 // MatchingModeOneToOne indicates single correlation (consume B after match) MatchingModeOneToOne = "one_to_one" // MatchingModeOneToMany indicates Keep-Alive mode (B can match multiple A) @@ -32,6 +35,7 @@ const ( type CorrelationConfig struct { TimeWindow time.Duration ApacheAlwaysEmit bool + ApacheEmitDelayMs int // Delay in ms before emitting orphan A (default: 500ms) NetworkEmit bool MaxHTTPBufferSize int // Maximum events to buffer for source A (HTTP) MaxNetworkBufferSize int // Maximum events to buffer for source B (Network) @@ -39,17 +43,25 @@ type CorrelationConfig struct { MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive) } +// pendingOrphan represents an A event waiting to be emitted as orphan. +type pendingOrphan struct { + event *NormalizedEvent + emitAfter time.Time // Timestamp when this orphan should be emitted + timer *time.Timer +} + // CorrelationService handles the correlation logic between source A and B events. 
type CorrelationService struct { - config CorrelationConfig - mu sync.Mutex - bufferA *eventBuffer - bufferB *eventBuffer - pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent - pendingB map[string][]*list.Element - networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event - timeProvider TimeProvider - logger *observability.Logger + config CorrelationConfig + mu sync.Mutex + bufferA *eventBuffer + bufferB *eventBuffer + pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent + pendingB map[string][]*list.Element + networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event + pendingOrphans map[string][]*pendingOrphan // key -> A events waiting to be emitted as orphans + timeProvider TimeProvider + logger *observability.Logger } type eventBuffer struct { @@ -94,16 +106,21 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) if config.MatchingMode == "" { config.MatchingMode = MatchingModeOneToMany // Default to Keep-Alive } + // Zero delay is valid for backward compatibility (immediate emission mode) + if config.ApacheEmitDelayMs < 0 { + config.ApacheEmitDelayMs = DefaultApacheEmitDelayMs + } return &CorrelationService{ - config: config, - bufferA: newEventBuffer(), - bufferB: newEventBuffer(), - pendingA: make(map[string][]*list.Element), - pendingB: make(map[string][]*list.Element), - networkTTLs: make(map[*list.Element]time.Time), - timeProvider: timeProvider, - logger: observability.NewLogger("correlation"), + config: config, + bufferA: newEventBuffer(), + bufferB: newEventBuffer(), + pendingA: make(map[string][]*list.Element), + pendingB: make(map[string][]*list.Element), + pendingOrphans: make(map[string][]*pendingOrphan), + networkTTLs: make(map[*list.Element]time.Time), + timeProvider: timeProvider, + logger: observability.NewLogger("correlation"), } } @@ -120,6 +137,9 @@ func (s *CorrelationService) 
ProcessEvent(event *NormalizedEvent) []CorrelatedLo // Clean expired events first s.cleanExpired() + // Emit pending orphans that have passed their delay + orphanResults := s.emitPendingOrphans() + // Check buffer overflow before adding if s.isBufferFull(event.Source) { // Buffer full - rotate oldest event instead of dropping new one @@ -155,6 +175,11 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo event.Source, event.SrcIP, event.SrcPort, s.getBufferSize(event.Source)) } + // Combine orphan results with correlation results + if len(orphanResults) > 0 { + results = append(orphanResults, results...) + } + return results } @@ -262,10 +287,21 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate } // No match found - orphan A event + // Instead of emitting immediately, add to pending orphans with delay + // This allows B events to arrive slightly after A and still correlate if s.config.ApacheAlwaysEmit { - orphan := NewCorrelatedLogFromEvent(event, "A") - s.logger.Warnf("orphan A event (no B match): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort) - return []CorrelatedLog{orphan}, false + // Zero delay = immediate emission (backward compatibility mode) + if s.config.ApacheEmitDelayMs == 0 { + orphan := NewCorrelatedLogFromEvent(event, "A") + s.logger.Warnf("orphan A event (immediate): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort) + return []CorrelatedLog{orphan}, false + } + s.addPendingOrphan(event) + s.logger.Debugf("A event added to pending orphans (delay=%dms): src_ip=%s src_port=%d", + s.config.ApacheEmitDelayMs, event.SrcIP, event.SrcPort) + // Don't emit yet - will be emitted after delay expires + // Return empty results, event stays in pending orphans + return nil, false } // Keep in buffer for potential future match @@ -275,7 +311,16 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate func (s *CorrelationService) processSourceB(event *NormalizedEvent) 
([]CorrelatedLog, bool) { key := event.CorrelationKey() - // Look for the first matching A event (one-to-one first match) + // FIRST: Check if there's a pending orphan A that matches this B event + // This is the key optimization for delayed orphan emission + if aEvent := s.checkPendingOrphansForCorrelation(event); aEvent != nil { + correlated := NewCorrelatedLog(aEvent, event) + s.logger.Debugf("correlation found (pending orphan): A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", + aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort) + return []CorrelatedLog{correlated}, false + } + + // SECOND: Look for the first matching A event in buffer if aEvent := s.findAndPopFirstMatch(s.bufferA, s.pendingA, key, func(other *NormalizedEvent) bool { return s.eventsMatch(other, event) }); aEvent != nil { @@ -518,6 +563,103 @@ func (s *CorrelationService) resetNetworkTTL(elem *list.Element) { s.networkTTLs[elem] = s.timeProvider.Now().Add(time.Duration(s.config.NetworkTTLS) * time.Second) } +// addPendingOrphan adds an A event to the pending orphans list with a delayed emission. +func (s *CorrelationService) addPendingOrphan(event *NormalizedEvent) { + key := event.CorrelationKey() + emitAfter := s.timeProvider.Now().Add(time.Duration(s.config.ApacheEmitDelayMs) * time.Millisecond) + + orphan := &pendingOrphan{ + event: event, + emitAfter: emitAfter, + } + + s.pendingOrphans[key] = append(s.pendingOrphans[key], orphan) + s.logger.Debugf("A event added to pending orphans: src_ip=%s src_port=%d emit_after=%v", + event.SrcIP, event.SrcPort, emitAfter) +} + +// removePendingOrphan removes a specific pending orphan by event reference. +// Returns true if found and removed, false otherwise. 
+func (s *CorrelationService) removePendingOrphan(event *NormalizedEvent) bool {
+	key := event.CorrelationKey()
+	orphans, ok := s.pendingOrphans[key]
+	if !ok {
+		return false
+	}
+
+	for i, orphan := range orphans {
+		if orphan.event == event { // pointer identity, not field-by-field equality
+			// Stop the timer when one is attached; addPendingOrphan leaves timer nil, so this is a no-op for orphans it created
+			if orphan.timer != nil {
+				orphan.timer.Stop()
+			}
+			s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
+			if len(s.pendingOrphans[key]) == 0 {
+				delete(s.pendingOrphans, key) // drop the key once its list is empty
+			}
+			return true // safe to stop here: the slice was modified, so iteration must not continue
+		}
+	}
+	return false
+}
+
+// checkPendingOrphansForCorrelation scans the pending orphans stored under bEvent's correlation key and compares each with eventsMatch.
+// It removes and returns the first matching A event, or returns nil when the key has no pending orphans or none of them match.
+func (s *CorrelationService) checkPendingOrphansForCorrelation(bEvent *NormalizedEvent) *NormalizedEvent {
+	key := bEvent.CorrelationKey()
+	orphans, ok := s.pendingOrphans[key]
+	if !ok || len(orphans) == 0 {
+		return nil
+	}
+
+	for i, orphan := range orphans {
+		if s.eventsMatch(orphan.event, bEvent) {
+			// First match wins: take it out of the pending list and return it right away
+			aEvent := orphan.event
+			s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
+			if len(s.pendingOrphans[key]) == 0 {
+				delete(s.pendingOrphans, key) // drop the key once its list is empty
+			}
+			s.logger.Debugf("pending orphan matched with B: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
+				aEvent.SrcIP, aEvent.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
+			return aEvent
+		}
+	}
+
+	return nil
+}
+
+// emitPendingOrphans builds an orphan-A log for every pending orphan whose emitAfter is strictly before timeProvider.Now().
+// It returns the logs to emit; the result is nil when ApacheAlwaysEmit is false or when no orphan is due.
+func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog {
+	if !s.config.ApacheAlwaysEmit {
+		return nil
+	}
+
+	now := s.timeProvider.Now()
+	var results []CorrelatedLog
+
+	for key, orphans := range s.pendingOrphans {
+		// Filter in place: due entries are emitted in arrival order, the rest are kept.
+		kept := orphans[:0]
+		for _, po := range orphans {
+			if !now.After(po.emitAfter) { // delay not elapsed yet
+				kept = append(kept, po)
+				continue
+			}
+			s.logger.Warnf("orphan A event (emit delay expired): src_ip=%s src_port=%d",
+				po.event.SrcIP, po.event.SrcPort)
+			results = append(results, NewCorrelatedLogFromEvent(po.event, "A"))
+		}
+		s.pendingOrphans[key] = kept // store the compacted slice exactly once per key
+		if len(kept) == 0 {
+			delete(s.pendingOrphans, key) // deleting the current key during range is allowed for Go maps
+		}
+	}
+
+	return results
+}
+
 func removeElementFromSlice(elements []*list.Element, target *list.Element) []*list.Element {
 	if len(elements) == 0 {
 		return elements
@@ -582,6 +724,18 @@ func (s *CorrelationService) Flush() []CorrelatedLog {
 		}
 	}
 
+	// Emit all pending orphans (immediately, ignoring delay)
+	if s.config.ApacheAlwaysEmit {
+		for _, orphans := range s.pendingOrphans {
+			for _, orphan := range orphans {
+				correlatedLog := NewCorrelatedLogFromEvent(orphan.event, "A")
+				s.logger.Warnf("flush pending orphan: src_ip=%s src_port=%d",
+					orphan.event.SrcIP, orphan.event.SrcPort)
+				results = append(results, correlatedLog)
+			}
+		}
+	}
+
 	// Never emit remaining B events alone.
// Clear buffers @@ -589,6 +743,7 @@ func (s *CorrelationService) Flush() []CorrelatedLog { s.bufferB.events.Init() s.pendingA = make(map[string][]*list.Element) s.pendingB = make(map[string][]*list.Element) + s.pendingOrphans = make(map[string][]*pendingOrphan) s.networkTTLs = make(map[*list.Element]time.Time) return results diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go index d2804a6..964fced 100644 --- a/internal/domain/correlation_service_test.go +++ b/internal/domain/correlation_service_test.go @@ -145,6 +145,7 @@ func TestCorrelationService_Flush(t *testing.T) { config := CorrelationConfig{ TimeWindow: time.Second, ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 0, // Zero delay = immediate emission (backward compatibility) NetworkEmit: false, MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, @@ -161,7 +162,7 @@ func TestCorrelationService_Flush(t *testing.T) { SrcPort: 8080, } - // A est émis immédiatement quand ApacheAlwaysEmit=true + // A est émis immédiatement quand ApacheAlwaysEmit=true et ApacheEmitDelayMs=0 results := svc.ProcessEvent(apacheEvent) if len(results) != 1 { t.Fatalf("expected 1 immediate orphan event, got %d", len(results)) @@ -1079,3 +1080,240 @@ func TestCorrelationService_CleanA_RespectsBTTL(t *testing.T) { t.Errorf("expected A also expired at t=35s (25s old > 5s TimeWindow), got %d", a) } } + +func TestCorrelationService_ApacheEmitDelay_BArrivesDuringDelay(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 500, // 500ms delay + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, + } + + svc := NewCorrelationService(config, 
timeProvider) + + // Send Apache event - should be added to pending orphans, NOT emitted immediately + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + results := svc.ProcessEvent(apacheEvent) + if len(results) != 0 { + t.Fatalf("expected 0 immediate results (pending orphan), got %d", len(results)) + } + + // Advance time by 250ms (less than delay) + timeProvider.now = now.Add(250 * time.Millisecond) + + // Send B event - should correlate with pending A + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(100 * time.Millisecond), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc"}, + } + + results = svc.ProcessEvent(networkEvent) + if len(results) != 1 { + t.Errorf("expected 1 correlated result, got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result") + } + + // Verify no pending orphans remain + if len(svc.pendingOrphans) != 0 { + t.Errorf("expected pending orphans empty, got %d keys", len(svc.pendingOrphans)) + } +} + +func TestCorrelationService_ApacheEmitDelay_NoBArrives(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 500, // 500ms delay + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send Apache event + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + results := svc.ProcessEvent(apacheEvent) + if len(results) != 0 { + t.Fatalf("expected 0 immediate results, got %d", 
len(results)) + } + + // Advance time past delay (600ms > 500ms) + timeProvider.now = now.Add(600 * time.Millisecond) + + // Send another event (unrelated) to trigger emitPendingOrphans + unrelatedEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "10.0.0.1", + SrcPort: 9999, + } + + results = svc.ProcessEvent(unrelatedEvent) + if len(results) != 1 { + t.Fatalf("expected 1 orphan result, got %d", len(results)) + } + if results[0].OrphanSide != "A" { + t.Errorf("expected orphan side A, got %s", results[0].OrphanSide) + } +} + +func TestCorrelationService_ApacheEmitDelay_ZeroDelay(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 0, // Zero delay = immediate emission (backward compat) + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send Apache event - should emit immediately with zero delay + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + results := svc.ProcessEvent(apacheEvent) + // With zero delay, should emit immediately (backward compatibility mode) + if len(results) != 1 { + t.Fatalf("expected 1 immediate result (zero delay mode), got %d", len(results)) + } +} + +func TestCorrelationService_ApacheEmitDelay_MultipleA_SameKey(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 1000, // 1s delay + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: 
DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send multiple A events with same key + for i := 0; i < 3; i++ { + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(time.Duration(i) * time.Millisecond), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET", "path": fmt.Sprintf("/api/%d", i)}, + } + results := svc.ProcessEvent(apacheEvent) + if len(results) != 0 { + t.Fatalf("expected 0 immediate results for A%d, got %d", i, len(results)) + } + } + + // Verify 3 pending orphans + key := "192.168.1.1:8080" + if len(svc.pendingOrphans[key]) != 3 { + t.Errorf("expected 3 pending orphans, got %d", len(svc.pendingOrphans[key])) + } + + // Advance time past delay + timeProvider.now = now.Add(1100 * time.Millisecond) + + // Trigger emit with unrelated event + unrelatedEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "10.0.0.1", + SrcPort: 9999, + } + results := svc.ProcessEvent(unrelatedEvent) + + // Should emit all 3 orphans + if len(results) != 3 { + t.Errorf("expected 3 orphan results, got %d", len(results)) + } +} + +func TestCorrelationService_ApacheEmitDelay_Flush(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 5000, // 5s delay + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send A event + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + results := svc.ProcessEvent(apacheEvent) + if 
len(results) != 0 { + t.Fatalf("expected 0 immediate results, got %d", len(results)) + } + + // Flush immediately (should emit pending orphans regardless of delay) + flushed := svc.Flush() + if len(flushed) != 1 { + t.Errorf("expected 1 flushed orphan, got %d", len(flushed)) + } else if flushed[0].OrphanSide != "A" { + t.Errorf("expected orphan side A, got %s", flushed[0].OrphanSide) + } +} +