feat(correlation): emit A events filtered by include_dest_ports to ClickHouse (v1.1.16)
When an A event (HTTP) was excluded by the include_dest_ports filter, it was silently dropped and never reached ClickHouse. With ApacheAlwaysEmit=true, the event is now returned immediately as an uncorrelated log (orphan_side=A). B events on filtered ports continue to be dropped (no useful data). Updated TestCorrelationService_IncludeDestPorts_FilteredPort to assert the A event is emitted with Correlated=false, OrphanSide=A. Added TestCorrelationService_IncludeDestPorts_FilteredPort_NoAlwaysEmit to confirm silent drop when ApacheAlwaysEmit=false. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@ -215,6 +215,14 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
|
||||
s.logger.Debugf("event excluded by dest port filter: source=%s dst_port=%d",
|
||||
event.Source, event.DstPort)
|
||||
s.metrics.RecordCorrelationFailed("dest_port_filtered")
|
||||
// A events (HTTP) are always emitted even when dest port is filtered,
|
||||
// so they reach ClickHouse as uncorrelated entries.
|
||||
if event.Source == SourceA && s.config.ApacheAlwaysEmit {
|
||||
s.metrics.RecordOrphanEmitted("A")
|
||||
s.logger.Warnf("orphan A event (dest port filtered): src_ip=%s src_port=%d dst_port=%d",
|
||||
event.SrcIP, event.SrcPort, event.DstPort)
|
||||
return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -1362,33 +1362,61 @@ MatchingMode: MatchingModeOneToMany,
|
||||
IncludeDestPorts: []int{80, 443},
|
||||
}, mt)
|
||||
|
||||
// A event on port 22 (not in allow-list)
|
||||
// A event on port 22 (not in allow-list): must be emitted as uncorrelated
|
||||
aEvent := &NormalizedEvent{
|
||||
Source: SourceA, Timestamp: now,
|
||||
SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22,
|
||||
}
|
||||
results := svc.ProcessEvent(aEvent)
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("expected 0 results (filtered), got %d", len(results))
|
||||
if len(results) != 1 {
|
||||
t.Fatalf("expected 1 result (orphan A, dest port filtered), got %d", len(results))
|
||||
}
|
||||
if results[0].Correlated {
|
||||
t.Errorf("expected Correlated=false for dest-port-filtered A event")
|
||||
}
|
||||
if results[0].OrphanSide != "A" {
|
||||
t.Errorf("expected OrphanSide=A, got %q", results[0].OrphanSide)
|
||||
}
|
||||
|
||||
// B event on port 22 (not in allow-list)
|
||||
// B event on port 22 (not in allow-list): no emission
|
||||
bEvent := &NormalizedEvent{
|
||||
Source: SourceB, Timestamp: now,
|
||||
SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22,
|
||||
}
|
||||
results = svc.ProcessEvent(bEvent)
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("expected 0 results (filtered), got %d", len(results))
|
||||
t.Fatalf("expected 0 results (B filtered), got %d", len(results))
|
||||
}
|
||||
|
||||
// Flush should also return nothing
|
||||
// Flush should return nothing (nothing buffered)
|
||||
flushed := svc.Flush()
|
||||
if len(flushed) != 0 {
|
||||
t.Errorf("expected 0 flushed events, got %d", len(flushed))
|
||||
}
|
||||
}
|
||||
|
||||
// TestCorrelationService_IncludeDestPorts_FilteredPort_NoAlwaysEmit verifies that
|
||||
// when ApacheAlwaysEmit is false, A events on filtered ports are silently dropped.
|
||||
func TestCorrelationService_IncludeDestPorts_FilteredPort_NoAlwaysEmit(t *testing.T) {
|
||||
now := time.Now()
|
||||
mt := &mockTimeProvider{now: now}
|
||||
svc := NewCorrelationService(CorrelationConfig{
|
||||
TimeWindow: 10 * time.Second,
|
||||
ApacheAlwaysEmit: false,
|
||||
MatchingMode: MatchingModeOneToMany,
|
||||
IncludeDestPorts: []int{80, 443},
|
||||
}, mt)
|
||||
|
||||
// A event on port 22 (not in allow-list, ApacheAlwaysEmit=false): must be dropped
|
||||
results := svc.ProcessEvent(&NormalizedEvent{
|
||||
Source: SourceA, Timestamp: now,
|
||||
SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22,
|
||||
})
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("expected 0 results (ApacheAlwaysEmit=false), got %d", len(results))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCorrelationService_IncludeDestPorts_EmptyAllowsAll(t *testing.T) {
|
||||
now := time.Now()
|
||||
mt := &mockTimeProvider{now: now}
|
||||
|
||||
Reference in New Issue
Block a user