- fix: add missing ClickHouse driver dependency - fix: resolve race condition in orchestrator (single goroutine per source) - feat: add explicit source_type config for Unix socket sources - test: improve coverage from 50.6% to 62.0% - docs: add CHANGELOG.md with release notes - build: update version to 1.0.2 in build scripts and Dockerfiles Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
486 lines
10 KiB
Go
486 lines
10 KiB
Go
package config
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestLoad_ValidConfig(t *testing.T) {
|
|
content := `
|
|
service:
|
|
name: logcorrelator
|
|
language: go
|
|
|
|
inputs:
|
|
unix_sockets:
|
|
- name: apache_source
|
|
path: /var/run/logcorrelator/apache.sock
|
|
format: json
|
|
- name: network_source
|
|
path: /var/run/logcorrelator/network.sock
|
|
format: json
|
|
|
|
outputs:
|
|
file:
|
|
enabled: true
|
|
path: /var/log/logcorrelator/correlated.log
|
|
clickhouse:
|
|
enabled: false
|
|
dsn: clickhouse://user:pass@localhost:9000/db
|
|
table: correlated_logs
|
|
|
|
correlation:
|
|
key:
|
|
- src_ip
|
|
- src_port
|
|
time_window:
|
|
value: 1
|
|
unit: s
|
|
orphan_policy:
|
|
apache_always_emit: true
|
|
network_emit: false
|
|
`
|
|
|
|
tmpDir := t.TempDir()
|
|
configPath := filepath.Join(tmpDir, "config.yml")
|
|
if err := os.WriteFile(configPath, []byte(content), 0644); err != nil {
|
|
t.Fatalf("failed to write config: %v", err)
|
|
}
|
|
|
|
cfg, err := Load(configPath)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
if cfg.Service.Name != "logcorrelator" {
|
|
t.Errorf("expected service name logcorrelator, got %s", cfg.Service.Name)
|
|
}
|
|
if len(cfg.Inputs.UnixSockets) != 2 {
|
|
t.Errorf("expected 2 unix sockets, got %d", len(cfg.Inputs.UnixSockets))
|
|
}
|
|
if !cfg.Outputs.File.Enabled {
|
|
t.Error("expected file output enabled")
|
|
}
|
|
}
|
|
|
|
func TestLoad_InvalidPath(t *testing.T) {
|
|
_, err := Load("/nonexistent/path/config.yml")
|
|
if err == nil {
|
|
t.Error("expected error for nonexistent path")
|
|
}
|
|
}
|
|
|
|
func TestLoad_InvalidYAML(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
configPath := filepath.Join(tmpDir, "config.yml")
|
|
content := `invalid: yaml: content: [`
|
|
if err := os.WriteFile(configPath, []byte(content), 0644); err != nil {
|
|
t.Fatalf("failed to write config: %v", err)
|
|
}
|
|
|
|
_, err := Load(configPath)
|
|
if err == nil {
|
|
t.Error("expected error for invalid YAML")
|
|
}
|
|
}
|
|
|
|
func TestLoad_DefaultValues(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
configPath := filepath.Join(tmpDir, "config.yml")
|
|
content := `
|
|
service:
|
|
name: test-service
|
|
inputs:
|
|
unix_sockets:
|
|
- name: a
|
|
path: /tmp/a.sock
|
|
- name: b
|
|
path: /tmp/b.sock
|
|
outputs:
|
|
file:
|
|
enabled: true
|
|
`
|
|
if err := os.WriteFile(configPath, []byte(content), 0644); err != nil {
|
|
t.Fatalf("failed to write config: %v", err)
|
|
}
|
|
|
|
cfg, err := Load(configPath)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
if cfg.Service.Name != "test-service" {
|
|
t.Errorf("expected service name test-service, got %s", cfg.Service.Name)
|
|
}
|
|
// Check defaults
|
|
if cfg.Correlation.TimeWindow.Value != 1 {
|
|
t.Errorf("expected default time window value 1, got %d", cfg.Correlation.TimeWindow.Value)
|
|
}
|
|
if cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit != true {
|
|
t.Error("expected default apache_always_emit to be true")
|
|
}
|
|
}
|
|
|
|
func TestValidate_MinimumInputs(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "only_one", Path: "/tmp/test.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for less than 2 inputs")
|
|
}
|
|
}
|
|
|
|
func TestValidate_AtLeastOneOutput(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: false},
|
|
ClickHouse: ClickHouseOutputConfig{Enabled: false},
|
|
Stdout: StdoutOutputConfig{Enabled: false},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for no outputs enabled")
|
|
}
|
|
}
|
|
|
|
func TestGetTimeWindow(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
config CorrelationConfig
|
|
expected time.Duration
|
|
}{
|
|
{
|
|
name: "seconds",
|
|
config: CorrelationConfig{
|
|
TimeWindow: TimeWindowConfig{Value: 1, Unit: "s"},
|
|
},
|
|
expected: time.Second,
|
|
},
|
|
{
|
|
name: "milliseconds",
|
|
config: CorrelationConfig{
|
|
TimeWindow: TimeWindowConfig{Value: 500, Unit: "ms"},
|
|
},
|
|
expected: 500 * time.Millisecond,
|
|
},
|
|
{
|
|
name: "minutes",
|
|
config: CorrelationConfig{
|
|
TimeWindow: TimeWindowConfig{Value: 2, Unit: "m"},
|
|
},
|
|
expected: 2 * time.Minute,
|
|
},
|
|
{
|
|
name: "default",
|
|
config: CorrelationConfig{
|
|
TimeWindow: TimeWindowConfig{},
|
|
},
|
|
expected: time.Second,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
result := tt.config.GetTimeWindow()
|
|
if result != tt.expected {
|
|
t.Errorf("expected %v, got %v", tt.expected, result)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidate_DuplicateNames(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "same", Path: "/tmp/a.sock"},
|
|
{Name: "same", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for duplicate names")
|
|
}
|
|
}
|
|
|
|
func TestValidate_DuplicatePaths(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/same.sock"},
|
|
{Name: "b", Path: "/tmp/same.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for duplicate paths")
|
|
}
|
|
}
|
|
|
|
func TestValidate_EmptyName(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for empty name")
|
|
}
|
|
}
|
|
|
|
func TestValidate_EmptyPath(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: ""},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for empty path")
|
|
}
|
|
}
|
|
|
|
func TestValidate_EmptyFilePath(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true, Path: ""},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for empty file path")
|
|
}
|
|
}
|
|
|
|
func TestValidate_ClickHouseMissingDSN(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: false},
|
|
ClickHouse: ClickHouseOutputConfig{
|
|
Enabled: true,
|
|
DSN: "",
|
|
Table: "test",
|
|
},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for missing ClickHouse DSN")
|
|
}
|
|
}
|
|
|
|
func TestValidate_ClickHouseMissingTable(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: false},
|
|
ClickHouse: ClickHouseOutputConfig{
|
|
Enabled: true,
|
|
DSN: "clickhouse://localhost:9000/db",
|
|
Table: "",
|
|
},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for missing ClickHouse table")
|
|
}
|
|
}
|
|
|
|
func TestValidate_ClickHouseInvalidBatchSize(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: false},
|
|
ClickHouse: ClickHouseOutputConfig{
|
|
Enabled: true,
|
|
DSN: "clickhouse://localhost:9000/db",
|
|
Table: "test",
|
|
BatchSize: 0,
|
|
},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for invalid batch size")
|
|
}
|
|
}
|
|
|
|
func TestValidate_ClickHouseInvalidMaxBufferSize(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: false},
|
|
ClickHouse: ClickHouseOutputConfig{
|
|
Enabled: true,
|
|
DSN: "clickhouse://localhost:9000/db",
|
|
Table: "test",
|
|
BatchSize: 100,
|
|
MaxBufferSize: 0,
|
|
},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for invalid max buffer size")
|
|
}
|
|
}
|
|
|
|
func TestValidate_ClickHouseInvalidTimeout(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: false},
|
|
ClickHouse: ClickHouseOutputConfig{
|
|
Enabled: true,
|
|
DSN: "clickhouse://localhost:9000/db",
|
|
Table: "test",
|
|
BatchSize: 100,
|
|
TimeoutMs: 0,
|
|
},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for invalid timeout")
|
|
}
|
|
}
|
|
|
|
func TestValidate_EmptyCorrelationKey(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
Correlation: CorrelationConfig{
|
|
Key: []string{},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for empty correlation key")
|
|
}
|
|
}
|
|
|
|
func TestValidate_InvalidTimeWindow(t *testing.T) {
|
|
cfg := &Config{
|
|
Inputs: InputsConfig{
|
|
UnixSockets: []UnixSocketConfig{
|
|
{Name: "a", Path: "/tmp/a.sock"},
|
|
{Name: "b", Path: "/tmp/b.sock"},
|
|
},
|
|
},
|
|
Outputs: OutputsConfig{
|
|
File: FileOutputConfig{Enabled: true},
|
|
},
|
|
Correlation: CorrelationConfig{
|
|
Key: []string{"src_ip", "src_port"},
|
|
TimeWindow: TimeWindowConfig{Value: 0},
|
|
},
|
|
}
|
|
|
|
err := cfg.Validate()
|
|
if err == nil {
|
|
t.Error("expected error for invalid time window")
|
|
}
|
|
}
|
|
|
|
func TestGetTimeWindow_UnknownUnit(t *testing.T) {
|
|
config := CorrelationConfig{
|
|
TimeWindow: TimeWindowConfig{Value: 5, Unit: "unknown"},
|
|
}
|
|
result := config.GetTimeWindow()
|
|
expected := 5 * time.Second // Should default to seconds
|
|
if result != expected {
|
|
t.Errorf("expected %v, got %v", expected, result)
|
|
}
|
|
}
|