feat(correlation): add configurable delay before emitting orphan A events

New feature: Apache events can now wait for B events before being emitted as orphans.

Changes:
- Add ApacheEmitDelayMs config (default: 500ms)
- Add pendingOrphans structure for delayed emission
- processSourceA(): add A to pending orphans instead of immediate emission
- processSourceB(): check pending orphans before buffer A
- emitPendingOrphans(): emit orphans after delay expires
- Flush(): emit all pending orphans immediately (shutdown)

Configuration:
correlation:
  orphan_policy:
    apache_always_emit: true
    apache_emit_delay_ms: 500  # Wait 500ms before emitting as orphan

Backward compatibility:
- apache_emit_delay_ms: 0 → immediate emission (legacy mode)
- apache_emit_delay_ms < 0 → default 500ms

Tests added (5 new tests):
- TestCorrelationService_ApacheEmitDelay_BArrivesDuringDelay
- TestCorrelationService_ApacheEmitDelay_NoBArrives
- TestCorrelationService_ApacheEmitDelay_ZeroDelay
- TestCorrelationService_ApacheEmitDelay_MultipleA_SameKey
- TestCorrelationService_ApacheEmitDelay_Flush

All 30 tests pass. Coverage: 75.1%

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
toto
2026-03-03 22:03:31 +00:00
parent 97862bb1dc
commit e0c622f635
5 changed files with 431 additions and 26 deletions

View File

@ -108,6 +108,7 @@ func main() {
correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{
TimeWindow: cfg.Correlation.GetTimeWindow(),
ApacheAlwaysEmit: cfg.Correlation.GetApacheAlwaysEmit(),
ApacheEmitDelayMs: cfg.Correlation.GetApacheEmitDelayMs(),
NetworkEmit: false,
MaxHTTPBufferSize: cfg.Correlation.GetMaxHTTPBufferSize(),
MaxNetworkBufferSize: cfg.Correlation.GetMaxNetworkBufferSize(),
@ -118,9 +119,10 @@ func main() {
// Set logger for correlation service
correlationSvc.SetLogger(logger.WithFields(map[string]any{"component": "correlation"}))
logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, emit_orphans=%v",
logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, emit_orphans=%v, emit_delay_ms=%d",
cfg.Correlation.GetTimeWindow().String(),
cfg.Correlation.EmitOrphans))
cfg.Correlation.GetApacheAlwaysEmit(),
cfg.Correlation.GetApacheEmitDelayMs()))
// Create orchestrator
orchestrator := app.NewOrchestrator(app.OrchestratorConfig{

View File

@ -48,6 +48,7 @@ correlation:
# Orphan policy: what to do when no match is found
orphan_policy:
apache_always_emit: true # Always emit A events, even without B match
apache_emit_delay_ms: 500 # Wait 500ms before emitting as orphan (allows B to arrive)
network_emit: false # Never emit B events alone
# Matching mode: one_to_one or one_to_many (Keep-Alive)

View File

@ -121,6 +121,7 @@ func (c *TimeWindowConfig) GetDuration() time.Duration {
// OrphanPolicyConfig holds orphan event policy configuration.
type OrphanPolicyConfig struct {
ApacheAlwaysEmit bool `yaml:"apache_always_emit"`
ApacheEmitDelayMs int `yaml:"apache_emit_delay_ms"` // Delay in ms before emitting orphan A
NetworkEmit bool `yaml:"network_emit"`
}
@ -284,6 +285,14 @@ func (c *CorrelationConfig) GetApacheAlwaysEmit() bool {
return c.EmitOrphans
}
// GetApacheEmitDelayMs returns the delay in milliseconds before emitting orphan A events.
func (c *CorrelationConfig) GetApacheEmitDelayMs() int {
if c.OrphanPolicy.ApacheEmitDelayMs > 0 {
return c.OrphanPolicy.ApacheEmitDelayMs
}
return domain.DefaultApacheEmitDelayMs // Default: 500ms
}
// GetMatchingMode returns the matching mode.
func (c *CorrelationConfig) GetMatchingMode() string {
if c.Matching.Mode != "" {

View File

@ -22,6 +22,9 @@ const (
// The TTL is reset on each correlation (Keep-Alive mode), so the network
// event stays in buffer as long as the connection is active.
DefaultNetworkTTLS = 120
// DefaultApacheEmitDelayMs is the default delay before emitting an orphan A event
// This allows B events to arrive slightly after A and still correlate
DefaultApacheEmitDelayMs = 500
// MatchingModeOneToOne indicates single correlation (consume B after match)
MatchingModeOneToOne = "one_to_one"
// MatchingModeOneToMany indicates Keep-Alive mode (B can match multiple A)
@ -32,6 +35,7 @@ const (
type CorrelationConfig struct {
TimeWindow time.Duration
ApacheAlwaysEmit bool
ApacheEmitDelayMs int // Delay in ms before emitting orphan A (default: 500ms)
NetworkEmit bool
MaxHTTPBufferSize int // Maximum events to buffer for source A (HTTP)
MaxNetworkBufferSize int // Maximum events to buffer for source B (Network)
@ -39,6 +43,13 @@ type CorrelationConfig struct {
MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive)
}
// pendingOrphan represents an A event waiting to be emitted as orphan.
type pendingOrphan struct {
event *NormalizedEvent
emitAfter time.Time // Timestamp when this orphan should be emitted
timer *time.Timer
}
// CorrelationService handles the correlation logic between source A and B events.
type CorrelationService struct {
config CorrelationConfig
@ -48,6 +59,7 @@ type CorrelationService struct {
pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent
pendingB map[string][]*list.Element
networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event
pendingOrphans map[string][]*pendingOrphan // key -> A events waiting to be emitted as orphans
timeProvider TimeProvider
logger *observability.Logger
}
@ -94,6 +106,10 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
if config.MatchingMode == "" {
config.MatchingMode = MatchingModeOneToMany // Default to Keep-Alive
}
// Zero delay is valid for backward compatibility (immediate emission mode)
if config.ApacheEmitDelayMs < 0 {
config.ApacheEmitDelayMs = DefaultApacheEmitDelayMs
}
return &CorrelationService{
config: config,
@ -101,6 +117,7 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
bufferB: newEventBuffer(),
pendingA: make(map[string][]*list.Element),
pendingB: make(map[string][]*list.Element),
pendingOrphans: make(map[string][]*pendingOrphan),
networkTTLs: make(map[*list.Element]time.Time),
timeProvider: timeProvider,
logger: observability.NewLogger("correlation"),
@ -120,6 +137,9 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
// Clean expired events first
s.cleanExpired()
// Emit pending orphans that have passed their delay
orphanResults := s.emitPendingOrphans()
// Check buffer overflow before adding
if s.isBufferFull(event.Source) {
// Buffer full - rotate oldest event instead of dropping new one
@ -155,6 +175,11 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
event.Source, event.SrcIP, event.SrcPort, s.getBufferSize(event.Source))
}
// Combine orphan results with correlation results
if len(orphanResults) > 0 {
results = append(orphanResults, results...)
}
return results
}
@ -262,11 +287,22 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
}
// No match found - orphan A event
// Instead of emitting immediately, add to pending orphans with delay
// This allows B events to arrive slightly after A and still correlate
if s.config.ApacheAlwaysEmit {
// Zero delay = immediate emission (backward compatibility mode)
if s.config.ApacheEmitDelayMs == 0 {
orphan := NewCorrelatedLogFromEvent(event, "A")
s.logger.Warnf("orphan A event (no B match): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort)
s.logger.Warnf("orphan A event (immediate): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort)
return []CorrelatedLog{orphan}, false
}
s.addPendingOrphan(event)
s.logger.Debugf("A event added to pending orphans (delay=%dms): src_ip=%s src_port=%d",
s.config.ApacheEmitDelayMs, event.SrcIP, event.SrcPort)
// Don't emit yet - will be emitted after delay expires
// Return empty results, event stays in pending orphans
return nil, false
}
// Keep in buffer for potential future match
return nil, true
@ -275,7 +311,16 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]CorrelatedLog, bool) {
key := event.CorrelationKey()
// Look for the first matching A event (one-to-one first match)
// FIRST: Check if there's a pending orphan A that matches this B event
// This is the key optimization for delayed orphan emission
if aEvent := s.checkPendingOrphansForCorrelation(event); aEvent != nil {
correlated := NewCorrelatedLog(aEvent, event)
s.logger.Debugf("correlation found (pending orphan): A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort)
return []CorrelatedLog{correlated}, false
}
// SECOND: Look for the first matching A event in buffer
if aEvent := s.findAndPopFirstMatch(s.bufferA, s.pendingA, key, func(other *NormalizedEvent) bool {
return s.eventsMatch(other, event)
}); aEvent != nil {
@ -518,6 +563,103 @@ func (s *CorrelationService) resetNetworkTTL(elem *list.Element) {
s.networkTTLs[elem] = s.timeProvider.Now().Add(time.Duration(s.config.NetworkTTLS) * time.Second)
}
// addPendingOrphan adds an A event to the pending orphans list with a delayed emission.
func (s *CorrelationService) addPendingOrphan(event *NormalizedEvent) {
key := event.CorrelationKey()
emitAfter := s.timeProvider.Now().Add(time.Duration(s.config.ApacheEmitDelayMs) * time.Millisecond)
orphan := &pendingOrphan{
event: event,
emitAfter: emitAfter,
}
s.pendingOrphans[key] = append(s.pendingOrphans[key], orphan)
s.logger.Debugf("A event added to pending orphans: src_ip=%s src_port=%d emit_after=%v",
event.SrcIP, event.SrcPort, emitAfter)
}
// removePendingOrphan removes a specific pending orphan by event reference.
// Returns true if found and removed, false otherwise.
func (s *CorrelationService) removePendingOrphan(event *NormalizedEvent) bool {
key := event.CorrelationKey()
orphans, ok := s.pendingOrphans[key]
if !ok {
return false
}
for i, orphan := range orphans {
if orphan.event == event {
// Stop the timer if it exists
if orphan.timer != nil {
orphan.timer.Stop()
}
s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
if len(s.pendingOrphans[key]) == 0 {
delete(s.pendingOrphans, key)
}
return true
}
}
return false
}
// checkPendingOrphansForCorrelation checks if any pending orphans match the given B event.
// Returns the first matching A event and removes it from pending orphans.
func (s *CorrelationService) checkPendingOrphansForCorrelation(bEvent *NormalizedEvent) *NormalizedEvent {
key := bEvent.CorrelationKey()
orphans, ok := s.pendingOrphans[key]
if !ok || len(orphans) == 0 {
return nil
}
for i, orphan := range orphans {
if s.eventsMatch(orphan.event, bEvent) {
// Found a match! Remove from pending and return
aEvent := orphan.event
s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
if len(s.pendingOrphans[key]) == 0 {
delete(s.pendingOrphans, key)
}
s.logger.Debugf("pending orphan matched with B: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
aEvent.SrcIP, aEvent.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
return aEvent
}
}
return nil
}
// emitPendingOrphans emits all pending orphans that have passed their emit delay.
// Returns the list of correlated logs to emit.
func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog {
if !s.config.ApacheAlwaysEmit {
return nil
}
now := s.timeProvider.Now()
var results []CorrelatedLog
for key, orphans := range s.pendingOrphans {
for i := len(orphans) - 1; i >= 0; i-- {
if now.After(orphans[i].emitAfter) {
// Time to emit this orphan
orphan := NewCorrelatedLogFromEvent(orphans[i].event, "A")
s.logger.Warnf("orphan A event (emit delay expired): src_ip=%s src_port=%d",
orphans[i].event.SrcIP, orphans[i].event.SrcPort)
results = append(results, orphan)
// Remove from pending
s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
if len(s.pendingOrphans[key]) == 0 {
delete(s.pendingOrphans, key)
}
}
}
}
return results
}
func removeElementFromSlice(elements []*list.Element, target *list.Element) []*list.Element {
if len(elements) == 0 {
return elements
@ -582,6 +724,18 @@ func (s *CorrelationService) Flush() []CorrelatedLog {
}
}
// Emit all pending orphans (immediately, ignoring delay)
if s.config.ApacheAlwaysEmit {
for _, orphans := range s.pendingOrphans {
for _, orphan := range orphans {
correlatedLog := NewCorrelatedLogFromEvent(orphan.event, "A")
s.logger.Warnf("flush pending orphan: src_ip=%s src_port=%d",
orphan.event.SrcIP, orphan.event.SrcPort)
results = append(results, correlatedLog)
}
}
}
// Never emit remaining B events alone.
// Clear buffers
@ -589,6 +743,7 @@ func (s *CorrelationService) Flush() []CorrelatedLog {
s.bufferB.events.Init()
s.pendingA = make(map[string][]*list.Element)
s.pendingB = make(map[string][]*list.Element)
s.pendingOrphans = make(map[string][]*pendingOrphan)
s.networkTTLs = make(map[*list.Element]time.Time)
return results

View File

@ -145,6 +145,7 @@ func TestCorrelationService_Flush(t *testing.T) {
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 0, // Zero delay = immediate emission (backward compatibility)
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
@ -161,7 +162,7 @@ func TestCorrelationService_Flush(t *testing.T) {
SrcPort: 8080,
}
// A est émis immédiatement quand ApacheAlwaysEmit=true
// A est émis immédiatement quand ApacheAlwaysEmit=true et ApacheEmitDelayMs=0
results := svc.ProcessEvent(apacheEvent)
if len(results) != 1 {
t.Fatalf("expected 1 immediate orphan event, got %d", len(results))
@ -1079,3 +1080,240 @@ func TestCorrelationService_CleanA_RespectsBTTL(t *testing.T) {
t.Errorf("expected A also expired at t=35s (25s old > 5s TimeWindow), got %d", a)
}
}
func TestCorrelationService_ApacheEmitDelay_BArrivesDuringDelay(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 500, // 500ms delay
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
// Send Apache event - should be added to pending orphans, NOT emitted immediately
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET"},
}
results := svc.ProcessEvent(apacheEvent)
if len(results) != 0 {
t.Fatalf("expected 0 immediate results (pending orphan), got %d", len(results))
}
// Advance time by 250ms (less than delay)
timeProvider.now = now.Add(250 * time.Millisecond)
// Send B event - should correlate with pending A
networkEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now.Add(100 * time.Millisecond),
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"ja3": "abc"},
}
results = svc.ProcessEvent(networkEvent)
if len(results) != 1 {
t.Errorf("expected 1 correlated result, got %d", len(results))
} else if !results[0].Correlated {
t.Error("expected correlated result")
}
// Verify no pending orphans remain
if len(svc.pendingOrphans) != 0 {
t.Errorf("expected pending orphans empty, got %d keys", len(svc.pendingOrphans))
}
}
func TestCorrelationService_ApacheEmitDelay_NoBArrives(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 500, // 500ms delay
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
// Send Apache event
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET"},
}
results := svc.ProcessEvent(apacheEvent)
if len(results) != 0 {
t.Fatalf("expected 0 immediate results, got %d", len(results))
}
// Advance time past delay (600ms > 500ms)
timeProvider.now = now.Add(600 * time.Millisecond)
// Send another event (unrelated) to trigger emitPendingOrphans
unrelatedEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "10.0.0.1",
SrcPort: 9999,
}
results = svc.ProcessEvent(unrelatedEvent)
if len(results) != 1 {
t.Fatalf("expected 1 orphan result, got %d", len(results))
}
if results[0].OrphanSide != "A" {
t.Errorf("expected orphan side A, got %s", results[0].OrphanSide)
}
}
func TestCorrelationService_ApacheEmitDelay_ZeroDelay(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 0, // Zero delay = immediate emission (backward compat)
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
// Send Apache event - should emit immediately with zero delay
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET"},
}
results := svc.ProcessEvent(apacheEvent)
// With zero delay, should emit immediately (backward compatibility mode)
if len(results) != 1 {
t.Fatalf("expected 1 immediate result (zero delay mode), got %d", len(results))
}
}
func TestCorrelationService_ApacheEmitDelay_MultipleA_SameKey(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 1000, // 1s delay
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
// Send multiple A events with same key
for i := 0; i < 3; i++ {
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(time.Duration(i) * time.Millisecond),
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET", "path": fmt.Sprintf("/api/%d", i)},
}
results := svc.ProcessEvent(apacheEvent)
if len(results) != 0 {
t.Fatalf("expected 0 immediate results for A%d, got %d", i, len(results))
}
}
// Verify 3 pending orphans
key := "192.168.1.1:8080"
if len(svc.pendingOrphans[key]) != 3 {
t.Errorf("expected 3 pending orphans, got %d", len(svc.pendingOrphans[key]))
}
// Advance time past delay
timeProvider.now = now.Add(1100 * time.Millisecond)
// Trigger emit with unrelated event
unrelatedEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "10.0.0.1",
SrcPort: 9999,
}
results := svc.ProcessEvent(unrelatedEvent)
// Should emit all 3 orphans
if len(results) != 3 {
t.Errorf("expected 3 orphan results, got %d", len(results))
}
}
func TestCorrelationService_ApacheEmitDelay_Flush(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
ApacheEmitDelayMs: 5000, // 5s delay
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
// Send A event
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET"},
}
results := svc.ProcessEvent(apacheEvent)
if len(results) != 0 {
t.Fatalf("expected 0 immediate results, got %d", len(results))
}
// Flush immediately (should emit pending orphans regardless of delay)
flushed := svc.Flush()
if len(flushed) != 1 {
t.Errorf("expected 1 flushed orphan, got %d", len(flushed))
} else if flushed[0].OrphanSide != "A" {
t.Errorf("expected orphan side A, got %s", flushed[0].OrphanSide)
}
}