fix: insert into http_logs_raw with single raw_json column
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled

- Table schema has only one column: raw_json (String)
- Serialize entire CorrelatedLog as JSON string
- Use INSERT INTO table (raw_json) with single Append() argument
- Fix "No such column timestamp" errors

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
toto
2026-03-03 11:49:41 +01:00
parent d78cc52a88
commit 560ee59d85

View File

@ -280,9 +280,9 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, buffer []domain.Corre
return fmt.Errorf("clickhouse connection is not initialized") return fmt.Errorf("clickhouse connection is not initialized")
} }
query := fmt.Sprintf(` // Table schema: http_logs_raw (raw_json String)
INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields) // Single column insert - the entire log is serialized as JSON string
`, s.config.Table) query := fmt.Sprintf(`INSERT INTO %s (raw_json)`, s.config.Table)
// Prepare batch using native clickhouse-go/v2 API // Prepare batch using native clickhouse-go/v2 API
batch, err := s.conn.PrepareBatch(ctx, query) batch, err := s.conn.PrepareBatch(ctx, query)
@ -291,26 +291,14 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, buffer []domain.Corre
} }
for i, log := range buffer { for i, log := range buffer {
fieldsJSON, marshalErr := json.Marshal(log.Fields) // Marshal the entire CorrelatedLog to JSON
logJSON, marshalErr := json.Marshal(log)
if marshalErr != nil { if marshalErr != nil {
return fmt.Errorf("failed to marshal fields for log %d: %w", i, marshalErr) return fmt.Errorf("failed to marshal log %d to JSON: %w", i, marshalErr)
} }
correlated := uint8(0) // Append the JSON string as the raw_json column value
if log.Correlated { appendErr := batch.Append(string(logJSON))
correlated = 1
}
appendErr := batch.Append(
log.Timestamp,
log.SrcIP,
log.SrcPort,
log.DstIP,
log.DstPort,
correlated,
log.OrphanSide,
string(fieldsJSON),
)
if appendErr != nil { if appendErr != nil {
return fmt.Errorf("failed to append log %d to batch: %w", i, appendErr) return fmt.Errorf("failed to append log %d to batch: %w", i, appendErr)
} }