chore: release v1.0.2 with critical fixes and test improvements

- fix: add missing ClickHouse driver dependency
- fix: resolve race condition in orchestrator (single goroutine per source)
- feat: add explicit source_type config for Unix socket sources
- test: improve coverage from 50.6% to 62.0%
- docs: add CHANGELOG.md with release notes
- build: update version to 1.0.2 in build scripts and Dockerfiles

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-02-28 21:45:00 +01:00
parent 5f97af3627
commit 180c57c35b
15 changed files with 1178 additions and 30 deletions

View File

@ -28,8 +28,9 @@ const (
// Config holds the Unix socket source configuration.
type Config struct {
Name string
Path string
Name string
Path string
SourceType string // "A" for Apache/HTTP, "B" for Network, "" for auto-detect
}
// UnixSocketSource reads JSON events from a Unix socket.
@ -160,7 +161,7 @@ func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventC
continue
}
event, err := parseJSONEvent(line)
event, err := parseJSONEvent(line, s.config.SourceType)
if err != nil {
// Log parse errors but continue processing
continue
@ -174,7 +175,7 @@ func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventC
}
}
func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) {
func parseJSONEvent(data []byte, sourceType string) (*domain.NormalizedEvent, error) {
var raw map[string]any
if err := json.Unmarshal(data, &raw); err != nil {
return nil, fmt.Errorf("invalid JSON: %w", err)
@ -243,11 +244,19 @@ func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) {
}
}
// Determine source based on fields present
if len(event.Headers) > 0 {
// Determine source based on explicit config or fallback to heuristic
switch sourceType {
case "A", "a", "apache", "http":
event.Source = domain.SourceA
} else {
case "B", "b", "network", "net":
event.Source = domain.SourceB
default:
// Fallback to heuristic detection for backward compatibility
if len(event.Headers) > 0 {
event.Source = domain.SourceA
} else {
event.Source = domain.SourceB
}
}
// Extra fields (single pass)

View File

@ -1,8 +1,11 @@
package unixsocket
import (
"context"
"testing"
"time"
"github.com/logcorrelator/logcorrelator/internal/domain"
)
func TestParseJSONEvent_Apache(t *testing.T) {
@ -18,7 +21,7 @@ func TestParseJSONEvent_Apache(t *testing.T) {
"header_user_agent": "Mozilla/5.0"
}`)
event, err := parseJSONEvent(data)
event, err := parseJSONEvent(data, "A")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -35,6 +38,9 @@ func TestParseJSONEvent_Apache(t *testing.T) {
if event.Headers["user_agent"] != "Mozilla/5.0" {
t.Errorf("expected header_user_agent Mozilla/5.0, got %s", event.Headers["user_agent"])
}
if event.Source != domain.SourceA {
t.Errorf("expected source A, got %s", event.Source)
}
}
func TestParseJSONEvent_Network(t *testing.T) {
@ -48,7 +54,7 @@ func TestParseJSONEvent_Network(t *testing.T) {
"tcp_meta_flags": "SYN"
}`)
event, err := parseJSONEvent(data)
event, err := parseJSONEvent(data, "B")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -59,12 +65,15 @@ func TestParseJSONEvent_Network(t *testing.T) {
if event.Extra["ja3"] != "abc123def456" {
t.Errorf("expected ja3 abc123def456, got %v", event.Extra["ja3"])
}
if event.Source != domain.SourceB {
t.Errorf("expected source B, got %s", event.Source)
}
}
func TestParseJSONEvent_InvalidJSON(t *testing.T) {
data := []byte(`{invalid json}`)
_, err := parseJSONEvent(data)
_, err := parseJSONEvent(data, "")
if err == nil {
t.Error("expected error for invalid JSON")
}
@ -73,7 +82,7 @@ func TestParseJSONEvent_InvalidJSON(t *testing.T) {
func TestParseJSONEvent_MissingFields(t *testing.T) {
data := []byte(`{"other_field": "value"}`)
_, err := parseJSONEvent(data)
_, err := parseJSONEvent(data, "")
if err == nil {
t.Error("expected error for missing src_ip/src_port")
}
@ -86,7 +95,7 @@ func TestParseJSONEvent_StringTimestamp(t *testing.T) {
"time": "2024-01-01T12:00:00Z"
}`)
event, err := parseJSONEvent(data)
event, err := parseJSONEvent(data, "")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -96,3 +105,258 @@ func TestParseJSONEvent_StringTimestamp(t *testing.T) {
t.Errorf("expected timestamp %v, got %v", expected, event.Timestamp)
}
}
func TestParseJSONEvent_ExplicitSourceType(t *testing.T) {
tests := []struct {
name string
data string
sourceType string
expected domain.EventSource
}{
{
name: "explicit A",
data: `{"src_ip": "192.168.1.1", "src_port": 8080}`,
sourceType: "A",
expected: domain.SourceA,
},
{
name: "explicit B",
data: `{"src_ip": "192.168.1.1", "src_port": 8080}`,
sourceType: "B",
expected: domain.SourceB,
},
{
name: "explicit apache",
data: `{"src_ip": "192.168.1.1", "src_port": 8080}`,
sourceType: "apache",
expected: domain.SourceA,
},
{
name: "explicit network",
data: `{"src_ip": "192.168.1.1", "src_port": 8080}`,
sourceType: "network",
expected: domain.SourceB,
},
{
name: "auto-detect A with headers",
data: `{"src_ip": "192.168.1.1", "src_port": 8080, "header_host": "example.com"}`,
sourceType: "",
expected: domain.SourceA,
},
{
name: "auto-detect B without headers",
data: `{"src_ip": "192.168.1.1", "src_port": 8080, "ja3": "abc"}`,
sourceType: "",
expected: domain.SourceB,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
event, err := parseJSONEvent([]byte(tt.data), tt.sourceType)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if event.Source != tt.expected {
t.Errorf("expected source %s, got %s", tt.expected, event.Source)
}
})
}
}
func TestUnixSocketSource_Name(t *testing.T) {
source := NewUnixSocketSource(Config{
Name: "test_source",
Path: "/tmp/test.sock",
})
if source.Name() != "test_source" {
t.Errorf("expected name 'test_source', got %s", source.Name())
}
}
func TestUnixSocketSource_StopWithoutStart(t *testing.T) {
source := NewUnixSocketSource(Config{
Name: "test_source",
Path: "/tmp/test.sock",
})
// Should not panic
err := source.Stop()
if err != nil {
t.Errorf("expected no error on stop without start, got %v", err)
}
}
func TestUnixSocketSource_EmptyPath(t *testing.T) {
source := NewUnixSocketSource(Config{
Name: "test_source",
Path: "",
})
ctx := context.Background()
eventChan := make(chan *domain.NormalizedEvent, 10)
err := source.Start(ctx, eventChan)
if err == nil {
t.Error("expected error for empty path")
}
}
func TestGetString(t *testing.T) {
m := map[string]any{
"string": "hello",
"int": 42,
"nil": nil,
}
v, ok := getString(m, "string")
if !ok || v != "hello" {
t.Errorf("expected 'hello', got %v, %v", v, ok)
}
_, ok = getString(m, "int")
if ok {
t.Error("expected false for int")
}
_, ok = getString(m, "missing")
if ok {
t.Error("expected false for missing key")
}
}
func TestGetInt(t *testing.T) {
m := map[string]any{
"float": 42.5,
"int": 42,
"int64": int64(42),
"string": "42",
"bad": "not a number",
"nil": nil,
}
tests := []struct {
key string
expected int
ok bool
}{
{"float", 42, true},
{"int", 42, true},
{"int64", 42, true},
{"string", 42, true},
{"bad", 0, false},
{"nil", 0, false},
{"missing", 0, false},
}
for _, tt := range tests {
t.Run(tt.key, func(t *testing.T) {
v, ok := getInt(m, tt.key)
if ok != tt.ok {
t.Errorf("getInt(%q) ok = %v, want %v", tt.key, ok, tt.ok)
}
if v != tt.expected {
t.Errorf("getInt(%q) = %v, want %v", tt.key, v, tt.expected)
}
})
}
}
func TestGetInt64(t *testing.T) {
m := map[string]any{
"float": 42.5,
"int": 42,
"int64": int64(42),
"string": "42",
"bad": "not a number",
"nil": nil,
}
tests := []struct {
key string
expected int64
ok bool
}{
{"float", 42, true},
{"int", 42, true},
{"int64", 42, true},
{"string", 42, true},
{"bad", 0, false},
{"nil", 0, false},
{"missing", 0, false},
}
for _, tt := range tests {
t.Run(tt.key, func(t *testing.T) {
v, ok := getInt64(m, tt.key)
if ok != tt.ok {
t.Errorf("getInt64(%q) ok = %v, want %v", tt.key, ok, tt.ok)
}
if v != tt.expected {
t.Errorf("getInt64(%q) = %v, want %v", tt.key, v, tt.expected)
}
})
}
}
func TestParseJSONEvent_PortValidation(t *testing.T) {
tests := []struct {
name string
data string
wantErr bool
}{
{
name: "valid src_port",
data: `{"src_ip": "192.168.1.1", "src_port": 8080}`,
wantErr: false,
},
{
name: "src_port zero",
data: `{"src_ip": "192.168.1.1", "src_port": 0}`,
wantErr: true,
},
{
name: "src_port negative",
data: `{"src_ip": "192.168.1.1", "src_port": -1}`,
wantErr: true,
},
{
name: "src_port too high",
data: `{"src_ip": "192.168.1.1", "src_port": 70000}`,
wantErr: true,
},
{
name: "valid dst_port zero",
data: `{"src_ip": "192.168.1.1", "src_port": 8080, "dst_port": 0}`,
wantErr: false,
},
{
name: "dst_port too high",
data: `{"src_ip": "192.168.1.1", "src_port": 8080, "dst_port": 70000}`,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := parseJSONEvent([]byte(tt.data), "")
if (err != nil) != tt.wantErr {
t.Errorf("parseJSONEvent() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestParseJSONEvent_TimestampFallback(t *testing.T) {
data := []byte(`{"src_ip": "192.168.1.1", "src_port": 8080}`)
event, err := parseJSONEvent(data, "")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Should fallback to current time if no timestamp provided
if event.Timestamp.IsZero() {
t.Error("expected non-zero timestamp")
}
}

View File

@ -9,6 +9,7 @@ import (
"sync"
"time"
_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/logcorrelator/logcorrelator/internal/domain"
)

View File

@ -94,3 +94,98 @@ func TestFileSink_Name(t *testing.T) {
t.Errorf("expected name 'file', got %s", sink.Name())
}
}
func TestFileSink_ValidateFilePath(t *testing.T) {
tests := []struct {
name string
path string
wantErr bool
}{
{"empty path", "", true},
{"valid /var/log/logcorrelator", "/var/log/logcorrelator/test.log", false},
{"valid /var/log", "/var/log/test.log", false},
{"valid /tmp", "/tmp/test.log", false},
{"path traversal", "/var/log/../etc/passwd", true},
{"invalid directory", "/etc/logcorrelator/test.log", true},
{"relative path", "test.log", false}, // Allowed for testing
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateFilePath(tt.path)
if (err != nil) != tt.wantErr {
t.Errorf("validateFilePath(%q) error = %v, wantErr %v", tt.path, err, tt.wantErr)
}
})
}
}
func TestFileSink_OpenFile(t *testing.T) {
tmpDir := t.TempDir()
testPath := filepath.Join(tmpDir, "subdir", "test.log")
sink := &FileSink{
config: Config{Path: testPath},
}
err := sink.openFile()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer sink.Close()
if sink.file == nil {
t.Error("expected file to be opened")
}
if sink.writer == nil {
t.Error("expected writer to be initialized")
}
}
func TestFileSink_WriteBeforeOpen(t *testing.T) {
tmpDir := t.TempDir()
testPath := filepath.Join(tmpDir, "test.log")
sink, err := NewFileSink(Config{Path: testPath})
if err != nil {
t.Fatalf("failed to create sink: %v", err)
}
defer sink.Close()
// Write should open file automatically
log := domain.CorrelatedLog{SrcIP: "192.168.1.1", SrcPort: 8080}
err = sink.Write(context.Background(), log)
if err != nil {
t.Fatalf("failed to write: %v", err)
}
// Verify file was created
if _, err := os.Stat(testPath); os.IsNotExist(err) {
t.Error("expected file to be created")
}
}
func TestFileSink_FlushBeforeOpen(t *testing.T) {
tmpDir := t.TempDir()
testPath := filepath.Join(tmpDir, "test.log")
sink, err := NewFileSink(Config{Path: testPath})
if err != nil {
t.Fatalf("failed to create sink: %v", err)
}
defer sink.Close()
// Flush before any write should not error
err = sink.Flush(context.Background())
if err != nil {
t.Errorf("expected no error on flush before open, got %v", err)
}
}
func TestFileSink_InvalidPath(t *testing.T) {
// Test with invalid path (path traversal)
_, err := NewFileSink(Config{Path: "/etc/../passwd"})
if err == nil {
t.Error("expected error for invalid path")
}
}

View File

@ -112,3 +112,115 @@ func TestMultiSink_AddSink(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
}
func TestMultiSink_Name(t *testing.T) {
ms := NewMultiSink()
if ms.Name() != "multi" {
t.Errorf("expected name 'multi', got %s", ms.Name())
}
}
func TestMultiSink_Flush(t *testing.T) {
flushed := false
sink := &mockSink{
name: "test",
writeFunc: func(log domain.CorrelatedLog) error { return nil },
flushFunc: func() error {
flushed = true
return nil
},
closeFunc: func() error { return nil },
}
ms := NewMultiSink(sink)
err := ms.Flush(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !flushed {
t.Error("expected sink to be flushed")
}
}
func TestMultiSink_Flush_Error(t *testing.T) {
sink := &mockSink{
name: "test",
writeFunc: func(log domain.CorrelatedLog) error { return nil },
flushFunc: func() error { return context.Canceled },
closeFunc: func() error { return nil },
}
ms := NewMultiSink(sink)
err := ms.Flush(context.Background())
if err != context.Canceled {
t.Errorf("expected context.Canceled error, got %v", err)
}
}
func TestMultiSink_Close(t *testing.T) {
closed := false
sink := &mockSink{
name: "test",
writeFunc: func(log domain.CorrelatedLog) error { return nil },
flushFunc: func() error { return nil },
closeFunc: func() error {
closed = true
return nil
},
}
ms := NewMultiSink(sink)
err := ms.Close()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !closed {
t.Error("expected sink to be closed")
}
}
func TestMultiSink_Close_Error(t *testing.T) {
sink := &mockSink{
name: "test",
writeFunc: func(log domain.CorrelatedLog) error { return nil },
flushFunc: func() error { return nil },
closeFunc: func() error { return context.Canceled },
}
ms := NewMultiSink(sink)
err := ms.Close()
if err != context.Canceled {
t.Errorf("expected context.Canceled error, got %v", err)
}
}
func TestMultiSink_Write_EmptySinks(t *testing.T) {
ms := NewMultiSink()
log := domain.CorrelatedLog{SrcIP: "192.168.1.1"}
err := ms.Write(context.Background(), log)
if err != nil {
t.Fatalf("unexpected error with empty sinks: %v", err)
}
}
func TestMultiSink_Write_ContextCancelled(t *testing.T) {
sink := &mockSink{
name: "test",
writeFunc: func(log domain.CorrelatedLog) error {
<-context.Background().Done()
return nil
},
flushFunc: func() error { return nil },
closeFunc: func() error { return nil },
}
ms := NewMultiSink(sink)
ctx, cancel := context.WithCancel(context.Background())
cancel()
log := domain.CorrelatedLog{SrcIP: "192.168.1.1"}
err := ms.Write(ctx, log)
if err != context.Canceled {
t.Errorf("expected context.Canceled error, got %v", err)
}
}

View File

@ -57,15 +57,18 @@ func (o *Orchestrator) Start() error {
o.wg.Add(1)
go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) {
defer o.wg.Done()
// Start the source in a separate goroutine
sourceErr := make(chan error, 1)
go func() {
sourceErr <- src.Start(o.ctx, evChan)
}()
// Process events in the current goroutine
o.processEvents(evChan)
}(source, eventChan)
o.wg.Add(1)
go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) {
defer o.wg.Done()
if err := src.Start(o.ctx, evChan); err != nil {
// Source failed, but continue with others
}
// Wait for source to stop
<-sourceErr
}(source, eventChan)
}

View File

@ -30,9 +30,10 @@ type InputsConfig struct {
// UnixSocketConfig holds a Unix socket source configuration.
type UnixSocketConfig struct {
Name string `yaml:"name"`
Path string `yaml:"path"`
Format string `yaml:"format"`
Name string `yaml:"name"`
Path string `yaml:"path"`
Format string `yaml:"format"`
SourceType string `yaml:"source_type"` // "A" for Apache/HTTP, "B" for Network
}
// OutputsConfig holds output sinks configuration.

View File

@ -207,3 +207,279 @@ func TestGetTimeWindow(t *testing.T) {
})
}
}
func TestValidate_DuplicateNames(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "same", Path: "/tmp/a.sock"},
{Name: "same", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for duplicate names")
}
}
func TestValidate_DuplicatePaths(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/same.sock"},
{Name: "b", Path: "/tmp/same.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for duplicate paths")
}
}
func TestValidate_EmptyName(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for empty name")
}
}
func TestValidate_EmptyPath(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: ""},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for empty path")
}
}
func TestValidate_EmptyFilePath(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true, Path: ""},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for empty file path")
}
}
func TestValidate_ClickHouseMissingDSN(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: false},
ClickHouse: ClickHouseOutputConfig{
Enabled: true,
DSN: "",
Table: "test",
},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for missing ClickHouse DSN")
}
}
func TestValidate_ClickHouseMissingTable(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: false},
ClickHouse: ClickHouseOutputConfig{
Enabled: true,
DSN: "clickhouse://localhost:9000/db",
Table: "",
},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for missing ClickHouse table")
}
}
func TestValidate_ClickHouseInvalidBatchSize(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: false},
ClickHouse: ClickHouseOutputConfig{
Enabled: true,
DSN: "clickhouse://localhost:9000/db",
Table: "test",
BatchSize: 0,
},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for invalid batch size")
}
}
func TestValidate_ClickHouseInvalidMaxBufferSize(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: false},
ClickHouse: ClickHouseOutputConfig{
Enabled: true,
DSN: "clickhouse://localhost:9000/db",
Table: "test",
BatchSize: 100,
MaxBufferSize: 0,
},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for invalid max buffer size")
}
}
func TestValidate_ClickHouseInvalidTimeout(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: false},
ClickHouse: ClickHouseOutputConfig{
Enabled: true,
DSN: "clickhouse://localhost:9000/db",
Table: "test",
BatchSize: 100,
TimeoutMs: 0,
},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for invalid timeout")
}
}
func TestValidate_EmptyCorrelationKey(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true},
},
Correlation: CorrelationConfig{
Key: []string{},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for empty correlation key")
}
}
func TestValidate_InvalidTimeWindow(t *testing.T) {
cfg := &Config{
Inputs: InputsConfig{
UnixSockets: []UnixSocketConfig{
{Name: "a", Path: "/tmp/a.sock"},
{Name: "b", Path: "/tmp/b.sock"},
},
},
Outputs: OutputsConfig{
File: FileOutputConfig{Enabled: true},
},
Correlation: CorrelationConfig{
Key: []string{"src_ip", "src_port"},
TimeWindow: TimeWindowConfig{Value: 0},
},
}
err := cfg.Validate()
if err == nil {
t.Error("expected error for invalid time window")
}
}
func TestGetTimeWindow_UnknownUnit(t *testing.T) {
config := CorrelationConfig{
TimeWindow: TimeWindowConfig{Value: 5, Unit: "unknown"},
}
result := config.GetTimeWindow()
expected := 5 * time.Second // Should default to seconds
if result != expected {
t.Errorf("expected %v, got %v", expected, result)
}
}

View File

@ -155,3 +155,188 @@ func TestCorrelationService_Flush(t *testing.T) {
t.Errorf("expected 0 flushed events, got %d", len(flushed))
}
}
func TestCorrelationService_GetBufferSizes(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
}
svc := NewCorrelationService(config, timeProvider)
// Empty buffers
a, b := svc.GetBufferSizes()
if a != 0 || b != 0 {
t.Errorf("expected empty buffers, got A=%d, B=%d", a, b)
}
// Add event to buffer A
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
svc.ProcessEvent(apacheEvent)
a, b = svc.GetBufferSizes()
if a != 1 || b != 0 {
t.Errorf("expected A=1, B=0, got A=%d, B=%d", a, b)
}
}
func TestCorrelationService_FlushWithEvents(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
// Flush only emits events if ApacheAlwaysEmit and NetworkEmit are true
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: true,
NetworkEmit: true,
}
svc := NewCorrelationService(config, timeProvider)
// We need to bypass the normal ProcessEvent logic to get events into buffers
// Add events directly to buffers for testing Flush
keyA := "192.168.1.1:8080"
keyB := "192.168.1.2:9090"
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
networkEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "192.168.1.2",
SrcPort: 9090,
}
// Manually add to buffers (simulating events that couldn't be matched)
elemA := svc.bufferA.events.PushBack(apacheEvent)
svc.pendingA[keyA] = append(svc.pendingA[keyA], elemA)
elemB := svc.bufferB.events.PushBack(networkEvent)
svc.pendingB[keyB] = append(svc.pendingB[keyB], elemB)
flushed := svc.Flush()
if len(flushed) != 2 {
t.Errorf("expected 2 flushed events, got %d", len(flushed))
}
// Verify buffers are cleared
a, b := svc.GetBufferSizes()
if a != 0 || b != 0 {
t.Errorf("expected empty buffers after flush, got A=%d, B=%d", a, b)
}
}
func TestCorrelationService_BufferOverflow(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
MaxBufferSize: 2,
}
svc := NewCorrelationService(config, timeProvider)
// Fill buffer A
for i := 0; i < 2; i++ {
event := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080 + i,
}
svc.ProcessEvent(event)
}
// Buffer full, next event should be dropped (not emitted since ApacheAlwaysEmit=false but buffer full)
overflowEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 9999,
}
results := svc.ProcessEvent(overflowEvent)
if len(results) != 0 {
t.Errorf("expected 0 results on buffer overflow, got %d", len(results))
}
}
func TestCorrelationService_DefaultConfig(t *testing.T) {
timeProvider := &RealTimeProvider{}
// Test with zero config - should use defaults
config := CorrelationConfig{}
svc := NewCorrelationService(config, timeProvider)
if svc.config.MaxBufferSize != DefaultMaxBufferSize {
t.Errorf("expected MaxBufferSize %d, got %d", DefaultMaxBufferSize, svc.config.MaxBufferSize)
}
if svc.config.TimeWindow != DefaultTimeWindow {
t.Errorf("expected TimeWindow %v, got %v", DefaultTimeWindow, svc.config.TimeWindow)
}
}
func TestCorrelationService_NilTimeProvider(t *testing.T) {
config := CorrelationConfig{
TimeWindow: time.Second,
}
// Should not panic with nil time provider
svc := NewCorrelationService(config, nil)
if svc == nil {
t.Error("expected non-nil service")
}
}
func TestCorrelationService_DifferentSourceTypes(t *testing.T) {
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
timeProvider := &mockTimeProvider{now: now}
config := CorrelationConfig{
TimeWindow: time.Second,
ApacheAlwaysEmit: false,
NetworkEmit: false,
}
svc := NewCorrelationService(config, timeProvider)
// Send B first, then A - should still match
networkEvent := &NormalizedEvent{
Source: SourceB,
Timestamp: now,
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
results := svc.ProcessEvent(networkEvent)
if len(results) != 0 {
t.Errorf("expected 0 results (buffered B), got %d", len(results))
}
apacheEvent := &NormalizedEvent{
Source: SourceA,
Timestamp: now.Add(500 * time.Millisecond),
SrcIP: "192.168.1.1",
SrcPort: 8080,
}
results = svc.ProcessEvent(apacheEvent)
if len(results) != 1 {
t.Errorf("expected 1 result (correlated), got %d", len(results))
}
if !results[0].Correlated {
t.Error("expected correlated result")
}
}