chore: version 1.0.7 - add log levels
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled

- Add configurable log levels: DEBUG, INFO, WARN, ERROR
- Replace debug.enabled with log.level in configuration
- Add Warn/Warnf methods for warning messages
- Log orphan events and buffer overflow as WARN
- Log parse errors as WARN
- Log raw events and correlations as DEBUG

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-03-01 02:33:04 +01:00
parent 56c2923121
commit a3ae5421cf
12 changed files with 408 additions and 88 deletions

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/logcorrelator/logcorrelator/internal/domain"
"github.com/logcorrelator/logcorrelator/internal/observability"
)
const (
@ -41,6 +42,7 @@ type UnixSocketSource struct {
wg sync.WaitGroup
semaphore chan struct{} // Limit concurrent connections
stopOnce sync.Once
logger *observability.Logger
}
// NewUnixSocketSource creates a new Unix socket source.
@ -49,9 +51,15 @@ func NewUnixSocketSource(config Config) *UnixSocketSource {
config: config,
done: make(chan struct{}),
semaphore: make(chan struct{}, MaxConcurrentConnections),
logger: observability.NewLogger("unixsocket:" + config.Name),
}
}
// SetLogger sets the logger for the source (for debug mode).
func (s *UnixSocketSource) SetLogger(logger *observability.Logger) {
s.logger = logger.WithFields(map[string]any{"source": s.config.Name})
}
// Name returns the source name.
func (s *UnixSocketSource) Name() string {
return s.config.Name
@ -166,10 +174,15 @@ func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventC
event, err := parseJSONEvent(line, s.config.SourceType)
if err != nil {
// Log parse errors but continue processing
// Log parse errors as warnings
s.logger.Warnf("parse error: %v", err)
continue
}
// Debug: log raw events
s.logger.Debugf("event received: source=%s src_ip=%s src_port=%d",
event.Source, event.SrcIP, event.SrcPort)
select {
case eventChan <- event:
case <-ctx.Done():

View File

@ -12,9 +12,23 @@ import (
// Config holds the complete application configuration.
type Config struct {
Inputs InputsConfig `yaml:"inputs"`
Outputs OutputsConfig `yaml:"outputs"`
Correlation CorrelationConfig `yaml:"correlation"`
Log LogConfig `yaml:"log"`
Inputs InputsConfig `yaml:"inputs"`
Outputs OutputsConfig `yaml:"outputs"`
Correlation CorrelationConfig `yaml:"correlation"`
}
// LogConfig holds logging configuration.
type LogConfig struct {
Level string `yaml:"level"` // DEBUG, INFO, WARN, ERROR
}
// GetLogLevel returns the log level, defaulting to INFO if not set.
func (c *LogConfig) GetLevel() string {
if c.Level == "" {
return "INFO"
}
return strings.ToUpper(c.Level)
}
// ServiceConfig holds service-level configuration.
@ -97,6 +111,9 @@ func Load(path string) (*Config, error) {
// defaultConfig returns a Config with default values.
func defaultConfig() *Config {
return &Config{
Log: LogConfig{
Level: "INFO",
},
Inputs: InputsConfig{
UnixSockets: make([]UnixSocketConfig, 0),
},

View File

@ -474,3 +474,83 @@ func TestGetSocketPermissions(t *testing.T) {
})
}
}
func TestLogConfig_GetLevel(t *testing.T) {
tests := []struct {
name string
config LogConfig
expected string
}{
{
name: "default",
config: LogConfig{Level: ""},
expected: "INFO",
},
{
name: "DEBUG uppercase",
config: LogConfig{Level: "DEBUG"},
expected: "DEBUG",
},
{
name: "debug lowercase",
config: LogConfig{Level: "debug"},
expected: "DEBUG",
},
{
name: "WARN",
config: LogConfig{Level: "WARN"},
expected: "WARN",
},
{
name: "ERROR",
config: LogConfig{Level: "ERROR"},
expected: "ERROR",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.config.GetLevel()
if result != tt.expected {
t.Errorf("GetLevel() = %v, want %v", result, tt.expected)
}
})
}
}
func TestLoad_LogLevel(t *testing.T) {
content := `
log:
level: DEBUG
inputs:
unix_sockets:
- name: http
path: /var/run/logcorrelator/http.socket
- name: network
path: /var/run/logcorrelator/network.socket
outputs:
file:
path: /var/log/test.log
correlation:
time_window_s: 1
emit_orphans: true
`
tmpDir := t.TempDir()
configPath := filepath.Join(tmpDir, "config.yml")
if err := os.WriteFile(configPath, []byte(content), 0644); err != nil {
t.Fatalf("failed to write config: %v", err)
}
cfg, err := Load(configPath)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cfg.Log.GetLevel() != "DEBUG" {
t.Errorf("expected log level DEBUG, got %s", cfg.Log.GetLevel())
}
}

View File

@ -4,6 +4,8 @@ import (
"container/list"
"sync"
"time"
"github.com/logcorrelator/logcorrelator/internal/observability"
)
const (
@ -30,6 +32,7 @@ type CorrelationService struct {
pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent
pendingB map[string][]*list.Element
timeProvider TimeProvider
logger *observability.Logger
}
type eventBuffer struct {
@ -73,9 +76,15 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
pendingA: make(map[string][]*list.Element),
pendingB: make(map[string][]*list.Element),
timeProvider: timeProvider,
logger: observability.NewLogger("correlation"),
}
}
// SetLogger sets the logger for the correlation service.
func (s *CorrelationService) SetLogger(logger *observability.Logger) {
s.logger = logger
}
// ProcessEvent processes an incoming event and returns correlated logs if matches are found.
func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog {
s.mu.Lock()
@ -87,6 +96,8 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
// Check buffer overflow before adding
if s.isBufferFull(event.Source) {
// Buffer full, drop event or emit as orphan
s.logger.Warnf("buffer full, dropping event: source=%s src_ip=%s src_port=%d",
event.Source, event.SrcIP, event.SrcPort)
if event.Source == SourceA && s.config.ApacheAlwaysEmit {
return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")}
}
@ -112,11 +123,23 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
if shouldBuffer {
s.addEvent(event)
s.logger.Debugf("event buffered: source=%s src_ip=%s src_port=%d buffer_size=%d",
event.Source, event.SrcIP, event.SrcPort, s.getBufferSize(event.Source))
}
return results
}
func (s *CorrelationService) getBufferSize(source EventSource) int {
switch source {
case SourceA:
return s.bufferA.events.Len()
case SourceB:
return s.bufferB.events.Len()
}
return 0
}
func (s *CorrelationService) isBufferFull(source EventSource) bool {
switch source {
case SourceA:
@ -135,12 +158,15 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate
return s.eventsMatch(event, other)
}); bEvent != nil {
correlated := NewCorrelatedLog(event, bEvent)
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort)
return []CorrelatedLog{correlated}, false
}
// No match found
// No match found - orphan A event
if s.config.ApacheAlwaysEmit {
orphan := NewCorrelatedLogFromEvent(event, "A")
s.logger.Warnf("orphan A event (no B match): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort)
return []CorrelatedLog{orphan}, false
}
@ -156,12 +182,15 @@ func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]Correlate
return s.eventsMatch(other, event)
}); aEvent != nil {
correlated := NewCorrelatedLog(aEvent, event)
s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)",
aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort)
return []CorrelatedLog{correlated}, false
}
// No match found
// No match found - orphan B event (not emitted by default)
if s.config.NetworkEmit {
orphan := NewCorrelatedLogFromEvent(event, "B")
s.logger.Warnf("orphan B event (no A match): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort)
return []CorrelatedLog{orphan}, false
}

View File

@ -1,34 +1,109 @@
package observability
import (
"fmt"
"log"
"os"
"strings"
"sync"
)
// Logger provides structured logging.
type Logger struct {
mu sync.Mutex
logger *log.Logger
prefix string
fields map[string]any
// LogLevel represents the severity of a log message.
type LogLevel int
const (
DEBUG LogLevel = iota
INFO
WARN
ERROR
)
// ParseLogLevel converts a string to LogLevel.
func ParseLogLevel(level string) LogLevel {
switch strings.ToUpper(level) {
case "DEBUG":
return DEBUG
case "INFO":
return INFO
case "WARN", "WARNING":
return WARN
case "ERROR":
return ERROR
default:
return INFO
}
}
// NewLogger creates a new logger.
// String returns the string representation of LogLevel.
func (l LogLevel) String() string {
switch l {
case DEBUG:
return "DEBUG"
case INFO:
return "INFO"
case WARN:
return "WARN"
case ERROR:
return "ERROR"
default:
return "INFO"
}
}
// Logger provides structured logging.
type Logger struct {
mu sync.Mutex
logger *log.Logger
prefix string
fields map[string]any
minLevel LogLevel
}
// NewLogger creates a new logger with INFO level by default.
func NewLogger(prefix string) *Logger {
return &Logger{
logger: log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds),
prefix: prefix,
fields: make(map[string]any),
logger: log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds),
prefix: prefix,
fields: make(map[string]any),
minLevel: INFO,
}
}
// NewLoggerWithLevel creates a new logger with specified minimum level.
func NewLoggerWithLevel(prefix string, level string) *Logger {
return &Logger{
logger: log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds),
prefix: prefix,
fields: make(map[string]any),
minLevel: ParseLogLevel(level),
}
}
// SetLevel sets the minimum log level.
func (l *Logger) SetLevel(level string) {
l.mu.Lock()
defer l.mu.Unlock()
l.minLevel = ParseLogLevel(level)
}
// ShouldLog returns true if the given level should be logged.
func (l *Logger) ShouldLog(level LogLevel) bool {
l.mu.Lock()
defer l.mu.Unlock()
return level >= l.minLevel
}
// WithFields returns a new logger with additional fields.
func (l *Logger) WithFields(fields map[string]any) *Logger {
l.mu.Lock()
minLevel := l.minLevel
l.mu.Unlock()
newLogger := &Logger{
logger: l.logger,
prefix: l.prefix,
fields: make(map[string]any),
logger: l.logger,
prefix: l.prefix,
fields: make(map[string]any),
minLevel: minLevel,
}
for k, v := range l.fields {
newLogger.fields[k] = v
@ -41,13 +116,29 @@ func (l *Logger) WithFields(fields map[string]any) *Logger {
// Info logs an info message.
func (l *Logger) Info(msg string) {
if !l.ShouldLog(INFO) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
l.log("INFO", msg)
}
// Warn logs a warning message.
func (l *Logger) Warn(msg string) {
if !l.ShouldLog(WARN) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
l.log("WARN", msg)
}
// Error logs an error message.
func (l *Logger) Error(msg string, err error) {
if !l.ShouldLog(ERROR) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
if err != nil {
@ -57,13 +148,46 @@ func (l *Logger) Error(msg string, err error) {
}
}
// Debug logs a debug message.
// Debug logs a debug message (only if debug level is enabled).
func (l *Logger) Debug(msg string) {
if !l.ShouldLog(DEBUG) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
l.log("DEBUG", msg)
}
// Debugf logs a formatted debug message (only if debug level is enabled).
func (l *Logger) Debugf(msg string, args ...any) {
if !l.ShouldLog(DEBUG) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
l.log("DEBUG", fmt.Sprintf(msg, args...))
}
// Warnf logs a formatted warning message.
func (l *Logger) Warnf(msg string, args ...any) {
if !l.ShouldLog(WARN) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
l.log("WARN", fmt.Sprintf(msg, args...))
}
// Infof logs a formatted info message.
func (l *Logger) Infof(msg string, args ...any) {
if !l.ShouldLog(INFO) {
return
}
l.mu.Lock()
defer l.mu.Unlock()
l.log("INFO", fmt.Sprintf(msg, args...))
}
func (l *Logger) log(level, msg string) {
prefix := l.prefix
if prefix != "" {

View File

@ -1,10 +1,6 @@
package observability
import (
"bytes"
"io"
"os"
"strings"
"testing"
)
@ -19,72 +15,107 @@ func TestNewLogger(t *testing.T) {
}
func TestLogger_Info(t *testing.T) {
// Capture stderr
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
logger := NewLogger("test")
logger := NewLoggerWithLevel("test", "INFO")
// INFO should be logged
if !logger.ShouldLog(INFO) {
t.Error("INFO should be enabled")
}
logger.Info("test message")
w.Close()
os.Stderr = oldStderr
var buf bytes.Buffer
io.Copy(&buf, r)
output := buf.String()
if !strings.Contains(output, "INFO") {
t.Error("expected INFO in output")
}
if !strings.Contains(output, "test message") {
t.Error("expected 'test message' in output")
}
}
func TestLogger_Error(t *testing.T) {
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
logger := NewLogger("test")
logger := NewLoggerWithLevel("test", "ERROR")
// ERROR should be logged
if !logger.ShouldLog(ERROR) {
t.Error("ERROR should be enabled")
}
logger.Error("error message", nil)
w.Close()
os.Stderr = oldStderr
var buf bytes.Buffer
io.Copy(&buf, r)
output := buf.String()
if !strings.Contains(output, "ERROR") {
t.Error("expected ERROR in output")
}
if !strings.Contains(output, "error message") {
t.Error("expected 'error message' in output")
}
}
func TestLogger_Debug(t *testing.T) {
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
logger := NewLogger("test")
logger.Debug("debug message")
w.Close()
os.Stderr = oldStderr
var buf bytes.Buffer
io.Copy(&buf, r)
output := buf.String()
if !strings.Contains(output, "DEBUG") {
t.Error("expected DEBUG in output")
// Debug should be disabled by default (INFO is default)
if logger.ShouldLog(DEBUG) {
t.Error("debug should be disabled by default")
}
if !strings.Contains(output, "debug message") {
t.Error("expected 'debug message' in output")
// Enable debug level
logger.SetLevel("DEBUG")
if !logger.ShouldLog(DEBUG) {
t.Error("debug should be enabled after SetLevel(DEBUG)")
}
// Just verify ShouldLog works correctly
logger.Debug("test message") // Should not panic
}
func TestLogger_SetLevel(t *testing.T) {
logger := NewLogger("test")
// Default is INFO
if logger.minLevel != INFO {
t.Error("default level should be INFO")
}
// Test all levels
logger.SetLevel("DEBUG")
if !logger.ShouldLog(DEBUG) {
t.Error("DEBUG should be enabled after SetLevel(DEBUG)")
}
logger.SetLevel("INFO")
if logger.ShouldLog(DEBUG) {
t.Error("DEBUG should be disabled after SetLevel(INFO)")
}
if !logger.ShouldLog(INFO) {
t.Error("INFO should be enabled after SetLevel(INFO)")
}
logger.SetLevel("WARN")
if logger.ShouldLog(INFO) {
t.Error("INFO should be disabled after SetLevel(WARN)")
}
if !logger.ShouldLog(WARN) {
t.Error("WARN should be enabled after SetLevel(WARN)")
}
logger.SetLevel("ERROR")
if logger.ShouldLog(WARN) {
t.Error("WARN should be disabled after SetLevel(ERROR)")
}
if !logger.ShouldLog(ERROR) {
t.Error("ERROR should be enabled after SetLevel(ERROR)")
}
}
func TestParseLogLevel(t *testing.T) {
tests := []struct {
input string
expected LogLevel
}{
{"DEBUG", DEBUG},
{"debug", DEBUG},
{"INFO", INFO},
{"info", INFO},
{"WARN", WARN},
{"warn", WARN},
{"WARNING", WARN},
{"ERROR", ERROR},
{"error", ERROR},
{"", INFO}, // default
{"invalid", INFO}, // default
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
result := ParseLogLevel(tt.input)
if result != tt.expected {
t.Errorf("ParseLogLevel(%q) = %v, want %v", tt.input, result, tt.expected)
}
})
}
}