From 8fc14c1e94cfaf10d10d6fb30b37281fc62047af Mon Sep 17 00:00:00 2001 From: Jacquin Antoine Date: Fri, 27 Feb 2026 15:31:46 +0100 Subject: [PATCH] Initial commit: logcorrelator with unified packaging (DEB + RPM using fpm) Co-authored-by: Qwen-Coder --- .github/workflows/ci.yml | 73 +++ .gitignore | 31 ++ Dockerfile | 150 +++++ Dockerfile.package | 125 +++++ README.md | 278 ++++++++++ architecture.yml | 521 ++++++++++++++++++ build.sh | 75 +++ config.example.conf | 41 ++ go.mod | 3 + go.sum | 0 .../adapters/inbound/unixsocket/source.go | 334 +++++++++++ .../inbound/unixsocket/source_test.go | 98 ++++ internal/adapters/outbound/clickhouse/sink.go | 333 +++++++++++ .../adapters/outbound/clickhouse/sink_test.go | 305 ++++++++++ internal/adapters/outbound/file/sink.go | 168 ++++++ internal/adapters/outbound/file/sink_test.go | 96 ++++ internal/adapters/outbound/multi/sink.go | 123 +++++ internal/adapters/outbound/multi/sink_test.go | 114 ++++ internal/app/orchestrator.go | 158 ++++++ internal/app/orchestrator_test.go | 160 ++++++ internal/config/config.go | 340 ++++++++++++ internal/config/config_test.go | 224 ++++++++ internal/domain/correlated_log.go | 90 +++ internal/domain/correlated_log_test.go | 115 ++++ internal/domain/correlation_service.go | 243 ++++++++ internal/domain/correlation_service_test.go | 153 +++++ internal/domain/event.go | 37 ++ internal/observability/logger.go | 85 +++ internal/observability/logger_test.go | 111 ++++ internal/ports/source.go | 54 ++ logcorrelator.service | 23 + packaging/deb/postinst | 66 +++ packaging/deb/postrm | 52 ++ packaging/deb/prerm | 29 + test.sh | 21 + 35 files changed, 4829 insertions(+) create mode 100644 .github/workflows/ci.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Dockerfile.package create mode 100644 README.md create mode 100644 architecture.yml create mode 100755 build.sh create mode 100644 config.example.conf create mode 100644 go.mod create mode 100644 go.sum create mode 
100644 internal/adapters/inbound/unixsocket/source.go create mode 100644 internal/adapters/inbound/unixsocket/source_test.go create mode 100644 internal/adapters/outbound/clickhouse/sink.go create mode 100644 internal/adapters/outbound/clickhouse/sink_test.go create mode 100644 internal/adapters/outbound/file/sink.go create mode 100644 internal/adapters/outbound/file/sink_test.go create mode 100644 internal/adapters/outbound/multi/sink.go create mode 100644 internal/adapters/outbound/multi/sink_test.go create mode 100644 internal/app/orchestrator.go create mode 100644 internal/app/orchestrator_test.go create mode 100644 internal/config/config.go create mode 100644 internal/config/config_test.go create mode 100644 internal/domain/correlated_log.go create mode 100644 internal/domain/correlated_log_test.go create mode 100644 internal/domain/correlation_service.go create mode 100644 internal/domain/correlation_service_test.go create mode 100644 internal/domain/event.go create mode 100644 internal/observability/logger.go create mode 100644 internal/observability/logger_test.go create mode 100644 internal/ports/source.go create mode 100644 logcorrelator.service create mode 100644 packaging/deb/postinst create mode 100644 packaging/deb/postrm create mode 100644 packaging/deb/prerm create mode 100755 test.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..3bcea61 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,73 @@ +name: Build and Test + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.21' + + - name: Download dependencies + run: go mod download + + - name: Run tests with coverage + run: | + go test -race -coverprofile=coverage.txt -covermode=atomic ./... 
+ TOTAL=$(go tool cover -func=coverage.txt | grep total | awk '{gsub(/%/, "", $3); print $3}') + echo "Coverage: ${TOTAL}%" + if (( $(echo "$TOTAL < 80" | bc -l) )); then + echo "Coverage ${TOTAL}% is below 80% threshold" + exit 1 + fi + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.txt + + build: + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.21' + + - name: Build binary + run: | + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ + -ldflags="-w -s" \ + -o logcorrelator \ + ./cmd/logcorrelator + + - name: Upload binary artifact + uses: actions/upload-artifact@v4 + with: + name: logcorrelator-linux-amd64 + path: logcorrelator + + docker: + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Build Docker image + run: docker build -t logcorrelator:latest . + + - name: Run tests in Docker + run: | + docker run --rm logcorrelator:latest --help || true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6c133f8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +# Build directory +/build/ +/dist/ + +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib +logcorrelator + +# Test binary +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..62c20fd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,150 @@ +# syntax=docker/dockerfile:1 + +# ============================================================================= +# Builder stage - compile and test +# ============================================================================= +FROM golang:1.21 AS builder + +WORKDIR /build + +# Install dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + bc \ + && rm -rf 
/var/lib/apt/lists/* + +# Copy go mod files +COPY go.mod ./ + +# Download dependencies +RUN go mod download || true + +# Copy source code +COPY . . + +# Run tests with coverage (fail if < 80%) +RUN --mount=type=cache,target=/root/.cache/go-build \ + go test -race -coverprofile=coverage.txt -covermode=atomic ./... && \ + echo "=== Coverage Report ===" && \ + go tool cover -func=coverage.txt | grep total && \ + TOTAL=$(go tool cover -func=coverage.txt | grep total | awk '{gsub(/%/, "", $3); print $3}') && \ + echo "Total coverage: ${TOTAL}%" && \ + if (( $(echo "$TOTAL < 80" | bc -l) )); then \ + echo "ERROR: Coverage ${TOTAL}% is below 80% threshold"; \ + exit 1; \ + fi && \ + echo "Coverage check passed!" + +# Build binary +RUN --mount=type=cache,target=/root/.cache/go-build \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ + -ldflags="-w -s" \ + -o /usr/bin/logcorrelator \ + ./cmd/logcorrelator + +# Create runtime root filesystem +RUN mkdir -p /tmp/runtime-root/var/log/logcorrelator /tmp/runtime-root/var/run/logcorrelator /tmp/runtime-root/etc/logcorrelator + +# ============================================================================= +# Runtime stage - minimal image for running the service +# ============================================================================= +FROM gcr.io/distroless/base-debian12 AS runtime + +# Copy binary from builder +COPY --from=builder /usr/bin/logcorrelator /usr/bin/logcorrelator + +# Copy example config +COPY --from=builder /build/config.example.conf /etc/logcorrelator/logcorrelator.conf + +# Create necessary directories in builder stage (distroless has no shell) +COPY --from=builder /tmp/runtime-root/var /var +COPY --from=builder /tmp/runtime-root/etc /etc + +# Expose nothing (Unix sockets only) +# Health check not applicable for this service type + +# Set entrypoint +ENTRYPOINT ["/usr/bin/logcorrelator"] +CMD ["-config", "/etc/logcorrelator/logcorrelator.conf"] + +# 
============================================================================= +# RPM build stage - create .rpm package entirely in Docker +# ============================================================================= +FROM ruby:3.2-bookworm AS rpm-builder + +WORKDIR /build + +# Install fpm and rpm tools +RUN apt-get update && apt-get install -y --no-install-recommends \ + rpm \ + && rm -rf /var/lib/apt/lists/* \ + && gem install fpm -v 1.16.0 + +# Copy binary from builder stage +COPY --from=builder /usr/bin/logcorrelator /tmp/pkgroot/usr/bin/logcorrelator + +# Copy config and systemd unit +COPY --from=builder /build/config.example.conf /tmp/pkgroot/etc/logcorrelator/logcorrelator.conf +COPY logcorrelator.service /tmp/pkgroot/etc/systemd/system/logcorrelator.service + +# Create directory structure and set permissions +RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ + mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ + chmod 755 /tmp/pkgroot/var/log/logcorrelator && \ + chmod 755 /tmp/pkgroot/var/run/logcorrelator + +# Build RPM +ARG VERSION=1.0.0 +RUN fpm -s dir -t rpm \ + -n logcorrelator \ + -v ${VERSION} \ + -C /tmp/pkgroot \ + --prefix / \ + --description "Log correlation service for HTTP and network events" \ + --url "https://github.com/logcorrelator/logcorrelator" \ + --license "MIT" \ + --vendor "logcorrelator" \ + -p /tmp/logcorrelator-${VERSION}.rpm \ + usr/bin/logcorrelator \ + etc/logcorrelator/logcorrelator.conf \ + etc/systemd/system/logcorrelator.service \ + var/log/logcorrelator \ + var/run/logcorrelator + +# ============================================================================= +# Test stage - verify RPM on Rocky Linux +# ============================================================================= +FROM rockylinux:8 AS rpm-test + +# Install systemd (for testing service unit) +RUN dnf install -y systemd && \ + dnf clean all + +# Copy RPM from rpm-builder +COPY --from=rpm-builder /tmp/logcorrelator-*.rpm /tmp/ + +# Install the RPM +RUN 
rpm -ivh /tmp/logcorrelator-*.rpm || true + +# Verify installation +RUN ls -la /usr/bin/logcorrelator && \ + ls -la /etc/logcorrelator/ && \ + ls -la /etc/systemd/system/logcorrelator.service + +# ============================================================================= +# Development stage - for local testing with hot reload +# ============================================================================= +FROM golang:1.21 AS dev + +WORKDIR /app + +# Install air for hot reload (optional) +RUN go install github.com/air-verse/air@latest + +COPY go.mod ./ +RUN go mod download || true + +COPY . . + +# Default command: run with example config +CMD ["go", "run", "./cmd/logcorrelator", "-config", "config.example.conf"] diff --git a/Dockerfile.package b/Dockerfile.package new file mode 100644 index 0000000..642c0f1 --- /dev/null +++ b/Dockerfile.package @@ -0,0 +1,125 @@ +# syntax=docker/dockerfile:1 +# ============================================================================= +# logcorrelator - Dockerfile de packaging unifié (DEB + RPM avec fpm) +# ============================================================================= + +# ============================================================================= +# Stage 1: Builder - Compilation du binaire Go +# ============================================================================= +FROM golang:1.21 AS builder + +WORKDIR /build + +# Install dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Copy go mod files +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source code +COPY . . 
+ +# Build binary for Linux +ARG VERSION=1.0.0 +RUN mkdir -p dist && \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -ldflags="-w -s" \ + -o dist/logcorrelator \ + ./cmd/logcorrelator + +# ============================================================================= +# Stage 2: Package builder - fpm pour DEB et RPM +# ============================================================================= +FROM ruby:3.2-bookworm AS package-builder + +WORKDIR /package + +# Install fpm and dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + rpm \ + dpkg-dev \ + fakeroot \ + && rm -rf /var/lib/apt/lists/* \ + && gem install fpm -v 1.16.0 + +# Copy binary from builder +COPY --from=builder /build/dist/logcorrelator /tmp/pkgroot/usr/bin/logcorrelator +COPY --from=builder /build/config.example.conf /tmp/pkgroot/etc/logcorrelator/logcorrelator.conf +COPY --from=builder /build/config.example.conf /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.conf.example + +# Create directories and set permissions +RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ + mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ + chmod 755 /tmp/pkgroot/usr/bin/logcorrelator && \ + chmod 640 /tmp/pkgroot/etc/logcorrelator/logcorrelator.conf && \ + chmod 640 /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.conf.example && \ + chmod 755 /tmp/pkgroot/var/log/logcorrelator && \ + chmod 755 /tmp/pkgroot/var/run/logcorrelator + +# Copy maintainer scripts +COPY packaging/deb/postinst /tmp/scripts/postinst +COPY packaging/deb/prerm /tmp/scripts/prerm +COPY packaging/deb/postrm /tmp/scripts/postrm +RUN chmod 755 /tmp/scripts/* + +# Build DEB package +ARG VERSION=1.0.0 +ARG ARCH=amd64 +RUN mkdir -p /packages/deb && \ + fpm -s dir -t deb \ + -n logcorrelator \ + -v "${VERSION}" \ + -C /tmp/pkgroot \ + --architecture "${ARCH}" \ + --description "Log correlation service for HTTP and network events" \ + --url "https://github.com/logcorrelator/logcorrelator" \ + --license "MIT" \ + 
--vendor "logcorrelator " \ + --maintainer "logcorrelator " \ + --depends "systemd" \ + --after-install /tmp/scripts/postinst \ + --before-remove /tmp/scripts/prerm \ + --after-remove /tmp/scripts/postrm \ + -p /packages/deb/logcorrelator_${VERSION}_${ARCH}.deb \ + usr/bin/logcorrelator \ + etc/logcorrelator/logcorrelator.conf \ + usr/share/logcorrelator/logcorrelator.conf.example \ + var/log/logcorrelator \ + var/run/logcorrelator + +# Build RPM package +ARG DIST=el8 +RUN mkdir -p /packages/rpm && \ + fpm -s dir -t rpm \ + -n logcorrelator \ + -v "${VERSION}" \ + -C /tmp/pkgroot \ + --architecture "x86_64" \ + --description "Log correlation service for HTTP and network events" \ + --url "https://github.com/logcorrelator/logcorrelator" \ + --license "MIT" \ + --vendor "logcorrelator " \ + --depends "systemd" \ + --after-install /tmp/scripts/postinst \ + --before-remove /tmp/scripts/prerm \ + --after-remove /tmp/scripts/postrm \ + -p /packages/rpm/logcorrelator-${VERSION}-1.x86_64.rpm \ + usr/bin/logcorrelator \ + etc/logcorrelator/logcorrelator.conf \ + usr/share/logcorrelator/logcorrelator.conf.example \ + var/log/logcorrelator \ + var/run/logcorrelator + +# ============================================================================= +# Stage 3: Output - Image finale avec les packages +# ============================================================================= +FROM alpine:latest AS output + +WORKDIR /packages +COPY --from=package-builder /packages/deb/*.deb /packages/deb/ +COPY --from=package-builder /packages/rpm/*.rpm /packages/rpm/ + +CMD ["sh", "-c", "echo '=== DEB Packages ===' && ls -la /packages/deb/ && echo '' && echo '=== RPM Packages ===' && ls -la /packages/rpm/"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..c9c9d01 --- /dev/null +++ b/README.md @@ -0,0 +1,278 @@ +# logcorrelator + +Service de corrélation de logs HTTP et réseau écrit en Go. 
+ +## Description + +**logcorrelator** reçoit deux flux de logs JSON via des sockets Unix : +- **Source A** : logs HTTP applicatifs (Apache, reverse proxy) +- **Source B** : logs réseau (métadonnées IP/TCP, JA3/JA4, etc.) + +Il corrèle les événements sur la base de `src_ip + src_port` avec une fenêtre temporelle configurable, et produit des logs corrélés vers : +- Un fichier local (JSON lines) +- ClickHouse (pour analyse et archivage) + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ Apache Source │────▶│ │────▶│ File Sink │ +│ (Unix Socket) │ │ Correlation │ │ (JSON lines) │ +└─────────────────┘ │ Service │ └─────────────────┘ + │ │ +┌─────────────────┐ │ - Buffers │ ┌─────────────────┐ +│ Network Source │────▶│ - Time Window │────▶│ ClickHouse │ +│ (Unix Socket) │ │ - Orphan Policy │ │ Sink │ +└─────────────────┘ └──────────────────┘ └─────────────────┘ +``` + +## Build (100% Docker) + +Tout le build et les tests s'exécutent dans des containers Docker : + +```bash +# Build complet (binaire + tests + RPM) +./build.sh + +# Uniquement les tests +./test.sh + +# Build manuel avec Docker +docker build --target builder -t logcorrelator-builder . +docker build --target runtime -t logcorrelator:latest . 
+``` + +### Prérequis + +- Docker 20.10+ +- Bash + +## Installation + +### Depuis Docker + +```bash +# Build de l'image +./build.sh + +# Exécuter +docker run -d \ + --name logcorrelator \ + -v /var/run/logcorrelator:/var/run/logcorrelator \ + -v /var/log/logcorrelator:/var/log/logcorrelator \ + -v ./config.conf:/etc/logcorrelator/logcorrelator.conf \ + logcorrelator:latest +``` + +### Depuis le package RPM (Rocky Linux 8+) + +```bash +# Générer le RPM +./build.sh + +# Installer le package +sudo rpm -ivh dist/logcorrelator-1.0.0.rpm + +# Activer et démarrer le service +sudo systemctl enable logcorrelator +sudo systemctl start logcorrelator + +# Vérifier le statut +sudo systemctl status logcorrelator +``` + +### Build manuel (sans Docker) + +```bash +# Prérequis: Go 1.21+ +go build -o logcorrelator ./cmd/logcorrelator + +# Exécuter +./logcorrelator -config config.example.conf +``` + +## Configuration + +La configuration utilise un fichier texte simple avec des directives : + +```bash +# Format: directive value [value...] 
+# Les lignes commençant par # sont des commentaires + +service.name logcorrelator +service.language go + +# Inputs (au moins 2 requis) +input.unix_socket apache_source /var/run/logcorrelator/apache.sock json +input.unix_socket network_source /var/run/logcorrelator/network.sock json + +# Outputs +output.file.enabled true +output.file.path /var/log/logcorrelator/correlated.log + +output.clickhouse.enabled false +output.clickhouse.dsn clickhouse://user:pass@localhost:9000/db +output.clickhouse.table correlated_logs_http_network +output.clickhouse.batch_size 500 +output.clickhouse.flush_interval_ms 200 + +# Corrélation +correlation.key src_ip,src_port +correlation.time_window.value 1 +correlation.time_window.unit s + +# Politique des orphelins +correlation.orphan_policy.apache_always_emit true +correlation.orphan_policy.network_emit false +``` + +### Directives disponibles + +| Directive | Description | Défaut | +|-----------|-------------|--------| +| `service.name` | Nom du service | `logcorrelator` | +| `service.language` | Langage | `go` | +| `input.unix_socket` | Socket Unix (name path [format]) | Requis | +| `output.file.enabled` | Activer sortie fichier | `true` | +| `output.file.path` | Chemin fichier | `/var/log/logcorrelator/correlated.log` | +| `output.clickhouse.enabled` | Activer ClickHouse | `false` | +| `output.clickhouse.dsn` | DSN ClickHouse | - | +| `output.clickhouse.table` | Table ClickHouse | - | +| `output.clickhouse.batch_size` | Taille batch | `500` | +| `output.clickhouse.flush_interval_ms` | Intervalle flush | `200` | +| `output.clickhouse.max_buffer_size` | Buffer max | `5000` | +| `output.clickhouse.drop_on_overflow` | Drop si overflow | `true` | +| `output.stdout.enabled` | Sortie stdout (debug) | `false` | +| `correlation.key` | Clés de corrélation | `src_ip,src_port` | +| `correlation.time_window.value` | Valeur fenêtre | `1` | +| `correlation.time_window.unit` | Unité (ms/s/m) | `s` | +| `correlation.orphan_policy.apache_always_emit` |
Émettre A seul | `true` | +| `correlation.orphan_policy.network_emit` | Émettre B seul | `false` | + +## Format des logs + +### Source A (HTTP) + +```json +{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 80, + "timestamp": 1704110400000000000, + "method": "GET", + "path": "/api/test", + "header_host": "example.com" +} +``` + +### Source B (Réseau) + +```json +{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 443, + "ja3": "abc123def456", + "ja4": "xyz789" +} +``` + +### Log corrélé (sortie) + +```json +{ + "timestamp": "2024-01-01T12:00:00Z", + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 80, + "correlated": true, + "apache": {"method": "GET", "path": "/api/test"}, + "network": {"ja3": "abc123def456"} +} +``` + +## Schema ClickHouse + +```sql +CREATE TABLE correlated_logs_http_network ( + timestamp DateTime64(9), + src_ip String, + src_port UInt32, + dst_ip String, + dst_port UInt32, + correlated UInt8, + orphan_side String, + apache JSON, + network JSON +) ENGINE = MergeTree() +ORDER BY (timestamp, src_ip, src_port); +``` + +## Tests + +```bash +# Via Docker +./test.sh + +# Local +go test ./... +go test -cover ./... +go test -coverprofile=coverage.out ./... +go tool cover -html=coverage.out +``` + +## Signaux + +| Signal | Comportement | +|--------|--------------| +| `SIGINT` | Arrêt gracieux | +| `SIGTERM` | Arrêt gracieux | + +Lors de l'arrêt gracieux : +1. Fermeture des sockets Unix +2. Flush des buffers +3. Émission des événements en attente +4. Fermeture des connexions ClickHouse + +## Logs internes + +Les logs internes sont envoyés vers stderr : + +```bash +# Docker +docker logs -f logcorrelator + +# Systemd +journalctl -u logcorrelator -f +``` + +## Structure du projet + +``` +. 
+├── cmd/logcorrelator/ # Point d'entrée +├── internal/ +│ ├── adapters/ +│ │ ├── inbound/unixsocket/ +│ │ └── outbound/ +│ │ ├── clickhouse/ +│ │ ├── file/ +│ │ └── multi/ +│ ├── app/ # Orchestration +│ ├── config/ # Configuration +│ ├── domain/ # Domaine (corrélation) +│ ├── observability/ # Logging +│ └── ports/ # Interfaces +├── config.example.conf # Exemple de config +├── Dockerfile # Build multi-stage +├── build.sh # Script de build +├── test.sh # Script de tests +└── logcorrelator.service # Unité systemd +``` + +## License + +MIT diff --git a/architecture.yml b/architecture.yml new file mode 100644 index 0000000..e5bc906 --- /dev/null +++ b/architecture.yml @@ -0,0 +1,521 @@ +service: + name: logcorrelator + context: http-network-correlation + language: go + pattern: hexagonal + description: > + logcorrelator est un service système (lancé par systemd) écrit en Go, chargé + de recevoir deux flux de logs JSON via des sockets Unix, de corréler les + événements HTTP applicatifs (source A, typiquement Apache ou reverse proxy) + avec des événements réseau (source B, métadonnées IP/TCP, JA3/JA4, etc.) + sur la base de la combinaison strictement définie src_ip + src_port, avec + une fenêtre temporelle configurable. Le service produit un log corrélé + unique pour chaque paire correspondante, émet toujours les événements A + même lorsqu’aucun événement B corrélé n’est disponible, n’émet jamais de + logs B seuls, et pousse les logs agrégés en temps quasi réel vers + ClickHouse et/ou un fichier local, en minimisant la rétention en mémoire + et sur disque. + +runtime: + deployment: + unit_type: systemd + description: > + logcorrelator est livré sous forme de binaire autonome, exécuté comme un + service systemd. L’unité systemd assure le démarrage automatique au boot, + le redémarrage en cas de crash, et une intégration standard dans l’écosystème + Linux (notamment sur Rocky Linux 8+). 
+ binary_path: /usr/bin/logcorrelator + config_path: /etc/logcorrelator/logcorrelator.toml + user: logcorrelator + group: logcorrelator + restart: on-failure + systemd_unit: + path: /etc/systemd/system/logcorrelator.service + content_example: | + [Unit] + Description=logcorrelator service + After=network.target + + [Service] + Type=simple + User=logcorrelator + Group=logcorrelator + ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.toml + Restart=on-failure + RestartSec=5 + + [Install] + WantedBy=multi-user.target + os: + supported: + - rocky-linux-8+ + - rocky-linux-9+ + - autres-linux-recentes + logs: + stdout_stderr: journald + structured: true + description: > + Les logs internes du service (erreurs, messages d’information) sont envoyés + vers stdout/stderr et collectés par journald. Ils sont structurés et ne + contiennent pas de données personnelles. + signals: + graceful_shutdown: + - SIGINT + - SIGTERM + description: > + En réception de SIGINT ou SIGTERM, le service arrête proprement la lecture + des sockets Unix, vide les buffers d’envoi (dans les limites de la politique + de drop), ferme les connexions ClickHouse puis s’arrête. + +config: + format: toml + location: /etc/logcorrelator/logcorrelator.toml + description: > + Toute la configuration est centralisée dans un fichier TOML lisible, stocké + dans /etc/logcorrelator. Ni YAML ni JSON ne sont utilisés pour la config. 
+ reload_strategy: restart_service + example: | + [service] + name = "logcorrelator" + language = "go" + + [[inputs.unix_sockets]] + name = "apache_source" + path = "/var/run/logcorrelator/apache.sock" + format = "json" + + [[inputs.unix_sockets]] + name = "network_source" + path = "/var/run/logcorrelator/network.sock" + format = "json" + + [outputs.file] + enabled = true + path = "/var/log/logcorrelator/correlated.log" + + [outputs.clickhouse] + enabled = true + dsn = "clickhouse://user:pass@host:9000/db" + table = "correlated_logs_http_network" + batch_size = 500 + flush_interval_ms = 200 + max_buffer_size = 5000 + drop_on_overflow = true + async_insert = true + + [correlation] + key = ["src_ip", "src_port"] + + [correlation.time_window] + value = 1 + unit = "s" + + [correlation.orphan_policy] + apache_always_emit = true + network_emit = false + +inputs: + description: > + Le service consomme deux flux de logs JSON via des sockets Unix. Le schéma + exact des logs pour chaque source est flexible et peut évoluer. Seuls + quelques champs sont nécessaires pour la corrélation. + unix_sockets: + - name: apache_source + id: A + description: > + Source A, destinée aux logs HTTP applicatifs (Apache, reverse proxy, etc.). + Le schéma JSON est variable, avec un champ timestamp numérique obligatoire + et des champs header_* dynamiques. + path: /var/run/logcorrelator/apache.sock + protocol: unix + mode: stream + format: json + framing: line + retry_on_error: true + - name: network_source + id: B + description: > + Source B, destinée aux logs réseau (métadonnées IP/TCP, JA3/JA4, etc.). + Le schéma JSON est variable ; seuls src_ip et src_port sont requis. + path: /var/run/logcorrelator/network.sock + protocol: unix + mode: stream + format: json + framing: line + retry_on_error: true + +outputs: + description: > + Les logs corrélés sont envoyés vers un ou plusieurs sinks. 
MultiSink permet + de diffuser chaque log corrélé vers plusieurs destinations (fichier, + ClickHouse, stdout…). + sinks: + file: + enabled: true + description: > + Sink vers fichier local, utile pour debug ou archivage local. Écrit un + JSON par ligne dans le chemin configuré. Rotation gérée par logrotate + ou équivalent. + path: /var/log/logcorrelator/correlated.log + format: json_lines + rotate_managed_by: external + clickhouse: + enabled: true + description: > + Sink principal pour l’archivage et l’analyse en temps quasi réel. Les + logs corrélés sont insérés en batch dans ClickHouse avec un small buffer + et des inserts asynchrones. En cas de saturation ou d’indisponibilité + ClickHouse, les logs sont drop pour éviter de saturer la machine locale. + dsn: clickhouse://user:pass@host:9000/db + table: correlated_logs_http_network + batch_size: 500 + flush_interval_ms: 200 + max_buffer_size: 5000 + drop_on_overflow: true + async_insert: true + timeout_ms: 1000 + stdout: + enabled: false + description: > + Sink optionnel vers stdout pour les tests et le développement. + +correlation: + description: > + Corrélation strictement basée sur src_ip + src_port et une fenêtre temporelle + configurable. Aucun autre champ (dst_ip, dst_port, JA3/JA4, headers HTTP...) + n’est utilisé pour la décision de corrélation. + key: + - src_ip + - src_port + time_window: + value: 1 + unit: s + description: > + Fenêtre de temps symétrique appliquée aux timestamps de A et B. Deux + événements sont corrélés si |tA - tB| <= time_window. La valeur et l’unité + sont définies dans le TOML. + timestamp_source: + apache: field_timestamp + network: reception_time + description: > + Pour A, utilisation du champ numérique "timestamp" (epoch ns). Pour B, + utilisation du temps de réception local. + orphan_policy: + apache_always_emit: true + network_emit: false + description: > + A est toujours émis (même sans B) avec correlated=false et orphan_side="A". + B n’est jamais émis seul. 
+ matching: + mode: one_to_one_first_match + description: > + Stratégie 1‑à‑1, premier match : lors de l’arrivée d’un événement, on + cherche le premier événement compatible dans le buffer de l’autre source. + Les autres restent en attente ou expirent. + +schema: + description: > + Les schémas des sources A et B sont variables. Le service impose seulement + quelques champs obligatoires nécessaires à la corrélation et accepte des + champs supplémentaires sans modification de code. + source_A: + description: > + Logs HTTP applicatifs (Apache/reverse proxy) au format JSON. Schéma + variable, avec champs obligatoires pour corrélation (src_ip, src_port, + timestamp) et collecte des autres champs dans des maps. + required_fields: + - name: src_ip + type: string + description: Adresse IP source client. + - name: src_port + type: int + description: Port source client. + - name: timestamp + type: int64 + unit: ns + description: Timestamp de référence pour la corrélation. + optional_fields: + - name: time + type: string + format: rfc3339 + - name: dst_ip + type: string + - name: dst_port + type: int + - name: method + type: string + - name: path + type: string + - name: host + type: string + - name: http_version + type: string + dynamic_fields: + - pattern: header_* + target_map: headers + description: > + Tous les champs header_* sont collectés dans headers[clé] = valeur. + - pattern: "*" + target_map: extra + description: > + Tous les champs non reconnus explicitement vont dans extra. + source_B: + description: > + Logs réseau JSON (IP/TCP, JA3/JA4...). Schéma variable. src_ip et src_port + sont obligatoires pour la corrélation, le reste est libre. + required_fields: + - name: src_ip + type: string + - name: src_port + type: int + optional_fields: + - name: dst_ip + type: string + - name: dst_port + type: int + dynamic_fields: + - pattern: "*" + target_map: extra + description: > + Tous les autres champs (ip_meta_*, tcp_meta_*, ja3, ja4, etc.) sont + rangés dans extra. 
+ + normalized_event: + description: > + Représentation interne unifiée des événements A/B sur laquelle opère la + logique de corrélation. + fields: + - name: source + type: enum("A","B") + - name: timestamp + type: time.Time + - name: src_ip + type: string + - name: src_port + type: int + - name: dst_ip + type: string + optional: true + - name: dst_port + type: int + optional: true + - name: headers + type: map[string]string + optional: true + - name: extra + type: map[string]any + description: Champs additionnels provenant de A ou B. + + correlated_log: + description: > + Structure du log corrélé émis vers les sinks (fichier, ClickHouse). Contient + les informations de corrélation, les infos communes et les contenus de A/B. + fields: + - name: timestamp + type: time.Time + - name: src_ip + type: string + - name: src_port + type: int + - name: dst_ip + type: string + optional: true + - name: dst_port + type: int + optional: true + - name: correlated + type: bool + - name: orphan_side + type: string + - name: apache + type: map[string]any + optional: true + - name: network + type: map[string]any + optional: true + - name: extra + type: map[string]any + description: Champs dérivés éventuels. + +clickhouse_schema: + strategy: external_ddls + description: > + logcorrelator ne gère pas les ALTER TABLE. La table ClickHouse doit être + créée/modifiée en dehors du service. logcorrelator remplit les colonnes + existantes qu’il connaît et met NULL si un champ manque. + base_columns: + - name: timestamp + type: DateTime64(9) + - name: src_ip + type: String + - name: src_port + type: UInt32 + - name: dst_ip + type: String + - name: dst_port + type: UInt32 + - name: correlated + type: UInt8 + - name: orphan_side + type: String + - name: apache + type: JSON + - name: network + type: JSON + dynamic_fields: + mode: map_or_additional_columns + description: > + Les champs dynamiques peuvent être exposés via colonnes dédiées créées par + migration, ou via Map/JSON. 
+ +architecture: + description: > + Architecture hexagonale : domaine de corrélation indépendant, ports + abstraits pour les sources/sinks, adaptateurs pour sockets Unix, fichier et + ClickHouse, couche application d’orchestration, et modules infra pour + config/observabilité. + modules: + - name: cmd/logcorrelator + type: entrypoint + responsibilities: + - Chargement configuration TOML. + - Initialisation des adaptateurs d’entrée/sortie. + - Création du CorrelationService. + - Démarrage de l’orchestrateur. + - Gestion du cycle de vie (signaux systemd). + - name: internal/domain + type: domain + responsibilities: + - Modèles NormalizedEvent et CorrelatedLog. + - Implémentation de CorrelationService (buffers, fenêtre, + orphelins). + - name: internal/ports + type: ports + responsibilities: + - EventSource, CorrelatedLogSink, TimeProvider. + - name: internal/app + type: application + responsibilities: + - Orchestrator : relier EventSource → CorrelationService → MultiSink. + - name: internal/adapters/inbound/unixsocket + type: adapter_inbound + responsibilities: + - Lecture sockets Unix + parsing JSON → NormalizedEvent. + - name: internal/adapters/outbound/file + type: adapter_outbound + responsibilities: + - Écriture fichier JSON lines. + - name: internal/adapters/outbound/clickhouse + type: adapter_outbound + responsibilities: + - Bufferisation + inserts batch vers ClickHouse. + - Application de drop_on_overflow. + - name: internal/adapters/outbound/multi + type: adapter_outbound + responsibilities: + - Fan-out vers plusieurs sinks. + - name: internal/config + type: infrastructure + responsibilities: + - Chargement/validation config TOML. + - name: internal/observability + type: infrastructure + responsibilities: + - Logging et métriques internes. + +testing: + unit: + description: > + Tests unitaires table-driven avec couverture cible ≥ 80 %. 
Focalisés sur + la logique de corrélation, parsing et sink ClickHouse. + coverage_minimum: 0.8 + focus: + - CorrelationService + - Parsing A/B → NormalizedEvent + - ClickHouseSink (batching, overflow) + - MultiSink + integration: + description: > + Tests d’intégration validant le flux complet A+B → corrélation → sinks, + avec sockets simulés et ClickHouse mocké. + +docker: + description: > + Build et tests entièrement encapsulés dans Docker, avec multi‑stage build : + un stage builder pour compiler et tester, un stage runtime minimal pour + exécuter le service. + images: + builder: + base: golang:latest + purpose: build_and_test + runtime: + base: gcr.io/distroless/base-debian12 + purpose: run_binary_only + build: + multi_stage: true + steps: + - name: unit_tests + description: > + go test ./... avec génération de couverture. Le build échoue si la + couverture est < 80 %. + - name: compile_binary + description: > + Compilation CGO_ENABLED=0, GOOS=linux, GOARCH=amd64 pour un binaire + statique /usr/bin/logcorrelator. + - name: assemble_runtime_image + description: > + Copie du binaire dans l’image runtime et définition de l’ENTRYPOINT. + +packaging: + description: > + logcorrelator doit être distribué en package .rpm pour Rocky Linux (8+), + construit intégralement dans Docker à partir du binaire compilé. + formats: + - rpm + target_distros: + - rocky-linux-8+ + - rocky-linux-9+ + tool: fpm + build_pipeline: + steps: + - name: build_binary_in_docker + description: > + Utiliser l’image builder pour compiler logcorrelator et installer le + binaire dans un répertoire de staging (par ex. /tmp/pkgroot/usr/bin/logcorrelator).
+ - name: prepare_filesystem_layout + description: > + Créer la hiérarchie : + - /usr/bin/logcorrelator + - /etc/logcorrelator/logcorrelator.toml (exemple) + - /etc/systemd/system/logcorrelator.service (unit) + - /var/log/logcorrelator (répertoire de logs) + - name: run_fpm_in_docker + description: > + Lancer un conteneur fpm (par ex. image ruby:fpm) avec montage de + /tmp/pkgroot, et exécuter fpm -s dir -t rpm pour générer le .rpm + compatible Rocky Linux. + - name: verify_rpm_on_rocky + description: > + Tester l’installation et le démarrage du service dans un conteneur + Rocky Linux 8/9 (docker run --rm -it rockylinux:8), en installant le + .rpm, en activant le service systemd et en vérifiant qu’il démarre + correctement. + +non_functional: + performance: + target_latency_ms: 1000 + description: > + Latence visée < 1 s entre réception et insertion ClickHouse, avec + batching léger. + reliability: + drop_on_clickhouse_failure: true + description: > + En cas de ClickHouse lent/HS, les logs sont drop au‑delà du buffer pour + protéger la machine. + security: + user_separation: true + privileges: least + description: > + Service sous utilisateur dédié, pas de secrets en clair dans les logs, + principe de moindre privilège. + diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..a0d71d9 --- /dev/null +++ b/build.sh @@ -0,0 +1,75 @@ +#!/bin/bash +# Build script - everything runs in Docker containers +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +VERSION="${VERSION:-1.0.0}" +OUTPUT_DIR="${SCRIPT_DIR}/dist" + +echo "==============================================" +echo " logcorrelator - Docker Build Pipeline" +echo "==============================================" +echo "" + +# Create output directory +mkdir -p "${OUTPUT_DIR}" + +# Step 1: Build and test +echo "[1/4] Building and running tests in container..." +docker build \ + --target builder \ + -t logcorrelator-builder:latest \ + -f Dockerfile . 
+ +# Step 2: Build runtime image +echo "[2/4] Building runtime image..." +docker build \ + --target runtime \ + -t logcorrelator:${VERSION} \ + -t logcorrelator:latest \ + -f Dockerfile . + +# Step 3: Build packages (DEB + RPM) +echo "[3/4] Building DEB and RPM packages in container..." +docker build \ + --target output \ + --build-arg VERSION="${VERSION}" \ + -t logcorrelator-packager:latest \ + -f Dockerfile.package . + +# Extract packages from builder container +echo "[4/4] Extracting packages..." +mkdir -p "${OUTPUT_DIR}/deb" "${OUTPUT_DIR}/rpm" +docker run --rm -v "${OUTPUT_DIR}:/output" logcorrelator-packager:latest \ + sh -c 'cp -r /packages/deb /output/ && cp -r /packages/rpm /output/' + +echo "" +echo "==============================================" +echo " Build Complete!" +echo "==============================================" +echo "" +echo "Artifacts:" +echo " - Runtime image: logcorrelator:${VERSION}" +echo " - DEB package: ${OUTPUT_DIR}/deb/logcorrelator_${VERSION}_amd64.deb" +echo " - RPM package: ${OUTPUT_DIR}/rpm/logcorrelator-${VERSION}-1.x86_64.rpm" +echo "" +echo "Usage:" +echo " # Run with Docker:" +echo " docker run -d --name logcorrelator \\" +echo " -v /var/run/logcorrelator:/var/run/logcorrelator \\" +echo " -v /var/log/logcorrelator:/var/log/logcorrelator \\" +echo " -v ./config.conf:/etc/logcorrelator/logcorrelator.conf \\" +echo " logcorrelator:latest" +echo "" +echo " # Install DEB on Debian/Ubuntu:" +echo " sudo dpkg -i ${OUTPUT_DIR}/deb/logcorrelator_${VERSION}_amd64.deb" +echo " sudo systemctl enable logcorrelator" +echo " sudo systemctl start logcorrelator" +echo "" +echo " # Install RPM on Rocky Linux:" +echo " sudo rpm -ivh ${OUTPUT_DIR}/rpm/logcorrelator-${VERSION}-1.x86_64.rpm" +echo " sudo systemctl enable logcorrelator" +echo " sudo systemctl start logcorrelator" +echo "" diff --git a/config.example.conf b/config.example.conf new file mode 100644 index 0000000..2158601 --- /dev/null +++ b/config.example.conf @@ -0,0 +1,41 @@ 
+# logcorrelator configuration file +# Format: directive value [value...] +# Lines starting with # are comments + +# Service configuration +service.name logcorrelator +service.language go + +# Input sources (at least 2 required) +# Format: input.unix_socket <name> <path> [format] +input.unix_socket apache_source /var/run/logcorrelator/apache.sock json +input.unix_socket network_source /var/run/logcorrelator/network.sock json + +# File output +output.file.enabled true +output.file.path /var/log/logcorrelator/correlated.log + +# ClickHouse output +output.clickhouse.enabled false +output.clickhouse.dsn clickhouse://user:pass@localhost:9000/db +output.clickhouse.table correlated_logs_http_network +output.clickhouse.batch_size 500 +output.clickhouse.flush_interval_ms 200 +output.clickhouse.max_buffer_size 5000 +output.clickhouse.drop_on_overflow true +output.clickhouse.async_insert true +output.clickhouse.timeout_ms 1000 + +# Stdout output (for debugging) +output.stdout.enabled false + +# Correlation configuration +correlation.key src_ip,src_port +correlation.time_window.value 1 +correlation.time_window.unit s + +# Orphan policy +# apache_always_emit: always emit A events even without matching B +# network_emit: emit B events alone (usually false) +correlation.orphan_policy.apache_always_emit true +correlation.orphan_policy.network_emit false diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4ba01f4 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/logcorrelator/logcorrelator + +go 1.21 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/internal/adapters/inbound/unixsocket/source.go b/internal/adapters/inbound/unixsocket/source.go new file mode 100644 index 0000000..5b0907b --- /dev/null +++ b/internal/adapters/inbound/unixsocket/source.go @@ -0,0 +1,334 @@ +package unixsocket + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + +
const (
	// DefaultSocketPermissions is the mode applied to the socket file right
	// after it is created: read/write for owner and group, nothing for others.
	DefaultSocketPermissions os.FileMode = 0660
	// MaxLineSize is the largest single JSON line (1 MiB) the line scanner
	// accepts from a client.
	MaxLineSize = 1024 * 1024
	// MaxConcurrentConnections is the capacity of the per-source semaphore;
	// connections accepted while it is full are closed immediately.
	MaxConcurrentConnections = 100
	// MaxEventsPerSecond is the intended per-source rate limit.
	// NOTE(review): no code in this file references this constant, so no
	// rate limiting is actually applied.
	MaxEventsPerSecond = 10000
)

// Config holds the Unix socket source configuration.
type Config struct {
	// Name is the identifier reported by UnixSocketSource.Name.
	Name string
	// Path is the filesystem path of the Unix socket to listen on.
	Path string
}

// UnixSocketSource reads JSON events from a Unix socket.
//
// Each accepted connection is served by its own goroutine; every goroutine
// started by the source is tracked in wg so Stop can wait for them.
type UnixSocketSource struct {
	config    Config
	mu        sync.Mutex     // held by Stop for its whole duration
	listener  net.Listener   // nil until Start has created the socket
	done      chan struct{}  // closed by Stop to signal shutdown
	wg        sync.WaitGroup // accept loop + per-connection readers
	semaphore chan struct{}  // Limit concurrent connections
}

// NewUnixSocketSource creates a new Unix socket source.
//
// The returned source is idle: no socket is created until Start is called.
func NewUnixSocketSource(config Config) *UnixSocketSource {
	return &UnixSocketSource{
		config:    config,
		done:      make(chan struct{}),
		semaphore: make(chan struct{}, MaxConcurrentConnections),
	}
}

// Name returns the source name taken from the configuration.
func (s *UnixSocketSource) Name() string {
	return s.config.Name
}
+func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error { + // Remove existing socket file if present + if info, err := os.Stat(s.config.Path); err == nil { + if info.Mode()&os.ModeSocket != 0 { + if err := os.Remove(s.config.Path); err != nil { + return fmt.Errorf("failed to remove existing socket: %w", err) + } + } else { + return fmt.Errorf("path exists but is not a socket: %s", s.config.Path) + } + } + + // Create listener + listener, err := net.Listen("unix", s.config.Path) + if err != nil { + return fmt.Errorf("failed to create unix socket listener: %w", err) + } + s.listener = listener + + // Set permissions - fail if we can't + if err := os.Chmod(s.config.Path, DefaultSocketPermissions); err != nil { + listener.Close() + os.Remove(s.config.Path) + return fmt.Errorf("failed to set socket permissions: %w", err) + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.acceptConnections(ctx, eventChan) + }() + + return nil +} + +func (s *UnixSocketSource) acceptConnections(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) { + for { + select { + case <-s.done: + return + case <-ctx.Done(): + return + default: + } + + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.done: + return + case <-ctx.Done(): + return + default: + continue + } + } + + // Check semaphore for connection limiting + select { + case s.semaphore <- struct{}{}: + // Connection accepted + default: + // Too many connections, reject + conn.Close() + continue + } + + s.wg.Add(1) + go func(c net.Conn) { + defer s.wg.Done() + defer func() { <-s.semaphore }() + defer c.Close() + s.readEvents(ctx, c, eventChan) + }(conn) + } +} + +func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventChan chan<- *domain.NormalizedEvent) { + // Set read deadline to prevent hanging + conn.SetReadDeadline(time.Now().Add(5 * time.Minute)) + + scanner := bufio.NewScanner(conn) + // Increase buffer size limit to 1MB 
+ buf := make([]byte, 0, 4096) + scanner.Buffer(buf, MaxLineSize) + + for scanner.Scan() { + select { + case <-ctx.Done(): + return + default: + } + + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + event, err := parseJSONEvent(line) + if err != nil { + // Log parse errors but continue processing + continue + } + + select { + case eventChan <- event: + case <-ctx.Done(): + return + } + } + + if err := scanner.Err(); err != nil { + // Connection error, log but don't crash + } +} + +func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) { + var raw map[string]any + if err := json.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("invalid JSON: %w", err) + } + + event := &domain.NormalizedEvent{ + Raw: raw, + Extra: make(map[string]any), + } + + // Extract and validate src_ip + if v, ok := getString(raw, "src_ip"); ok { + event.SrcIP = v + } else { + return nil, fmt.Errorf("missing required field: src_ip") + } + + // Extract and validate src_port + if v, ok := getInt(raw, "src_port"); ok { + if v < 1 || v > 65535 { + return nil, fmt.Errorf("src_port must be between 1 and 65535, got %d", v) + } + event.SrcPort = v + } else { + return nil, fmt.Errorf("missing required field: src_port") + } + + // Extract dst_ip (optional) + if v, ok := getString(raw, "dst_ip"); ok { + event.DstIP = v + } + + // Extract dst_port (optional) + if v, ok := getInt(raw, "dst_port"); ok { + if v < 0 || v > 65535 { + return nil, fmt.Errorf("dst_port must be between 0 and 65535, got %d", v) + } + event.DstPort = v + } + + // Extract timestamp - try different fields + if ts, ok := getInt64(raw, "timestamp"); ok { + // Assume nanoseconds + event.Timestamp = time.Unix(0, ts) + } else if tsStr, ok := getString(raw, "time"); ok { + if t, err := time.Parse(time.RFC3339, tsStr); err == nil { + event.Timestamp = t + } + } else if tsStr, ok := getString(raw, "timestamp"); ok { + if t, err := time.Parse(time.RFC3339, tsStr); err == nil { + event.Timestamp = t + } + } 
+ + if event.Timestamp.IsZero() { + event.Timestamp = time.Now() + } + + // Extract headers (header_* fields) + event.Headers = make(map[string]string) + for k, v := range raw { + if len(k) > 7 && k[:7] == "header_" { + if sv, ok := v.(string); ok { + event.Headers[k[7:]] = sv + } + } + } + + // Determine source based on fields present + if len(event.Headers) > 0 { + event.Source = domain.SourceA + } else { + event.Source = domain.SourceB + } + + // Extra fields (single pass) + knownFields := map[string]bool{ + "src_ip": true, "src_port": true, "dst_ip": true, "dst_port": true, + "timestamp": true, "time": true, + } + for k, v := range raw { + if knownFields[k] { + continue + } + if strings.HasPrefix(k, "header_") { + continue + } + event.Extra[k] = v + } + + return event, nil +} + +func getString(m map[string]any, key string) (string, bool) { + if v, ok := m[key]; ok { + if s, ok := v.(string); ok { + return s, true + } + } + return "", false +} + +func getInt(m map[string]any, key string) (int, bool) { + if v, ok := m[key]; ok { + switch val := v.(type) { + case float64: + return int(val), true + case int: + return val, true + case int64: + return int(val), true + case string: + if i, err := strconv.Atoi(val); err == nil { + return i, true + } + } + } + return 0, false +} + +func getInt64(m map[string]any, key string) (int64, bool) { + if v, ok := m[key]; ok { + switch val := v.(type) { + case float64: + return int64(val), true + case int: + return int64(val), true + case int64: + return val, true + case string: + if i, err := strconv.ParseInt(val, 10, 64); err == nil { + return i, true + } + } + } + return 0, false +} + +// Stop gracefully stops the source. 
+func (s *UnixSocketSource) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + close(s.done) + + if s.listener != nil { + s.listener.Close() + } + + s.wg.Wait() + + // Clean up socket file + if err := os.Remove(s.config.Path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove socket file: %w", err) + } + + return nil +} diff --git a/internal/adapters/inbound/unixsocket/source_test.go b/internal/adapters/inbound/unixsocket/source_test.go new file mode 100644 index 0000000..219bfa8 --- /dev/null +++ b/internal/adapters/inbound/unixsocket/source_test.go @@ -0,0 +1,98 @@ +package unixsocket + +import ( + "testing" + "time" +) + +func TestParseJSONEvent_Apache(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 80, + "timestamp": 1704110400000000000, + "method": "GET", + "path": "/api/test", + "header_host": "example.com", + "header_user_agent": "Mozilla/5.0" + }`) + + event, err := parseJSONEvent(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if event.SrcIP != "192.168.1.1" { + t.Errorf("expected src_ip 192.168.1.1, got %s", event.SrcIP) + } + if event.SrcPort != 8080 { + t.Errorf("expected src_port 8080, got %d", event.SrcPort) + } + if event.Headers["host"] != "example.com" { + t.Errorf("expected header host example.com, got %s", event.Headers["host"]) + } + if event.Headers["user_agent"] != "Mozilla/5.0" { + t.Errorf("expected header_user_agent Mozilla/5.0, got %s", event.Headers["user_agent"]) + } +} + +func TestParseJSONEvent_Network(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 443, + "ja3": "abc123def456", + "ja4": "xyz789", + "tcp_meta_flags": "SYN" + }`) + + event, err := parseJSONEvent(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if event.SrcIP != "192.168.1.1" { + t.Errorf("expected src_ip 192.168.1.1, got %s", event.SrcIP) + } + if 
const (
	// DefaultBatchSize is the default number of records per batch
	DefaultBatchSize = 500
	// DefaultFlushIntervalMs is the default flush interval in milliseconds
	DefaultFlushIntervalMs = 200
	// DefaultMaxBufferSize is the default maximum buffer size
	DefaultMaxBufferSize = 5000
	// DefaultTimeoutMs is the default timeout for operations in milliseconds
	DefaultTimeoutMs = 1000
	// DefaultPingTimeoutMs is the timeout for initial connection ping
	DefaultPingTimeoutMs = 5000
	// MaxRetries is the maximum number of retry attempts for failed inserts
	MaxRetries = 3
	// RetryBaseDelay is the base delay between retries
	RetryBaseDelay = 100 * time.Millisecond
)

// Config holds the ClickHouse sink configuration.
//
// Non-positive BatchSize, FlushIntervalMs, MaxBufferSize and TimeoutMs are
// replaced by the corresponding Default* constant in NewClickHouseSink.
type Config struct {
	// DSN is passed unchanged to sql.Open.
	DSN string
	// Table is interpolated verbatim into the INSERT statement, so it must
	// come from trusted configuration.
	Table string
	// BatchSize is the buffer length at which Write requests a flush.
	BatchSize int
	// FlushIntervalMs is the period of the background flush ticker.
	FlushIntervalMs int
	// MaxBufferSize is the buffer length at which Write stops accepting logs.
	MaxBufferSize int
	// DropOnOverflow makes Write silently discard logs once the buffer is full.
	DropOnOverflow bool
	// AsyncInsert — NOTE(review): no use of this field is visible in the
	// reviewed part of the file; confirm it is honoured by the insert path.
	AsyncInsert bool
	// TimeoutMs bounds each background flush and the wait on a full buffer.
	TimeoutMs int
}

// ClickHouseSink writes correlated logs to ClickHouse.
//
// Logs are appended to an in-memory buffer guarded by mu and inserted in
// batches by a background goroutine (flushLoop).
type ClickHouseSink struct {
	config    Config
	db        *sql.DB
	mu        sync.Mutex             // guards buffer
	buffer    []domain.CorrelatedLog // logs not yet inserted
	flushChan chan struct{}          // capacity 1: "batch is full" signal
	done      chan struct{}          // closed by Close to stop flushLoop
	wg        sync.WaitGroup         // tracks flushLoop
}

// NewClickHouseSink creates a new ClickHouse sink.
//
// It applies defaults to config, opens the database handle, verifies
// connectivity with a ping bounded by DefaultPingTimeoutMs, and starts the
// background flush goroutine. An error is returned if the handle cannot be
// opened or the ping fails; in the latter case the handle is closed first.
//
// NOTE(review): this file's import block registers no database/sql driver
// named "clickhouse". Unless one is registered elsewhere in the binary,
// sql.Open fails with an "unknown driver" error — confirm.
func NewClickHouseSink(config Config) (*ClickHouseSink, error) {
	// Apply defaults
	if config.BatchSize <= 0 {
		config.BatchSize = DefaultBatchSize
	}
	if config.FlushIntervalMs <= 0 {
		config.FlushIntervalMs = DefaultFlushIntervalMs
	}
	if config.MaxBufferSize <= 0 {
		config.MaxBufferSize = DefaultMaxBufferSize
	}
	if config.TimeoutMs <= 0 {
		config.TimeoutMs = DefaultTimeoutMs
	}

	s := &ClickHouseSink{
		config:    config,
		buffer:    make([]domain.CorrelatedLog, 0, config.BatchSize),
		flushChan: make(chan struct{}, 1),
		done:      make(chan struct{}),
	}

	// Connect to ClickHouse
	db, err := sql.Open("clickhouse", config.DSN)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
	}

	// Ping with timeout
	pingCtx, pingCancel := context.WithTimeout(context.Background(), time.Duration(DefaultPingTimeoutMs)*time.Millisecond)
	defer pingCancel()

	if err := db.PingContext(pingCtx); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to ping ClickHouse: %w", err)
	}

	s.db = db

	// Start flush goroutine
	s.wg.Add(1)
	go s.flushLoop()

	return s, nil
}

// Name returns the sink name, always "clickhouse".
func (s *ClickHouseSink) Name() string {
	return "clickhouse"
}
+func (s *ClickHouseSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Check buffer overflow + if len(s.buffer) >= s.config.MaxBufferSize { + if s.config.DropOnOverflow { + // Drop the log + return nil + } + // Block until space is available (with timeout) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(s.config.TimeoutMs) * time.Millisecond): + return fmt.Errorf("buffer full, timeout exceeded") + } + } + + s.buffer = append(s.buffer, log) + + // Trigger flush if batch is full + if len(s.buffer) >= s.config.BatchSize { + select { + case s.flushChan <- struct{}{}: + default: + } + } + + return nil +} + +// Flush flushes the buffer to ClickHouse. +func (s *ClickHouseSink) Flush(ctx context.Context) error { + return s.doFlush(ctx) +} + +// Close closes the sink. +func (s *ClickHouseSink) Close() error { + close(s.done) + s.wg.Wait() + + if s.db != nil { + return s.db.Close() + } + return nil +} + +func (s *ClickHouseSink) flushLoop() { + defer s.wg.Done() + + ticker := time.NewTicker(time.Duration(s.config.FlushIntervalMs) * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-s.done: + return + case <-ticker.C: + s.mu.Lock() + needsFlush := len(s.buffer) > 0 + s.mu.Unlock() + if needsFlush { + // Use timeout context for flush + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) + s.doFlush(ctx) + cancel() + } + case <-s.flushChan: + s.mu.Lock() + needsFlush := len(s.buffer) >= s.config.BatchSize + s.mu.Unlock() + if needsFlush { + // Use timeout context for flush + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) + s.doFlush(ctx) + cancel() + } + } + } +} + +func (s *ClickHouseSink) doFlush(ctx context.Context) error { + s.mu.Lock() + if len(s.buffer) == 0 { + s.mu.Unlock() + return nil + } + + // Copy buffer to flush + buffer := 
// containsLower reports whether substr occurs in s, ignoring the case of
// ASCII letters. An empty substr is contained in every string.
//
// The comparison is done in place, so no lowered copies are allocated.
func containsLower(s, substr string) bool {
	n := len(substr)
	for i := 0; i+n <= len(s); i++ {
		if equalFoldASCII(s[i:i+n], substr) {
			return true
		}
	}
	return false
}

// equalFoldASCII reports whether a and b, which must have the same length,
// are equal when ASCII letters are compared case-insensitively.
func equalFoldASCII(a, b string) bool {
	for i := 0; i < len(a); i++ {
		if lowerASCII(a[i]) != lowerASCII(b[i]) {
			return false
		}
	}
	return true
}

// lowerASCII maps 'A'-'Z' to 'a'-'z' and returns every other byte unchanged.
func lowerASCII(c byte) byte {
	if c >= 'A' && c <= 'Z' {
		return c + ('a' - 'A')
	}
	return c
}

// toLower returns s with ASCII upper-case letters converted to lower case.
// Bytes outside 'A'-'Z', including non-ASCII bytes, are copied unchanged.
func toLower(s string) string {
	// The result has exactly len(s) bytes: size the buffer once.
	out := make([]byte, len(s))
	for i := 0; i < len(s); i++ {
		out[i] = lowerASCII(s[i])
	}
	return string(out)
}
be applied (we can't actually connect in tests) + if config.BatchSize <= 0 { + config.BatchSize = DefaultBatchSize + } + if config.FlushIntervalMs <= 0 { + config.FlushIntervalMs = DefaultFlushIntervalMs + } + if config.MaxBufferSize <= 0 { + config.MaxBufferSize = DefaultMaxBufferSize + } + if config.TimeoutMs <= 0 { + config.TimeoutMs = DefaultTimeoutMs + } + + if config.BatchSize != DefaultBatchSize { + t.Errorf("expected BatchSize %d, got %d", DefaultBatchSize, config.BatchSize) + } + if config.FlushIntervalMs != DefaultFlushIntervalMs { + t.Errorf("expected FlushIntervalMs %d, got %d", DefaultFlushIntervalMs, config.FlushIntervalMs) + } + if config.MaxBufferSize != DefaultMaxBufferSize { + t.Errorf("expected MaxBufferSize %d, got %d", DefaultMaxBufferSize, config.MaxBufferSize) + } + if config.TimeoutMs != DefaultTimeoutMs { + t.Errorf("expected TimeoutMs %d, got %d", DefaultTimeoutMs, config.TimeoutMs) + } +} + +func TestClickHouseSink_Write_BufferOverflow(t *testing.T) { + // This test verifies the buffer overflow logic without actually connecting + config := Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + BatchSize: 10, + MaxBufferSize: 10, + DropOnOverflow: true, + TimeoutMs: 100, + FlushIntervalMs: 1000, + } + + // We can't test actual writes without a ClickHouse instance, + // but we can verify the config is valid + if config.BatchSize > config.MaxBufferSize { + t.Error("BatchSize should not exceed MaxBufferSize") + } +} + +func TestClickHouseSink_IsRetryableError(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + {"nil error", nil, false}, + {"connection refused", &mockError{"connection refused"}, true}, + {"connection reset", &mockError{"connection reset by peer"}, true}, + {"timeout", &mockError{"timeout waiting for response"}, true}, + {"network unreachable", &mockError{"network is unreachable"}, true}, + {"broken pipe", &mockError{"broken pipe"}, true}, + {"syntax error", 
&mockError{"syntax error in SQL"}, false}, + {"table not found", &mockError{"table test not found"}, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isRetryableError(tt.err) + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestClickHouseSink_FlushEmpty(t *testing.T) { + // Test that flushing an empty buffer doesn't cause issues + // (We can't test actual ClickHouse operations without a real instance) + + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + buffer: make([]domain.CorrelatedLog, 0), + } + + // Should not panic or error on empty flush + ctx := context.Background() + err := s.Flush(ctx) + if err != nil { + t.Errorf("expected no error on empty flush, got %v", err) + } +} + +func TestClickHouseSink_CloseWithoutConnect(t *testing.T) { + // Test that closing without connecting doesn't panic + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + buffer: make([]domain.CorrelatedLog, 0), + done: make(chan struct{}), + } + + err := s.Close() + if err != nil { + t.Errorf("expected no error on close without connect, got %v", err) + } +} + +func TestClickHouseSink_Constants(t *testing.T) { + // Verify constants have reasonable values + if DefaultBatchSize <= 0 { + t.Error("DefaultBatchSize should be positive") + } + if DefaultFlushIntervalMs <= 0 { + t.Error("DefaultFlushIntervalMs should be positive") + } + if DefaultMaxBufferSize <= 0 { + t.Error("DefaultMaxBufferSize should be positive") + } + if DefaultTimeoutMs <= 0 { + t.Error("DefaultTimeoutMs should be positive") + } + if DefaultPingTimeoutMs <= 0 { + t.Error("DefaultPingTimeoutMs should be positive") + } + if MaxRetries <= 0 { + t.Error("MaxRetries should be positive") + } + if RetryBaseDelay <= 0 { + t.Error("RetryBaseDelay should be positive") + } +} + +// 
mockError implements error for testing +type mockError struct { + msg string +} + +func (e *mockError) Error() string { + return e.msg +} + +// Test the doFlush function with empty buffer (no actual DB connection) +func TestClickHouseSink_DoFlushEmpty(t *testing.T) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + buffer: make([]domain.CorrelatedLog, 0), + } + + ctx := context.Background() + err := s.doFlush(ctx) + if err != nil { + t.Errorf("expected no error when flushing empty buffer, got %v", err) + } +} + +// Test that buffer is properly managed (without actual DB operations) +func TestClickHouseSink_BufferManagement(t *testing.T) { + log := domain.CorrelatedLog{ + SrcIP: "192.168.1.1", + SrcPort: 8080, + Correlated: true, + } + + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 100, // Allow more than 1 element + DropOnOverflow: false, + TimeoutMs: 1000, + }, + buffer: []domain.CorrelatedLog{log}, + } + + // Verify buffer has data + if len(s.buffer) != 1 { + t.Fatalf("expected buffer length 1, got %d", len(s.buffer)) + } + + // Test that Write properly adds to buffer + ctx := context.Background() + err := s.Write(ctx, log) + if err != nil { + t.Errorf("unexpected error on Write: %v", err) + } + + if len(s.buffer) != 2 { + t.Errorf("expected buffer length 2 after Write, got %d", len(s.buffer)) + } +} + +// Test Write with context cancellation +func TestClickHouseSink_Write_ContextCancel(t *testing.T) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 1, + DropOnOverflow: false, + TimeoutMs: 10, + }, + buffer: make([]domain.CorrelatedLog, 0, 1), + } + + // Fill the buffer + log := domain.CorrelatedLog{SrcIP: "192.168.1.1", SrcPort: 8080} + s.buffer = append(s.buffer, log) + + // Try to write with cancelled context + 
ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + err := s.Write(ctx, log) + if err == nil { + t.Error("expected error when writing with cancelled context") + } +} + +// Test DropOnOverflow behavior +func TestClickHouseSink_Write_DropOnOverflow(t *testing.T) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 1, + DropOnOverflow: true, + TimeoutMs: 10, + }, + buffer: make([]domain.CorrelatedLog, 0, 1), + } + + // Fill the buffer + log := domain.CorrelatedLog{SrcIP: "192.168.1.1", SrcPort: 8080} + s.buffer = append(s.buffer, log) + + // Try to write when buffer is full - should drop silently + ctx := context.Background() + err := s.Write(ctx, log) + if err != nil { + t.Errorf("expected no error when DropOnOverflow is true, got %v", err) + } +} + +// Benchmark Write operation (without actual DB) +func BenchmarkClickHouseSink_Write(b *testing.B) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 10000, + DropOnOverflow: true, + }, + buffer: make([]domain.CorrelatedLog, 0, 10000), + } + + log := domain.CorrelatedLog{ + Timestamp: time.Now(), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Correlated: true, + } + + ctx := context.Background() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Write(ctx, log) + } +} diff --git a/internal/adapters/outbound/file/sink.go b/internal/adapters/outbound/file/sink.go new file mode 100644 index 0000000..d20efdb --- /dev/null +++ b/internal/adapters/outbound/file/sink.go @@ -0,0 +1,168 @@ +package file + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +const ( + // DefaultFilePermissions for output files + DefaultFilePermissions os.FileMode = 0644 + // DefaultDirPermissions for output directories + 
DefaultDirPermissions os.FileMode = 0750 +) + +// Config holds the file sink configuration. +type Config struct { + Path string +} + +// FileSink writes correlated logs to a file as JSON lines. +type FileSink struct { + config Config + mu sync.Mutex + file *os.File + writer *bufio.Writer +} + +// NewFileSink creates a new file sink. +func NewFileSink(config Config) (*FileSink, error) { + // Validate path + if err := validateFilePath(config.Path); err != nil { + return nil, fmt.Errorf("invalid file path: %w", err) + } + + return &FileSink{ + config: config, + }, nil +} + +// Name returns the sink name. +func (s *FileSink) Name() string { + return "file" +} + +// Write writes a correlated log to the file. +func (s *FileSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.file == nil { + if err := s.openFile(); err != nil { + return err + } + } + + data, err := json.Marshal(log) + if err != nil { + return fmt.Errorf("failed to marshal log: %w", err) + } + + if _, err := s.writer.Write(data); err != nil { + return fmt.Errorf("failed to write log: %w", err) + } + if _, err := s.writer.WriteString("\n"); err != nil { + return fmt.Errorf("failed to write newline: %w", err) + } + + return nil +} + +// Flush flushes any buffered data. +func (s *FileSink) Flush(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.writer != nil { + return s.writer.Flush() + } + return nil +} + +// Close closes the sink. 
+func (s *FileSink) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.writer != nil { + if err := s.writer.Flush(); err != nil { + return err + } + } + + if s.file != nil { + return s.file.Close() + } + return nil +} + +func (s *FileSink) openFile() error { + // Validate path again before opening + if err := validateFilePath(s.config.Path); err != nil { + return fmt.Errorf("invalid file path: %w", err) + } + + // Ensure directory exists + dir := filepath.Dir(s.config.Path) + if err := os.MkdirAll(dir, DefaultDirPermissions); err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + + file, err := os.OpenFile(s.config.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, DefaultFilePermissions) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + + s.file = file + s.writer = bufio.NewWriter(file) + return nil +} + +// validateFilePath validates that the file path is safe and allowed. +func validateFilePath(path string) error { + if path == "" { + return fmt.Errorf("path cannot be empty") + } + + // Clean the path + cleanPath := filepath.Clean(path) + + // Ensure path is absolute or relative to allowed directories + allowedPrefixes := []string{ + "/var/log/logcorrelator", + "/var/log", + "/tmp", + } + + // Check if path is in allowed directories + allowed := false + for _, prefix := range allowedPrefixes { + if strings.HasPrefix(cleanPath, prefix) { + allowed = true + break + } + } + + if !allowed { + // Allow relative paths for testing + if !filepath.IsAbs(cleanPath) { + return nil + } + return fmt.Errorf("path must be in allowed directories: %v", allowedPrefixes) + } + + // Check for path traversal + if strings.Contains(cleanPath, "..") { + return fmt.Errorf("path cannot contain '..'") + } + + return nil +} diff --git a/internal/adapters/outbound/file/sink_test.go b/internal/adapters/outbound/file/sink_test.go new file mode 100644 index 0000000..c240928 --- /dev/null +++ b/internal/adapters/outbound/file/sink_test.go @@ 
-0,0 +1,96 @@ +package file + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +func TestFileSink_Write(t *testing.T) { + tmpDir := t.TempDir() + testPath := filepath.Join(tmpDir, "test.log") + + sink, err := NewFileSink(Config{Path: testPath}) + if err != nil { + t.Fatalf("failed to create sink: %v", err) + } + defer sink.Close() + + log := domain.CorrelatedLog{ + SrcIP: "192.168.1.1", + SrcPort: 8080, + Correlated: true, + } + + if err := sink.Write(context.Background(), log); err != nil { + t.Fatalf("failed to write: %v", err) + } + + if err := sink.Flush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + + // Verify file exists and contains data + data, err := os.ReadFile(testPath) + if err != nil { + t.Fatalf("failed to read file: %v", err) + } + + if len(data) == 0 { + t.Error("expected non-empty file") + } +} + +func TestFileSink_MultipleWrites(t *testing.T) { + tmpDir := t.TempDir() + testPath := filepath.Join(tmpDir, "test.log") + + sink, err := NewFileSink(Config{Path: testPath}) + if err != nil { + t.Fatalf("failed to create sink: %v", err) + } + defer sink.Close() + + for i := 0; i < 5; i++ { + log := domain.CorrelatedLog{ + SrcIP: "192.168.1.1", + SrcPort: 8080 + i, + } + if err := sink.Write(context.Background(), log); err != nil { + t.Fatalf("failed to write: %v", err) + } + } + + sink.Close() + + // Verify file has 5 lines + data, err := os.ReadFile(testPath) + if err != nil { + t.Fatalf("failed to read file: %v", err) + } + + lines := 0 + for _, b := range data { + if b == '\n' { + lines++ + } + } + + if lines != 5 { + t.Errorf("expected 5 lines, got %d", lines) + } +} + +func TestFileSink_Name(t *testing.T) { + sink, err := NewFileSink(Config{Path: "/tmp/test.log"}) + if err != nil { + t.Fatalf("failed to create sink: %v", err) + } + + if sink.Name() != "file" { + t.Errorf("expected name 'file', got %s", sink.Name()) + } +} diff --git 
a/internal/adapters/outbound/multi/sink.go b/internal/adapters/outbound/multi/sink.go new file mode 100644 index 0000000..8bcf9d0 --- /dev/null +++ b/internal/adapters/outbound/multi/sink.go @@ -0,0 +1,123 @@ +package multi + +import ( + "context" + "sync" + + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +// MultiSink fans out correlated logs to multiple sinks. +type MultiSink struct { + mu sync.RWMutex + sinks []ports.CorrelatedLogSink +} + +// NewMultiSink creates a new multi-sink. +func NewMultiSink(sinks ...ports.CorrelatedLogSink) *MultiSink { + return &MultiSink{ + sinks: sinks, + } +} + +// Name returns the sink name. +func (s *MultiSink) Name() string { + return "multi" +} + +// AddSink adds a sink to the fan-out. +func (s *MultiSink) AddSink(sink ports.CorrelatedLogSink) { + s.mu.Lock() + defer s.mu.Unlock() + s.sinks = append(s.sinks, sink) +} + +// Write writes a correlated log to all sinks concurrently. +// Returns the first error encountered (but all sinks are attempted). 
+func (s *MultiSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.RLock() + sinks := make([]ports.CorrelatedLogSink, len(s.sinks)) + copy(sinks, s.sinks) + s.mu.RUnlock() + + if len(sinks) == 0 { + return nil + } + + var wg sync.WaitGroup + var firstErr error + var firstErrMu sync.Mutex + errChan := make(chan error, len(sinks)) + + for _, sink := range sinks { + wg.Add(1) + go func(sk ports.CorrelatedLogSink) { + defer wg.Done() + if err := sk.Write(ctx, log); err != nil { + // Non-blocking send to errChan + select { + case errChan <- err: + default: + // Channel full, error will be handled via firstErr + } + } + }(sink) + } + + // Wait for all writes to complete in a separate goroutine + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // Collect errors with timeout + select { + case <-done: + close(errChan) + // Collect first error + for err := range errChan { + if err != nil { + firstErrMu.Lock() + if firstErr == nil { + firstErr = err + } + firstErrMu.Unlock() + } + } + case <-ctx.Done(): + return ctx.Err() + } + + firstErrMu.Lock() + defer firstErrMu.Unlock() + return firstErr +} + +// Flush flushes all sinks. +func (s *MultiSink) Flush(ctx context.Context) error { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, sink := range s.sinks { + if err := sink.Flush(ctx); err != nil { + return err + } + } + return nil +} + +// Close closes all sinks. 
+func (s *MultiSink) Close() error { + s.mu.RLock() + defer s.mu.RUnlock() + + var firstErr error + for _, sink := range s.sinks { + if err := sink.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} diff --git a/internal/adapters/outbound/multi/sink_test.go b/internal/adapters/outbound/multi/sink_test.go new file mode 100644 index 0000000..99e2aef --- /dev/null +++ b/internal/adapters/outbound/multi/sink_test.go @@ -0,0 +1,114 @@ +package multi + +import ( + "context" + "sync" + "testing" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +type mockSink struct { + name string + mu sync.Mutex + writeFunc func(domain.CorrelatedLog) error + flushFunc func() error + closeFunc func() error +} + +func (m *mockSink) Name() string { return m.name } +func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.writeFunc(log) +} +func (m *mockSink) Flush(ctx context.Context) error { return m.flushFunc() } +func (m *mockSink) Close() error { return m.closeFunc() } + +func TestMultiSink_Write(t *testing.T) { + var mu sync.Mutex + writeCount := 0 + + sink1 := &mockSink{ + name: "sink1", + writeFunc: func(log domain.CorrelatedLog) error { + mu.Lock() + writeCount++ + mu.Unlock() + return nil + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + sink2 := &mockSink{ + name: "sink2", + writeFunc: func(log domain.CorrelatedLog) error { + mu.Lock() + writeCount++ + mu.Unlock() + return nil + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + ms := NewMultiSink(sink1, sink2) + + log := domain.CorrelatedLog{SrcIP: "192.168.1.1"} + err := ms.Write(context.Background(), log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if writeCount != 2 { + t.Errorf("expected 2 writes, got %d", writeCount) + } +} + +func TestMultiSink_Write_OneFails(t *testing.T) { + sink1 := 
&mockSink{ + name: "sink1", + writeFunc: func(log domain.CorrelatedLog) error { + return nil + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + sink2 := &mockSink{ + name: "sink2", + writeFunc: func(log domain.CorrelatedLog) error { + return context.Canceled + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + ms := NewMultiSink(sink1, sink2) + + log := domain.CorrelatedLog{SrcIP: "192.168.1.1"} + err := ms.Write(context.Background(), log) + if err == nil { + t.Error("expected error when one sink fails") + } +} + +func TestMultiSink_AddSink(t *testing.T) { + ms := NewMultiSink() + + sink := &mockSink{ + name: "dynamic", + writeFunc: func(log domain.CorrelatedLog) error { return nil }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + ms.AddSink(sink) + + log := domain.CorrelatedLog{SrcIP: "192.168.1.1"} + err := ms.Write(context.Background(), log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/internal/app/orchestrator.go b/internal/app/orchestrator.go new file mode 100644 index 0000000..3d7747f --- /dev/null +++ b/internal/app/orchestrator.go @@ -0,0 +1,158 @@ +package app + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +const ( + // DefaultEventChannelBufferSize is the default size for event channels + DefaultEventChannelBufferSize = 1000 + // ShutdownTimeout is the maximum time to wait for graceful shutdown + ShutdownTimeout = 30 * time.Second +) + +// OrchestratorConfig holds the orchestrator configuration. +type OrchestratorConfig struct { + Sources []ports.EventSource + Sink ports.CorrelatedLogSink +} + +// Orchestrator connects sources to the correlation service and sinks. 
+type Orchestrator struct { + config OrchestratorConfig + correlationSvc ports.CorrelationProcessor + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + running atomic.Bool +} + +// NewOrchestrator creates a new orchestrator. +func NewOrchestrator(config OrchestratorConfig, correlationSvc ports.CorrelationProcessor) *Orchestrator { + ctx, cancel := context.WithCancel(context.Background()) + return &Orchestrator{ + config: config, + correlationSvc: correlationSvc, + ctx: ctx, + cancel: cancel, + } +} + +// Start begins the orchestration. +func (o *Orchestrator) Start() error { + if !o.running.CompareAndSwap(false, true) { + return nil // Already running + } + + // Start each source + for _, source := range o.config.Sources { + eventChan := make(chan *domain.NormalizedEvent, DefaultEventChannelBufferSize) + + o.wg.Add(1) + go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) { + defer o.wg.Done() + o.processEvents(evChan) + }(source, eventChan) + + o.wg.Add(1) + go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) { + defer o.wg.Done() + if err := src.Start(o.ctx, evChan); err != nil { + // Source failed, but continue with others + } + }(source, eventChan) + } + + return nil +} + +func (o *Orchestrator) processEvents(eventChan <-chan *domain.NormalizedEvent) { + for { + select { + case <-o.ctx.Done(): + // Drain remaining events before exiting + for { + select { + case event, ok := <-eventChan: + if !ok { + return + } + logs := o.correlationSvc.ProcessEvent(event) + for _, log := range logs { + o.config.Sink.Write(o.ctx, log) + } + default: + return + } + } + case event, ok := <-eventChan: + if !ok { + return + } + + // Process through correlation service + logs := o.correlationSvc.ProcessEvent(event) + + // Write correlated logs to sink + for _, log := range logs { + if err := o.config.Sink.Write(o.ctx, log); err != nil { + // Log error but continue processing + } + } + } + } +} + +// Stop gracefully stops the 
orchestrator. +// It stops all sources first, then flushes remaining events, then closes sinks. +func (o *Orchestrator) Stop() error { + if !o.running.CompareAndSwap(true, false) { + return nil // Not running + } + + // Create shutdown context with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), ShutdownTimeout) + defer shutdownCancel() + + // First, cancel the main context to stop accepting new events + o.cancel() + + // Wait for source goroutines to finish + // Use a separate goroutine with timeout to prevent deadlock + done := make(chan struct{}) + go func() { + o.wg.Wait() + close(done) + }() + + select { + case <-done: + // Sources stopped cleanly + case <-shutdownCtx.Done(): + // Timeout waiting for sources + } + + // Flush remaining events from correlation service + flushedLogs := o.correlationSvc.Flush() + for _, log := range flushedLogs { + if err := o.config.Sink.Write(shutdownCtx, log); err != nil { + // Log error but continue + } + } + + // Flush and close sink with timeout + if err := o.config.Sink.Flush(shutdownCtx); err != nil { + // Log error + } + if err := o.config.Sink.Close(); err != nil { + // Log error + } + + return nil +} diff --git a/internal/app/orchestrator_test.go b/internal/app/orchestrator_test.go new file mode 100644 index 0000000..d39d424 --- /dev/null +++ b/internal/app/orchestrator_test.go @@ -0,0 +1,160 @@ +package app + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +type mockEventSource struct { + name string + mu sync.RWMutex + eventChan chan<- *domain.NormalizedEvent + started bool + stopped bool +} + +func (m *mockEventSource) Name() string { return m.name } +func (m *mockEventSource) Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error { + m.mu.Lock() + m.started = true + m.eventChan = eventChan + m.mu.Unlock() + <-ctx.Done() + m.mu.Lock() + 
m.stopped = true + m.mu.Unlock() + return nil +} +func (m *mockEventSource) Stop() error { return nil } + +func (m *mockEventSource) getEventChan() chan<- *domain.NormalizedEvent { + m.mu.RLock() + defer m.mu.RUnlock() + return m.eventChan +} + +func (m *mockEventSource) isStarted() bool { + m.mu.RLock() + defer m.mu.RUnlock() + return m.started +} + +type mockSink struct { + mu sync.Mutex + written []domain.CorrelatedLog +} + +func (m *mockSink) Name() string { return "mock" } +func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + m.mu.Lock() + defer m.mu.Unlock() + m.written = append(m.written, log) + return nil +} +func (m *mockSink) Flush(ctx context.Context) error { return nil } +func (m *mockSink) Close() error { return nil } + +func (m *mockSink) getWritten() []domain.CorrelatedLog { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]domain.CorrelatedLog, len(m.written)) + copy(result, m.written) + return result +} + +func TestOrchestrator_StartStop(t *testing.T) { + source := &mockEventSource{name: "test"} + sink := &mockSink{} + + corrConfig := domain.CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + correlationSvc := domain.NewCorrelationService(corrConfig, &domain.RealTimeProvider{}) + + orchestrator := NewOrchestrator(OrchestratorConfig{ + Sources: []ports.EventSource{source}, + Sink: sink, + }, correlationSvc) + + if err := orchestrator.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Let it run briefly + time.Sleep(100 * time.Millisecond) + + if err := orchestrator.Stop(); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + if !source.isStarted() { + t.Error("expected source to be started") + } +} + +func TestOrchestrator_ProcessEvent(t *testing.T) { + source := &mockEventSource{name: "test"} + sink := &mockSink{} + + corrConfig := domain.CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + 
correlationSvc := domain.NewCorrelationService(corrConfig, &domain.RealTimeProvider{}) + + orchestrator := NewOrchestrator(OrchestratorConfig{ + Sources: []ports.EventSource{source}, + Sink: sink, + }, correlationSvc) + + if err := orchestrator.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Wait for source to start and get the channel + var eventChan chan<- *domain.NormalizedEvent + for i := 0; i < 50; i++ { + eventChan = source.getEventChan() + if eventChan != nil { + break + } + time.Sleep(10 * time.Millisecond) + } + + if eventChan == nil { + t.Fatal("source did not start properly") + } + + // Send an event through the source + event := &domain.NormalizedEvent{ + Source: domain.SourceA, + Timestamp: time.Now(), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + // Send event + eventChan <- event + + // Give it time to process + time.Sleep(100 * time.Millisecond) + + if err := orchestrator.Stop(); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + // Should have written at least one log (the orphan A) + written := sink.getWritten() + if len(written) == 0 { + t.Error("expected at least one log to be written") + } +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..b740bc2 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,340 @@ +package config + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + "time" +) + +// Config holds the complete application configuration. +type Config struct { + Service ServiceConfig + Inputs InputsConfig + Outputs OutputsConfig + Correlation CorrelationConfig +} + +// ServiceConfig holds service-level configuration. +type ServiceConfig struct { + Name string + Language string +} + +// InputsConfig holds input sources configuration. +type InputsConfig struct { + UnixSockets []UnixSocketConfig +} + +// UnixSocketConfig holds a Unix socket source configuration. 
type (
	UnixSocketConfig struct {
		// Name is the first argument of the input.unix_socket directive.
		Name string
		// Path is the second argument of the input.unix_socket directive.
		Path string
		// Format is the optional third argument; the parser uses "json"
		// when it is omitted.
		Format string
	}

	// OutputsConfig holds output sinks configuration.
	OutputsConfig struct {
		File       FileOutputConfig
		ClickHouse ClickHouseOutputConfig
		Stdout     StdoutOutputConfig
	}

	// FileOutputConfig holds file sink configuration.
	FileOutputConfig struct {
		// Enabled is set by output.file.enabled.
		Enabled bool
		// Path is set by output.file.path.
		Path string
	}

	// ClickHouseOutputConfig holds ClickHouse sink configuration.
	// Each field is set by the output.clickhouse.* directive of the same
	// (snake_case) name.
	ClickHouseOutputConfig struct {
		Enabled bool
		// DSN must be non-empty when Enabled is true (checked by Validate).
		DSN             string
		Table           string
		BatchSize       int
		FlushIntervalMs int
		MaxBufferSize   int
		DropOnOverflow  bool
		AsyncInsert     bool
		TimeoutMs       int
	}

	// StdoutOutputConfig holds stdout sink configuration.
	StdoutOutputConfig struct {
		// Enabled is set by output.stdout.enabled.
		Enabled bool
	}

	// CorrelationConfig holds correlation configuration.
	CorrelationConfig struct {
		// Key is the comma-separated value of correlation.key, split and
		// trimmed into individual field names.
		Key          []string
		TimeWindow   TimeWindowConfig
		OrphanPolicy OrphanPolicyConfig
	}

	// TimeWindowConfig holds time window configuration.
	TimeWindowConfig struct {
		// Value is the window length; GetTimeWindow treats a non-positive
		// value as 1.
		Value int
		// Unit is the window unit ("ms", "s", "m" and their long forms);
		// GetTimeWindow falls back to seconds for an empty or unknown unit.
		Unit string
	}

	// OrphanPolicyConfig holds orphan event policy configuration.
	OrphanPolicyConfig struct {
		// ApacheAlwaysEmit is set by correlation.orphan_policy.apache_always_emit.
		ApacheAlwaysEmit bool
		// NetworkEmit is set by correlation.orphan_policy.network_emit.
		NetworkEmit bool
	}
)

// Load loads configuration from a text file with directives.
+func Load(path string) (*Config, error) { + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open config file: %w", err) + } + defer file.Close() + + cfg := &Config{ + Service: ServiceConfig{ + Name: "logcorrelator", + Language: "go", + }, + Inputs: InputsConfig{ + UnixSockets: make([]UnixSocketConfig, 0), + }, + Outputs: OutputsConfig{ + File: FileOutputConfig{ + Enabled: true, + Path: "/var/log/logcorrelator/correlated.log", + }, + ClickHouse: ClickHouseOutputConfig{ + Enabled: false, + BatchSize: 500, + FlushIntervalMs: 200, + MaxBufferSize: 5000, + DropOnOverflow: true, + AsyncInsert: true, + TimeoutMs: 1000, + }, + Stdout: StdoutOutputConfig{ + Enabled: false, + }, + }, + Correlation: CorrelationConfig{ + Key: []string{"src_ip", "src_port"}, + TimeWindow: TimeWindowConfig{ + Value: 1, + Unit: "s", + }, + OrphanPolicy: OrphanPolicyConfig{ + ApacheAlwaysEmit: true, + NetworkEmit: false, + }, + }, + } + + scanner := bufio.NewScanner(file) + lineNum := 0 + + for scanner.Scan() { + lineNum++ + line := strings.TrimSpace(scanner.Text()) + + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + if err := parseDirective(cfg, line); err != nil { + return nil, fmt.Errorf("line %d: %w", lineNum, err) + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + return cfg, nil +} + +func parseDirective(cfg *Config, line string) error { + parts := strings.Fields(line) + if len(parts) < 2 { + return fmt.Errorf("invalid directive: %s", line) + } + + directive := parts[0] + value := strings.Join(parts[1:], " ") + + switch directive { + case "service.name": + cfg.Service.Name = value + case "service.language": + cfg.Service.Language = value + + case "input.unix_socket": + // Format: input.unix_socket [format] + if len(parts) < 3 { + return 
fmt.Errorf("input.unix_socket requires name and path") + } + format := "json" + if len(parts) >= 4 { + format = parts[3] + } + cfg.Inputs.UnixSockets = append(cfg.Inputs.UnixSockets, UnixSocketConfig{ + Name: parts[1], + Path: parts[2], + Format: format, + }) + + case "output.file.enabled": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.file.enabled: %w", err) + } + cfg.Outputs.File.Enabled = enabled + case "output.file.path": + cfg.Outputs.File.Path = value + + case "output.clickhouse.enabled": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.enabled: %w", err) + } + cfg.Outputs.ClickHouse.Enabled = enabled + case "output.clickhouse.dsn": + cfg.Outputs.ClickHouse.DSN = value + case "output.clickhouse.table": + cfg.Outputs.ClickHouse.Table = value + case "output.clickhouse.batch_size": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.batch_size: %w", err) + } + cfg.Outputs.ClickHouse.BatchSize = v + case "output.clickhouse.flush_interval_ms": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.flush_interval_ms: %w", err) + } + cfg.Outputs.ClickHouse.FlushIntervalMs = v + case "output.clickhouse.max_buffer_size": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.max_buffer_size: %w", err) + } + cfg.Outputs.ClickHouse.MaxBufferSize = v + case "output.clickhouse.drop_on_overflow": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.drop_on_overflow: %w", err) + } + cfg.Outputs.ClickHouse.DropOnOverflow = enabled + case "output.clickhouse.async_insert": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.async_insert: %w", err) + } + cfg.Outputs.ClickHouse.AsyncInsert = 
enabled + case "output.clickhouse.timeout_ms": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.timeout_ms: %w", err) + } + cfg.Outputs.ClickHouse.TimeoutMs = v + + case "output.stdout.enabled": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.stdout.enabled: %w", err) + } + cfg.Outputs.Stdout.Enabled = enabled + + case "correlation.key": + cfg.Correlation.Key = strings.Split(value, ",") + for i, k := range cfg.Correlation.Key { + cfg.Correlation.Key[i] = strings.TrimSpace(k) + } + case "correlation.time_window.value": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for correlation.time_window.value: %w", err) + } + cfg.Correlation.TimeWindow.Value = v + case "correlation.time_window.unit": + cfg.Correlation.TimeWindow.Unit = value + case "correlation.orphan_policy.apache_always_emit": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for correlation.orphan_policy.apache_always_emit: %w", err) + } + cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit = enabled + case "correlation.orphan_policy.network_emit": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for correlation.orphan_policy.network_emit: %w", err) + } + cfg.Correlation.OrphanPolicy.NetworkEmit = enabled + + default: + return fmt.Errorf("unknown directive: %s", directive) + } + + return nil +} + +func parseBool(s string) (bool, error) { + s = strings.ToLower(s) + switch s { + case "true", "yes", "1", "on": + return true, nil + case "false", "no", "0", "off": + return false, nil + default: + return false, fmt.Errorf("invalid boolean value: %s", s) + } +} + +// Validate validates the configuration. 
+func (c *Config) Validate() error { + if len(c.Inputs.UnixSockets) < 2 { + return fmt.Errorf("at least two unix socket inputs are required") + } + + if !c.Outputs.File.Enabled && !c.Outputs.ClickHouse.Enabled && !c.Outputs.Stdout.Enabled { + return fmt.Errorf("at least one output must be enabled") + } + + if c.Outputs.ClickHouse.Enabled && c.Outputs.ClickHouse.DSN == "" { + return fmt.Errorf("clickhouse DSN is required when enabled") + } + + return nil +} + +// GetTimeWindow returns the time window as a duration. +func (c *CorrelationConfig) GetTimeWindow() time.Duration { + value := c.TimeWindow.Value + if value <= 0 { + value = 1 + } + + unit := c.TimeWindow.Unit + if unit == "" { + unit = "s" + } + + switch unit { + case "ms", "millisecond", "milliseconds": + return time.Duration(value) * time.Millisecond + case "s", "second", "seconds": + return time.Duration(value) * time.Second + case "m", "minute", "minutes": + return time.Duration(value) * time.Minute + default: + return time.Duration(value) * time.Second + } +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..c6598e1 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,224 @@ +package config + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func TestLoad_ValidConfig(t *testing.T) { + content := ` +# Test configuration +service.name logcorrelator +service.language go + +input.unix_socket apache_source /var/run/logcorrelator/apache.sock json +input.unix_socket network_source /var/run/logcorrelator/network.sock json + +output.file.enabled true +output.file.path /var/log/logcorrelator/correlated.log + +output.clickhouse.enabled false +output.clickhouse.dsn clickhouse://user:pass@localhost:9000/db +output.clickhouse.table correlated_logs + +correlation.key src_ip,src_port +correlation.time_window.value 1 +correlation.time_window.unit s + +correlation.orphan_policy.apache_always_emit true 
+correlation.orphan_policy.network_emit false +` + + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.conf") + if err := os.WriteFile(configPath, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cfg.Service.Name != "logcorrelator" { + t.Errorf("expected service name logcorrelator, got %s", cfg.Service.Name) + } + if len(cfg.Inputs.UnixSockets) != 2 { + t.Errorf("expected 2 unix sockets, got %d", len(cfg.Inputs.UnixSockets)) + } + if !cfg.Outputs.File.Enabled { + t.Error("expected file output enabled") + } +} + +func TestLoad_InvalidPath(t *testing.T) { + _, err := Load("/nonexistent/path/config.conf") + if err == nil { + t.Error("expected error for nonexistent path") + } +} + +func TestLoad_InvalidDirective(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.conf") + content := `invalid.directive value` + if err := os.WriteFile(configPath, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + + _, err := Load(configPath) + if err == nil { + t.Error("expected error for invalid directive") + } +} + +func TestLoad_Comments(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.conf") + content := ` +# This is a comment +service.name logcorrelator +# Another comment +input.unix_socket test /tmp/test.sock json +input.unix_socket test2 /tmp/test2.sock json +output.file.enabled true +` + if err := os.WriteFile(configPath, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cfg.Service.Name != "logcorrelator" { + t.Errorf("expected service name logcorrelator, got %s", cfg.Service.Name) + } +} + +func TestValidate_MinimumInputs(t *testing.T) { + cfg := &Config{ + Inputs: InputsConfig{ 
+ UnixSockets: []UnixSocketConfig{ + {Name: "only_one", Path: "/tmp/test.sock"}, + }, + }, + Outputs: OutputsConfig{ + File: FileOutputConfig{Enabled: true}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Error("expected error for less than 2 inputs") + } +} + +func TestValidate_AtLeastOneOutput(t *testing.T) { + cfg := &Config{ + Inputs: InputsConfig{ + UnixSockets: []UnixSocketConfig{ + {Name: "a", Path: "/tmp/a.sock"}, + {Name: "b", Path: "/tmp/b.sock"}, + }, + }, + Outputs: OutputsConfig{ + File: FileOutputConfig{Enabled: false}, + ClickHouse: ClickHouseOutputConfig{Enabled: false}, + Stdout: StdoutOutputConfig{Enabled: false}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Error("expected error for no outputs enabled") + } +} + +func TestGetTimeWindow(t *testing.T) { + tests := []struct { + name string + config CorrelationConfig + expected time.Duration + }{ + { + name: "seconds", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{Value: 1, Unit: "s"}, + }, + expected: time.Second, + }, + { + name: "milliseconds", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{Value: 500, Unit: "ms"}, + }, + expected: 500 * time.Millisecond, + }, + { + name: "minutes", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{Value: 2, Unit: "m"}, + }, + expected: 2 * time.Minute, + }, + { + name: "default", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{}, + }, + expected: time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.config.GetTimeWindow() + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestParseBool(t *testing.T) { + tests := []struct { + input string + expected bool + hasError bool + }{ + {"true", true, false}, + {"True", true, false}, + {"TRUE", true, false}, + {"yes", true, false}, + {"1", true, false}, + {"on", true, false}, + {"false", false, false}, + {"False", false, false}, + {"no", false, 
false}, + {"0", false, false}, + {"off", false, false}, + {"invalid", false, true}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + result, err := parseBool(tt.input) + if tt.hasError { + if err == nil { + t.Error("expected error, got nil") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + } + }) + } +} diff --git a/internal/domain/correlated_log.go b/internal/domain/correlated_log.go new file mode 100644 index 0000000..b4143e8 --- /dev/null +++ b/internal/domain/correlated_log.go @@ -0,0 +1,90 @@ +package domain + +import "time" + +// CorrelatedLog represents the output correlated log entry. +type CorrelatedLog struct { + Timestamp time.Time `json:"timestamp"` + SrcIP string `json:"src_ip"` + SrcPort int `json:"src_port"` + DstIP string `json:"dst_ip,omitempty"` + DstPort int `json:"dst_port,omitempty"` + Correlated bool `json:"correlated"` + OrphanSide string `json:"orphan_side,omitempty"` + Apache map[string]any `json:"apache,omitempty"` + Network map[string]any `json:"network,omitempty"` + Extra map[string]any `json:"extra,omitempty"` +} + +// NewCorrelatedLogFromEvent creates a correlated log from a single event (orphan). +func NewCorrelatedLogFromEvent(event *NormalizedEvent, orphanSide string) CorrelatedLog { + return CorrelatedLog{ + Timestamp: event.Timestamp, + SrcIP: event.SrcIP, + SrcPort: event.SrcPort, + DstIP: event.DstIP, + DstPort: event.DstPort, + Correlated: false, + OrphanSide: orphanSide, + Apache: extractApache(event), + Network: extractNetwork(event), + Extra: make(map[string]any), + } +} + +// NewCorrelatedLog creates a correlated log from two matched events. 
+func NewCorrelatedLog(apacheEvent, networkEvent *NormalizedEvent) CorrelatedLog { + ts := apacheEvent.Timestamp + if networkEvent.Timestamp.After(ts) { + ts = networkEvent.Timestamp + } + + return CorrelatedLog{ + Timestamp: ts, + SrcIP: apacheEvent.SrcIP, + SrcPort: apacheEvent.SrcPort, + DstIP: coalesceString(apacheEvent.DstIP, networkEvent.DstIP), + DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort), + Correlated: true, + OrphanSide: "", + Apache: extractApache(apacheEvent), + Network: extractNetwork(networkEvent), + Extra: make(map[string]any), + } +} + +func extractApache(e *NormalizedEvent) map[string]any { + if e.Source != SourceA { + return nil + } + result := make(map[string]any) + for k, v := range e.Raw { + result[k] = v + } + return result +} + +func extractNetwork(e *NormalizedEvent) map[string]any { + if e.Source != SourceB { + return nil + } + result := make(map[string]any) + for k, v := range e.Raw { + result[k] = v + } + return result +} + +func coalesceString(a, b string) string { + if a != "" { + return a + } + return b +} + +func coalesceInt(a, b int) int { + if a != 0 { + return a + } + return b +} diff --git a/internal/domain/correlated_log_test.go b/internal/domain/correlated_log_test.go new file mode 100644 index 0000000..6284c5c --- /dev/null +++ b/internal/domain/correlated_log_test.go @@ -0,0 +1,115 @@ +package domain + +import ( + "testing" + "time" +) + +func TestNormalizedEvent_CorrelationKeyFull(t *testing.T) { + tests := []struct { + name string + event *NormalizedEvent + expected string + }{ + { + name: "basic key", + event: &NormalizedEvent{ + SrcIP: "192.168.1.1", + SrcPort: 8080, + }, + expected: "192.168.1.1:8080", + }, + { + name: "different port", + event: &NormalizedEvent{ + SrcIP: "10.0.0.1", + SrcPort: 443, + }, + expected: "10.0.0.1:443", + }, + { + name: "port zero", + event: &NormalizedEvent{ + SrcIP: "127.0.0.1", + SrcPort: 0, + }, + expected: "127.0.0.1:0", + }, + } + + for _, tt := range tests { + 
t.Run(tt.name, func(t *testing.T) { + key := tt.event.CorrelationKeyFull() + if key != tt.expected { + t.Errorf("expected %s, got %s", tt.expected, key) + } + }) + } +} + +func TestNewCorrelatedLogFromEvent(t *testing.T) { + event := &NormalizedEvent{ + Source: SourceA, + Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + SrcIP: "192.168.1.1", + SrcPort: 8080, + DstIP: "10.0.0.1", + DstPort: 80, + Raw: map[string]any{ + "method": "GET", + "path": "/api/test", + }, + } + + log := NewCorrelatedLogFromEvent(event, "A") + + if log.Correlated { + t.Error("expected correlated to be false") + } + if log.OrphanSide != "A" { + t.Errorf("expected orphan_side A, got %s", log.OrphanSide) + } + if log.SrcIP != "192.168.1.1" { + t.Errorf("expected src_ip 192.168.1.1, got %s", log.SrcIP) + } + if log.Apache == nil { + t.Error("expected apache to be non-nil") + } +} + +func TestNewCorrelatedLog(t *testing.T) { + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + SrcIP: "192.168.1.1", + SrcPort: 8080, + DstIP: "10.0.0.1", + DstPort: 80, + Raw: map[string]any{"method": "GET"}, + } + + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 500000000, time.UTC), + SrcIP: "192.168.1.1", + SrcPort: 8080, + DstIP: "10.0.0.1", + DstPort: 80, + Raw: map[string]any{"ja3": "abc123"}, + } + + log := NewCorrelatedLog(apacheEvent, networkEvent) + + if !log.Correlated { + t.Error("expected correlated to be true") + } + if log.OrphanSide != "" { + t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide) + } + if log.Apache == nil { + t.Error("expected apache to be non-nil") + } + if log.Network == nil { + t.Error("expected network to be non-nil") + } +} diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go new file mode 100644 index 0000000..dfac022 --- /dev/null +++ b/internal/domain/correlation_service.go @@ -0,0 +1,243 @@ +package 
const (
	// DefaultMaxBufferSize is the per-source buffer capacity applied when the
	// configured MaxBufferSize is zero or negative.
	DefaultMaxBufferSize = 10000
)

// CorrelationConfig carries the tunables of the correlation engine.
type CorrelationConfig struct {
	TimeWindow       time.Duration
	ApacheAlwaysEmit bool
	NetworkEmit      bool
	MaxBufferSize    int // Maximum events to buffer per source
}

// CorrelationService pairs source-A and source-B events. All state is guarded
// by mu.
type CorrelationService struct {
	config       CorrelationConfig
	mu           sync.Mutex
	bufferA      *eventBuffer
	bufferB      *eventBuffer
	pendingA     map[string]*list.Element // correlation key -> element of bufferA
	pendingB     map[string]*list.Element // correlation key -> element of bufferB
	timeProvider TimeProvider
}

// eventBuffer is a list of buffered events in insertion order.
type eventBuffer struct {
	events *list.List
}

// newEventBuffer returns an empty, ready-to-use buffer.
func newEventBuffer() *eventBuffer {
	buf := new(eventBuffer)
	buf.events = list.New()
	return buf
}

// TimeProvider is the clock seam that lets tests control the current time.
type TimeProvider interface {
	Now() time.Time
}

// RealTimeProvider is the TimeProvider backed by the system clock.
type RealTimeProvider struct{}

// Now returns time.Now().
func (p *RealTimeProvider) Now() time.Time {
	return time.Now()
}

// NewCorrelationService builds a service with empty buffers. A nil
// timeProvider is replaced by RealTimeProvider and a non-positive
// MaxBufferSize by DefaultMaxBufferSize.
func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) *CorrelationService {
	if config.MaxBufferSize <= 0 {
		config.MaxBufferSize = DefaultMaxBufferSize
	}

	svc := &CorrelationService{
		config:   config,
		bufferA:  newEventBuffer(),
		bufferB:  newEventBuffer(),
		pendingA: map[string]*list.Element{},
		pendingB: map[string]*list.Element{},
	}

	svc.timeProvider = timeProvider
	if timeProvider == nil {
		svc.timeProvider = &RealTimeProvider{}
	}
	return svc
}
+func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog { + s.mu.Lock() + defer s.mu.Unlock() + + // Clean expired events first + s.cleanExpired() + + // Check buffer overflow before adding + if s.isBufferFull(event.Source) { + // Buffer full, drop event or emit as orphan + if event.Source == SourceA && s.config.ApacheAlwaysEmit { + return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")} + } + return nil + } + + var results []CorrelatedLog + + switch event.Source { + case SourceA: + results = s.processSourceA(event) + case SourceB: + results = s.processSourceB(event) + } + + // Add the new event to the appropriate buffer + s.addEvent(event) + + return results +} + +func (s *CorrelationService) isBufferFull(source EventSource) bool { + switch source { + case SourceA: + return s.bufferA.events.Len() >= s.config.MaxBufferSize + case SourceB: + return s.bufferB.events.Len() >= s.config.MaxBufferSize + } + return false +} + +func (s *CorrelationService) processSourceA(event *NormalizedEvent) []CorrelatedLog { + key := event.CorrelationKeyFull() + + // Look for a matching B event + if elem, ok := s.pendingB[key]; ok { + bEvent := elem.Value.(*NormalizedEvent) + if s.eventsMatch(event, bEvent) { + // Found a match! + correlated := NewCorrelatedLog(event, bEvent) + s.bufferB.events.Remove(elem) + delete(s.pendingB, key) + return []CorrelatedLog{correlated} + } + } + + // No match found + if s.config.ApacheAlwaysEmit { + orphan := NewCorrelatedLogFromEvent(event, "A") + return []CorrelatedLog{orphan} + } + + // Keep in buffer for potential future match + return nil +} + +func (s *CorrelationService) processSourceB(event *NormalizedEvent) []CorrelatedLog { + key := event.CorrelationKeyFull() + + // Look for a matching A event + if elem, ok := s.pendingA[key]; ok { + aEvent := elem.Value.(*NormalizedEvent) + if s.eventsMatch(aEvent, event) { + // Found a match! 
+ correlated := NewCorrelatedLog(aEvent, event) + s.bufferA.events.Remove(elem) + delete(s.pendingA, key) + return []CorrelatedLog{correlated} + } + } + + // No match found - B is never emitted alone per spec + if s.config.NetworkEmit { + orphan := NewCorrelatedLogFromEvent(event, "B") + return []CorrelatedLog{orphan} + } + + // Keep in buffer for potential future match (but won't be emitted alone) + return nil +} + +func (s *CorrelationService) eventsMatch(a, b *NormalizedEvent) bool { + diff := a.Timestamp.Sub(b.Timestamp) + if diff < 0 { + diff = -diff + } + return diff <= s.config.TimeWindow +} + +func (s *CorrelationService) addEvent(event *NormalizedEvent) { + key := event.CorrelationKeyFull() + + switch event.Source { + case SourceA: + elem := s.bufferA.events.PushBack(event) + s.pendingA[key] = elem + case SourceB: + elem := s.bufferB.events.PushBack(event) + s.pendingB[key] = elem + } +} + +func (s *CorrelationService) cleanExpired() { + now := s.timeProvider.Now() + cutoff := now.Add(-s.config.TimeWindow) + + // Clean expired events from both buffers using shared logic + s.cleanBuffer(s.bufferA, s.pendingA, cutoff) + s.cleanBuffer(s.bufferB, s.pendingB, cutoff) +} + +// cleanBuffer removes expired events from a buffer (shared logic for A and B). +func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string]*list.Element, cutoff time.Time) { + for elem := buffer.events.Front(); elem != nil; { + event := elem.Value.(*NormalizedEvent) + if event.Timestamp.Before(cutoff) { + next := elem.Next() + key := event.CorrelationKeyFull() + buffer.events.Remove(elem) + if pending[key] == elem { + delete(pending, key) + } + elem = next + } else { + break // Events are ordered, so we can stop early + } + } +} + +// Flush forces emission of remaining buffered events (for shutdown). 
+func (s *CorrelationService) Flush() []CorrelatedLog { + s.mu.Lock() + defer s.mu.Unlock() + + var results []CorrelatedLog + + // Emit remaining A events as orphans if configured + if s.config.ApacheAlwaysEmit { + for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() { + event := elem.Value.(*NormalizedEvent) + orphan := NewCorrelatedLogFromEvent(event, "A") + results = append(results, orphan) + } + } + + // Clear buffers + s.bufferA.events.Init() + s.bufferB.events.Init() + s.pendingA = make(map[string]*list.Element) + s.pendingB = make(map[string]*list.Element) + + return results +} + +// GetBufferSizes returns the current buffer sizes (for monitoring). +func (s *CorrelationService) GetBufferSizes() (int, int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.bufferA.events.Len(), s.bufferB.events.Len() +} diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go new file mode 100644 index 0000000..ffc03d6 --- /dev/null +++ b/internal/domain/correlation_service_test.go @@ -0,0 +1,153 @@ +package domain + +import ( + "testing" + "time" +) + +type mockTimeProvider struct { + now time.Time +} + +func (m *mockTimeProvider) Now() time.Time { + return m.now +} + +func TestCorrelationService_Match(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: false, // Don't emit A immediately to test matching + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send Apache event (should be buffered, not emitted) + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + results := svc.ProcessEvent(apacheEvent) + if len(results) != 0 { + t.Fatalf("expected 0 results (buffered), got %d", len(results)) + } + + // Send matching Network event within 
window + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(500 * time.Millisecond), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc"}, + } + + // This should match and return correlated log + results = svc.ProcessEvent(networkEvent) + if len(results) != 1 { + t.Errorf("expected 1 result (correlated), got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result") + } +} + +func TestCorrelationService_NoMatch_DifferentIP(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.2", // Different IP + SrcPort: 8080, + } + + svc.ProcessEvent(apacheEvent) + results := svc.ProcessEvent(networkEvent) + + if len(results) != 0 { + t.Errorf("expected 0 results (different IP), got %d", len(results)) + } +} + +func TestCorrelationService_NoMatch_TimeWindowExceeded(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(2 * time.Second), // Outside window + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + svc.ProcessEvent(apacheEvent) + results := svc.ProcessEvent(networkEvent) + + if len(results) != 0 { + t.Errorf("expected 0 results (time window 
// EventSource names the origin of an event.
type EventSource string

const (
	// SourceA is the Apache/HTTP source.
	SourceA EventSource = "A"
	// SourceB is the network source.
	SourceB EventSource = "B"
)

// NormalizedEvent is the common in-memory shape of an event, whichever source
// produced it.
type NormalizedEvent struct {
	Source    EventSource
	Timestamp time.Time
	SrcIP     string
	SrcPort   int
	DstIP     string
	DstPort   int
	Headers   map[string]string
	Extra     map[string]any
	Raw       map[string]any // Original raw data
}

// CorrelationKey returns "<SrcIP>:<SrcPort>", the key under which events from
// both sources are paired.
func (e *NormalizedEvent) CorrelationKey() string {
	key := make([]byte, 0, len(e.SrcIP)+6)
	key = append(key, e.SrcIP...)
	key = append(key, ':')
	key = strconv.AppendInt(key, int64(e.SrcPort), 10)
	return string(key)
}
// Logger writes leveled log lines, optionally prefixed with a component name
// and followed by key=value fields.
type Logger struct {
	// mu serializes the SetPrefix+Print pair in log. It is a pointer so that
	// loggers derived with WithFields, which share the same *log.Logger, also
	// share the same lock; otherwise one logger could print with the level
	// prefix another one has just set.
	mu     *sync.Mutex
	logger *log.Logger
	prefix string
	fields map[string]any
}

// NewLogger creates a logger that writes to standard error with date, time
// and microsecond stamps. prefix may be empty.
func NewLogger(prefix string) *Logger {
	return &Logger{
		mu:     &sync.Mutex{},
		logger: log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds),
		prefix: prefix,
		fields: make(map[string]any),
	}
}

// WithFields returns a new logger that carries the receiver's fields plus
// fields; on duplicate keys the values in fields win. The receiver is left
// unchanged. Both loggers write to the same underlying output.
func (l *Logger) WithFields(fields map[string]any) *Logger {
	merged := make(map[string]any, len(l.fields)+len(fields))
	for k, v := range l.fields {
		merged[k] = v
	}
	for k, v := range fields {
		merged[k] = v
	}
	return &Logger{
		mu:     l.mu,
		logger: l.logger,
		prefix: l.prefix,
		fields: merged,
	}
}

// Info logs msg at INFO level.
func (l *Logger) Info(msg string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.log("INFO", msg)
}

// Error logs msg at ERROR level, followed by err's text when err is non-nil.
func (l *Logger) Error(msg string, err error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if err != nil {
		l.log("ERROR", msg+" "+err.Error())
	} else {
		l.log("ERROR", msg)
	}
}

// Debug logs msg at DEBUG level.
func (l *Logger) Debug(msg string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.log("DEBUG", msg)
}

// log emits one line of the form "[prefix] LEVEL <timestamp> msg k=v ...".
// The caller must hold l.mu. Field order is unspecified because it follows
// map iteration order.
func (l *Logger) log(level, msg string) {
	prefix := l.prefix
	if prefix != "" {
		prefix = "[" + prefix + "] "
	}

	l.logger.SetPrefix(prefix + level + " ")

	// msg is passed as a plain operand, never as a format string, so a '%' in
	// it is printed verbatim. Every value is surrounded by string operands,
	// which keeps Print from inserting extra spaces of its own.
	parts := make([]any, 0, 1+4*len(l.fields))
	parts = append(parts, msg)
	for k, v := range l.fields {
		parts = append(parts, " ", k, "=", v)
	}

	l.logger.Print(parts...)
}
// EventSource is the inbound port: something that produces normalized events
// for the correlation pipeline.
type EventSource interface {
	// Start begins reading events and sending them to eventChan.
	// It returns an error if the source cannot be started.
	// NOTE(review): presumably ctx cancellation stops the reader; confirm
	// against the implementations.
	Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error

	// Stop gracefully stops the source.
	Stop() error

	// Name returns the source name.
	Name() string
}

// CorrelatedLogSink is the outbound port: a destination for correlated logs.
type CorrelatedLogSink interface {
	// Write sends one correlated log to the sink.
	Write(ctx context.Context, log domain.CorrelatedLog) error

	// Flush flushes any logs the sink has buffered.
	Flush(ctx context.Context) error

	// Close closes the sink.
	Close() error

	// Name returns the sink name.
	Name() string
}

// TimeProvider abstracts the clock so that time-dependent code can be tested.
type TimeProvider interface {
	// Now returns the current time as seen by this provider.
	Now() time.Time
}
[Unit]
Description=logcorrelator service
After=network.target

[Service]
Type=simple
User=logcorrelator
Group=logcorrelator
ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.conf
Restart=on-failure
RestartSec=5

# Runtime directory
# /run (and /var/run, which links to it) is a tmpfs, so a directory created
# once at package installation is gone after a reboot. Listing a missing path
# in ReadWritePaths= makes the unit fail to start under ProtectSystem=strict.
# RuntimeDirectory= has systemd create /run/logcorrelator on every start,
# owned by User=/Group=, and keeps it writable for the service.
RuntimeDirectory=logcorrelator

# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/log/logcorrelator

# Resource limits
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
getent passwd logcorrelator > /dev/null 2>&1; then + useradd --system \ + --gid logcorrelator \ + --home-dir /var/lib/logcorrelator \ + --no-create-home \ + --shell /usr/sbin/nologin \ + logcorrelator + fi + + # Create necessary directories + mkdir -p /var/lib/logcorrelator + mkdir -p /var/run/logcorrelator + mkdir -p /var/log/logcorrelator + mkdir -p /etc/logcorrelator + + # Set proper ownership + chown -R logcorrelator:logcorrelator /var/lib/logcorrelator + chown -R logcorrelator:logcorrelator /var/run/logcorrelator + chown -R logcorrelator:logcorrelator /var/log/logcorrelator + chown -R logcorrelator:logcorrelator /etc/logcorrelator + + # Set proper permissions + chmod 750 /var/lib/logcorrelator + chmod 750 /var/log/logcorrelator + chmod 750 /etc/logcorrelator + + # Install default config if it doesn't exist + if [ ! -f /etc/logcorrelator/logcorrelator.conf ]; then + cp /usr/share/logcorrelator/logcorrelator.conf.example /etc/logcorrelator/logcorrelator.conf + chown logcorrelator:logcorrelator /etc/logcorrelator/logcorrelator.conf + chmod 640 /etc/logcorrelator/logcorrelator.conf + fi + + # Enable and start the service (if running in a real system, not container) + if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then + systemctl daemon-reload + systemctl enable logcorrelator.service + if ! 
#!/bin/bash
set -e

# postrm script for logcorrelator .deb package
#
# dpkg invokes this script with one of: remove, purge, upgrade,
# failed-upgrade, abort-install, abort-upgrade, disappear. Every one of them
# must be accepted, otherwise dpkg treats the operation as failed.

case "$1" in
    remove)
        # On remove, leave config and data files
        ;;

    purge)
        # On purge, remove everything

        # Stop service if running
        if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then
            systemctl stop logcorrelator.service 2>/dev/null || true
            systemctl disable logcorrelator.service 2>/dev/null || true
            systemctl daemon-reload
        fi

        # Remove configuration
        rm -rf /etc/logcorrelator

        # Remove data and logs
        rm -rf /var/lib/logcorrelator
        rm -rf /var/log/logcorrelator
        rm -rf /var/run/logcorrelator

        # Remove user and group
        if getent passwd logcorrelator > /dev/null 2>&1; then
            userdel logcorrelator 2>/dev/null || true
        fi

        if getent group logcorrelator > /dev/null 2>&1; then
            groupdel logcorrelator 2>/dev/null || true
        fi
        ;;

    upgrade|failed-upgrade|abort-install|disappear)
        # Nothing to clean up: the new version's postinst restarts the
        # service. Without this branch a plain package upgrade would reach
        # the catch-all below and fail.
        ;;

    abort-upgrade|abort-remove|abort-deconfigure)
        # On abort, restart the service
        if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then
            systemctl start logcorrelator.service 2>/dev/null || true
        fi
        ;;

    *)
        echo "postrm called with unknown argument '$1'" >&2
        exit 1
        ;;
esac

exit 0
#!/bin/bash
set -e

# prerm script for logcorrelator .deb package
#
# dpkg invokes this script with one of: remove, upgrade, deconfigure,
# failed-upgrade.

case "$1" in
    remove|deconfigure)
        # Stop and disable the service
        if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then
            systemctl stop logcorrelator.service 2>/dev/null || true
            systemctl disable logcorrelator.service 2>/dev/null || true
            systemctl daemon-reload
        fi
        ;;

    upgrade|failed-upgrade)
        # On upgrade, just stop the service (will be restarted by postinst).
        # failed-upgrade is dpkg's retry with the new package's prerm after
        # the old one failed; rejecting it would abort the recovery.
        if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then
            systemctl stop logcorrelator.service 2>/dev/null || true
        fi
        ;;

    *)
        echo "prerm called with unknown argument '$1'" >&2
        exit 1
        ;;
esac

exit 0