fix(correlation): Keep-Alive time window + orphan timer + TTL purge (v1.1.14)
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled

Bug #1 - processSourceA: utilise bEventHasValidTTL en mode one_to_many
  au lieu de eventsMatch qui comparait les timestamps originaux. Apres ~10s
  les requetes A devenaient toutes orphelines alors que la session KA etait active.

Bug #4 - checkPendingOrphansForCorrelation: meme correction, cle identique
  = meme connexion en one_to_many, pas besoin de comparer les timestamps.

Bug #3 - cleanNetworkBufferByTTL: expiration B => emission immediate
  des pending orphans associes (ils ne peuvent plus jamais corréler).

Bug #2 - Orchestrateur: goroutine ticker 250ms appelle EmitPendingOrphans()
  pour drainer les orphans independamment du flux d'evenements entrants.
  EmitPendingOrphans() expose la methode comme publique thread-safe.

Tests: 4 nouveaux tests de non-regression (un par bug).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-03-05 17:01:37 +01:00
parent 9979644b62
commit 0fca6e4e93
7 changed files with 6567 additions and 11 deletions

View File

@ -15,6 +15,10 @@ const (
DefaultEventChannelBufferSize = 1000
// ShutdownTimeout is the maximum time to wait for graceful shutdown
ShutdownTimeout = 30 * time.Second
// OrphanTickInterval is how often the orchestrator drains pending orphans.
// Set to half the default emit delay (500ms/2) so orphans are emitted promptly
// even when no new events arrive.
OrphanTickInterval = 250 * time.Millisecond
)
// OrchestratorConfig holds the orchestrator configuration.
@ -77,6 +81,27 @@ func (o *Orchestrator) Start() error {
}(source, eventChan)
}
// Start a periodic ticker to drain pending orphan A events independently of the
// event flow. Without this, orphans are only emitted when a new event arrives,
// causing them to accumulate silently when the source goes quiet.
o.wg.Add(1)
go func() {
defer o.wg.Done()
ticker := time.NewTicker(OrphanTickInterval)
defer ticker.Stop()
for {
select {
case <-o.ctx.Done():
return
case <-ticker.C:
logs := o.correlationSvc.EmitPendingOrphans()
for _, log := range logs {
o.config.Sink.Write(o.ctx, log) //nolint:errcheck
}
}
}
}()
return nil
}

View File

@ -352,7 +352,13 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
s.logger.Debugf("processing A event: key=%s keepalive_seq=%d timestamp=%v", key, event.KeepAliveSeq, event.Timestamp)
// Look for matching B events
// In one_to_many (Keep-Alive) mode use TTL-based matching: B is valid as long as
// its TTL has not expired (the timestamp is fixed at TCP connection time and grows
// beyond TimeWindow after ~10s, making timestamp comparison incorrect).
matches := s.findMatches(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool {
if s.config.MatchingMode == MatchingModeOneToMany {
return s.bEventHasValidTTL(other)
}
return s.eventsMatch(event, other)
})
@ -392,17 +398,22 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
return results, false
}
// No match found - check if there are B events with same key but outside time window
// No match found - check if there are B events with same key but outside match criteria
if bElements, ok := s.pendingB[key]; ok && len(bElements) > 0 {
// Key exists but no time match - log the time difference for debugging
for _, bElem := range bElements {
bEvent := bElem.Value.(*NormalizedEvent)
timeDiff := event.Timestamp.Sub(bEvent.Timestamp)
if timeDiff < 0 {
timeDiff = -timeDiff
if s.config.MatchingMode == MatchingModeOneToMany {
// In Keep-Alive mode, B exists but TTL expired
s.logger.Debugf("A event has same key as B but B TTL expired: key=%s", key)
} else {
// In one-to-one mode, B exists but outside time window
for _, bElem := range bElements {
bEvent := bElem.Value.(*NormalizedEvent)
timeDiff := event.Timestamp.Sub(bEvent.Timestamp)
if timeDiff < 0 {
timeDiff = -timeDiff
}
s.logger.Debugf("A event has same key as B but outside time window: key=%s time_diff=%v window=%v",
key, timeDiff, s.config.TimeWindow)
}
s.logger.Debugf("A event has same key as B but outside time window: key=%s time_diff=%v window=%v",
key, timeDiff, s.config.TimeWindow)
}
s.metrics.RecordCorrelationFailed("time_window")
} else {
@ -497,6 +508,23 @@ func (s *CorrelationService) eventsMatch(a, b *NormalizedEvent) bool {
return diff <= s.config.TimeWindow
}
// bEventHasValidTTL returns true if the B event is still alive (TTL not expired).
// Used in one_to_many mode where the B timestamp is fixed at TCP connection time and
// grows beyond TimeWindow — TTL validity is the correct liveness indicator.
func (s *CorrelationService) bEventHasValidTTL(bEvent *NormalizedEvent) bool {
key := bEvent.CorrelationKey()
if elements, ok := s.pendingB[key]; ok {
for _, elem := range elements {
if elem.Value.(*NormalizedEvent) == bEvent {
if ttl, exists := s.networkTTLs[elem]; exists {
return !s.timeProvider.Now().After(ttl)
}
}
}
}
return false
}
func (s *CorrelationService) addEvent(event *NormalizedEvent) {
key := event.CorrelationKey()
@ -605,6 +633,8 @@ func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string
// cleanNetworkBufferByTTL removes expired B events based on their reset TTL.
// For Keep-Alive support, we check the TTL map (which is reset on each correlation)
// rather than the original event timestamp.
// When a B event expires, any pending orphan A events for that key are immediately
// emitted (they can no longer ever be correlated).
func (s *CorrelationService) cleanNetworkBufferByTTL() {
now := s.timeProvider.Now()
var removed int
@ -618,6 +648,20 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() {
delete(s.pendingB, key)
// Connection fully gone: reset Keep-Alive counter for this key
delete(s.keepAliveSeqA, key)
// Purge any pending orphan A events for this key: they can no longer
// correlate with a B (the connection is closed), emit them immediately.
if s.config.ApacheAlwaysEmit {
if orphans, ok := s.pendingOrphans[key]; ok && len(orphans) > 0 {
s.logger.Debugf("B TTL expired, emitting %d pending orphan(s) for key=%s", len(orphans), key)
for _, o := range orphans {
s.logger.Warnf("orphan A event (B TTL expired): src_ip=%s src_port=%d key=%s keepalive_seq=%d",
o.event.SrcIP, o.event.SrcPort, key, o.event.KeepAliveSeq)
s.metrics.RecordOrphanEmitted("A")
}
delete(s.pendingOrphans, key)
}
}
}
delete(s.networkTTLs, elem)
removed++
@ -778,7 +822,16 @@ func (s *CorrelationService) checkPendingOrphansForCorrelation(bEvent *Normalize
}
for i, orphan := range orphans {
if s.eventsMatch(orphan.event, bEvent) {
// In one_to_many mode: same key = same Keep-Alive connection → always correlate.
// The incoming B event may have an old original timestamp (fixed at TCP connection
// time) so eventsMatch would fail for long-lived sessions. Use key identity instead.
var matched bool
if s.config.MatchingMode == MatchingModeOneToMany {
matched = true
} else {
matched = s.eventsMatch(orphan.event, bEvent)
}
if matched {
// Found a match! Remove from pending and return
aEvent := orphan.event
s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
@ -828,6 +881,15 @@ func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog {
return results
}
// EmitPendingOrphans emits all pending orphans that have passed their emit delay.
// This is the public, thread-safe version intended to be called by an external ticker
// (e.g. the Orchestrator) so that orphans are emitted even when no new events arrive.
func (s *CorrelationService) EmitPendingOrphans() []CorrelatedLog {
s.mu.Lock()
defer s.mu.Unlock()
return s.emitPendingOrphans()
}
func removeElementFromSlice(elements []*list.Element, target *list.Element) []*list.Element {
if len(elements) == 0 {
return elements

View File

@ -1612,3 +1612,213 @@ if !correlated {
t.Errorf("A2 Keep-Alive: expected correlated result, got %d results — B was not buffered after pending orphan match (bug)", len(results))
}
}
// TestKeepAlive_BeyondTimeWindow_StillCorrelates verifies that in one_to_many mode,
// A events are correlated with the buffered B event even when the time difference
// between A.Timestamp and B.original Timestamp exceeds TimeWindow.
// Regression test for Bug #1.
func TestKeepAlive_BeyondTimeWindow_StillCorrelates(t *testing.T) {
now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC)
tp := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: 10 * time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 500,
NetworkTTLS: 120,
MatchingMode: MatchingModeOneToMany,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
}
svc := NewCorrelationService(config, tp)
// B arrives first (TCP connection established)
b := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "91.224.92.185",
SrcPort: 53471,
}
if results := svc.ProcessEvent(b); len(results) != 0 {
t.Fatalf("B buffered: expected 0 results, got %d", len(results))
}
// A seq=1 arrives within window -- correlates and resets TTL
tp.now = now.Add(100 * time.Millisecond)
a1 := &NormalizedEvent{
Source: SourceA,
Timestamp: tp.now,
SrcIP: "91.224.92.185",
SrcPort: 53471,
}
if results := svc.ProcessEvent(a1); len(results) != 1 || !results[0].Correlated {
t.Fatalf("A seq=1: expected 1 correlated result, got %d", len(svc.ProcessEvent(a1)))
}
// A seq=10 arrives 11s after B (beyond TimeWindow) but B TTL is still valid (reset at seq=1)
tp.now = now.Add(11 * time.Second)
a10 := &NormalizedEvent{
Source: SourceA,
Timestamp: tp.now,
SrcIP: "91.224.92.185",
SrcPort: 53471,
}
results := svc.ProcessEvent(a10)
correlated := false
for _, r := range results {
if r.Correlated {
correlated = true
}
}
if !correlated {
t.Errorf("A seq=10 (diff>TimeWindow, TTL valid): expected Correlated=true, got %d results -- Bug #1 not fixed", len(results))
}
}
// TestKeepAlive_PendingOrphan_MatchesBeyondTimeWindow verifies that a pending orphan A
// is correctly matched by an incoming B when timestamps differ by more than TimeWindow.
// Regression test for Bug #4.
func TestKeepAlive_PendingOrphan_MatchesBeyondTimeWindow(t *testing.T) {
now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC)
tp := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: 10 * time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 500,
NetworkTTLS: 120,
MatchingMode: MatchingModeOneToMany,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
}
svc := NewCorrelationService(config, tp)
// A arrives first, no B in buffer -> pending orphan
a := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(15 * time.Second),
SrcIP: "10.1.2.3",
SrcPort: 9999,
}
if results := svc.ProcessEvent(a); len(results) != 0 {
t.Fatalf("A with no B: expected 0 results (pending orphan), got %d", len(results))
}
if len(svc.pendingOrphans) != 1 {
t.Fatalf("expected 1 pending orphan key, got %d", len(svc.pendingOrphans))
}
// B arrives with old timestamp (diff with A = 15s > TimeWindow)
// In one_to_many: same key = same connection, must match pending orphan regardless of timestamps
b := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "10.1.2.3",
SrcPort: 9999,
}
results := svc.ProcessEvent(b)
correlated := false
for _, r := range results {
if r.Correlated {
correlated = true
}
}
if !correlated {
t.Errorf("pending orphan + B (diff>TimeWindow): expected Correlated=true -- Bug #4 not fixed")
}
if len(svc.pendingOrphans) != 0 {
t.Errorf("expected pendingOrphans empty after match, got %d keys", len(svc.pendingOrphans))
}
}
// TestBTTLExpiry_PurgesPendingOrphans verifies that when a B event TTL expires,
// pending orphan A events for that key are purged. Regression test for Bug #3.
func TestBTTLExpiry_PurgesPendingOrphans(t *testing.T) {
now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC)
tp := &mockTimeProvider{now: now}
// Use one_to_one + tight TimeWindow so A goes to pending orphans in old code path
config := CorrelationConfig{
TimeWindow: 2 * time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 500,
NetworkTTLS: 5,
MatchingMode: MatchingModeOneToOne,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
}
svc := NewCorrelationService(config, tp)
// B arrives
svc.ProcessEvent(&NormalizedEvent{
Source: SourceB, Timestamp: now, SrcIP: "10.9.9.9", SrcPort: 8888,
})
// A arrives 3s later -- beyond TimeWindow(2s), no match -> pending orphan
tp.now = now.Add(3 * time.Second)
svc.ProcessEvent(&NormalizedEvent{
Source: SourceA, Timestamp: tp.now, SrcIP: "10.9.9.9", SrcPort: 8888,
})
if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; !exists {
t.Skip("A not in pendingOrphans (different code path) -- skipping")
}
// Advance past B TTL (5s from t=0, now at t=6s)
tp.now = now.Add(6 * time.Second)
// Trigger cleanExpired via ProcessEvent with a dummy event
svc.ProcessEvent(&NormalizedEvent{
Source: SourceB, Timestamp: tp.now, SrcIP: "99.99.99.1", SrcPort: 1,
})
if _, exists := svc.pendingOrphans["10.9.9.9:8888"]; exists {
t.Errorf("Bug #3: pendingOrphans not purged after B TTL expired")
}
}
// TestEmitPendingOrphans_PublicMethod verifies EmitPendingOrphans() emits orphans
// after the delay expires and nothing before. Regression test for Bug #2.
func TestEmitPendingOrphans_PublicMethod(t *testing.T) {
now := time.Date(2026, 3, 5, 16, 30, 0, 0, time.UTC)
tp := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: 10 * time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 500,
NetworkTTLS: 120,
MatchingMode: MatchingModeOneToMany,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
}
svc := NewCorrelationService(config, tp)
// A arrives with no B -> pending orphan
if results := svc.ProcessEvent(&NormalizedEvent{
Source: SourceA, Timestamp: now, SrcIP: "192.0.2.1", SrcPort: 4444,
}); len(results) != 0 {
t.Fatalf("expected 0 results (pending), got %d", len(results))
}
// Before delay: nothing emitted
tp.now = now.Add(200 * time.Millisecond)
if emitted := svc.EmitPendingOrphans(); len(emitted) != 0 {
t.Errorf("before delay: expected 0 emitted orphans, got %d", len(emitted))
}
// After delay (>500ms): orphan emitted
tp.now = now.Add(600 * time.Millisecond)
emitted := svc.EmitPendingOrphans()
if len(emitted) != 1 {
t.Fatalf("after delay: expected 1 emitted orphan, got %d", len(emitted))
}
if emitted[0].Correlated {
t.Errorf("expected orphan (Correlated=false), got Correlated=true")
}
// Second call: nothing (already emitted)
if emitted2 := svc.EmitPendingOrphans(); len(emitted2) != 0 {
t.Errorf("second call: expected 0 (already emitted), got %d", len(emitted2))
}
}

View File

@ -47,6 +47,11 @@ type CorrelationProcessor interface {
// Flush forces emission of remaining buffered events.
Flush() []domain.CorrelatedLog
// EmitPendingOrphans emits orphan A events whose delay has expired.
// Called periodically by the Orchestrator ticker so orphans are not blocked
// waiting for the next incoming event.
EmitPendingOrphans() []domain.CorrelatedLog
// GetBufferSizes returns the current buffer sizes for monitoring.
GetBufferSizes() (int, int)
}