feat: observability, IP filtering, stdout/clickhouse fixes (v1.1.11)
- feat(observability): metrics server with /metrics and /health endpoints - feat(observability): correlation metrics (events, success/failed, reasons, buffers) - feat(correlation): IP exclusion filter (exact IPs and CIDR ranges) - feat(correlation): pending orphan delay for late-arriving B events - fix(stdout): sink is now a no-op for data; JSON must never appear on stdout - fix(clickhouse): all flush errors were silently discarded, now properly logged - fix(clickhouse): buffer overflow with DropOnOverflow now logged at WARN - fix(clickhouse): retry attempts logged at WARN with attempt/delay/error context - feat(clickhouse): connection success logged at INFO, batch sends at DEBUG - feat(clickhouse): SetLogger() for external logger injection - test(stdout): assert stdout remains empty for correlated and orphan logs - chore(rpm): bump version to 1.1.11, update changelog - docs: README and architecture.yml updated Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@ -2,6 +2,8 @@ package domain
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -41,6 +43,7 @@ type CorrelationConfig struct {
|
||||
MaxNetworkBufferSize int // Maximum events to buffer for source B (Network)
|
||||
NetworkTTLS int // TTL in seconds for network events (source B)
|
||||
MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive)
|
||||
ExcludeSourceIPs []string // List of source IPs or CIDR ranges to exclude
|
||||
}
|
||||
|
||||
// pendingOrphan represents an A event waiting to be emitted as orphan.
|
||||
@ -52,16 +55,18 @@ type pendingOrphan struct {
|
||||
|
||||
// CorrelationService handles the correlation logic between source A and B events.
|
||||
type CorrelationService struct {
|
||||
config CorrelationConfig
|
||||
mu sync.Mutex
|
||||
bufferA *eventBuffer
|
||||
bufferB *eventBuffer
|
||||
pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent
|
||||
pendingB map[string][]*list.Element
|
||||
networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event
|
||||
pendingOrphans map[string][]*pendingOrphan // key -> A events waiting to be emitted as orphans
|
||||
timeProvider TimeProvider
|
||||
logger *observability.Logger
|
||||
config CorrelationConfig
|
||||
mu sync.Mutex
|
||||
bufferA *eventBuffer
|
||||
bufferB *eventBuffer
|
||||
pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent
|
||||
pendingB map[string][]*list.Element
|
||||
networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event
|
||||
pendingOrphans map[string][]*pendingOrphan // key -> A events waiting to be emitted as orphans
|
||||
timeProvider TimeProvider
|
||||
logger *observability.Logger
|
||||
metrics *observability.CorrelationMetrics
|
||||
excludeIPsParsed []*net.IPNet // Parsed CIDR ranges for efficient lookup
|
||||
}
|
||||
|
||||
type eventBuffer struct {
|
||||
@ -111,6 +116,17 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
|
||||
config.ApacheEmitDelayMs = DefaultApacheEmitDelayMs
|
||||
}
|
||||
|
||||
// Parse excluded IPs (CIDR ranges) for efficient lookup
|
||||
var excludeIPsParsed []*net.IPNet
|
||||
for _, exclude := range config.ExcludeSourceIPs {
|
||||
if strings.Contains(exclude, "/") {
|
||||
_, cidr, err := net.ParseCIDR(exclude)
|
||||
if err == nil {
|
||||
excludeIPsParsed = append(excludeIPsParsed, cidr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &CorrelationService{
|
||||
config: config,
|
||||
bufferA: newEventBuffer(),
|
||||
@ -121,6 +137,8 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
|
||||
networkTTLs: make(map[*list.Element]time.Time),
|
||||
timeProvider: timeProvider,
|
||||
logger: observability.NewLogger("correlation"),
|
||||
metrics: observability.NewCorrelationMetrics(),
|
||||
excludeIPsParsed: excludeIPsParsed,
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,11 +147,56 @@ func (s *CorrelationService) SetLogger(logger *observability.Logger) {
|
||||
s.logger = logger
|
||||
}
|
||||
|
||||
// isIPExcluded checks if a source IP should be excluded from correlation.
|
||||
func (s *CorrelationService) isIPExcluded(ip string) bool {
|
||||
// Check exact IP matches first (from config)
|
||||
for _, exclude := range s.config.ExcludeSourceIPs {
|
||||
if !strings.Contains(exclude, "/") {
|
||||
// Exact IP match
|
||||
if exclude == ip {
|
||||
return true
|
||||
}
|
||||
if excludeIP := net.ParseIP(exclude); excludeIP != nil {
|
||||
if parsedIP := net.ParseIP(ip); parsedIP != nil {
|
||||
if excludeIP.Equal(parsedIP) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check CIDR ranges
|
||||
parsedIP := net.ParseIP(ip)
|
||||
if parsedIP == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, cidr := range s.excludeIPsParsed {
|
||||
if cidr.Contains(parsedIP) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// ProcessEvent processes an incoming event and returns correlated logs if matches are found.
|
||||
func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Check if source IP is excluded
|
||||
if s.isIPExcluded(event.SrcIP) {
|
||||
s.logger.Debugf("event excluded by IP filter: source=%s src_ip=%s src_port=%d",
|
||||
event.Source, event.SrcIP, event.SrcPort)
|
||||
s.metrics.RecordCorrelationFailed("ip_excluded")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Record event received
|
||||
s.metrics.RecordEventReceived(string(event.Source))
|
||||
|
||||
// Clean expired events first
|
||||
s.cleanExpired()
|
||||
|
||||
@ -145,6 +208,7 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
|
||||
// Buffer full - rotate oldest event instead of dropping new one
|
||||
s.logger.Warnf("buffer full, rotating oldest event: source=%s src_ip=%s src_port=%d",
|
||||
event.Source, event.SrcIP, event.SrcPort)
|
||||
s.metrics.RecordCorrelationFailed("buffer_eviction")
|
||||
if event.Source == SourceA {
|
||||
// Remove oldest A event and emit as orphan if configured
|
||||
s.rotateOldestA()
|
||||
@ -166,6 +230,7 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
|
||||
case SourceB:
|
||||
results, shouldBuffer = s.processSourceB(event)
|
||||
default:
|
||||
s.logger.Warnf("unknown event source: %s", event.Source)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -175,6 +240,9 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
|
||||
event.Source, event.SrcIP, event.SrcPort, s.getBufferSize(event.Source))
|
||||
}
|
||||
|
||||
// Update buffer sizes in metrics
|
||||
s.metrics.UpdateBufferSizes(int64(s.bufferA.events.Len()), int64(s.bufferB.events.Len()))
|
||||
|
||||
// Combine orphan results with correlation results
|
||||
if len(orphanResults) > 0 {
|
||||
results = append(orphanResults, results...)
|
||||
@ -248,6 +316,7 @@ func (s *CorrelationService) rotateOldestB() {
|
||||
|
||||
func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) {
|
||||
key := event.CorrelationKey()
|
||||
s.logger.Debugf("processing A event: key=%s timestamp=%v", key, event.Timestamp)
|
||||
|
||||
// Look for matching B events
|
||||
matches := s.findMatches(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool {
|
||||
@ -259,9 +328,10 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
|
||||
// Correlate with all matching B events (one-to-many)
|
||||
for _, bEvent := range matches {
|
||||
correlated := NewCorrelatedLog(event, bEvent)
|
||||
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
|
||||
event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
|
||||
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d ts=%v) + B(src_ip=%s src_port=%d ts=%v)",
|
||||
event.SrcIP, event.SrcPort, event.Timestamp, bEvent.SrcIP, bEvent.SrcPort, bEvent.Timestamp)
|
||||
results = append(results, correlated)
|
||||
s.metrics.RecordCorrelationSuccess()
|
||||
|
||||
// Reset TTL for matched B event (Keep-Alive)
|
||||
if s.config.MatchingMode == MatchingModeOneToMany {
|
||||
@ -271,6 +341,9 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
|
||||
for _, elem := range elements {
|
||||
if elem.Value.(*NormalizedEvent) == bEvent {
|
||||
s.resetNetworkTTL(elem)
|
||||
s.metrics.RecordKeepAliveReset()
|
||||
s.logger.Debugf("TTL reset for B event (Keep-Alive): key=%s new_ttl=%v",
|
||||
bKey, s.config.NetworkTTLS)
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -286,6 +359,25 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
|
||||
return results, false
|
||||
}
|
||||
|
||||
// No match found - check if there are B events with same key but outside time window
|
||||
if bElements, ok := s.pendingB[key]; ok && len(bElements) > 0 {
|
||||
// Key exists but no time match - log the time difference for debugging
|
||||
for _, bElem := range bElements {
|
||||
bEvent := bElem.Value.(*NormalizedEvent)
|
||||
timeDiff := event.Timestamp.Sub(bEvent.Timestamp)
|
||||
if timeDiff < 0 {
|
||||
timeDiff = -timeDiff
|
||||
}
|
||||
s.logger.Debugf("A event has same key as B but outside time window: key=%s time_diff=%v window=%v",
|
||||
key, timeDiff, s.config.TimeWindow)
|
||||
}
|
||||
s.metrics.RecordCorrelationFailed("time_window")
|
||||
} else {
|
||||
// No B events with same key at all
|
||||
s.logger.Debugf("A event has no matching B key in buffer: key=%s", key)
|
||||
s.metrics.RecordCorrelationFailed("no_match_key")
|
||||
}
|
||||
|
||||
// No match found - orphan A event
|
||||
// Instead of emitting immediately, add to pending orphans with delay
|
||||
// This allows B events to arrive slightly after A and still correlate
|
||||
@ -294,9 +386,11 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
|
||||
if s.config.ApacheEmitDelayMs == 0 {
|
||||
orphan := NewCorrelatedLogFromEvent(event, "A")
|
||||
s.logger.Warnf("orphan A event (immediate): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort)
|
||||
s.metrics.RecordOrphanEmitted("A")
|
||||
return []CorrelatedLog{orphan}, false
|
||||
}
|
||||
s.addPendingOrphan(event)
|
||||
s.metrics.RecordPendingOrphan()
|
||||
s.logger.Debugf("A event added to pending orphans (delay=%dms): src_ip=%s src_port=%d",
|
||||
s.config.ApacheEmitDelayMs, event.SrcIP, event.SrcPort)
|
||||
// Don't emit yet - will be emitted after delay expires
|
||||
@ -310,13 +404,16 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
|
||||
|
||||
func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]CorrelatedLog, bool) {
|
||||
key := event.CorrelationKey()
|
||||
s.logger.Debugf("processing B event: key=%s timestamp=%v", key, event.Timestamp)
|
||||
|
||||
// FIRST: Check if there's a pending orphan A that matches this B event
|
||||
// This is the key optimization for delayed orphan emission
|
||||
if aEvent := s.checkPendingOrphansForCorrelation(event); aEvent != nil {
|
||||
correlated := NewCorrelatedLog(aEvent, event)
|
||||
s.logger.Debugf("correlation found (pending orphan): A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
|
||||
aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort)
|
||||
s.logger.Debugf("correlation found (pending orphan): A(src_ip=%s src_port=%d ts=%v) + B(src_ip=%s src_port=%d ts=%v)",
|
||||
aEvent.SrcIP, aEvent.SrcPort, aEvent.Timestamp, event.SrcIP, event.SrcPort, event.Timestamp)
|
||||
s.metrics.RecordCorrelationSuccess()
|
||||
s.metrics.RecordPendingOrphanMatch()
|
||||
return []CorrelatedLog{correlated}, false
|
||||
}
|
||||
|
||||
@ -325,12 +422,34 @@ func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]Correlate
|
||||
return s.eventsMatch(other, event)
|
||||
}); aEvent != nil {
|
||||
correlated := NewCorrelatedLog(aEvent, event)
|
||||
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
|
||||
aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort)
|
||||
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d ts=%v) + B(src_ip=%s src_port=%d ts=%v)",
|
||||
aEvent.SrcIP, aEvent.SrcPort, aEvent.Timestamp, event.SrcIP, event.SrcPort, event.Timestamp)
|
||||
s.metrics.RecordCorrelationSuccess()
|
||||
return []CorrelatedLog{correlated}, false
|
||||
}
|
||||
|
||||
// No match found - check if there are A events with same key but outside time window
|
||||
if aElements, ok := s.pendingA[key]; ok && len(aElements) > 0 {
|
||||
// Key exists but no time match - log the time difference for debugging
|
||||
for _, aElem := range aElements {
|
||||
aEvent := aElem.Value.(*NormalizedEvent)
|
||||
timeDiff := aEvent.Timestamp.Sub(event.Timestamp)
|
||||
if timeDiff < 0 {
|
||||
timeDiff = -timeDiff
|
||||
}
|
||||
s.logger.Debugf("B event has same key as A but outside time window: key=%s time_diff=%v window=%v",
|
||||
key, timeDiff, s.config.TimeWindow)
|
||||
}
|
||||
s.metrics.RecordCorrelationFailed("time_window")
|
||||
} else {
|
||||
// No A events with same key at all
|
||||
s.logger.Debugf("B event has no matching A key in buffer: key=%s", key)
|
||||
s.metrics.RecordCorrelationFailed("no_match_key")
|
||||
}
|
||||
|
||||
// Never emit B alone. Keep in buffer for potential future match.
|
||||
s.logger.Debugf("B event buffered (no match yet): key=%s src_ip=%s src_port=%d",
|
||||
key, event.SrcIP, event.SrcPort)
|
||||
return nil, true
|
||||
}
|
||||
|
||||
@ -417,8 +536,12 @@ func (s *CorrelationService) cleanBufferAByBTTL() {
|
||||
}
|
||||
|
||||
if s.config.ApacheAlwaysEmit {
|
||||
s.logger.Warnf("orphan A event (no B match, TTL expired): src_ip=%s src_port=%d",
|
||||
event.SrcIP, event.SrcPort)
|
||||
s.logger.Warnf("orphan A event (no B match, TTL expired): src_ip=%s src_port=%d key=%s",
|
||||
event.SrcIP, event.SrcPort, key)
|
||||
s.metrics.RecordOrphanEmitted("A")
|
||||
} else {
|
||||
s.logger.Debugf("A event removed (no valid B, TTL expired): src_ip=%s src_port=%d key=%s",
|
||||
event.SrcIP, event.SrcPort, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -448,6 +571,7 @@ func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string
|
||||
// rather than the original event timestamp.
|
||||
func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
now := s.timeProvider.Now()
|
||||
var removed int
|
||||
for elem, ttl := range s.networkTTLs {
|
||||
if now.After(ttl) {
|
||||
event := elem.Value.(*NormalizedEvent)
|
||||
@ -458,8 +582,15 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() {
|
||||
delete(s.pendingB, key)
|
||||
}
|
||||
delete(s.networkTTLs, elem)
|
||||
removed++
|
||||
s.logger.Debugf("B event TTL expired: key=%s src_ip=%s src_port=%d ttl=%v age=%v",
|
||||
key, event.SrcIP, event.SrcPort, s.config.NetworkTTLS, now.Sub(event.Timestamp))
|
||||
s.metrics.RecordCorrelationFailed("ttl_expired")
|
||||
}
|
||||
}
|
||||
if removed > 0 {
|
||||
s.logger.Debugf("cleaned %d expired B events", removed)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *CorrelationService) findAndPopFirstMatch(
|
||||
@ -635,19 +766,20 @@ func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog {
|
||||
if !s.config.ApacheAlwaysEmit {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
now := s.timeProvider.Now()
|
||||
var results []CorrelatedLog
|
||||
|
||||
|
||||
for key, orphans := range s.pendingOrphans {
|
||||
for i := len(orphans) - 1; i >= 0; i-- {
|
||||
if now.After(orphans[i].emitAfter) {
|
||||
// Time to emit this orphan
|
||||
orphan := NewCorrelatedLogFromEvent(orphans[i].event, "A")
|
||||
s.logger.Warnf("orphan A event (emit delay expired): src_ip=%s src_port=%d",
|
||||
orphans[i].event.SrcIP, orphans[i].event.SrcPort)
|
||||
s.logger.Warnf("orphan A event (emit delay expired): src_ip=%s src_port=%d key=%s delay_ms=%d",
|
||||
orphans[i].event.SrcIP, orphans[i].event.SrcPort, key, s.config.ApacheEmitDelayMs)
|
||||
s.metrics.RecordOrphanEmitted("A")
|
||||
results = append(results, orphan)
|
||||
|
||||
|
||||
// Remove from pending
|
||||
s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...)
|
||||
if len(s.pendingOrphans[key]) == 0 {
|
||||
@ -656,7 +788,7 @@ func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
@ -755,3 +887,15 @@ func (s *CorrelationService) GetBufferSizes() (int, int) {
|
||||
defer s.mu.Unlock()
|
||||
return s.bufferA.events.Len(), s.bufferB.events.Len()
|
||||
}
|
||||
|
||||
// GetMetrics returns the correlation metrics (for monitoring/debugging).
|
||||
func (s *CorrelationService) GetMetrics() *observability.CorrelationMetrics {
|
||||
return s.metrics
|
||||
}
|
||||
|
||||
// GetMetricsSnapshot returns a point-in-time snapshot of metrics.
|
||||
func (s *CorrelationService) GetMetricsSnapshot() observability.MetricsSnapshot {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.metrics.Snapshot()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user