fix: ClickHouse insertion using native clickhouse-go/v2 API
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled

- Replace database/sql wrapper with clickhouse.Open() and clickhouse.Conn
- Use PrepareBatch + Append + Send pattern for proper batch inserts
- Fix ATTEMPT_TO_READ_AFTER_EOF errors caused by empty VALUES
- Include batch size in flush/send error messages for debugging
- Update version to 1.1.5

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
toto
2026-03-03 11:38:33 +01:00
parent 4b4ab84ee0
commit d78cc52a88
4 changed files with 52 additions and 41 deletions

View File

@ -2,7 +2,6 @@ package clickhouse
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
@ -11,7 +10,7 @@ import (
"sync"
"time"
_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/logcorrelator/logcorrelator/internal/domain"
)
@ -47,7 +46,7 @@ type Config struct {
// ClickHouseSink writes correlated logs to ClickHouse.
type ClickHouseSink struct {
config Config
db *sql.DB
conn clickhouse.Conn
mu sync.Mutex
buffer []domain.CorrelatedLog
flushChan chan struct{}
@ -86,22 +85,28 @@ func NewClickHouseSink(config Config) (*ClickHouseSink, error) {
done: make(chan struct{}),
}
// Connect to ClickHouse
db, err := sql.Open("clickhouse", config.DSN)
// Parse DSN and create options
options, err := clickhouse.ParseDSN(config.DSN)
if err != nil {
return nil, fmt.Errorf("failed to parse ClickHouse DSN: %w", err)
}
// Connect to ClickHouse using native API
conn, err := clickhouse.Open(options)
if err != nil {
return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
}
// Ping with timeout
// Ping with timeout to verify connection
pingCtx, pingCancel := context.WithTimeout(context.Background(), time.Duration(DefaultPingTimeoutMs)*time.Millisecond)
defer pingCancel()
if err := db.PingContext(pingCtx); err != nil {
_ = db.Close()
if err := conn.Ping(pingCtx); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("failed to ping ClickHouse: %w", err)
}
s.db = db
s.conn = conn
// Start flush goroutine
s.wg.Add(1)
@ -176,8 +181,8 @@ func (s *ClickHouseSink) Close() error {
closeErr = err
}
if s.db != nil {
if err := s.db.Close(); err != nil && closeErr == nil {
if s.conn != nil {
if err := s.conn.Close(); err != nil && closeErr == nil {
closeErr = err
}
}
@ -238,14 +243,12 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
s.buffer = make([]domain.CorrelatedLog, 0, s.config.BatchSize)
s.mu.Unlock()
if s.db == nil {
if s.conn == nil {
return fmt.Errorf("clickhouse connection is not initialized")
}
query := fmt.Sprintf(`
INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`, s.config.Table)
// Capture the batch size up front so it can be reported if the flush ultimately fails
batchSize := len(buffer)
// Retry logic with exponential backoff
var lastErr error
@ -259,7 +262,7 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
}
}
lastErr = s.executeBatch(ctx, query, buffer)
lastErr = s.executeBatch(ctx, buffer)
if lastErr == nil {
return nil
}
@ -269,31 +272,36 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
}
}
return fmt.Errorf("failed after %d retries: %w", MaxRetries, lastErr)
return fmt.Errorf("failed after %d retries (batch size: %d): %w", MaxRetries, batchSize, lastErr)
}
func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer []domain.CorrelatedLog) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
func (s *ClickHouseSink) executeBatch(ctx context.Context, buffer []domain.CorrelatedLog) error {
if s.conn == nil {
return fmt.Errorf("clickhouse connection is not initialized")
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, query)
query := fmt.Sprintf(`
INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields)
`, s.config.Table)
// Prepare batch using native clickhouse-go/v2 API
batch, err := s.conn.PrepareBatch(ctx, query)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
return fmt.Errorf("failed to prepare batch: %w", err)
}
defer stmt.Close()
for _, log := range buffer {
fieldsJSON, _ := json.Marshal(log.Fields)
for i, log := range buffer {
fieldsJSON, marshalErr := json.Marshal(log.Fields)
if marshalErr != nil {
return fmt.Errorf("failed to marshal fields for log %d: %w", i, marshalErr)
}
correlated := 0
correlated := uint8(0)
if log.Correlated {
correlated = 1
}
_, err := stmt.ExecContext(ctx,
appendErr := batch.Append(
log.Timestamp,
log.SrcIP,
log.SrcPort,
@ -303,13 +311,15 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer
log.OrphanSide,
string(fieldsJSON),
)
if err != nil {
return fmt.Errorf("failed to execute insert: %w", err)
if appendErr != nil {
return fmt.Errorf("failed to append log %d to batch: %w", i, appendErr)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
// Send the batch: appended rows are only buffered client-side until Send is called
sendErr := batch.Send()
if sendErr != nil {
return fmt.Errorf("failed to send batch (%d rows): %w", len(buffer), sendErr)
}
return nil

View File

@ -105,7 +105,7 @@ func TestClickHouseSink_IsRetryableError(t *testing.T) {
func TestClickHouseSink_FlushEmpty(t *testing.T) {
// Test that flushing an empty buffer doesn't cause issues
// (We can't test actual ClickHouse operations without a real instance)
s := &ClickHouseSink{
config: Config{
DSN: "clickhouse://test:test@localhost:9000/test",