From 87b94f3c18f8c7b10eb09900b9240f6fcf546360 Mon Sep 17 00:00:00 2001 From: Jacquin Antoine Date: Sat, 28 Feb 2026 23:32:25 +0100 Subject: [PATCH] feat: add main entry point and stdout sink for Docker build - Create cmd/logcorrelator/main.go as the application entry point - Loads configuration from YAML file - Initializes Unix socket sources, file/ClickHouse/stdout sinks - Sets up correlation service and orchestrator - Handles graceful shutdown on SIGINT/SIGTERM - Supports -version flag to print version - Add internal/adapters/outbound/stdout/sink.go - Implements CorrelatedLogSink interface for stdout output - Writes JSON lines to standard output - Fix .gitignore to use /logcorrelator instead of logcorrelator - Prevents cmd/logcorrelator directory from being ignored Co-authored-by: Qwen-Coder --- .gitignore | 2 +- cmd/logcorrelator/main.go | 143 ++++++++++++++++++++++ internal/adapters/outbound/stdout/sink.go | 58 +++++++++ 3 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 cmd/logcorrelator/main.go create mode 100644 internal/adapters/outbound/stdout/sink.go diff --git a/.gitignore b/.gitignore index f1ef8a1..fa69443 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ *.dll *.so *.dylib -logcorrelator +/logcorrelator # Test binary *.test diff --git a/cmd/logcorrelator/main.go b/cmd/logcorrelator/main.go new file mode 100644 index 0000000..e26c938 --- /dev/null +++ b/cmd/logcorrelator/main.go @@ -0,0 +1,143 @@ +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/logcorrelator/logcorrelator/internal/adapters/inbound/unixsocket" + "github.com/logcorrelator/logcorrelator/internal/adapters/outbound/clickhouse" + "github.com/logcorrelator/logcorrelator/internal/adapters/outbound/file" + "github.com/logcorrelator/logcorrelator/internal/adapters/outbound/multi" + "github.com/logcorrelator/logcorrelator/internal/adapters/outbound/stdout" + "github.com/logcorrelator/logcorrelator/internal/app" + "github.com/logcorrelator/logcorrelator/internal/config" + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/observability" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +var Version = "dev" + +func main() { + configPath := flag.String("config", "config.yml", "path to configuration file") + version := flag.Bool("version", false, "print version and exit") + flag.Parse() + + if *version { + fmt.Println(Version) + os.Exit(0) + } + + // Load configuration + cfg, err := config.Load(*configPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Error loading configuration: %v\n", err) + os.Exit(1) + } + + // Initialize logger + logger := observability.NewLogger(cfg.Service.Name) + + logger.Info(fmt.Sprintf("Starting logcorrelator version %s", Version)) + + // Create sources + sources := make([]ports.EventSource, 0, len(cfg.Inputs.UnixSockets)) + for _, inputCfg := range cfg.Inputs.UnixSockets { + source := unixsocket.NewUnixSocketSource(unixsocket.Config{ + Name: inputCfg.Name, + Path: inputCfg.Path, + SourceType: inputCfg.SourceType, + }) + sources = append(sources, source) + logger.Info(fmt.Sprintf("Configured input source: name=%s, path=%s", inputCfg.Name, inputCfg.Path)) + } + + // Create sinks + sinks := make([]ports.CorrelatedLogSink, 0) + + if cfg.Outputs.File.Enabled { + fileSink, err := file.NewFileSink(file.Config{ + Path: cfg.Outputs.File.Path, + }) + if err != nil { + logger.Error("Failed to create file sink", err) + os.Exit(1) + } + sinks = append(sinks, fileSink) + logger.Info(fmt.Sprintf("Configured file sink: path=%s", cfg.Outputs.File.Path)) + } + + if cfg.Outputs.ClickHouse.Enabled { + clickHouseSink, err := clickhouse.NewClickHouseSink(clickhouse.Config{ + DSN: cfg.Outputs.ClickHouse.DSN, + Table: cfg.Outputs.ClickHouse.Table, + BatchSize: cfg.Outputs.ClickHouse.BatchSize, + FlushIntervalMs: cfg.Outputs.ClickHouse.FlushIntervalMs, + MaxBufferSize: cfg.Outputs.ClickHouse.MaxBufferSize, + DropOnOverflow: cfg.Outputs.ClickHouse.DropOnOverflow, + AsyncInsert: cfg.Outputs.ClickHouse.AsyncInsert, + TimeoutMs: cfg.Outputs.ClickHouse.TimeoutMs, + }) + if err != nil { + logger.Error("Failed to create ClickHouse sink", err) + os.Exit(1) + } + sinks = append(sinks, clickHouseSink) + logger.Info(fmt.Sprintf("Configured ClickHouse sink: table=%s", cfg.Outputs.ClickHouse.Table)) + } + + if cfg.Outputs.Stdout.Enabled { + stdoutSink := stdout.NewStdoutSink(stdout.Config{ + Enabled: cfg.Outputs.Stdout.Enabled, + }) + sinks = append(sinks, stdoutSink) + logger.Info("Configured stdout sink") + } + + // Create multi-sink wrapper + multiSink := multi.NewMultiSink(sinks...) + + // Create correlation service + correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{ + TimeWindow: cfg.Correlation.GetTimeWindow(), + ApacheAlwaysEmit: cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit, + NetworkEmit: cfg.Correlation.OrphanPolicy.NetworkEmit, + MaxBufferSize: domain.DefaultMaxBufferSize, + }, &domain.RealTimeProvider{}) + + logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, apache_always_emit=%v, network_emit=%v", + cfg.Correlation.GetTimeWindow().String(), + cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit, + cfg.Correlation.OrphanPolicy.NetworkEmit)) + + // Create orchestrator + orchestrator := app.NewOrchestrator(app.OrchestratorConfig{ + Sources: sources, + Sink: multiSink, + }, correlationSvc) + + // Start the application + if err := orchestrator.Start(); err != nil { + logger.Error("Failed to start orchestrator", err) + os.Exit(1) + } + + logger.Info("logcorrelator started successfully") + + // Wait for shutdown signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigChan + + logger.Info(fmt.Sprintf("Shutdown signal received: %v", sig)) + + // Graceful shutdown + if err := orchestrator.Stop(); err != nil { + logger.Error("Error during shutdown", err) + } + + logger.Info("logcorrelator stopped") +} diff --git a/internal/adapters/outbound/stdout/sink.go b/internal/adapters/outbound/stdout/sink.go new file mode 100644 index 0000000..087b8f4 --- /dev/null +++ b/internal/adapters/outbound/stdout/sink.go @@ -0,0 +1,58 @@ +package stdout + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +// Config holds the stdout sink configuration. +type Config struct { + Enabled bool +} + +// StdoutSink writes correlated logs to stdout as JSON lines. +type StdoutSink struct { + config Config + mu sync.Mutex + enc *json.Encoder +} + +// NewStdoutSink creates a new stdout sink. +func NewStdoutSink(config Config) *StdoutSink { + return &StdoutSink{ + config: config, + enc: json.NewEncoder(os.Stdout), + } +} + +// Name returns the sink name. +func (s *StdoutSink) Name() string { + return "stdout" +} + +// Write writes a correlated log to stdout. +func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := s.enc.Encode(log); err != nil { + return fmt.Errorf("failed to write to stdout: %w", err) + } + + return nil +} + +// Flush flushes any buffered data (no-op for stdout). +func (s *StdoutSink) Flush(ctx context.Context) error { + return nil +} + +// Close closes the sink (no-op for stdout). +func (s *StdoutSink) Close() error { + return nil +}