From b47f4258fd0e187f91b01731f110705d77d76cc8 Mon Sep 17 00:00:00 2001 From: toto Date: Thu, 5 Mar 2026 18:20:08 +0100 Subject: [PATCH] fix(correlation/bug3): emit pending orphans on B TTL expiry (v1.1.15) cleanNetworkBufferByTTL was deleting pendingOrphans without emitting them, causing silent data loss when a B event (network connection) expired while A events were still waiting in the 500ms orphan delay buffer. Fix: cleanNetworkBufferByTTL now returns []CorrelatedLog for forced orphans; cleanExpired propagates them; ProcessEvent includes them in returned results. TestBTTLExpiry_PurgesPendingOrphans extended to assert the orphan is actually returned in ProcessEvent results (not just removed from internal state). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- Makefile | 2 +- internal/domain/correlation_service.go | 26 +++++-- internal/domain/correlation_service_test.go | 83 ++++++++++++--------- packaging/rpm/logcorrelator.spec | 7 ++ 4 files changed, 74 insertions(+), 44 deletions(-) diff --git a/Makefile b/Makefile index 26e2d9a..6514f9d 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ BINARY_NAME=logcorrelator DIST_DIR=dist # Package version -PKG_VERSION ?= 1.1.14 +PKG_VERSION ?= 1.1.15 # Enable BuildKit for better performance export DOCKER_BUILDKIT=1 diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go index 92ded24..b4f51b1 100644 --- a/internal/domain/correlation_service.go +++ b/internal/domain/correlation_service.go @@ -221,11 +221,14 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo // Record event received s.metrics.RecordEventReceived(string(event.Source)) - // Clean expired events first - s.cleanExpired() + // Clean expired events first; collect any orphans forced out by B TTL expiry + ttlOrphans := s.cleanExpired() // Emit pending orphans that have passed their delay orphanResults := s.emitPendingOrphans() + if len(ttlOrphans) > 0 { + orphanResults = append(ttlOrphans, orphanResults...) + } // Check buffer overflow before adding if s.isBufferFull(event.Source) { @@ -540,17 +543,20 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) { } } -func (s *CorrelationService) cleanExpired() { +func (s *CorrelationService) cleanExpired() []CorrelatedLog { // Clean expired B events first - use TTL map only (not event timestamp) // This is critical for Keep-Alive: TTL is reset on each correlation, // so we must check the reset TTL, not the original event timestamp. - s.cleanNetworkBufferByTTL() + // Returns any orphan A events that must be emitted because their B expired. + orphans := s.cleanNetworkBufferByTTL() // Clean A events based on B TTL, not system time // An A event should only be removed if: // 1. It has already been correlated (no longer in buffer), OR // 2. All potential matching B events have expired (TTL exceeded) s.cleanBufferAByBTTL() + + return orphans } // cleanBufferAByBTTL cleans A events based on whether corresponding B events still exist. @@ -634,10 +640,11 @@ func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string // For Keep-Alive support, we check the TTL map (which is reset on each correlation) // rather than the original event timestamp. // When a B event expires, any pending orphan A events for that key are immediately -// emitted (they can no longer ever be correlated). -func (s *CorrelationService) cleanNetworkBufferByTTL() { +// returned for emission (they can no longer be correlated — their B connection is gone). +func (s *CorrelationService) cleanNetworkBufferByTTL() []CorrelatedLog { now := s.timeProvider.Now() var removed int + var forced []CorrelatedLog for elem, ttl := range s.networkTTLs { if now.After(ttl) { event := elem.Value.(*NormalizedEvent) @@ -649,8 +656,9 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() { // Connection fully gone: reset Keep-Alive counter for this key delete(s.keepAliveSeqA, key) - // Purge any pending orphan A events for this key: they can no longer - // correlate with a B (the connection is closed), emit them immediately. + // Immediately emit any pending orphan A events for this key. + // They can no longer correlate with a B (the connection is closed), + // and waiting for the timer would delay them unnecessarily. if s.config.ApacheAlwaysEmit { if orphans, ok := s.pendingOrphans[key]; ok && len(orphans) > 0 { s.logger.Debugf("B TTL expired, emitting %d pending orphan(s) for key=%s", len(orphans), key) @@ -658,6 +666,7 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() { s.logger.Warnf("orphan A event (B TTL expired): src_ip=%s src_port=%d key=%s keepalive_seq=%d", o.event.SrcIP, o.event.SrcPort, key, o.event.KeepAliveSeq) s.metrics.RecordOrphanEmitted("A") + forced = append(forced, NewCorrelatedLogFromEvent(o.event, "A")) } delete(s.pendingOrphans, key) } @@ -673,6 +682,7 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() { if removed > 0 { s.logger.Debugf("cleaned %d expired B events", removed) } + return forced } func (s *CorrelationService) findAndPopFirstMatch( diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go index a359a4b..f3c1516 100644 --- a/internal/domain/correlation_service_test.go +++ b/internal/domain/correlation_service_test.go @@ -1733,48 +1733,61 @@ t.Errorf("expected pendingOrphans empty after match, got %d keys", len(svc.pendi } // TestBTTLExpiry_PurgesPendingOrphans verifies that when a B event TTL expires, -// pending orphan A events for that key are purged. Regression test for Bug #3. +// pending orphan A events for that key are emitted immediately (not lost). +// Regression test for Bug #3 — including the data-loss fix. func TestBTTLExpiry_PurgesPendingOrphans(t *testing.T) { -now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC) -tp := &mockTimeProvider{now: now} + now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC) + tp := &mockTimeProvider{now: now} -// Use one_to_one + tight TimeWindow so A goes to pending orphans in old code path -config := CorrelationConfig{ -TimeWindow: 2 * time.Second, -ApacheAlwaysEmit: true, -ApacheEmitDelayMs: 500, -NetworkTTLS: 5, -MatchingMode: MatchingModeOneToOne, -MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, -MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, -} -svc := NewCorrelationService(config, tp) + // Use one_to_one + tight TimeWindow so A goes to pending orphans + config := CorrelationConfig{ + TimeWindow: 2 * time.Second, + ApacheAlwaysEmit: true, + ApacheEmitDelayMs: 500, + NetworkTTLS: 5, + MatchingMode: MatchingModeOneToOne, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + } + svc := NewCorrelationService(config, tp) -// B arrives -svc.ProcessEvent(&NormalizedEvent{ -Source: SourceB, Timestamp: now, SrcIP: "10.9.9.9", SrcPort: 8888, -}) + // B arrives + svc.ProcessEvent(&NormalizedEvent{ + Source: SourceB, Timestamp: now, SrcIP: "10.9.9.9", SrcPort: 8888, + }) -// A arrives 3s later -- beyond TimeWindow(2s), no match -> pending orphan -tp.now = now.Add(3 * time.Second) -svc.ProcessEvent(&NormalizedEvent{ -Source: SourceA, Timestamp: tp.now, SrcIP: "10.9.9.9", SrcPort: 8888, -}) + // A arrives 3s later -- beyond TimeWindow(2s), no match -> pending orphan + tp.now = now.Add(3 * time.Second) + svc.ProcessEvent(&NormalizedEvent{ + Source: SourceA, Timestamp: tp.now, SrcIP: "10.9.9.9", SrcPort: 8888, + }) -if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; !exists { -t.Skip("A not in pendingOrphans (different code path) -- skipping") -} + if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; !exists { + t.Skip("A not in pendingOrphans (different code path) -- skipping") + } -// Advance past B TTL (5s from t=0, now at t=6s) -tp.now = now.Add(6 * time.Second) -// Trigger cleanExpired via ProcessEvent with a dummy event -svc.ProcessEvent(&NormalizedEvent{ -Source: SourceB, Timestamp: tp.now, SrcIP: "99.99.99.1", SrcPort: 1, -}) + // Advance past B TTL (5s from t=0, now at t=6s) + // Trigger cleanExpired via ProcessEvent with a dummy event; collect returned logs + tp.now = now.Add(6 * time.Second) + returned := svc.ProcessEvent(&NormalizedEvent{ + Source: SourceB, Timestamp: tp.now, SrcIP: "99.99.99.1", SrcPort: 1, + }) -if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; exists { -t.Errorf("Bug #3: pendingOrphans not purged after B TTL expired") -} + // pendingOrphans must be cleared + if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; exists { + t.Errorf("Bug #3: pendingOrphans not purged after B TTL expired") + } + + // The orphan must have been returned (not silently lost) — data-loss fix + orphanFound := false + for _, r := range returned { + if !r.Correlated && r.SrcIP == "10.9.9.9" { + orphanFound = true + } + } + if !orphanFound { + t.Errorf("Bug #3 data-loss: orphan A not returned when B TTL expired (got %d results)", len(returned)) + } } // TestEmitPendingOrphans_PublicMethod verifies EmitPendingOrphans() emits orphans diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index 4912ba8..18c8892 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -145,6 +145,13 @@ exit 0 %config(noreplace) /etc/logrotate.d/logcorrelator %changelog +* Thu Mar 05 2026 logcorrelator - 1.1.15-1 +- Fix(correlation/bug3): perte de donnees quand B expire avec des orphelins en attente + cleanNetworkBufferByTTL supprimait les pendingOrphans sans les emettre (perte silencieuse). + Desormais, les orphelins A sont retournes immediatement a l'appelant quand B expire, + et cleanExpired/ProcessEvent propagent ces resultats vers le sink. + Test: TestBTTLExpiry_PurgesPendingOrphans etendu pour verifier l'emission effective. + * Thu Mar 05 2026 logcorrelator - 1.1.14-1 - Fix(correlation/bug1): Keep-Alive sessions au-dela de TimeWindow ne correlent plus en orphelins Le matcher dans processSourceA utilisait eventsMatch (comparaison de timestamps) en mode