#!/usr/bin/env python3
"""
seed_clickhouse.py — Bootstrap ClickHouse with realistic synthetic traffic data.
Inserts directly into ja4_logs.http_logs_raw (triggers all MVs automatically):
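• 720 human sessions (one per IP, 1–3 requests each) — IPs in residential ISP ranges (ASN→'human' via dict)
• 150 datacenter/scanner sessions (one per IP, 5–20 requests each) — anomalous patterns for ML detection
• 100 known-bot sessions — IPs/JA4 in bot_ip.csv / bot_ja4.csv
• 20 brute-force IPs (20–50 POST /login requests each) — credential-stuffing pattern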
This ensures view_ai_features_1h has ≥ 500 human rows for the bot_detector
training threshold (run_semi_supervised_logic requires len(human_baseline) >= 500).
All timestamps are within the last 30 minutes so the 24h window filter catches them.
No external dependencies — uses Python stdlib urllib only.
Usage:
python seed_clickhouse.py
python seed_clickhouse.py --host clickhouse --port 8123 --user default --password ""
python seed_clickhouse.py --dry-run
"""
import argparse
import hashlib
import json
import random
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
# ---------------------------------------------------------------------------
# JA4 fingerprint profiles (must match bot_ja4.csv for bot detection to work)
# ---------------------------------------------------------------------------
# Human browser profiles — TLS 1.3 (t13…) and TLS 1.2 (t12…) fingerprints.
# Every entry advertises ALPN "h2" at the end of its first "_"-separated
# section. Used as the default JA4 pool by _make_row and
# generate_human_sessions.
HUMAN_JA4S = [
    "t13d1917h2_b0372614b25a_6a77dcf5a8be", # Chrome 120 Windows TLS1.3
    "t13d1817h2_b0372614b25a_0a3e5785d15f", # Firefox 121 TLS1.3
    "t13d1617h2_fc82e8b7e1c0_9dc949149365", # Safari 17 macOS TLS1.3
    "t13d1917h2_fc82e8b7e1c0_6b9b1b2c3d4e", # Edge 120 TLS1.3
    "t13d1817h2_9dc949149365_8c4a9a4b0d01", # Chrome Mobile TLS1.3
    "t12d1706h2_9dc949149365_fc82e8b7e1c0", # Chrome 120 TLS1.2 (older server)
    "t12d1606h2_8c4a9a4b0d01_9dc949149365", # Firefox TLS1.2
]
# Bot/scanner profiles — intentionally minimal cipher suites, match bot_ja4.csv
# NOTE: order matters — generate_scanner_sessions draws only from the first
# three entries (BOT_JA4S[:3]); the other generators draw from the whole list.
BOT_JA4S = [
    "t13d030500_ffd59bab1b39_6e7f7df63e98", # curl scanner (in bot_ja4.csv)
    "t13d020300_6b9b1b2c3d4e_ffd59bab1b39", # python-requests scanner (in bot_ja4.csv)
    "t10d170000_0a1b2c3d4e5f_1b2c3d4e5f60", # Masscan (in bot_ja4.csv)
    "t12d050700_5a6b7c8d9e0f_1a2b3c4d5e6f", # zgrab (in bot_ja4.csv)
    "t13d010100_aabbccddeeff_0011223344aa", # Headless Chrome automation (in bot_ja4.csv)
]
# ---------------------------------------------------------------------------
# IP pools — must match ranges in iplocate-ip-to-asn.csv
# ---------------------------------------------------------------------------
# Human residential IPs — OVH FR (ASN 16276) → asn_label='human'
def _human_ips(n: int) -> list:
ips = [f"91.121.{o3}.{o4}" for o3 in range(0, 20) for o4 in range(1, 60)]
random.shuffle(ips)
return ips[:n]
# Datacenter / scanner IPs — Tor/Contabo/Reg.ru → asn_label='datacenter'/'hosting'
def _scanner_ips(n: int) -> list:
ips = (
[f"185.220.101.{i}" for i in range(1, 101)] # ASN 210644 datacenter
+ [f"45.155.205.{i}" for i in range(1, 51)] # ASN 209083 datacenter
+ [f"193.32.162.{i}" for i in range(1, 31)] # ASN 197695 hosting
)
random.shuffle(ips)
return ips[:n]
# Known bot IPs (subset also in bot_ip.csv → directly labeled).
# generate_known_bot_sessions picks the source IP of each row from this list.
BOT_IP_KNOWN = [
    "185.220.101.34", "185.220.101.47", "185.220.101.52",
    "185.220.101.73", "185.220.101.91",
    "45.155.205.233", "45.155.205.220", "45.155.205.205",
    "193.32.162.10", "193.32.162.11",
]
# ---------------------------------------------------------------------------
# User-Agent pools per profile
# ---------------------------------------------------------------------------
# Browser User-Agent strings (desktop and mobile); generate_human_sessions
# picks one per IP and reuses it for all of that IP's requests.
HUMAN_UA = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
    "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.6099.115 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
]
# Tool / library User-Agent strings used for scanner, known-bot and
# brute-force traffic.
SCANNER_UA = [
    "curl/7.88.1",
    "python-requests/2.31.0",
    "Masscan/1.3",
    "zgrab/0.x",
    "Go-http-client/1.1",
    "libwww-perl/6.72",
    "Java/11.0.18",
    "Wget/1.21.3",
    "masscan/1.3 (https://github.com/robertdavidgraham/masscan)",
    "-", # No User-Agent (raw scanner)
]
# Crawler User-Agent strings; mixed with SCANNER_UA by the known-bot and
# brute-force generators.
BOT_CRAWLER_UA = [
    "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
    "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)",
    "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)",
    "Twitterbot/1.0",
    "facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)",
    "Googlebot/2.1 (+http://www.google.com/bot.html)",
]
# ---------------------------------------------------------------------------
# Path pools per profile
# ---------------------------------------------------------------------------
# Ordinary site paths. Entries may carry a "?query" suffix, which _make_row
# splits into separate "path" and "query" fields.
HUMAN_PATHS = [
    "/", "/index.html", "/about", "/contact", "/products", "/services",
    "/blog", "/blog/post-1", "/blog/post-2", "/faq", "/pricing",
    "/login", "/register", "/profile", "/dashboard",
    "/api/v1/users", "/api/v1/status", "/api/v2/metrics",
    "/static/js/app.js", "/static/css/main.css", "/images/logo.png",
    "/favicon.ico", "/robots.txt", "/sitemap.xml",
    "/health", "/search?q=test", "/search?q=product+review",
]
# Probe / exploit-style paths (secret files, admin panels, path traversal,
# SQL injection) requested by the scanner and known-bot generators.
ATTACK_PATHS = [
    "/.env", "/.git/HEAD", "/.git/config",
    "/wp-login.php", "/wp-admin/", "/xmlrpc.php", "/wp-config.php",
    "/phpmyadmin/", "/phpMyAdmin/", "/pma/",
    "/admin", "/admin/login", "/administrator/",
    "/cgi-bin/test.cgi", "/cgi-bin/../etc/passwd",
    "/download?file=../../../etc/passwd", "/download?file=../../../../etc/shadow",
    "/api/search?q=",
    "/api/users?id=1+OR+1%3D1",
    "/shell.php", "/cmd.php", "/eval.php",
    "/.aws/credentials", "/.ssh/id_rsa",
    "/etc/passwd", "/proc/self/environ",
]
# Crawl-entry paths (robots, sitemaps, feeds) requested by the known-bot
# generator.
BOT_PATHS = [
    "/robots.txt", "/sitemap.xml", "/", "/index.html",
    "/sitemap_index.xml", "/news-sitemap.xml",
    "/feed", "/rss.xml", "/atom.xml",
]
# ---------------------------------------------------------------------------
# TCP / TLS metadata helpers
# ---------------------------------------------------------------------------
# Realistic TCP options fingerprints per OS
TCP_OPTIONS = {
"linux": "020405b40402080affffffff000000000103030a", # MSS+NOP+SACK+TS+WS=10
"windows": "020405b40103030801010402", # MSS+NOP+WS+SACK
"macos": "020405ac0103030601010402", # MSS+NOP+WS+SACK (macOS)
"scanner": "0204ffff", # Scanner: only MSS, max value
"minimal": "0204ffd7", # Minimal
}
def _tcp_meta(profile: str = "linux") -> dict:
profiles = {
"linux": {"window_size": 65535, "mss": 1460, "wscale": 10, "ttl": 64, "df": 1},
"windows": {"window_size": 64240, "mss": 1460, "wscale": 8, "ttl": 128, "df": 1},
"macos": {"window_size": 65535, "mss": 1460, "wscale": 6, "ttl": 64, "df": 1},
"android": {"window_size": 65535, "mss": 1420, "wscale": 9, "ttl": 64, "df": 1},
"scanner": {"window_size": 1024, "mss": 1460, "wscale": 0, "ttl": 48, "df": 0},
"minimal": {"window_size": 512, "mss": 576, "wscale": 0, "ttl": 60, "df": 0},
}
meta = profiles.get(profile, profiles["linux"])
return {
"tcp_meta_window_size": meta["window_size"] + random.randint(-100, 100),
"tcp_meta_mss": meta["mss"],
"tcp_meta_window_scale": meta["wscale"],
"tcp_meta_options": TCP_OPTIONS.get(profile, TCP_OPTIONS["linux"]),
"ip_meta_ttl": meta["ttl"] - random.randint(0, 5),
"ip_meta_df": meta["df"],
"ip_meta_id": random.randint(1, 65535),
"ip_meta_total_length": random.randint(1200, 1500),
}
def _syn_ms(profile: str) -> int:
"""Realistic SYN→ClientHello latency in milliseconds."""
if profile == "scanner":
return random.randint(0, 3) # Scanners: near-instant
if profile in ("minimal",):
return random.randint(1, 5)
return random.randint(10, 120) # Humans: network RTT
def _ja3_for_ja4(ja4: str) -> tuple:
"""Generate a plausible JA3 string and its MD5 hash matching the JA4 profile."""
# These are fake but consistent — just need to be non-empty strings
if "tls13" in ja4 or ja4.startswith("t13"):
raw = "771,4866-4867-4865-49196-49200-52393-52392,0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0"
elif ja4.startswith("t12"):
raw = "771,49195-49199-49196-49200-52393-52392,0-23-65281-10-11-35-16-5-13,29-23-24,0"
elif ja4.startswith("t10"):
raw = "769,49161-49162-49171-49172,0-10-11,29-23-24,0"
else:
raw = "771,4866-4867-4865,0-23-65281,29-23-24,0"
md5 = hashlib.md5(raw.encode()).hexdigest()
return raw, md5
# ---------------------------------------------------------------------------
# Row generators
# ---------------------------------------------------------------------------
def _now_minus(seconds: int) -> str:
"""ISO-8601 UTC timestamp N seconds in the past."""
t = datetime.now(timezone.utc) - timedelta(seconds=seconds)
return t.strftime("%Y-%m-%dT%H:%M:%SZ")
def _make_row(
    src_ip: str,
    ua: str,
    path: str,
    method: str = "GET",
    ja4: str = None,
    tcp_profile: str = "linux",
    scheme: str = "https",
    host: str = "platform",
    time_offset_s: int = None,
    extra_headers: dict = None,
) -> dict:
    """Build a single raw_json dict matching what the correlator produces.

    Args:
        src_ip: Client IP address.
        ua: Value for the ``header_User-Agent`` field.
        path: Request target; anything after the first "?" becomes ``query``.
        method: HTTP method.
        ja4: JA4 fingerprint; a random entry of ``HUMAN_JA4S`` when ``None``.
        tcp_profile: Profile name passed to ``_tcp_meta`` and ``_syn_ms``.
            "scanner" also blanks the Sec-Fetch-* headers.
        scheme: "https" selects destination port 443, anything else port 80.
        host: Used for both the ``host`` and ``tls_sni`` fields.
        time_offset_s: Age of the event in seconds; random 0–1700 when ``None``.
        extra_headers: Extra ``name -> value`` headers. Names are appended to
            ``client_headers`` and values stored as ``header_<name>``,
            overriding any default of the same name.

    Returns:
        The row as a flat dict of JSON-serializable values.
    """
    if time_offset_s is None:
        time_offset_s = random.randint(0, 1700) # spread over last ~28 min
    if ja4 is None:
        ja4 = random.choice(HUMAN_JA4S)
    ja3_raw, ja3_hash = _ja3_for_ja4(ja4)
    tcp = _tcp_meta(tcp_profile)
    syn_ms = _syn_ms(tcp_profile)

    # The ALPN code is the last two characters of the FIRST "_"-separated JA4
    # section (e.g. "t13d1917h2"). Testing the end of the whole string never
    # matches, which left http_version at HTTP/1.1 while tls_alpn said "h2".
    uses_h2 = ja4.split("_", 1)[0].endswith("h2")
    is_scanner = tcp_profile == "scanner"

    # Split on the first "?" only, so a query containing "?" is kept whole.
    url_path, _, query = path.partition("?")

    # Sample the clock once so b_timestamp - a_timestamp is exactly syn_ms.
    a_timestamp_us = int(time.time() * 1_000_000)

    client_headers = "Host,User-Agent,Accept,Accept-Language,Accept-Encoding"
    if extra_headers:
        client_headers += "," + ",".join(extra_headers.keys())

    if ja4.startswith("t13"):
        tls_version = "1.3"
    elif ja4.startswith("t12"):
        tls_version = "1.2"
    else:
        tls_version = "1.0"

    row = {
        "time": _now_minus(time_offset_s),
        "src_ip": src_ip,
        "src_port": random.randint(1024, 65535),
        "dst_ip": "172.20.0.2",
        "dst_port": 443 if scheme == "https" else 80,
        "method": method,
        "scheme": scheme,
        "host": host,
        "path": url_path,
        "query": query,
        "http_version": "HTTP/2.0" if uses_h2 else "HTTP/1.1",
        "orphan_side": "",
        "correlated": True,
        "keepalives": random.randint(1, 8),
        "a_timestamp": a_timestamp_us,
        "b_timestamp": a_timestamp_us + syn_ms * 1000,
        "conn_id": f"seed_{src_ip.replace('.', '_')}_{random.randint(1000,9999)}",
        "syn_to_clienthello_ms": syn_ms,
        "tls_version": tls_version,
        "tls_sni": host,
        "tls_alpn": "h2" if uses_h2 else "http/1.1",
        "ja3": ja3_raw,
        "ja3_hash": ja3_hash,
        "ja4": ja4,
        "client_headers": client_headers,
        "header_User-Agent": ua,
        "header_Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
        "header_Accept-Encoding": "gzip, deflate, br",
        "header_Accept-Language": random.choice(["fr-FR,fr;q=0.9", "en-US,en;q=0.9", "de-DE,de;q=0.8"]),
        "header_Content-Type": "",
        "header_X-Request-Id": "",
        "header_X-Trace-Id": "",
        "header_X-Forwarded-For": "",
        # Scanner rows are emitted without Sec-Fetch-* headers.
        "header_Sec-Fetch-Site": "" if is_scanner else "none",
        "header_Sec-Fetch-Mode": "" if is_scanner else "navigate",
        "header_Sec-Fetch-Dest": "" if is_scanner else "document",
        "header_Sec-CH-UA": "",
        "header_Sec-CH-UA-Mobile": "",
        "header_Sec-CH-UA-Platform": "",
        **tcp,
    }
    if extra_headers:
        row.update({f"header_{k}": v for k, v in extra_headers.items()})
    return row
def generate_human_sessions(n: int = 720) -> list:
    """Generate browser-like traffic for ``n`` distinct human IPs.

    Every IP keeps one JA4, one User-Agent and one TCP profile for all of its
    1–3 requests (a browser stays consistent), while path, method and scheme
    vary per request. Distinct (src_ip, ja4, host) → distinct rows in
    agg_host_ip_ja4_1h; the bot_detector baseline needs ≥ 500 human rows.
    """
    os_profiles = ["linux", "windows", "macos", "android"]
    methods = ["GET", "GET", "GET", "POST"]      # GET weighted 3:1
    schemes = ["https", "https", "http"]         # https weighted 2:1

    rows = []
    for client_ip in _human_ips(n):
        browser_ja4 = random.choice(HUMAN_JA4S)
        browser_ua = random.choice(HUMAN_UA)
        os_profile = random.choice(os_profiles)
        request_count = random.randint(1, 3)
        rows.extend(
            _make_row(
                src_ip=client_ip,
                ua=browser_ua,
                path=random.choice(HUMAN_PATHS),
                method=random.choice(methods),
                ja4=browser_ja4,
                tcp_profile=os_profile,
                scheme=random.choice(schemes),
            )
            for _ in range(request_count)
        )
    return rows
def generate_scanner_sessions(n: int = 150) -> list:
    """Generate scanner/attack bursts from ``n`` datacenter IPs.

    Each IP sends 5–20 requests with one scanner User-Agent and one of the
    first three BOT_JA4S entries (curl/python/masscan), always over https
    with the "scanner" TCP profile. Attack paths are listed twice in the path
    pool, so they are weighted 2:1 against ordinary paths. About 30 % of
    requests carry an empty Content-Type header.
    """
    scanner_ja4s = BOT_JA4S[:3]
    path_pool = ATTACK_PATHS + ATTACK_PATHS + HUMAN_PATHS
    methods = ["GET", "GET", "GET", "HEAD", "POST"]

    rows = []
    for client_ip in _scanner_ips(n):
        tool_ja4 = random.choice(scanner_ja4s)
        tool_ua = random.choice(SCANNER_UA)
        burst = random.randint(5, 20)  # simulates scan / brute-force
        for _ in range(burst):
            target = random.choice(path_pool)
            verb = random.choice(methods)
            if random.random() < 0.3:
                headers = {"Content-Type": ""}
            else:
                headers = None
            rows.append(_make_row(
                src_ip=client_ip,
                ua=tool_ua,
                path=target,
                method=verb,
                ja4=tool_ja4,
                tcp_profile="scanner",
                scheme="https",
                extra_headers=headers,
            ))
    return rows
def generate_known_bot_sessions(n: int = 100) -> list:
    """Generate ``n`` single-request rows from IPs in BOT_IP_KNOWN.

    IP, User-Agent (crawler or scanner), JA4 and path (crawl or attack) are
    drawn independently for every row; all rows use https and the "scanner"
    TCP profile.
    """
    ua_pool = BOT_CRAWLER_UA + SCANNER_UA
    path_pool = BOT_PATHS + ATTACK_PATHS

    rows = []
    for _ in range(n):
        bot_ip = random.choice(BOT_IP_KNOWN)
        bot_ua = random.choice(ua_pool)
        bot_ja4 = random.choice(BOT_JA4S)
        target = random.choice(path_pool)
        rows.append(_make_row(
            src_ip=bot_ip,
            ua=bot_ua,
            path=target,
            ja4=bot_ja4,
            tcp_profile="scanner",
            scheme="https",
        ))
    return rows
def generate_brute_force_cluster(n_ips: int = 20) -> list:
    """Simulate credential stuffing from ``n_ips`` datacenter IPs.

    Each IP keeps one User-Agent and one JA4 and fires 20–50 ``POST /login``
    requests with form-encoded headers, giving a high per-IP hit count.
    """
    ua_pool = SCANNER_UA + BOT_CRAWLER_UA

    rows = []
    # _scanner_ips already returns at most n_ips addresses.
    for attacker_ip in _scanner_ips(n_ips):
        attacker_ua = random.choice(ua_pool)
        attacker_ja4 = random.choice(BOT_JA4S)
        attempts = random.randint(20, 50)
        for _ in range(attempts):
            form_headers = {
                "Content-Type": "application/x-www-form-urlencoded",
                "Content-Length": "32",
            }
            rows.append(_make_row(
                src_ip=attacker_ip,
                ua=attacker_ua,
                path="/login",
                method="POST",
                ja4=attacker_ja4,
                tcp_profile="scanner",
                scheme="https",
                extra_headers=form_headers,
            ))
    return rows
# ---------------------------------------------------------------------------
# ClickHouse insert
# ---------------------------------------------------------------------------
def _ch_insert(rows: list, host: str, port: int, user: str, password: str,
batch_size: int = 200, dry_run: bool = False) -> int:
"""Insert rows into ja4_logs.http_logs_raw via ClickHouse HTTP interface.
Each row is wrapped as {"raw_json": ""} in JSONEachRow format.
"""
if dry_run:
print(f"[seed] DRY-RUN — would insert {len(rows)} rows")
print("[seed] Sample row:", json.dumps(rows[0], indent=2)[:400])
return len(rows)
url = (
f"http://{host}:{port}/"
f"?query={urllib.parse.quote('INSERT INTO ja4_logs.http_logs_raw (raw_json) FORMAT JSONEachRow')}"
f"&user={urllib.parse.quote(user)}"
f"&password={urllib.parse.quote(password)}"
)
total_inserted = 0
for i in range(0, len(rows), batch_size):
batch = rows[i:i + batch_size]
body_lines = []
for row in batch:
# raw_json column holds the entire log as a JSON string
outer = {"raw_json": json.dumps(row, separators=(",", ":"))}
body_lines.append(json.dumps(outer, separators=(",", ":")))
body = "\n".join(body_lines).encode("utf-8")
req = urllib.request.Request(
url, data=body, method="POST",
headers={"Content-Type": "application/x-ndjson; charset=utf-8"},
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
resp.read()
total_inserted += len(batch)
except urllib.error.HTTPError as e:
err_body = e.read(500).decode("utf-8", errors="replace")
print(f"[seed] ERROR batch {i}–{i+batch_size}: HTTP {e.code}: {err_body}")
except Exception as e:
print(f"[seed] ERROR batch {i}–{i+batch_size}: {e}")
return total_inserted
def _wait_for_clickhouse(host: str, port: int, user: str, password: str,
timeout_s: int = 60) -> bool:
"""Wait for ClickHouse to be ready."""
url = (
f"http://{host}:{port}/"
f"?query=SELECT+1"
f"&user={urllib.parse.quote(user)}"
f"&password={urllib.parse.quote(password)}"
)
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
try:
with urllib.request.urlopen(url, timeout=5) as r:
if r.read().strip() == b"1":
return True
except Exception:
pass
time.sleep(2)
return False
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Parse CLI options, generate all synthetic rows and insert them.

    Unless ``--dry-run`` is given, first waits for ClickHouse to answer.
    Exits with status 1 when ClickHouse is unreachable or when fewer than
    90 % of the generated rows were inserted.
    """
    cli = argparse.ArgumentParser(description="Seed ClickHouse with synthetic traffic")
    cli.add_argument("--host", default="clickhouse")
    cli.add_argument("--port", type=int, default=8123)
    cli.add_argument("--user", default="default")
    cli.add_argument("--password", default="")
    cli.add_argument("--dry-run", action="store_true",
                     help="Generate data but do not insert")
    opts = cli.parse_args()

    if not opts.dry_run:
        print(f"[seed] Waiting for ClickHouse at {opts.host}:{opts.port}…")
        reachable = _wait_for_clickhouse(opts.host, opts.port, opts.user, opts.password)
        if not reachable:
            print("[seed] FATAL: ClickHouse not reachable after 60s")
            raise SystemExit(1)
        print("[seed] ClickHouse ready.")

    started = time.monotonic()

    # Generate all row sets
    print("[seed] Generating rows…")
    human_rows = generate_human_sessions(720)     # ≥ 500 unique (ip,ja4,host) human sessions
    scanner_rows = generate_scanner_sessions(150) # anomalous datacenter traffic
    known_bot = generate_known_bot_sessions(100)  # directly labeled by bot_ip.csv
    brute_force = generate_brute_force_cluster(20) # credential stuffing pattern

    all_rows = [*human_rows, *scanner_rows, *known_bot, *brute_force]
    random.shuffle(all_rows)
    total = len(all_rows)
    unique_human_ips = len({row["src_ip"] for row in human_rows})

    print(f"[seed] Total rows to insert: {total}")
    print(f" • {len(human_rows):<5} human sessions (~{unique_human_ips} unique IPs)")
    print(f" • {len(scanner_rows):<5} scanner/anomaly sessions")
    print(f" • {len(known_bot):<5} known-bot sessions")
    print(f" • {len(brute_force):<5} brute-force rows")

    inserted = _ch_insert(
        all_rows, opts.host, opts.port, opts.user, opts.password,
        dry_run=opts.dry_run,
    )
    elapsed = time.monotonic() - started
    print(f"[seed] Done: {inserted}/{total} rows inserted in {elapsed:.1f}s")

    if inserted < total * 0.9:
        print("[seed] WARNING: fewer than 90% of rows inserted — check errors above")
        raise SystemExit(1)

    print("[seed] The bot_detector should now see ≥ 500 human sessions "
          "in view_ai_features_1h (after MV propagation).")
# Run the seeding workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()