#!/usr/bin/env python3
"""
seed_clickhouse.py — Bootstrap ClickHouse with realistic synthetic traffic data.
Inserts directly into ja4_logs.http_logs_raw (triggers all MVs automatically):
• ~350,000 rows from 14,000 legitimate browser IPs (ISP ranges, asn_label='isp')
• ~100,000 rows from 3,000 malicious bot/scanner IPs (datacenter ranges)
• ~30,000 rows from 2,000 legitimate bot IPs (from bot_ip.csv CIDRs)
• ~20,000 rows from 1,000 AI bot IPs (datacenter ranges)
Total: 500,000 rows from 20,000 unique IPs (configurable via --rows / --ips).
Browser JA4 fingerprints are loaded from browser_ja4.csv so they match
dict_browser_ja4 (LEGITIMATE_BROWSER classification). Bot/scanner JA4s are
synthetic hashes guaranteed NOT to appear in that dictionary.
Bot IPs are drawn from real CIDRs in bot_ip.csv (Googlebot, Bingbot, etc.).
ISP and datacenter IPs use hard-coded /24 prefixes from well-known ASNs that
resolve correctly through iplocate-ip-to-asn.csv → asn_reputation.csv.
This ensures view_ai_features_1h has ≥ 500 human rows for the bot_detector
training threshold (run_semi_supervised_logic requires len(human_baseline) >= 500).
All timestamps are within the last 30 minutes so the 24h window filter catches them.
No external dependencies — uses Python stdlib only.
Usage:
python seed_clickhouse.py
python seed_clickhouse.py --host clickhouse --port 8123 --user default --password ""
python seed_clickhouse.py --rows 500000 --ips 20000 --seed 42
python seed_clickhouse.py --dry-run
"""
import argparse
import csv
import hashlib
import ipaddress
import json
import os
import random
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
# ---------------------------------------------------------------------------
# Hard-coded /24 prefixes — guaranteed to resolve via ASN dictionaries
# ---------------------------------------------------------------------------
# ISP ranges (asn_label='isp' in asn_reputation.csv)
ISP_PREFIXES = (
# Comcast AS7922 — within 24.0.0.0/12
[f"24.{o2}.{o3}" for o2 in range(1, 11) for o3 in range(0, 3)]
# Orange AS3215 — within 2.3.0.0/16
+ [f"2.3.{o3}" for o3 in range(0, 10)]
# Deutsche Telekom AS3320 — within 2.160.0.0/12
+ [f"2.{160 + o2}.{o3}" for o2 in range(0, 5) for o3 in range(0, 2)]
# AT&T AS7018 — within 12.0.0.0/10
+ [f"12.0.{o3}" for o3 in range(4, 14)]
# Verizon AS701 — within 63.0.0.0/12
+ [f"63.{o2}.0" for o2 in range(0, 10)]
# BT AS2856 — within 5.80.0.0/15
+ [f"5.80.{o3}" for o3 in range(0, 8)]
) # ~68 prefixes × 254 ≈ 17K IPs
# Datacenter ranges for scanners (asn_label='datacenter')
DC_SCANNER_PREFIXES = (
# DigitalOcean AS14061 — within 5.101.96.0/20
[f"5.101.{96 + o3}" for o3 in range(0, 6)]
# Hetzner AS24940 — within 5.9.0.0/16
+ [f"5.9.{o3}" for o3 in range(0, 6)]
# OVH AS16276 — within 5.39.0.0/17
+ [f"5.39.{o3}" for o3 in range(0, 5)]
) # ~17 prefixes × 254 ≈ 4.3K IPs
# Datacenter ranges for AI bots (separate from scanner ranges)
DC_AI_PREFIXES = (
# DigitalOcean (different /24s)
[f"5.101.{102 + o3}" for o3 in range(0, 4)]
# Hetzner (different /24s)
+ [f"5.9.{6 + o3}" for o3 in range(0, 4)]
) # ~8 prefixes × 254 ≈ 2K IPs
# Fallback /24s for legitimate bot overflow
DC_LEGIT_BOT_PREFIXES = [f"5.9.{20 + o3}" for o3 in range(0, 8)]
# ---------------------------------------------------------------------------
# Browser family → User-Agent mapping
# ---------------------------------------------------------------------------
BROWSER_UAS = {
"Chromium": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
"Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.6099.115 Mobile Safari/537.36",
],
"Firefox": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) "
"Gecko/20100101 Firefox/121.0",
"Mozilla/5.0 (X11; Linux x86_64; rv:120.0) "
"Gecko/20100101 Firefox/120.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14.2; rv:121.0) "
"Gecko/20100101 Firefox/121.0",
],
"Safari": [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2_1) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/17.2 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/17.1 Safari/605.1.15",
],
"Edge": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
],
"Opera": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 OPR/105.0.0.0",
],
"Vivaldi": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 "
"Vivaldi/6.4.3160.47",
],
"Chrome_iOS": [
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"CriOS/120.0.6099.119 Mobile/15E148 Safari/604.1",
],
"Chromium_Legacy": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36",
],
"Firefox_Legacy": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) "
"Gecko/20100101 Firefox/78.0",
],
"Safari_Legacy": [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/15.6 Safari/605.1.15",
],
"Tor_Browser": [
"Mozilla/5.0 (Windows NT 10.0; rv:102.0) "
"Gecko/20100101 Firefox/102.0",
],
}
BROWSER_TCP = {
"Chromium": ["windows", "linux", "android"],
"Firefox": ["windows", "linux"],
"Safari": ["macos"],
"Edge": ["windows"],
"Opera": ["windows", "linux"],
"Vivaldi": ["windows", "linux"],
"Chrome_iOS": ["macos"],
"Chromium_Legacy": ["windows"],
"Firefox_Legacy": ["windows", "linux"],
"Safari_Legacy": ["macos"],
"Tor_Browser": ["linux"],
}
CHROMIUM_FAMILIES = {
"Chromium", "Edge", "Opera", "Vivaldi",
"Chrome_iOS", "Chromium_Legacy",
}
BROWSER_WEIGHTS = {
"Chromium": 55, "Firefox": 15, "Safari": 12, "Edge": 8,
"Opera": 3, "Vivaldi": 1, "Chrome_iOS": 3,
"Chromium_Legacy": 1, "Firefox_Legacy": 1,
"Safari_Legacy": 0.5, "Tor_Browser": 0.5,
}
SEC_CH_UA = {
"Chromium":
'"Chromium";v="120", "Google Chrome";v="120", "Not-A.Brand";v="99"',
"Edge":
'"Chromium";v="120", "Microsoft Edge";v="120", "Not-A.Brand";v="99"',
"Opera":
'"Chromium";v="119", "Opera";v="105", "Not-A.Brand";v="99"',
"Vivaldi":
'"Chromium";v="118", "Vivaldi";v="6.4", "Not-A.Brand";v="99"',
"Chrome_iOS":
'"Chromium";v="120", "Google Chrome";v="120", "Not-A.Brand";v="99"',
"Chromium_Legacy":
'"Chromium";v="90", "Google Chrome";v="90", "Not-A.Brand";v="99"',
}
# ---------------------------------------------------------------------------
# Hosts and Accept-Language pools
# ---------------------------------------------------------------------------
HOSTS = ["platform", "api.platform", "www.example.com", "shop.example.com"]
ACCEPT_LANGUAGES = [
"en-US,en;q=0.9",
"en-GB,en;q=0.9",
"fr-FR,fr;q=0.9,en;q=0.8",
"de-DE,de;q=0.9,en;q=0.8",
"es-ES,es;q=0.9,en;q=0.8",
"ja-JP,ja;q=0.9,en;q=0.8",
"pt-BR,pt;q=0.9,en;q=0.8",
"zh-CN,zh;q=0.9,en;q=0.8",
]
# ---------------------------------------------------------------------------
# Path pools
# ---------------------------------------------------------------------------
PAGE_PATHS = [
"/", "/index.html", "/about", "/contact", "/products", "/services",
"/blog", "/blog/post-1", "/blog/post-2", "/blog/post-3", "/faq",
"/pricing", "/login", "/register", "/profile", "/dashboard",
"/docs", "/docs/getting-started", "/docs/api-reference",
"/help", "/terms", "/privacy", "/search",
]
ASSET_PATHS = [
"/static/js/app.js", "/static/js/vendor.js", "/static/js/analytics.js",
"/static/css/main.css", "/static/css/theme.css",
"/images/logo.png", "/images/hero.webp", "/images/banner.jpg",
"/favicon.ico", "/fonts/inter-400.woff2", "/fonts/inter-700.woff2",
]
API_PATHS = [
"/api/v1/users", "/api/v1/status", "/api/v2/metrics",
"/api/v1/products", "/api/v1/search", "/api/v2/config",
]
ATTACK_PATHS = [
"/.env", "/.git/HEAD", "/.git/config",
"/wp-login.php", "/wp-admin/", "/xmlrpc.php", "/wp-config.php",
"/phpmyadmin/", "/phpMyAdmin/", "/pma/",
"/admin", "/admin/login", "/administrator/",
"/cgi-bin/test.cgi", "/cgi-bin/../etc/passwd",
"/download?file=../../../etc/passwd",
"/download?file=../../../../etc/shadow",
"/api/search?q=",
"/api/users?id=1+OR+1%3D1",
"/shell.php", "/cmd.php", "/eval.php",
"/.aws/credentials", "/.ssh/id_rsa",
"/etc/passwd", "/proc/self/environ",
"/actuator", "/actuator/env", "/actuator/health",
"/server-status", "/.svn/entries",
"/wp-content/uploads/", "/backup.zip", "/db.sql",
"/api/v1/../admin", "/api/debug",
"/.htaccess", "/.htpasswd",
"/console", "/debug/pprof/",
]
SCRAPER_PATHS = (
[f"/products/page/{i}" for i in range(1, 51)]
+ [
f"/category/{c}/page/{i}"
for c in ["electronics", "clothing", "books", "home", "sports"]
for i in range(1, 11)
]
)
BOT_PATHS = [
"/robots.txt", "/sitemap.xml", "/", "/index.html",
"/sitemap_index.xml", "/news-sitemap.xml",
"/feed", "/rss.xml", "/atom.xml",
]
CONTENT_PATHS = PAGE_PATHS + [f"/blog/post-{i}" for i in range(1, 21)] + [
f"/products/{s}"
for s in ["widget-a", "widget-b", "gadget-x", "tool-pro", "kit-basic"]
]
# ---------------------------------------------------------------------------
# Scanner / bot User-Agents
# ---------------------------------------------------------------------------
SCANNER_UAS = [
"curl/7.88.1",
"curl/8.1.2",
"python-requests/2.31.0",
"python-requests/2.28.1",
"python-urllib3/2.0.4",
"Masscan/1.3",
"masscan/1.3 (https://github.com/robertdavidgraham/masscan)",
"zgrab/0.x",
"Go-http-client/1.1",
"Go-http-client/2.0",
"libwww-perl/6.72",
"Java/11.0.18",
"Java/17.0.2",
"Wget/1.21.3",
"Scrapy/2.11.0",
"Apache-HttpClient/4.5.14",
"okhttp/4.12.0",
"Node-Fetch/1.0",
"axios/1.6.2",
"-",
"",
]
HEADLESS_UAS = [
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) HeadlessChrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) HeadlessChrome/119.0.0.0 Safari/537.36",
]
LEGIT_BOT_UAS = {
"Googlebot": [
"Mozilla/5.0 (compatible; Googlebot/2.1; "
"+http://www.google.com/bot.html)",
"Googlebot/2.1 (+http://www.google.com/bot.html)",
"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.6099.71 Mobile Safari/537.36 "
"(compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
],
"Bingbot": [
"Mozilla/5.0 (compatible; bingbot/2.0; "
"+http://www.bing.com/bingbot.htm)",
"Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
"compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm) "
"Chrome/116.0.1938.76 Safari/537.36",
],
"DuckDuckBot": [
"DuckDuckBot/1.1; (+http://duckduckgo.com/duckduckbot.html)",
],
"Applebot": [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 "
"Safari/605.1.15 (Applebot/0.1; "
"+http://www.apple.com/go/applebot)",
],
"YandexBot": [
"Mozilla/5.0 (compatible; YandexBot/3.0; "
"+http://yandex.com/bots)",
],
"Twitterbot": [
"Twitterbot/1.0",
],
"FacebookBot": [
"facebookexternalhit/1.1 "
"(+http://www.facebook.com/externalhit_uatext.php)",
],
}
AI_BOT_UAS = {
"GPTBot": [
"Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
"compatible; GPTBot/1.0; +https://openai.com/gptbot)",
],
"PerplexityBot": [
"Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
"compatible; PerplexityBot/1.0; "
"+https://docs.perplexity.ai/docs/perplexity-bot)",
],
"ClaudeBot": [
"Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
"compatible; ClaudeBot/1.0; "
"+https://www.anthropic.com/claude-bot)",
],
"CCBot": [
"CCBot/2.0 (https://commoncrawl.org/faq/)",
],
"Bytespider": [
"Mozilla/5.0 (Linux; Android 5.0) AppleWebKit/537.36 "
"(KHTML, like Gecko) Mobile Safari/537.36 "
"(compatible; Bytespider; spider-feedback@bytedance.com)",
],
}
# ---------------------------------------------------------------------------
# TCP / TLS metadata helpers
# ---------------------------------------------------------------------------
TCP_OPTIONS = {
"linux": "020405b40402080affffffff000000000103030a",
"windows": "020405b40103030801010402",
"macos": "020405ac0103030601010402",
"android": "020405b40402080affffffff000000000103030a",
"scanner": "0204ffff",
"minimal": "0204ffd7",
}
TCP_PROFILES = {
"linux": {"window_size": 65535, "mss": 1460, "wscale": 10,
"ttl": 64, "df": 1},
"windows": {"window_size": 64240, "mss": 1460, "wscale": 8,
"ttl": 128, "df": 1},
"macos": {"window_size": 65535, "mss": 1460, "wscale": 6,
"ttl": 64, "df": 1},
"android": {"window_size": 65535, "mss": 1420, "wscale": 9,
"ttl": 64, "df": 1},
"scanner": {"window_size": 1024, "mss": 1460, "wscale": 0,
"ttl": 48, "df": 0},
"minimal": {"window_size": 512, "mss": 576, "wscale": 0,
"ttl": 60, "df": 0},
}
def _tcp_meta(profile, rng):
meta = TCP_PROFILES.get(profile, TCP_PROFILES["linux"])
return {
"tcp_meta_window_size": meta["window_size"] + rng.randint(-100, 100),
"tcp_meta_mss": meta["mss"],
"tcp_meta_window_scale": meta["wscale"],
"tcp_meta_options": TCP_OPTIONS.get(profile, TCP_OPTIONS["linux"]),
"ip_meta_ttl": meta["ttl"] - rng.randint(0, 5),
"ip_meta_df": meta["df"],
"ip_meta_id": rng.randint(1, 65535),
"ip_meta_total_length": rng.randint(1200, 1500),
}
def _syn_ms(profile, rng):
"""Realistic SYN→ClientHello latency in milliseconds."""
if profile == "scanner":
return rng.randint(0, 3)
if profile == "minimal":
return rng.randint(1, 5)
return rng.randint(10, 120)
def _ja3_for_ja4(ja4):
"""Generate a plausible JA3 string and its MD5 hash."""
if ja4.startswith("t13"):
raw = ("771,4866-4867-4865-49196-49200-52393-52392,"
"0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0")
elif ja4.startswith("t12"):
raw = ("771,49195-49199-49196-49200-52393-52392,"
"0-23-65281-10-11-35-16-5-13,29-23-24,0")
elif ja4.startswith("t10"):
raw = "769,49161-49162-49171-49172,0-10-11,29-23-24,0"
else:
raw = "771,4866-4867-4865,0-23-65281,29-23-24,0"
return raw, hashlib.md5(raw.encode()).hexdigest()
# ---------------------------------------------------------------------------
# CSV loading with fallback
# ---------------------------------------------------------------------------
_DATA_SEARCH_PATHS = [
"/app/data",
os.path.join(os.path.dirname(os.path.abspath(__file__)),
"..", "..", "..", "scripts", "data"),
os.path.join(os.path.dirname(os.path.abspath(__file__)), "data"),
]
def _find_data_dir(explicit=None):
if explicit and os.path.isdir(explicit):
return explicit
for p in _DATA_SEARCH_PATHS:
real = os.path.realpath(p)
if os.path.isdir(real) and os.path.isfile(
os.path.join(real, "browser_ja4.csv")):
return real
return None
def load_browser_ja4s(data_dir):
"""Load browser_ja4.csv → {family: [ja4_hash, …]} for TLS only."""
result = {}
if not data_dir:
return result
path = os.path.join(data_dir, "browser_ja4.csv")
if not os.path.isfile(path):
return result
with open(path, newline="", encoding="utf-8") as f:
for row in csv.reader(f):
if len(row) < 2:
continue
ja4, family = row[0].strip(), row[1].strip()
if ja4.startswith("t13") or ja4.startswith("t12"):
result.setdefault(family, []).append(ja4)
return result
def load_bot_ips(data_dir):
"""Load bot_ip.csv → {family: [cidr_str, …]}."""
result = {}
if not data_dir:
return result
path = os.path.join(data_dir, "bot_ip.csv")
if not os.path.isfile(path):
return result
with open(path, newline="", encoding="utf-8") as f:
for row in csv.reader(f):
if len(row) < 2:
continue
result.setdefault(row[1].strip(), []).append(row[0].strip())
return result
# ---------------------------------------------------------------------------
# IP generation helpers
# ---------------------------------------------------------------------------
def _gen_ips_from_prefixes(prefixes, n, rng):
"""Generate *n* unique IPs from /24 prefixes."""
ips = set()
attempts = 0
while len(ips) < n and attempts < n * 5:
prefix = rng.choice(prefixes)
octet = rng.randint(1, 254)
ips.add(f"{prefix}.{octet}")
attempts += 1
return list(ips)
def _ips_from_cidrs(cidrs, n, rng):
"""Generate *n* unique IPv4 IPs from a list of CIDR strings."""
networks = []
single_ips = []
for c in cidrs:
try:
net = ipaddress.ip_network(c, strict=False)
if net.version != 4:
continue
if net.prefixlen == 32:
single_ips.append(str(net.network_address))
elif net.num_addresses > 2:
networks.append(net)
except ValueError:
continue
# Start with any /32 single IPs
ips = set(single_ips)
if not networks:
return list(ips)[:n]
weights = [net.num_addresses for net in networks]
attempts = 0
while len(ips) < n and attempts < n * 10:
net = rng.choices(networks, weights=weights, k=1)[0]
host_offset = rng.randint(1, max(1, net.num_addresses - 2))
ips.add(str(net.network_address + host_offset))
attempts += 1
return list(ips)[:n]
def _generate_bot_ja4s(browser_ja4_set, rng, n=20):
"""Generate synthetic JA4 hashes NOT in the browser CSV."""
prefixes = [
"t13d0305", "t13d0203", "t12d0507", "t10d0100", "t13d0101",
"t12d0302", "t13d0405", "t12d0204", "t10d0200", "t13d0102",
]
bot_ja4s = []
for i in range(n):
p = prefixes[i % len(prefixes)]
seg1 = f"{rng.randint(0, 0xFFFFFFFFFFFF):012x}"
seg2 = f"{rng.randint(0, 0xFFFFFFFFFFFF):012x}"
suffix = "h1" if rng.random() < 0.7 else "h2"
ja4 = f"{p}{suffix}_{seg1}_{seg2}"
if ja4 not in browser_ja4_set:
bot_ja4s.append(ja4)
if not bot_ja4s:
bot_ja4s = [
"t13d030500_ffd59bab1b39_6e7f7df63e98",
"t13d020300_6b9b1b2c3d4e_ffd59bab1b39",
"t10d170000_0a1b2c3d4e5f_1b2c3d4e5f60",
"t12d050700_5a6b7c8d9e0f_1a2b3c4d5e6f",
"t13d010100_aabbccddeeff_0011223344aa",
]
return bot_ja4s
# ---------------------------------------------------------------------------
# Request distribution
# ---------------------------------------------------------------------------
def _distribute_requests(n_ips, total_rows, min_req, max_req, rng):
"""Distribute *total_rows* across *n_ips*, each in [min_req, max_req]."""
if n_ips == 0:
return []
counts = []
remaining = total_rows
for i in range(n_ips):
left = n_ips - i
if i == n_ips - 1:
counts.append(max(min_req, min(max_req, remaining)))
break
lo = max(min_req, remaining - (left - 1) * max_req)
hi = min(max_req, remaining - (left - 1) * min_req)
if lo > hi:
lo = hi = max(min_req, min(max_req, remaining // left))
counts.append(rng.randint(lo, hi))
remaining -= counts[-1]
rng.shuffle(counts)
return counts
# ---------------------------------------------------------------------------
# Timestamp helper
# ---------------------------------------------------------------------------
_BASE_TIME = None
def _now_minus(seconds):
"""ISO-8601 UTC timestamp *seconds* in the past."""
global _BASE_TIME
if _BASE_TIME is None:
_BASE_TIME = datetime.now(timezone.utc)
t = _BASE_TIME - timedelta(seconds=seconds)
return t.strftime("%Y-%m-%dT%H:%M:%SZ")
# ---------------------------------------------------------------------------
# Row builder — identical field set to original
# ---------------------------------------------------------------------------
def _make_row(
src_ip, ua, path, method="GET", ja4=None, tcp_profile="linux",
scheme="https", host="platform", time_offset_s=None,
extra_headers=None, rng=None,
):
"""Build a single raw_json dict matching what the correlator produces."""
if rng is None:
rng = random
if time_offset_s is None:
time_offset_s = rng.randint(0, 1700)
if ja4 is None:
ja4 = "t13d1917h2_b0372614b25a_6a77dcf5a8be"
ja3_raw, ja3_hash = _ja3_for_ja4(ja4)
tcp = _tcp_meta(tcp_profile, rng)
syn_ms = _syn_ms(tcp_profile, rng)
client_headers = "Host,User-Agent,Accept,Accept-Language,Accept-Encoding"
if extra_headers:
client_headers += "," + ",".join(extra_headers.keys())
row = {
"time": _now_minus(time_offset_s),
"src_ip": src_ip,
"src_port": rng.randint(1024, 65535),
"dst_ip": "172.20.0.2",
"dst_port": 443 if scheme == "https" else 80,
"method": method,
"scheme": scheme,
"host": host,
"path": path.split("?")[0] if "?" in path else path,
"query": path.split("?")[1] if "?" in path else "",
"http_version": "HTTP/2.0" if "h2" in ja4 else "HTTP/1.1",
"orphan_side": "",
"correlated": True,
"keepalives": rng.randint(1, 8),
"a_timestamp": int(time.time() * 1_000_000),
"b_timestamp": int(time.time() * 1_000_000) + syn_ms * 1000,
"conn_id": f"seed_{src_ip.replace('.', '_')}"
f"_{rng.randint(1000, 9999)}",
"syn_to_clienthello_ms": syn_ms,
"tls_version": ("1.3" if ja4.startswith("t13")
else "1.2" if ja4.startswith("t12")
else "1.0"),
"tls_sni": host,
"tls_alpn": "h2" if "h2" in ja4 else "http/1.1",
"ja3": ja3_raw,
"ja3_hash": ja3_hash,
"ja4": ja4,
"client_headers": client_headers,
"header_User-Agent": ua,
"header_Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
"header_Accept-Encoding": "gzip, deflate, br",
"header_Accept-Language": "",
"header_Content-Type": "",
"header_X-Request-Id": "",
"header_X-Trace-Id": "",
"header_X-Forwarded-For": "",
"header_Sec-Fetch-Site": "",
"header_Sec-Fetch-Mode": "",
"header_Sec-Fetch-Dest": "",
"header_Sec-CH-UA": "",
"header_Sec-CH-UA-Mobile": "",
"header_Sec-CH-UA-Platform": "",
**tcp,
}
if extra_headers:
row.update({f"header_{k}": v for k, v in extra_headers.items()})
return row
# ---------------------------------------------------------------------------
# Traffic generators
# ---------------------------------------------------------------------------
def generate_browser_traffic(n_ips, total_rows, browser_ja4s, rng):
"""Generate legitimate browser sessions with realistic navigation.
Each IP gets a consistent browser profile (family, JA4, UA, TCP) and
produces page navigations + asset/API requests with Referer chains,
cookies, Sec-Fetch headers, and Sec-CH-UA for Chromium browsers.
"""
rows = []
ips = _gen_ips_from_prefixes(ISP_PREFIXES, n_ips, rng)
counts = _distribute_requests(len(ips), total_rows, 5, 50, rng)
families = (list(browser_ja4s.keys())
if browser_ja4s else list(BROWSER_UAS.keys()))
family_weights = [BROWSER_WEIGHTS.get(f, 1) for f in families]
platform_map = {
"windows": '"Windows"', "linux": '"Linux"',
"macos": '"macOS"', "android": '"Android"',
}
for ip, n_req in zip(ips, counts):
family = rng.choices(families, weights=family_weights, k=1)[0]
ja4_list = browser_ja4s.get(family, [])
if not ja4_list:
for fb in ("Chromium", "Firefox", "Safari"):
ja4_list = browser_ja4s.get(fb, [])
if ja4_list:
break
if not ja4_list:
ja4_list = ["t13d1917h2_b0372614b25a_6a77dcf5a8be"]
ja4 = rng.choice(ja4_list)
ua = rng.choice(BROWSER_UAS.get(family, BROWSER_UAS["Chromium"]))
tcp = rng.choice(BROWSER_TCP.get(family, ["linux"]))
host = rng.choice(HOSTS)
lang = rng.choice(ACCEPT_LANGUAGES)
is_chromium = family in CHROMIUM_FAMILIES
session_cookie = f"sid={rng.randint(100000, 999999)}"
ch_ua = SEC_CH_UA.get(family, "") if is_chromium else ""
ch_mobile = "?0" if is_chromium else ""
ch_plat = platform_map.get(tcp, "") if is_chromium else ""
base_offset = rng.randint(60, 1700)
current_page = ""
for req_i in range(n_req):
offset = max(1, base_offset - req_i * rng.randint(1, 5))
if req_i == 0 or rng.random() < 0.25:
# Navigate to a new page
path = rng.choice(PAGE_PATHS)
method = "GET"
referer = ("" if req_i == 0
else f"https://{host}{current_page}")
sec_site = "none" if req_i == 0 else "same-origin"
sec_mode = "navigate"
sec_dest = "document"
current_page = path
elif rng.random() < 0.5:
# Asset request
path = rng.choice(ASSET_PATHS)
method = "GET"
referer = (f"https://{host}{current_page}"
if current_page else "")
sec_site = "same-origin"
sec_mode = "no-cors"
sec_dest = rng.choice(["script", "style", "image", "font"])
elif rng.random() < 0.4:
# API call
path = rng.choice(API_PATHS)
method = rng.choice(["GET", "POST"])
referer = (f"https://{host}{current_page}"
if current_page else "")
sec_site = "same-origin"
sec_mode = "cors"
sec_dest = "empty"
else:
# Another page navigation
path = rng.choice(PAGE_PATHS)
method = "GET"
referer = (f"https://{host}{current_page}"
if current_page else "")
sec_site = "same-origin"
sec_mode = "navigate"
sec_dest = "document"
current_page = path
row = _make_row(
src_ip=ip, ua=ua, path=path, method=method,
ja4=ja4, tcp_profile=tcp, scheme="https", host=host,
time_offset_s=offset, rng=rng,
)
row["header_Accept-Language"] = lang
row["header_Sec-Fetch-Site"] = sec_site
row["header_Sec-Fetch-Mode"] = sec_mode
row["header_Sec-Fetch-Dest"] = sec_dest
row["header_Sec-CH-UA"] = ch_ua
row["header_Sec-CH-UA-Mobile"] = ch_mobile
row["header_Sec-CH-UA-Platform"] = ch_plat
if referer:
row["header_Referer"] = referer
if "Referer" not in row["client_headers"]:
row["client_headers"] += ",Referer"
if req_i > 0:
row["header_Cookie"] = session_cookie
if "Cookie" not in row["client_headers"]:
row["client_headers"] += ",Cookie"
rows.append(row)
return rows
def generate_scanner_traffic(n_ips, total_rows, bot_ja4s, rng):
"""Generate malicious bot/scanner traffic in four sub-categories.
a. Vulnerability scanners (30%): diverse attack paths
b. Credential stuffers (20%): POST /login brute-force
c. Content scrapers (30%): methodical path crawling
d. DDoS-like (20%): same path hammered 100+ times
"""
rows = []
ips = _gen_ips_from_prefixes(DC_SCANNER_PREFIXES, n_ips, rng)
n_vuln = int(n_ips * 0.30)
n_cred = int(n_ips * 0.20)
n_scrap = int(n_ips * 0.30)
n_ddos = n_ips - n_vuln - n_cred - n_scrap
# Compute row budgets respecting per-IP minimums
min_vuln = n_vuln * 10
min_cred = n_cred * 15
min_scrap = n_scrap * 10
min_ddos = n_ddos * 100
total_min = min_vuln + min_cred + min_scrap + min_ddos
surplus = max(0, total_rows - total_min)
r_ddos = min_ddos + int(surplus * 0.40)
r_vuln = min_vuln + int(surplus * 0.20)
r_cred = min_cred + int(surplus * 0.15)
r_scrap = total_rows - r_ddos - r_vuln - r_cred
idx = 0
# --- Vulnerability scanners ---
vuln_ips = ips[idx:idx + n_vuln]; idx += n_vuln
counts = _distribute_requests(len(vuln_ips), r_vuln, 10, 60, rng)
for ip, n_req in zip(vuln_ips, counts):
ua = rng.choice(SCANNER_UAS)
ja4 = rng.choice(bot_ja4s)
rotate_ua = rng.random() < 0.3
for _ in range(n_req):
if rotate_ua:
ua = rng.choice(SCANNER_UAS)
rows.append(_make_row(
src_ip=ip, ua=ua, path=rng.choice(ATTACK_PATHS),
method=rng.choice(["GET", "GET", "HEAD"]),
ja4=ja4, tcp_profile="scanner", scheme="https",
host=rng.choice(HOSTS), rng=rng,
))
# Some vuln scanners do port probing (diverse dst_port)
for ip in rng.sample(vuln_ips, min(100, len(vuln_ips))):
for port in rng.sample(range(80, 10000), 5):
row = _make_row(
src_ip=ip, ua="-", path="/",
method="GET", ja4=rng.choice(bot_ja4s),
tcp_profile="scanner", scheme="https",
host=rng.choice(HOSTS), rng=rng,
)
row["dst_port"] = port
rows.append(row)
# --- Credential stuffers ---
login_paths = [
"/login", "/admin/login", "/api/auth/login",
"/wp-login.php", "/user/signin", "/api/v1/auth",
]
cred_ips = ips[idx:idx + n_cred]; idx += n_cred
counts = _distribute_requests(len(cred_ips), r_cred, 15, 60, rng)
for ip, n_req in zip(cred_ips, counts):
ua = rng.choice(SCANNER_UAS + HEADLESS_UAS)
ja4 = rng.choice(bot_ja4s)
target = rng.choice(login_paths)
for _ in range(n_req):
rows.append(_make_row(
src_ip=ip, ua=ua, path=target, method="POST",
ja4=ja4, tcp_profile="scanner", scheme="https",
host=rng.choice(HOSTS[:2]), rng=rng,
extra_headers={
"Content-Type": "application/x-www-form-urlencoded",
"Content-Length": str(rng.randint(20, 60)),
},
))
# --- Content scrapers ---
scrap_ips = ips[idx:idx + n_scrap]; idx += n_scrap
counts = _distribute_requests(len(scrap_ips), r_scrap, 10, 60, rng)
for ip, n_req in zip(scrap_ips, counts):
ua = rng.choice(SCANNER_UAS[:8])
ja4 = rng.choice(bot_ja4s)
for i in range(n_req):
rows.append(_make_row(
src_ip=ip, ua=ua,
path=SCRAPER_PATHS[i % len(SCRAPER_PATHS)],
method="GET", ja4=ja4, tcp_profile="scanner",
scheme="https", host=rng.choice(HOSTS), rng=rng,
))
# --- DDoS-like ---
ddos_targets = ["/", "/api/v1/search", "/products", "/api/v2/metrics"]
ddos_ips = ips[idx:idx + n_ddos]
counts = _distribute_requests(len(ddos_ips), r_ddos, 100, 200, rng)
for ip, n_req in zip(ddos_ips, counts):
ua = rng.choice(SCANNER_UAS)
ja4 = rng.choice(bot_ja4s)
target = rng.choice(ddos_targets)
for _ in range(n_req):
rows.append(_make_row(
src_ip=ip, ua=ua, path=target, method="GET",
ja4=ja4,
tcp_profile=rng.choice(["scanner", "minimal"]),
scheme="https", host=HOSTS[0],
time_offset_s=rng.randint(0, 300), rng=rng,
))
return rows
def generate_legit_bot_traffic(n_ips, total_rows, bot_ips_by_family,
bot_ja4s, rng):
"""Generate traffic from known legitimate bots (Googlebot, Bingbot, …).
IPs are drawn from bot_ip.csv CIDRs when available so they match
the bot_ip dictionary for direct labelling.
"""
rows = []
legit_families = list(LEGIT_BOT_UAS.keys())
all_ips = []
ip_family_map = {}
per_family = max(1, n_ips // len(legit_families))
for family in legit_families:
cidrs = bot_ips_by_family.get(family, [])
if cidrs:
fam_ips = _ips_from_cidrs(cidrs, per_family, rng)
else:
fam_ips = _gen_ips_from_prefixes(
DC_LEGIT_BOT_PREFIXES, per_family, rng)
for ip in fam_ips:
ip_family_map[ip] = family
all_ips.extend(fam_ips)
if len(all_ips) < n_ips:
extra = _gen_ips_from_prefixes(
DC_LEGIT_BOT_PREFIXES, n_ips - len(all_ips), rng)
for ip in extra:
ip_family_map[ip] = rng.choice(legit_families)
all_ips.extend(extra)
all_ips = all_ips[:n_ips]
counts = _distribute_requests(len(all_ips), total_rows, 5, 30, rng)
for ip, n_req in zip(all_ips, counts):
family = ip_family_map.get(ip, rng.choice(legit_families))
ua = rng.choice(
LEGIT_BOT_UAS.get(family, LEGIT_BOT_UAS["Googlebot"]))
ja4 = rng.choice(bot_ja4s)
# Bots: robots.txt first, then sitemap, then content
paths = ["/robots.txt"]
if rng.random() < 0.7:
paths.append("/sitemap.xml")
remaining_n = n_req - len(paths)
paths.extend(
rng.choices(CONTENT_PATHS + BOT_PATHS, k=max(0, remaining_n)))
paths = paths[:n_req]
for path in paths:
rows.append(_make_row(
src_ip=ip, ua=ua, path=path, method="GET",
ja4=ja4,
tcp_profile=rng.choice(["linux", "linux", "scanner"]),
scheme="https", host=rng.choice(HOSTS), rng=rng,
))
return rows
def generate_ai_bot_traffic(n_ips, total_rows, bot_ips_by_family,
bot_ja4s, rng):
"""Generate aggressive AI bot scraping traffic (GPTBot, ClaudeBot, …)."""
rows = []
ai_families = list(AI_BOT_UAS.keys())
all_ips = []
ip_family_map = {}
per_family = max(1, n_ips // len(ai_families))
for family in ai_families:
cidrs = bot_ips_by_family.get(family, [])
if cidrs:
fam_ips = _ips_from_cidrs(cidrs, per_family, rng)
else:
fam_ips = _gen_ips_from_prefixes(
DC_AI_PREFIXES, per_family, rng)
for ip in fam_ips:
ip_family_map[ip] = family
all_ips.extend(fam_ips)
if len(all_ips) < n_ips:
extra = _gen_ips_from_prefixes(
DC_AI_PREFIXES, n_ips - len(all_ips), rng)
for ip in extra:
ip_family_map[ip] = rng.choice(ai_families)
all_ips.extend(extra)
all_ips = all_ips[:n_ips]
counts = _distribute_requests(len(all_ips), total_rows, 10, 50, rng)
for ip, n_req in zip(all_ips, counts):
family = ip_family_map.get(ip, rng.choice(ai_families))
ua = rng.choice(AI_BOT_UAS[family])
ja4 = rng.choice(bot_ja4s)
paths = rng.choices(CONTENT_PATHS, k=n_req)
for path in paths:
rows.append(_make_row(
src_ip=ip, ua=ua, path=path, method="GET",
ja4=ja4, tcp_profile="linux", scheme="https",
host=rng.choice(HOSTS), rng=rng,
))
return rows
# ---------------------------------------------------------------------------
# ClickHouse insert
# ---------------------------------------------------------------------------
def _ch_insert(rows, host, port, user, password,
batch_size=2000, dry_run=False):
"""Insert rows into ja4_logs.http_logs_raw via ClickHouse HTTP interface.
Each row is wrapped as {"raw_json": ""} in JSONEachRow format.
"""
if dry_run:
print(f"[seed] DRY-RUN — would insert {len(rows)} rows")
print("[seed] Sample row:", json.dumps(rows[0], indent=2)[:400])
return len(rows)
query = "INSERT INTO ja4_logs.http_logs_raw (raw_json) FORMAT JSONEachRow"
url = (
f"http://{host}:{port}/"
f"?query={urllib.parse.quote(query)}"
f"&user={urllib.parse.quote(user)}"
f"&password={urllib.parse.quote(password)}"
)
total_inserted = 0
n_batches = (len(rows) + batch_size - 1) // batch_size
for i in range(0, len(rows), batch_size):
batch = rows[i:i + batch_size]
body_lines = []
for row in batch:
outer = {"raw_json": json.dumps(row, separators=(",", ":"))}
body_lines.append(json.dumps(outer, separators=(",", ":")))
body = "\n".join(body_lines).encode("utf-8")
req = urllib.request.Request(
url, data=body, method="POST",
headers={"Content-Type": "application/x-ndjson; charset=utf-8"},
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
resp.read()
total_inserted += len(batch)
except urllib.error.HTTPError as e:
err_body = e.read(500).decode("utf-8", errors="replace")
print(f"[seed] ERROR batch {i}–{i+batch_size}: "
f"HTTP {e.code}: {err_body}")
except Exception as e:
print(f"[seed] ERROR batch {i}–{i+batch_size}: {e}")
if total_inserted % 10000 < batch_size:
batch_num = i // batch_size + 1
print(f"[seed] Progress: {total_inserted:,}/{len(rows):,} rows "
f"({batch_num}/{n_batches} batches)")
return total_inserted
def _wait_for_clickhouse(host, port, user, password, timeout_s=60):
"""Wait for ClickHouse to be ready."""
url = (
f"http://{host}:{port}/"
f"?query=SELECT+1"
f"&user={urllib.parse.quote(user)}"
f"&password={urllib.parse.quote(password)}"
)
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
try:
with urllib.request.urlopen(url, timeout=5) as r:
if r.read().strip() == b"1":
return True
except Exception:
pass
time.sleep(2)
return False
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Seed ClickHouse with synthetic traffic")
parser.add_argument("--host", default="clickhouse")
parser.add_argument("--port", type=int, default=8123)
parser.add_argument("--user", default="default")
parser.add_argument("--password", default="")
parser.add_argument("--dry-run", action="store_true",
help="Generate data but do not insert")
parser.add_argument("--rows", type=int, default=500_000,
help="Total rows to generate (default: 500000)")
parser.add_argument("--ips", type=int, default=20_000,
help="Total unique IPs (default: 20000)")
parser.add_argument("--seed", type=int, default=None,
help="Random seed for reproducibility")
parser.add_argument("--data-dir", default=None,
help="Path to CSV data directory "
"(browser_ja4.csv, bot_ip.csv)")
args = parser.parse_args()
rng = random.Random(args.seed)
if not args.dry_run:
print(f"[seed] Waiting for ClickHouse at {args.host}:{args.port}…")
if not _wait_for_clickhouse(args.host, args.port,
args.user, args.password):
print("[seed] FATAL: ClickHouse not reachable after 60s")
raise SystemExit(1)
print("[seed] ClickHouse ready.")
t0 = time.monotonic()
# --- Load CSV data ---
data_dir = _find_data_dir(args.data_dir)
if data_dir:
print(f"[seed] Loading CSV data from {data_dir}")
else:
print("[seed] WARNING: CSV data directory not found, "
"using fallback data")
browser_ja4s = load_browser_ja4s(data_dir)
bot_ips_by_family = load_bot_ips(data_dir)
browser_ja4_set = set()
for ja4_list in browser_ja4s.values():
browser_ja4_set.update(ja4_list)
if browser_ja4s:
total_ja4 = sum(len(v) for v in browser_ja4s.values())
print(f"[seed] Loaded {total_ja4} browser JA4s "
f"across {len(browser_ja4s)} families")
if bot_ips_by_family:
total_cidrs = sum(len(v) for v in bot_ips_by_family.values())
print(f"[seed] Loaded {total_cidrs} bot CIDRs "
f"across {len(bot_ips_by_family)} families")
bot_ja4s = _generate_bot_ja4s(browser_ja4_set, rng)
# --- IP and row budgets ---
n_browser_ips = int(args.ips * 0.70)
n_scanner_ips = int(args.ips * 0.15)
n_legit_bot_ips = int(args.ips * 0.10)
n_ai_bot_ips = (args.ips - n_browser_ips
- n_scanner_ips - n_legit_bot_ips)
n_browser_rows = int(args.rows * 0.70)
n_scanner_rows = int(args.rows * 0.20)
n_legit_bot_rows = int(args.rows * 0.06)
n_ai_bot_rows = (args.rows - n_browser_rows
- n_scanner_rows - n_legit_bot_rows)
print(f"[seed] Generating {args.rows:,} rows from {args.ips:,} IPs…")
# --- Generate ---
browser_rows = generate_browser_traffic(
n_browser_ips, n_browser_rows, browser_ja4s, rng)
print(f"[seed] Browser: {len(browser_rows):>7,} rows "
f"({len(set(r['src_ip'] for r in browser_rows)):,} IPs)")
scanner_rows = generate_scanner_traffic(
n_scanner_ips, n_scanner_rows, bot_ja4s, rng)
print(f"[seed] Scanner: {len(scanner_rows):>7,} rows "
f"({len(set(r['src_ip'] for r in scanner_rows)):,} IPs)")
legit_bot_rows = generate_legit_bot_traffic(
n_legit_bot_ips, n_legit_bot_rows,
bot_ips_by_family, bot_ja4s, rng)
print(f"[seed] Legit bots: {len(legit_bot_rows):>7,} rows "
f"({len(set(r['src_ip'] for r in legit_bot_rows)):,} IPs)")
ai_bot_rows = generate_ai_bot_traffic(
n_ai_bot_ips, n_ai_bot_rows,
bot_ips_by_family, bot_ja4s, rng)
print(f"[seed] AI bots: {len(ai_bot_rows):>7,} rows "
f"({len(set(r['src_ip'] for r in ai_bot_rows)):,} IPs)")
all_rows = browser_rows + scanner_rows + legit_bot_rows + ai_bot_rows
rng.shuffle(all_rows)
gen_elapsed = time.monotonic() - t0
print(f"[seed] Generated {len(all_rows):,} total rows "
f"in {gen_elapsed:.1f}s")
# --- Insert ---
inserted = _ch_insert(
all_rows, args.host, args.port, args.user, args.password,
batch_size=2000, dry_run=args.dry_run,
)
elapsed = time.monotonic() - t0
print(f"[seed] Done: {inserted:,}/{len(all_rows):,} rows inserted "
f"in {elapsed:.1f}s")
if inserted < len(all_rows) * 0.9:
print("[seed] WARNING: fewer than 90% of rows inserted — "
"check errors above")
raise SystemExit(1)
print(f"[seed] The bot_detector should now see ≥ 500 human sessions "
f"in view_ai_features_1h (after MV propagation).")
if __name__ == "__main__":
main()