From 514cb553efb4112277a2476b6422fc03c143c87c Mon Sep 17 00:00:00 2001 From: Jacquin Antoine Date: Sat, 28 Feb 2026 22:26:20 +0100 Subject: [PATCH] feat: release v1.0.3 with flattened JSON output structure - breaking: remove apache and network subdivisions from JSON output - feat: all log fields now merged into single-level JSON structure - feat: custom MarshalJSON() implementation for flat output - chore: update ClickHouse schema to use single fields JSON column - docs: update CHANGELOG.md and README.md with v1.0.3 changes - build: bump version to 1.0.3 in build.sh and RPM spec Migration notes: - Existing ClickHouse tables need schema migration to use fields JSON column - Replace apache JSON and network JSON columns with fields JSON column Co-authored-by: Qwen-Coder --- CHANGELOG.md | 15 ++++ README.md | 3 +- build.sh | 2 +- internal/adapters/outbound/clickhouse/sink.go | 10 +-- internal/domain/correlated_log.go | 76 ++++++++++++------- internal/domain/correlated_log_test.go | 11 +-- packaging/rpm/logcorrelator.spec | 15 +++- 7 files changed, 89 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1c3d1d..809be8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ All notable changes to logcorrelator are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [1.0.3] - 2026-02-28 + +### Changed + +- **Breaking**: Flattened JSON output structure - removed `apache` and `network` subdivisions +- All log fields are now merged into a single-level JSON structure for easier parsing +- ClickHouse schema updated: replaced `apache JSON` and `network JSON` columns with single `fields JSON` column + +### Technical Details + +- Custom `MarshalJSON()` implementation flattens all fields at the root level +- Backward compatibility: existing ClickHouse tables need schema migration to use `fields JSON` column + +--- + ## [1.0.2] - 2026-02-28 ### Fixed diff --git a/README.md b/README.md index df23355..2bac6b2 100644 --- a/README.md +++ b/README.md @@ -200,8 +200,7 @@ CREATE TABLE correlated_logs_http_network ( dst_port UInt32, correlated UInt8, orphan_side String, - apache JSON, - network JSON + fields JSON ) ENGINE = MergeTree() ORDER BY (timestamp, src_ip, src_port); ``` diff --git a/build.sh b/build.sh index 9a31690..86a1a86 100755 --- a/build.sh +++ b/build.sh @@ -5,7 +5,7 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" -VERSION="${VERSION:-1.0.2}" +VERSION="${VERSION:-1.0.3}" OUTPUT_DIR="${SCRIPT_DIR}/dist" echo "==============================================" diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go index f6329c2..78de0fb 100644 --- a/internal/adapters/outbound/clickhouse/sink.go +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -234,8 +234,8 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error { } query := fmt.Sprintf(` - INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, apache, network) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
`, s.config.Table) // Retry logic with exponential backoff @@ -277,8 +277,7 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer defer stmt.Close() for _, log := range buffer { - apacheJSON, _ := json.Marshal(log.Apache) - networkJSON, _ := json.Marshal(log.Network) + fieldsJSON, _ := json.Marshal(log.Fields) correlated := 0 if log.Correlated { @@ -293,8 +292,7 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer log.DstPort, correlated, log.OrphanSide, - string(apacheJSON), - string(networkJSON), + string(fieldsJSON), ) if err != nil { return fmt.Errorf("failed to execute insert: %w", err) diff --git a/internal/domain/correlated_log.go b/internal/domain/correlated_log.go index b4143e8..704741e 100644 --- a/internal/domain/correlated_log.go +++ b/internal/domain/correlated_log.go @@ -1,19 +1,49 @@ package domain -import "time" +import ( + "encoding/json" + "time" +) // CorrelatedLog represents the output correlated log entry. +// All fields are flattened into a single-level structure. type CorrelatedLog struct { - Timestamp time.Time `json:"timestamp"` - SrcIP string `json:"src_ip"` - SrcPort int `json:"src_port"` - DstIP string `json:"dst_ip,omitempty"` - DstPort int `json:"dst_port,omitempty"` - Correlated bool `json:"correlated"` - OrphanSide string `json:"orphan_side,omitempty"` - Apache map[string]any `json:"apache,omitempty"` - Network map[string]any `json:"network,omitempty"` - Extra map[string]any `json:"extra,omitempty"` + Timestamp time.Time `json:"timestamp"` + SrcIP string `json:"src_ip"` + SrcPort int `json:"src_port"` + DstIP string `json:"dst_ip,omitempty"` + DstPort int `json:"dst_port,omitempty"` + Correlated bool `json:"correlated"` + OrphanSide string `json:"orphan_side,omitempty"` + Fields map[string]any `json:"-"` // Additional fields, merged at marshal time +} + +// MarshalJSON implements custom JSON marshaling to flatten the structure. 
+func (c CorrelatedLog) MarshalJSON() ([]byte, error) { + // Create a flat map with all fields + flat := make(map[string]any) + + // Add core fields + flat["timestamp"] = c.Timestamp + flat["src_ip"] = c.SrcIP + flat["src_port"] = c.SrcPort + if c.DstIP != "" { + flat["dst_ip"] = c.DstIP + } + if c.DstPort != 0 { + flat["dst_port"] = c.DstPort + } + flat["correlated"] = c.Correlated + if c.OrphanSide != "" { + flat["orphan_side"] = c.OrphanSide + } + + // Merge additional fields + for k, v := range c.Fields { + flat[k] = v + } + + return json.Marshal(flat) } // NewCorrelatedLogFromEvent creates a correlated log from a single event (orphan). @@ -26,9 +56,7 @@ func NewCorrelatedLogFromEvent(event *NormalizedEvent, orphanSide string) Correl DstPort: event.DstPort, Correlated: false, OrphanSide: orphanSide, - Apache: extractApache(event), - Network: extractNetwork(event), - Extra: make(map[string]any), + Fields: extractFields(event), } } @@ -47,16 +75,11 @@ func NewCorrelatedLog(apacheEvent, networkEvent *NormalizedEvent) CorrelatedLog DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort), Correlated: true, OrphanSide: "", - Apache: extractApache(apacheEvent), - Network: extractNetwork(networkEvent), - Extra: make(map[string]any), + Fields: mergeFields(apacheEvent, networkEvent), } } -func extractApache(e *NormalizedEvent) map[string]any { - if e.Source != SourceA { - return nil - } +func extractFields(e *NormalizedEvent) map[string]any { result := make(map[string]any) for k, v := range e.Raw { result[k] = v @@ -64,12 +87,13 @@ func extractApache(e *NormalizedEvent) map[string]any { return result } -func extractNetwork(e *NormalizedEvent) map[string]any { - if e.Source != SourceB { - return nil - } +func mergeFields(a, b *NormalizedEvent) map[string]any { result := make(map[string]any) - for k, v := range e.Raw { + // Merge fields from both events + for k, v := range a.Raw { + result[k] = v + } + for k, v := range b.Raw { result[k] = v } return result diff 
--git a/internal/domain/correlated_log_test.go b/internal/domain/correlated_log_test.go index 6284c5c..584a718 100644 --- a/internal/domain/correlated_log_test.go +++ b/internal/domain/correlated_log_test.go @@ -72,8 +72,8 @@ func TestNewCorrelatedLogFromEvent(t *testing.T) { if log.SrcIP != "192.168.1.1" { t.Errorf("expected src_ip 192.168.1.1, got %s", log.SrcIP) } - if log.Apache == nil { - t.Error("expected apache to be non-nil") + if log.Fields == nil { + t.Error("expected fields to be non-nil") } } @@ -106,10 +106,7 @@ func TestNewCorrelatedLog(t *testing.T) { if log.OrphanSide != "" { t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide) } - if log.Apache == nil { - t.Error("expected apache to be non-nil") - } - if log.Network == nil { - t.Error("expected network to be non-nil") + if log.Fields == nil { + t.Error("expected fields to be non-nil") } } diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index 03c2b85..74e0293 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -2,7 +2,7 @@ # Compatible with CentOS 7, Rocky Linux 8, 9, 10 Name: logcorrelator -Version: 1.0.1 +Version: 1.0.3 Release: 1%{?dist} Summary: Log correlation service for HTTP and network events @@ -112,6 +112,19 @@ fi /etc/systemd/system/logcorrelator.service %changelog +* Sat Feb 28 2026 logcorrelator - 1.0.3-1 +- Breaking: Flattened JSON output structure - removed apache and network subdivisions +- All log fields now merged into single-level JSON structure +- ClickHouse schema: replaced apache JSON and network JSON columns with fields JSON column +- Custom MarshalJSON() implementation for flat output + +* Sat Feb 28 2026 logcorrelator - 1.0.2-1 +- Fix: durcir la validation et fiabiliser flush/arrêt idempotents +- Refactor: remove Debian/DEB packaging, RPM-only support +- Feat: add multi-distro RPM packaging for CentOS 7 and Rocky Linux 8/9/10 +- Feat: migrate configuration from custom format to YAML +- 
Refactor: remove obsolete config and update documentation + * Sat Feb 28 2026 logcorrelator - 1.0.1-1 - Fix: durcir la validation et fiabiliser flush/arrêt idempotents - Refactor: remove Debian/DEB packaging, RPM-only support