package config

import (
	"fmt"
	"os"
	"strings"
	"time"

	"gopkg.in/yaml.v3"
)

// Config holds the complete application configuration.
//
// Load builds a Config by decoding a YAML file over the values returned by
// defaultConfig and then calling Validate. The yaml struct tags on this type
// and its nested types define the accepted file layout.
type Config struct {
	Service     ServiceConfig     `yaml:"service"`
	Inputs      InputsConfig      `yaml:"inputs"`
	Outputs     OutputsConfig     `yaml:"outputs"`
	Correlation CorrelationConfig `yaml:"correlation"`
}

// ServiceConfig holds service-level configuration.
//
// defaultConfig sets Name to "logcorrelator" and Language to "go".
type ServiceConfig struct {
	Name     string `yaml:"name"`
	Language string `yaml:"language"`
}

// InputsConfig holds input sources configuration.
type InputsConfig struct {
	// UnixSockets lists the socket sources. It is empty by default, and
	// Validate requires at least two entries with distinct names and paths.
	UnixSockets []UnixSocketConfig `yaml:"unix_sockets"`
}

// UnixSocketConfig holds a Unix socket source configuration.
type UnixSocketConfig struct {
	// Name identifies the input; Validate requires it to be non-blank and
	// unique across inputs.
	Name string `yaml:"name"`
	// Path is the socket path; Validate requires it to be non-blank and
	// unique across inputs.
	Path string `yaml:"path"`
	// Format is not checked by Validate. NOTE(review): the accepted values
	// are defined by whatever consumes this input — confirm there.
	Format string `yaml:"format"`
}

// OutputsConfig holds output sinks configuration.
//
// Validate requires at least one of the sinks to be enabled.
type OutputsConfig struct {
	File       FileOutputConfig       `yaml:"file"`
	ClickHouse ClickHouseOutputConfig `yaml:"clickhouse"`
	Stdout     StdoutOutputConfig     `yaml:"stdout"`
}

// FileOutputConfig holds file sink configuration.
//
// defaultConfig enables it with Path "/var/log/logcorrelator/correlated.log";
// Validate requires a non-blank Path while Enabled.
type FileOutputConfig struct {
	Enabled bool   `yaml:"enabled"`
	Path    string `yaml:"path"`
}

// ClickHouseOutputConfig holds ClickHouse sink configuration.
//
// The sink is disabled by default. The defaults noted on each field are the
// values defaultConfig sets before Load decodes the YAML file over them.
type ClickHouseOutputConfig struct {
	Enabled bool `yaml:"enabled"`
	// DSN must be non-blank while Enabled.
	DSN string `yaml:"dsn"`
	// Table must be non-blank while Enabled.
	Table string `yaml:"table"`
	// BatchSize defaults to 500 and must be > 0 while Enabled.
	BatchSize int `yaml:"batch_size"`
	// FlushIntervalMs defaults to 200 (milliseconds, per the key name).
	FlushIntervalMs int `yaml:"flush_interval_ms"`
	// MaxBufferSize defaults to 5000 and must be > 0 while Enabled.
	MaxBufferSize int `yaml:"max_buffer_size"`
	// DropOnOverflow defaults to true. NOTE(review): presumably drops events
	// instead of blocking once MaxBufferSize is reached — confirm in the sink.
	DropOnOverflow bool `yaml:"drop_on_overflow"`
	// AsyncInsert defaults to true. NOTE(review): presumably maps to the
	// ClickHouse async_insert setting — confirm in the sink.
	AsyncInsert bool `yaml:"async_insert"`
	// TimeoutMs defaults to 1000 (milliseconds, per the key name) and must be
	// > 0 while Enabled.
	TimeoutMs int `yaml:"timeout_ms"`
}

// StdoutOutputConfig holds stdout sink configuration.
//
// The stdout sink is disabled by default.
type StdoutOutputConfig struct {
	Enabled bool `yaml:"enabled"`
}

// CorrelationConfig holds correlation configuration.
type CorrelationConfig struct {
	// Key names the fields used as the correlation key (presumably event
	// field names, as with the default "src_ip", "src_port"); Validate
	// requires it to be non-empty.
	Key []string `yaml:"key"`
	// TimeWindow defaults to a value of 1 with unit "s"; GetTimeWindow
	// converts it to a time.Duration.
	TimeWindow TimeWindowConfig `yaml:"time_window"`
	// OrphanPolicy holds the orphan-event switches.
	OrphanPolicy OrphanPolicyConfig `yaml:"orphan_policy"`
}

// TimeWindowConfig holds time window configuration.
type TimeWindowConfig struct { Value int `yaml:"value"` Unit string `yaml:"unit"` } // OrphanPolicyConfig holds orphan event policy configuration. type OrphanPolicyConfig struct { ApacheAlwaysEmit bool `yaml:"apache_always_emit"` NetworkEmit bool `yaml:"network_emit"` } // Load loads configuration from a YAML file. func Load(path string) (*Config, error) { data, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("failed to read config file: %w", err) } cfg := defaultConfig() if err := yaml.Unmarshal(data, cfg); err != nil { return nil, fmt.Errorf("failed to parse config file: %w", err) } if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) } return cfg, nil } // defaultConfig returns a Config with default values. func defaultConfig() *Config { return &Config{ Service: ServiceConfig{ Name: "logcorrelator", Language: "go", }, Inputs: InputsConfig{ UnixSockets: make([]UnixSocketConfig, 0), }, Outputs: OutputsConfig{ File: FileOutputConfig{ Enabled: true, Path: "/var/log/logcorrelator/correlated.log", }, ClickHouse: ClickHouseOutputConfig{ Enabled: false, BatchSize: 500, FlushIntervalMs: 200, MaxBufferSize: 5000, DropOnOverflow: true, AsyncInsert: true, TimeoutMs: 1000, }, Stdout: StdoutOutputConfig{ Enabled: false, }, }, Correlation: CorrelationConfig{ Key: []string{"src_ip", "src_port"}, TimeWindow: TimeWindowConfig{ Value: 1, Unit: "s", }, OrphanPolicy: OrphanPolicyConfig{ ApacheAlwaysEmit: true, NetworkEmit: false, }, }, } } // Validate validates the configuration. 
func (c *Config) Validate() error { if len(c.Inputs.UnixSockets) < 2 { return fmt.Errorf("at least two unix socket inputs are required") } seenNames := make(map[string]struct{}, len(c.Inputs.UnixSockets)) seenPaths := make(map[string]struct{}, len(c.Inputs.UnixSockets)) for i, input := range c.Inputs.UnixSockets { if strings.TrimSpace(input.Name) == "" { return fmt.Errorf("inputs.unix_sockets[%d].name is required", i) } if strings.TrimSpace(input.Path) == "" { return fmt.Errorf("inputs.unix_sockets[%d].path is required", i) } if _, exists := seenNames[input.Name]; exists { return fmt.Errorf("duplicate unix socket input name: %s", input.Name) } seenNames[input.Name] = struct{}{} if _, exists := seenPaths[input.Path]; exists { return fmt.Errorf("duplicate unix socket input path: %s", input.Path) } seenPaths[input.Path] = struct{}{} } if !c.Outputs.File.Enabled && !c.Outputs.ClickHouse.Enabled && !c.Outputs.Stdout.Enabled { return fmt.Errorf("at least one output must be enabled") } if c.Outputs.File.Enabled && strings.TrimSpace(c.Outputs.File.Path) == "" { return fmt.Errorf("file output path is required when file output is enabled") } if c.Outputs.ClickHouse.Enabled { if strings.TrimSpace(c.Outputs.ClickHouse.DSN) == "" { return fmt.Errorf("clickhouse DSN is required when enabled") } if strings.TrimSpace(c.Outputs.ClickHouse.Table) == "" { return fmt.Errorf("clickhouse table is required when enabled") } if c.Outputs.ClickHouse.BatchSize <= 0 { return fmt.Errorf("clickhouse batch_size must be > 0") } if c.Outputs.ClickHouse.MaxBufferSize <= 0 { return fmt.Errorf("clickhouse max_buffer_size must be > 0") } if c.Outputs.ClickHouse.TimeoutMs <= 0 { return fmt.Errorf("clickhouse timeout_ms must be > 0") } } if len(c.Correlation.Key) == 0 { return fmt.Errorf("correlation.key cannot be empty") } if c.Correlation.TimeWindow.Value <= 0 { return fmt.Errorf("correlation.time_window.value must be > 0") } return nil } // GetTimeWindow returns the time window as a duration. 
func (c *CorrelationConfig) GetTimeWindow() time.Duration { value := c.TimeWindow.Value if value <= 0 { value = 1 } unit := c.TimeWindow.Unit if unit == "" { unit = "s" } switch unit { case "ms", "millisecond", "milliseconds": return time.Duration(value) * time.Millisecond case "s", "second", "seconds": return time.Duration(value) * time.Second case "m", "minute", "minutes": return time.Duration(value) * time.Minute default: return time.Duration(value) * time.Second } }