feat: migrate configuration from custom format to YAML

- Replace custom directive-based config parser with YAML using gopkg.in/yaml.v3
- Rename config.example.conf to config.example.yml with YAML syntax
- Update default config path to /etc/logcorrelator/logcorrelator.yml
- Update Dockerfile.package to copy YAML config files
- Update packaging scripts to install logcorrelator.yml
- Update architecture.yml to document YAML configuration
- Add yaml.v3 dependency to go.mod

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-02-27 15:51:25 +01:00
parent 0d84a1284f
commit 37f9c21672
7 changed files with 163 additions and 263 deletions

View File

@ -1,99 +1,112 @@
package config
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
"time"
"gopkg.in/yaml.v3"
)
// Config holds the complete application configuration.
type Config struct {
Service ServiceConfig
Inputs InputsConfig
Outputs OutputsConfig
Correlation CorrelationConfig
Service ServiceConfig `yaml:"service"`
Inputs InputsConfig `yaml:"inputs"`
Outputs OutputsConfig `yaml:"outputs"`
Correlation CorrelationConfig `yaml:"correlation"`
}
// ServiceConfig holds service-level configuration.
type ServiceConfig struct {
Name string
Language string
Name string `yaml:"name"`
Language string `yaml:"language"`
}
// InputsConfig holds input sources configuration.
type InputsConfig struct {
UnixSockets []UnixSocketConfig
UnixSockets []UnixSocketConfig `yaml:"unix_sockets"`
}
// UnixSocketConfig holds a Unix socket source configuration.
type UnixSocketConfig struct {
Name string
Path string
Format string
Name string `yaml:"name"`
Path string `yaml:"path"`
Format string `yaml:"format"`
}
// OutputsConfig holds output sinks configuration.
type OutputsConfig struct {
File FileOutputConfig
ClickHouse ClickHouseOutputConfig
Stdout StdoutOutputConfig
File FileOutputConfig `yaml:"file"`
ClickHouse ClickHouseOutputConfig `yaml:"clickhouse"`
Stdout StdoutOutputConfig `yaml:"stdout"`
}
// FileOutputConfig holds file sink configuration.
type FileOutputConfig struct {
Enabled bool
Path string
Enabled bool `yaml:"enabled"`
Path string `yaml:"path"`
}
// ClickHouseOutputConfig holds ClickHouse sink configuration.
type ClickHouseOutputConfig struct {
Enabled bool
DSN string
Table string
BatchSize int
FlushIntervalMs int
MaxBufferSize int
DropOnOverflow bool
AsyncInsert bool
TimeoutMs int
Enabled bool `yaml:"enabled"`
DSN string `yaml:"dsn"`
Table string `yaml:"table"`
BatchSize int `yaml:"batch_size"`
FlushIntervalMs int `yaml:"flush_interval_ms"`
MaxBufferSize int `yaml:"max_buffer_size"`
DropOnOverflow bool `yaml:"drop_on_overflow"`
AsyncInsert bool `yaml:"async_insert"`
TimeoutMs int `yaml:"timeout_ms"`
}
// StdoutOutputConfig holds stdout sink configuration.
type StdoutOutputConfig struct {
Enabled bool
Enabled bool `yaml:"enabled"`
}
// CorrelationConfig holds correlation configuration.
type CorrelationConfig struct {
Key []string
TimeWindow TimeWindowConfig
OrphanPolicy OrphanPolicyConfig
Key []string `yaml:"key"`
TimeWindow TimeWindowConfig `yaml:"time_window"`
OrphanPolicy OrphanPolicyConfig `yaml:"orphan_policy"`
}
// TimeWindowConfig holds time window configuration.
type TimeWindowConfig struct {
Value int
Unit string
Value int `yaml:"value"`
Unit string `yaml:"unit"`
}
// OrphanPolicyConfig holds orphan event policy configuration.
type OrphanPolicyConfig struct {
ApacheAlwaysEmit bool
NetworkEmit bool
ApacheAlwaysEmit bool `yaml:"apache_always_emit"`
NetworkEmit bool `yaml:"network_emit"`
}
// Load loads configuration from a text file with directives.
// Load loads configuration from a YAML file.
func Load(path string) (*Config, error) {
file, err := os.Open(path)
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to open config file: %w", err)
return nil, fmt.Errorf("failed to read config file: %w", err)
}
defer file.Close()
cfg := &Config{
cfg := defaultConfig()
if err := yaml.Unmarshal(data, cfg); err != nil {
return nil, fmt.Errorf("failed to parse config file: %w", err)
}
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return cfg, nil
}
// defaultConfig returns a Config with default values.
func defaultConfig() *Config {
return &Config{
Service: ServiceConfig{
Name: "logcorrelator",
Language: "go",
@ -131,171 +144,6 @@ func Load(path string) (*Config, error) {
},
},
}
scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
lineNum++
line := strings.TrimSpace(scanner.Text())
// Skip empty lines and comments
if line == "" || strings.HasPrefix(line, "#") {
continue
}
if err := parseDirective(cfg, line); err != nil {
return nil, fmt.Errorf("line %d: %w", lineNum, err)
}
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return cfg, nil
}
func parseDirective(cfg *Config, line string) error {
parts := strings.Fields(line)
if len(parts) < 2 {
return fmt.Errorf("invalid directive: %s", line)
}
directive := parts[0]
value := strings.Join(parts[1:], " ")
switch directive {
case "service.name":
cfg.Service.Name = value
case "service.language":
cfg.Service.Language = value
case "input.unix_socket":
// Format: input.unix_socket <name> <path> [format]
if len(parts) < 3 {
return fmt.Errorf("input.unix_socket requires name and path")
}
format := "json"
if len(parts) >= 4 {
format = parts[3]
}
cfg.Inputs.UnixSockets = append(cfg.Inputs.UnixSockets, UnixSocketConfig{
Name: parts[1],
Path: parts[2],
Format: format,
})
case "output.file.enabled":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for output.file.enabled: %w", err)
}
cfg.Outputs.File.Enabled = enabled
case "output.file.path":
cfg.Outputs.File.Path = value
case "output.clickhouse.enabled":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.enabled: %w", err)
}
cfg.Outputs.ClickHouse.Enabled = enabled
case "output.clickhouse.dsn":
cfg.Outputs.ClickHouse.DSN = value
case "output.clickhouse.table":
cfg.Outputs.ClickHouse.Table = value
case "output.clickhouse.batch_size":
v, err := strconv.Atoi(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.batch_size: %w", err)
}
cfg.Outputs.ClickHouse.BatchSize = v
case "output.clickhouse.flush_interval_ms":
v, err := strconv.Atoi(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.flush_interval_ms: %w", err)
}
cfg.Outputs.ClickHouse.FlushIntervalMs = v
case "output.clickhouse.max_buffer_size":
v, err := strconv.Atoi(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.max_buffer_size: %w", err)
}
cfg.Outputs.ClickHouse.MaxBufferSize = v
case "output.clickhouse.drop_on_overflow":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.drop_on_overflow: %w", err)
}
cfg.Outputs.ClickHouse.DropOnOverflow = enabled
case "output.clickhouse.async_insert":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.async_insert: %w", err)
}
cfg.Outputs.ClickHouse.AsyncInsert = enabled
case "output.clickhouse.timeout_ms":
v, err := strconv.Atoi(value)
if err != nil {
return fmt.Errorf("invalid value for output.clickhouse.timeout_ms: %w", err)
}
cfg.Outputs.ClickHouse.TimeoutMs = v
case "output.stdout.enabled":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for output.stdout.enabled: %w", err)
}
cfg.Outputs.Stdout.Enabled = enabled
case "correlation.key":
cfg.Correlation.Key = strings.Split(value, ",")
for i, k := range cfg.Correlation.Key {
cfg.Correlation.Key[i] = strings.TrimSpace(k)
}
case "correlation.time_window.value":
v, err := strconv.Atoi(value)
if err != nil {
return fmt.Errorf("invalid value for correlation.time_window.value: %w", err)
}
cfg.Correlation.TimeWindow.Value = v
case "correlation.time_window.unit":
cfg.Correlation.TimeWindow.Unit = value
case "correlation.orphan_policy.apache_always_emit":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for correlation.orphan_policy.apache_always_emit: %w", err)
}
cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit = enabled
case "correlation.orphan_policy.network_emit":
enabled, err := parseBool(value)
if err != nil {
return fmt.Errorf("invalid value for correlation.orphan_policy.network_emit: %w", err)
}
cfg.Correlation.OrphanPolicy.NetworkEmit = enabled
default:
return fmt.Errorf("unknown directive: %s", directive)
}
return nil
}
func parseBool(s string) (bool, error) {
s = strings.ToLower(s)
switch s {
case "true", "yes", "1", "on":
return true, nil
case "false", "no", "0", "off":
return false, nil
default:
return false, fmt.Errorf("invalid boolean value: %s", s)
}
}
// Validate validates the configuration.