From fc882dd3e72e31ed364e897af315ff9e21c6bb90 Mon Sep 17 00:00:00 2001 From: toto Date: Wed, 8 Apr 2026 11:35:34 +0200 Subject: [PATCH] feat(tests): realistic traffic seeder + IP diversity via mod_remoteip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Option A — X-Forwarded-For + mod_remoteip: - httpd-integration.conf: load mod_remoteip, trust the Docker-internal ranges 172.0.0.0/8, 192.168.0.0/16 and 10.0.0.0/8 (note: 172.0.0.0/8 is wider than RFC 1918's 172.16.0.0/12). mod_reqin_log uses r->useragent_ip which mod_remoteip updates from XFF → each request logged with distinct src_ip - generate_traffic.py: XFF always set (was 30% only); human scenarios use 91.121/78.41/90.x ranges, bot scenarios use 185.220/45.155/193.32; pool of 1168 human IPs and 180 bot IPs; default --requests 500 Option D — Direct ClickHouse seeder (seed_clickhouse.py, stdlib only): - Inserts ~4000 rows into http_logs_raw triggering full MV chain: http_logs_raw → mv_http_logs → http_logs → mv_agg_host_ip_ja4_1h → agg_host_ip_ja4_1h • 720 human sessions: IPs in the OVH range 91.121.0.0/16 (ASN 16276) → dict_asn_reputation maps it to asn_label='human' → satisfies bot_detector human_baseline >= 500 threshold • 150 scanner sessions: datacenter IPs, attack paths (/.env, wp-login, SQLi, path traversal), scanner UAs, minimal TCP fingerprints • 100 known-bot sessions: IPs matching bot_ip.csv entries • 20 brute-force clusters: 20-50 POST /login per IP All TCP/TLS metadata is profile-realistic (window, MSS, TTL, JA4, JA3) CSV stubs (mounted at /var/lib/clickhouse/user_files/): - iplocate-ip-to-asn.csv: 13 CIDR→ASN mappings (OVH/SFR/Orange/Tor/Contabo) - asn_reputation.csv: 13 ASN→label (10 'human', 2 'datacenter', 1 'hosting') - bot_ip.csv: 14 known scanner/Tor IPs (Shodan, Censys, Tor exits) - bot_ja4.csv: 5 bot JA4 fingerprints (curl, python-requests, masscan, zgrab, headless Chrome) run-tests.sh: - Phase 4a: seeder runs before live traffic (ensures bot_detector baseline) - Phase 4b: live traffic gen at 500 requests (up from 200) - Phase 5f: new 
assertions — agg_host_ip_ja4_1h populated, ≥500 human rows in view_ai_features_1h, known-bot labels present - Phase 7: verifies ml_all_scores populated (bot_detector ran a cycle) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../platform/csv-stubs/asn_reputation.csv | 14 + .../integration/platform/csv-stubs/bot_ip.csv | 14 + .../platform/csv-stubs/bot_ja4.csv | 5 + .../platform/csv-stubs/iplocate-ip-to-asn.csv | 13 + .../platform/httpd-integration.conf | 9 + tests/integration/run-tests.sh | 87 ++- .../traffic-gen/generate_traffic.py | 77 ++- .../traffic-gen/seed_clickhouse.py | 514 ++++++++++++++++++ 8 files changed, 688 insertions(+), 45 deletions(-) create mode 100644 tests/integration/traffic-gen/seed_clickhouse.py diff --git a/tests/integration/platform/csv-stubs/asn_reputation.csv b/tests/integration/platform/csv-stubs/asn_reputation.csv index e69de29..fae37b3 100644 --- a/tests/integration/platform/csv-stubs/asn_reputation.csv +++ b/tests/integration/platform/csv-stubs/asn_reputation.csv @@ -0,0 +1,14 @@ +src_asn,label +16276,human +15557,human +3215,human +5432,human +1136,human +2856,human +8913,human +3352,human +15169,human +8075,human +210644,datacenter +209083,datacenter +197695,hosting diff --git a/tests/integration/platform/csv-stubs/bot_ip.csv b/tests/integration/platform/csv-stubs/bot_ip.csv index e69de29..8ef2d5f 100644 --- a/tests/integration/platform/csv-stubs/bot_ip.csv +++ b/tests/integration/platform/csv-stubs/bot_ip.csv @@ -0,0 +1,14 @@ +185.220.101.34/32,Tor_Exit_Node +185.220.101.47/32,Tor_Exit_Node +185.220.101.52/32,Tor_Exit_Node +185.220.101.73/32,Tor_Exit_Node +185.220.101.91/32,Tor_Exit_Node +185.220.100.253/32,Tor_Exit_Node +45.155.205.233/32,Shodan_Scanner +45.155.205.220/32,Shodan_Scanner +45.155.205.205/32,Shodan_Scanner +45.155.205.190/32,Shodan_Scanner +45.155.205.175/32,Shodan_Scanner +193.32.162.10/32,Censys_Scanner +193.32.162.11/32,Censys_Scanner +193.32.162.25/32,Censys_Scanner diff --git 
a/tests/integration/platform/csv-stubs/bot_ja4.csv b/tests/integration/platform/csv-stubs/bot_ja4.csv index e69de29..ce23404 100644 --- a/tests/integration/platform/csv-stubs/bot_ja4.csv +++ b/tests/integration/platform/csv-stubs/bot_ja4.csv @@ -0,0 +1,5 @@ +t13d030500_ffd59bab1b39_6e7f7df63e98,curl_scanner +t13d020300_6b9b1b2c3d4e_ffd59bab1b39,python_requests_scanner +t10d170000_0a1b2c3d4e5f_1b2c3d4e5f60,Masscan +t12d050700_5a6b7c8d9e0f_1a2b3c4d5e6f,zgrab_scanner +t13d010100_aabbccddeeff_0011223344aa,Headless_Chrome_Automation diff --git a/tests/integration/platform/csv-stubs/iplocate-ip-to-asn.csv b/tests/integration/platform/csv-stubs/iplocate-ip-to-asn.csv index 33f7b88..ceef6df 100644 --- a/tests/integration/platform/csv-stubs/iplocate-ip-to-asn.csv +++ b/tests/integration/platform/csv-stubs/iplocate-ip-to-asn.csv @@ -1 +1,14 @@ network,asn,country_code,name,org,domain +91.121.0.0/16,16276,FR,OVH SAS,OVH,ovh.com +78.41.0.0/16,15557,FR,SFR SA,SFR,sfr.com +90.0.0.0/8,3215,FR,Orange SA,Orange,orange.fr +212.0.0.0/8,5432,DE,Deutsche Telekom AG,Telekom,telekom.de +84.116.0.0/16,1136,NL,KPN Internet BV,KPN,kpn.com +77.108.0.0/16,2856,GB,BT Group plc,BT,bt.com +82.45.0.0/16,8913,GB,Virgin Media,Virgin Media,virginmedia.com +62.98.0.0/16,3352,ES,Telefonica Spain,Telefonica,telefonica.es +66.249.64.0/19,15169,US,Google LLC,Google,google.com +157.55.0.0/16,8075,US,Microsoft Corporation,Bing,microsoft.com +185.220.0.0/16,210644,NL,Accelerated-IT Services,Tor Project,tor-project.org +45.155.205.0/24,209083,DE,Contabo GmbH,Contabo,contabo.de +193.32.162.0/24,197695,RU,Reg.ru Hosting,Reg.ru,reg.ru diff --git a/tests/integration/platform/httpd-integration.conf b/tests/integration/platform/httpd-integration.conf index 02b2056..cbaf910 100644 --- a/tests/integration/platform/httpd-integration.conf +++ b/tests/integration/platform/httpd-integration.conf @@ -3,6 +3,15 @@ # Load mod-reqin-log LoadModule reqin_log_module modules/mod_reqin_log.so +# mod_remoteip: trust 
X-Forwarded-For from Docker internal subnets. +# mod_reqin_log reads r->useragent_ip which mod_remoteip updates, +# so the XFF IP appears as src_ip in the correlated logs. +LoadModule remoteip_module modules/mod_remoteip.so +RemoteIPHeader X-Forwarded-For +RemoteIPInternalProxy 172.0.0.0/8 +RemoteIPInternalProxy 192.168.0.0/16 +RemoteIPInternalProxy 10.0.0.0/8 + # Enable mod-reqin-log with correlator socket JsonSockLogEnabled On JsonSockLogSocket "/var/run/logcorrelator/http.socket" diff --git a/tests/integration/run-tests.sh b/tests/integration/run-tests.sh index defa683..3f7bff5 100755 --- a/tests/integration/run-tests.sh +++ b/tests/integration/run-tests.sh @@ -115,10 +115,6 @@ wait_for_service clickhouse 120 wait_for_service platform 120 wait_for_service dashboard 60 -# Give bot-detector time to start (it's expected to fail initially — no data yet) -log "Waiting 10s for bot-detector to initialize..." -sleep 10 - # ============================================================================= # Phase 3: Verify ClickHouse schema # ============================================================================= @@ -157,28 +153,46 @@ for user in data_writer analyst; do done # ============================================================================= -# Phase 4: Generate test traffic +# Phase 4: Seed ClickHouse + Generate test traffic # ============================================================================= log "============================================" -log "Phase 4: Generating test traffic" +log "Phase 4a: Seeding ClickHouse with synthetic data" log "============================================" -# Traffic comes from traffic-gen container (crosses Docker network eth0) -# so sentinel's pcap capture sees TLS ClientHello packets. -# Python generator uses multiple SSL contexts → varied JA4/JA3 fingerprints. -# Both HTTP (port 80) and HTTPS (port 443) requests are sent. -log "Starting Python traffic generator (200 requests, 10 workers)..." 
+# The seeder inserts directly into http_logs_raw, triggering all MVs: +# http_logs_raw → mv_http_logs → http_logs → mv_agg_host_ip_ja4_1h → agg_host_ip_ja4_1h +# This pre-populates: +# - 720 human sessions (IPs in residential ASN ranges → asn_label='human') +# - 150 scanner/anomaly sessions (IPs in datacenter ASN → ML anomaly candidates) +# - 100 known-bot sessions (IPs/JA4 matching bot_ip.csv / bot_ja4.csv) +# - 20 brute-force clusters (many POST /login per IP) +# After seeding, bot_detector has ≥500 human rows → can train and run. +log "Running seed_clickhouse.py..." +if docker compose exec -T traffic-gen python /app/seed_clickhouse.py \ + --host clickhouse --port 8123 --user default --password ""; then + pass "ClickHouse seeded (700+ human + 150 scanner + 100 known-bot rows)" +else + warn "Seeder reported errors (pipeline verification will show impact)" +fi + +log "============================================" +log "Phase 4b: Generating live test traffic via Apache" +log "============================================" + +# Live traffic crosses the Docker network so sentinel can capture TLS handshakes. +# X-Forwarded-For is always set — mod_remoteip updates r->useragent_ip → diverse src_ips. +log "Starting traffic generator (500 requests, 10 workers)..." if docker compose exec -T traffic-gen python /app/generate_traffic.py \ --host platform --http-port 80 --https-port 443 \ - --requests 200 --workers 10; then - pass "Traffic generation complete (200 requests: browsers, bots, GET/POST/HEAD/PUT/DELETE/OPTIONS)" + --requests 500 --workers 10; then + pass "Traffic generation complete (500 requests with diverse XFF IPs: browsers, bots)" else warn "Traffic generator reported some errors (>80% success still passes)" fi # Wait for correlator to flush all batches to ClickHouse -log "Waiting 15s for correlator to flush..." -sleep 15 +log "Waiting 20s for correlator to flush and bot-detector first cycle..." 
+sleep 20 # ============================================================================= # Phase 5: Verify data pipeline @@ -190,7 +204,7 @@ log "============================================" # 5a. Raw logs ingested RAW_COUNT=$(ch_query "SELECT count() FROM ja4_logs.http_logs_raw") if [ "$RAW_COUNT" -gt 0 ] 2>/dev/null; then - pass "Raw logs ingested: $RAW_COUNT rows in http_logs_raw" + pass "Raw logs ingested: $RAW_COUNT rows in http_logs_raw (seeder + live traffic)" else fail "No raw logs in http_logs_raw (correlator → ClickHouse failed)" # Debug @@ -252,6 +266,35 @@ else warn "Correlator file output empty" fi +# 5f. Verify seeder data reached agg table and AI features view +AGG_COUNT=$(ch_query "SELECT count() FROM ja4_processing.agg_host_ip_ja4_1h") +HUMAN_COUNT=$(ch_query "SELECT count() FROM ja4_processing.view_ai_features_1h WHERE asn_label='human'") +BOT_LABEL_COUNT=$(ch_query "SELECT count() FROM ja4_processing.view_ai_features_1h WHERE bot_name != ''") +UNIQ_SRC_IPS=$(ch_query "SELECT count(DISTINCT src_ip) FROM ja4_processing.view_ai_features_1h") +UNIQ_JA4=$(ch_query "SELECT count(DISTINCT ja4) FROM ja4_processing.view_ai_features_1h") + +if [ "$AGG_COUNT" -gt 0 ] 2>/dev/null; then + pass "Aggregation table populated: $AGG_COUNT sessions in agg_host_ip_ja4_1h" +else + fail "agg_host_ip_ja4_1h empty (MV chain broken)" +fi + +if [ "$HUMAN_COUNT" -ge 500 ] 2>/dev/null; then + pass "Bot-detector baseline: $HUMAN_COUNT human sessions (≥500 threshold met)" +elif [ "$HUMAN_COUNT" -gt 0 ] 2>/dev/null; then + warn "Human sessions below threshold: $HUMAN_COUNT < 500 (bot_detector will skip cycle)" +else + fail "No human sessions in view_ai_features_1h (asn_reputation CSV not loaded?)" +fi + +if [ "$BOT_LABEL_COUNT" -gt 0 ] 2>/dev/null; then + pass "Known bots labeled: $BOT_LABEL_COUNT sessions with bot_name (bot_ip/bot_ja4 dicts working)" +else + warn "No known-bot labels in view_ai_features_1h (bot_ip.csv / bot_ja4.csv empty?)" +fi + +log " Unique src_ips: 
$UNIQ_SRC_IPS | Unique JA4: $UNIQ_JA4" + # ============================================================================= # Phase 6: Verify dashboard API # ============================================================================= @@ -305,7 +348,17 @@ for line in sys.stdin: if [ "$BOT_STATUS" = "running" ]; then pass "Bot-detector is running" else - warn "Bot-detector state: $BOT_STATUS (may need more data to start properly)" + warn "Bot-detector state: $BOT_STATUS" +fi + +# Check if bot-detector successfully ran a detection cycle (not just SKIPPED_LOW_DATA) +BD_SCORES=$(ch_query "SELECT count() FROM ja4_processing.ml_all_scores" 2>/dev/null || echo 0) +BD_ANOMALIES=$(ch_query "SELECT count() FROM ja4_processing.ml_detected_anomalies" 2>/dev/null || echo 0) +if [ "$BD_SCORES" -gt 0 ] 2>/dev/null; then + pass "Bot-detector scored traffic: $BD_SCORES rows in ml_all_scores, $BD_ANOMALIES anomalies detected" +else + warn "ml_all_scores is empty — bot-detector may not have completed a cycle yet" + warn " (check: docker compose logs bot-detector | grep -E 'CYCLE|SKIP|train')" fi # ============================================================================= diff --git a/tests/integration/traffic-gen/generate_traffic.py b/tests/integration/traffic-gen/generate_traffic.py index 228d5cd..0ed9498 100644 --- a/tests/integration/traffic-gen/generate_traffic.py +++ b/tests/integration/traffic-gen/generate_traffic.py @@ -9,13 +9,13 @@ Simulates varied web traffic including: - Varied paths, query strings, form data, JSON payloads - Both HTTP (port 80) and HTTPS (port 443) - Different Accept/Language/Encoding headers - - Cookie / Referer / X-Forwarded-For variations - - Burst mode and sequential scenarios + - Cookie / Referer / X-Forwarded-For always set — ensures src_ip diversity + in ClickHouse via mod_remoteip (r->useragent_ip updated from XFF) - Multiple SSL contexts to vary TLS ClientHello parameters Usage: python generate_traffic.py [--host platform] [--http-port 80] 
[--https-port 443] - [--requests 200] [--workers 10] [--scenario all] + [--requests 500] [--workers 10] [--scenario all] """ import argparse @@ -148,14 +148,34 @@ FORM_BODIES = [ "q=test+query&submit=Search", ] -XFF_IPS = [ - "1.2.3.4", - "192.168.1.100", - "10.0.0.1", - "203.0.113.42", - "185.220.101.34", # Known Tor exit - "45.155.205.233", # Scanning IP -] + +# --------------------------------------------------------------------------- +# IP pools for X-Forwarded-For (mod_remoteip uses this as src_ip in logs) +# Ranges must match iplocate-ip-to-asn.csv entries so ASN lookup succeeds. +# +# HUMAN — residential ISP ranges → asn_label='human' → feeds ML baseline +HUMAN_IPS = ( + # OVH FR (ASN 16276) — 91.121.0.0/16 + [f"91.121.{o3}.{o4}" for o3 in range(0, 12) for o4 in range(1, 60)] + # SFR FR (ASN 15557) — 78.41.0.0/16 + + [f"78.41.{o3}.{o4}" for o3 in range(0, 4) for o4 in range(1, 40)] + # Orange FR (ASN 3215) — 90.x.x.x + + [f"90.{o2}.{o3}.{o4}" for o2 in range(10, 14) for o3 in range(0, 4) for o4 in range(1, 20)] +) +random.shuffle(HUMAN_IPS) + +# DATACENTER/BOT — scanner/Tor ranges → asn_label='datacenter' → ML scores these +BOT_IPS = ( + # Tor exits / Accelerated-IT (ASN 210644) — 185.220.101.x + [f"185.220.101.{i}" for i in range(1, 101)] + # Contabo scanner (ASN 209083) — 45.155.205.x + + [f"45.155.205.{i}" for i in range(1, 51)] + # Reg.ru (ASN 197695) — 193.32.162.x + + [f"193.32.162.{i}" for i in range(1, 31)] +) + +# Legacy alias (kept for existing code) +XFF_IPS = HUMAN_IPS[:20] + BOT_IPS[:10] # --------------------------------------------------------------------------- @@ -221,7 +241,7 @@ class RequestScenario: label: str = "" -def _random_headers(ua: str, is_bot: bool = False) -> dict: +def _random_headers(ua: str, is_bot: bool = False, xff_ip: str = None) -> dict: headers = { "User-Agent": ua, "Accept": random.choice([ @@ -238,6 +258,11 @@ def _random_headers(ua: str, is_bot: bool = False) -> dict: ]), "Accept-Language": 
random.choice(ACCEPT_LANGS), "Connection": random.choice(["keep-alive", "close"]), + # X-Forwarded-For: always set so mod_remoteip gives each request a + # distinct src_ip in the ClickHouse pipeline (r->useragent_ip). + "X-Forwarded-For": xff_ip or ( + random.choice(BOT_IPS) if is_bot else random.choice(HUMAN_IPS) + ), } # Sec-Fetch headers (browsers only) @@ -251,10 +276,6 @@ def _random_headers(ua: str, is_bot: bool = False) -> dict: if ref: headers["Referer"] = ref - # X-Forwarded-For sometimes (proxy simulation) - if random.random() < 0.3: - headers["X-Forwarded-For"] = random.choice(XFF_IPS) - # Cache headers if random.random() < 0.4: headers["Cache-Control"] = random.choice(["no-cache", "max-age=0", "no-store"]) @@ -283,7 +304,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="GET", url=f"{base_https}{path}{qs}", - headers=_random_headers(ua), + headers=_random_headers(ua, xff_ip=random.choice(HUMAN_IPS)), ssl_ctx=ssl_ctx, label=f"browser-https-{ssl_name}", )) @@ -296,7 +317,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="GET", url=f"{base_http}{path}{qs}", - headers=_random_headers(ua), + headers=_random_headers(ua, xff_ip=random.choice(HUMAN_IPS)), label="browser-http", )) @@ -308,7 +329,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="GET", url=f"{base_https}{path}", - headers=_random_headers(ua, is_bot=True), + headers=_random_headers(ua, is_bot=True, xff_ip=random.choice(BOT_IPS)), ssl_ctx=ssl_ctx, label=f"bot-https-{ssl_name}", )) @@ -320,7 +341,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="GET", url=f"{base_http}{path}", - headers=_random_headers(ua, is_bot=True), + headers=_random_headers(ua, is_bot=True, xff_ip=random.choice(BOT_IPS)), 
label="bot-http", )) @@ -329,7 +350,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l ua = random.choice(BROWSERS) body_str = random.choice(JSON_BODIES) body = body_str.encode() - hdrs = _random_headers(ua) + hdrs = _random_headers(ua, xff_ip=random.choice(HUMAN_IPS)) hdrs["Content-Type"] = "application/json" hdrs["Content-Length"] = str(len(body)) _, ssl_ctx = random.choice(SSL_CONTEXTS) @@ -347,7 +368,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l ua = random.choice(BROWSERS + BOTS) body_str = random.choice(FORM_BODIES) body = body_str.encode() - hdrs = _random_headers(ua) + hdrs = _random_headers(ua, xff_ip=random.choice(BOT_IPS)) hdrs["Content-Type"] = "application/x-www-form-urlencoded" hdrs["Content-Length"] = str(len(body)) scenarios.append(RequestScenario( @@ -365,7 +386,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="HEAD", url=f"{base_https}{random.choice(PATHS)}", - headers=_random_headers(ua), + headers=_random_headers(ua, xff_ip=random.choice(HUMAN_IPS)), ssl_ctx=ssl_ctx, label="head-https", )) @@ -374,7 +395,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l for _ in range(int(count * 0.05)): ua = random.choice(BROWSERS) body = json.dumps({"id": random.randint(1, 999), "value": "updated"}).encode() - hdrs = _random_headers(ua) + hdrs = _random_headers(ua, xff_ip=random.choice(HUMAN_IPS)) hdrs["Content-Type"] = "application/json" hdrs["Content-Length"] = str(len(body)) _, ssl_ctx = random.choice(SSL_CONTEXTS) @@ -394,7 +415,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="DELETE", url=f"{base_https}/api/v1/users/{random.randint(1,999)}", - headers=_random_headers(ua), + headers=_random_headers(ua, xff_ip=random.choice(HUMAN_IPS)), ssl_ctx=ssl_ctx, label="delete-https", )) @@ -402,7 +423,7 
@@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l # --- OPTIONS (CORS preflight) --- for _ in range(int(count * 0.03)): ua = random.choice(BROWSERS) - hdrs = _random_headers(ua) + hdrs = _random_headers(ua, xff_ip=random.choice(HUMAN_IPS)) hdrs["Origin"] = random.choice(["https://app.example.com", "http://localhost:3000"]) hdrs["Access-Control-Request-Method"] = random.choice(["POST", "PUT", "DELETE"]) _, ssl_ctx = random.choice(SSL_CONTEXTS) @@ -421,7 +442,7 @@ def build_scenarios(host: str, http_port: int, https_port: int, count: int) -> l scenarios.append(RequestScenario( method="GET", url=f"{base_https}/health?filler={random.randint(1,9999)}", - headers=_random_headers(ua), + headers=_random_headers(ua, xff_ip=random.choice(HUMAN_IPS)), ssl_ctx=ssl_ctx, label="filler-https", )) @@ -497,7 +518,7 @@ if __name__ == "__main__": parser.add_argument("--host", default="platform") parser.add_argument("--http-port", type=int, default=80) parser.add_argument("--https-port", type=int, default=443) - parser.add_argument("--requests", type=int, default=200) + parser.add_argument("--requests", type=int, default=500) parser.add_argument("--workers", type=int, default=10) args = parser.parse_args() diff --git a/tests/integration/traffic-gen/seed_clickhouse.py b/tests/integration/traffic-gen/seed_clickhouse.py new file mode 100644 index 0000000..a4d9590 --- /dev/null +++ b/tests/integration/traffic-gen/seed_clickhouse.py @@ -0,0 +1,514 @@ +#!/usr/bin/env python3 +""" +seed_clickhouse.py — Bootstrap ClickHouse with realistic synthetic traffic data. 
+ +Inserts directly into ja4_logs.http_logs_raw (triggers all MVs automatically): + • 700 human sessions — IPs in residential ISP ranges (ASN→'human' via dict) + • 150 datacenter/scanner sessions — anomalous patterns for ML detection + • 100 known-bot sessions — IPs/JA4 in bot_ip.csv / bot_ja4.csv + +This ensures view_ai_features_1h has ≥ 500 human rows for the bot_detector +training threshold (run_semi_supervised_logic requires len(human_baseline) >= 500). + +All timestamps are within the last 30 minutes so the 24h window filter catches them. + +No external dependencies — uses Python stdlib urllib only. + +Usage: + python seed_clickhouse.py + python seed_clickhouse.py --host clickhouse --port 8123 --user default --password "" + python seed_clickhouse.py --dry-run +""" + +import argparse +import hashlib +import json +import random +import time +import urllib.error +import urllib.parse +import urllib.request +from datetime import datetime, timedelta, timezone + +# --------------------------------------------------------------------------- +# JA4 fingerprint profiles (must match bot_ja4.csv for bot detection to work) +# --------------------------------------------------------------------------- + +# Human browser profiles — realistic TLS 1.3 fingerprints +HUMAN_JA4S = [ + "t13d1917h2_b0372614b25a_6a77dcf5a8be", # Chrome 120 Windows TLS1.3 + "t13d1817h2_b0372614b25a_0a3e5785d15f", # Firefox 121 TLS1.3 + "t13d1617h2_fc82e8b7e1c0_9dc949149365", # Safari 17 macOS TLS1.3 + "t13d1917h2_fc82e8b7e1c0_6b9b1b2c3d4e", # Edge 120 TLS1.3 + "t13d1817h2_9dc949149365_8c4a9a4b0d01", # Chrome Mobile TLS1.3 + "t12d1706h2_9dc949149365_fc82e8b7e1c0", # Chrome 120 TLS1.2 (older server) + "t12d1606h2_8c4a9a4b0d01_9dc949149365", # Firefox TLS1.2 +] + +# Bot/scanner profiles — intentionally minimal cipher suites, match bot_ja4.csv +BOT_JA4S = [ + "t13d030500_ffd59bab1b39_6e7f7df63e98", # curl scanner (in bot_ja4.csv) + "t13d020300_6b9b1b2c3d4e_ffd59bab1b39", # python-requests scanner (in 
bot_ja4.csv) + "t10d170000_0a1b2c3d4e5f_1b2c3d4e5f60", # Masscan (in bot_ja4.csv) + "t12d050700_5a6b7c8d9e0f_1a2b3c4d5e6f", # zgrab (in bot_ja4.csv) + "t13d010100_aabbccddeeff_0011223344aa", # Headless Chrome automation (in bot_ja4.csv) +] + +# --------------------------------------------------------------------------- +# IP pools — must match ranges in iplocate-ip-to-asn.csv +# --------------------------------------------------------------------------- + +# Human residential IPs — OVH FR (ASN 16276) → asn_label='human' +def _human_ips(n: int) -> list: + ips = [f"91.121.{o3}.{o4}" for o3 in range(0, 20) for o4 in range(1, 60)] + random.shuffle(ips) + return ips[:n] + +# Datacenter / scanner IPs — Tor/Contabo/Reg.ru → asn_label='datacenter'/'hosting' +def _scanner_ips(n: int) -> list: + ips = ( + [f"185.220.101.{i}" for i in range(1, 101)] # ASN 210644 datacenter + + [f"45.155.205.{i}" for i in range(1, 51)] # ASN 209083 datacenter + + [f"193.32.162.{i}" for i in range(1, 31)] # ASN 197695 hosting + ) + random.shuffle(ips) + return ips[:n] + +# Known bot IPs (subset also in bot_ip.csv → directly labeled) +BOT_IP_KNOWN = [ + "185.220.101.34", "185.220.101.47", "185.220.101.52", + "185.220.101.73", "185.220.101.91", + "45.155.205.233", "45.155.205.220", "45.155.205.205", + "193.32.162.10", "193.32.162.11", +] + +# --------------------------------------------------------------------------- +# User-Agent pools per profile +# --------------------------------------------------------------------------- +HUMAN_UA = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0", + "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2_1) AppleWebKit/605.1.15 (KHTML, 
like Gecko) Version/17.2 Safari/605.1.15", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", + "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.115 Mobile Safari/537.36", + "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1", +] + +SCANNER_UA = [ + "curl/7.88.1", + "python-requests/2.31.0", + "Masscan/1.3", + "zgrab/0.x", + "Go-http-client/1.1", + "libwww-perl/6.72", + "Java/11.0.18", + "Wget/1.21.3", + "masscan/1.3 (https://github.com/robertdavidgraham/masscan)", + "-", # No User-Agent (raw scanner) +] + +BOT_CRAWLER_UA = [ + "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)", + "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)", + "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)", + "Twitterbot/1.0", + "facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)", + "Googlebot/2.1 (+http://www.google.com/bot.html)", +] + +# --------------------------------------------------------------------------- +# Path pools per profile +# --------------------------------------------------------------------------- +HUMAN_PATHS = [ + "/", "/index.html", "/about", "/contact", "/products", "/services", + "/blog", "/blog/post-1", "/blog/post-2", "/faq", "/pricing", + "/login", "/register", "/profile", "/dashboard", + "/api/v1/users", "/api/v1/status", "/api/v2/metrics", + "/static/js/app.js", "/static/css/main.css", "/images/logo.png", + "/favicon.ico", "/robots.txt", "/sitemap.xml", + "/health", "/search?q=test", "/search?q=product+review", +] + +ATTACK_PATHS = [ + "/.env", "/.git/HEAD", "/.git/config", + "/wp-login.php", "/wp-admin/", "/xmlrpc.php", "/wp-config.php", + "/phpmyadmin/", "/phpMyAdmin/", "/pma/", + "/admin", "/admin/login", "/administrator/", + "/cgi-bin/test.cgi", 
"/cgi-bin/../etc/passwd", + "/download?file=../../../etc/passwd", "/download?file=../../../../etc/shadow", + "/api/search?q=", + "/api/users?id=1+OR+1%3D1", + "/shell.php", "/cmd.php", "/eval.php", + "/.aws/credentials", "/.ssh/id_rsa", + "/etc/passwd", "/proc/self/environ", +] + +BOT_PATHS = [ + "/robots.txt", "/sitemap.xml", "/", "/index.html", + "/sitemap_index.xml", "/news-sitemap.xml", + "/feed", "/rss.xml", "/atom.xml", +] + +# --------------------------------------------------------------------------- +# TCP / TLS metadata helpers +# --------------------------------------------------------------------------- + +# Realistic TCP options fingerprints per OS +TCP_OPTIONS = { + "linux": "020405b40402080affffffff000000000103030a", # MSS+NOP+SACK+TS+WS=10 + "windows": "020405b40103030801010402", # MSS+NOP+WS+SACK + "macos": "020405ac0103030601010402", # MSS+NOP+WS+SACK (macOS) + "scanner": "0204ffff", # Scanner: only MSS, max value + "minimal": "0204ffd7", # Minimal +} + +def _tcp_meta(profile: str = "linux") -> dict: + profiles = { + "linux": {"window_size": 65535, "mss": 1460, "wscale": 10, "ttl": 64, "df": 1}, + "windows": {"window_size": 64240, "mss": 1460, "wscale": 8, "ttl": 128, "df": 1}, + "macos": {"window_size": 65535, "mss": 1460, "wscale": 6, "ttl": 64, "df": 1}, + "android": {"window_size": 65535, "mss": 1420, "wscale": 9, "ttl": 64, "df": 1}, + "scanner": {"window_size": 1024, "mss": 1460, "wscale": 0, "ttl": 48, "df": 0}, + "minimal": {"window_size": 512, "mss": 576, "wscale": 0, "ttl": 60, "df": 0}, + } + meta = profiles.get(profile, profiles["linux"]) + return { + "tcp_meta_window_size": meta["window_size"] + random.randint(-100, 100), + "tcp_meta_mss": meta["mss"], + "tcp_meta_window_scale": meta["wscale"], + "tcp_meta_options": TCP_OPTIONS.get(profile, TCP_OPTIONS["linux"]), + "ip_meta_ttl": meta["ttl"] - random.randint(0, 5), + "ip_meta_df": meta["df"], + "ip_meta_id": random.randint(1, 65535), + "ip_meta_total_length": random.randint(1200, 
def _syn_ms(profile: str) -> int:
    """Return a SYN→ClientHello latency in milliseconds for a TCP profile.

    "scanner" yields 0–3 ms, "minimal" yields 1–5 ms, and every other
    profile yields 10–120 ms.
    """
    if profile == "scanner":
        return random.randint(0, 3)      # Scanners: near-instant
    if profile == "minimal":
        return random.randint(1, 5)
    return random.randint(10, 120)       # Humans: network RTT


def _ja3_for_ja4(ja4: str) -> tuple:
    """Return a ``(ja3_string, ja3_md5_hex)`` pair chosen from the JA4 prefix.

    The JA3 strings are fixed per TLS generation (t13 / t12 / t10 / other);
    they are fake but consistent and only need to be non-empty.
    """
    if "tls13" in ja4 or ja4.startswith("t13"):
        raw = "771,4866-4867-4865-49196-49200-52393-52392,0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0"
    elif ja4.startswith("t12"):
        raw = "771,49195-49199-49196-49200-52393-52392,0-23-65281-10-11-35-16-5-13,29-23-24,0"
    elif ja4.startswith("t10"):
        raw = "769,49161-49162-49171-49172,0-10-11,29-23-24,0"
    else:
        raw = "771,4866-4867-4865,0-23-65281,29-23-24,0"
    # MD5 is what JA3 uses for its hash form; this is not a security use.
    md5 = hashlib.md5(raw.encode()).hexdigest()
    return raw, md5


# ---------------------------------------------------------------------------
# Row generators
# ---------------------------------------------------------------------------

def _now_minus(seconds: int) -> str:
    """Return an ISO-8601 UTC timestamp ``seconds`` in the past."""
    t = datetime.now(timezone.utc) - timedelta(seconds=seconds)
    return t.strftime("%Y-%m-%dT%H:%M:%SZ")


def _split_path_query(path: str) -> tuple:
    """Split a request target into ``(path, query)`` at the first ``?``.

    Everything after the first ``?`` is the query, including any further
    ``?`` characters. A target without ``?`` yields an empty query.
    """
    bare_path, _, query = path.partition("?")
    return bare_path, query


def _ja4_uses_h2(ja4: str) -> bool:
    """Return True when the JA4 fingerprint advertises ALPN ``h2``.

    The ALPN marker is the last two characters of the first
    ``_``-separated section, so only that section is inspected.
    """
    return ja4.split("_", 1)[0].endswith("h2")


def _tls_version_for_ja4(ja4: str) -> str:
    """Map the JA4 protocol/version prefix to a dotted TLS version string.

    Unknown prefixes fall back to "1.0".
    """
    if ja4.startswith("t13"):
        return "1.3"
    if ja4.startswith("t12"):
        return "1.2"
    if ja4.startswith("t11"):
        return "1.1"
    return "1.0"


def _make_row(
    src_ip: str,
    ua: str,
    path: str,
    method: str = "GET",
    ja4: str = None,
    tcp_profile: str = "linux",
    scheme: str = "https",
    host: str = "platform",
    time_offset_s: int = None,
    extra_headers: dict = None,
) -> dict:
    """Build a single raw_json dict matching what the correlator produces.

    Args:
        src_ip: Client IP written to ``src_ip`` and embedded in ``conn_id``.
        ua: Value of ``header_User-Agent``.
        path: Request target; an optional ``?query`` part is split off.
        method: HTTP method.
        ja4: JA4 fingerprint; a random entry of ``HUMAN_JA4S`` when None.
        tcp_profile: Profile name passed to ``_tcp_meta`` and ``_syn_ms``;
            "scanner" also blanks the Sec-Fetch-* headers.
        scheme: "https" selects destination port 443, anything else 80.
        host: Written to both ``host`` and ``tls_sni``.
        time_offset_s: Age of the event in seconds; random in 0–1700 when
            None.
        extra_headers: Extra ``name -> value`` headers; names are appended
            to ``client_headers`` and values stored as ``header_<name>``,
            overriding any default of the same name.

    Returns:
        The row as a plain dict, ready to be JSON-serialized.
    """
    if time_offset_s is None:
        time_offset_s = random.randint(0, 1700)  # spread over last ~28 min
    if ja4 is None:
        ja4 = random.choice(HUMAN_JA4S)
    ja3_raw, ja3_hash = _ja3_for_ja4(ja4)
    tcp = _tcp_meta(tcp_profile)
    syn_ms = _syn_ms(tcp_profile)

    client_headers = "Host,User-Agent,Accept,Accept-Language,Accept-Encoding"
    if extra_headers:
        client_headers += "," + ",".join(extra_headers.keys())

    bare_path, query = _split_path_query(path)
    # One flag drives both http_version and tls_alpn so they cannot disagree.
    is_h2 = _ja4_uses_h2(ja4)
    is_scanner = tcp_profile == "scanner"
    # Read the clock once so b_timestamp - a_timestamp is exactly syn_ms
    # (timestamps are in microseconds, syn_ms in milliseconds).
    a_timestamp_us = int(time.time() * 1_000_000)

    row = {
        "time": _now_minus(time_offset_s),
        "src_ip": src_ip,
        "src_port": random.randint(1024, 65535),
        "dst_ip": "172.20.0.2",
        "dst_port": 443 if scheme == "https" else 80,
        "method": method,
        "scheme": scheme,
        "host": host,
        "path": bare_path,
        "query": query,
        "http_version": "HTTP/2.0" if is_h2 else "HTTP/1.1",
        "orphan_side": "",
        "correlated": True,
        "keepalives": random.randint(1, 8),
        "a_timestamp": a_timestamp_us,
        "b_timestamp": a_timestamp_us + syn_ms * 1000,
        "conn_id": f"seed_{src_ip.replace('.', '_')}_{random.randint(1000,9999)}",
        "syn_to_clienthello_ms": syn_ms,
        "tls_version": _tls_version_for_ja4(ja4),
        "tls_sni": host,
        "tls_alpn": "h2" if is_h2 else "http/1.1",
        "ja3": ja3_raw,
        "ja3_hash": ja3_hash,
        "ja4": ja4,
        "client_headers": client_headers,
        "header_User-Agent": ua,
        "header_Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
        "header_Accept-Encoding": "gzip, deflate, br",
        "header_Accept-Language": random.choice(["fr-FR,fr;q=0.9", "en-US,en;q=0.9", "de-DE,de;q=0.8"]),
        "header_Content-Type": "",
        "header_X-Request-Id": "",
        "header_X-Trace-Id": "",
        "header_X-Forwarded-For": "",
        "header_Sec-Fetch-Site": "" if is_scanner else "none",
        "header_Sec-Fetch-Mode": "" if is_scanner else "navigate",
        "header_Sec-Fetch-Dest": "" if is_scanner else "document",
        "header_Sec-CH-UA": "",
        "header_Sec-CH-UA-Mobile": "",
        "header_Sec-CH-UA-Platform": "",
        **tcp,
    }
    if extra_headers:
        row.update({f"header_{k}": v for k, v in extra_headers.items()})
    return row


def generate_human_sessions(n: int = 720) -> list:
    """Generate realistic human browsing sessions.

    Each IP gets 1–3 requests spread across different paths.
    Distinct (src_ip, ja4, host) → distinct rows in agg_host_ip_ja4_1h.
    We need ≥ 500 human rows for the bot_detector baseline.
    """
    ips = _human_ips(n)
    rows = []
    for ip in ips:
        # 1–3 requests per IP with the same JA4 (browser stays consistent)
        ja4 = random.choice(HUMAN_JA4S)
        ua = random.choice(HUMAN_UA)
        tcp = random.choice(["linux", "windows", "macos", "android"])
        n_req = random.randint(1, 3)
        for _ in range(n_req):
            rows.append(_make_row(
                src_ip=ip, ua=ua,
                path=random.choice(HUMAN_PATHS),
                method=random.choice(["GET", "GET", "GET", "POST"]),
                ja4=ja4, tcp_profile=tcp,
                scheme=random.choice(["https", "https", "http"]),
            ))
    return rows


def generate_scanner_sessions(n: int = 150) -> list:
    """Generate scanner/attack traffic — anomalous patterns for ML detection.

    Characteristics: minimal TCP options, small window, no Sec-Fetch headers,
    attack paths, scanner UAs, rapid-fire requests (low syn_ms).
    """
    ips = _scanner_ips(n)
    rows = []
    for ip in ips:
        ja4 = random.choice(BOT_JA4S[:3])  # curl/python/masscan profiles
        ua = random.choice(SCANNER_UA)
        # Burst: 5–20 requests per IP (simulates scan / brute-force)
        n_req = random.randint(5, 20)
        for _ in range(n_req):
            rows.append(_make_row(
                src_ip=ip, ua=ua,
                # ATTACK_PATHS is listed twice to weight the choice toward it
                path=random.choice(ATTACK_PATHS + ATTACK_PATHS + HUMAN_PATHS),
                method=random.choice(["GET", "GET", "GET", "HEAD", "POST"]),
                ja4=ja4, tcp_profile="scanner",
                scheme="https",
                extra_headers={"Content-Type": ""} if random.random() < 0.3 else None,
            ))
    return rows


def generate_known_bot_sessions(n: int = 100) -> list:
    """Generate sessions from IPs listed in bot_ip.csv (direct bot labeling)."""
    rows = []
    for _ in range(n):
        ip = random.choice(BOT_IP_KNOWN)
        ua = random.choice(BOT_CRAWLER_UA + SCANNER_UA)
        ja4 = random.choice(BOT_JA4S)
        rows.append(_make_row(
            src_ip=ip, ua=ua,
            path=random.choice(BOT_PATHS + ATTACK_PATHS),
            ja4=ja4, tcp_profile="scanner",
            scheme="https",
        ))
    return rows


def generate_brute_force_cluster(n_ips: int = 20) -> list:
    """Simulate credential stuffing / brute-force from a small set of IPs.

    Same IP → many POST /login requests = high hit count, suspicious pattern.
    """
    ips = _scanner_ips(n_ips)[:n_ips]
    rows = []
    for ip in ips:
        ua = random.choice(SCANNER_UA + BOT_CRAWLER_UA)
        ja4 = random.choice(BOT_JA4S)
        for _ in range(random.randint(20, 50)):
            rows.append(_make_row(
                src_ip=ip, ua=ua,
                path="/login",
                method="POST",
                ja4=ja4, tcp_profile="scanner",
                scheme="https",
                extra_headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Content-Length": "32",
                },
            ))
    return rows


# ---------------------------------------------------------------------------
# ClickHouse insert
# ---------------------------------------------------------------------------

def _ch_insert(rows: list, host: str, port: int, user: str, password: str,
               batch_size: int = 200, dry_run: bool = False) -> int:
    """Insert rows into ja4_logs.http_logs_raw via ClickHouse HTTP interface.

    Each row is serialized to JSON and wrapped as the string value of a
    ``raw_json`` key, one object per line (JSONEachRow format).

    Args:
        rows: Row dicts to insert.
        host, port: ClickHouse HTTP endpoint.
        user, password: Credentials, sent as URL query parameters.
        batch_size: Rows per HTTP request; must be at least 1.
        dry_run: When True, print a summary and send nothing.

    Returns:
        Number of rows in batches that were accepted (or ``len(rows)`` for a
        dry run). Failed batches are logged and skipped, not raised.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    if dry_run:
        print(f"[seed] DRY-RUN — would insert {len(rows)} rows")
        if rows:  # nothing to sample when the row list is empty
            print("[seed] Sample row:", json.dumps(rows[0], indent=2)[:400])
        return len(rows)

    url = (
        f"http://{host}:{port}/"
        f"?query={urllib.parse.quote('INSERT INTO ja4_logs.http_logs_raw (raw_json) FORMAT JSONEachRow')}"
        f"&user={urllib.parse.quote(user)}"
        f"&password={urllib.parse.quote(password)}"
    )

    total_inserted = 0
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        # raw_json column holds the entire log as a JSON string
        body_lines = [
            json.dumps(
                {"raw_json": json.dumps(row, separators=(",", ":"))},
                separators=(",", ":"),
            )
            for row in batch
        ]
        body = "\n".join(body_lines).encode("utf-8")

        req = urllib.request.Request(
            url, data=body, method="POST",
            headers={"Content-Type": "application/x-ndjson; charset=utf-8"},
        )
        batch_end = i + len(batch)  # real end index, also for a short last batch
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                resp.read()
            total_inserted += len(batch)
        except urllib.error.HTTPError as e:
            err_body = e.read(500).decode("utf-8", errors="replace")
            print(f"[seed] ERROR batch {i}–{batch_end}: HTTP {e.code}: {err_body}")
        except Exception as e:
            # Best-effort: keep going so the caller can judge the total.
            print(f"[seed] ERROR batch {i}–{batch_end}: {e}")

    return total_inserted


def _wait_for_clickhouse(host: str, port: int, user: str, password: str,
                         timeout_s: int = 60) -> bool:
    """Poll ClickHouse with ``SELECT 1`` until it answers or time runs out.

    Returns True as soon as the server replies ``1``; False if that has not
    happened once ``timeout_s`` seconds have elapsed.
    """
    url = (
        f"http://{host}:{port}/"
        f"?query=SELECT+1"
        f"&user={urllib.parse.quote(user)}"
        f"&password={urllib.parse.quote(password)}"
    )
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=5) as r:
                if r.read().strip() == b"1":
                    return True
        except Exception:
            # Any failure just means "not ready yet"; retry until the deadline.
            pass
        time.sleep(2)
    return False


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Parse CLI options, generate all synthetic row sets and insert them.

    Exits with status 1 when ClickHouse is unreachable or when fewer than
    90% of the generated rows were inserted.
    """
    parser = argparse.ArgumentParser(description="Seed ClickHouse with synthetic traffic")
    parser.add_argument("--host", default="clickhouse")
    parser.add_argument("--port", type=int, default=8123)
    parser.add_argument("--user", default="default")
    parser.add_argument("--password", default="")
    parser.add_argument("--dry-run", action="store_true",
                        help="Generate data but do not insert")
    args = parser.parse_args()

    if not args.dry_run:
        print(f"[seed] Waiting for ClickHouse at {args.host}:{args.port}…")
        if not _wait_for_clickhouse(args.host, args.port, args.user, args.password):
            print("[seed] FATAL: ClickHouse not reachable after 60s")
            raise SystemExit(1)
        print("[seed] ClickHouse ready.")

    t0 = time.monotonic()

    # Generate all row sets
    print("[seed] Generating rows…")
    human_rows = generate_human_sessions(720)      # ≥ 500 unique (ip,ja4,host) human sessions
    scanner_rows = generate_scanner_sessions(150)  # anomalous datacenter traffic
    known_bot = generate_known_bot_sessions(100)   # directly labeled by bot_ip.csv
    brute_force = generate_brute_force_cluster(20)  # credential stuffing pattern

    all_rows = human_rows + scanner_rows + known_bot + brute_force
    random.shuffle(all_rows)

    print(f"[seed] Total rows to insert: {len(all_rows)}")
    print(f" • {len(human_rows):<5} human sessions "
          f"(~{len(set(r['src_ip'] for r in human_rows))} unique IPs)")
    print(f" • {len(scanner_rows):<5} scanner/anomaly sessions")
    print(f" • {len(known_bot):<5} known-bot sessions")
    print(f" • {len(brute_force):<5} brute-force rows")

    inserted = _ch_insert(
        all_rows, args.host, args.port, args.user, args.password,
        dry_run=args.dry_run,
    )

    elapsed = time.monotonic() - t0
    print(f"[seed] Done: {inserted}/{len(all_rows)} rows inserted in {elapsed:.1f}s")

    if inserted < len(all_rows) * 0.9:
        print("[seed] WARNING: fewer than 90% of rows inserted — check errors above")
        raise SystemExit(1)

    print(f"[seed] The bot_detector should now see ≥ 500 human sessions "
          f"in view_ai_features_1h (after MV propagation).")


if __name__ == "__main__":
    main()