chore: version 1.0.6 - simplify YAML configuration
- Remove service.name and service.language (unused) - Remove enabled flags on outputs (presence = enabled) - Simplify correlation config: time_window_s (integer) instead of nested object - Simplify orphan_policy to emit_orphans boolean - Rename apache socket to http.socket - Add socket_permissions option for unix sockets (default: 0660) - Update tests for new configuration format Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
15
CHANGELOG.md
15
CHANGELOG.md
@ -5,6 +5,21 @@ All notable changes to logcorrelator are documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [1.0.6] - 2026-03-01
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Configuration YAML simplified: removed `service.name`, `service.language`, `enabled` flags
|
||||||
|
- Correlation config simplified: `time_window_s` (integer) instead of nested `time_window` object
|
||||||
|
- Orphan policy simplified: `emit_orphans` boolean instead of `orphan_policy` object
|
||||||
|
- Apache socket renamed to `http.socket`
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- `socket_permissions` option on unix sockets to configure file permissions (default: `0660`)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## [1.0.4] - 2026-03-01
|
## [1.0.4] - 2026-03-01
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|||||||
2
Makefile
2
Makefile
@ -15,7 +15,7 @@ BINARY_NAME=logcorrelator
|
|||||||
DIST_DIR=dist
|
DIST_DIR=dist
|
||||||
|
|
||||||
# Package version
|
# Package version
|
||||||
PKG_VERSION ?= 1.0.4
|
PKG_VERSION ?= 1.0.6
|
||||||
|
|
||||||
## build: Build the logcorrelator binary locally
|
## build: Build the logcorrelator binary locally
|
||||||
build:
|
build:
|
||||||
|
|||||||
@ -71,9 +71,9 @@ docker run -d \
|
|||||||
make package-rpm
|
make package-rpm
|
||||||
|
|
||||||
# Installer le package RPM (Rocky Linux 8/9/10)
|
# Installer le package RPM (Rocky Linux 8/9/10)
|
||||||
sudo dnf install -y dist/rpm/rocky8/logcorrelator-1.0.3-1.el8.x86_64.rpm
|
sudo dnf install -y dist/rpm/rocky8/logcorrelator-1.0.6-1.el8.x86_64.rpm
|
||||||
sudo dnf install -y dist/rpm/rocky9/logcorrelator-1.0.3-1.el9.x86_64.rpm
|
sudo dnf install -y dist/rpm/rocky9/logcorrelator-1.0.6-1.el9.x86_64.rpm
|
||||||
sudo dnf install -y dist/rpm/almalinux10/logcorrelator-1.0.3-1.el10.x86_64.rpm
|
sudo dnf install -y dist/rpm/almalinux10/logcorrelator-1.0.6-1.el10.x86_64.rpm
|
||||||
|
|
||||||
# Activer et démarrer le service
|
# Activer et démarrer le service
|
||||||
sudo systemctl enable logcorrelator
|
sudo systemctl enable logcorrelator
|
||||||
|
|||||||
@ -76,46 +76,28 @@ config:
|
|||||||
stocké dans /etc/logcorrelator.
|
stocké dans /etc/logcorrelator.
|
||||||
reload_strategy: restart_service
|
reload_strategy: restart_service
|
||||||
example: |
|
example: |
|
||||||
# Service configuration
|
# Inputs - at least 2 unix sockets required
|
||||||
service:
|
|
||||||
name: logcorrelator
|
|
||||||
language: go
|
|
||||||
|
|
||||||
# Input sources (at least 2 required)
|
|
||||||
inputs:
|
inputs:
|
||||||
unix_sockets:
|
unix_sockets:
|
||||||
- name: apache_source
|
- name: http
|
||||||
path: /var/run/logcorrelator/apache.sock
|
path: /var/run/logcorrelator/http.socket
|
||||||
format: json
|
socket_permissions: "0660"
|
||||||
- name: network_source
|
- name: network
|
||||||
path: /var/run/logcorrelator/network.sock
|
path: /var/run/logcorrelator/network.socket
|
||||||
format: json
|
|
||||||
|
|
||||||
# File output
|
# Outputs
|
||||||
outputs:
|
outputs:
|
||||||
file:
|
file:
|
||||||
enabled: true
|
|
||||||
path: /var/log/logcorrelator/correlated.log
|
path: /var/log/logcorrelator/correlated.log
|
||||||
|
|
||||||
# ClickHouse output
|
|
||||||
outputs:
|
|
||||||
clickhouse:
|
clickhouse:
|
||||||
enabled: false
|
|
||||||
dsn: clickhouse://user:pass@localhost:9000/db
|
dsn: clickhouse://user:pass@localhost:9000/db
|
||||||
table: correlated_logs_http_network
|
table: correlated_logs
|
||||||
batch_size: 500
|
stdout: false
|
||||||
|
|
||||||
# Correlation configuration
|
# Correlation (optional)
|
||||||
correlation:
|
correlation:
|
||||||
key:
|
time_window_s: 1
|
||||||
- src_ip
|
emit_orphans: true
|
||||||
- src_port
|
|
||||||
time_window:
|
|
||||||
value: 1
|
|
||||||
unit: s
|
|
||||||
orphan_policy:
|
|
||||||
apache_always_emit: true
|
|
||||||
network_emit: false
|
|
||||||
|
|
||||||
inputs:
|
inputs:
|
||||||
description: >
|
description: >
|
||||||
|
|||||||
@ -39,7 +39,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize logger
|
// Initialize logger
|
||||||
logger := observability.NewLogger(cfg.Service.Name)
|
logger := observability.NewLogger("logcorrelator")
|
||||||
|
|
||||||
logger.Info(fmt.Sprintf("Starting logcorrelator version %s", Version))
|
logger.Info(fmt.Sprintf("Starting logcorrelator version %s", Version))
|
||||||
|
|
||||||
@ -50,15 +50,16 @@ func main() {
|
|||||||
Name: inputCfg.Name,
|
Name: inputCfg.Name,
|
||||||
Path: inputCfg.Path,
|
Path: inputCfg.Path,
|
||||||
SourceType: inputCfg.SourceType,
|
SourceType: inputCfg.SourceType,
|
||||||
|
SocketPermissions: inputCfg.GetSocketPermissions(),
|
||||||
})
|
})
|
||||||
sources = append(sources, source)
|
sources = append(sources, source)
|
||||||
logger.Info(fmt.Sprintf("Configured input source: name=%s, path=%s", inputCfg.Name, inputCfg.Path))
|
logger.Info(fmt.Sprintf("Configured input source: name=%s, path=%s, permissions=%o", inputCfg.Name, inputCfg.Path, inputCfg.GetSocketPermissions()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create sinks
|
// Create sinks
|
||||||
sinks := make([]ports.CorrelatedLogSink, 0)
|
sinks := make([]ports.CorrelatedLogSink, 0)
|
||||||
|
|
||||||
if cfg.Outputs.File.Enabled {
|
if cfg.Outputs.File.Path != "" {
|
||||||
fileSink, err := file.NewFileSink(file.Config{
|
fileSink, err := file.NewFileSink(file.Config{
|
||||||
Path: cfg.Outputs.File.Path,
|
Path: cfg.Outputs.File.Path,
|
||||||
})
|
})
|
||||||
@ -89,9 +90,9 @@ func main() {
|
|||||||
logger.Info(fmt.Sprintf("Configured ClickHouse sink: table=%s", cfg.Outputs.ClickHouse.Table))
|
logger.Info(fmt.Sprintf("Configured ClickHouse sink: table=%s", cfg.Outputs.ClickHouse.Table))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Outputs.Stdout.Enabled {
|
if cfg.Outputs.Stdout {
|
||||||
stdoutSink := stdout.NewStdoutSink(stdout.Config{
|
stdoutSink := stdout.NewStdoutSink(stdout.Config{
|
||||||
Enabled: cfg.Outputs.Stdout.Enabled,
|
Enabled: true,
|
||||||
})
|
})
|
||||||
sinks = append(sinks, stdoutSink)
|
sinks = append(sinks, stdoutSink)
|
||||||
logger.Info("Configured stdout sink")
|
logger.Info("Configured stdout sink")
|
||||||
@ -103,15 +104,14 @@ func main() {
|
|||||||
// Create correlation service
|
// Create correlation service
|
||||||
correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{
|
correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{
|
||||||
TimeWindow: cfg.Correlation.GetTimeWindow(),
|
TimeWindow: cfg.Correlation.GetTimeWindow(),
|
||||||
ApacheAlwaysEmit: cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit,
|
ApacheAlwaysEmit: cfg.Correlation.EmitOrphans,
|
||||||
NetworkEmit: cfg.Correlation.OrphanPolicy.NetworkEmit,
|
NetworkEmit: false,
|
||||||
MaxBufferSize: domain.DefaultMaxBufferSize,
|
MaxBufferSize: domain.DefaultMaxBufferSize,
|
||||||
}, &domain.RealTimeProvider{})
|
}, &domain.RealTimeProvider{})
|
||||||
|
|
||||||
logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, apache_always_emit=%v, network_emit=%v",
|
logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, emit_orphans=%v",
|
||||||
cfg.Correlation.GetTimeWindow().String(),
|
cfg.Correlation.GetTimeWindow().String(),
|
||||||
cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit,
|
cfg.Correlation.EmitOrphans))
|
||||||
cfg.Correlation.OrphanPolicy.NetworkEmit))
|
|
||||||
|
|
||||||
// Create orchestrator
|
// Create orchestrator
|
||||||
orchestrator := app.NewOrchestrator(app.OrchestratorConfig{
|
orchestrator := app.NewOrchestrator(app.OrchestratorConfig{
|
||||||
|
|||||||
@ -1,47 +1,28 @@
|
|||||||
# logcorrelator configuration file
|
# logcorrelator configuration file
|
||||||
# Format: YAML
|
# Format: YAML
|
||||||
|
|
||||||
service:
|
|
||||||
name: logcorrelator
|
|
||||||
language: go
|
|
||||||
|
|
||||||
inputs:
|
inputs:
|
||||||
unix_sockets:
|
unix_sockets:
|
||||||
- name: apache_source
|
- name: http
|
||||||
path: /var/run/logcorrelator/apache.sock
|
path: /var/run/logcorrelator/http.socket
|
||||||
format: json
|
format: json
|
||||||
source_type: A # Explicit source type: "A" for Apache/HTTP, "B" for Network
|
socket_permissions: "0660" # owner + group read/write
|
||||||
- name: network_source
|
- name: network
|
||||||
path: /var/run/logcorrelator/network.sock
|
path: /var/run/logcorrelator/network.socket
|
||||||
format: json
|
format: json
|
||||||
source_type: B # If not specified, auto-detection based on header_* fields
|
socket_permissions: "0660"
|
||||||
|
|
||||||
outputs:
|
outputs:
|
||||||
file:
|
file:
|
||||||
enabled: true
|
|
||||||
path: /var/log/logcorrelator/correlated.log
|
path: /var/log/logcorrelator/correlated.log
|
||||||
|
|
||||||
clickhouse:
|
clickhouse:
|
||||||
enabled: false
|
|
||||||
dsn: clickhouse://user:pass@localhost:9000/db
|
dsn: clickhouse://user:pass@localhost:9000/db
|
||||||
table: correlated_logs_http_network
|
table: correlated_logs_http_network
|
||||||
batch_size: 500
|
|
||||||
flush_interval_ms: 200
|
|
||||||
max_buffer_size: 5000
|
|
||||||
drop_on_overflow: true
|
|
||||||
async_insert: true
|
|
||||||
timeout_ms: 1000
|
|
||||||
|
|
||||||
stdout:
|
stdout: false
|
||||||
enabled: false
|
|
||||||
|
|
||||||
correlation:
|
correlation:
|
||||||
key:
|
time_window_s: 1
|
||||||
- src_ip
|
emit_orphans: true # http toujours émis, network jamais seul
|
||||||
- src_port
|
|
||||||
time_window:
|
|
||||||
value: 1
|
|
||||||
unit: s
|
|
||||||
orphan_policy:
|
|
||||||
apache_always_emit: true
|
|
||||||
network_emit: false
|
|
||||||
|
|||||||
@ -16,8 +16,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Default socket file permissions (owner + group read/write)
|
|
||||||
DefaultSocketPermissions os.FileMode = 0660
|
|
||||||
// Maximum line size for JSON logs (1MB)
|
// Maximum line size for JSON logs (1MB)
|
||||||
MaxLineSize = 1024 * 1024
|
MaxLineSize = 1024 * 1024
|
||||||
// Maximum concurrent connections per socket
|
// Maximum concurrent connections per socket
|
||||||
@ -31,6 +29,7 @@ type Config struct {
|
|||||||
Name string
|
Name string
|
||||||
Path string
|
Path string
|
||||||
SourceType string // "A" for Apache/HTTP, "B" for Network, "" for auto-detect
|
SourceType string // "A" for Apache/HTTP, "B" for Network, "" for auto-detect
|
||||||
|
SocketPermissions os.FileMode
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnixSocketSource reads JSON events from a Unix socket.
|
// UnixSocketSource reads JSON events from a Unix socket.
|
||||||
@ -83,7 +82,11 @@ func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.N
|
|||||||
s.listener = listener
|
s.listener = listener
|
||||||
|
|
||||||
// Set permissions - fail if we can't
|
// Set permissions - fail if we can't
|
||||||
if err := os.Chmod(s.config.Path, DefaultSocketPermissions); err != nil {
|
permissions := s.config.SocketPermissions
|
||||||
|
if permissions == 0 {
|
||||||
|
permissions = 0660 // default
|
||||||
|
}
|
||||||
|
if err := os.Chmod(s.config.Path, permissions); err != nil {
|
||||||
_ = listener.Close()
|
_ = listener.Close()
|
||||||
_ = os.Remove(s.config.Path)
|
_ = os.Remove(s.config.Path)
|
||||||
return fmt.Errorf("failed to set socket permissions: %w", err)
|
return fmt.Errorf("failed to set socket permissions: %w", err)
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package config
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -11,7 +12,6 @@ import (
|
|||||||
|
|
||||||
// Config holds the complete application configuration.
|
// Config holds the complete application configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Service ServiceConfig `yaml:"service"`
|
|
||||||
Inputs InputsConfig `yaml:"inputs"`
|
Inputs InputsConfig `yaml:"inputs"`
|
||||||
Outputs OutputsConfig `yaml:"outputs"`
|
Outputs OutputsConfig `yaml:"outputs"`
|
||||||
Correlation CorrelationConfig `yaml:"correlation"`
|
Correlation CorrelationConfig `yaml:"correlation"`
|
||||||
@ -34,18 +34,18 @@ type UnixSocketConfig struct {
|
|||||||
Path string `yaml:"path"`
|
Path string `yaml:"path"`
|
||||||
Format string `yaml:"format"`
|
Format string `yaml:"format"`
|
||||||
SourceType string `yaml:"source_type"` // "A" for Apache/HTTP, "B" for Network
|
SourceType string `yaml:"source_type"` // "A" for Apache/HTTP, "B" for Network
|
||||||
|
SocketPermissions string `yaml:"socket_permissions"` // octal string, e.g., "0660", "0666"
|
||||||
}
|
}
|
||||||
|
|
||||||
// OutputsConfig holds output sinks configuration.
|
// OutputsConfig holds output sinks configuration.
|
||||||
type OutputsConfig struct {
|
type OutputsConfig struct {
|
||||||
File FileOutputConfig `yaml:"file"`
|
File FileOutputConfig `yaml:"file"`
|
||||||
ClickHouse ClickHouseOutputConfig `yaml:"clickhouse"`
|
ClickHouse ClickHouseOutputConfig `yaml:"clickhouse"`
|
||||||
Stdout StdoutOutputConfig `yaml:"stdout"`
|
Stdout bool `yaml:"stdout"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// FileOutputConfig holds file sink configuration.
|
// FileOutputConfig holds file sink configuration.
|
||||||
type FileOutputConfig struct {
|
type FileOutputConfig struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
|
||||||
Path string `yaml:"path"`
|
Path string `yaml:"path"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,27 +63,15 @@ type ClickHouseOutputConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StdoutOutputConfig holds stdout sink configuration.
|
// StdoutOutputConfig holds stdout sink configuration.
|
||||||
|
// Deprecated: stdout is now a boolean flag in OutputsConfig.
|
||||||
type StdoutOutputConfig struct {
|
type StdoutOutputConfig struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CorrelationConfig holds correlation configuration.
|
// CorrelationConfig holds correlation configuration.
|
||||||
type CorrelationConfig struct {
|
type CorrelationConfig struct {
|
||||||
Key []string `yaml:"key"`
|
TimeWindowS int `yaml:"time_window_s"`
|
||||||
TimeWindow TimeWindowConfig `yaml:"time_window"`
|
EmitOrphans bool `yaml:"emit_orphans"`
|
||||||
OrphanPolicy OrphanPolicyConfig `yaml:"orphan_policy"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TimeWindowConfig holds time window configuration.
|
|
||||||
type TimeWindowConfig struct {
|
|
||||||
Value int `yaml:"value"`
|
|
||||||
Unit string `yaml:"unit"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// OrphanPolicyConfig holds orphan event policy configuration.
|
|
||||||
type OrphanPolicyConfig struct {
|
|
||||||
ApacheAlwaysEmit bool `yaml:"apache_always_emit"`
|
|
||||||
NetworkEmit bool `yaml:"network_emit"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load loads configuration from a YAML file.
|
// Load loads configuration from a YAML file.
|
||||||
@ -109,16 +97,11 @@ func Load(path string) (*Config, error) {
|
|||||||
// defaultConfig returns a Config with default values.
|
// defaultConfig returns a Config with default values.
|
||||||
func defaultConfig() *Config {
|
func defaultConfig() *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
Service: ServiceConfig{
|
|
||||||
Name: "logcorrelator",
|
|
||||||
Language: "go",
|
|
||||||
},
|
|
||||||
Inputs: InputsConfig{
|
Inputs: InputsConfig{
|
||||||
UnixSockets: make([]UnixSocketConfig, 0),
|
UnixSockets: make([]UnixSocketConfig, 0),
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{
|
File: FileOutputConfig{
|
||||||
Enabled: true,
|
|
||||||
Path: "/var/log/logcorrelator/correlated.log",
|
Path: "/var/log/logcorrelator/correlated.log",
|
||||||
},
|
},
|
||||||
ClickHouse: ClickHouseOutputConfig{
|
ClickHouse: ClickHouseOutputConfig{
|
||||||
@ -130,20 +113,11 @@ func defaultConfig() *Config {
|
|||||||
AsyncInsert: true,
|
AsyncInsert: true,
|
||||||
TimeoutMs: 1000,
|
TimeoutMs: 1000,
|
||||||
},
|
},
|
||||||
Stdout: StdoutOutputConfig{
|
Stdout: false,
|
||||||
Enabled: false,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Correlation: CorrelationConfig{
|
Correlation: CorrelationConfig{
|
||||||
Key: []string{"src_ip", "src_port"},
|
TimeWindowS: 1,
|
||||||
TimeWindow: TimeWindowConfig{
|
EmitOrphans: true,
|
||||||
Value: 1,
|
|
||||||
Unit: "s",
|
|
||||||
},
|
|
||||||
OrphanPolicy: OrphanPolicyConfig{
|
|
||||||
ApacheAlwaysEmit: true,
|
|
||||||
NetworkEmit: false,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -176,12 +150,20 @@ func (c *Config) Validate() error {
|
|||||||
seenPaths[input.Path] = struct{}{}
|
seenPaths[input.Path] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !c.Outputs.File.Enabled && !c.Outputs.ClickHouse.Enabled && !c.Outputs.Stdout.Enabled {
|
// At least one output must be enabled
|
||||||
return fmt.Errorf("at least one output must be enabled")
|
hasOutput := false
|
||||||
|
if c.Outputs.File.Path != "" {
|
||||||
|
hasOutput = true
|
||||||
|
}
|
||||||
|
if c.Outputs.ClickHouse.Enabled {
|
||||||
|
hasOutput = true
|
||||||
|
}
|
||||||
|
if c.Outputs.Stdout {
|
||||||
|
hasOutput = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Outputs.File.Enabled && strings.TrimSpace(c.Outputs.File.Path) == "" {
|
if !hasOutput {
|
||||||
return fmt.Errorf("file output path is required when file output is enabled")
|
return fmt.Errorf("at least one output must be enabled (file, clickhouse, or stdout)")
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Outputs.ClickHouse.Enabled {
|
if c.Outputs.ClickHouse.Enabled {
|
||||||
@ -202,11 +184,8 @@ func (c *Config) Validate() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(c.Correlation.Key) == 0 {
|
if c.Correlation.TimeWindowS <= 0 {
|
||||||
return fmt.Errorf("correlation.key cannot be empty")
|
return fmt.Errorf("correlation.time_window_s must be > 0")
|
||||||
}
|
|
||||||
if c.Correlation.TimeWindow.Value <= 0 {
|
|
||||||
return fmt.Errorf("correlation.time_window.value must be > 0")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -214,24 +193,25 @@ func (c *Config) Validate() error {
|
|||||||
|
|
||||||
// GetTimeWindow returns the time window as a duration.
|
// GetTimeWindow returns the time window as a duration.
|
||||||
func (c *CorrelationConfig) GetTimeWindow() time.Duration {
|
func (c *CorrelationConfig) GetTimeWindow() time.Duration {
|
||||||
value := c.TimeWindow.Value
|
value := c.TimeWindowS
|
||||||
if value <= 0 {
|
if value <= 0 {
|
||||||
value = 1
|
value = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
unit := c.TimeWindow.Unit
|
|
||||||
if unit == "" {
|
|
||||||
unit = "s"
|
|
||||||
}
|
|
||||||
|
|
||||||
switch unit {
|
|
||||||
case "ms", "millisecond", "milliseconds":
|
|
||||||
return time.Duration(value) * time.Millisecond
|
|
||||||
case "s", "second", "seconds":
|
|
||||||
return time.Duration(value) * time.Second
|
|
||||||
case "m", "minute", "minutes":
|
|
||||||
return time.Duration(value) * time.Minute
|
|
||||||
default:
|
|
||||||
return time.Duration(value) * time.Second
|
return time.Duration(value) * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSocketPermissions returns the socket permissions as os.FileMode.
|
||||||
|
// Default is 0660 (owner + group read/write).
|
||||||
|
func (c *UnixSocketConfig) GetSocketPermissions() os.FileMode {
|
||||||
|
if c.SocketPermissions == "" {
|
||||||
|
return 0660
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse octal string (e.g., "0660", "660", "0666")
|
||||||
|
perms, err := strconv.ParseUint(strings.TrimPrefix(c.SocketPermissions, "0"), 8, 32)
|
||||||
|
if err != nil {
|
||||||
|
return 0660
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.FileMode(perms)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,38 +9,25 @@ import (
|
|||||||
|
|
||||||
func TestLoad_ValidConfig(t *testing.T) {
|
func TestLoad_ValidConfig(t *testing.T) {
|
||||||
content := `
|
content := `
|
||||||
service:
|
|
||||||
name: logcorrelator
|
|
||||||
language: go
|
|
||||||
|
|
||||||
inputs:
|
inputs:
|
||||||
unix_sockets:
|
unix_sockets:
|
||||||
- name: apache_source
|
- name: http
|
||||||
path: /var/run/logcorrelator/apache.sock
|
path: /var/run/logcorrelator/http.socket
|
||||||
format: json
|
format: json
|
||||||
- name: network_source
|
- name: network
|
||||||
path: /var/run/logcorrelator/network.sock
|
path: /var/run/logcorrelator/network.socket
|
||||||
format: json
|
format: json
|
||||||
|
|
||||||
outputs:
|
outputs:
|
||||||
file:
|
file:
|
||||||
enabled: true
|
|
||||||
path: /var/log/logcorrelator/correlated.log
|
path: /var/log/logcorrelator/correlated.log
|
||||||
clickhouse:
|
clickhouse:
|
||||||
enabled: false
|
|
||||||
dsn: clickhouse://user:pass@localhost:9000/db
|
dsn: clickhouse://user:pass@localhost:9000/db
|
||||||
table: correlated_logs
|
table: correlated_logs
|
||||||
|
|
||||||
correlation:
|
correlation:
|
||||||
key:
|
time_window_s: 1
|
||||||
- src_ip
|
emit_orphans: true
|
||||||
- src_port
|
|
||||||
time_window:
|
|
||||||
value: 1
|
|
||||||
unit: s
|
|
||||||
orphan_policy:
|
|
||||||
apache_always_emit: true
|
|
||||||
network_emit: false
|
|
||||||
`
|
`
|
||||||
|
|
||||||
tmpDir := t.TempDir()
|
tmpDir := t.TempDir()
|
||||||
@ -54,14 +41,11 @@ correlation:
|
|||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Service.Name != "logcorrelator" {
|
|
||||||
t.Errorf("expected service name logcorrelator, got %s", cfg.Service.Name)
|
|
||||||
}
|
|
||||||
if len(cfg.Inputs.UnixSockets) != 2 {
|
if len(cfg.Inputs.UnixSockets) != 2 {
|
||||||
t.Errorf("expected 2 unix sockets, got %d", len(cfg.Inputs.UnixSockets))
|
t.Errorf("expected 2 unix sockets, got %d", len(cfg.Inputs.UnixSockets))
|
||||||
}
|
}
|
||||||
if !cfg.Outputs.File.Enabled {
|
if cfg.Outputs.File.Path != "/var/log/logcorrelator/correlated.log" {
|
||||||
t.Error("expected file output enabled")
|
t.Errorf("expected file path /var/log/logcorrelator/correlated.log, got %s", cfg.Outputs.File.Path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,8 +74,6 @@ func TestLoad_DefaultValues(t *testing.T) {
|
|||||||
tmpDir := t.TempDir()
|
tmpDir := t.TempDir()
|
||||||
configPath := filepath.Join(tmpDir, "config.yml")
|
configPath := filepath.Join(tmpDir, "config.yml")
|
||||||
content := `
|
content := `
|
||||||
service:
|
|
||||||
name: test-service
|
|
||||||
inputs:
|
inputs:
|
||||||
unix_sockets:
|
unix_sockets:
|
||||||
- name: a
|
- name: a
|
||||||
@ -100,7 +82,7 @@ inputs:
|
|||||||
path: /tmp/b.sock
|
path: /tmp/b.sock
|
||||||
outputs:
|
outputs:
|
||||||
file:
|
file:
|
||||||
enabled: true
|
path: /var/log/test.log
|
||||||
`
|
`
|
||||||
if err := os.WriteFile(configPath, []byte(content), 0644); err != nil {
|
if err := os.WriteFile(configPath, []byte(content), 0644); err != nil {
|
||||||
t.Fatalf("failed to write config: %v", err)
|
t.Fatalf("failed to write config: %v", err)
|
||||||
@ -111,15 +93,12 @@ outputs:
|
|||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Service.Name != "test-service" {
|
|
||||||
t.Errorf("expected service name test-service, got %s", cfg.Service.Name)
|
|
||||||
}
|
|
||||||
// Check defaults
|
// Check defaults
|
||||||
if cfg.Correlation.TimeWindow.Value != 1 {
|
if cfg.Correlation.TimeWindowS != 1 {
|
||||||
t.Errorf("expected default time window value 1, got %d", cfg.Correlation.TimeWindow.Value)
|
t.Errorf("expected default time window value 1, got %d", cfg.Correlation.TimeWindowS)
|
||||||
}
|
}
|
||||||
if cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit != true {
|
if cfg.Correlation.EmitOrphans != true {
|
||||||
t.Error("expected default apache_always_emit to be true")
|
t.Error("expected default emit_orphans to be true")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,7 +110,7 @@ func TestValidate_MinimumInputs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true},
|
File: FileOutputConfig{Path: "/var/log/test.log"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,9 +129,9 @@ func TestValidate_AtLeastOneOutput(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: false},
|
File: FileOutputConfig{},
|
||||||
ClickHouse: ClickHouseOutputConfig{Enabled: false},
|
ClickHouse: ClickHouseOutputConfig{Enabled: false},
|
||||||
Stdout: StdoutOutputConfig{Enabled: false},
|
Stdout: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,30 +148,23 @@ func TestGetTimeWindow(t *testing.T) {
|
|||||||
expected time.Duration
|
expected time.Duration
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "seconds",
|
name: "1 second",
|
||||||
config: CorrelationConfig{
|
config: CorrelationConfig{
|
||||||
TimeWindow: TimeWindowConfig{Value: 1, Unit: "s"},
|
TimeWindowS: 1,
|
||||||
},
|
},
|
||||||
expected: time.Second,
|
expected: time.Second,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "milliseconds",
|
name: "5 seconds",
|
||||||
config: CorrelationConfig{
|
config: CorrelationConfig{
|
||||||
TimeWindow: TimeWindowConfig{Value: 500, Unit: "ms"},
|
TimeWindowS: 5,
|
||||||
},
|
},
|
||||||
expected: 500 * time.Millisecond,
|
expected: 5 * time.Second,
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "minutes",
|
|
||||||
config: CorrelationConfig{
|
|
||||||
TimeWindow: TimeWindowConfig{Value: 2, Unit: "m"},
|
|
||||||
},
|
|
||||||
expected: 2 * time.Minute,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "default",
|
name: "default",
|
||||||
config: CorrelationConfig{
|
config: CorrelationConfig{
|
||||||
TimeWindow: TimeWindowConfig{},
|
TimeWindowS: 0,
|
||||||
},
|
},
|
||||||
expected: time.Second,
|
expected: time.Second,
|
||||||
},
|
},
|
||||||
@ -217,7 +189,7 @@ func TestValidate_DuplicateNames(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true},
|
File: FileOutputConfig{Path: "/var/log/test.log"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,7 +208,7 @@ func TestValidate_DuplicatePaths(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true},
|
File: FileOutputConfig{Path: "/var/log/test.log"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -255,7 +227,7 @@ func TestValidate_EmptyName(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true},
|
File: FileOutputConfig{Path: "/var/log/test.log"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,7 +246,7 @@ func TestValidate_EmptyPath(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true},
|
File: FileOutputConfig{Path: "/var/log/test.log"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,7 +265,7 @@ func TestValidate_EmptyFilePath(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true, Path: ""},
|
File: FileOutputConfig{Path: ""},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -312,7 +284,7 @@ func TestValidate_ClickHouseMissingDSN(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: false},
|
File: FileOutputConfig{Path: ""},
|
||||||
ClickHouse: ClickHouseOutputConfig{
|
ClickHouse: ClickHouseOutputConfig{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
DSN: "",
|
DSN: "",
|
||||||
@ -336,7 +308,7 @@ func TestValidate_ClickHouseMissingTable(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: false},
|
File: FileOutputConfig{Path: ""},
|
||||||
ClickHouse: ClickHouseOutputConfig{
|
ClickHouse: ClickHouseOutputConfig{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
DSN: "clickhouse://localhost:9000/db",
|
DSN: "clickhouse://localhost:9000/db",
|
||||||
@ -360,7 +332,7 @@ func TestValidate_ClickHouseInvalidBatchSize(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: false},
|
File: FileOutputConfig{Path: ""},
|
||||||
ClickHouse: ClickHouseOutputConfig{
|
ClickHouse: ClickHouseOutputConfig{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
DSN: "clickhouse://localhost:9000/db",
|
DSN: "clickhouse://localhost:9000/db",
|
||||||
@ -385,7 +357,7 @@ func TestValidate_ClickHouseInvalidMaxBufferSize(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: false},
|
File: FileOutputConfig{Path: ""},
|
||||||
ClickHouse: ClickHouseOutputConfig{
|
ClickHouse: ClickHouseOutputConfig{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
DSN: "clickhouse://localhost:9000/db",
|
DSN: "clickhouse://localhost:9000/db",
|
||||||
@ -411,7 +383,7 @@ func TestValidate_ClickHouseInvalidTimeout(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: false},
|
File: FileOutputConfig{Path: ""},
|
||||||
ClickHouse: ClickHouseOutputConfig{
|
ClickHouse: ClickHouseOutputConfig{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
DSN: "clickhouse://localhost:9000/db",
|
DSN: "clickhouse://localhost:9000/db",
|
||||||
@ -437,33 +409,10 @@ func TestValidate_EmptyCorrelationKey(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
Outputs: OutputsConfig{
|
Outputs: OutputsConfig{
|
||||||
File: FileOutputConfig{Enabled: true},
|
File: FileOutputConfig{Path: "/var/log/test.log"},
|
||||||
},
|
},
|
||||||
Correlation: CorrelationConfig{
|
Correlation: CorrelationConfig{
|
||||||
Key: []string{},
|
TimeWindowS: 0,
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err := cfg.Validate()
|
|
||||||
if err == nil {
|
|
||||||
t.Error("expected error for empty correlation key")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestValidate_InvalidTimeWindow(t *testing.T) {
|
|
||||||
cfg := &Config{
|
|
||||||
Inputs: InputsConfig{
|
|
||||||
UnixSockets: []UnixSocketConfig{
|
|
||||||
{Name: "a", Path: "/tmp/a.sock"},
|
|
||||||
{Name: "b", Path: "/tmp/b.sock"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Outputs: OutputsConfig{
|
|
||||||
File: FileOutputConfig{Enabled: true},
|
|
||||||
},
|
|
||||||
Correlation: CorrelationConfig{
|
|
||||||
Key: []string{"src_ip", "src_port"},
|
|
||||||
TimeWindow: TimeWindowConfig{Value: 0},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -473,13 +422,55 @@ func TestValidate_InvalidTimeWindow(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetTimeWindow_UnknownUnit(t *testing.T) {
|
func TestGetSocketPermissions(t *testing.T) {
|
||||||
config := CorrelationConfig{
|
tests := []struct {
|
||||||
TimeWindow: TimeWindowConfig{Value: 5, Unit: "unknown"},
|
name string
|
||||||
|
config UnixSocketConfig
|
||||||
|
expected os.FileMode
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default",
|
||||||
|
config: UnixSocketConfig{
|
||||||
|
SocketPermissions: "",
|
||||||
|
},
|
||||||
|
expected: 0660,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "explicit 0660",
|
||||||
|
config: UnixSocketConfig{
|
||||||
|
SocketPermissions: "0660",
|
||||||
|
},
|
||||||
|
expected: 0660,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "explicit 0666",
|
||||||
|
config: UnixSocketConfig{
|
||||||
|
SocketPermissions: "0666",
|
||||||
|
},
|
||||||
|
expected: 0666,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "without leading zero",
|
||||||
|
config: UnixSocketConfig{
|
||||||
|
SocketPermissions: "660",
|
||||||
|
},
|
||||||
|
expected: 0660,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid value",
|
||||||
|
config: UnixSocketConfig{
|
||||||
|
SocketPermissions: "invalid",
|
||||||
|
},
|
||||||
|
expected: 0660,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
result := config.GetTimeWindow()
|
|
||||||
expected := 5 * time.Second // Should default to seconds
|
for _, tt := range tests {
|
||||||
if result != expected {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
t.Errorf("expected %v, got %v", expected, result)
|
result := tt.config.GetSocketPermissions()
|
||||||
|
if result != tt.expected {
|
||||||
|
t.Errorf("expected %o, got %o", tt.expected, result)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
# Compatible with CentOS 7, Rocky Linux 8, 9, 10
|
# Compatible with CentOS 7, Rocky Linux 8, 9, 10
|
||||||
|
|
||||||
# Define version before Version: field for RPM macro support
|
# Define version before Version: field for RPM macro support
|
||||||
%global spec_version 1.0.4
|
%global spec_version 1.0.6
|
||||||
|
|
||||||
Name: logcorrelator
|
Name: logcorrelator
|
||||||
Version: %{spec_version}
|
Version: %{spec_version}
|
||||||
|
|||||||
Reference in New Issue
Block a user