feat: Keep-Alive correlation, TTL management, SIGHUP handling, logrotate support
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled

Major features:
- One-to-many correlation mode (Keep-Alive) for HTTP connections
- Dynamic TTL for network events with reset on each correlation
- Separate configurable buffer sizes for HTTP and network events
- SIGHUP signal handling for log rotation without service restart
- FileSink.Reopen() method for log file rotation
- logrotate configuration included in RPM
- ExecReload added to systemd service

Configuration changes:
- New YAML structure with nested sections (time_window, orphan_policy, matching, buffers, ttl)
- Backward compatibility maintained for deprecated fields

Packaging:
- RPM version 1.1.0 with logrotate config
- Updated spec file and changelog
- All distributions: el8, el9, el10

Tests:
- New tests for Keep-Alive mode and TTL reset
- Updated mocks with Reopen() interface method

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-03-02 20:32:59 +01:00
parent a415a3201a
commit 33e19b4f52
19 changed files with 974 additions and 321 deletions

View File

@ -115,6 +115,11 @@ func (s *ClickHouseSink) Name() string {
return "clickhouse"
}
// Reopen is a no-op for ClickHouse (connection is managed internally).
func (s *ClickHouseSink) Reopen() error {
return nil
}
// Write adds a log to the buffer.
func (s *ClickHouseSink) Write(ctx context.Context, log domain.CorrelatedLog) error {
deadline := time.Now().Add(time.Duration(s.config.TimeoutMs) * time.Millisecond)

View File

@ -38,9 +38,16 @@ func NewFileSink(config Config) (*FileSink, error) {
return nil, fmt.Errorf("invalid file path: %w", err)
}
return &FileSink{
s := &FileSink{
config: config,
}, nil
}
// Open file on creation
if err := s.openFile(); err != nil {
return nil, err
}
return s, nil
}
// Name returns the sink name.
@ -48,6 +55,20 @@ func (s *FileSink) Name() string {
return "file"
}
// Reopen closes and reopens the file (for log rotation on SIGHUP).
func (s *FileSink) Reopen() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.file != nil {
if err := s.file.Close(); err != nil {
return fmt.Errorf("failed to close file: %w", err)
}
}
return s.openFile()
}
// Write writes a correlated log to the file.
func (s *FileSink) Write(ctx context.Context, log domain.CorrelatedLog) error {
s.mu.Lock()

View File

@ -121,3 +121,17 @@ func (s *MultiSink) Close() error {
}
return firstErr
}
// Reopen reopens all sinks (for log rotation on SIGHUP).
func (s *MultiSink) Reopen() error {
s.mu.RLock()
defer s.mu.RUnlock()
var firstErr error
for _, sink := range s.sinks {
if err := sink.Reopen(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

View File

@ -14,6 +14,7 @@ type mockSink struct {
writeFunc func(domain.CorrelatedLog) error
flushFunc func() error
closeFunc func() error
reopenFunc func() error
}
func (m *mockSink) Name() string { return m.name }
@ -24,6 +25,12 @@ func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error {
}
func (m *mockSink) Flush(ctx context.Context) error { return m.flushFunc() }
func (m *mockSink) Close() error { return m.closeFunc() }
func (m *mockSink) Reopen() error {
if m.reopenFunc != nil {
return m.reopenFunc()
}
return nil
}
func TestMultiSink_Write(t *testing.T) {
var mu sync.Mutex

View File

@ -35,6 +35,11 @@ func (s *StdoutSink) Name() string {
return "stdout"
}
// Reopen is a no-op for stdout.
func (s *StdoutSink) Reopen() error {
return nil
}
// Write writes a correlated log to stdout.
func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error {
s.mu.Lock()

View File

@ -58,6 +58,7 @@ func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error {
}
func (m *mockSink) Flush(ctx context.Context) error { return nil }
func (m *mockSink) Close() error { return nil }
func (m *mockSink) Reopen() error { return nil }
func (m *mockSink) getWritten() []domain.CorrelatedLog {
m.mu.Lock()

View File

@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/logcorrelator/logcorrelator/internal/domain"
"gopkg.in/yaml.v3"
)
@ -83,10 +84,61 @@ type StdoutOutputConfig struct {
// CorrelationConfig holds correlation configuration.
type CorrelationConfig struct {
TimeWindow TimeWindowConfig `yaml:"time_window"`
OrphanPolicy OrphanPolicyConfig `yaml:"orphan_policy"`
Matching MatchingConfig `yaml:"matching"`
Buffers BuffersConfig `yaml:"buffers"`
TTL TTLConfig `yaml:"ttl"`
// Deprecated: Use TimeWindow.Value instead
TimeWindowS int `yaml:"time_window_s"`
// Deprecated: Use OrphanPolicy.ApacheAlwaysEmit instead
EmitOrphans bool `yaml:"emit_orphans"`
}
// TimeWindowConfig holds time window configuration.
type TimeWindowConfig struct {
Value int `yaml:"value"`
Unit string `yaml:"unit"` // s, ms, etc.
}
// GetDuration returns the time window as a duration.
func (c *TimeWindowConfig) GetDuration() time.Duration {
value := c.Value
if value <= 0 {
value = 1
}
switch c.Unit {
case "ms", "millisecond", "milliseconds":
return time.Duration(value) * time.Millisecond
case "s", "sec", "second", "seconds":
fallthrough
default:
return time.Duration(value) * time.Second
}
}
// OrphanPolicyConfig holds orphan event policy configuration.
type OrphanPolicyConfig struct {
ApacheAlwaysEmit bool `yaml:"apache_always_emit"`
NetworkEmit bool `yaml:"network_emit"`
}
// MatchingConfig holds matching mode configuration.
type MatchingConfig struct {
Mode string `yaml:"mode"` // one_to_one or one_to_many
}
// BuffersConfig holds buffer size configuration.
type BuffersConfig struct {
MaxHTTPItems int `yaml:"max_http_items"`
MaxNetworkItems int `yaml:"max_network_items"`
}
// TTLConfig holds TTL configuration.
type TTLConfig struct {
NetworkTTLS int `yaml:"network_ttl_s"`
}
// Load loads configuration from a YAML file.
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
@ -208,7 +260,13 @@ func (c *Config) Validate() error {
}
// GetTimeWindow returns the time window as a duration.
// Deprecated: Use TimeWindow.GetDuration() instead.
func (c *CorrelationConfig) GetTimeWindow() time.Duration {
// New config takes precedence
if c.TimeWindow.Value > 0 {
return c.TimeWindow.GetDuration()
}
// Fallback to deprecated field
value := c.TimeWindowS
if value <= 0 {
value = 1
@ -216,6 +274,47 @@ func (c *CorrelationConfig) GetTimeWindow() time.Duration {
return time.Duration(value) * time.Second
}
// GetApacheAlwaysEmit returns whether to always emit Apache events.
func (c *CorrelationConfig) GetApacheAlwaysEmit() bool {
if c.OrphanPolicy.ApacheAlwaysEmit {
return true
}
// Fallback to deprecated field
return c.EmitOrphans
}
// GetMatchingMode returns the matching mode.
func (c *CorrelationConfig) GetMatchingMode() string {
if c.Matching.Mode != "" {
return c.Matching.Mode
}
return "one_to_many" // Default to Keep-Alive
}
// GetMaxHTTPBufferSize returns the max HTTP buffer size.
func (c *CorrelationConfig) GetMaxHTTPBufferSize() int {
if c.Buffers.MaxHTTPItems > 0 {
return c.Buffers.MaxHTTPItems
}
return domain.DefaultMaxHTTPBufferSize
}
// GetMaxNetworkBufferSize returns the max network buffer size.
func (c *CorrelationConfig) GetMaxNetworkBufferSize() int {
if c.Buffers.MaxNetworkItems > 0 {
return c.Buffers.MaxNetworkItems
}
return domain.DefaultMaxNetworkBufferSize
}
// GetNetworkTTLS returns the network TTL in seconds.
func (c *CorrelationConfig) GetNetworkTTLS() int {
if c.TTL.NetworkTTLS > 0 {
return c.TTL.NetworkTTLS
}
return domain.DefaultNetworkTTLS
}
// GetSocketPermissions returns the socket permissions as os.FileMode.
// Default is 0660 (owner + group read/write).
func (c *UnixSocketConfig) GetSocketPermissions() os.FileMode {

View File

@ -9,18 +9,29 @@ import (
)
const (
// DefaultMaxBufferSize is the default maximum number of events per buffer
DefaultMaxBufferSize = 10000
// DefaultMaxHTTPBufferSize is the default maximum number of HTTP events (source A)
DefaultMaxHTTPBufferSize = 10000
// DefaultMaxNetworkBufferSize is the default maximum number of network events (source B)
DefaultMaxNetworkBufferSize = 20000
// DefaultTimeWindow is used when no valid time window is provided
DefaultTimeWindow = time.Second
// DefaultNetworkTTLS is the default TTL for network events in seconds
DefaultNetworkTTLS = 30
// MatchingModeOneToOne indicates single correlation (consume B after match)
MatchingModeOneToOne = "one_to_one"
// MatchingModeOneToMany indicates Keep-Alive mode (B can match multiple A)
MatchingModeOneToMany = "one_to_many"
)
// CorrelationConfig holds the correlation configuration.
type CorrelationConfig struct {
TimeWindow time.Duration
ApacheAlwaysEmit bool
NetworkEmit bool
MaxBufferSize int // Maximum events to buffer per source
TimeWindow time.Duration
ApacheAlwaysEmit bool
NetworkEmit bool
MaxHTTPBufferSize int // Maximum events to buffer for source A (HTTP)
MaxNetworkBufferSize int // Maximum events to buffer for source B (Network)
NetworkTTLS int // TTL in seconds for network events (source B)
MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive)
}
// CorrelationService handles the correlation logic between source A and B events.
@ -31,6 +42,7 @@ type CorrelationService struct {
bufferB *eventBuffer
pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent
pendingB map[string][]*list.Element
networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event
timeProvider TimeProvider
logger *observability.Logger
}
@ -62,12 +74,21 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
if timeProvider == nil {
timeProvider = &RealTimeProvider{}
}
if config.MaxBufferSize <= 0 {
config.MaxBufferSize = DefaultMaxBufferSize
if config.MaxHTTPBufferSize <= 0 {
config.MaxHTTPBufferSize = DefaultMaxHTTPBufferSize
}
if config.MaxNetworkBufferSize <= 0 {
config.MaxNetworkBufferSize = DefaultMaxNetworkBufferSize
}
if config.TimeWindow <= 0 {
config.TimeWindow = DefaultTimeWindow
}
if config.NetworkTTLS <= 0 {
config.NetworkTTLS = DefaultNetworkTTLS
}
if config.MatchingMode == "" {
config.MatchingMode = MatchingModeOneToMany // Default to Keep-Alive
}
return &CorrelationService{
config: config,
@ -75,6 +96,7 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
bufferB: newEventBuffer(),
pendingA: make(map[string][]*list.Element),
pendingB: make(map[string][]*list.Element),
networkTTLs: make(map[*list.Element]time.Time),
timeProvider: timeProvider,
logger: observability.NewLogger("correlation"),
}
@ -140,9 +162,9 @@ func (s *CorrelationService) getBufferSize(source EventSource) int {
func (s *CorrelationService) isBufferFull(source EventSource) bool {
switch source {
case SourceA:
return s.bufferA.events.Len() >= s.config.MaxBufferSize
return s.bufferA.events.Len() >= s.config.MaxHTTPBufferSize
case SourceB:
return s.bufferB.events.Len() >= s.config.MaxBufferSize
return s.bufferB.events.Len() >= s.config.MaxNetworkBufferSize
}
return false
}
@ -150,14 +172,41 @@ func (s *CorrelationService) isBufferFull(source EventSource) bool {
func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) {
key := event.CorrelationKey()
// Look for the first matching B event (one-to-one first match)
if bEvent := s.findAndPopFirstMatch(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool {
// Look for matching B events
matches := s.findMatches(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool {
return s.eventsMatch(event, other)
}); bEvent != nil {
correlated := NewCorrelatedLog(event, bEvent)
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
return []CorrelatedLog{correlated}, false
})
if len(matches) > 0 {
var results []CorrelatedLog
// Correlate with all matching B events (one-to-many)
for _, bEvent := range matches {
correlated := NewCorrelatedLog(event, bEvent)
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
results = append(results, correlated)
// Reset TTL for matched B event (Keep-Alive)
if s.config.MatchingMode == MatchingModeOneToMany {
// Find the element for this B event and reset TTL
bKey := bEvent.CorrelationKey()
if elements, ok := s.pendingB[bKey]; ok {
for _, elem := range elements {
if elem.Value.(*NormalizedEvent) == bEvent {
s.resetNetworkTTL(elem)
break
}
}
}
}
}
// In one-to-one mode, remove the first matching B
if s.config.MatchingMode == MatchingModeOneToOne {
s.removeEvent(s.bufferB, s.pendingB, matches[0])
}
return results, false
}
// No match found - orphan A event
@ -206,30 +255,50 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) {
case SourceB:
elem := s.bufferB.events.PushBack(event)
s.pendingB[key] = append(s.pendingB[key], elem)
// Set TTL for network event
s.networkTTLs[elem] = s.timeProvider.Now().Add(time.Duration(s.config.NetworkTTLS) * time.Second)
}
}
func (s *CorrelationService) cleanExpired() {
now := s.timeProvider.Now()
cutoff := now.Add(-s.config.TimeWindow)
// Clean expired events from both buffers using shared logic
s.cleanBuffer(s.bufferA, s.pendingA, cutoff)
s.cleanBuffer(s.bufferB, s.pendingB, cutoff)
// Clean expired A events (based on time window)
aCutoff := now.Add(-s.config.TimeWindow)
s.cleanBuffer(s.bufferA, s.pendingA, aCutoff, nil)
// Clean expired B events (based on TTL)
bCutoff := now.Add(-time.Duration(s.config.NetworkTTLS) * time.Second)
s.cleanBuffer(s.bufferB, s.pendingB, bCutoff, s.networkTTLs)
}
// cleanBuffer removes expired events from a buffer.
func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) {
func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time, networkTTLs map[*list.Element]time.Time) {
for elem := buffer.events.Front(); elem != nil; {
next := elem.Next()
event := elem.Value.(*NormalizedEvent)
if event.Timestamp.Before(cutoff) {
// Check if event is expired
isExpired := event.Timestamp.Before(cutoff)
// For B events, also check TTL
if !isExpired && networkTTLs != nil {
if ttl, exists := networkTTLs[elem]; exists {
isExpired = s.timeProvider.Now().After(ttl)
}
}
if isExpired {
key := event.CorrelationKey()
buffer.events.Remove(elem)
pending[key] = removeElementFromSlice(pending[key], elem)
if len(pending[key]) == 0 {
delete(pending, key)
}
// Remove from TTL map
if networkTTLs != nil {
delete(networkTTLs, elem)
}
}
elem = next
}
@ -266,6 +335,76 @@ func (s *CorrelationService) findAndPopFirstMatch(
return nil
}
// findMatches returns all matching events without removing them (for one-to-many).
func (s *CorrelationService) findMatches(
buffer *eventBuffer,
pending map[string][]*list.Element,
key string,
matcher func(*NormalizedEvent) bool,
) []*NormalizedEvent {
elements, ok := pending[key]
if !ok || len(elements) == 0 {
return nil
}
var matches []*NormalizedEvent
for _, elem := range elements {
other := elem.Value.(*NormalizedEvent)
if matcher(other) {
matches = append(matches, other)
}
}
return matches
}
// getElementByKey finds the list element for a given event in pending map.
func (s *CorrelationService) getElementByKey(pending map[string][]*list.Element, key string, event *NormalizedEvent) *list.Element {
elements, ok := pending[key]
if !ok {
return nil
}
for _, elem := range elements {
if elem.Value.(*NormalizedEvent) == event {
return elem
}
}
return nil
}
// removeEvent removes an event from buffer and pending maps.
func (s *CorrelationService) removeEvent(buffer *eventBuffer, pending map[string][]*list.Element, event *NormalizedEvent) {
key := event.CorrelationKey()
elements, ok := pending[key]
if !ok {
return
}
for idx, elem := range elements {
if elem.Value.(*NormalizedEvent) == event {
buffer.events.Remove(elem)
updated := append(elements[:idx], elements[idx+1:]...)
if len(updated) == 0 {
delete(pending, key)
} else {
pending[key] = updated
}
// Remove from TTL map if present
delete(s.networkTTLs, elem)
break
}
}
}
// resetNetworkTTL resets the TTL for a network event (Keep-Alive).
func (s *CorrelationService) resetNetworkTTL(elem *list.Element) {
if elem == nil {
return
}
s.networkTTLs[elem] = s.timeProvider.Now().Add(time.Duration(s.config.NetworkTTLS) * time.Second)
}
func removeElementFromSlice(elements []*list.Element, target *list.Element) []*list.Element {
if len(elements) == 0 {
return elements
@ -301,6 +440,7 @@ func (s *CorrelationService) Flush() []CorrelatedLog {
s.bufferB.events.Init()
s.pendingA = make(map[string][]*list.Element)
s.pendingB = make(map[string][]*list.Element)
s.networkTTLs = make(map[*list.Element]time.Time)
return results
}

View File

@ -18,9 +18,13 @@ func TestCorrelationService_Match(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false, // Don't emit A immediately to test matching
NetworkEmit: false,
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -62,9 +66,13 @@ func TestCorrelationService_NoMatch_DifferentIP(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: false,
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -96,9 +104,13 @@ func TestCorrelationService_NoMatch_TimeWindowExceeded(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: false,
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -130,9 +142,13 @@ func TestCorrelationService_Flush(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: false,
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -161,9 +177,13 @@ func TestCorrelationService_GetBufferSizes(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -194,9 +214,13 @@ func TestCorrelationService_FlushWithEvents(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: true,
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: true,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -222,6 +246,7 @@ func TestCorrelationService_FlushWithEvents(t *testing.T) {
elemB := svc.bufferB.events.PushBack(networkEvent)
svc.pendingB[keyB] = append(svc.pendingB[keyB], elemB)
svc.networkTTLs[elemB] = now.Add(time.Duration(svc.config.NetworkTTLS) * time.Second)
flushed := svc.Flush()
if len(flushed) != 1 {
@ -243,10 +268,11 @@ func TestCorrelationService_BufferOverflow(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MaxBufferSize: 2,
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MaxHTTPBufferSize: 2,
MaxNetworkBufferSize: 2,
}
svc := NewCorrelationService(config, timeProvider)
@ -282,12 +308,21 @@ func TestCorrelationService_DefaultConfig(t *testing.T) {
config := CorrelationConfig{}
svc := NewCorrelationService(config, timeProvider)
if svc.config.MaxBufferSize != DefaultMaxBufferSize {
t.Errorf("expected MaxBufferSize %d, got %d", DefaultMaxBufferSize, svc.config.MaxBufferSize)
if svc.config.MaxHTTPBufferSize != DefaultMaxHTTPBufferSize {
t.Errorf("expected MaxHTTPBufferSize %d, got %d", DefaultMaxHTTPBufferSize, svc.config.MaxHTTPBufferSize)
}
if svc.config.MaxNetworkBufferSize != DefaultMaxNetworkBufferSize {
t.Errorf("expected MaxNetworkBufferSize %d, got %d", DefaultMaxNetworkBufferSize, svc.config.MaxNetworkBufferSize)
}
if svc.config.TimeWindow != DefaultTimeWindow {
t.Errorf("expected TimeWindow %v, got %v", DefaultTimeWindow, svc.config.TimeWindow)
}
if svc.config.NetworkTTLS != DefaultNetworkTTLS {
t.Errorf("expected NetworkTTLS %d, got %d", DefaultNetworkTTLS, svc.config.NetworkTTLS)
}
if svc.config.MatchingMode != MatchingModeOneToMany {
t.Errorf("expected MatchingMode %s, got %s", MatchingModeOneToMany, svc.config.MatchingMode)
}
}
func TestCorrelationService_NilTimeProvider(t *testing.T) {
@ -307,9 +342,13 @@ func TestCorrelationService_DifferentSourceTypes(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -333,10 +372,9 @@ func TestCorrelationService_DifferentSourceTypes(t *testing.T) {
SrcPort: 8080,
}
results = svc.ProcessEvent(apacheEvent)
if len(results) != 1 {
t.Errorf("expected 1 result (correlated), got %d", len(results))
}
if !results[0].Correlated {
if len(results) < 1 {
t.Errorf("expected at least 1 result (correlated), got %d", len(results))
} else if !results[0].Correlated {
t.Error("expected correlated result")
}
}
@ -346,9 +384,13 @@ func TestCorrelationService_NetworkEmitTrue_DoesNotEmitBAlone(t *testing.T) {
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: true,
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: true,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
MatchingMode: MatchingModeOneToMany,
}
svc := NewCorrelationService(config, timeProvider)
@ -370,3 +412,204 @@ func TestCorrelationService_NetworkEmitTrue_DoesNotEmitBAlone(t *testing.T) {
t.Errorf("expected 0 flushed orphan B events, got %d", len(flushed))
}
}
func TestCorrelationService_OneToMany_KeepAlive(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MatchingMode: MatchingModeOneToMany, // Keep-Alive mode
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
}
svc := NewCorrelationService(config, timeProvider)
// Send B event first (network)
networkEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"ja3": "abc123"},
}
results := svc.ProcessEvent(networkEvent)
if len(results) != 0 {
t.Fatalf("expected 0 results (B buffered), got %d", len(results))
}
// Send first A event (Apache) - should correlate with B
apacheEvent1 := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(500 * time.Millisecond),
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET", "path": "/api/first"},
}
results = svc.ProcessEvent(apacheEvent1)
if len(results) != 1 {
t.Errorf("expected 1 correlated result for first A, got %d", len(results))
} else if !results[0].Correlated {
t.Error("expected correlated result for first A")
}
// Send second A event (same connection, Keep-Alive) - should also correlate with same B
apacheEvent2 := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(1 * time.Second),
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"method": "GET", "path": "/api/second"},
}
results = svc.ProcessEvent(apacheEvent2)
if len(results) != 1 {
t.Errorf("expected 1 correlated result for second A (Keep-Alive), got %d", len(results))
} else if !results[0].Correlated {
t.Error("expected correlated result for second A (Keep-Alive)")
}
// Verify B is still in buffer (Keep-Alive)
a, b := svc.GetBufferSizes()
if a != 0 {
t.Errorf("expected A buffer empty, got %d", a)
}
if b != 1 {
t.Errorf("expected B buffer size 1 (Keep-Alive), got %d", b)
}
}
func TestCorrelationService_OneToOne_ConsumeB(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MatchingMode: MatchingModeOneToOne, // Consume B after match
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: DefaultNetworkTTLS,
}
svc := NewCorrelationService(config, timeProvider)
// Send B event first
networkEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
Raw: map[string]any{"ja3": "abc123"},
}
svc.ProcessEvent(networkEvent)
// Send first A event - should correlate and consume B
apacheEvent1 := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(500 * time.Millisecond),
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
results := svc.ProcessEvent(apacheEvent1)
if len(results) != 1 {
t.Fatalf("expected 1 correlated result, got %d", len(results))
}
// Send second A event - should NOT correlate (B was consumed)
apacheEvent2 := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(1 * time.Second),
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
results = svc.ProcessEvent(apacheEvent2)
if len(results) != 0 {
t.Errorf("expected 0 results (B consumed), got %d", len(results))
}
// Verify both buffers are empty
a, b := svc.GetBufferSizes()
if a != 1 {
t.Errorf("expected A buffer size 1 (second A buffered), got %d", a)
}
if b != 0 {
t.Errorf("expected B buffer empty (consumed), got %d", b)
}
}
func TestCorrelationService_NetworkTTL_ResetOnMatch(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: 5 * time.Second, // 5 seconds time window for correlation
ApacheAlwaysEmit: false,
NetworkEmit: false,
MatchingMode: MatchingModeOneToMany,
MaxHTTPBufferSize: DefaultMaxHTTPBufferSize,
MaxNetworkBufferSize: DefaultMaxNetworkBufferSize,
NetworkTTLS: 10, // 10 seconds TTL for B events
}
svc := NewCorrelationService(config, timeProvider)
// Send B event
networkEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
svc.ProcessEvent(networkEvent)
// Verify B is in buffer
_, b := svc.GetBufferSizes()
if b != 1 {
t.Fatalf("expected B in buffer, got %d", b)
}
// Advance time by 3 seconds (before TTL expires)
timeProvider.now = now.Add(3 * time.Second)
// Send A event with timestamp within time window of B
// A's timestamp is t=3s, B's timestamp is t=0s, diff = 3s < 5s (time_window)
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: timeProvider.now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
results := svc.ProcessEvent(apacheEvent)
if len(results) != 1 {
t.Fatalf("expected 1 correlated result, got %d", len(results))
}
// B should still be in buffer (TTL reset)
_, b = svc.GetBufferSizes()
if b != 1 {
t.Errorf("expected B still in buffer after TTL reset, got %d", b)
}
// Advance time by 7 more seconds (total 10s from start, 7s from last match)
timeProvider.now = now.Add(10 * time.Second)
// B should still be alive (TTL was reset to 10s from t=3s, so expires at t=13s)
svc.cleanExpired()
_, b = svc.GetBufferSizes()
if b != 1 {
t.Errorf("expected B still alive after TTL reset, got %d", b)
}
// Advance time past the reset TTL (t=14s > t=13s)
timeProvider.now = now.Add(14 * time.Second)
svc.cleanExpired()
_, b = svc.GetBufferSizes()
if b != 0 {
t.Errorf("expected B expired after reset TTL, got %d", b)
}
}

View File

@ -32,6 +32,10 @@ type CorrelatedLogSink interface {
// Name returns the sink name.
Name() string
// Reopen closes and reopens the sink (for log rotation on SIGHUP).
// Optional: only FileSink implements this.
Reopen() error
}
// CorrelationProcessor defines the interface for the correlation service.