fix: create socket parent directory on startup
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled

- Create /var/run/logcorrelator/ if missing before binding sockets
- Fixes issue with tmpfs /var/run being cleared on reboot
- Add filepath import for directory handling

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-03-03 00:17:01 +01:00
parent 6b690a3eb3
commit c352e06b88
7 changed files with 67 additions and 13 deletions

View File

@ -170,6 +170,7 @@ config:
stdout: stdout:
enabled: false enabled: false
level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun
correlation: correlation:
# Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend # Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend
@ -257,8 +258,11 @@ outputs:
timeout_ms: 1000 timeout_ms: 1000
stdout: stdout:
enabled: false enabled: false
level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun
description: > description: >
Sink optionnel pour les tests/développement. Sink optionnel pour les tests/développement.
Le niveau de log filtre la sortie : DEBUG émet tout (y compris orphelins),
INFO émet uniquement les logs corrélés, ERROR n'émet rien.
correlation: correlation:
description: > description: >

View File

@ -95,9 +95,10 @@ func main() {
if cfg.Outputs.Stdout.Enabled { if cfg.Outputs.Stdout.Enabled {
stdoutSink := stdout.NewStdoutSink(stdout.Config{ stdoutSink := stdout.NewStdoutSink(stdout.Config{
Enabled: true, Enabled: true,
Level: cfg.Outputs.Stdout.Level,
}) })
sinks = append(sinks, stdoutSink) sinks = append(sinks, stdoutSink)
logger.Info("Configured stdout sink") logger.Info(fmt.Sprintf("Configured stdout sink: level=%s", cfg.Outputs.Stdout.Level))
} }
// Create multi-sink wrapper // Create multi-sink wrapper

View File

@ -36,6 +36,7 @@ outputs:
stdout: stdout:
enabled: false enabled: false
level: INFO # DEBUG: all logs including orphans, INFO: only correlated, WARN: correlated only, ERROR: none
correlation: correlation:
# Time window for correlation (A and B must be within this window) # Time window for correlation (A and B must be within this window)

View File

@ -7,6 +7,7 @@ import (
"math" "math"
"net" "net"
"os" "os"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -67,6 +68,12 @@ func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.N
return fmt.Errorf("socket path cannot be empty") return fmt.Errorf("socket path cannot be empty")
} }
// Create parent directory if it doesn't exist
socketDir := filepath.Dir(s.config.Path)
if err := os.MkdirAll(socketDir, 0755); err != nil {
return fmt.Errorf("failed to create socket directory %s: %w", socketDir, err)
}
// Remove existing socket file if present // Remove existing socket file if present
if info, err := os.Stat(s.config.Path); err == nil { if info, err := os.Stat(s.config.Path); err == nil {
if info.Mode()&os.ModeSocket != 0 { if info.Mode()&os.ModeSocket != 0 {

View File

@ -5,28 +5,37 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"strings"
"sync" "sync"
"github.com/logcorrelator/logcorrelator/internal/domain" "github.com/logcorrelator/logcorrelator/internal/domain"
"github.com/logcorrelator/logcorrelator/internal/observability"
) )
// Config holds the stdout sink configuration. // Config holds the stdout sink configuration.
type Config struct { type Config struct {
Enabled bool Enabled bool
Level string // DEBUG, INFO, WARN, ERROR - filters output verbosity
} }
// StdoutSink writes correlated logs to stdout as JSON lines. // StdoutSink writes correlated logs to stdout as JSON lines.
type StdoutSink struct { type StdoutSink struct {
config Config config Config
mu sync.Mutex mu sync.Mutex
enc *json.Encoder enc *json.Encoder
minLevel observability.LogLevel
} }
// NewStdoutSink creates a new stdout sink. // NewStdoutSink creates a new stdout sink.
func NewStdoutSink(config Config) *StdoutSink { func NewStdoutSink(config Config) *StdoutSink {
level := observability.INFO
if config.Level != "" {
level = observability.ParseLogLevel(strings.ToUpper(config.Level))
}
return &StdoutSink{ return &StdoutSink{
config: config, config: config,
enc: json.NewEncoder(os.Stdout), enc: json.NewEncoder(os.Stdout),
minLevel: level,
} }
} }
@ -45,6 +54,12 @@ func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
// Filter output based on correlation status and log level
// DEBUG: all logs, INFO: only correlated, WARN: only correlated with issues, ERROR: none
if !s.shouldEmit(log) {
return nil
}
if err := s.enc.Encode(log); err != nil { if err := s.enc.Encode(log); err != nil {
return fmt.Errorf("failed to write to stdout: %w", err) return fmt.Errorf("failed to write to stdout: %w", err)
} }
@ -52,6 +67,26 @@ func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error
return nil return nil
} }
// shouldEmit determines if a log should be emitted based on its correlation status and configured level.
func (s *StdoutSink) shouldEmit(log domain.CorrelatedLog) bool {
switch s.minLevel {
case observability.DEBUG:
// Emit everything including orphans
return true
case observability.INFO:
// Emit all correlated logs, skip orphans
return log.Correlated
case observability.WARN:
// Emit only correlated logs (same as INFO for now, can be extended)
return log.Correlated
case observability.ERROR:
// Never emit to stdout at ERROR level
return false
default:
return log.Correlated
}
}
// Flush flushes any buffered data (no-op for stdout). // Flush flushes any buffered data (no-op for stdout).
func (s *StdoutSink) Flush(ctx context.Context) error { func (s *StdoutSink) Flush(ctx context.Context) error {
return nil return nil

View File

@ -61,14 +61,19 @@ func (o *Orchestrator) Start() error {
// Start the source in a separate goroutine // Start the source in a separate goroutine
sourceErr := make(chan error, 1) sourceErr := make(chan error, 1)
go func() { go func() {
sourceErr <- src.Start(o.ctx, evChan) if err := src.Start(o.ctx, evChan); err != nil {
sourceErr <- err
}
}() }()
// Process events in the current goroutine // Process events in the current goroutine
o.processEvents(evChan) o.processEvents(evChan)
// Wait for source to stop // Check for source start errors
<-sourceErr if err := <-sourceErr; err != nil {
// Source failed to start, log error and exit
return
}
}(source, eventChan) }(source, eventChan)
} }

View File

@ -79,7 +79,8 @@ type ClickHouseOutputConfig struct {
// StdoutOutputConfig holds stdout sink configuration. // StdoutOutputConfig holds stdout sink configuration.
type StdoutOutputConfig struct { type StdoutOutputConfig struct {
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`
Level string `yaml:"level"` // DEBUG, INFO, WARN, ERROR - filters output verbosity
} }
// CorrelationConfig holds correlation configuration. // CorrelationConfig holds correlation configuration.