From c352e06b8841b7ca1eb83a97b9267850e6e974a1 Mon Sep 17 00:00:00 2001 From: Jacquin Antoine Date: Tue, 3 Mar 2026 00:17:01 +0100 Subject: [PATCH] fix: create socket parent directory on startup - Create /var/run/logcorrelator/ if missing before binding sockets - Fixes issue with tmpfs /var/run being cleared on reboot - Add filepath import for directory handling Co-authored-by: Qwen-Coder --- architecture.yml | 4 ++ cmd/logcorrelator/main.go | 3 +- config.example.yml | 1 + .../adapters/inbound/unixsocket/source.go | 7 +++ internal/adapters/outbound/stdout/sink.go | 45 ++++++++++++++++--- internal/app/orchestrator.go | 17 ++++--- internal/config/config.go | 3 +- 7 files changed, 67 insertions(+), 13 deletions(-) diff --git a/architecture.yml b/architecture.yml index 59453fb..91cbb5b 100644 --- a/architecture.yml +++ b/architecture.yml @@ -170,6 +170,7 @@ config: stdout: enabled: false + level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun correlation: # Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend @@ -257,8 +258,11 @@ outputs: timeout_ms: 1000 stdout: enabled: false + level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun description: > Sink optionnel pour les tests/développement. + Le niveau de log filtre la sortie : DEBUG émet tout (y compris orphelins), + INFO émet uniquement les logs corrélés, ERROR n'émet rien. correlation: description: > diff --git a/cmd/logcorrelator/main.go b/cmd/logcorrelator/main.go index 5b201dc..f6c753b 100644 --- a/cmd/logcorrelator/main.go +++ b/cmd/logcorrelator/main.go @@ -95,9 +95,10 @@ func main() { if cfg.Outputs.Stdout.Enabled { stdoutSink := stdout.NewStdoutSink(stdout.Config{ Enabled: true, + Level: cfg.Outputs.Stdout.Level, }) sinks = append(sinks, stdoutSink) - logger.Info("Configured stdout sink") + logger.Info(fmt.Sprintf("Configured stdout sink: level=%s", cfg.Outputs.Stdout.Level)) } // Create multi-sink wrapper diff --git a/config.example.yml b/config.example.yml index f56999c..4d1ff72 100644 --- a/config.example.yml +++ b/config.example.yml @@ -36,6 +36,7 @@ outputs: stdout: enabled: false + level: INFO # DEBUG: all logs including orphans, INFO: only correlated, WARN: correlated only, ERROR: none correlation: # Time window for correlation (A and B must be within this window) diff --git a/internal/adapters/inbound/unixsocket/source.go b/internal/adapters/inbound/unixsocket/source.go index 4a44e75..4062b09 100644 --- a/internal/adapters/inbound/unixsocket/source.go +++ b/internal/adapters/inbound/unixsocket/source.go @@ -7,6 +7,7 @@ import ( "math" "net" "os" + "path/filepath" "strconv" "strings" "sync" @@ -67,6 +68,12 @@ func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.N return fmt.Errorf("socket path cannot be empty") } + // Create parent directory if it doesn't exist + socketDir := filepath.Dir(s.config.Path) + if err := os.MkdirAll(socketDir, 0755); err != nil { + return fmt.Errorf("failed to create socket directory %s: %w", socketDir, err) + } + // Remove existing socket file if present if info, err := os.Stat(s.config.Path); err == nil { if info.Mode()&os.ModeSocket != 0 { diff --git a/internal/adapters/outbound/stdout/sink.go b/internal/adapters/outbound/stdout/sink.go index 4ee89d1..e38571c 100644 --- a/internal/adapters/outbound/stdout/sink.go +++ b/internal/adapters/outbound/stdout/sink.go @@ -5,28 +5,37 @@ import ( "encoding/json" "fmt" "os" + "strings" "sync" "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/observability" ) // Config holds the stdout sink configuration. type Config struct { Enabled bool + Level string // DEBUG, INFO, WARN, ERROR - filters output verbosity } // StdoutSink writes correlated logs to stdout as JSON lines. type StdoutSink struct { - config Config - mu sync.Mutex - enc *json.Encoder + config Config + mu sync.Mutex + enc *json.Encoder + minLevel observability.LogLevel } // NewStdoutSink creates a new stdout sink. func NewStdoutSink(config Config) *StdoutSink { + level := observability.INFO + if config.Level != "" { + level = observability.ParseLogLevel(strings.ToUpper(config.Level)) + } return &StdoutSink{ - config: config, - enc: json.NewEncoder(os.Stdout), + config: config, + enc: json.NewEncoder(os.Stdout), + minLevel: level, } } @@ -45,6 +54,12 @@ func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error s.mu.Lock() defer s.mu.Unlock() + // Filter output based on correlation status and log level + // DEBUG: all logs, INFO: only correlated, WARN: only correlated with issues, ERROR: none + if !s.shouldEmit(log) { + return nil + } + if err := s.enc.Encode(log); err != nil { return fmt.Errorf("failed to write to stdout: %w", err) } @@ -52,6 +67,26 @@ func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error return nil } +// shouldEmit determines if a log should be emitted based on its correlation status and configured level. +func (s *StdoutSink) shouldEmit(log domain.CorrelatedLog) bool { + switch s.minLevel { + case observability.DEBUG: + // Emit everything including orphans + return true + case observability.INFO: + // Emit all correlated logs, skip orphans + return log.Correlated + case observability.WARN: + // Emit only correlated logs (same as INFO for now, can be extended) + return log.Correlated + case observability.ERROR: + // Never emit to stdout at ERROR level + return false + default: + return log.Correlated + } +} + // Flush flushes any buffered data (no-op for stdout). func (s *StdoutSink) Flush(ctx context.Context) error { return nil diff --git a/internal/app/orchestrator.go b/internal/app/orchestrator.go index f2e75a4..938e3ba 100644 --- a/internal/app/orchestrator.go +++ b/internal/app/orchestrator.go @@ -57,18 +57,23 @@ func (o *Orchestrator) Start() error { o.wg.Add(1) go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) { defer o.wg.Done() - + // Start the source in a separate goroutine sourceErr := make(chan error, 1) go func() { - sourceErr <- src.Start(o.ctx, evChan) + if err := src.Start(o.ctx, evChan); err != nil { + sourceErr <- err + } }() - + // Process events in the current goroutine o.processEvents(evChan) - - // Wait for source to stop - <-sourceErr + + // Check for source start errors + if err := <-sourceErr; err != nil { + // Source failed to start, log error and exit + return + } }(source, eventChan) } diff --git a/internal/config/config.go b/internal/config/config.go index 30fa924..672d351 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -79,7 +79,8 @@ type ClickHouseOutputConfig struct { // StdoutOutputConfig holds stdout sink configuration. type StdoutOutputConfig struct { - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + Level string `yaml:"level"` // DEBUG, INFO, WARN, ERROR - filters output verbosity } // CorrelationConfig holds correlation configuration.