feat: add main entry point and stdout sink for Docker build
- Create cmd/logcorrelator/main.go as the application entry point - Loads configuration from YAML file - Initializes Unix socket sources, file/ClickHouse/stdout sinks - Sets up correlation service and orchestrator - Handles graceful shutdown on SIGINT/SIGTERM - Supports -version flag to print version - Add internal/adapters/outbound/stdout/sink.go - Implements CorrelatedLogSink interface for stdout output - Writes JSON lines to standard output - Fix .gitignore to use /logcorrelator instead of logcorrelator - Prevents cmd/logcorrelator directory from being ignored Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@ -8,7 +8,7 @@
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
logcorrelator
|
||||
/logcorrelator
|
||||
|
||||
# Test binary
|
||||
*.test
|
||||
|
||||
143
cmd/logcorrelator/main.go
Normal file
143
cmd/logcorrelator/main.go
Normal file
@ -0,0 +1,143 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/logcorrelator/logcorrelator/internal/adapters/inbound/unixsocket"
|
||||
"github.com/logcorrelator/logcorrelator/internal/adapters/outbound/clickhouse"
|
||||
"github.com/logcorrelator/logcorrelator/internal/adapters/outbound/file"
|
||||
"github.com/logcorrelator/logcorrelator/internal/adapters/outbound/multi"
|
||||
"github.com/logcorrelator/logcorrelator/internal/adapters/outbound/stdout"
|
||||
"github.com/logcorrelator/logcorrelator/internal/app"
|
||||
"github.com/logcorrelator/logcorrelator/internal/config"
|
||||
"github.com/logcorrelator/logcorrelator/internal/domain"
|
||||
"github.com/logcorrelator/logcorrelator/internal/observability"
|
||||
"github.com/logcorrelator/logcorrelator/internal/ports"
|
||||
)
|
||||
|
||||
var Version = "dev"
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "config.yml", "path to configuration file")
|
||||
version := flag.Bool("version", false, "print version and exit")
|
||||
flag.Parse()
|
||||
|
||||
if *version {
|
||||
fmt.Println(Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// Load configuration
|
||||
cfg, err := config.Load(*configPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error loading configuration: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Initialize logger
|
||||
logger := observability.NewLogger(cfg.Service.Name)
|
||||
|
||||
logger.Info(fmt.Sprintf("Starting logcorrelator version %s", Version))
|
||||
|
||||
// Create sources
|
||||
sources := make([]ports.EventSource, 0, len(cfg.Inputs.UnixSockets))
|
||||
for _, inputCfg := range cfg.Inputs.UnixSockets {
|
||||
source := unixsocket.NewUnixSocketSource(unixsocket.Config{
|
||||
Name: inputCfg.Name,
|
||||
Path: inputCfg.Path,
|
||||
SourceType: inputCfg.SourceType,
|
||||
})
|
||||
sources = append(sources, source)
|
||||
logger.Info(fmt.Sprintf("Configured input source: name=%s, path=%s", inputCfg.Name, inputCfg.Path))
|
||||
}
|
||||
|
||||
// Create sinks
|
||||
sinks := make([]ports.CorrelatedLogSink, 0)
|
||||
|
||||
if cfg.Outputs.File.Enabled {
|
||||
fileSink, err := file.NewFileSink(file.Config{
|
||||
Path: cfg.Outputs.File.Path,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("Failed to create file sink", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
sinks = append(sinks, fileSink)
|
||||
logger.Info(fmt.Sprintf("Configured file sink: path=%s", cfg.Outputs.File.Path))
|
||||
}
|
||||
|
||||
if cfg.Outputs.ClickHouse.Enabled {
|
||||
clickHouseSink, err := clickhouse.NewClickHouseSink(clickhouse.Config{
|
||||
DSN: cfg.Outputs.ClickHouse.DSN,
|
||||
Table: cfg.Outputs.ClickHouse.Table,
|
||||
BatchSize: cfg.Outputs.ClickHouse.BatchSize,
|
||||
FlushIntervalMs: cfg.Outputs.ClickHouse.FlushIntervalMs,
|
||||
MaxBufferSize: cfg.Outputs.ClickHouse.MaxBufferSize,
|
||||
DropOnOverflow: cfg.Outputs.ClickHouse.DropOnOverflow,
|
||||
AsyncInsert: cfg.Outputs.ClickHouse.AsyncInsert,
|
||||
TimeoutMs: cfg.Outputs.ClickHouse.TimeoutMs,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("Failed to create ClickHouse sink", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
sinks = append(sinks, clickHouseSink)
|
||||
logger.Info(fmt.Sprintf("Configured ClickHouse sink: table=%s", cfg.Outputs.ClickHouse.Table))
|
||||
}
|
||||
|
||||
if cfg.Outputs.Stdout.Enabled {
|
||||
stdoutSink := stdout.NewStdoutSink(stdout.Config{
|
||||
Enabled: cfg.Outputs.Stdout.Enabled,
|
||||
})
|
||||
sinks = append(sinks, stdoutSink)
|
||||
logger.Info("Configured stdout sink")
|
||||
}
|
||||
|
||||
// Create multi-sink wrapper
|
||||
multiSink := multi.NewMultiSink(sinks...)
|
||||
|
||||
// Create correlation service
|
||||
correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{
|
||||
TimeWindow: cfg.Correlation.GetTimeWindow(),
|
||||
ApacheAlwaysEmit: cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit,
|
||||
NetworkEmit: cfg.Correlation.OrphanPolicy.NetworkEmit,
|
||||
MaxBufferSize: domain.DefaultMaxBufferSize,
|
||||
}, &domain.RealTimeProvider{})
|
||||
|
||||
logger.Info(fmt.Sprintf("Correlation service initialized: time_window=%s, apache_always_emit=%v, network_emit=%v",
|
||||
cfg.Correlation.GetTimeWindow().String(),
|
||||
cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit,
|
||||
cfg.Correlation.OrphanPolicy.NetworkEmit))
|
||||
|
||||
// Create orchestrator
|
||||
orchestrator := app.NewOrchestrator(app.OrchestratorConfig{
|
||||
Sources: sources,
|
||||
Sink: multiSink,
|
||||
}, correlationSvc)
|
||||
|
||||
// Start the application
|
||||
if err := orchestrator.Start(); err != nil {
|
||||
logger.Error("Failed to start orchestrator", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
logger.Info("logcorrelator started successfully")
|
||||
|
||||
// Wait for shutdown signal
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
sig := <-sigChan
|
||||
|
||||
logger.Info(fmt.Sprintf("Shutdown signal received: %v", sig))
|
||||
|
||||
// Graceful shutdown
|
||||
if err := orchestrator.Stop(); err != nil {
|
||||
logger.Error("Error during shutdown", err)
|
||||
}
|
||||
|
||||
logger.Info("logcorrelator stopped")
|
||||
}
|
||||
58
internal/adapters/outbound/stdout/sink.go
Normal file
58
internal/adapters/outbound/stdout/sink.go
Normal file
@ -0,0 +1,58 @@
|
||||
package stdout
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/logcorrelator/logcorrelator/internal/domain"
|
||||
)
|
||||
|
||||
// Config holds the stdout sink configuration.
|
||||
type Config struct {
|
||||
Enabled bool
|
||||
}
|
||||
|
||||
// StdoutSink writes correlated logs to stdout as JSON lines.
|
||||
type StdoutSink struct {
|
||||
config Config
|
||||
mu sync.Mutex
|
||||
enc *json.Encoder
|
||||
}
|
||||
|
||||
// NewStdoutSink creates a new stdout sink.
|
||||
func NewStdoutSink(config Config) *StdoutSink {
|
||||
return &StdoutSink{
|
||||
config: config,
|
||||
enc: json.NewEncoder(os.Stdout),
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the sink name.
|
||||
func (s *StdoutSink) Name() string {
|
||||
return "stdout"
|
||||
}
|
||||
|
||||
// Write writes a correlated log to stdout.
|
||||
func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if err := s.enc.Encode(log); err != nil {
|
||||
return fmt.Errorf("failed to write to stdout: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush flushes any buffered data (no-op for stdout).
|
||||
func (s *StdoutSink) Flush(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the sink (no-op for stdout).
|
||||
func (s *StdoutSink) Close() error {
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user