From 560ee59d8568e2235147f44de64012e2512a7bbb Mon Sep 17 00:00:00 2001 From: toto Date: Tue, 3 Mar 2026 11:49:41 +0100 Subject: [PATCH] fix: insert into http_logs_raw with single raw_json column - Table schema has only one column: raw_json (String) - Serialize entire CorrelatedLog as JSON string - Use INSERT INTO table (raw_json) with single Append() argument - Fix "No such column timestamp" errors Co-authored-by: Qwen-Coder --- internal/adapters/outbound/clickhouse/sink.go | 28 ++++++------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go index 72f5069..bd51e16 100644 --- a/internal/adapters/outbound/clickhouse/sink.go +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -280,9 +280,9 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, buffer []domain.Corre return fmt.Errorf("clickhouse connection is not initialized") } - query := fmt.Sprintf(` - INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields) - `, s.config.Table) + // Table schema: http_logs_raw (raw_json String) + // Single column insert - the entire log is serialized as JSON string + query := fmt.Sprintf(`INSERT INTO %s (raw_json)`, s.config.Table) // Prepare batch using native clickhouse-go/v2 API batch, err := s.conn.PrepareBatch(ctx, query) @@ -291,26 +291,14 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, buffer []domain.Corre } for i, log := range buffer { - fieldsJSON, marshalErr := json.Marshal(log.Fields) + // Marshal the entire CorrelatedLog to JSON + logJSON, marshalErr := json.Marshal(log) if marshalErr != nil { - return fmt.Errorf("failed to marshal fields for log %d: %w", i, marshalErr) + return fmt.Errorf("failed to marshal log %d to JSON: %w", i, marshalErr) } - correlated := uint8(0) - if log.Correlated { - correlated = 1 - } - - appendErr := batch.Append( - log.Timestamp, - log.SrcIP, - log.SrcPort, - log.DstIP, - log.DstPort, - correlated, - log.OrphanSide, - string(fieldsJSON), - ) + // Append the JSON string as the raw_json column value + appendErr := batch.Append(string(logJSON)) if appendErr != nil { return fmt.Errorf("failed to append log %d to batch: %w", i, appendErr) }