fix(correlation/bug3): emit pending orphans on B TTL expiry (v1.1.15)
cleanNetworkBufferByTTL was deleting pendingOrphans without emitting them, causing silent data loss when a B event (network connection) expired while A events were still waiting in the 500ms orphan delay buffer. Fix: cleanNetworkBufferByTTL now returns []CorrelatedLog for forced orphans; cleanExpired propagates them; ProcessEvent includes them in returned results. TestBTTLExpiry_PurgesPendingOrphans extended to assert the orphan is actually returned in ProcessEvent results (not just removed from internal state). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2
Makefile
2
Makefile
@ -20,7 +20,7 @@ BINARY_NAME=logcorrelator
|
||||
DIST_DIR=dist
|
||||
|
||||
# Package version
|
||||
PKG_VERSION ?= 1.1.14
|
||||
PKG_VERSION ?= 1.1.15
|
||||
|
||||
# Enable BuildKit for better performance
|
||||
export DOCKER_BUILDKIT=1
|
||||
|
||||
@ -221,11 +221,14 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
|
||||
// Record event received
|
||||
s.metrics.RecordEventReceived(string(event.Source))
|
||||
|
||||
// Clean expired events first
|
||||
s.cleanExpired()
|
||||
// Clean expired events first; collect any orphans forced out by B TTL expiry
|
||||
ttlOrphans := s.cleanExpired()
|
||||
|
||||
// Emit pending orphans that have passed their delay
|
||||
orphanResults := s.emitPendingOrphans()
|
||||
if len(ttlOrphans) > 0 {
|
||||
orphanResults = append(ttlOrphans, orphanResults...)
|
||||
}
|
||||
|
||||
// Check buffer overflow before adding
|
||||
if s.isBufferFull(event.Source) {
|
||||
@ -540,17 +543,20 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *CorrelationService) cleanExpired() {
|
||||
func (s *CorrelationService) cleanExpired() []CorrelatedLog {
|
||||
// Clean expired B events first - use TTL map only (not event timestamp)
|
||||
// This is critical for Keep-Alive: TTL is reset on each correlation,
|
||||
// so we must check the reset TTL, not the original event timestamp.
|
||||
s.cleanNetworkBufferByTTL()
|
||||
// Returns any orphan A events that must be emitted because their B expired.
|
||||
orphans := s.cleanNetworkBufferByTTL()
|
||||
|
||||
// Clean A events based on B TTL, not system time
|
||||
// An A event should only be removed if:
|
||||
// 1. It has already been correlated (no longer in buffer), OR
|
||||
// 2. All potential matching B events have expired (TTL exceeded)
|
||||
s.cleanBufferAByBTTL()
|
||||
|
||||
return orphans
|
||||
}
|
||||
|
||||
// cleanBufferAByBTTL cleans A events based on whether corresponding B events still exist.
|
||||
@ -634,10 +640,11 @@ func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string
|
||||
// For Keep-Alive support, we check the TTL map (which is reset on each correlation)
|
||||
// rather than the original event timestamp.
|
||||
// When a B event expires, any pending orphan A events for that key are immediately
|
||||
// emitted (they can no longer ever be correlated).
|
||||
func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
// returned for emission (they can no longer be correlated — their B connection is gone).
|
||||
func (s *CorrelationService) cleanNetworkBufferByTTL() []CorrelatedLog {
|
||||
now := s.timeProvider.Now()
|
||||
var removed int
|
||||
var forced []CorrelatedLog
|
||||
for elem, ttl := range s.networkTTLs {
|
||||
if now.After(ttl) {
|
||||
event := elem.Value.(*NormalizedEvent)
|
||||
@ -649,8 +656,9 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
// Connection fully gone: reset Keep-Alive counter for this key
|
||||
delete(s.keepAliveSeqA, key)
|
||||
|
||||
// Purge any pending orphan A events for this key: they can no longer
|
||||
// correlate with a B (the connection is closed), emit them immediately.
|
||||
// Immediately emit any pending orphan A events for this key.
|
||||
// They can no longer correlate with a B (the connection is closed),
|
||||
// and waiting for the timer would delay them unnecessarily.
|
||||
if s.config.ApacheAlwaysEmit {
|
||||
if orphans, ok := s.pendingOrphans[key]; ok && len(orphans) > 0 {
|
||||
s.logger.Debugf("B TTL expired, emitting %d pending orphan(s) for key=%s", len(orphans), key)
|
||||
@ -658,6 +666,7 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
s.logger.Warnf("orphan A event (B TTL expired): src_ip=%s src_port=%d key=%s keepalive_seq=%d",
|
||||
o.event.SrcIP, o.event.SrcPort, key, o.event.KeepAliveSeq)
|
||||
s.metrics.RecordOrphanEmitted("A")
|
||||
forced = append(forced, NewCorrelatedLogFromEvent(o.event, "A"))
|
||||
}
|
||||
delete(s.pendingOrphans, key)
|
||||
}
|
||||
@ -673,6 +682,7 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
if removed > 0 {
|
||||
s.logger.Debugf("cleaned %d expired B events", removed)
|
||||
}
|
||||
return forced
|
||||
}
|
||||
|
||||
func (s *CorrelationService) findAndPopFirstMatch(
|
||||
|
||||
@ -1733,12 +1733,13 @@ t.Errorf("expected pendingOrphans empty after match, got %d keys", len(svc.pendi
|
||||
}
|
||||
|
||||
// TestBTTLExpiry_PurgesPendingOrphans verifies that when a B event TTL expires,
|
||||
// pending orphan A events for that key are purged. Regression test for Bug #3.
|
||||
// pending orphan A events for that key are emitted immediately (not lost).
|
||||
// Regression test for Bug #3 — including the data-loss fix.
|
||||
func TestBTTLExpiry_PurgesPendingOrphans(t *testing.T) {
|
||||
now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC)
|
||||
tp := &mockTimeProvider{now: now}
|
||||
|
||||
// Use one_to_one + tight TimeWindow so A goes to pending orphans in old code path
|
||||
// Use one_to_one + tight TimeWindow so A goes to pending orphans
|
||||
config := CorrelationConfig{
|
||||
TimeWindow: 2 * time.Second,
|
||||
ApacheAlwaysEmit: true,
|
||||
@ -1766,15 +1767,27 @@ t.Skip("A not in pendingOrphans (different code path) -- skipping")
|
||||
}
|
||||
|
||||
// Advance past B TTL (5s from t=0, now at t=6s)
|
||||
// Trigger cleanExpired via ProcessEvent with a dummy event; collect returned logs
|
||||
tp.now = now.Add(6 * time.Second)
|
||||
// Trigger cleanExpired via ProcessEvent with a dummy event
|
||||
svc.ProcessEvent(&NormalizedEvent{
|
||||
returned := svc.ProcessEvent(&NormalizedEvent{
|
||||
Source: SourceB, Timestamp: tp.now, SrcIP: "99.99.99.1", SrcPort: 1,
|
||||
})
|
||||
|
||||
// pendingOrphans must be cleared
|
||||
if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; exists {
|
||||
t.Errorf("Bug #3: pendingOrphans not purged after B TTL expired")
|
||||
}
|
||||
|
||||
// The orphan must have been returned (not silently lost) — data-loss fix
|
||||
orphanFound := false
|
||||
for _, r := range returned {
|
||||
if !r.Correlated && r.SrcIP == "10.9.9.9" {
|
||||
orphanFound = true
|
||||
}
|
||||
}
|
||||
if !orphanFound {
|
||||
t.Errorf("Bug #3 data-loss: orphan A not returned when B TTL expired (got %d results)", len(returned))
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmitPendingOrphans_PublicMethod verifies EmitPendingOrphans() emits orphans
|
||||
|
||||
@ -145,6 +145,13 @@ exit 0
|
||||
%config(noreplace) /etc/logrotate.d/logcorrelator
|
||||
|
||||
%changelog
|
||||
* Thu Mar 05 2026 logcorrelator <dev@example.com> - 1.1.15-1
|
||||
- Fix(correlation/bug3): perte de donnees quand B expire avec des orphelins en attente
|
||||
cleanNetworkBufferByTTL supprimait les pendingOrphans sans les emettre (perte silencieuse).
|
||||
Desormais, les orphelins A sont retournes immediatement a l'appelant quand B expire,
|
||||
et cleanExpired/ProcessEvent propagent ces resultats vers le sink.
|
||||
Test: TestBTTLExpiry_PurgesPendingOrphans etendu pour verifier l'emission effective.
|
||||
|
||||
* Thu Mar 05 2026 logcorrelator <dev@example.com> - 1.1.14-1
|
||||
- Fix(correlation/bug1): Keep-Alive sessions au-dela de TimeWindow ne correlent plus en orphelins
|
||||
Le matcher dans processSourceA utilisait eventsMatch (comparaison de timestamps) en mode
|
||||
|
||||
Reference in New Issue
Block a user