#!/usr/bin/env bash # ============================================================================= # reload-prod-logs.sh — Exporte les http_logs de prod et les réimporte dans la # DB de dev avec un décalage de date dynamique. # # Usage: # ./scripts/reload-prod-logs.sh # décalage auto (max(time) → now) # ./scripts/reload-prod-logs.sh --shift 3600 # décalage de 3600 secondes # ./scripts/reload-prod-logs.sh --days 7 # n'exporte que les N derniers jours # ./scripts/reload-prod-logs.sh --cron # mode silencieux (pour crontab) # # Variables d'environnement : # PROD_CH_HOST Host ClickHouse de prod (défaut: test-sdv-anubis.sdv.fr) # PROD_CH_PORT Port HTTP de prod (défaut: 8123) # PROD_CH_USER Utilisateur prod (défaut: admin) # PROD_CH_PASSWORD Mot de passe prod (défaut: SuperPassword123!) # PROD_CH_DB Base de données prod (défaut: mabase_prod) # DEV_CONTAINER Nom du conteneur ClickHouse dev (défaut: integration-clickhouse-1) # DEV_DB_LOGS Base de données dev logs (défaut: ja4_logs) # BATCH_SIZE Nombre de lignes par batch (défaut: 200000) # ============================================================================= set -euo pipefail # ── Configuration ──────────────────────────────────────────────────────────── PROD_CH_HOST="${PROD_CH_HOST:-test-sdv-anubis.sdv.fr}" PROD_CH_PORT="${PROD_CH_PORT:-8123}" PROD_CH_USER="${PROD_CH_USER:-admin}" PROD_CH_PASSWORD="${PROD_CH_PASSWORD:-SuperPassword123!}" PROD_CH_DB="${PROD_CH_DB:-mabase_prod}" DEV_CONTAINER="${DEV_CONTAINER:-integration-clickhouse-1}" DEV_DB_LOGS="${DEV_DB_LOGS:-ja4_logs}" BATCH_SIZE="${BATCH_SIZE:-200000}" SHIFT_SECONDS="" DAYS_FILTER="" CRON_MODE=false # ── Parsing des arguments ──────────────────────────────────────────────────── while [[ $# -gt 0 ]]; do case "$1" in --shift) SHIFT_SECONDS="$2"; shift 2 ;; --days) DAYS_FILTER="$2"; shift 2 ;; --cron) CRON_MODE=true; shift ;; -h|--help) head -15 "$0" | grep -E '^#' | sed 's/^# \?//' exit 0 ;; *) echo "Option inconnue : $1"; exit 1 ;; esac done TMPDIR="${TMPDIR:-/tmp}" EXPORT_FILE="${TMPDIR}/prod_http_logs_export.native" LOG_PREFIX="[reload-prod-logs]" log() { if [ "$CRON_MODE" = false ]; then echo "${LOG_PREFIX} $(date '+%H:%M:%S') $*" fi } err() { echo "${LOG_PREFIX} ERREUR: $*" >&2 exit 1 } # ── Fonctions utilitaires ──────────────────────────────────────────────────── prod_query() { # Exécute une requête sur le CH de prod et retourne le résultat curl -sf \ -H "X-ClickHouse-User: ${PROD_CH_USER}" \ -H "X-ClickHouse-Key: ${PROD_CH_PASSWORD}" \ "http://${PROD_CH_HOST}:${PROD_CH_PORT}/" \ --data-binary "$1" } prod_query_to_file() { # Exécute une requête sur le CH de prod et écrit la sortie dans un fichier curl -sf \ -H "X-ClickHouse-User: ${PROD_CH_USER}" \ -H "X-ClickHouse-Key: ${PROD_CH_PASSWORD}" \ "http://${PROD_CH_HOST}:${PROD_CH_PORT}/" \ --data-binary "$1" \ -o "$2" } dev_query() { # Exécute une requête sur le CH de dev docker exec -i "${DEV_CONTAINER}" clickhouse-client --query "$1" } dev_insert_native() { # Insère des données au format Native dans le CH de dev docker exec -i "${DEV_CONTAINER}" clickhouse-client \ --query "INSERT INTO ${DEV_DB_LOGS}._staging_prod_import FORMAT Native" < "$1" } # ── Vérifications ──────────────────────────────────────────────────────────── log "Vérification de la connexion à prod (${PROD_CH_HOST})…" PROD_COUNT=$(prod_query "SELECT count() FROM ${PROD_CH_DB}.http_logs") \ || err "Impossible de contacter le ClickHouse de prod" log " Prod : ${PROD_COUNT} lignes dans http_logs" log "Vérification du conteneur dev (${DEV_CONTAINER})…" DEV_COUNT=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}.http_logs") \ || err "Impossible de contacter le ClickHouse de dev" log " Dev : ${DEV_COUNT} lignes dans http_logs" # ── Calcul du décalage de date ─────────────────────────────────────────────── if [ -z "${SHIFT_SECONDS}" ]; then # Calcul automatique : on décale pour que max(time) de prod = now() PROD_MAX_TS=$(prod_query "SELECT toUnixTimestamp(max(time)) FROM ${PROD_CH_DB}.http_logs") NOW_TS=$(date +%s) SHIFT_SECONDS=$(( NOW_TS - PROD_MAX_TS )) log "Décalage auto : ${SHIFT_SECONDS}s (prod max → now)" fi log "Décalage appliqué : ${SHIFT_SECONDS} secondes ($(( SHIFT_SECONDS / 3600 ))h)" # ── Filtre temporel ────────────────────────────────────────────────────────── WHERE_CLAUSE="" if [ -n "${DAYS_FILTER}" ]; then WHERE_CLAUSE="WHERE time >= now() - toIntervalDay(${DAYS_FILTER})" log "Filtre : ${DAYS_FILTER} derniers jours" fi # ── Création de la table de staging ────────────────────────────────────────── log "Création de la table de staging…" dev_query "DROP TABLE IF EXISTS ${DEV_DB_LOGS}._staging_prod_import" dev_query "CREATE TABLE ${DEV_DB_LOGS}._staging_prod_import AS ${DEV_DB_LOGS}.http_logs ENGINE = MergeTree() ORDER BY (time, src_ip) SETTINGS index_granularity = 8192" # ── Export par batch ───────────────────────────────────────────────────────── # Compter les lignes à exporter EXPORT_COUNT=$(prod_query "SELECT count() FROM ${PROD_CH_DB}.http_logs ${WHERE_CLAUSE}") log "Export de ${EXPORT_COUNT} lignes en batches de ${BATCH_SIZE}…" NUM_BATCHES=$(( (EXPORT_COUNT + BATCH_SIZE - 1) / BATCH_SIZE )) TOTAL_IMPORTED=0 for (( i=0; i/dev/null || stat -f%z "${EXPORT_FILE}" 2>/dev/null || echo 0) if [ "${FILE_SIZE}" -eq 0 ]; then log " Batch vide, arrêt." break fi dev_insert_native "${EXPORT_FILE}" \ || err "Import batch $((i+1)) échoué" BATCH_ROWS=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}._staging_prod_import" ) BATCH_IMPORTED=$(( BATCH_ROWS - TOTAL_IMPORTED )) TOTAL_IMPORTED=${BATCH_ROWS} log " → ${BATCH_IMPORTED} lignes importées (total staging: ${TOTAL_IMPORTED})" rm -f "${EXPORT_FILE}" done log "Staging terminé : ${TOTAL_IMPORTED} lignes." # ── Insertion avec décalage de date ────────────────────────────────────────── log "Nettoyage de la table cible (données de prod précédentes)…" # On identifie les données de prod importées par le décalage — on tronque tout # car les données de dev sont générées par le traffic seeder / intégration dev_query "TRUNCATE TABLE ${DEV_DB_LOGS}.http_logs" log "Insertion avec décalage de ${SHIFT_SECONDS}s dans ${DEV_DB_LOGS}.http_logs…" dev_query " INSERT INTO ${DEV_DB_LOGS}.http_logs SELECT time + toIntervalSecond(${SHIFT_SECONDS}) AS time, toDate(time + toIntervalSecond(${SHIFT_SECONDS})) AS log_date, src_ip, src_port, dst_ip, dst_port, src_asn, src_country_code, src_as_name, src_org, src_domain, method, scheme, host, path, query, http_version, orphan_side, correlated, keepalives, a_timestamp, b_timestamp, conn_id, ip_meta_df, ip_meta_id, ip_meta_total_length, ip_meta_ttl, tcp_meta_options, tcp_meta_window_size, tcp_meta_mss, tcp_meta_window_scale, syn_to_clienthello_ms, tls_version, tls_sni, tls_alpn, ja3, ja3_hash, ja4, client_headers, header_user_agent, header_accept, header_accept_encoding, header_accept_language, header_content_type, header_x_request_id, header_x_trace_id, header_x_forwarded_for, header_sec_ch_ua, header_sec_ch_ua_mobile, header_sec_ch_ua_platform, header_sec_fetch_dest, header_sec_fetch_mode, header_sec_fetch_site, anubis_bot_name, anubis_bot_action, anubis_bot_category FROM ${DEV_DB_LOGS}._staging_prod_import " FINAL_COUNT=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}.http_logs") FINAL_MIN=$(dev_query "SELECT min(time) FROM ${DEV_DB_LOGS}.http_logs") FINAL_MAX=$(dev_query "SELECT max(time) FROM ${DEV_DB_LOGS}.http_logs") log "Nettoyage de la table de staging…" dev_query "DROP TABLE IF EXISTS ${DEV_DB_LOGS}._staging_prod_import" rm -f "${EXPORT_FILE}" # ── Résultat ───────────────────────────────────────────────────────────────── log "════════════════════════════════════════════════════" log " Import terminé : ${FINAL_COUNT} lignes dans ${DEV_DB_LOGS}.http_logs" log " Plage : ${FINAL_MIN} → ${FINAL_MAX}" log " Décalage appliqué : ${SHIFT_SECONDS}s" log "════════════════════════════════════════════════════"