Files
ja4-platform/tests/integration/traffic-gen/seed_clickhouse.py
toto 98289ccf04 fix: ASN dictionary pipeline + verbose bot-detector logging
- Fix dict_iplocate_asn: remove non-existent org/domain columns (4→4 cols)
- Add CSV header to iplocate-ip-to-asn.csv (CSVWithNames format)
- Replace org/domain dictGet calls with empty string literals in MV
- Full 714K CIDR stub for complete ASN resolution in tests
- Add header generation to generate_asn_data.py
- Verbose bot-detector stdout: data summary, triage breakdown, model
  training details, scoring stats, browser classification, boxed results
- Fix IPv6 filter in traffic seeder (_ips_from_cidrs)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-08 17:43:55 +02:00

1231 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
seed_clickhouse.py — Bootstrap ClickHouse with realistic synthetic traffic data.
Inserts directly into ja4_logs.http_logs_raw (triggers all MVs automatically):
• ~350,000 rows from 14,000 legitimate browser IPs (ISP ranges, asn_label='isp')
• ~100,000 rows from 3,000 malicious bot/scanner IPs (datacenter ranges)
• ~30,000 rows from 2,000 legitimate bot IPs (from bot_ip.csv CIDRs)
• ~20,000 rows from 1,000 AI bot IPs (datacenter ranges)
Total: 500,000 rows from 20,000 unique IPs (configurable via --rows / --ips).
Browser JA4 fingerprints are loaded from browser_ja4.csv so they match
dict_browser_ja4 (LEGITIMATE_BROWSER classification). Bot/scanner JA4s are
synthetic hashes guaranteed NOT to appear in that dictionary.
Bot IPs are drawn from real CIDRs in bot_ip.csv (Googlebot, Bingbot, etc.).
ISP and datacenter IPs use hard-coded /24 prefixes from well-known ASNs that
resolve correctly through iplocate-ip-to-asn.csv → asn_reputation.csv.
This ensures view_ai_features_1h has ≥ 500 human rows for the bot_detector
training threshold (run_semi_supervised_logic requires len(human_baseline) >= 500).
All timestamps are within the last 30 minutes so the 24h window filter catches them.
No external dependencies — uses Python stdlib only.
Usage:
python seed_clickhouse.py
python seed_clickhouse.py --host clickhouse --port 8123 --user default --password ""
python seed_clickhouse.py --rows 500000 --ips 20000 --seed 42
python seed_clickhouse.py --dry-run
"""
import argparse
import csv
import hashlib
import ipaddress
import json
import os
import random
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
# ---------------------------------------------------------------------------
# Hard-coded /24 prefixes — guaranteed to resolve via ASN dictionaries
# ---------------------------------------------------------------------------
# ISP ranges (asn_label='isp' in asn_reputation.csv)
# Every entry below is the first three octets of a /24 ("a.b.c");
# _gen_ips_from_prefixes appends a host octet in 1..254 to form an address.
ISP_PREFIXES = (
    # Comcast AS7922 — within 24.0.0.0/12
    [f"24.{o2}.{o3}" for o2 in range(1, 11) for o3 in range(0, 3)]
    # Orange AS3215 — within 2.3.0.0/16
    + [f"2.3.{o3}" for o3 in range(0, 10)]
    # Deutsche Telekom AS3320 — within 2.160.0.0/12
    + [f"2.{160 + o2}.{o3}" for o2 in range(0, 5) for o3 in range(0, 2)]
    # AT&T AS7018 — within 12.0.0.0/10
    + [f"12.0.{o3}" for o3 in range(4, 14)]
    # Verizon AS701 — within 63.0.0.0/12
    + [f"63.{o2}.0" for o2 in range(0, 10)]
    # BT AS2856 — within 5.80.0.0/15
    + [f"5.80.{o3}" for o3 in range(0, 8)]
)  # 30 + 10 + 10 + 10 + 10 + 8 = 78 prefixes × 254 ≈ 19.8K IPs
# Datacenter ranges for scanners (asn_label='datacenter')
DC_SCANNER_PREFIXES = (
    # DigitalOcean AS14061 — within 5.101.96.0/20
    [f"5.101.{96 + o3}" for o3 in range(0, 6)]
    # Hetzner AS24940 — within 5.9.0.0/16
    + [f"5.9.{o3}" for o3 in range(0, 6)]
    # OVH AS16276 — within 5.39.0.0/17
    + [f"5.39.{o3}" for o3 in range(0, 5)]
)  # 6 + 6 + 5 = 17 prefixes × 254 ≈ 4.3K IPs
# Datacenter ranges for AI bots (separate from scanner ranges)
DC_AI_PREFIXES = (
    # DigitalOcean (different /24s: 5.101.102 – 5.101.105)
    [f"5.101.{102 + o3}" for o3 in range(0, 4)]
    # Hetzner (different /24s: 5.9.6 – 5.9.9)
    + [f"5.9.{6 + o3}" for o3 in range(0, 4)]
)  # 4 + 4 = 8 prefixes × 254 ≈ 2K IPs
# Fallback /24s for legitimate bot overflow (5.9.20 – 5.9.27, 8 prefixes)
DC_LEGIT_BOT_PREFIXES = [f"5.9.{20 + o3}" for o3 in range(0, 8)]
# ---------------------------------------------------------------------------
# Browser family → User-Agent mapping
# ---------------------------------------------------------------------------
# Candidate User-Agent strings per browser family. generate_browser_traffic
# picks one entry per source IP and reuses it for every request from that IP;
# a family missing from this table falls back to the "Chromium" list there.
BROWSER_UAS = {
    "Chromium": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.6099.115 Mobile Safari/537.36",
    ],
    "Firefox": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) "
        "Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) "
        "Gecko/20100101 Firefox/120.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.2; rv:121.0) "
        "Gecko/20100101 Firefox/121.0",
    ],
    "Safari": [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_2_1) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/17.1 Safari/605.1.15",
    ],
    "Edge": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
    ],
    "Opera": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 OPR/105.0.0.0",
    ],
    "Vivaldi": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 "
        "Vivaldi/6.4.3160.47",
    ],
    "Chrome_iOS": [
        "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "CriOS/120.0.6099.119 Mobile/15E148 Safari/604.1",
    ],
    "Chromium_Legacy": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36",
    ],
    "Firefox_Legacy": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) "
        "Gecko/20100101 Firefox/78.0",
    ],
    "Safari_Legacy": [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/15.6 Safari/605.1.15",
    ],
    "Tor_Browser": [
        "Mozilla/5.0 (Windows NT 10.0; rv:102.0) "
        "Gecko/20100101 Firefox/102.0",
    ],
}
# TCP profile names (keys of TCP_PROFILES / TCP_OPTIONS) that
# generate_browser_traffic may pair with each browser family; one is chosen
# per source IP. Families missing here fall back to ["linux"] in the caller.
BROWSER_TCP = {
    "Chromium": ["windows", "linux", "android"],
    "Firefox": ["windows", "linux"],
    "Safari": ["macos"],
    "Edge": ["windows"],
    "Opera": ["windows", "linux"],
    "Vivaldi": ["windows", "linux"],
    "Chrome_iOS": ["macos"],
    "Chromium_Legacy": ["windows"],
    "Firefox_Legacy": ["windows", "linux"],
    "Safari_Legacy": ["macos"],
    "Tor_Browser": ["linux"],
}
# Families for which generate_browser_traffic fills the Sec-CH-UA,
# Sec-CH-UA-Mobile and Sec-CH-UA-Platform headers; all others get "".
CHROMIUM_FAMILIES = {
    "Chromium", "Edge", "Opera", "Vivaldi",
    "Chrome_iOS", "Chromium_Legacy",
}
# Relative weights handed to rng.choices when assigning a family to an IP.
# A family that is absent from this table is given weight 1 by the caller.
BROWSER_WEIGHTS = {
    "Chromium": 55, "Firefox": 15, "Safari": 12, "Edge": 8,
    "Opera": 3, "Vivaldi": 1, "Chrome_iOS": 3,
    "Chromium_Legacy": 1, "Firefox_Legacy": 1,
    "Safari_Legacy": 0.5, "Tor_Browser": 0.5,
}
# Sec-CH-UA header value per Chromium-based family (same keys as
# CHROMIUM_FAMILIES). The inner double quotes are part of the header value.
SEC_CH_UA = {
    "Chromium":
        '"Chromium";v="120", "Google Chrome";v="120", "Not-A.Brand";v="99"',
    "Edge":
        '"Chromium";v="120", "Microsoft Edge";v="120", "Not-A.Brand";v="99"',
    "Opera":
        '"Chromium";v="119", "Opera";v="105", "Not-A.Brand";v="99"',
    "Vivaldi":
        '"Chromium";v="118", "Vivaldi";v="6.4", "Not-A.Brand";v="99"',
    "Chrome_iOS":
        '"Chromium";v="120", "Google Chrome";v="120", "Not-A.Brand";v="99"',
    "Chromium_Legacy":
        '"Chromium";v="90", "Google Chrome";v="90", "Not-A.Brand";v="99"',
}
# ---------------------------------------------------------------------------
# Hosts and Accept-Language pools
# ---------------------------------------------------------------------------
# Values used for the "host" / "tls_sni" fields. Credential stuffers only
# target the first two entries (HOSTS[:2]); DDoS-like traffic uses HOSTS[0].
HOSTS = ["platform", "api.platform", "www.example.com", "shop.example.com"]
# Accept-Language header values; one is chosen per browser IP.
ACCEPT_LANGUAGES = [
    "en-US,en;q=0.9",
    "en-GB,en;q=0.9",
    "fr-FR,fr;q=0.9,en;q=0.8",
    "de-DE,de;q=0.9,en;q=0.8",
    "es-ES,es;q=0.9,en;q=0.8",
    "ja-JP,ja;q=0.9,en;q=0.8",
    "pt-BR,pt;q=0.9,en;q=0.8",
    "zh-CN,zh;q=0.9,en;q=0.8",
]
# ---------------------------------------------------------------------------
# Path pools
# ---------------------------------------------------------------------------
# Document navigations issued by browser sessions.
PAGE_PATHS = [
    "/", "/index.html", "/about", "/contact", "/products", "/services",
    "/blog", "/blog/post-1", "/blog/post-2", "/blog/post-3", "/faq",
    "/pricing", "/login", "/register", "/profile", "/dashboard",
    "/docs", "/docs/getting-started", "/docs/api-reference",
    "/help", "/terms", "/privacy", "/search",
]
# Sub-resource requests (script / style / image / font) of browser sessions.
ASSET_PATHS = [
    "/static/js/app.js", "/static/js/vendor.js", "/static/js/analytics.js",
    "/static/css/main.css", "/static/css/theme.css",
    "/images/logo.png", "/images/hero.webp", "/images/banner.jpg",
    "/favicon.ico", "/fonts/inter-400.woff2", "/fonts/inter-700.woff2",
]
# Same-origin API calls of browser sessions (GET or POST).
API_PATHS = [
    "/api/v1/users", "/api/v1/status", "/api/v2/metrics",
    "/api/v1/products", "/api/v1/search", "/api/v2/config",
]
# Probe targets for vulnerability scanners. Entries containing "?" are split
# into path and query by _make_row.
ATTACK_PATHS = [
    "/.env", "/.git/HEAD", "/.git/config",
    "/wp-login.php", "/wp-admin/", "/xmlrpc.php", "/wp-config.php",
    "/phpmyadmin/", "/phpMyAdmin/", "/pma/",
    "/admin", "/admin/login", "/administrator/",
    "/cgi-bin/test.cgi", "/cgi-bin/../etc/passwd",
    "/download?file=../../../etc/passwd",
    "/download?file=../../../../etc/shadow",
    "/api/search?q=<script>alert(1)</script>",
    "/api/users?id=1+OR+1%3D1",
    "/shell.php", "/cmd.php", "/eval.php",
    "/.aws/credentials", "/.ssh/id_rsa",
    "/etc/passwd", "/proc/self/environ",
    "/actuator", "/actuator/env", "/actuator/health",
    "/server-status", "/.svn/entries",
    "/wp-content/uploads/", "/backup.zip", "/db.sql",
    "/api/v1/../admin", "/api/debug",
    "/.htaccess", "/.htpasswd",
    "/console", "/debug/pprof/",
]
# 50 product pages + 5 categories × 10 pages = 100 paths; content scrapers
# walk this list in order (index modulo its length).
SCRAPER_PATHS = (
    [f"/products/page/{i}" for i in range(1, 51)]
    + [
        f"/category/{c}/page/{i}"
        for c in ["electronics", "clothing", "books", "home", "sports"]
        for i in range(1, 11)
    ]
)
# Crawler housekeeping paths, mixed with CONTENT_PATHS for legitimate bots.
BOT_PATHS = [
    "/robots.txt", "/sitemap.xml", "/", "/index.html",
    "/sitemap_index.xml", "/news-sitemap.xml",
    "/feed", "/rss.xml", "/atom.xml",
]
# Pool sampled by legitimate and AI bots. "/blog/post-1".."/blog/post-3" occur
# twice (once via PAGE_PATHS, once via the range below), so uniform sampling
# picks them twice as often as the other entries.
CONTENT_PATHS = PAGE_PATHS + [f"/blog/post-{i}" for i in range(1, 21)] + [
    f"/products/{s}"
    for s in ["widget-a", "widget-b", "gadget-x", "tool-pro", "kit-basic"]
]
# ---------------------------------------------------------------------------
# Scanner / bot User-Agents
# ---------------------------------------------------------------------------
# User-Agent values for malicious tooling. Order matters: content scrapers
# only draw from the first eight entries (SCANNER_UAS[:8]). The final two
# entries model a placeholder ("-") and a missing User-Agent ("").
SCANNER_UAS = [
    "curl/7.88.1",
    "curl/8.1.2",
    "python-requests/2.31.0",
    "python-requests/2.28.1",
    "python-urllib3/2.0.4",
    "Masscan/1.3",
    "masscan/1.3 (https://github.com/robertdavidgraham/masscan)",
    "zgrab/0.x",
    "Go-http-client/1.1",
    "Go-http-client/2.0",
    "libwww-perl/6.72",
    "Java/11.0.18",
    "Java/17.0.2",
    "Wget/1.21.3",
    "Scrapy/2.11.0",
    "Apache-HttpClient/4.5.14",
    "okhttp/4.12.0",
    "Node-Fetch/1.0",
    "axios/1.6.2",
    "-",
    "",
]
# Headless-Chrome User-Agents; added to the credential-stuffer UA pool.
HEADLESS_UAS = [
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) HeadlessChrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) HeadlessChrome/119.0.0.0 Safari/537.36",
]
# User-Agents per legitimate crawler family. The keys are also used to look
# up CIDRs in the mapping returned by load_bot_ips.
LEGIT_BOT_UAS = {
    "Googlebot": [
        "Mozilla/5.0 (compatible; Googlebot/2.1; "
        "+http://www.google.com/bot.html)",
        "Googlebot/2.1 (+http://www.google.com/bot.html)",
        "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.6099.71 Mobile Safari/537.36 "
        "(compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
    ],
    "Bingbot": [
        "Mozilla/5.0 (compatible; bingbot/2.0; "
        "+http://www.bing.com/bingbot.htm)",
        "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
        "compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm) "
        "Chrome/116.0.1938.76 Safari/537.36",
    ],
    "DuckDuckBot": [
        "DuckDuckBot/1.1; (+http://duckduckgo.com/duckduckbot.html)",
    ],
    "Applebot": [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 "
        "Safari/605.1.15 (Applebot/0.1; "
        "+http://www.apple.com/go/applebot)",
    ],
    "YandexBot": [
        "Mozilla/5.0 (compatible; YandexBot/3.0; "
        "+http://yandex.com/bots)",
    ],
    "Twitterbot": [
        "Twitterbot/1.0",
    ],
    "FacebookBot": [
        "facebookexternalhit/1.1 "
        "(+http://www.facebook.com/externalhit_uatext.php)",
    ],
}
# User-Agents per AI crawler family; keys double as lookup keys into the
# load_bot_ips mapping, exactly like LEGIT_BOT_UAS.
AI_BOT_UAS = {
    "GPTBot": [
        "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
        "compatible; GPTBot/1.0; +https://openai.com/gptbot)",
    ],
    "PerplexityBot": [
        "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
        "compatible; PerplexityBot/1.0; "
        "+https://docs.perplexity.ai/docs/perplexity-bot)",
    ],
    "ClaudeBot": [
        "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; "
        "compatible; ClaudeBot/1.0; "
        "+https://www.anthropic.com/claude-bot)",
    ],
    "CCBot": [
        "CCBot/2.0 (https://commoncrawl.org/faq/)",
    ],
    "Bytespider": [
        "Mozilla/5.0 (Linux; Android 5.0) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Mobile Safari/537.36 "
        "(compatible; Bytespider; spider-feedback@bytedance.com)",
    ],
}
# ---------------------------------------------------------------------------
# TCP / TLS metadata helpers
# ---------------------------------------------------------------------------
# Hex-encoded TCP option bytes reported per profile ("tcp_meta_options").
TCP_OPTIONS = {
    "linux": "020405b40402080affffffff000000000103030a",
    "windows": "020405b40103030801010402",
    "macos": "020405ac0103030601010402",
    "android": "020405b40402080affffffff000000000103030a",
    "scanner": "0204ffff",
    "minimal": "0204ffd7",
}
# Baseline SYN characteristics per profile; _tcp_meta adds per-row jitter.
TCP_PROFILES = {
    "linux": {"window_size": 65535, "mss": 1460, "wscale": 10,
              "ttl": 64, "df": 1},
    "windows": {"window_size": 64240, "mss": 1460, "wscale": 8,
                "ttl": 128, "df": 1},
    "macos": {"window_size": 65535, "mss": 1460, "wscale": 6,
              "ttl": 64, "df": 1},
    "android": {"window_size": 65535, "mss": 1420, "wscale": 9,
                "ttl": 64, "df": 1},
    "scanner": {"window_size": 1024, "mss": 1460, "wscale": 0,
                "ttl": 48, "df": 0},
    "minimal": {"window_size": 512, "mss": 576, "wscale": 0,
                "ttl": 60, "df": 0},
}


def _tcp_meta(profile, rng):
    """Build the ``tcp_meta_*`` / ``ip_meta_*`` fields for one row.

    Args:
        profile: Key of TCP_PROFILES / TCP_OPTIONS. Unknown names fall back
            to the "linux" entry of both tables.
        rng: ``random``-compatible object providing ``randint``.

    Returns:
        Dict with window size (baseline ±100, clamped to the 16-bit range),
        MSS, window scale, option bytes, TTL (baseline minus 0..5 hops),
        DF flag, a random IP id and a random total length.
    """
    meta = TCP_PROFILES.get(profile, TCP_PROFILES["linux"])
    # The TCP header window field is 16 bits wide. Three profiles already sit
    # at 65535, so positive jitter must not push the value past that limit.
    jittered_window = meta["window_size"] + rng.randint(-100, 100)
    window_size = max(0, min(0xFFFF, jittered_window))
    return {
        "tcp_meta_window_size": window_size,
        "tcp_meta_mss": meta["mss"],
        "tcp_meta_window_scale": meta["wscale"],
        "tcp_meta_options": TCP_OPTIONS.get(profile, TCP_OPTIONS["linux"]),
        "ip_meta_ttl": meta["ttl"] - rng.randint(0, 5),
        "ip_meta_df": meta["df"],
        "ip_meta_id": rng.randint(1, 65535),
        "ip_meta_total_length": rng.randint(1200, 1500),
    }
def _syn_ms(profile, rng):
    """Draw a SYN→ClientHello latency (milliseconds) for a TCP profile.

    "scanner" yields 0..3 ms, "minimal" 1..5 ms and every other profile
    10..120 ms. Exactly one ``rng.randint`` call is made.
    """
    latency_bounds = {"scanner": (0, 3), "minimal": (1, 5)}
    low, high = latency_bounds.get(profile, (10, 120))
    return rng.randint(low, high)
def _ja3_for_ja4(ja4):
    """Return ``(ja3_string, ja3_md5_hex)`` matching a JA4 fingerprint.

    The JA3 string is a fixed template selected by the JA4 prefix
    ("t13", "t12" or "t10"); any other prefix gets a short generic template.
    """
    templates = (
        ("t13", "771,4866-4867-4865-49196-49200-52393-52392,"
                "0-23-65281-10-11-35-16-5-13-18-51-45-43-27,29-23-24,0"),
        ("t12", "771,49195-49199-49196-49200-52393-52392,"
                "0-23-65281-10-11-35-16-5-13,29-23-24,0"),
        ("t10", "769,49161-49162-49171-49172,0-10-11,29-23-24,0"),
    )
    # Generic fallback, replaced by the first template whose prefix matches.
    ja3 = "771,4866-4867-4865,0-23-65281,29-23-24,0"
    for prefix, template in templates:
        if ja4.startswith(prefix):
            ja3 = template
            break
    digest = hashlib.md5(ja3.encode()).hexdigest()
    return ja3, digest
# ---------------------------------------------------------------------------
# CSV loading with fallback
# ---------------------------------------------------------------------------
# Directories probed, in order, when no usable explicit data dir is given.
_DATA_SEARCH_PATHS = [
    "/app/data",
    os.path.join(os.path.dirname(os.path.abspath(__file__)),
                 "..", "..", "..", "scripts", "data"),
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "data"),
]


def _find_data_dir(explicit=None):
    """Locate the directory holding the CSV dictionaries.

    *explicit* is returned unchanged when it names an existing directory.
    Otherwise the first entry of ``_DATA_SEARCH_PATHS`` whose resolved path
    is a directory containing ``browser_ja4.csv`` is returned (as its real
    path), or ``None`` when no entry qualifies.
    """
    if explicit and os.path.isdir(explicit):
        return explicit

    def _has_browser_csv(directory):
        marker = os.path.join(directory, "browser_ja4.csv")
        return os.path.isdir(directory) and os.path.isfile(marker)

    resolved = (os.path.realpath(entry) for entry in _DATA_SEARCH_PATHS)
    return next((d for d in resolved if _has_browser_csv(d)), None)
def load_browser_ja4s(data_dir):
    """Read ``browser_ja4.csv`` into ``{family: [ja4, …]}``.

    Only fingerprints starting with "t13" or "t12" are kept. Rows with fewer
    than two columns are ignored, and both columns are whitespace-stripped.
    An empty dict is returned when *data_dir* is falsy or the file is absent.
    """
    by_family = {}
    if not data_dir:
        return by_family
    csv_path = os.path.join(data_dir, "browser_ja4.csv")
    if not os.path.isfile(csv_path):
        return by_family
    with open(csv_path, newline="", encoding="utf-8") as handle:
        for record in csv.reader(handle):
            if len(record) < 2:
                continue
            fingerprint = record[0].strip()
            family = record[1].strip()
            if not fingerprint.startswith(("t13", "t12")):
                continue
            by_family.setdefault(family, []).append(fingerprint)
    return by_family
def load_bot_ips(data_dir):
    """Read ``bot_ip.csv`` into ``{family: [cidr_str, …]}``.

    Column 0 is the CIDR and column 1 the family; both are stripped. Rows
    with fewer than two columns are skipped. An empty dict is returned when
    *data_dir* is falsy or the file does not exist.
    """
    cidrs_by_family = {}
    if not data_dir:
        return cidrs_by_family
    csv_path = os.path.join(data_dir, "bot_ip.csv")
    if not os.path.isfile(csv_path):
        return cidrs_by_family
    with open(csv_path, newline="", encoding="utf-8") as handle:
        usable = (rec for rec in csv.reader(handle) if len(rec) >= 2)
        for record in usable:
            family = record[1].strip()
            cidr = record[0].strip()
            if family not in cidrs_by_family:
                cidrs_by_family[family] = []
            cidrs_by_family[family].append(cidr)
    return cidrs_by_family
# ---------------------------------------------------------------------------
# IP generation helpers
# ---------------------------------------------------------------------------
def _gen_ips_from_prefixes(prefixes, n, rng):
    """Generate up to *n* unique IPs from /24 prefixes.

    Args:
        prefixes: Sequence of "a.b.c" strings (first three octets of a /24).
        n: Number of distinct addresses wanted.
        rng: ``random``-compatible object providing ``choice`` and ``randint``.

    Returns:
        List of distinct dotted-quad strings in the order they were drawn.
        It can be shorter than *n*: sampling stops after ``n * 5`` draws, and
        an empty *prefixes* yields ``[]``.
    """
    if not prefixes:
        return []
    # A dict is used as an ordered set. Returning list(set(...)) would order
    # the result by string hash, which changes from one interpreter run to the
    # next and would defeat the script's --seed reproducibility.
    drawn = {}
    for _ in range(n * 5):
        if len(drawn) >= n:
            break
        prefix = rng.choice(prefixes)
        octet = rng.randint(1, 254)
        drawn[f"{prefix}.{octet}"] = None
    return list(drawn)
def _ips_from_cidrs(cidrs, n, rng):
    """Generate up to *n* unique IPv4 addresses from a list of CIDR strings.

    Unparseable entries and IPv6 networks are skipped. /32 entries are taken
    as-is (in input order) before anything is sampled; /31 networks are
    ignored. The remaining networks are sampled with probability proportional
    to their size, never returning the network or broadcast address.

    Args:
        cidrs: Iterable of CIDR strings (host bits may be set).
        n: Number of distinct addresses wanted.
        rng: ``random``-compatible object providing ``choices`` and
            ``randint``.

    Returns:
        List of at most *n* distinct dotted-quad strings in a deterministic
        order (single hosts first, then sampled hosts in draw order).
        Sampling gives up after ``n * 10`` draws.
    """
    from itertools import accumulate

    networks = []
    # Dict used as an ordered set: list(set(...)) would order (and, through
    # the [:n] cut, select) addresses by per-process string hash, so the same
    # --seed could produce different IPs on different runs.
    picked = {}
    for cidr in cidrs:
        try:
            net = ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            continue
        if net.version != 4:
            continue
        if net.prefixlen == 32:
            picked[str(net.network_address)] = None
        elif net.num_addresses > 2:
            networks.append(net)

    if networks:
        # Cumulative weights are loop-invariant; passing them avoids
        # rebuilding the running sum on every draw and yields the same picks
        # as weights=[net.num_addresses, …].
        cum_weights = list(accumulate(net.num_addresses for net in networks))
        for _ in range(n * 10):
            if len(picked) >= n:
                break
            net = rng.choices(networks, cum_weights=cum_weights, k=1)[0]
            host_offset = rng.randint(1, max(1, net.num_addresses - 2))
            picked[str(net.network_address + host_offset)] = None
    return list(picked)[:n]
def _generate_bot_ja4s(browser_ja4_set, rng, n=20):
    """Build synthetic JA4 fingerprints that are absent from the browser set.

    *n* candidates are produced by cycling through ten fixed prefixes and
    appending an ALPN marker ("h1" with probability 0.7, else "h2") plus two
    random 12-hex-digit segments. Candidates present in *browser_ja4_set* are
    dropped, so fewer than *n* may be returned. If nothing survives, a fixed
    list of five fingerprints is returned instead.
    """
    heads = (
        "t13d0305", "t13d0203", "t12d0507", "t10d0100", "t13d0101",
        "t12d0302", "t13d0405", "t12d0204", "t10d0200", "t13d0102",
    )

    def _hex12():
        return f"{rng.randint(0, 0xFFFFFFFFFFFF):012x}"

    synthetic = []
    for index in range(n):
        head = heads[index % len(heads)]
        # Draw order (segment, segment, ALPN coin) fixes the RNG stream.
        first = _hex12()
        second = _hex12()
        alpn = "h1" if rng.random() < 0.7 else "h2"
        candidate = f"{head}{alpn}_{first}_{second}"
        if candidate in browser_ja4_set:
            continue
        synthetic.append(candidate)
    if synthetic:
        return synthetic
    return [
        "t13d030500_ffd59bab1b39_6e7f7df63e98",
        "t13d020300_6b9b1b2c3d4e_ffd59bab1b39",
        "t10d170000_0a1b2c3d4e5f_1b2c3d4e5f60",
        "t12d050700_5a6b7c8d9e0f_1a2b3c4d5e6f",
        "t13d010100_aabbccddeeff_0011223344aa",
    ]
# ---------------------------------------------------------------------------
# Request distribution
# ---------------------------------------------------------------------------
def _distribute_requests(n_ips, total_rows, min_req, max_req, rng):
    """Split *total_rows* into *n_ips* per-IP counts within [min_req, max_req].

    Every count except the last is drawn uniformly from the range that still
    lets the remaining IPs absorb the remaining budget; the last count takes
    whatever is left, clamped into the bounds. When the budget cannot be met
    within the bounds, counts are clamped and their sum differs from
    *total_rows*. The finished list is shuffled before it is returned.
    """
    if n_ips <= 0:
        return []

    def _clamp(value):
        return max(min_req, min(max_req, value))

    budget = total_rows
    allocation = []
    # `pending` counts the IPs still to be served, including the current one.
    for pending in range(n_ips, 1, -1):
        others = pending - 1
        floor = max(min_req, budget - others * max_req)
        ceiling = min(max_req, budget - others * min_req)
        if floor > ceiling:
            # Infeasible budget: fall back to an even, clamped share.
            floor = ceiling = _clamp(budget // pending)
        share = rng.randint(floor, ceiling)
        allocation.append(share)
        budget -= share
    allocation.append(_clamp(budget))
    rng.shuffle(allocation)
    return allocation
# ---------------------------------------------------------------------------
# Timestamp helper
# ---------------------------------------------------------------------------
# Reference instant shared by all rows; set lazily on first use so every
# timestamp of one run is relative to the same "now".
_BASE_TIME = None


def _now_minus(seconds):
    """Format the instant *seconds* before the run's base time.

    The base time is captured (in UTC) on the first call and reused
    afterwards. The result looks like ``2026-04-08T12:00:00Z``.
    """
    global _BASE_TIME
    # datetime objects are always truthy, so `or` only fires for None.
    _BASE_TIME = _BASE_TIME or datetime.now(timezone.utc)
    moment = _BASE_TIME - timedelta(seconds=seconds)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
# ---------------------------------------------------------------------------
# Row builder — identical field set to original
# ---------------------------------------------------------------------------
def _make_row(
    src_ip, ua, path, method="GET", ja4=None, tcp_profile="linux",
    scheme="https", host="platform", time_offset_s=None,
    extra_headers=None, rng=None,
):
    """Build a single raw_json dict matching what the correlator produces.

    Args:
        src_ip: Client address; also embedded in ``conn_id``.
        ua: Value of the User-Agent header.
        path: Request target. Everything after the first "?" becomes the
            ``query`` field, the part before it the ``path`` field.
        method: HTTP method.
        ja4: JA4 fingerprint; a fixed default is used when None. Its prefix
            selects the TLS version and JA3 template, and the presence of
            "h2" selects HTTP/2 and ALPN "h2".
        tcp_profile: Profile name passed to ``_tcp_meta`` and ``_syn_ms``.
        scheme: "https" maps to destination port 443, anything else to 80.
        host: Used for both the ``host`` and ``tls_sni`` fields.
        time_offset_s: Age of the row in seconds relative to the run's base
            time; drawn from 0..1700 when None.
        extra_headers: Optional ``{name: value}`` mapping; names are appended
            to ``client_headers`` and values stored as ``header_<name>``.
        rng: ``random``-compatible object; the ``random`` module when None.

    Returns:
        Dict holding the request, TLS, header and TCP/IP metadata fields.
    """
    if rng is None:
        rng = random
    if time_offset_s is None:
        time_offset_s = rng.randint(0, 1700)
    if ja4 is None:
        ja4 = "t13d1917h2_b0372614b25a_6a77dcf5a8be"
    ja3_raw, ja3_hash = _ja3_for_ja4(ja4)
    tcp = _tcp_meta(tcp_profile, rng)
    syn_ms = _syn_ms(tcp_profile, rng)

    client_headers = "Host,User-Agent,Accept,Accept-Language,Accept-Encoding"
    if extra_headers:
        client_headers += "," + ",".join(extra_headers.keys())

    # Split on the first "?" only, so a query string that itself contains "?"
    # is kept whole instead of being cut at the second "?".
    url_path, _, query = path.partition("?")

    uses_h2 = "h2" in ja4
    if ja4.startswith("t13"):
        tls_version = "1.3"
    elif ja4.startswith("t12"):
        tls_version = "1.2"
    else:
        tls_version = "1.0"

    # Read the clock once so that b_timestamp - a_timestamp is exactly the
    # reported SYN→ClientHello latency.
    now_us = int(time.time() * 1_000_000)

    # The RNG draws inside this literal happen in key order (src_port,
    # keepalives, conn_id); keep that order to preserve seeded output.
    row = {
        "time": _now_minus(time_offset_s),
        "src_ip": src_ip,
        "src_port": rng.randint(1024, 65535),
        "dst_ip": "172.20.0.2",
        "dst_port": 443 if scheme == "https" else 80,
        "method": method,
        "scheme": scheme,
        "host": host,
        "path": url_path,
        "query": query,
        "http_version": "HTTP/2.0" if uses_h2 else "HTTP/1.1",
        "orphan_side": "",
        "correlated": True,
        "keepalives": rng.randint(1, 8),
        "a_timestamp": now_us,
        "b_timestamp": now_us + syn_ms * 1000,
        "conn_id": f"seed_{src_ip.replace('.', '_')}"
                   f"_{rng.randint(1000, 9999)}",
        "syn_to_clienthello_ms": syn_ms,
        "tls_version": tls_version,
        "tls_sni": host,
        "tls_alpn": "h2" if uses_h2 else "http/1.1",
        "ja3": ja3_raw,
        "ja3_hash": ja3_hash,
        "ja4": ja4,
        "client_headers": client_headers,
        "header_User-Agent": ua,
        "header_Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
        "header_Accept-Encoding": "gzip, deflate, br",
        "header_Accept-Language": "",
        "header_Content-Type": "",
        "header_X-Request-Id": "",
        "header_X-Trace-Id": "",
        "header_X-Forwarded-For": "",
        "header_Sec-Fetch-Site": "",
        "header_Sec-Fetch-Mode": "",
        "header_Sec-Fetch-Dest": "",
        "header_Sec-CH-UA": "",
        "header_Sec-CH-UA-Mobile": "",
        "header_Sec-CH-UA-Platform": "",
        **tcp,
    }
    if extra_headers:
        row.update({f"header_{k}": v for k, v in extra_headers.items()})
    return row
# ---------------------------------------------------------------------------
# Traffic generators
# ---------------------------------------------------------------------------
def generate_browser_traffic(n_ips, total_rows, browser_ja4s, rng):
    """Generate legitimate browser sessions with realistic navigation.

    Each IP gets a consistent browser profile (family, JA4, UA, TCP) and
    produces page navigations + asset/API requests with Referer chains,
    cookies, Sec-Fetch headers, and Sec-CH-UA for Chromium browsers.

    Args:
        n_ips: Number of source IPs to draw from ISP_PREFIXES.
        total_rows: Row budget, split into 5..50 requests per IP.
        browser_ja4s: ``{family: [ja4, …]}``; when empty, the families of
            BROWSER_UAS are used and every IP gets the built-in default JA4.
        rng: ``random``-compatible object driving every random choice.

    Returns:
        List of row dicts as produced by ``_make_row``.
    """
    rows = []
    ips = _gen_ips_from_prefixes(ISP_PREFIXES, n_ips, rng)
    counts = _distribute_requests(len(ips), total_rows, 5, 50, rng)
    families = (list(browser_ja4s.keys())
                if browser_ja4s else list(BROWSER_UAS.keys()))
    family_weights = [BROWSER_WEIGHTS.get(f, 1) for f in families]
    # Sec-CH-UA-Platform values; the double quotes are part of the header.
    platform_map = {
        "windows": '"Windows"', "linux": '"Linux"',
        "macos": '"macOS"', "android": '"Android"',
    }
    for ip, n_req in zip(ips, counts):
        family = rng.choices(families, weights=family_weights, k=1)[0]
        ja4_list = browser_ja4s.get(family, [])
        if not ja4_list:
            # Borrow fingerprints from a mainstream family before resorting
            # to the hard-coded default below.
            for fb in ("Chromium", "Firefox", "Safari"):
                ja4_list = browser_ja4s.get(fb, [])
                if ja4_list:
                    break
        if not ja4_list:
            ja4_list = ["t13d1917h2_b0372614b25a_6a77dcf5a8be"]
        ja4 = rng.choice(ja4_list)
        # NOTE(review): the UA and the TCP profile are drawn independently, so
        # e.g. an Android UA can be paired with the "windows" TCP profile and
        # a '"Windows"' platform hint — confirm this mismatch is acceptable
        # for rows meant to form the human baseline.
        ua = rng.choice(BROWSER_UAS.get(family, BROWSER_UAS["Chromium"]))
        tcp = rng.choice(BROWSER_TCP.get(family, ["linux"]))
        host = rng.choice(HOSTS)
        lang = rng.choice(ACCEPT_LANGUAGES)
        is_chromium = family in CHROMIUM_FAMILIES
        session_cookie = f"sid={rng.randint(100000, 999999)}"
        ch_ua = SEC_CH_UA.get(family, "") if is_chromium else ""
        # Always "?0" (non-mobile) for Chromium families, including when a
        # mobile UA string was selected above.
        ch_mobile = "?0" if is_chromium else ""
        ch_plat = platform_map.get(tcp, "") if is_chromium else ""
        base_offset = rng.randint(60, 1700)
        current_page = ""
        for req_i in range(n_req):
            # The step multiplier is redrawn per request, so offsets trend
            # towards "now" but are not strictly monotonic.
            offset = max(1, base_offset - req_i * rng.randint(1, 5))
            # Each elif below draws a fresh random number, so the branch
            # probabilities are conditional on the earlier branches failing.
            if req_i == 0 or rng.random() < 0.25:
                # Navigate to a new page
                path = rng.choice(PAGE_PATHS)
                method = "GET"
                referer = ("" if req_i == 0
                           else f"https://{host}{current_page}")
                sec_site = "none" if req_i == 0 else "same-origin"
                sec_mode = "navigate"
                sec_dest = "document"
                current_page = path
            elif rng.random() < 0.5:
                # Asset request
                path = rng.choice(ASSET_PATHS)
                method = "GET"
                referer = (f"https://{host}{current_page}"
                           if current_page else "")
                sec_site = "same-origin"
                sec_mode = "no-cors"
                sec_dest = rng.choice(["script", "style", "image", "font"])
            elif rng.random() < 0.4:
                # API call
                path = rng.choice(API_PATHS)
                method = rng.choice(["GET", "POST"])
                referer = (f"https://{host}{current_page}"
                           if current_page else "")
                sec_site = "same-origin"
                sec_mode = "cors"
                sec_dest = "empty"
            else:
                # Another page navigation
                path = rng.choice(PAGE_PATHS)
                method = "GET"
                referer = (f"https://{host}{current_page}"
                           if current_page else "")
                sec_site = "same-origin"
                sec_mode = "navigate"
                sec_dest = "document"
                current_page = path
            row = _make_row(
                src_ip=ip, ua=ua, path=path, method=method,
                ja4=ja4, tcp_profile=tcp, scheme="https", host=host,
                time_offset_s=offset, rng=rng,
            )
            row["header_Accept-Language"] = lang
            row["header_Sec-Fetch-Site"] = sec_site
            row["header_Sec-Fetch-Mode"] = sec_mode
            row["header_Sec-Fetch-Dest"] = sec_dest
            row["header_Sec-CH-UA"] = ch_ua
            row["header_Sec-CH-UA-Mobile"] = ch_mobile
            row["header_Sec-CH-UA-Platform"] = ch_plat
            if referer:
                row["header_Referer"] = referer
                if "Referer" not in row["client_headers"]:
                    row["client_headers"] += ",Referer"
            # The session cookie is sent on every request after the first.
            if req_i > 0:
                row["header_Cookie"] = session_cookie
                if "Cookie" not in row["client_headers"]:
                    row["client_headers"] += ",Cookie"
            rows.append(row)
    return rows
def generate_scanner_traffic(n_ips, total_rows, bot_ja4s, rng):
    """Generate malicious bot/scanner traffic in four sub-categories.

    The percentages are shares of *n_ips*:

    a. Vulnerability scanners (30%): diverse attack paths
    b. Credential stuffers (20%): POST /login brute-force
    c. Content scrapers (30%): methodical path crawling
    d. DDoS-like (20%): same path hammered 100+ times

    Args:
        n_ips: Number of source IPs to draw from DC_SCANNER_PREFIXES.
        total_rows: Row budget shared by the four categories.
        bot_ja4s: Non-empty list of JA4 fingerprints to pick from.
        rng: ``random``-compatible object driving every random choice.

    Returns:
        List of row dicts as produced by ``_make_row``.
    """
    rows = []
    ips = _gen_ips_from_prefixes(DC_SCANNER_PREFIXES, n_ips, rng)
    n_vuln = int(n_ips * 0.30)
    n_cred = int(n_ips * 0.20)
    n_scrap = int(n_ips * 0.30)
    # DDoS-like takes the rounding remainder.
    n_ddos = n_ips - n_vuln - n_cred - n_scrap
    # Compute row budgets respecting per-IP minimums
    min_vuln = n_vuln * 10
    min_cred = n_cred * 15
    min_scrap = n_scrap * 10
    min_ddos = n_ddos * 100
    total_min = min_vuln + min_cred + min_scrap + min_ddos
    # Rows left after every IP has its minimum: 40% go to DDoS, 20% to
    # vulnerability scanners, 15% to credential stuffers, the rest to scrapers.
    surplus = max(0, total_rows - total_min)
    r_ddos = min_ddos + int(surplus * 0.40)
    r_vuln = min_vuln + int(surplus * 0.20)
    r_cred = min_cred + int(surplus * 0.15)
    r_scrap = total_rows - r_ddos - r_vuln - r_cred
    # `idx` walks through `ips`, handing each category a consecutive slice.
    idx = 0
    # --- Vulnerability scanners ---
    vuln_ips = ips[idx:idx + n_vuln]; idx += n_vuln
    counts = _distribute_requests(len(vuln_ips), r_vuln, 10, 60, rng)
    for ip, n_req in zip(vuln_ips, counts):
        ua = rng.choice(SCANNER_UAS)
        ja4 = rng.choice(bot_ja4s)
        # About 30% of scanners switch User-Agent on every request.
        rotate_ua = rng.random() < 0.3
        for _ in range(n_req):
            if rotate_ua:
                ua = rng.choice(SCANNER_UAS)
            rows.append(_make_row(
                src_ip=ip, ua=ua, path=rng.choice(ATTACK_PATHS),
                method=rng.choice(["GET", "GET", "HEAD"]),
                ja4=ja4, tcp_profile="scanner", scheme="https",
                host=rng.choice(HOSTS), rng=rng,
            ))
    # Some vuln scanners do port probing (diverse dst_port)
    # NOTE: these rows (up to 100 IPs × 5 ports) are added on top of the
    # r_vuln budget, so they are not counted against total_rows. Each probe
    # also draws its own JA4 rather than reusing the IP's fingerprint.
    for ip in rng.sample(vuln_ips, min(100, len(vuln_ips))):
        for port in rng.sample(range(80, 10000), 5):
            row = _make_row(
                src_ip=ip, ua="-", path="/",
                method="GET", ja4=rng.choice(bot_ja4s),
                tcp_profile="scanner", scheme="https",
                host=rng.choice(HOSTS), rng=rng,
            )
            row["dst_port"] = port
            rows.append(row)
    # --- Credential stuffers ---
    login_paths = [
        "/login", "/admin/login", "/api/auth/login",
        "/wp-login.php", "/user/signin", "/api/v1/auth",
    ]
    cred_ips = ips[idx:idx + n_cred]; idx += n_cred
    counts = _distribute_requests(len(cred_ips), r_cred, 15, 60, rng)
    for ip, n_req in zip(cred_ips, counts):
        ua = rng.choice(SCANNER_UAS + HEADLESS_UAS)
        ja4 = rng.choice(bot_ja4s)
        # Each stuffer hammers a single login endpoint.
        target = rng.choice(login_paths)
        for _ in range(n_req):
            rows.append(_make_row(
                src_ip=ip, ua=ua, path=target, method="POST",
                ja4=ja4, tcp_profile="scanner", scheme="https",
                host=rng.choice(HOSTS[:2]), rng=rng,
                extra_headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Content-Length": str(rng.randint(20, 60)),
                },
            ))
    # --- Content scrapers ---
    scrap_ips = ips[idx:idx + n_scrap]; idx += n_scrap
    counts = _distribute_requests(len(scrap_ips), r_scrap, 10, 60, rng)
    for ip, n_req in zip(scrap_ips, counts):
        ua = rng.choice(SCANNER_UAS[:8])
        ja4 = rng.choice(bot_ja4s)
        for i in range(n_req):
            # Scrapers walk SCRAPER_PATHS sequentially from the start.
            rows.append(_make_row(
                src_ip=ip, ua=ua,
                path=SCRAPER_PATHS[i % len(SCRAPER_PATHS)],
                method="GET", ja4=ja4, tcp_profile="scanner",
                scheme="https", host=rng.choice(HOSTS), rng=rng,
            ))
    # --- DDoS-like ---
    ddos_targets = ["/", "/api/v1/search", "/products", "/api/v2/metrics"]
    # Last slice: if fewer than n_ips addresses were generated, this category
    # is the one that ends up short.
    ddos_ips = ips[idx:idx + n_ddos]
    counts = _distribute_requests(len(ddos_ips), r_ddos, 100, 200, rng)
    for ip, n_req in zip(ddos_ips, counts):
        ua = rng.choice(SCANNER_UAS)
        ja4 = rng.choice(bot_ja4s)
        target = rng.choice(ddos_targets)
        for _ in range(n_req):
            rows.append(_make_row(
                src_ip=ip, ua=ua, path=target, method="GET",
                ja4=ja4,
                tcp_profile=rng.choice(["scanner", "minimal"]),
                scheme="https", host=HOSTS[0],
                # Burst concentrated in the most recent five minutes.
                time_offset_s=rng.randint(0, 300), rng=rng,
            ))
    return rows
def _pick_family_ips(families, bot_ips_by_family, fallback_prefixes,
                     n_ips, rng):
    """Choose up to *n_ips* distinct source IPs, each tagged with one family.

    Every family first gets ``max(1, n_ips // len(families))`` candidates,
    taken from its CIDRs in *bot_ips_by_family* when present and from
    *fallback_prefixes* otherwise. Any shortfall is topped up from
    *fallback_prefixes* with a randomly chosen family.

    Returns:
        List of ``(ip, family)`` pairs with no repeated IP. It can be shorter
        than *n_ips* when the address pools are too small.
    """
    # Insertion-ordered dict: doubles as the de-duplicated IP list. Several
    # families sample the same fallback pool independently, so without this
    # an address could be emitted more than once and end up with multiple
    # JA4/User-Agent identities.
    ip_family = {}
    per_family = max(1, n_ips // len(families))
    for family in families:
        cidrs = bot_ips_by_family.get(family, [])
        if cidrs:
            candidates = _ips_from_cidrs(cidrs, per_family, rng)
        else:
            candidates = _gen_ips_from_prefixes(
                fallback_prefixes, per_family, rng)
        for ip in candidates:
            # The first family to claim an address keeps it.
            ip_family.setdefault(ip, family)

    # Top up in a few bounded rounds; each round can lose some draws to
    # addresses that are already taken.
    for _ in range(5):
        missing = n_ips - len(ip_family)
        if missing <= 0:
            break
        for ip in _gen_ips_from_prefixes(fallback_prefixes, missing, rng):
            if ip not in ip_family:
                ip_family[ip] = rng.choice(families)

    return list(ip_family.items())[:n_ips]


def generate_legit_bot_traffic(n_ips, total_rows, bot_ips_by_family,
                               bot_ja4s, rng):
    """Generate traffic from known legitimate bots (Googlebot, Bingbot, …).

    IPs are drawn from bot_ip.csv CIDRs when available so they match
    the bot_ip dictionary for direct labelling; families without CIDRs and
    any shortfall use DC_LEGIT_BOT_PREFIXES. Every IP appears once, with one
    family, one User-Agent and one JA4.

    Args:
        n_ips: Number of distinct source IPs wanted.
        total_rows: Row budget, split into 5..30 requests per IP.
        bot_ips_by_family: ``{family: [cidr, …]}`` as returned by
            ``load_bot_ips`` (may be empty).
        bot_ja4s: Non-empty list of JA4 fingerprints to pick from.
        rng: ``random``-compatible object driving every random choice.

    Returns:
        List of row dicts as produced by ``_make_row``.
    """
    rows = []
    assigned = _pick_family_ips(
        list(LEGIT_BOT_UAS.keys()), bot_ips_by_family,
        DC_LEGIT_BOT_PREFIXES, n_ips, rng)
    counts = _distribute_requests(len(assigned), total_rows, 5, 30, rng)
    crawl_pool = CONTENT_PATHS + BOT_PATHS
    for (ip, family), n_req in zip(assigned, counts):
        ua = rng.choice(LEGIT_BOT_UAS[family])
        ja4 = rng.choice(bot_ja4s)
        # Bots: robots.txt first, then sitemap, then content
        paths = ["/robots.txt"]
        if rng.random() < 0.7:
            paths.append("/sitemap.xml")
        remaining_n = n_req - len(paths)
        paths.extend(rng.choices(crawl_pool, k=max(0, remaining_n)))
        paths = paths[:n_req]
        for path in paths:
            rows.append(_make_row(
                src_ip=ip, ua=ua, path=path, method="GET",
                ja4=ja4,
                tcp_profile=rng.choice(["linux", "linux", "scanner"]),
                scheme="https", host=rng.choice(HOSTS), rng=rng,
            ))
    return rows


def generate_ai_bot_traffic(n_ips, total_rows, bot_ips_by_family,
                            bot_ja4s, rng):
    """Generate aggressive AI bot scraping traffic (GPTBot, ClaudeBot, …).

    IPs come from the family's CIDRs in *bot_ips_by_family* when present and
    from DC_AI_PREFIXES otherwise. Every IP appears once, with one family,
    one User-Agent and one JA4, and requests only CONTENT_PATHS entries.

    Args:
        n_ips: Number of distinct source IPs wanted.
        total_rows: Row budget, split into 10..50 requests per IP.
        bot_ips_by_family: ``{family: [cidr, …]}`` as returned by
            ``load_bot_ips`` (may be empty).
        bot_ja4s: Non-empty list of JA4 fingerprints to pick from.
        rng: ``random``-compatible object driving every random choice.

    Returns:
        List of row dicts as produced by ``_make_row``.
    """
    rows = []
    assigned = _pick_family_ips(
        list(AI_BOT_UAS.keys()), bot_ips_by_family,
        DC_AI_PREFIXES, n_ips, rng)
    counts = _distribute_requests(len(assigned), total_rows, 10, 50, rng)
    for (ip, family), n_req in zip(assigned, counts):
        ua = rng.choice(AI_BOT_UAS[family])
        ja4 = rng.choice(bot_ja4s)
        for path in rng.choices(CONTENT_PATHS, k=n_req):
            rows.append(_make_row(
                src_ip=ip, ua=ua, path=path, method="GET",
                ja4=ja4, tcp_profile="linux", scheme="https",
                host=rng.choice(HOSTS), rng=rng,
            ))
    return rows
# ---------------------------------------------------------------------------
# ClickHouse insert
# ---------------------------------------------------------------------------
def _ch_insert(rows, host, port, user, password,
batch_size=2000, dry_run=False):
"""Insert rows into ja4_logs.http_logs_raw via ClickHouse HTTP interface.
Each row is wrapped as {"raw_json": "<escaped_json>"} in JSONEachRow format.
"""
if dry_run:
print(f"[seed] DRY-RUN — would insert {len(rows)} rows")
print("[seed] Sample row:", json.dumps(rows[0], indent=2)[:400])
return len(rows)
query = "INSERT INTO ja4_logs.http_logs_raw (raw_json) FORMAT JSONEachRow"
url = (
f"http://{host}:{port}/"
f"?query={urllib.parse.quote(query)}"
f"&user={urllib.parse.quote(user)}"
f"&password={urllib.parse.quote(password)}"
)
total_inserted = 0
n_batches = (len(rows) + batch_size - 1) // batch_size
for i in range(0, len(rows), batch_size):
batch = rows[i:i + batch_size]
body_lines = []
for row in batch:
outer = {"raw_json": json.dumps(row, separators=(",", ":"))}
body_lines.append(json.dumps(outer, separators=(",", ":")))
body = "\n".join(body_lines).encode("utf-8")
req = urllib.request.Request(
url, data=body, method="POST",
headers={"Content-Type": "application/x-ndjson; charset=utf-8"},
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
resp.read()
total_inserted += len(batch)
except urllib.error.HTTPError as e:
err_body = e.read(500).decode("utf-8", errors="replace")
print(f"[seed] ERROR batch {i}{i+batch_size}: "
f"HTTP {e.code}: {err_body}")
except Exception as e:
print(f"[seed] ERROR batch {i}{i+batch_size}: {e}")
if total_inserted % 10000 < batch_size:
batch_num = i // batch_size + 1
print(f"[seed] Progress: {total_inserted:,}/{len(rows):,} rows "
f"({batch_num}/{n_batches} batches)")
return total_inserted
def _wait_for_clickhouse(host, port, user, password, timeout_s=60):
"""Wait for ClickHouse to be ready."""
url = (
f"http://{host}:{port}/"
f"?query=SELECT+1"
f"&user={urllib.parse.quote(user)}"
f"&password={urllib.parse.quote(password)}"
)
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
try:
with urllib.request.urlopen(url, timeout=5) as r:
if r.read().strip() == b"1":
return True
except Exception:
pass
time.sleep(2)
return False
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: generate synthetic traffic and insert it.

    Steps, in order:
      1. Parse the command-line options.
      2. Unless --dry-run is given, wait for ClickHouse and exit with
         status 1 if it does not become ready.
      3. Load browser JA4s and bot CIDRs from the CSV data directory.
      4. Split the IP budget 70/15/10/remainder and the row budget
         70/20/6/remainder across browser, scanner, legitimate-bot and
         AI-bot traffic.
      5. Generate the four classes, shuffle them together and insert them.
      6. Exit with status 1 if fewer than 90% of the rows were inserted.
    """
    cli = argparse.ArgumentParser(
        description="Seed ClickHouse with synthetic traffic")
    cli.add_argument("--host", default="clickhouse")
    cli.add_argument("--port", type=int, default=8123)
    cli.add_argument("--user", default="default")
    cli.add_argument("--password", default="")
    cli.add_argument("--dry-run", action="store_true",
                     help="Generate data but do not insert")
    cli.add_argument("--rows", type=int, default=500_000,
                     help="Total rows to generate (default: 500000)")
    cli.add_argument("--ips", type=int, default=20_000,
                     help="Total unique IPs (default: 20000)")
    cli.add_argument("--seed", type=int, default=None,
                     help="Random seed for reproducibility")
    cli.add_argument("--data-dir", default=None,
                     help="Path to CSV data directory "
                          "(browser_ja4.csv, bot_ip.csv)")
    args = cli.parse_args()

    rng = random.Random(args.seed)

    if not args.dry_run:
        print(f"[seed] Waiting for ClickHouse at {args.host}:{args.port}")
        server_ready = _wait_for_clickhouse(
            args.host, args.port, args.user, args.password)
        if not server_ready:
            print("[seed] FATAL: ClickHouse not reachable after 60s")
            raise SystemExit(1)
        print("[seed] ClickHouse ready.")

    # The timer starts after the readiness wait, so reported durations
    # cover generation and insertion only.
    started = time.monotonic()

    # --- Load CSV data ---
    data_dir = _find_data_dir(args.data_dir)
    if not data_dir:
        print("[seed] WARNING: CSV data directory not found, "
              "using fallback data")
    else:
        print(f"[seed] Loading CSV data from {data_dir}")

    browser_ja4s = load_browser_ja4s(data_dir)
    bot_ips_by_family = load_bot_ips(data_dir)

    # All browser JA4s, handed to the bot JA4 generator.
    known_browser_ja4s = set()
    for family_ja4s in browser_ja4s.values():
        known_browser_ja4s.update(family_ja4s)

    if browser_ja4s:
        ja4_total = sum(len(v) for v in browser_ja4s.values())
        print(f"[seed] Loaded {ja4_total} browser JA4s "
              f"across {len(browser_ja4s)} families")
    if bot_ips_by_family:
        cidr_total = sum(len(v) for v in bot_ips_by_family.values())
        print(f"[seed] Loaded {cidr_total} bot CIDRs "
              f"across {len(bot_ips_by_family)} families")

    bot_ja4s = _generate_bot_ja4s(known_browser_ja4s, rng)

    # --- IP and row budgets ---
    # AI bots take whatever is left after the three fixed shares, so the
    # parts always add up to the requested totals.
    ip_budget = {
        "browser": int(args.ips * 0.70),
        "scanner": int(args.ips * 0.15),
        "legit_bot": int(args.ips * 0.10),
    }
    ip_budget["ai_bot"] = args.ips - sum(ip_budget.values())

    row_budget = {
        "browser": int(args.rows * 0.70),
        "scanner": int(args.rows * 0.20),
        "legit_bot": int(args.rows * 0.06),
    }
    row_budget["ai_bot"] = args.rows - sum(row_budget.values())

    print(f"[seed] Generating {args.rows:,} rows from {args.ips:,} IPs…")

    # --- Generate ---
    # The generators share one RNG, so they must run in this fixed order
    # for a given --seed to reproduce the same data.
    browser_rows = generate_browser_traffic(
        ip_budget["browser"], row_budget["browser"], browser_ja4s, rng)
    browser_ip_count = len({r["src_ip"] for r in browser_rows})
    print(f"[seed] Browser: {len(browser_rows):>7,} rows "
          f"({browser_ip_count:,} IPs)")

    scanner_rows = generate_scanner_traffic(
        ip_budget["scanner"], row_budget["scanner"], bot_ja4s, rng)
    scanner_ip_count = len({r["src_ip"] for r in scanner_rows})
    print(f"[seed] Scanner: {len(scanner_rows):>7,} rows "
          f"({scanner_ip_count:,} IPs)")

    legit_bot_rows = generate_legit_bot_traffic(
        ip_budget["legit_bot"], row_budget["legit_bot"],
        bot_ips_by_family, bot_ja4s, rng)
    legit_bot_ip_count = len({r["src_ip"] for r in legit_bot_rows})
    print(f"[seed] Legit bots: {len(legit_bot_rows):>7,} rows "
          f"({legit_bot_ip_count:,} IPs)")

    ai_bot_rows = generate_ai_bot_traffic(
        ip_budget["ai_bot"], row_budget["ai_bot"],
        bot_ips_by_family, bot_ja4s, rng)
    ai_bot_ip_count = len({r["src_ip"] for r in ai_bot_rows})
    print(f"[seed] AI bots: {len(ai_bot_rows):>7,} rows "
          f"({ai_bot_ip_count:,} IPs)")

    all_rows = browser_rows + scanner_rows + legit_bot_rows + ai_bot_rows
    rng.shuffle(all_rows)

    generation_secs = time.monotonic() - started
    print(f"[seed] Generated {len(all_rows):,} total rows "
          f"in {generation_secs:.1f}s")

    # --- Insert ---
    inserted = _ch_insert(
        all_rows, args.host, args.port, args.user, args.password,
        batch_size=2000, dry_run=args.dry_run,
    )

    total_secs = time.monotonic() - started
    print(f"[seed] Done: {inserted:,}/{len(all_rows):,} rows inserted "
          f"in {total_secs:.1f}s")

    if inserted < len(all_rows) * 0.9:
        print("[seed] WARNING: fewer than 90% of rows inserted — "
              "check errors above")
        raise SystemExit(1)

    print("[seed] The bot_detector should now see ≥ 500 human sessions "
          "in view_ai_features_1h (after MV propagation).")
# Run the seeder only when this file is executed as a script.
if __name__ == "__main__":
    main()