diff --git a/Dockerfile b/Dockerfile index a442c8b..fd7855e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,8 +23,7 @@ RUN go mod download || true COPY . . # Run tests with coverage (fail if < 80%) -RUN --mount=type=cache,target=/root/.cache/go-build \ - go test -race -coverprofile=coverage.txt -covermode=atomic ./... && \ +RUN go test -race -coverprofile=coverage.txt -covermode=atomic ./... && \ echo "=== Coverage Report ===" && \ go tool cover -func=coverage.txt | grep total && \ TOTAL=$(go tool cover -func=coverage.txt | grep total | awk '{gsub(/%/, "", $3); print $3}') && \ @@ -36,8 +35,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ echo "Coverage check passed!" # Build binary -RUN --mount=type=cache,target=/root/.cache/go-build \ - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ -ldflags="-w -s" \ -o /usr/bin/logcorrelator \ ./cmd/logcorrelator diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go index 199249b..72f5069 100644 --- a/internal/adapters/outbound/clickhouse/sink.go +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -2,7 +2,6 @@ package clickhouse import ( "context" - "database/sql" "encoding/json" "errors" "fmt" @@ -11,7 +10,7 @@ import ( "sync" "time" - _ "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/logcorrelator/logcorrelator/internal/domain" ) @@ -47,7 +46,7 @@ type Config struct { // ClickHouseSink writes correlated logs to ClickHouse. type ClickHouseSink struct { config Config - db *sql.DB + conn clickhouse.Conn mu sync.Mutex buffer []domain.CorrelatedLog flushChan chan struct{} @@ -86,22 +85,28 @@ func NewClickHouseSink(config Config) (*ClickHouseSink, error) { done: make(chan struct{}), } - // Connect to ClickHouse - db, err := sql.Open("clickhouse", config.DSN) + // Parse DSN and create options + options, err := clickhouse.ParseDSN(config.DSN) + if err != nil { + return nil, fmt.Errorf("failed to parse ClickHouse DSN: %w", err) + } + + // Connect to ClickHouse using native API + conn, err := clickhouse.Open(options) if err != nil { return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err) } - // Ping with timeout + // Ping with timeout to verify connection pingCtx, pingCancel := context.WithTimeout(context.Background(), time.Duration(DefaultPingTimeoutMs)*time.Millisecond) defer pingCancel() - if err := db.PingContext(pingCtx); err != nil { - _ = db.Close() + if err := conn.Ping(pingCtx); err != nil { + _ = conn.Close() return nil, fmt.Errorf("failed to ping ClickHouse: %w", err) } - s.db = db + s.conn = conn // Start flush goroutine s.wg.Add(1) @@ -176,8 +181,8 @@ func (s *ClickHouseSink) Close() error { closeErr = err } - if s.db != nil { - if err := s.db.Close(); err != nil && closeErr == nil { + if s.conn != nil { + if err := s.conn.Close(); err != nil && closeErr == nil { closeErr = err } } @@ -238,14 +243,12 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error { s.buffer = make([]domain.CorrelatedLog, 0, s.config.BatchSize) s.mu.Unlock() - if s.db == nil { + if s.conn == nil { return fmt.Errorf("clickhouse connection is not initialized") } - query := fmt.Sprintf(` - INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - `, s.config.Table) + // Log batch info before sending + batchSize := len(buffer) // Retry logic with exponential backoff var lastErr error @@ -259,7 +262,7 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error { } } - lastErr = s.executeBatch(ctx, query, buffer) + lastErr = s.executeBatch(ctx, buffer) if lastErr == nil { return nil } @@ -269,31 +272,36 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error { } } - return fmt.Errorf("failed after %d retries: %w", MaxRetries, lastErr) + return fmt.Errorf("failed after %d retries (batch size: %d): %w", MaxRetries, batchSize, lastErr) } -func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer []domain.CorrelatedLog) error { - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) +func (s *ClickHouseSink) executeBatch(ctx context.Context, buffer []domain.CorrelatedLog) error { + if s.conn == nil { + return fmt.Errorf("clickhouse connection is not initialized") } - defer tx.Rollback() - stmt, err := tx.PrepareContext(ctx, query) + query := fmt.Sprintf(` + INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields) + `, s.config.Table) + + // Prepare batch using native clickhouse-go/v2 API + batch, err := s.conn.PrepareBatch(ctx, query) if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) + return fmt.Errorf("failed to prepare batch: %w", err) } - defer stmt.Close() - for _, log := range buffer { - fieldsJSON, _ := json.Marshal(log.Fields) + for i, log := range buffer { + fieldsJSON, marshalErr := json.Marshal(log.Fields) + if marshalErr != nil { + return fmt.Errorf("failed to marshal fields for log %d: %w", i, marshalErr) + } - correlated := 0 + correlated := uint8(0) if log.Correlated { correlated = 1 } - _, err := stmt.ExecContext(ctx, + appendErr := batch.Append( log.Timestamp, log.SrcIP, log.SrcPort, @@ -303,13 +311,15 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer log.OrphanSide, string(fieldsJSON), ) - if err != nil { - return fmt.Errorf("failed to execute insert: %w", err) + if appendErr != nil { + return fmt.Errorf("failed to append log %d to batch: %w", i, appendErr) } } - if err := tx.Commit(); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) + // Send the batch - DO NOT FORGET this step + sendErr := batch.Send() + if sendErr != nil { + return fmt.Errorf("failed to send batch (%d rows): %w", len(buffer), sendErr) } return nil diff --git a/internal/adapters/outbound/clickhouse/sink_test.go b/internal/adapters/outbound/clickhouse/sink_test.go index c30bf9e..f7792b7 100644 --- a/internal/adapters/outbound/clickhouse/sink_test.go +++ b/internal/adapters/outbound/clickhouse/sink_test.go @@ -105,7 +105,7 @@ func TestClickHouseSink_IsRetryableError(t *testing.T) { func TestClickHouseSink_FlushEmpty(t *testing.T) { // Test that flushing an empty buffer doesn't cause issues // (We can't test actual ClickHouse operations without a real instance) - + s := &ClickHouseSink{ config: Config{ DSN: "clickhouse://test:test@localhost:9000/test", diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index bf8c071..fbdaf4d 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -2,7 +2,7 @@ # Compatible with CentOS 7, Rocky Linux 8, 9, 10 # Define version before Version: field for RPM macro support -%global spec_version 1.1.4 +%global spec_version 1.1.5 Name: logcorrelator Version: %{spec_version} @@ -121,7 +121,10 @@ fi /etc/logrotate.d/logcorrelator %changelog -* Mon Mar 02 2026 logcorrelator - 1.1.5-1 +* Tue Mar 03 2026 logcorrelator - 1.1.5-1 +- Fix: ClickHouse insertion using native clickhouse-go/v2 API (PrepareBatch + Append + Send) +- Fix: Replaced database/sql wrapper with clickhouse.Open() and clickhouse.Conn +- Fix: Proper batch sending to avoid ATTEMPT_TO_READ_AFTER_EOF errors - Fix: Set correct permissions (755) on /var/run/logcorrelator in RPM post-install * Mon Mar 02 2026 logcorrelator - 1.1.4-1