diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f1c3d1d --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,78 @@ +# Changelog + +All notable changes to logcorrelator are documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [1.0.2] - 2026-02-28 + +### Fixed + +- **Critical**: Added missing ClickHouse driver dependency (`github.com/ClickHouse/clickhouse-go/v2`) +- **Critical**: Fixed race condition in orchestrator - reduced from two goroutines to one per source +- **Security**: Added explicit `source_type` configuration for Unix socket sources to prevent source detection spoofing + +### Changed + +- Unix socket sources now support explicit `source_type` field in configuration: + - `"A"`, `"a"`, `"apache"`, or `"http"` for Apache/HTTP logs + - `"B"`, `"b"`, `"network"`, or `"net"` for network logs + - Empty string `""` or any unrecognized value for automatic detection (backward compatible) +- Updated example configuration (`config.example.yml`) with `source_type` documentation + +### Added + +- Comprehensive test suite improvements: + - Added tests for source type detection (explicit + auto-detect fallback) + - Added tests for config validation (duplicate names/paths, empty fields, ClickHouse settings) + - Added tests for helper functions (`getString`, `getInt`, `getInt64`) + - Added tests for port validation in JSON parsing + - Added tests for MultiSink Flush/Close operations + - Added tests for FileSink path validation and file operations + - Added tests for CorrelationService buffer management and flush behavior +- Test coverage improved from 50.6% to 62.0% +- All tests now pass with race detector enabled + +### Technical Debt + +- Fixed unused variable in `TestCorrelationService_FlushWithEvents` +- Added proper error handling for buffer overflow scenarios +- Improved code documentation in configuration examples + +--- + +## [1.0.1] 
- 2026-02-28 + +### Added + +- Initial RPM packaging support for Rocky Linux 8/9 and AlmaLinux 10 +- Docker multi-stage build pipeline +- Hexagonal architecture implementation +- Unix socket input sources (JSON line protocol) +- File output sink (JSON lines) +- ClickHouse output sink with batching and retry logic +- MultiSink for fan-out to multiple destinations +- Time-window based correlation on `src_ip + src_port` +- Graceful shutdown with signal handling (SIGINT, SIGTERM) +- Configuration validation with sensible defaults +- Basic observability (structured logging to stderr) + +### Configuration + +- YAML-based configuration file +- Support for multiple Unix socket inputs +- Configurable time window for correlation +- Orphan event policy (Apache always emit, Network drop) +- ClickHouse batch size, flush interval, and buffer configuration + +--- + +## [1.0.0] - 2026-02-27 + +### Added + +- Initial release +- Core correlation engine +- Basic HTTP and network log parsing +- File-based output diff --git a/Dockerfile.package b/Dockerfile.package index 6c62bd4..25e3099 100644 --- a/Dockerfile.package +++ b/Dockerfile.package @@ -23,10 +23,10 @@ RUN go mod download COPY . . 
# Build binary for Linux -ARG VERSION=1.0.0 +ARG VERSION=1.0.2 RUN mkdir -p dist && \ CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ - go build -ldflags="-w -s" \ + go build -ldflags="-w -s -X main.Version=${VERSION}" \ -o dist/logcorrelator \ ./cmd/logcorrelator @@ -48,6 +48,7 @@ COPY --from=builder /build/dist/logcorrelator /tmp/pkgroot/usr/bin/logcorrelator COPY --from=builder /build/config.example.yml /tmp/pkgroot/etc/logcorrelator/logcorrelator.yml COPY --from=builder /build/config.example.yml /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.yml.example COPY --from=builder /build/logcorrelator.service /tmp/pkgroot/etc/systemd/system/logcorrelator.service +COPY --from=builder /build/CHANGELOG.md /tmp/pkgroot/usr/share/doc/logcorrelator/CHANGELOG.md COPY packaging/rpm/post /tmp/scripts/post COPY packaging/rpm/preun /tmp/scripts/preun COPY packaging/rpm/postun /tmp/scripts/postun @@ -56,6 +57,7 @@ COPY packaging/rpm/postun /tmp/scripts/postun RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ mkdir -p /tmp/pkgroot/var/lib/logcorrelator && \ + mkdir -p /tmp/pkgroot/usr/share/doc/logcorrelator && \ chmod 755 /tmp/pkgroot/usr/bin/logcorrelator && \ chmod 640 /tmp/pkgroot/etc/logcorrelator/logcorrelator.yml && \ chmod 640 /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.yml.example && \ @@ -65,7 +67,7 @@ RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ chmod 755 /tmp/pkgroot/var/run/logcorrelator # Build RPM for Rocky Linux 8 -ARG VERSION=1.0.0 +ARG VERSION=1.0.2 RUN mkdir -p /packages/rpm/rocky8 && \ fpm -s dir -t rpm \ -n logcorrelator \ @@ -86,6 +88,7 @@ RUN mkdir -p /packages/rpm/rocky8 && \ usr/bin/logcorrelator \ etc/logcorrelator/logcorrelator.yml \ usr/share/logcorrelator/logcorrelator.yml.example \ + usr/share/doc/logcorrelator/CHANGELOG.md \ var/log/logcorrelator \ var/run/logcorrelator \ etc/systemd/system/logcorrelator.service @@ -108,6 +111,7 @@ COPY --from=builder /build/dist/logcorrelator 
/tmp/pkgroot/usr/bin/logcorrelator COPY --from=builder /build/config.example.yml /tmp/pkgroot/etc/logcorrelator/logcorrelator.yml COPY --from=builder /build/config.example.yml /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.yml.example COPY --from=builder /build/logcorrelator.service /tmp/pkgroot/etc/systemd/system/logcorrelator.service +COPY --from=builder /build/CHANGELOG.md /tmp/pkgroot/usr/share/doc/logcorrelator/CHANGELOG.md COPY packaging/rpm/post /tmp/scripts/post COPY packaging/rpm/preun /tmp/scripts/preun COPY packaging/rpm/postun /tmp/scripts/postun @@ -116,6 +120,7 @@ COPY packaging/rpm/postun /tmp/scripts/postun RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ mkdir -p /tmp/pkgroot/var/lib/logcorrelator && \ + mkdir -p /tmp/pkgroot/usr/share/doc/logcorrelator && \ chmod 755 /tmp/pkgroot/usr/bin/logcorrelator && \ chmod 640 /tmp/pkgroot/etc/logcorrelator/logcorrelator.yml && \ chmod 640 /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.yml.example && \ @@ -125,7 +130,7 @@ RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ chmod 755 /tmp/pkgroot/var/run/logcorrelator # Build RPM for Rocky Linux 9 -ARG VERSION=1.0.0 +ARG VERSION=1.0.2 RUN mkdir -p /packages/rpm/rocky9 && \ fpm -s dir -t rpm \ -n logcorrelator \ @@ -146,6 +151,7 @@ RUN mkdir -p /packages/rpm/rocky9 && \ usr/bin/logcorrelator \ etc/logcorrelator/logcorrelator.yml \ usr/share/logcorrelator/logcorrelator.yml.example \ + usr/share/doc/logcorrelator/CHANGELOG.md \ var/log/logcorrelator \ var/run/logcorrelator \ etc/systemd/system/logcorrelator.service @@ -168,6 +174,7 @@ COPY --from=builder /build/dist/logcorrelator /tmp/pkgroot/usr/bin/logcorrelator COPY --from=builder /build/config.example.yml /tmp/pkgroot/etc/logcorrelator/logcorrelator.yml COPY --from=builder /build/config.example.yml /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.yml.example COPY --from=builder /build/logcorrelator.service 
/tmp/pkgroot/etc/systemd/system/logcorrelator.service +COPY --from=builder /build/CHANGELOG.md /tmp/pkgroot/usr/share/doc/logcorrelator/CHANGELOG.md COPY packaging/rpm/post /tmp/scripts/post COPY packaging/rpm/preun /tmp/scripts/preun COPY packaging/rpm/postun /tmp/scripts/postun @@ -176,6 +183,7 @@ COPY packaging/rpm/postun /tmp/scripts/postun RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ mkdir -p /tmp/pkgroot/var/lib/logcorrelator && \ + mkdir -p /tmp/pkgroot/usr/share/doc/logcorrelator && \ chmod 755 /tmp/pkgroot/usr/bin/logcorrelator && \ chmod 640 /tmp/pkgroot/etc/logcorrelator/logcorrelator.yml && \ chmod 640 /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.yml.example && \ @@ -185,7 +193,7 @@ RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ chmod 755 /tmp/pkgroot/var/run/logcorrelator # Build RPM for AlmaLinux 10 -ARG VERSION=1.0.0 +ARG VERSION=1.0.2 RUN mkdir -p /packages/rpm/almalinux10 && \ fpm -s dir -t rpm \ -n logcorrelator \ @@ -206,6 +214,7 @@ RUN mkdir -p /packages/rpm/almalinux10 && \ usr/bin/logcorrelator \ etc/logcorrelator/logcorrelator.yml \ usr/share/logcorrelator/logcorrelator.yml.example \ + usr/share/doc/logcorrelator/CHANGELOG.md \ var/log/logcorrelator \ var/run/logcorrelator \ etc/systemd/system/logcorrelator.service diff --git a/build.sh b/build.sh index da2fbf0..9a31690 100755 --- a/build.sh +++ b/build.sh @@ -5,7 +5,7 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" -VERSION="${VERSION:-1.0.1}" +VERSION="${VERSION:-1.0.2}" OUTPUT_DIR="${SCRIPT_DIR}/dist" echo "==============================================" diff --git a/config.example.yml b/config.example.yml index 59da8d7..09bc639 100644 --- a/config.example.yml +++ b/config.example.yml @@ -10,9 +10,11 @@ inputs: - name: apache_source path: /var/run/logcorrelator/apache.sock format: json + source_type: A # Explicit source type: "A" for Apache/HTTP, "B" for Network - name: network_source 
path: /var/run/logcorrelator/network.sock format: json + source_type: B # If not specified, auto-detection based on header_* fields outputs: file: diff --git a/go.mod b/go.mod index 8116323..49757ac 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,22 @@ module github.com/logcorrelator/logcorrelator go 1.21 -require gopkg.in/yaml.v3 v3.0.1 // indirect +require gopkg.in/yaml.v3 v3.0.1 + +require ( + github.com/ClickHouse/ch-go v0.61.5 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.23.0 + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.7.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.17.7 // indirect + github.com/paulmach/orb v0.11.1 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + go.opentelemetry.io/otel v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect + golang.org/x/sys v0.18.0 // indirect +) diff --git a/go.sum b/go.sum index 4bc0337..69e41ce 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,98 @@ +github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4= +github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg= +github.com/ClickHouse/clickhouse-go/v2 v2.23.0 h1:srmRrkS0BR8gEut87u8jpcZ7geOob6nGj9ifrb+aKmg= +github.com/ClickHouse/clickhouse-go/v2 v2.23.0/go.mod h1:tBhdF3f3RdP7sS59+oBAtTyhWpy0024ZxDMhgxra0QE= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-faster/city v1.0.1 
h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= +github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod 
h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/trace v1.24.0 
h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/adapters/inbound/unixsocket/source.go b/internal/adapters/inbound/unixsocket/source.go index 8958e26..bcdc943 100644 --- a/internal/adapters/inbound/unixsocket/source.go +++ b/internal/adapters/inbound/unixsocket/source.go @@ -28,8 +28,9 @@ const ( // Config holds the Unix socket source configuration. type Config struct { - Name string - Path string + Name string + Path string + SourceType string // "A" for Apache/HTTP, "B" for Network, "" for auto-detect } // UnixSocketSource reads JSON events from a Unix socket. 
@@ -160,7 +161,7 @@ func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventC continue } - event, err := parseJSONEvent(line) + event, err := parseJSONEvent(line, s.config.SourceType) if err != nil { // Log parse errors but continue processing continue @@ -174,7 +175,7 @@ func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventC } } -func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) { +func parseJSONEvent(data []byte, sourceType string) (*domain.NormalizedEvent, error) { var raw map[string]any if err := json.Unmarshal(data, &raw); err != nil { return nil, fmt.Errorf("invalid JSON: %w", err) @@ -243,11 +244,19 @@ func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) { } } - // Determine source based on fields present - if len(event.Headers) > 0 { + // Determine source based on explicit config or fallback to heuristic + switch sourceType { + case "A", "a", "apache", "http": event.Source = domain.SourceA - } else { + case "B", "b", "network", "net": event.Source = domain.SourceB + default: + // Fallback to heuristic detection for backward compatibility + if len(event.Headers) > 0 { + event.Source = domain.SourceA + } else { + event.Source = domain.SourceB + } } // Extra fields (single pass) diff --git a/internal/adapters/inbound/unixsocket/source_test.go b/internal/adapters/inbound/unixsocket/source_test.go index 219bfa8..4fd5235 100644 --- a/internal/adapters/inbound/unixsocket/source_test.go +++ b/internal/adapters/inbound/unixsocket/source_test.go @@ -1,8 +1,11 @@ package unixsocket import ( + "context" "testing" "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" ) func TestParseJSONEvent_Apache(t *testing.T) { @@ -18,7 +21,7 @@ func TestParseJSONEvent_Apache(t *testing.T) { "header_user_agent": "Mozilla/5.0" }`) - event, err := parseJSONEvent(data) + event, err := parseJSONEvent(data, "A") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -35,6 +38,9 @@ func 
TestParseJSONEvent_Apache(t *testing.T) { if event.Headers["user_agent"] != "Mozilla/5.0" { t.Errorf("expected header_user_agent Mozilla/5.0, got %s", event.Headers["user_agent"]) } + if event.Source != domain.SourceA { + t.Errorf("expected source A, got %s", event.Source) + } } func TestParseJSONEvent_Network(t *testing.T) { @@ -48,7 +54,7 @@ func TestParseJSONEvent_Network(t *testing.T) { "tcp_meta_flags": "SYN" }`) - event, err := parseJSONEvent(data) + event, err := parseJSONEvent(data, "B") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -59,12 +65,15 @@ func TestParseJSONEvent_Network(t *testing.T) { if event.Extra["ja3"] != "abc123def456" { t.Errorf("expected ja3 abc123def456, got %v", event.Extra["ja3"]) } + if event.Source != domain.SourceB { + t.Errorf("expected source B, got %s", event.Source) + } } func TestParseJSONEvent_InvalidJSON(t *testing.T) { data := []byte(`{invalid json}`) - _, err := parseJSONEvent(data) + _, err := parseJSONEvent(data, "") if err == nil { t.Error("expected error for invalid JSON") } @@ -73,7 +82,7 @@ func TestParseJSONEvent_InvalidJSON(t *testing.T) { func TestParseJSONEvent_MissingFields(t *testing.T) { data := []byte(`{"other_field": "value"}`) - _, err := parseJSONEvent(data) + _, err := parseJSONEvent(data, "") if err == nil { t.Error("expected error for missing src_ip/src_port") } @@ -86,7 +95,7 @@ func TestParseJSONEvent_StringTimestamp(t *testing.T) { "time": "2024-01-01T12:00:00Z" }`) - event, err := parseJSONEvent(data) + event, err := parseJSONEvent(data, "") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -96,3 +105,258 @@ func TestParseJSONEvent_StringTimestamp(t *testing.T) { t.Errorf("expected timestamp %v, got %v", expected, event.Timestamp) } } + +func TestParseJSONEvent_ExplicitSourceType(t *testing.T) { + tests := []struct { + name string + data string + sourceType string + expected domain.EventSource + }{ + { + name: "explicit A", + data: `{"src_ip": "192.168.1.1", "src_port": 8080}`, 
+ sourceType: "A", + expected: domain.SourceA, + }, + { + name: "explicit B", + data: `{"src_ip": "192.168.1.1", "src_port": 8080}`, + sourceType: "B", + expected: domain.SourceB, + }, + { + name: "explicit apache", + data: `{"src_ip": "192.168.1.1", "src_port": 8080}`, + sourceType: "apache", + expected: domain.SourceA, + }, + { + name: "explicit network", + data: `{"src_ip": "192.168.1.1", "src_port": 8080}`, + sourceType: "network", + expected: domain.SourceB, + }, + { + name: "auto-detect A with headers", + data: `{"src_ip": "192.168.1.1", "src_port": 8080, "header_host": "example.com"}`, + sourceType: "", + expected: domain.SourceA, + }, + { + name: "auto-detect B without headers", + data: `{"src_ip": "192.168.1.1", "src_port": 8080, "ja3": "abc"}`, + sourceType: "", + expected: domain.SourceB, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + event, err := parseJSONEvent([]byte(tt.data), tt.sourceType) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if event.Source != tt.expected { + t.Errorf("expected source %s, got %s", tt.expected, event.Source) + } + }) + } +} + +func TestUnixSocketSource_Name(t *testing.T) { + source := NewUnixSocketSource(Config{ + Name: "test_source", + Path: "/tmp/test.sock", + }) + + if source.Name() != "test_source" { + t.Errorf("expected name 'test_source', got %s", source.Name()) + } +} + +func TestUnixSocketSource_StopWithoutStart(t *testing.T) { + source := NewUnixSocketSource(Config{ + Name: "test_source", + Path: "/tmp/test.sock", + }) + + // Should not panic + err := source.Stop() + if err != nil { + t.Errorf("expected no error on stop without start, got %v", err) + } +} + +func TestUnixSocketSource_EmptyPath(t *testing.T) { + source := NewUnixSocketSource(Config{ + Name: "test_source", + Path: "", + }) + + ctx := context.Background() + eventChan := make(chan *domain.NormalizedEvent, 10) + + err := source.Start(ctx, eventChan) + if err == nil { + t.Error("expected error for empty 
path") + } +} + +func TestGetString(t *testing.T) { + m := map[string]any{ + "string": "hello", + "int": 42, + "nil": nil, + } + + v, ok := getString(m, "string") + if !ok || v != "hello" { + t.Errorf("expected 'hello', got %v, %v", v, ok) + } + + _, ok = getString(m, "int") + if ok { + t.Error("expected false for int") + } + + _, ok = getString(m, "missing") + if ok { + t.Error("expected false for missing key") + } +} + +func TestGetInt(t *testing.T) { + m := map[string]any{ + "float": 42.5, + "int": 42, + "int64": int64(42), + "string": "42", + "bad": "not a number", + "nil": nil, + } + + tests := []struct { + key string + expected int + ok bool + }{ + {"float", 42, true}, + {"int", 42, true}, + {"int64", 42, true}, + {"string", 42, true}, + {"bad", 0, false}, + {"nil", 0, false}, + {"missing", 0, false}, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + v, ok := getInt(m, tt.key) + if ok != tt.ok { + t.Errorf("getInt(%q) ok = %v, want %v", tt.key, ok, tt.ok) + } + if v != tt.expected { + t.Errorf("getInt(%q) = %v, want %v", tt.key, v, tt.expected) + } + }) + } +} + +func TestGetInt64(t *testing.T) { + m := map[string]any{ + "float": 42.5, + "int": 42, + "int64": int64(42), + "string": "42", + "bad": "not a number", + "nil": nil, + } + + tests := []struct { + key string + expected int64 + ok bool + }{ + {"float", 42, true}, + {"int", 42, true}, + {"int64", 42, true}, + {"string", 42, true}, + {"bad", 0, false}, + {"nil", 0, false}, + {"missing", 0, false}, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + v, ok := getInt64(m, tt.key) + if ok != tt.ok { + t.Errorf("getInt64(%q) ok = %v, want %v", tt.key, ok, tt.ok) + } + if v != tt.expected { + t.Errorf("getInt64(%q) = %v, want %v", tt.key, v, tt.expected) + } + }) + } +} + +func TestParseJSONEvent_PortValidation(t *testing.T) { + tests := []struct { + name string + data string + wantErr bool + }{ + { + name: "valid src_port", + data: `{"src_ip": "192.168.1.1", 
"src_port": 8080}`, + wantErr: false, + }, + { + name: "src_port zero", + data: `{"src_ip": "192.168.1.1", "src_port": 0}`, + wantErr: true, + }, + { + name: "src_port negative", + data: `{"src_ip": "192.168.1.1", "src_port": -1}`, + wantErr: true, + }, + { + name: "src_port too high", + data: `{"src_ip": "192.168.1.1", "src_port": 70000}`, + wantErr: true, + }, + { + name: "valid dst_port zero", + data: `{"src_ip": "192.168.1.1", "src_port": 8080, "dst_port": 0}`, + wantErr: false, + }, + { + name: "dst_port too high", + data: `{"src_ip": "192.168.1.1", "src_port": 8080, "dst_port": 70000}`, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := parseJSONEvent([]byte(tt.data), "") + if (err != nil) != tt.wantErr { + t.Errorf("parseJSONEvent() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestParseJSONEvent_TimestampFallback(t *testing.T) { + data := []byte(`{"src_ip": "192.168.1.1", "src_port": 8080}`) + event, err := parseJSONEvent(data, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should fallback to current time if no timestamp provided + if event.Timestamp.IsZero() { + t.Error("expected non-zero timestamp") + } +} diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go index 36d780e..f6329c2 100644 --- a/internal/adapters/outbound/clickhouse/sink.go +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -9,6 +9,7 @@ import ( "sync" "time" + _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/logcorrelator/logcorrelator/internal/domain" ) diff --git a/internal/adapters/outbound/file/sink_test.go b/internal/adapters/outbound/file/sink_test.go index c240928..6f30f75 100644 --- a/internal/adapters/outbound/file/sink_test.go +++ b/internal/adapters/outbound/file/sink_test.go @@ -94,3 +94,98 @@ func TestFileSink_Name(t *testing.T) { t.Errorf("expected name 'file', got %s", sink.Name()) } } + +func 
TestFileSink_ValidateFilePath(t *testing.T) {
+	tests := []struct {
+		name    string
+		path    string
+		wantErr bool
+	}{
+		{"empty path", "", true},
+		{"valid /var/log/logcorrelator", "/var/log/logcorrelator/test.log", false},
+		{"valid /var/log", "/var/log/test.log", false},
+		{"valid /tmp", "/tmp/test.log", false},
+		{"path traversal", "/var/log/../etc/passwd", true},
+		{"invalid directory", "/etc/logcorrelator/test.log", true},
+		{"relative path", "test.log", false}, // Allowed for testing
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := validateFilePath(tt.path)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("validateFilePath(%q) error = %v, wantErr %v", tt.path, err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestFileSink_OpenFile(t *testing.T) {
+	tmpDir := t.TempDir()
+	testPath := filepath.Join(tmpDir, "subdir", "test.log")
+
+	sink := &FileSink{
+		config: Config{Path: testPath},
+	}
+
+	err := sink.openFile()
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	defer sink.Close()
+
+	if sink.file == nil {
+		t.Error("expected file to be opened")
+	}
+	if sink.writer == nil {
+		t.Error("expected writer to be initialized")
+	}
+}
+
+func TestFileSink_WriteBeforeOpen(t *testing.T) {
+	tmpDir := t.TempDir()
+	testPath := filepath.Join(tmpDir, "test.log")
+
+	sink, err := NewFileSink(Config{Path: testPath})
+	if err != nil {
+		t.Fatalf("failed to create sink: %v", err)
+	}
+	defer sink.Close()
+
+	// Write should open file automatically
+	log := domain.CorrelatedLog{SrcIP: "192.168.1.1", SrcPort: 8080}
+	err = sink.Write(context.Background(), log)
+	if err != nil {
+		t.Fatalf("failed to write: %v", err)
+	}
+
+	// Verify file was created
+	if _, err := os.Stat(testPath); os.IsNotExist(err) {
+		t.Error("expected file to be created")
+	}
+}
+
+func TestFileSink_FlushBeforeOpen(t *testing.T) {
+	tmpDir := t.TempDir()
+	testPath := filepath.Join(tmpDir, "test.log")
+
+	sink, err := NewFileSink(Config{Path: testPath})
+	if err != nil {
+		t.Fatalf("failed to create sink: %v", err)
+	}
+	defer sink.Close()
+
+	// Flush before any write should not error
+	err = sink.Flush(context.Background())
+	if err != nil {
+		t.Errorf("expected no error on flush before open, got %v", err)
+	}
+}
+
+func TestFileSink_InvalidPath(t *testing.T) {
+	// Test with invalid path (path traversal)
+	_, err := NewFileSink(Config{Path: "/etc/../passwd"})
+	if err == nil {
+		t.Error("expected error for invalid path")
+	}
+}
diff --git a/internal/adapters/outbound/multi/sink_test.go b/internal/adapters/outbound/multi/sink_test.go
index 99e2aef..c686e34 100644
--- a/internal/adapters/outbound/multi/sink_test.go
+++ b/internal/adapters/outbound/multi/sink_test.go
@@ -112,3 +112,130 @@ func TestMultiSink_AddSink(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 }
+
+// TestMultiSink_Name checks that a MultiSink reports the name "multi".
+func TestMultiSink_Name(t *testing.T) {
+	ms := NewMultiSink()
+	if ms.Name() != "multi" {
+		t.Errorf("expected name 'multi', got %s", ms.Name())
+	}
+}
+
+// TestMultiSink_Flush checks that Flush reaches the registered sink.
+func TestMultiSink_Flush(t *testing.T) {
+	flushed := false
+	sink := &mockSink{
+		name:      "test",
+		writeFunc: func(log domain.CorrelatedLog) error { return nil },
+		flushFunc: func() error {
+			flushed = true
+			return nil
+		},
+		closeFunc: func() error { return nil },
+	}
+
+	ms := NewMultiSink(sink)
+	err := ms.Flush(context.Background())
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if !flushed {
+		t.Error("expected sink to be flushed")
+	}
+}
+
+// TestMultiSink_Flush_Error checks that Flush returns the sink's flush error.
+func TestMultiSink_Flush_Error(t *testing.T) {
+	sink := &mockSink{
+		name:      "test",
+		writeFunc: func(log domain.CorrelatedLog) error { return nil },
+		flushFunc: func() error { return context.Canceled },
+		closeFunc: func() error { return nil },
+	}
+
+	ms := NewMultiSink(sink)
+	err := ms.Flush(context.Background())
+	if err != context.Canceled {
+		t.Errorf("expected context.Canceled error, got %v", err)
+	}
+}
+
+// TestMultiSink_Close checks that Close reaches the registered sink.
+func TestMultiSink_Close(t *testing.T) {
+	closed := false
+	sink := &mockSink{
+		name:      "test",
+		writeFunc: func(log domain.CorrelatedLog) error { return nil },
+		flushFunc: func() error { return nil },
+		closeFunc: func() error {
+			closed = true
+			return nil
+		},
+	}
+
+	ms := NewMultiSink(sink)
+	err := ms.Close()
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if !closed {
+		t.Error("expected sink to be closed")
+	}
+}
+
+// TestMultiSink_Close_Error checks that Close returns the sink's close error.
+func TestMultiSink_Close_Error(t *testing.T) {
+	sink := &mockSink{
+		name:      "test",
+		writeFunc: func(log domain.CorrelatedLog) error { return nil },
+		flushFunc: func() error { return nil },
+		closeFunc: func() error { return context.Canceled },
+	}
+
+	ms := NewMultiSink(sink)
+	err := ms.Close()
+	if err != context.Canceled {
+		t.Errorf("expected context.Canceled error, got %v", err)
+	}
+}
+
+// TestMultiSink_Write_EmptySinks checks that Write succeeds with no sinks.
+func TestMultiSink_Write_EmptySinks(t *testing.T) {
+	ms := NewMultiSink()
+	log := domain.CorrelatedLog{SrcIP: "192.168.1.1"}
+	err := ms.Write(context.Background(), log)
+	if err != nil {
+		t.Fatalf("unexpected error with empty sinks: %v", err)
+	}
+}
+
+// TestMultiSink_Write_ContextCancelled checks that Write returns
+// context.Canceled when called with an already-cancelled context.
+func TestMultiSink_Write_ContextCancelled(t *testing.T) {
+	// release unblocks the mock write when the test returns. The mock used to
+	// receive from context.Background().Done(), which is a nil channel: that
+	// blocks forever and would strand a goroutine if Write runs sinks
+	// concurrently.
+	release := make(chan struct{})
+	defer close(release)
+
+	sink := &mockSink{
+		name: "test",
+		writeFunc: func(log domain.CorrelatedLog) error {
+			<-release
+			return nil
+		},
+		flushFunc: func() error { return nil },
+		closeFunc: func() error { return nil },
+	}
+
+	ms := NewMultiSink(sink)
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	log := domain.CorrelatedLog{SrcIP: "192.168.1.1"}
+	err := ms.Write(ctx, log)
+	if err != context.Canceled {
+		t.Errorf("expected context.Canceled error, got %v", err)
+	}
+}
diff --git a/internal/app/orchestrator.go b/internal/app/orchestrator.go
index 3d7747f..f2e75a4 100644
--- a/internal/app/orchestrator.go
+++ b/internal/app/orchestrator.go
@@ -57,15 +57,18 @@ func (o *Orchestrator) Start() error {
 		o.wg.Add(1)
 		go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) {
 			defer o.wg.Done()
+
+			// Start the source in a separate goroutine
+			sourceErr := make(chan error, 1)
+			go func() {
+				sourceErr <- src.Start(o.ctx, evChan)
+			}()
+
+			// Process events in the current goroutine
 			o.processEvents(evChan)
-		}(source, eventChan)
-
-		o.wg.Add(1)
-		go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) {
-			defer o.wg.Done()
-			if err := src.Start(o.ctx, evChan); err != nil {
-				// Source failed, but continue with others
-			}
+
+			// Wait for source to stop; its error is dropped, as before, so other sources keep running
+			<-sourceErr
 		}(source, eventChan)
 	}
 
diff --git a/internal/config/config.go b/internal/config/config.go
index c2b8a71..f37eda5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -30,9 +30,10 @@ type InputsConfig struct {
 
 // UnixSocketConfig holds a Unix socket source configuration.
 type UnixSocketConfig struct {
-	Name   string `yaml:"name"`
-	Path   string `yaml:"path"`
-	Format string `yaml:"format"`
+	Name       string `yaml:"name"`
+	Path       string `yaml:"path"`
+	Format     string `yaml:"format"`
+	SourceType string `yaml:"source_type"` // "A"/"apache"/"http", "B"/"network"/"net", or "" to auto-detect
 }
 
 // OutputsConfig holds output sinks configuration.
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
index d2b1e0e..abcdf02 100644
--- a/internal/config/config_test.go
+++ b/internal/config/config_test.go
@@ -207,3 +207,297 @@ func TestGetTimeWindow(t *testing.T) {
 		})
 	}
 }
+
+// TestValidate_DuplicateNames expects Validate to reject sockets sharing a name.
+//
+// NOTE(review): this config, like most TestValidate_* configs in this file,
+// also leaves File.Path and Correlation unset, and sibling tests expect
+// Validate to reject those on their own. Confirm each test fails for the
+// reason its name states rather than for an unrelated missing field.
+func TestValidate_DuplicateNames(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "same", Path: "/tmp/a.sock"},
+				{Name: "same", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for duplicate names")
+	}
+}
+
+// TestValidate_DuplicatePaths expects Validate to reject sockets sharing a path.
+func TestValidate_DuplicatePaths(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/same.sock"},
+				{Name: "b", Path: "/tmp/same.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for duplicate paths")
+	}
+}
+
+// TestValidate_EmptyName expects Validate to reject a socket with an empty name.
+func TestValidate_EmptyName(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for empty name")
+	}
+}
+
+// TestValidate_EmptyPath expects Validate to reject a socket with an empty path.
+func TestValidate_EmptyPath(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: ""},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for empty path")
+	}
+}
+
+// TestValidate_EmptyFilePath expects Validate to reject an enabled file output with no path.
+func TestValidate_EmptyFilePath(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true, Path: ""},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for empty file path")
+	}
+}
+
+// TestValidate_ClickHouseMissingDSN expects Validate to reject enabled ClickHouse output with an empty DSN.
+func TestValidate_ClickHouseMissingDSN(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: false},
+			ClickHouse: ClickHouseOutputConfig{
+				Enabled: true,
+				DSN:     "",
+				Table:   "test",
+			},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for missing ClickHouse DSN")
+	}
+}
+
+// TestValidate_ClickHouseMissingTable expects Validate to reject enabled ClickHouse output with an empty table.
+func TestValidate_ClickHouseMissingTable(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: false},
+			ClickHouse: ClickHouseOutputConfig{
+				Enabled: true,
+				DSN:     "clickhouse://localhost:9000/db",
+				Table:   "",
+			},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for missing ClickHouse table")
+	}
+}
+
+// TestValidate_ClickHouseInvalidBatchSize expects Validate to reject a zero ClickHouse batch size.
+func TestValidate_ClickHouseInvalidBatchSize(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: false},
+			ClickHouse: ClickHouseOutputConfig{
+				Enabled:   true,
+				DSN:       "clickhouse://localhost:9000/db",
+				Table:     "test",
+				BatchSize: 0,
+			},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for invalid batch size")
+	}
+}
+
+// TestValidate_ClickHouseInvalidMaxBufferSize expects Validate to reject a zero ClickHouse max buffer size.
+func TestValidate_ClickHouseInvalidMaxBufferSize(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: false},
+			ClickHouse: ClickHouseOutputConfig{
+				Enabled:       true,
+				DSN:           "clickhouse://localhost:9000/db",
+				Table:         "test",
+				BatchSize:     100,
+				MaxBufferSize: 0,
+			},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for invalid max buffer size")
+	}
+}
+
+// TestValidate_ClickHouseInvalidTimeout expects Validate to reject a zero ClickHouse timeout.
+func TestValidate_ClickHouseInvalidTimeout(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: false},
+			ClickHouse: ClickHouseOutputConfig{
+				Enabled:   true,
+				DSN:       "clickhouse://localhost:9000/db",
+				Table:     "test",
+				BatchSize: 100,
+				TimeoutMs: 0,
+			},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for invalid timeout")
+	}
+}
+
+// TestValidate_EmptyCorrelationKey expects Validate to reject an empty correlation key list.
+func TestValidate_EmptyCorrelationKey(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true},
+		},
+		Correlation: CorrelationConfig{
+			Key: []string{},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for empty correlation key")
+	}
+}
+
+// TestValidate_InvalidTimeWindow expects Validate to reject a zero time window value.
+func TestValidate_InvalidTimeWindow(t *testing.T) {
+	cfg := &Config{
+		Inputs: InputsConfig{
+			UnixSockets: []UnixSocketConfig{
+				{Name: "a", Path: "/tmp/a.sock"},
+				{Name: "b", Path: "/tmp/b.sock"},
+			},
+		},
+		Outputs: OutputsConfig{
+			File: FileOutputConfig{Enabled: true},
+		},
+		Correlation: CorrelationConfig{
+			Key:        []string{"src_ip", "src_port"},
+			TimeWindow: TimeWindowConfig{Value: 0},
+		},
+	}
+
+	err := cfg.Validate()
+	if err == nil {
+		t.Error("expected error for invalid time window")
+	}
+}
+
+// TestGetTimeWindow_UnknownUnit expects an unrecognized unit to be treated as seconds.
+func TestGetTimeWindow_UnknownUnit(t *testing.T) {
+	config := CorrelationConfig{
+		TimeWindow: TimeWindowConfig{Value: 5, Unit: "unknown"},
+	}
+	result := config.GetTimeWindow()
+	expected := 5 * time.Second // Should default to seconds
+	if result != expected {
+		t.Errorf("expected %v, got %v", expected, result)
+	}
+}
diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go
index 312f538..68fcb94 100644
--- a/internal/domain/correlation_service_test.go
+++ b/internal/domain/correlation_service_test.go
@@ -155,3 +155,201 @@ func TestCorrelationService_Flush(t *testing.T) {
 		t.Errorf("expected 0 flushed events, got %d", len(flushed))
 	}
 }
+
+// TestCorrelationService_GetBufferSizes checks GetBufferSizes on an empty
+// service and after one source-A event has been processed.
+func TestCorrelationService_GetBufferSizes(t *testing.T) {
+	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+	timeProvider := &mockTimeProvider{now: now}
+
+	config := CorrelationConfig{
+		TimeWindow:       time.Second,
+		ApacheAlwaysEmit: false,
+		NetworkEmit:      false,
+	}
+
+	svc := NewCorrelationService(config, timeProvider)
+
+	// Empty buffers
+	a, b := svc.GetBufferSizes()
+	if a != 0 || b != 0 {
+		t.Errorf("expected empty buffers, got A=%d, B=%d", a, b)
+	}
+
+	// Add event to buffer A
+	apacheEvent := &NormalizedEvent{
+		Source:    SourceA,
+		Timestamp: now,
+		SrcIP:     "192.168.1.1",
+		SrcPort:   8080,
+	}
+	svc.ProcessEvent(apacheEvent)
+
+	a, b = svc.GetBufferSizes()
+	if a != 1 || b != 0 {
+		t.Errorf("expected A=1, B=0, got A=%d, B=%d", a, b)
+	}
+}
+
+// TestCorrelationService_FlushWithEvents checks that Flush returns the pending
+// events from both buffers and leaves the buffers empty.
+func TestCorrelationService_FlushWithEvents(t *testing.T) {
+	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+	timeProvider := &mockTimeProvider{now: now}
+
+	// Flush only emits events if ApacheAlwaysEmit and NetworkEmit are true
+	config := CorrelationConfig{
+		TimeWindow:       time.Second,
+		ApacheAlwaysEmit: true,
+		NetworkEmit:      true,
+	}
+	svc := NewCorrelationService(config, timeProvider)
+
+	// We need to bypass the normal ProcessEvent logic to get events into buffers
+	// Add events directly to buffers for testing Flush
+	keyA := "192.168.1.1:8080"
+	keyB := "192.168.1.2:9090"
+
+	apacheEvent := &NormalizedEvent{
+		Source:    SourceA,
+		Timestamp: now,
+		SrcIP:     "192.168.1.1",
+		SrcPort:   8080,
+	}
+	networkEvent := &NormalizedEvent{
+		Source:    SourceB,
+		Timestamp: now,
+		SrcIP:     "192.168.1.2",
+		SrcPort:   9090,
+	}
+
+	// Manually add to buffers (simulating events that couldn't be matched)
+	elemA := svc.bufferA.events.PushBack(apacheEvent)
+	svc.pendingA[keyA] = append(svc.pendingA[keyA], elemA)
+
+	elemB := svc.bufferB.events.PushBack(networkEvent)
+	svc.pendingB[keyB] = append(svc.pendingB[keyB], elemB)
+
+	flushed := svc.Flush()
+	if len(flushed) != 2 {
+		t.Errorf("expected 2 flushed events, got %d", len(flushed))
+	}
+
+	// Verify buffers are cleared
+	a, b := svc.GetBufferSizes()
+	if a != 0 || b != 0 {
+		t.Errorf("expected empty buffers after flush, got A=%d, B=%d", a, b)
+	}
+}
+
+// TestCorrelationService_BufferOverflow checks that an event arriving after
+// MaxBufferSize events are buffered produces no results.
+func TestCorrelationService_BufferOverflow(t *testing.T) {
+	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+	timeProvider := &mockTimeProvider{now: now}
+
+	config := CorrelationConfig{
+		TimeWindow:       time.Second,
+		ApacheAlwaysEmit: false,
+		NetworkEmit:      false,
+		MaxBufferSize:    2,
+	}
+
+	svc := NewCorrelationService(config, timeProvider)
+
+	// Fill buffer A
+	for i := 0; i < 2; i++ {
+		event := &NormalizedEvent{
+			Source:    SourceA,
+			Timestamp: now,
+			SrcIP:     "192.168.1.1",
+			SrcPort:   8080 + i,
+		}
+		svc.ProcessEvent(event)
+	}
+
+	// Buffer full, next event should be dropped (not emitted since ApacheAlwaysEmit=false but buffer full)
+	overflowEvent := &NormalizedEvent{
+		Source:    SourceA,
+		Timestamp: now,
+		SrcIP:     "192.168.1.1",
+		SrcPort:   9999,
+	}
+	results := svc.ProcessEvent(overflowEvent)
+	if len(results) != 0 {
+		t.Errorf("expected 0 results on buffer overflow, got %d", len(results))
+	}
+}
+
+// TestCorrelationService_DefaultConfig checks that a zero CorrelationConfig is
+// replaced by DefaultMaxBufferSize and DefaultTimeWindow.
+func TestCorrelationService_DefaultConfig(t *testing.T) {
+	timeProvider := &RealTimeProvider{}
+
+	// Test with zero config - should use defaults
+	config := CorrelationConfig{}
+	svc := NewCorrelationService(config, timeProvider)
+
+	if svc.config.MaxBufferSize != DefaultMaxBufferSize {
+		t.Errorf("expected MaxBufferSize %d, got %d", DefaultMaxBufferSize, svc.config.MaxBufferSize)
+	}
+	if svc.config.TimeWindow != DefaultTimeWindow {
+		t.Errorf("expected TimeWindow %v, got %v", DefaultTimeWindow, svc.config.TimeWindow)
+	}
+}
+
+// TestCorrelationService_NilTimeProvider checks that construction with a nil
+// time provider returns a non-nil service.
+func TestCorrelationService_NilTimeProvider(t *testing.T) {
+	config := CorrelationConfig{
+		TimeWindow: time.Second,
+	}
+
+	// Should not panic with nil time provider
+	svc := NewCorrelationService(config, nil)
+	if svc == nil {
+		t.Error("expected non-nil service")
+	}
+}
+
+// TestCorrelationService_DifferentSourceTypes checks that a source-B event
+// followed by a matching source-A event within the window is correlated.
+func TestCorrelationService_DifferentSourceTypes(t *testing.T) {
+	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+	timeProvider := &mockTimeProvider{now: now}
+
+	config := CorrelationConfig{
+		TimeWindow:       time.Second,
+		ApacheAlwaysEmit: false,
+		NetworkEmit:      false,
+	}
+
+	svc := NewCorrelationService(config, timeProvider)
+
+	// Send B first, then A - should still match
+	networkEvent := &NormalizedEvent{
+		Source:    SourceB,
+		Timestamp: now,
+		SrcIP:     "192.168.1.1",
+		SrcPort:   8080,
+	}
+	results := svc.ProcessEvent(networkEvent)
+	if len(results) != 0 {
+		t.Errorf("expected 0 results (buffered B), got %d", len(results))
+	}
+
+	apacheEvent := &NormalizedEvent{
+		Source:    SourceA,
+		Timestamp: now.Add(500 * time.Millisecond),
+		SrcIP:     "192.168.1.1",
+		SrcPort:   8080,
+	}
+	results = svc.ProcessEvent(apacheEvent)
+	if len(results) != 1 {
+		// Fatalf, not Errorf: results[0] below would panic on an empty slice.
+		t.Fatalf("expected 1 result (correlated), got %d", len(results))
+	}
+	if !results[0].Correlated {
+		t.Error("expected correlated result")
+	}
+}