feat: release v1.0.3 with flattened JSON output structure

- breaking: remove apache and network subdivisions from JSON output
- feat: all log fields now merged into single-level JSON structure
- feat: custom MarshalJSON() implementation for flat output
- chore: update ClickHouse schema to use single fields JSON column
- docs: update CHANGELOG.md and README.md with v1.0.3 changes
- build: bump version to 1.0.3 in build.sh and RPM spec

Migration notes:
- Existing ClickHouse tables need schema migration to use fields JSON column
- Replace apache JSON and network JSON columns with fields JSON column

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-02-28 22:26:20 +01:00
parent 180c57c35b
commit 514cb553ef
7 changed files with 89 additions and 43 deletions

View File

@ -5,6 +5,21 @@ All notable changes to logcorrelator are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.0.3] - 2026-02-28
### Changed
- **Breaking**: Flattened JSON output structure - removed `apache` and `network` subdivisions
- All log fields are now merged into a single-level JSON structure for easier parsing
- ClickHouse schema updated: replaced `apache JSON` and `network JSON` columns with single `fields JSON` column
### Technical Details
- Custom `MarshalJSON()` implementation flattens all fields at the root level
- Backward compatibility: existing ClickHouse tables need schema migration to use `fields JSON` column
---
## [1.0.2] - 2026-02-28
### Fixed

View File

@ -200,8 +200,7 @@ CREATE TABLE correlated_logs_http_network (
dst_port UInt32, dst_port UInt32,
correlated UInt8, correlated UInt8,
orphan_side String, orphan_side String,
apache JSON, fields JSON
network JSON
) ENGINE = MergeTree() ) ENGINE = MergeTree()
ORDER BY (timestamp, src_ip, src_port); ORDER BY (timestamp, src_ip, src_port);
``` ```

View File

@ -5,7 +5,7 @@ set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR" cd "$SCRIPT_DIR"
VERSION="${VERSION:-1.0.2}" VERSION="${VERSION:-1.0.3}"
OUTPUT_DIR="${SCRIPT_DIR}/dist" OUTPUT_DIR="${SCRIPT_DIR}/dist"
echo "==============================================" echo "=============================================="

View File

@ -234,8 +234,8 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
} }
query := fmt.Sprintf(` query := fmt.Sprintf(`
INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, apache, network) INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, fields)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`, s.config.Table) `, s.config.Table)
// Retry logic with exponential backoff // Retry logic with exponential backoff
@ -277,8 +277,7 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer
defer stmt.Close() defer stmt.Close()
for _, log := range buffer { for _, log := range buffer {
apacheJSON, _ := json.Marshal(log.Apache) fieldsJSON, _ := json.Marshal(log.Fields)
networkJSON, _ := json.Marshal(log.Network)
correlated := 0 correlated := 0
if log.Correlated { if log.Correlated {
@ -293,8 +292,7 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer
log.DstPort, log.DstPort,
correlated, correlated,
log.OrphanSide, log.OrphanSide,
string(apacheJSON), string(fieldsJSON),
string(networkJSON),
) )
if err != nil { if err != nil {
return fmt.Errorf("failed to execute insert: %w", err) return fmt.Errorf("failed to execute insert: %w", err)

View File

@ -1,19 +1,49 @@
package domain package domain
import "time" import (
"encoding/json"
"time"
)
// CorrelatedLog represents the output correlated log entry.
// All fields are flattened into a single-level structure when marshaled
// to JSON (no apache/network subdivisions as of v1.0.3).
type CorrelatedLog struct {
	Timestamp  time.Time      `json:"timestamp"`
	SrcIP      string         `json:"src_ip"`
	SrcPort    int            `json:"src_port"`
	DstIP      string         `json:"dst_ip,omitempty"`
	DstPort    int            `json:"dst_port,omitempty"`
	Correlated bool           `json:"correlated"`
	OrphanSide string         `json:"orphan_side,omitempty"`
	Fields     map[string]any `json:"-"` // Additional fields, merged at marshal time
}

// MarshalJSON implements custom JSON marshaling to flatten the structure.
// Additional fields from Fields are merged first, and the core correlation
// fields are written afterwards so a raw log field named e.g. "timestamp"
// or "src_ip" can never clobber the canonical correlation metadata.
func (c CorrelatedLog) MarshalJSON() ([]byte, error) {
	// Pre-size for the extra fields plus up to 7 core fields.
	flat := make(map[string]any, len(c.Fields)+7)

	// Merge additional fields first; core fields below take precedence
	// on key collisions.
	for k, v := range c.Fields {
		flat[k] = v
	}

	// Core fields; optional ones mirror the struct tags' omitempty.
	flat["timestamp"] = c.Timestamp
	flat["src_ip"] = c.SrcIP
	flat["src_port"] = c.SrcPort
	if c.DstIP != "" {
		flat["dst_ip"] = c.DstIP
	}
	if c.DstPort != 0 {
		flat["dst_port"] = c.DstPort
	}
	flat["correlated"] = c.Correlated
	if c.OrphanSide != "" {
		flat["orphan_side"] = c.OrphanSide
	}

	return json.Marshal(flat)
}
// NewCorrelatedLogFromEvent creates a correlated log from a single event (orphan). // NewCorrelatedLogFromEvent creates a correlated log from a single event (orphan).
@ -26,9 +56,7 @@ func NewCorrelatedLogFromEvent(event *NormalizedEvent, orphanSide string) Correl
DstPort: event.DstPort, DstPort: event.DstPort,
Correlated: false, Correlated: false,
OrphanSide: orphanSide, OrphanSide: orphanSide,
Apache: extractApache(event), Fields: extractFields(event),
Network: extractNetwork(event),
Extra: make(map[string]any),
} }
} }
@ -47,16 +75,11 @@ func NewCorrelatedLog(apacheEvent, networkEvent *NormalizedEvent) CorrelatedLog
DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort), DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort),
Correlated: true, Correlated: true,
OrphanSide: "", OrphanSide: "",
Apache: extractApache(apacheEvent), Fields: mergeFields(apacheEvent, networkEvent),
Network: extractNetwork(networkEvent),
Extra: make(map[string]any),
} }
} }
func extractApache(e *NormalizedEvent) map[string]any { func extractFields(e *NormalizedEvent) map[string]any {
if e.Source != SourceA {
return nil
}
result := make(map[string]any) result := make(map[string]any)
for k, v := range e.Raw { for k, v := range e.Raw {
result[k] = v result[k] = v
@ -64,12 +87,13 @@ func extractApache(e *NormalizedEvent) map[string]any {
return result return result
} }
func extractNetwork(e *NormalizedEvent) map[string]any { func mergeFields(a, b *NormalizedEvent) map[string]any {
if e.Source != SourceB {
return nil
}
result := make(map[string]any) result := make(map[string]any)
for k, v := range e.Raw { // Merge fields from both events
for k, v := range a.Raw {
result[k] = v
}
for k, v := range b.Raw {
result[k] = v result[k] = v
} }
return result return result

View File

@ -72,8 +72,8 @@ func TestNewCorrelatedLogFromEvent(t *testing.T) {
if log.SrcIP != "192.168.1.1" { if log.SrcIP != "192.168.1.1" {
t.Errorf("expected src_ip 192.168.1.1, got %s", log.SrcIP) t.Errorf("expected src_ip 192.168.1.1, got %s", log.SrcIP)
} }
if log.Apache == nil { if log.Fields == nil {
t.Error("expected apache to be non-nil") t.Error("expected fields to be non-nil")
} }
} }
@ -106,10 +106,7 @@ func TestNewCorrelatedLog(t *testing.T) {
if log.OrphanSide != "" { if log.OrphanSide != "" {
t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide) t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide)
} }
if log.Apache == nil { if log.Fields == nil {
t.Error("expected apache to be non-nil") t.Error("expected fields to be non-nil")
}
if log.Network == nil {
t.Error("expected network to be non-nil")
} }
} }

View File

@ -2,7 +2,7 @@
# Compatible with CentOS 7, Rocky Linux 8, 9, 10 # Compatible with CentOS 7, Rocky Linux 8, 9, 10
Name: logcorrelator Name: logcorrelator
Version: 1.0.1 Version: 1.0.3
Release: 1%{?dist} Release: 1%{?dist}
Summary: Log correlation service for HTTP and network events Summary: Log correlation service for HTTP and network events
@ -112,6 +112,19 @@ fi
/etc/systemd/system/logcorrelator.service /etc/systemd/system/logcorrelator.service
%changelog %changelog
* Sat Feb 28 2026 logcorrelator <dev@example.com> - 1.0.3-1
- Breaking: Flattened JSON output structure - removed apache and network subdivisions
- All log fields now merged into single-level JSON structure
- ClickHouse schema: replaced apache JSON and network JSON columns with fields JSON column
- Custom MarshalJSON() implementation for flat output
* Sat Feb 28 2026 logcorrelator <dev@example.com> - 1.0.2-1
- Fix: durcir la validation et fiabiliser flush/arrêt idempotents
- Refactor: remove Debian/DEB packaging, RPM-only support
- Feat: add multi-distro RPM packaging for CentOS 7 and Rocky Linux 8/9/10
- Feat: migrate configuration from custom format to YAML
- Refactor: remove obsolete config and update documentation
* Sat Feb 28 2026 logcorrelator <dev@example.com> - 1.0.1-1
- Fix: durcir la validation et fiabiliser flush/arrêt idempotents
- Refactor: remove Debian/DEB packaging, RPM-only support