diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a851ca..c5b2923 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,36 @@ All notable changes to logcorrelator are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.1.0] - 2026-03-02 + +### Added + +- **Keep-Alive support**: One-to-many correlation mode allows a single network event (B) to correlate with multiple HTTP events (A) +- **Dynamic TTL**: Network events (source B) now have configurable TTL that resets on each successful correlation +- **Separate buffer sizes**: Configurable `max_http_items` and `max_network_items` for independent buffer control +- **SIGHUP handling**: Service now handles SIGHUP signal for log rotation without restart +- **logrotate configuration**: RPM includes `/etc/logrotate.d/logcorrelator` for automatic log rotation +- **ExecReload**: Systemd service now supports `systemctl reload logcorrelator` + +### Changed + +- **Configuration structure**: New YAML structure with nested sections: + - `time_window` (object with `value` and `unit`) + - `orphan_policy` (object with `apache_always_emit` and `network_emit`) + - `matching.mode` (string: `one_to_one` or `one_to_many`) + - `buffers` (object with `max_http_items` and `max_network_items`) + - `ttl` (object with `network_ttl_s`) +- Backward compatibility maintained for old config fields (`time_window_s`, `emit_orphans`) + +### Technical Details + +- `CorrelationService` now supports `MatchingMode` configuration +- Network events tracked with individual TTL expiration times +- `FileSink.Reopen()` method for log file rotation +- All sinks implement `Reopen()` interface method + +--- + ## [1.0.7] - 2026-03-01 ### Added diff --git a/Dockerfile.package b/Dockerfile.package index 470b886..05cc9fd 100644 --- a/Dockerfile.package +++ b/Dockerfile.package @@ -52,6 +52,7 @@ COPY --from=builder 
/build/CHANGELOG.md /tmp/pkgroot/usr/share/doc/logcorrelator COPY packaging/rpm/post /tmp/scripts/post COPY packaging/rpm/preun /tmp/scripts/preun COPY packaging/rpm/postun /tmp/scripts/postun +COPY packaging/rpm/logrotate /tmp/pkgroot/etc/logrotate.d/logcorrelator # Create directories and set permissions RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ @@ -91,7 +92,8 @@ RUN mkdir -p /packages/rpm/el8 && \ usr/share/doc/logcorrelator/CHANGELOG.md \ var/log/logcorrelator \ var/run/logcorrelator \ - etc/systemd/system/logcorrelator.service + etc/systemd/system/logcorrelator.service \ + etc/logrotate.d/logcorrelator # ============================================================================= # Stage 3: RPM Package builder for Enterprise Linux 9 (el9) @@ -115,6 +117,7 @@ COPY --from=builder /build/CHANGELOG.md /tmp/pkgroot/usr/share/doc/logcorrelator COPY packaging/rpm/post /tmp/scripts/post COPY packaging/rpm/preun /tmp/scripts/preun COPY packaging/rpm/postun /tmp/scripts/postun +COPY packaging/rpm/logrotate /tmp/pkgroot/etc/logrotate.d/logcorrelator # Create directories and set permissions RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ @@ -154,7 +157,8 @@ RUN mkdir -p /packages/rpm/el9 && \ usr/share/doc/logcorrelator/CHANGELOG.md \ var/log/logcorrelator \ var/run/logcorrelator \ - etc/systemd/system/logcorrelator.service + etc/systemd/system/logcorrelator.service \ + etc/logrotate.d/logcorrelator # ============================================================================= # Stage 4: RPM Package builder for Enterprise Linux 10 (el10) @@ -178,6 +182,7 @@ COPY --from=builder /build/CHANGELOG.md /tmp/pkgroot/usr/share/doc/logcorrelator COPY packaging/rpm/post /tmp/scripts/post COPY packaging/rpm/preun /tmp/scripts/preun COPY packaging/rpm/postun /tmp/scripts/postun +COPY packaging/rpm/logrotate /tmp/pkgroot/etc/logrotate.d/logcorrelator # Create directories and set permissions RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ @@ -217,7 +222,8 @@ RUN 
mkdir -p /packages/rpm/el10 && \ usr/share/doc/logcorrelator/CHANGELOG.md \ var/log/logcorrelator \ var/run/logcorrelator \ - etc/systemd/system/logcorrelator.service + etc/systemd/system/logcorrelator.service \ + etc/logrotate.d/logcorrelator # ============================================================================= # Stage 5: Output - Image finale avec les packages RPM diff --git a/architecture.yml b/architecture.yml index 437f9a5..f55428c 100644 --- a/architecture.yml +++ b/architecture.yml @@ -9,12 +9,13 @@ service: événements HTTP applicatifs (source A, typiquement Apache ou reverse proxy) avec des événements réseau (source B, métadonnées IP/TCP, JA3/JA4, etc.) sur la base de la combinaison strictement définie src_ip + src_port, avec - une fenêtre temporelle configurable. Le service produit un log corrélé - unique pour chaque paire correspondante, émet toujours les événements A - même lorsqu’aucun événement B corrélé n’est disponible, n’émet jamais de - logs B seuls, et pousse les logs agrégés en temps quasi réel vers - ClickHouse et/ou un fichier local, en minimisant la rétention en mémoire - et sur disque. + une fenêtre temporelle configurable. Le service supporte les connexions + HTTP Keep-Alive : un log réseau peut être corrélé à plusieurs logs HTTP + successifs (stratégie 1‑à‑N). La rétention en mémoire est bornée par des + tailles de caches configurables et un TTL dynamique pour la source B. Le + service émet toujours les événements A même lorsqu’aucun événement B n’est + disponible, n’émet jamais de logs B seuls, et pousse les résultats vers + ClickHouse et/ou un fichier local. runtime: deployment: @@ -23,7 +24,7 @@ runtime: logcorrelator est livré sous forme de binaire autonome, exécuté comme un service systemd. L'unité systemd assure le démarrage automatique au boot, le redémarrage en cas de crash, et une intégration standard dans l'écosystème - Linux (notamment sur CentOS 7 et Rocky Linux 8+). + Linux. 
binary_path: /usr/bin/logcorrelator config_path: /etc/logcorrelator/logcorrelator.yml user: logcorrelator @@ -41,6 +42,7 @@ runtime: User=logcorrelator Group=logcorrelator ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml + ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure RestartSec=5 @@ -63,98 +65,186 @@ runtime: graceful_shutdown: - SIGINT - SIGTERM + reload: + - SIGHUP description: > - En réception de SIGINT ou SIGTERM, le service arrête proprement la lecture - des sockets Unix, vide les buffers d’envoi (dans les limites de la politique - de drop), ferme les connexions ClickHouse puis s’arrête. + SIGINT/SIGTERM : arrêt propre (arrêt des sockets, vidage des buffers, fermeture + des sinks). SIGHUP : réouverture des fichiers de sortie (utile pour la + rotation des logs via logrotate) sans arrêter le service. + +packaging: + description: > + logcorrelator est distribué sous forme de packages .rpm (Rocky Linux, AlmaLinux, + RHEL), construits intégralement dans des conteneurs. Le changelog RPM est mis + à jour à chaque changement de version. + formats: + - rpm + target_distros: + - rocky-linux-8 + - rocky-linux-9 + - almalinux-10 + - rhel-8 + - rhel-9 + - rhel-10 + rpm: + tool: fpm + changelog: + source: git # ou CHANGELOG.md + description: > + À chaque build, un script génère un fichier de changelog RPM à partir de + l’historique (tags/commits) et le passe à fpm (option --rpm-changelog). + contents: + - path: /usr/bin/logcorrelator + type: binary + - path: /etc/logcorrelator/logcorrelator.yml + type: config + directives: "%config(noreplace)" + - path: /etc/logcorrelator/logcorrelator.yml.example + type: doc + description: Fichier d'exemple toujours mis à jour par le RPM. 
+ - path: /etc/systemd/system/logcorrelator.service + type: systemd_unit + - path: /etc/logrotate.d/logcorrelator + type: logrotate_script + logrotate_example: | + /var/log/logcorrelator/*.log { + daily + rotate 7 + compress + delaycompress + missingok + notifempty + create 0640 logcorrelator logcorrelator + postrotate + systemctl reload logcorrelator > /dev/null 2>/dev/null || true + endscript + } config: format: yaml location: /etc/logcorrelator/logcorrelator.yml + reload_strategy: signal_sighup_for_files description: > - Toute la configuration est centralisée dans un fichier YAML lisible, - stocké dans /etc/logcorrelator. - reload_strategy: restart_service + Toute la configuration est centralisée dans un fichier YAML lisible. Le RPM + fournit aussi un fichier d’exemple mis à jour à chaque version. example: | - # Logging configuration + # /etc/logcorrelator/logcorrelator.yml + log: level: INFO # DEBUG, INFO, WARN, ERROR - # Inputs - at least 2 unix sockets required inputs: unix_sockets: + # Source HTTP (A) : logs applicatifs en JSON, 1 datagramme = 1 log. - name: http - path: /var/run/logcorrelator/http.socket - socket_permissions: "0660" - - name: network - path: /var/run/logcorrelator/network.socket + path: /var/run/logcorrelator/http.sock + socket_permissions: "0666" + socket_type: dgram + max_datagram_bytes: 65535 + + # Source réseau (B) : logs IP/TCP/JA3... en JSON, 1 datagramme = 1 log. 
+ - name: network + path: /var/run/logcorrelator/network.sock + socket_permissions: "0666" + socket_type: dgram + max_datagram_bytes: 65535 - # Outputs outputs: file: + enabled: true path: /var/log/logcorrelator/correlated.log - clickhouse: - dsn: clickhouse://user:pass@localhost:9000/db - table: correlated_logs - stdout: false + format: json_lines + + clickhouse: + enabled: true + dsn: clickhouse://user:pass@localhost:9000/db + table: correlated_logs_http_network + batch_size: 500 + flush_interval_ms: 200 + max_buffer_size: 5000 + drop_on_overflow: true + async_insert: true + timeout_ms: 1000 + + stdout: + enabled: false - # Correlation (optional) correlation: - time_window_s: 1 - emit_orphans: true + # Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend + # au plus cette durée (sauf éviction du cache HTTP). + time_window: + value: 1 + unit: s + + orphan_policy: + apache_always_emit: true + network_emit: false + + matching: + mode: one_to_many # Keep‑Alive : un B peut corréler plusieurs A. + + buffers: + # Tailles max des caches en mémoire (en nombre de logs). + max_http_items: 10000 + max_network_items: 20000 + + ttl: + # Durée de vie standard d’un log réseau (B) en mémoire. Chaque corrélation + # réussie avec un A réinitialise ce TTL. + network_ttl_s: 30 inputs: description: > - Le service consomme deux flux de logs JSON via des sockets Unix. Le schéma - exact des logs pour chaque source est flexible et peut évoluer. Seuls - quelques champs sont nécessaires pour la corrélation. + Deux flux de logs JSON via sockets Unix datagram (SOCK_DGRAM). Chaque datagramme + contient un JSON complet. unix_sockets: - name: apache_source id: A description: > - Source A, destinée aux logs HTTP applicatifs (Apache, reverse proxy, etc.). - Le schéma JSON est variable, avec un champ timestamp numérique obligatoire - et des champs header_* dynamiques. + Source A, logs HTTP applicatifs (Apache, reverse proxy, etc.). 
Schéma JSON + variable, champ timestamp obligatoire, headers dynamiques (header_*). path: /var/run/logcorrelator/apache.sock + permissions: "0666" protocol: unix - mode: stream + socket_type: dgram + mode: datagram format: json - framing: line + framing: message + max_datagram_bytes: 65535 retry_on_error: true + - name: network_source id: B description: > - Source B, destinée aux logs réseau (métadonnées IP/TCP, JA3/JA4, etc.). - Le schéma JSON est variable ; seuls src_ip et src_port sont requis. + Source B, logs réseau (métadonnées IP/TCP, JA3/JA4, etc.). Seuls src_ip + et src_port sont requis pour la corrélation. path: /var/run/logcorrelator/network.sock + permissions: "0666" protocol: unix - mode: stream + socket_type: dgram + mode: datagram format: json - framing: line + framing: message + max_datagram_bytes: 65535 retry_on_error: true outputs: description: > - Les logs corrélés sont envoyés vers un ou plusieurs sinks. MultiSink permet - de diffuser chaque log corrélé vers plusieurs destinations (fichier, - ClickHouse, stdout…). + Les logs corrélés sont envoyés vers un ou plusieurs sinks (MultiSink). sinks: file: enabled: true description: > - Sink vers fichier local, utile pour debug ou archivage local. Écrit un - JSON par ligne dans le chemin configuré. Rotation gérée par logrotate - ou équivalent. + Sink fichier local. Un JSON par ligne. Rotation gérée par logrotate, + réouverture du fichier sur SIGHUP. path: /var/log/logcorrelator/correlated.log format: json_lines - rotate_managed_by: external + rotate_managed_by: external_logrotate clickhouse: enabled: true description: > - Sink principal pour l’archivage et l’analyse en temps quasi réel. Les - logs corrélés sont insérés en batch dans ClickHouse avec un small buffer - et des inserts asynchrones. En cas de saturation ou d’indisponibilité - ClickHouse, les logs sont drop pour éviter de saturer la machine locale. + Sink principal pour l’archivage et l’analyse quasi temps réel. 
Inserts + batch asynchrones, drop en cas de saturation. dsn: clickhouse://user:pass@host:9000/db table: correlated_logs_http_network batch_size: 500 @@ -166,13 +256,12 @@ outputs: stdout: enabled: false description: > - Sink optionnel vers stdout pour les tests et le développement. + Sink optionnel pour les tests/développement. correlation: description: > - Corrélation strictement basée sur src_ip + src_port et une fenêtre temporelle - configurable. Aucun autre champ (dst_ip, dst_port, JA3/JA4, headers HTTP...) - n’est utilisé pour la décision de corrélation. + Corrélation stricte basée sur src_ip + src_port et une fenêtre temporelle + configurable. Aucun autre champ n’est utilisé pour la décision de corrélation. key: - src_ip - src_port @@ -180,53 +269,50 @@ correlation: value: 1 unit: s description: > - Fenêtre de temps symétrique appliquée aux timestamps de A et B. Deux - événements sont corrélés si |tA - tB| <= time_window. La valeur et l'unité - sont définies dans le YAML. + Fenêtre de temps appliquée aux timestamps de A et B. Si B n’arrive pas dans + ce délai, A est émis comme orphelin. + retention_limits: + max_http_items: 10000 + max_network_items: 20000 + description: > + Limites des caches. Si max_http_items est atteint, le plus ancien A est + évincé et émis orphelin. Si max_network_items est atteint, le plus ancien B + est supprimé silencieusement. + ttl_management: + network_ttl_s: 30 + description: > + TTL des logs réseau. Chaque fois qu’un B est corrélé à un A (Keep‑Alive), + son TTL est remis à cette valeur. timestamp_source: apache: field_timestamp network: reception_time - description: > - Pour A, utilisation du champ numérique "timestamp" (epoch ns). Pour B, - utilisation du temps de réception local. orphan_policy: apache_always_emit: true network_emit: false - description: > - A est toujours émis (même sans B) avec correlated=false et orphan_side="A". - B n’est jamais émis seul. 
matching: - mode: one_to_one_first_match + mode: one_to_many description: > - Stratégie 1‑à‑1, premier match : lors de l’arrivée d’un événement, on - cherche le premier événement compatible dans le buffer de l’autre source. - Les autres restent en attente ou expirent. + Stratégie 1‑à‑N : un log réseau peut être utilisé pour plusieurs logs HTTP + successifs tant qu’il n’a pas expiré ni été évincé. schema: description: > - Les schémas des sources A et B sont variables. Le service impose seulement - quelques champs obligatoires nécessaires à la corrélation et accepte des - champs supplémentaires sans modification de code. + Schémas variables pour A et B. Quelques champs seulement sont obligatoires + pour la corrélation, les autres sont acceptés sans modification de code. source_A: description: > - Logs HTTP applicatifs (Apache/reverse proxy) au format JSON. Schéma - variable, avec champs obligatoires pour corrélation (src_ip, src_port, - timestamp) et collecte des autres champs dans des maps. + Logs HTTP applicatifs au format JSON. required_fields: - name: src_ip type: string - description: Adresse IP source client. - name: src_port type: int - description: Port source client. - name: timestamp type: int64 unit: ns - description: Timestamp de référence pour la corrélation. optional_fields: - name: time type: string - format: rfc3339 - name: dst_ip type: string - name: dst_port @@ -242,16 +328,10 @@ schema: dynamic_fields: - pattern: header_* target_map: headers - description: > - Tous les champs header_* sont collectés dans headers[clé] = valeur. - pattern: "*" target_map: extra - description: > - Tous les champs non reconnus explicitement vont dans extra. source_B: - description: > - Logs réseau JSON (IP/TCP, JA3/JA4...). Schéma variable. src_ip et src_port - sont obligatoires pour la corrélation, le reste est libre. + description: Logs réseau JSON (IP/TCP, JA3/JA4...). 
required_fields: - name: src_ip type: string @@ -265,14 +345,10 @@ schema: dynamic_fields: - pattern: "*" target_map: extra - description: > - Tous les autres champs (ip_meta_*, tcp_meta_*, ja3, ja4, etc.) sont - rangés dans extra. normalized_event: description: > - Représentation interne unifiée des événements A/B sur laquelle opère la - logique de corrélation. + Représentation interne unifiée des événements A/B. fields: - name: source type: enum("A","B") @@ -293,13 +369,10 @@ schema: optional: true - name: extra type: map[string]any - description: Champs additionnels provenant de A ou B. correlated_log: description: > - Structure du log corrélé émis vers les sinks (fichier, ClickHouse). Contient - les informations de corrélation et tous les champs des sources A et B fusionnés - dans une structure JSON plate (flat). + Structure du log corrélé émis vers les sinks. fields: - name: timestamp type: time.Time @@ -319,18 +392,13 @@ schema: type: string - name: "*" type: map[string]any - description: > - Tous les champs additionnels provenant de A et B sont fusionnés - directement à la racine du JSON (structure plate, sans subdivisions). clickhouse_schema: strategy: external_ddls description: > - logcorrelator ne gère pas les ALTER TABLE. La table ClickHouse doit être - créée/modifiée en dehors du service. logcorrelator remplit les colonnes - existantes qu'il connaît et met NULL si un champ manque. - Depuis la version 1.0.3, les champs apache et network sont remplacés par - une colonne unique fields JSON contenant tous les champs fusionnés. + La table ClickHouse est gérée en dehors du service. logcorrelator remplit + les colonnes connues et met NULL si un champ manque. Tous les champs fusionnés + sont exposés dans une colonne JSON (fields). 
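The flat `correlated_log` structure described above (correlation metadata plus all A/B fields merged at the JSON root, with B never emitted alone) can be sketched as follows. This is an illustrative assumption, not the service's actual merge code: in particular, the collision rule "A wins" is a choice made for this sketch only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeFlat builds a flat correlated log: correlation metadata at the root,
// then every field from A and B merged alongside it (no nested apache/network
// sections). On a key collision the A (HTTP) value is kept in this sketch;
// the real service may resolve collisions differently.
func mergeFlat(a, b map[string]any, correlated bool) map[string]any {
	out := map[string]any{
		"correlated": correlated,
	}
	for k, v := range b {
		out[k] = v
	}
	for k, v := range a { // A wins on collisions in this sketch
		out[k] = v
	}
	if !correlated {
		out["orphan_side"] = "A" // B is never emitted alone
	}
	return out
}

func main() {
	a := map[string]any{"src_ip": "10.0.0.1", "src_port": 50000, "method": "GET"}
	b := map[string]any{"src_ip": "10.0.0.1", "src_port": 50000, "ja4": "t13d"}

	// One JSON object per output line, suitable for the file sink
	// (json_lines) or a ClickHouse JSON column.
	line, _ := json.Marshal(mergeFlat(a, b, true))
	fmt.Println(string(line))
}
```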
base_columns: - name: timestamp type: DateTime64(9) @@ -350,31 +418,26 @@ clickhouse_schema: type: JSON dynamic_fields: mode: map_or_additional_columns - description: > - Les champs dynamiques peuvent être exposés via colonnes dédiées créées par - migration, ou via Map/JSON. architecture: description: > - Architecture hexagonale : domaine de corrélation indépendant, ports - abstraits pour les sources/sinks, adaptateurs pour sockets Unix, fichier et - ClickHouse, couche application d’orchestration, et modules infra pour - config/observabilité. + Architecture hexagonale : domaine de corrélation indépendant, ports abstraits + pour les sources/sinks, adaptateurs pour sockets Unix, fichier et ClickHouse, + couche application d’orchestration, et modules infra (config, observabilité). modules: - name: cmd/logcorrelator type: entrypoint responsibilities: - - Chargement configuration YAML. + - Chargement de la configuration YAML. - Initialisation des adaptateurs d'entrée/sortie. - Création du CorrelationService. - - Démarrage de l'orchestrateur. - - Gestion du cycle de vie (signaux systemd). + - Démarrage de l’orchestrateur. + - Gestion des signaux (SIGINT, SIGTERM, SIGHUP). - name: internal/domain type: domain responsibilities: - Modèles NormalizedEvent et CorrelatedLog. - - Implémentation de CorrelationService (buffers, fenêtre, - orphelins). + - CorrelationService (fenêtre, TTL, buffers bornés, 1‑à‑N, orphelins). - name: internal/ports type: ports responsibilities: @@ -382,163 +445,70 @@ architecture: - name: internal/app type: application responsibilities: - - Orchestrator : relier EventSource → CorrelationService → MultiSink. + - Orchestrator : EventSource → CorrelationService → MultiSink. - name: internal/adapters/inbound/unixsocket type: adapter_inbound responsibilities: - - Lecture sockets Unix + parsing JSON → NormalizedEvent. + - Lecture Unix datagram (SOCK_DGRAM) et parsing JSON → NormalizedEvent. 
- name: internal/adapters/outbound/file type: adapter_outbound responsibilities: - - Écriture fichier JSON lines. + - Écriture JSON lines. + - Réouverture du fichier sur SIGHUP. - name: internal/adapters/outbound/clickhouse type: adapter_outbound responsibilities: - - Bufferisation + inserts batch vers ClickHouse. - - Application de drop_on_overflow. + - Bufferisation + inserts batch, gestion du drop_on_overflow. - name: internal/adapters/outbound/multi type: adapter_outbound responsibilities: - - Fan-out vers plusieurs sinks. + - Fan‑out vers plusieurs sinks. - name: internal/config type: infrastructure responsibilities: - - Chargement/validation config YAML. + - Chargement/validation de la configuration YAML. - name: internal/observability type: infrastructure responsibilities: - - Logging et métriques internes. + - Logging interne, métriques (tailles des caches, évictions, erreurs datagram). testing: unit: description: > - Tests unitaires table-driven avec couverture cible ≥ 80 %. Focalisés sur - la logique de corrélation, parsing et sink ClickHouse.[web:94][web:98][web:102] + Tests unitaires table‑driven, couverture cible ≥ 80 %, focale sur la logique + de corrélation, les caches et les sinks. coverage_minimum: 0.8 focus: - - CorrelationService - - Parsing A/B → NormalizedEvent + - CorrelationService (fenêtre, TTL, évictions, 1‑à‑N) + - Parsing A/B → NormalizedEvent (datagrammes) - ClickHouseSink (batching, overflow) + - FileSink (réouverture sur SIGHUP) - MultiSink integration: description: > Tests d’intégration validant le flux complet A+B → corrélation → sinks, - avec sockets simulés et ClickHouse mocké. + avec sockets Unix datagram simulées, ClickHouse mocké et scénarios Keep‑Alive. 
docker: description: > - Build et tests entièrement encapsulés dans Docker, avec multi‑stage build : - un stage builder pour compiler et tester, un stage runtime minimal pour - exécuter le service.[web:95][web:103] - images: - builder: - base: golang:latest - purpose: build_and_test - runtime: - base: scratch - purpose: run_binary_only - build: - multi_stage: true - steps: - - name: unit_tests - description: > - go test ./... avec génération de couverture. Le build échoue si la - couverture est < 80 %. - - name: compile_binary - description: > - Compilation CGO_ENABLED=0, GOOS=linux, GOARCH=amd64 pour un binaire - statique /usr/bin/logcorrelator. - - name: assemble_runtime_image - description: > - Copie du binaire dans l’image runtime et définition de l’ENTRYPOINT. - -packaging: - description: > - logcorrelator est distribué sous forme de packages .rpm (Rocky Linux 8, 9 et AlmaLinux 10), - construits intégralement dans Docker avec fpm. - formats: - - rpm - target_distros: - rpm: - - rocky-linux-8 - - rocky-linux-9 - - almalinux-10 - - rhel-8+ - - rhel-9+ - - rhel-10+ - tool: fpm + Build, tests et packaging RPM sont exécutés intégralement dans des conteneurs + via un multi‑stage build. build_pipeline: - dockerfile: Dockerfile.package + multi_stage: true stages: - - name: builder + - name: test_and_compile + base: golang:latest description: > - Compilation du binaire Go avec CGO_ENABLED=0 pour un binaire statique. - GOOS=linux GOARCH=amd64. - - name: rpm_rocky8_builder + go test ./... (échec si couverture < 80 %), puis compilation d’un binaire + statique (CGO_ENABLED=0, GOOS=linux, GOARCH=amd64). + - name: rpm_builder + base: ruby:alpine description: > - Construction du package RPM pour Rocky Linux 8 (el8) avec fpm. - - name: rpm_rocky9_builder + Installation de fpm, git et outils RPM. Génération du changelog RPM à + partir de l’historique. Construction des .rpm pour les différentes + distributions. 
+ - name: output_export + base: scratch description: > - Construction du package RPM pour Rocky Linux 9 (el9) avec fpm. - - name: rpm_almalinux10_builder - description: > - Construction du package RPM pour AlmaLinux 10 (el10) avec fpm. - - name: output - description: > - Image Alpine minimale contenant les packages dans - /packages/rpm/{rocky8,rocky9,almalinux10}. - files: - binary: - source: dist/logcorrelator - dest: /usr/bin/logcorrelator - mode: "0755" - config: - - source: config.example.yml - dest: /etc/logcorrelator/logcorrelator.yml - mode: "0640" - config_file: true - - source: config.example.yml - dest: /usr/share/logcorrelator/logcorrelator.yml.example - mode: "0640" - directories: - - path: /var/log/logcorrelator - mode: "0755" - - path: /var/run/logcorrelator - mode: "0755" - - path: /etc/logcorrelator - mode: "0750" - maintainer_scripts: - rpm: - post: packaging/rpm/post - preun: packaging/rpm/preun - postun: packaging/rpm/postun - dependencies: - rpm: - - systemd - verify: - rpm: - rocky8: - command: docker run --rm -v $(pwd)/dist/rpm/rocky8:/packages rockylinux:8 sh -c "dnf install -y /packages/*.rpm" - rocky9: - command: docker run --rm -v $(pwd)/dist/rpm/rocky9:/packages rockylinux:9 sh -c "dnf install -y /packages/*.rpm" - almalinux10: - command: docker run --rm -v $(pwd)/dist/rpm/almalinux10:/packages almalinux:10 sh -c "dnf install -y /packages/*.rpm" - -non_functional: - performance: - target_latency_ms: 1000 - description: > - Latence visée < 1 s entre réception et insertion ClickHouse, avec - batching léger. - reliability: - drop_on_clickhouse_failure: true - description: > - En cas de ClickHouse lent/HS, les logs sont drop au‑delà du buffer pour - protéger la machine. - security: - user_separation: true - privileges: least - description: > - Service sous utilisateur dédié, pas de secrets en clair dans les logs, - principe de moindre privilège. + Étape minimale pour exposer les paquets RPM produits (docker build --output). 
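Before the Go changes below, it helps to see the sink port they all conform to. The interface shape here is inferred from the mock sinks in this diff (`Name`, `Write`, `Flush`, `Close`, plus the new `Reopen`), with a stand-in `CorrelatedLog` type; the actual `internal/ports` definition is not shown in this patch.

```go
package main

import (
	"context"
	"fmt"
)

// CorrelatedLog stands in for domain.CorrelatedLog in this sketch.
type CorrelatedLog map[string]any

// Sink sketches the outbound port after this change: Reopen joins the
// interface so every sink (file, clickhouse, stdout, multi) can be asked
// to reopen its underlying resource on SIGHUP. Sinks without a file
// handle implement it as a no-op.
type Sink interface {
	Name() string
	Write(ctx context.Context, log CorrelatedLog) error
	Flush(ctx context.Context) error
	Close() error
	Reopen() error
}

// nopSink shows the no-op pattern used by the stdout and ClickHouse sinks.
type nopSink struct{ name string }

func (s *nopSink) Name() string                                     { return s.name }
func (s *nopSink) Write(ctx context.Context, log CorrelatedLog) error { return nil }
func (s *nopSink) Flush(ctx context.Context) error                  { return nil }
func (s *nopSink) Close() error                                     { return nil }
func (s *nopSink) Reopen() error                                    { return nil }

func main() {
	var s Sink = &nopSink{name: "stdout"}
	fmt.Println(s.Name(), s.Reopen() == nil) // prints "stdout true"
}
```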
diff --git a/cmd/logcorrelator/main.go b/cmd/logcorrelator/main.go index 18fe2a9..5b201dc 100644 --- a/cmd/logcorrelator/main.go +++ b/cmd/logcorrelator/main.go @@ -105,10 +105,13 @@ func main() { // Create correlation service correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{ - TimeWindow: cfg.Correlation.GetTimeWindow(), - ApacheAlwaysEmit: cfg.Correlation.EmitOrphans, - NetworkEmit: false, - MaxBufferSize: domain.DefaultMaxBufferSize, + TimeWindow: cfg.Correlation.GetTimeWindow(), + ApacheAlwaysEmit: cfg.Correlation.GetApacheAlwaysEmit(), + NetworkEmit: false, + MaxHTTPBufferSize: cfg.Correlation.GetMaxHTTPBufferSize(), + MaxNetworkBufferSize: cfg.Correlation.GetMaxNetworkBufferSize(), + NetworkTTLS: cfg.Correlation.GetNetworkTTLS(), + MatchingMode: cfg.Correlation.GetMatchingMode(), }, &domain.RealTimeProvider{}) // Set logger for correlation service @@ -134,10 +137,26 @@ func main() { // Wait for shutdown signal sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigChan + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - logger.Info(fmt.Sprintf("Shutdown signal received: %v", sig)) + for { + sig := <-sigChan + + if sig == syscall.SIGHUP { + // Reopen file sinks for log rotation + logger.Info("SIGHUP received, reopening file sinks...") + if err := multiSink.Reopen(); err != nil { + logger.Error("Error reopening file sinks", err) + } else { + logger.Info("File sinks reopened successfully") + } + continue + } + + // Shutdown signal received + logger.Info(fmt.Sprintf("Shutdown signal received: %v", sig)) + break + } // Graceful shutdown if err := orchestrator.Stop(); err != nil { diff --git a/config.example.yml b/config.example.yml index 39b6e08..93f80cb 100644 --- a/config.example.yml +++ b/config.example.yml @@ -20,15 +20,44 @@ inputs: outputs: file: + enabled: true path: /var/log/logcorrelator/correlated.log clickhouse: + enabled: false dsn: 
clickhouse://user:pass@localhost:9000/db table: correlated_logs_http_network + batch_size: 500 + flush_interval_ms: 200 + max_buffer_size: 5000 + drop_on_overflow: true + async_insert: true + timeout_ms: 1000 stdout: enabled: false correlation: - time_window_s: 1 - emit_orphans: true # http toujours émis, network jamais seul + # Time window for correlation (A and B must be within this window) + time_window: + value: 1 + unit: s + + # Orphan policy: what to do when no match is found + orphan_policy: + apache_always_emit: true # Always emit A events, even without B match + network_emit: false # Never emit B events alone + + # Matching mode: one_to_one or one_to_many (Keep-Alive) + matching: + mode: one_to_many + + # Buffer limits (max events in memory) + buffers: + max_http_items: 10000 + max_network_items: 20000 + + # TTL for network events (source B) + ttl: + network_ttl_s: 30 + diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go index 2b1a0ba..199249b 100644 --- a/internal/adapters/outbound/clickhouse/sink.go +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -115,6 +115,11 @@ func (s *ClickHouseSink) Name() string { return "clickhouse" } +// Reopen is a no-op for ClickHouse (connection is managed internally). +func (s *ClickHouseSink) Reopen() error { + return nil +} + // Write adds a log to the buffer. 
func (s *ClickHouseSink) Write(ctx context.Context, log domain.CorrelatedLog) error { deadline := time.Now().Add(time.Duration(s.config.TimeoutMs) * time.Millisecond) diff --git a/internal/adapters/outbound/file/sink.go b/internal/adapters/outbound/file/sink.go index ccee1e1..a630e91 100644 --- a/internal/adapters/outbound/file/sink.go +++ b/internal/adapters/outbound/file/sink.go @@ -38,9 +38,16 @@ func NewFileSink(config Config) (*FileSink, error) { return nil, fmt.Errorf("invalid file path: %w", err) } - return &FileSink{ + s := &FileSink{ config: config, - }, nil + } + + // Open file on creation + if err := s.openFile(); err != nil { + return nil, err + } + + return s, nil } // Name returns the sink name. @@ -48,6 +55,20 @@ func (s *FileSink) Name() string { return "file" } +// Reopen closes and reopens the file (for log rotation on SIGHUP). +func (s *FileSink) Reopen() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.file != nil { + if err := s.file.Close(); err != nil { + return fmt.Errorf("failed to close file: %w", err) + } + } + + return s.openFile() +} + // Write writes a correlated log to the file. func (s *FileSink) Write(ctx context.Context, log domain.CorrelatedLog) error { s.mu.Lock() diff --git a/internal/adapters/outbound/multi/sink.go b/internal/adapters/outbound/multi/sink.go index 8bcf9d0..35e9b65 100644 --- a/internal/adapters/outbound/multi/sink.go +++ b/internal/adapters/outbound/multi/sink.go @@ -121,3 +121,17 @@ func (s *MultiSink) Close() error { } return firstErr } + +// Reopen reopens all sinks (for log rotation on SIGHUP). 
+func (s *MultiSink) Reopen() error { + s.mu.RLock() + defer s.mu.RUnlock() + + var firstErr error + for _, sink := range s.sinks { + if err := sink.Reopen(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} diff --git a/internal/adapters/outbound/multi/sink_test.go b/internal/adapters/outbound/multi/sink_test.go index c686e34..866eb7d 100644 --- a/internal/adapters/outbound/multi/sink_test.go +++ b/internal/adapters/outbound/multi/sink_test.go @@ -14,6 +14,7 @@ type mockSink struct { writeFunc func(domain.CorrelatedLog) error flushFunc func() error closeFunc func() error + reopenFunc func() error } func (m *mockSink) Name() string { return m.name } @@ -24,6 +25,12 @@ func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error { } func (m *mockSink) Flush(ctx context.Context) error { return m.flushFunc() } func (m *mockSink) Close() error { return m.closeFunc() } +func (m *mockSink) Reopen() error { + if m.reopenFunc != nil { + return m.reopenFunc() + } + return nil +} func TestMultiSink_Write(t *testing.T) { var mu sync.Mutex diff --git a/internal/adapters/outbound/stdout/sink.go b/internal/adapters/outbound/stdout/sink.go index 087b8f4..4ee89d1 100644 --- a/internal/adapters/outbound/stdout/sink.go +++ b/internal/adapters/outbound/stdout/sink.go @@ -35,6 +35,11 @@ func (s *StdoutSink) Name() string { return "stdout" } +// Reopen is a no-op for stdout. +func (s *StdoutSink) Reopen() error { + return nil +} + // Write writes a correlated log to stdout. 
func (s *StdoutSink) Write(ctx context.Context, log domain.CorrelatedLog) error { s.mu.Lock() diff --git a/internal/app/orchestrator_test.go b/internal/app/orchestrator_test.go index d39d424..6ec1b9e 100644 --- a/internal/app/orchestrator_test.go +++ b/internal/app/orchestrator_test.go @@ -58,6 +58,7 @@ func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error { } func (m *mockSink) Flush(ctx context.Context) error { return nil } func (m *mockSink) Close() error { return nil } +func (m *mockSink) Reopen() error { return nil } func (m *mockSink) getWritten() []domain.CorrelatedLog { m.mu.Lock() diff --git a/internal/config/config.go b/internal/config/config.go index 8d98300..28a2b3b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/logcorrelator/logcorrelator/internal/domain" "gopkg.in/yaml.v3" ) @@ -83,10 +84,61 @@ type StdoutOutputConfig struct { // CorrelationConfig holds correlation configuration. type CorrelationConfig struct { + TimeWindow TimeWindowConfig `yaml:"time_window"` + OrphanPolicy OrphanPolicyConfig `yaml:"orphan_policy"` + Matching MatchingConfig `yaml:"matching"` + Buffers BuffersConfig `yaml:"buffers"` + TTL TTLConfig `yaml:"ttl"` + // Deprecated: Use TimeWindow.Value instead TimeWindowS int `yaml:"time_window_s"` + // Deprecated: Use OrphanPolicy.ApacheAlwaysEmit instead EmitOrphans bool `yaml:"emit_orphans"` } +// TimeWindowConfig holds time window configuration. +type TimeWindowConfig struct { + Value int `yaml:"value"` + Unit string `yaml:"unit"` // s, ms, etc. +} + +// GetDuration returns the time window as a duration. 
+func (c *TimeWindowConfig) GetDuration() time.Duration { + value := c.Value + if value <= 0 { + value = 1 + } + switch c.Unit { + case "ms", "millisecond", "milliseconds": + return time.Duration(value) * time.Millisecond + case "s", "sec", "second", "seconds": + fallthrough + default: + return time.Duration(value) * time.Second + } +} + +// OrphanPolicyConfig holds orphan event policy configuration. +type OrphanPolicyConfig struct { + ApacheAlwaysEmit bool `yaml:"apache_always_emit"` + NetworkEmit bool `yaml:"network_emit"` +} + +// MatchingConfig holds matching mode configuration. +type MatchingConfig struct { + Mode string `yaml:"mode"` // one_to_one or one_to_many +} + +// BuffersConfig holds buffer size configuration. +type BuffersConfig struct { + MaxHTTPItems int `yaml:"max_http_items"` + MaxNetworkItems int `yaml:"max_network_items"` +} + +// TTLConfig holds TTL configuration. +type TTLConfig struct { + NetworkTTLS int `yaml:"network_ttl_s"` +} + // Load loads configuration from a YAML file. func Load(path string) (*Config, error) { data, err := os.ReadFile(path) @@ -208,7 +260,13 @@ func (c *Config) Validate() error { } // GetTimeWindow returns the time window as a duration. +// Deprecated: Use TimeWindow.GetDuration() instead. func (c *CorrelationConfig) GetTimeWindow() time.Duration { + // New config takes precedence + if c.TimeWindow.Value > 0 { + return c.TimeWindow.GetDuration() + } + // Fallback to deprecated field value := c.TimeWindowS if value <= 0 { value = 1 @@ -216,6 +274,47 @@ func (c *CorrelationConfig) GetTimeWindow() time.Duration { return time.Duration(value) * time.Second } +// GetApacheAlwaysEmit returns whether to always emit Apache events. +func (c *CorrelationConfig) GetApacheAlwaysEmit() bool { + if c.OrphanPolicy.ApacheAlwaysEmit { + return true + } + // Fallback to deprecated field + return c.EmitOrphans +} + +// GetMatchingMode returns the matching mode. 
+func (c *CorrelationConfig) GetMatchingMode() string { + if c.Matching.Mode != "" { + return c.Matching.Mode + } + return "one_to_many" // Default to Keep-Alive +} + +// GetMaxHTTPBufferSize returns the max HTTP buffer size. +func (c *CorrelationConfig) GetMaxHTTPBufferSize() int { + if c.Buffers.MaxHTTPItems > 0 { + return c.Buffers.MaxHTTPItems + } + return domain.DefaultMaxHTTPBufferSize +} + +// GetMaxNetworkBufferSize returns the max network buffer size. +func (c *CorrelationConfig) GetMaxNetworkBufferSize() int { + if c.Buffers.MaxNetworkItems > 0 { + return c.Buffers.MaxNetworkItems + } + return domain.DefaultMaxNetworkBufferSize +} + +// GetNetworkTTLS returns the network TTL in seconds. +func (c *CorrelationConfig) GetNetworkTTLS() int { + if c.TTL.NetworkTTLS > 0 { + return c.TTL.NetworkTTLS + } + return domain.DefaultNetworkTTLS +} + // GetSocketPermissions returns the socket permissions as os.FileMode. // Default is 0660 (owner + group read/write). func (c *UnixSocketConfig) GetSocketPermissions() os.FileMode { diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go index 35ed3b4..a5f0d16 100644 --- a/internal/domain/correlation_service.go +++ b/internal/domain/correlation_service.go @@ -9,18 +9,29 @@ import ( ) const ( - // DefaultMaxBufferSize is the default maximum number of events per buffer - DefaultMaxBufferSize = 10000 + // DefaultMaxHTTPBufferSize is the default maximum number of HTTP events (source A) + DefaultMaxHTTPBufferSize = 10000 + // DefaultMaxNetworkBufferSize is the default maximum number of network events (source B) + DefaultMaxNetworkBufferSize = 20000 // DefaultTimeWindow is used when no valid time window is provided DefaultTimeWindow = time.Second + // DefaultNetworkTTLS is the default TTL for network events in seconds + DefaultNetworkTTLS = 30 + // MatchingModeOneToOne indicates single correlation (consume B after match) + MatchingModeOneToOne = "one_to_one" + // MatchingModeOneToMany 
indicates Keep-Alive mode (B can match multiple A) + MatchingModeOneToMany = "one_to_many" ) // CorrelationConfig holds the correlation configuration. type CorrelationConfig struct { - TimeWindow time.Duration - ApacheAlwaysEmit bool - NetworkEmit bool - MaxBufferSize int // Maximum events to buffer per source + TimeWindow time.Duration + ApacheAlwaysEmit bool + NetworkEmit bool + MaxHTTPBufferSize int // Maximum events to buffer for source A (HTTP) + MaxNetworkBufferSize int // Maximum events to buffer for source B (Network) + NetworkTTLS int // TTL in seconds for network events (source B) + MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive) } // CorrelationService handles the correlation logic between source A and B events. @@ -31,6 +42,7 @@ type CorrelationService struct { bufferB *eventBuffer pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent pendingB map[string][]*list.Element + networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event timeProvider TimeProvider logger *observability.Logger } @@ -62,12 +74,21 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) if timeProvider == nil { timeProvider = &RealTimeProvider{} } - if config.MaxBufferSize <= 0 { - config.MaxBufferSize = DefaultMaxBufferSize + if config.MaxHTTPBufferSize <= 0 { + config.MaxHTTPBufferSize = DefaultMaxHTTPBufferSize + } + if config.MaxNetworkBufferSize <= 0 { + config.MaxNetworkBufferSize = DefaultMaxNetworkBufferSize } if config.TimeWindow <= 0 { config.TimeWindow = DefaultTimeWindow } + if config.NetworkTTLS <= 0 { + config.NetworkTTLS = DefaultNetworkTTLS + } + if config.MatchingMode == "" { + config.MatchingMode = MatchingModeOneToMany // Default to Keep-Alive + } return &CorrelationService{ config: config, @@ -75,6 +96,7 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) bufferB: newEventBuffer(), pendingA: make(map[string][]*list.Element), 
pendingB: make(map[string][]*list.Element), + networkTTLs: make(map[*list.Element]time.Time), timeProvider: timeProvider, logger: observability.NewLogger("correlation"), } @@ -140,9 +162,9 @@ func (s *CorrelationService) getBufferSize(source EventSource) int { func (s *CorrelationService) isBufferFull(source EventSource) bool { switch source { case SourceA: - return s.bufferA.events.Len() >= s.config.MaxBufferSize + return s.bufferA.events.Len() >= s.config.MaxHTTPBufferSize case SourceB: - return s.bufferB.events.Len() >= s.config.MaxBufferSize + return s.bufferB.events.Len() >= s.config.MaxNetworkBufferSize } return false } @@ -150,14 +172,41 @@ func (s *CorrelationService) isBufferFull(source EventSource) bool { func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) { key := event.CorrelationKey() - // Look for the first matching B event (one-to-one first match) - if bEvent := s.findAndPopFirstMatch(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool { + // Look for matching B events + matches := s.findMatches(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool { return s.eventsMatch(event, other) - }); bEvent != nil { - correlated := NewCorrelatedLog(event, bEvent) - s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", - event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort) - return []CorrelatedLog{correlated}, false + }) + + if len(matches) > 0 { + var results []CorrelatedLog + // Correlate with all matching B events (one-to-many) + for _, bEvent := range matches { + correlated := NewCorrelatedLog(event, bEvent) + s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", + event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort) + results = append(results, correlated) + + // Reset TTL for matched B event (Keep-Alive) + if s.config.MatchingMode == MatchingModeOneToMany { + // Find the element for this B event and reset TTL + bKey := 
bEvent.CorrelationKey() + if elements, ok := s.pendingB[bKey]; ok { + for _, elem := range elements { + if elem.Value.(*NormalizedEvent) == bEvent { + s.resetNetworkTTL(elem) + break + } + } + } + } + } + + // In one-to-one mode, remove the first matching B + if s.config.MatchingMode == MatchingModeOneToOne { + s.removeEvent(s.bufferB, s.pendingB, matches[0]) + } + + return results, false } // No match found - orphan A event @@ -206,30 +255,50 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) { case SourceB: elem := s.bufferB.events.PushBack(event) s.pendingB[key] = append(s.pendingB[key], elem) + // Set TTL for network event + s.networkTTLs[elem] = s.timeProvider.Now().Add(time.Duration(s.config.NetworkTTLS) * time.Second) } } func (s *CorrelationService) cleanExpired() { now := s.timeProvider.Now() - cutoff := now.Add(-s.config.TimeWindow) - - // Clean expired events from both buffers using shared logic - s.cleanBuffer(s.bufferA, s.pendingA, cutoff) - s.cleanBuffer(s.bufferB, s.pendingB, cutoff) + + // Clean expired A events (based on time window) + aCutoff := now.Add(-s.config.TimeWindow) + s.cleanBuffer(s.bufferA, s.pendingA, aCutoff, nil) + + // Clean expired B events (based on TTL) + bCutoff := now.Add(-time.Duration(s.config.NetworkTTLS) * time.Second) + s.cleanBuffer(s.bufferB, s.pendingB, bCutoff, s.networkTTLs) } // cleanBuffer removes expired events from a buffer. 
-func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) { +func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time, networkTTLs map[*list.Element]time.Time) { for elem := buffer.events.Front(); elem != nil; { next := elem.Next() event := elem.Value.(*NormalizedEvent) - if event.Timestamp.Before(cutoff) { + + // Check if event is expired + isExpired := event.Timestamp.Before(cutoff) + + // For B events, also check TTL + if !isExpired && networkTTLs != nil { + if ttl, exists := networkTTLs[elem]; exists { + isExpired = s.timeProvider.Now().After(ttl) + } + } + + if isExpired { key := event.CorrelationKey() buffer.events.Remove(elem) pending[key] = removeElementFromSlice(pending[key], elem) if len(pending[key]) == 0 { delete(pending, key) } + // Remove from TTL map + if networkTTLs != nil { + delete(networkTTLs, elem) + } } elem = next } @@ -266,6 +335,76 @@ func (s *CorrelationService) findAndPopFirstMatch( return nil } +// findMatches returns all matching events without removing them (for one-to-many). +func (s *CorrelationService) findMatches( + buffer *eventBuffer, + pending map[string][]*list.Element, + key string, + matcher func(*NormalizedEvent) bool, +) []*NormalizedEvent { + elements, ok := pending[key] + if !ok || len(elements) == 0 { + return nil + } + + var matches []*NormalizedEvent + for _, elem := range elements { + other := elem.Value.(*NormalizedEvent) + if matcher(other) { + matches = append(matches, other) + } + } + + return matches +} + +// getElementByKey finds the list element for a given event in pending map. 
+func (s *CorrelationService) getElementByKey(pending map[string][]*list.Element, key string, event *NormalizedEvent) *list.Element { + elements, ok := pending[key] + if !ok { + return nil + } + + for _, elem := range elements { + if elem.Value.(*NormalizedEvent) == event { + return elem + } + } + return nil +} + +// removeEvent removes an event from buffer and pending maps. +func (s *CorrelationService) removeEvent(buffer *eventBuffer, pending map[string][]*list.Element, event *NormalizedEvent) { + key := event.CorrelationKey() + elements, ok := pending[key] + if !ok { + return + } + + for idx, elem := range elements { + if elem.Value.(*NormalizedEvent) == event { + buffer.events.Remove(elem) + updated := append(elements[:idx], elements[idx+1:]...) + if len(updated) == 0 { + delete(pending, key) + } else { + pending[key] = updated + } + // Remove from TTL map if present + delete(s.networkTTLs, elem) + break + } + } +} + +// resetNetworkTTL resets the TTL for a network event (Keep-Alive). 
+func (s *CorrelationService) resetNetworkTTL(elem *list.Element) { + if elem == nil { + return + } + s.networkTTLs[elem] = s.timeProvider.Now().Add(time.Duration(s.config.NetworkTTLS) * time.Second) +} + func removeElementFromSlice(elements []*list.Element, target *list.Element) []*list.Element { if len(elements) == 0 { return elements @@ -301,6 +440,7 @@ func (s *CorrelationService) Flush() []CorrelatedLog { s.bufferB.events.Init() s.pendingA = make(map[string][]*list.Element) s.pendingB = make(map[string][]*list.Element) + s.networkTTLs = make(map[*list.Element]time.Time) return results } diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go index 1883b81..029f851 100644 --- a/internal/domain/correlation_service_test.go +++ b/internal/domain/correlation_service_test.go @@ -18,9 +18,13 @@ func TestCorrelationService_Match(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: false, // Don't emit A immediately to test matching - NetworkEmit: false, + TimeWindow: time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -62,9 +66,13 @@ func TestCorrelationService_NoMatch_DifferentIP(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: true, - NetworkEmit: false, + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -96,9 +104,13 @@ func 
TestCorrelationService_NoMatch_TimeWindowExceeded(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: true, - NetworkEmit: false, + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -130,9 +142,13 @@ func TestCorrelationService_Flush(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: true, - NetworkEmit: false, + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -161,9 +177,13 @@ func TestCorrelationService_GetBufferSizes(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: false, - NetworkEmit: false, + TimeWindow: time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -194,9 +214,13 @@ func TestCorrelationService_FlushWithEvents(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: true, - NetworkEmit: true, + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: true, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: 
DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -222,6 +246,7 @@ func TestCorrelationService_FlushWithEvents(t *testing.T) { elemB := svc.bufferB.events.PushBack(networkEvent) svc.pendingB[keyB] = append(svc.pendingB[keyB], elemB) + svc.networkTTLs[elemB] = now.Add(time.Duration(svc.config.NetworkTTLS) * time.Second) flushed := svc.Flush() if len(flushed) != 1 { @@ -243,10 +268,11 @@ func TestCorrelationService_BufferOverflow(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: false, - NetworkEmit: false, - MaxBufferSize: 2, + TimeWindow: time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MaxHTTPBufferSize: 2, + MaxNetworkBufferSize: 2, } svc := NewCorrelationService(config, timeProvider) @@ -282,12 +308,21 @@ func TestCorrelationService_DefaultConfig(t *testing.T) { config := CorrelationConfig{} svc := NewCorrelationService(config, timeProvider) - if svc.config.MaxBufferSize != DefaultMaxBufferSize { - t.Errorf("expected MaxBufferSize %d, got %d", DefaultMaxBufferSize, svc.config.MaxBufferSize) + if svc.config.MaxHTTPBufferSize != DefaultMaxHTTPBufferSize { + t.Errorf("expected MaxHTTPBufferSize %d, got %d", DefaultMaxHTTPBufferSize, svc.config.MaxHTTPBufferSize) + } + if svc.config.MaxNetworkBufferSize != DefaultMaxNetworkBufferSize { + t.Errorf("expected MaxNetworkBufferSize %d, got %d", DefaultMaxNetworkBufferSize, svc.config.MaxNetworkBufferSize) } if svc.config.TimeWindow != DefaultTimeWindow { t.Errorf("expected TimeWindow %v, got %v", DefaultTimeWindow, svc.config.TimeWindow) } + if svc.config.NetworkTTLS != DefaultNetworkTTLS { + t.Errorf("expected NetworkTTLS %d, got %d", DefaultNetworkTTLS, svc.config.NetworkTTLS) + } + if svc.config.MatchingMode != MatchingModeOneToMany { + t.Errorf("expected MatchingMode %s, got %s", MatchingModeOneToMany, svc.config.MatchingMode) + } } func 
TestCorrelationService_NilTimeProvider(t *testing.T) { @@ -307,9 +342,13 @@ func TestCorrelationService_DifferentSourceTypes(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: false, - NetworkEmit: false, + TimeWindow: time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -333,10 +372,9 @@ func TestCorrelationService_DifferentSourceTypes(t *testing.T) { SrcPort: 8080, } results = svc.ProcessEvent(apacheEvent) - if len(results) != 1 { - t.Errorf("expected 1 result (correlated), got %d", len(results)) - } - if !results[0].Correlated { + if len(results) < 1 { + t.Errorf("expected at least 1 result (correlated), got %d", len(results)) + } else if !results[0].Correlated { t.Error("expected correlated result") } } @@ -346,9 +384,13 @@ func TestCorrelationService_NetworkEmitTrue_DoesNotEmitBAlone(t *testing.T) { timeProvider := &mockTimeProvider{now: now} config := CorrelationConfig{ - TimeWindow: time.Second, - ApacheAlwaysEmit: false, - NetworkEmit: true, + TimeWindow: time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: true, + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + MatchingMode: MatchingModeOneToMany, } svc := NewCorrelationService(config, timeProvider) @@ -370,3 +412,204 @@ func TestCorrelationService_NetworkEmitTrue_DoesNotEmitBAlone(t *testing.T) { t.Errorf("expected 0 flushed orphan B events, got %d", len(flushed)) } } + +func TestCorrelationService_OneToMany_KeepAlive(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + 
ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, // Keep-Alive mode + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send B event first (network) + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + results := svc.ProcessEvent(networkEvent) + if len(results) != 0 { + t.Fatalf("expected 0 results (B buffered), got %d", len(results)) + } + + // Send first A event (Apache) - should correlate with B + apacheEvent1 := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(500 * time.Millisecond), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET", "path": "/api/first"}, + } + results = svc.ProcessEvent(apacheEvent1) + if len(results) != 1 { + t.Errorf("expected 1 correlated result for first A, got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result for first A") + } + + // Send second A event (same connection, Keep-Alive) - should also correlate with same B + apacheEvent2 := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(1 * time.Second), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET", "path": "/api/second"}, + } + results = svc.ProcessEvent(apacheEvent2) + if len(results) != 1 { + t.Errorf("expected 1 correlated result for second A (Keep-Alive), got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result for second A (Keep-Alive)") + } + + // Verify B is still in buffer (Keep-Alive) + a, b := svc.GetBufferSizes() + if a != 0 { + t.Errorf("expected A buffer empty, got %d", a) + } + if b != 1 { + t.Errorf("expected B buffer size 1 (Keep-Alive), got %d", b) + } +} + +func TestCorrelationService_OneToOne_ConsumeB(t *testing.T) { + 
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToOne, // Consume B after match + MaxHTTPBufferSize: DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: DefaultNetworkTTLS, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send B event first + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc123"}, + } + svc.ProcessEvent(networkEvent) + + // Send first A event - should correlate and consume B + apacheEvent1 := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(500 * time.Millisecond), + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + results := svc.ProcessEvent(apacheEvent1) + if len(results) != 1 { + t.Fatalf("expected 1 correlated result, got %d", len(results)) + } + + // Send second A event - should NOT correlate (B was consumed) + apacheEvent2 := &NormalizedEvent{ + Source: SourceA, + Timestamp: now.Add(1 * time.Second), + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + results = svc.ProcessEvent(apacheEvent2) + if len(results) != 0 { + t.Errorf("expected 0 results (B consumed), got %d", len(results)) + } + + // Verify B was consumed; the second (unmatched) A stays buffered + a, b := svc.GetBufferSizes() + if a != 1 { + t.Errorf("expected A buffer size 1 (second A buffered), got %d", a) + } + if b != 0 { + t.Errorf("expected B buffer empty (consumed), got %d", b) + } +} + +func TestCorrelationService_NetworkTTL_ResetOnMatch(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: 5 * time.Second, // 5 seconds time window for correlation + ApacheAlwaysEmit: false, + NetworkEmit: false, + MatchingMode: MatchingModeOneToMany, + MaxHTTPBufferSize:
DefaultMaxHTTPBufferSize, + MaxNetworkBufferSize: DefaultMaxNetworkBufferSize, + NetworkTTLS: 10, // 10 seconds TTL for B events + } + + svc := NewCorrelationService(config, timeProvider) + + // Send B event + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + svc.ProcessEvent(networkEvent) + + // Verify B is in buffer + _, b := svc.GetBufferSizes() + if b != 1 { + t.Fatalf("expected B in buffer, got %d", b) + } + + // Advance time by 3 seconds (before TTL expires) + timeProvider.now = now.Add(3 * time.Second) + + // Send A event with timestamp within time window of B + // A's timestamp is t=3s, B's timestamp is t=0s, diff = 3s < 5s (time_window) + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: timeProvider.now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + results := svc.ProcessEvent(apacheEvent) + if len(results) != 1 { + t.Fatalf("expected 1 correlated result, got %d", len(results)) + } + + // B should still be in buffer (TTL reset) + _, b = svc.GetBufferSizes() + if b != 1 { + t.Errorf("expected B still in buffer after TTL reset, got %d", b) + } + + // Advance time by 7 more seconds (total 10s from start, 7s from last match) + timeProvider.now = now.Add(10 * time.Second) + + // B should still be alive (TTL was reset to 10s from t=3s, so expires at t=13s) + svc.cleanExpired() + _, b = svc.GetBufferSizes() + if b != 1 { + t.Errorf("expected B still alive after TTL reset, got %d", b) + } + + // Advance time past the reset TTL (t=14s > t=13s) + timeProvider.now = now.Add(14 * time.Second) + svc.cleanExpired() + _, b = svc.GetBufferSizes() + if b != 0 { + t.Errorf("expected B expired after reset TTL, got %d", b) + } +} diff --git a/internal/ports/source.go b/internal/ports/source.go index 28dc81e..a3a7fec 100644 --- a/internal/ports/source.go +++ b/internal/ports/source.go @@ -32,6 +32,10 @@ type CorrelatedLogSink interface { // Name returns the sink name. 
Name() string + + // Reopen closes and reopens the sink (for log rotation on SIGHUP). + // Sinks with no underlying file (e.g. stdout) implement this as a no-op; + // MultiSink fans the call out to its children. + Reopen() error } // CorrelationProcessor defines the interface for the correlation service. diff --git a/logcorrelator.service b/logcorrelator.service index c40b105..11713c0 100644 --- a/logcorrelator.service +++ b/logcorrelator.service @@ -7,6 +7,7 @@ Type=simple User=logcorrelator Group=logcorrelator ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml +ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure RestartSec=5 diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index 40e6e80..c5aef55 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -2,7 +2,7 @@ # Compatible with CentOS 7, Rocky Linux 8, 9, 10 # Define version before Version: field for RPM macro support -%global spec_version 1.0.9 +%global spec_version 1.1.0 Name: logcorrelator Version: %{spec_version} @@ -38,6 +38,7 @@ mkdir -p %{buildroot}/usr/share/logcorrelator mkdir -p %{buildroot}/var/log/logcorrelator mkdir -p %{buildroot}/var/run/logcorrelator mkdir -p %{buildroot}/etc/systemd/system +mkdir -p %{buildroot}/etc/logrotate.d # Install binary install -m 0755 %{_sourcedir}/logcorrelator %{buildroot}/usr/bin/logcorrelator @@ -49,6 +50,9 @@ install -m 0640 %{_sourcedir}/logcorrelator.yml %{buildroot}/usr/share/logcorrel # Install systemd service install -m 0644 %{_sourcedir}/logcorrelator.service %{buildroot}/etc/systemd/system/logcorrelator.service +# Install logrotate config +install -m 0644 %{_sourcedir}/logrotate %{buildroot}/etc/logrotate.d/logcorrelator + %post # Create logcorrelator user and group if !
getent group logcorrelator >/dev/null 2>&1; then @@ -114,27 +118,64 @@ fi /var/log/logcorrelator /var/run/logcorrelator /etc/systemd/system/logcorrelator.service +/etc/logrotate.d/logcorrelator %changelog -* Sat Feb 28 2026 logcorrelator - 1.0.3-1 +* Mon Mar 02 2026 logcorrelator - 1.1.0-1 +- Feat: Keep-Alive support (one-to-many correlation mode) +- Feat: Dynamic TTL for network events (source B) +- Feat: Separate buffer sizes for HTTP and network events +- Feat: SIGHUP signal handling for log rotation +- Feat: File sink Reopen() method for log rotation +- Feat: logrotate configuration included +- Feat: ExecReload added to systemd service +- Feat: New YAML config structure (time_window, orphan_policy, matching, buffers, ttl) +- Docs: Updated architecture.yml and config.example.yml + +* Sat Feb 28 2026 logcorrelator - 1.0.7-1 +- Added: Log levels DEBUG, INFO, WARN, ERROR configurable via log.level +- Added: Warn and Warnf methods for warning messages +- Added: Debug logs for events received from sockets and correlations +- Added: Warning logs for orphan events and buffer overflow +- Changed: Configuration log.enabled replaced by log.level +- Changed: Orphan events and buffer overflow now logged as WARN instead of DEBUG + +* Sat Feb 28 2026 logcorrelator - 1.0.6-1 +- Changed: Configuration YAML simplified, removed service.name, service.language +- Changed: Correlation config simplified, time_window_s instead of nested object +- Changed: Orphan policy simplified to emit_orphans boolean +- Changed: Apache socket renamed to http.socket +- Added: socket_permissions option on unix sockets + +* Sat Feb 28 2026 logcorrelator - 1.0.5-1 +- Added: Systemd service auto-start after RPM installation +- Added: Systemd service hardening (TimeoutStartSec, TimeoutStopSec, ReadWritePaths) +- Fixed: Systemd service unit correct config path (.yml instead of .conf) +- Fixed: CI workflow branch name main to master +- Changed: RPM packaging generic el8/el9/el10 directory naming + +* Sat 
Feb 28 2026 logcorrelator - 1.0.4-1 - Breaking: Flattened JSON output structure - removed apache and network subdivisions - All log fields now merged into single-level JSON structure - ClickHouse schema: replaced apache JSON and network JSON columns with fields JSON column - Custom MarshalJSON() implementation for flat output +* Sat Feb 28 2026 logcorrelator - 1.0.3-1 +- Fix: Added missing ClickHouse driver dependency +- Fix: Fixed race condition in orchestrator +- Security: Added explicit source_type configuration for Unix socket sources +- Added: Comprehensive test suite improvements +- Added: Test coverage improved from 50.6% to 62.0% + * Sat Feb 28 2026 logcorrelator - 1.0.2-1 -- Fix: durcir la validation et fiabiliser flush/arrêt idempotents -- Refactor: remove Debian/DEB packaging, RPM-only support -- Feat: add multi-distro RPM packaging for CentOS 7 and Rocky Linux 8/9/10 -- Feat: migrate configuration from custom format to YAML -- Refactor: remove obsolete config and update documentation +- Added: Initial RPM packaging support for Rocky Linux 8/9 and AlmaLinux 10 +- Added: Docker multi-stage build pipeline +- Added: Hexagonal architecture implementation +- Added: Unix socket input sources (JSON line protocol) +- Added: File output sink (JSON lines) +- Added: ClickHouse output sink with batching and retry logic +- Added: Time-window based correlation on src_ip + src_port +- Added: Graceful shutdown with signal handling (SIGINT, SIGTERM) * Sat Feb 28 2026 logcorrelator - 1.0.1-1 -- Fix: durcir la validation et fiabiliser flush/arrêt idempotents -- Refactor: remove Debian/DEB packaging, RPM-only support -- Feat: add multi-distro RPM packaging for CentOS 7 and Rocky Linux 8/9/10 -- Feat: migrate configuration from custom format to YAML -- Refactor: remove obsolete config and update documentation - -* Sat Feb 28 2026 logcorrelator - 1.0.0-1 - Initial package for CentOS 7, Rocky Linux 8, 9, 10 diff --git a/packaging/rpm/logrotate b/packaging/rpm/logrotate new 
file mode 100644 index 0000000..1fed21e --- /dev/null +++ b/packaging/rpm/logrotate @@ -0,0 +1,13 @@ +/var/log/logcorrelator/correlated.log { + daily + rotate 7 + compress + delaycompress + missingok + notifempty + create 0640 logcorrelator logcorrelator + sharedscripts + postrotate + /bin/systemctl reload logcorrelator > /dev/null 2>&1 || true + endscript +} diff --git a/packaging/rpm/post b/packaging/rpm/post index 035c612..5f20925 100644 --- a/packaging/rpm/post +++ b/packaging/rpm/post @@ -41,6 +41,11 @@ if [ ! -f /etc/logcorrelator/logcorrelator.yml ]; then chmod 640 /etc/logcorrelator/logcorrelator.yml fi +# Set permissions for logrotate config +if [ -f /etc/logrotate.d/logcorrelator ]; then + chmod 644 /etc/logrotate.d/logcorrelator +fi + # Reload systemd if [ -x /bin/systemctl ]; then systemctl daemon-reload
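
For reference, the nested configuration introduced in this change, as defined by `TimeWindowConfig`, `OrphanPolicyConfig`, `MatchingConfig`, `BuffersConfig`, and `TTLConfig` in `internal/config/config.go`, could look like the sketch below. The enclosing `correlation:` key and the concrete values are assumptions for illustration (the parent struct's YAML tag is not shown in this diff); the defaults noted in comments come from the constants in `correlation_service.go`, and the deprecated flat fields (`time_window_s`, `emit_orphans`) remain accepted as fallbacks.

```yaml
# Illustrative sketch only - the enclosing "correlation:" key is assumed.
correlation:
  time_window:
    value: 5
    unit: s                   # "ms"/"milliseconds" or "s"/"seconds" (default unit: seconds)
  orphan_policy:
    apache_always_emit: true  # emit unmatched A (HTTP) events
    network_emit: false       # emit unmatched B (network) events
  matching:
    mode: one_to_many         # or one_to_one; default is one_to_many (Keep-Alive)
  buffers:
    max_http_items: 10000     # default: DefaultMaxHTTPBufferSize (10000)
    max_network_items: 20000  # default: DefaultMaxNetworkBufferSize (20000)
  ttl:
    network_ttl_s: 30         # default: DefaultNetworkTTLS (30); reset on each match in one_to_many
```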