feat: browser JA4 detection, Anubis bot rules, worldwide ASN data

- Add generate_browser_ja4.py: 1,186 browser JA4 fingerprints from FoxIO + ja4db.com
  covering 11 families (Chromium, Firefox, Safari, Edge, Tor, Opera, Vivaldi...)
- Rewrite generate_bot_ip.py: Anubis YAML rules (Google, Bing, Apple, DuckDuckGo,
  OpenAI, Perplexity bots) + Tor exit nodes + cloud scanner IPs (3,555 entries)
- Rewrite generate_asn_data.py: worldwide iptoasn.com data (78,049 ASNs, 714K CIDRs)
- Add dict_browser_ja4 ClickHouse dictionary + browser_family in AI features views
- Add /api/browsers dashboard endpoint
- Fix CSV quoting for fields containing commas (User-Agent strings)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-08 15:27:37 +02:00
parent b6184e6529
commit 7d09c614c3
15 changed files with 885900 additions and 3151 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

1186
scripts/data/browser_ja4.csv Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,154 +1,472 @@
#!/usr/bin/env python3
"""
generate_asn_data.py — Generate ASN reputation + IP-to-ASN lookup CSVs.
generate_asn_data.py — Download public IPtoASN database and generate:
1. asn_reputation.csv — ASN number + reputation label (no header)
2. iplocate-ip-to-asn.csv — CIDR,ASN,country,AS name (no header)
Sources:
• RIPE NCC, ARIN, APNIC ASN registries (well-known allocations)
• DataCenter ASN lists from ipinfo.io and bgp.he.net
• Manual curation of hosting/cloud/residential ISP ASNs
Data source: https://iptoasn.com/data/ip2asn-v4.tsv.gz (free, no registration)
Outputs:
asn_reputation.csv: src_asn,label
iplocate-ip-to-asn.csv: network,asn,country_code,name,org,domain
Usage:
python3 generate_asn_data.py --output-dir .
python3 generate_asn_data.py --output-dir . --no-download # reuse cached TSV
"""
import argparse
import csv
import gzip
import ipaddress
import io
import os
import sys
import urllib.request
# Public IPtoASN dump (IPv4 ranges -> ASN), served as a gzip-compressed TSV.
IPTOASN_URL = "https://iptoasn.com/data/ip2asn-v4.tsv.gz"
# File name of the decompressed dump cached inside the output directory.
CACHED_TSV = "ip2asn-v4.tsv"

# ---------------------------------------------------------------------------
# Hard-coded well-known ASN lists for accurate classification
# ---------------------------------------------------------------------------

# ASNs that classify_asn() labels "human" (residential / consumer ISPs).
# Members are plain integers; an ASN repeated under several regions is
# harmless because this is a set.
KNOWN_HUMAN_ASNS: set[int] = {
    # France
    3215, 12322, 15557, 5410, 6799, 29169, 2027,
    # Germany
    3320, 5432, 6805, 6830, 8422, 31334, 8881, 9145,
    # UK
    2856, 5607, 8913, 6871, 13285, 20712, 25577,
    # Netherlands
    1136, 15542, 33915, 50266, 15435,
    # Spain
    3352, 12357, 12715, 12479, 12338,
    # Italy
    3269, 12874, 30722, 1267, 12797,
    # US — residential / consumer ISPs
    7922, 7018, 701, 20115, 209, 6389, 22773, 10796, 11351, 11427,
    11426, 20001, 22394, 5650, 6128, 10507, 12271, 19108, 26801,
    33363, 33588, 33651, 33652, 33657, 33659, 33660, 33662, 33668,
    7843, 11025, 12083, 20057, 23005, 26827, 33491, 33650, 5769,
    6167, 11404, 14265, 21508, 22561, 30036,
    # Canada
    577, 812, 6327, 852, 855, 6539, 21949, 5645, 6453,
    # Japan
    2516, 4713, 17676, 2519, 2497, 9605, 4685, 7679, 9824,
    # South Korea
    4766, 9318, 3786, 38091, 9848, 4659,
    # Australia
    1221, 4764, 4804, 7545, 9443, 18291, 24429,
    # India
    9829, 45609, 55836, 24560, 17488, 9498, 18101, 45820,
    # Brazil
    4230, 7738, 8167, 16735, 26599, 27699, 28573, 53006, 53089,
    18881, 22085, 28343,
    # Russia
    8359, 12389, 25513, 8402, 3216, 31133, 42610,
    # China
    4134, 4837, 4808, 4812, 9808, 9394, 56040, 56041, 56042,
    # Mexico
    8151, 6503, 11888, 17072, 32098,
    # Turkey
    9121, 34984, 47331, 16135,
    # Poland
    5617, 12912, 6830, 5588, 8374, 21021,
    # Sweden / Nordics
    3301, 1257, 2119, 8473, 12552, 44034, 2116, 29518,
    # Switzerland
    3303, 6830,
    # Belgium
    5432, 6848, 12392,
    # Portugal
    3243, 2860, 8657,
    # Ireland
    5466, 15502, 6830,
    # Southeast Asia
    4773, 7552, 45899, 9299, 4818, 18403, 17974, 23969, 9534,
    24203, 7470,
    # Middle East
    5384, 8781, 39891, 42961, 12880, 44244, 50710,
    # Africa
    36903, 37105, 36874, 36992, 37453, 29571, 33771, 37492,
    # Argentina
    7303, 10318, 11664, 22927,
    # Colombia
    10620, 13489, 14080, 27831,
    # New Zealand
    9790, 4771, 24127, 9500,
}
# ========================= SEARCH ENGINES (human) =========================
(15169, "human", "US", "Google LLC", "Google", "google.com",
["66.249.64.0/19", "64.233.160.0/19", "72.14.192.0/18"]),
(8075, "human", "US", "Microsoft Corporation", "Bing", "microsoft.com",
["157.55.0.0/16", "207.46.0.0/16", "40.76.0.0/14"]),
(32934, "human", "US", "Facebook Inc", "Meta", "facebook.com",
["69.63.176.0/20", "66.220.144.0/20", "31.13.24.0/21"]),
(13414, "human", "US", "Twitter Inc", "Twitter", "twitter.com",
["199.59.148.0/22", "199.16.156.0/22"]),
# ASNs that classify_asn() labels "datacenter" (cloud / IaaS / colocation).
# One provider per line; the provider name is the trailing comment.
KNOWN_DATACENTER_ASNS: set[int] = {
    16509, 14618, 7224,           # AWS
    396982, 36492, 36384, 15169,  # Google Cloud
    8075, 8068, 8069, 12076,      # Microsoft Azure
    31898,                        # Oracle Cloud
    36351,                        # IBM Cloud / SoftLayer
    45102,                        # Alibaba Cloud
    132203,                       # Tencent Cloud
    16276,                        # OVH / OVHcloud
    24940, 213230,                # Hetzner
    14061, 393406,                # DigitalOcean
    63949, 22040,                 # Linode / Akamai Connected Cloud
    20473,                        # Vultr / Choopa (Vultr subsidiary)
    12876,                        # Scaleway / Online SAS
    209083, 40021,                # Contabo
    8560,                         # IONOS / 1&1
    33070, 19994, 27357,          # Rackspace
    54825,                        # Equinix Metal
    36007,                        # Kamatera
    202053,                       # UpCloud
    59642,                        # Cherry Servers
    28753, 60781,                 # Leaseweb
    40676,                        # Psychz Networks
    36352,                        # ColoCrossing
    8100,                         # QuadraNet
    21859,                        # Zenlayer
}
# ========================= DATACENTER / SCANNER =========================
(210644, "datacenter", "NL", "Accelerated-IT Services", "Tor Project", "tor-project.org",
["185.220.100.0/22", "185.220.101.0/24", "185.220.102.0/24"]),
(209083, "datacenter", "DE", "Contabo GmbH", "Contabo", "contabo.de",
["45.155.205.0/24", "62.171.128.0/17", "5.161.0.0/16"]),
(14061, "datacenter", "US", "DigitalOcean LLC", "DigitalOcean", "digitalocean.com",
["64.225.0.0/16", "104.131.0.0/16", "138.197.0.0/16", "159.65.0.0/16"]),
(16509, "datacenter", "US", "Amazon.com ARIN", "AWS", "amazonaws.com",
["3.0.0.0/8", "18.0.0.0/8", "52.0.0.0/8", "54.0.0.0/8"]),
(396982, "datacenter", "US", "Google Cloud", "GCP", "cloud.google.com",
["34.0.0.0/8", "35.184.0.0/13"]),
(8560, "datacenter", "DE", "IONOS SE", "IONOS", "ionos.com",
["74.208.0.0/16", "212.227.0.0/16"]),
(24940, "datacenter", "DE", "Hetzner Online GmbH", "Hetzner", "hetzner.com",
["136.243.0.0/16", "138.201.0.0/16", "144.76.0.0/16", "178.63.0.0/16"]),
(20473, "datacenter", "US", "The Constant Company", "Vultr", "vultr.com",
["45.32.0.0/16", "64.237.32.0/19", "108.61.0.0/16"]),
(63949, "datacenter", "US", "Linode LLC", "Linode", "linode.com",
["45.33.0.0/17", "45.56.0.0/16", "50.116.0.0/18"]),
(13335, "datacenter", "US", "Cloudflare Inc", "Cloudflare", "cloudflare.com",
["104.16.0.0/12", "172.64.0.0/13", "162.158.0.0/15"]),
# ASNs that classify_asn() labels "cdn"; grouped by operator.
KNOWN_CDN_ASNS: set[int] = {
    # Cloudflare (main, WARP, APAC, CN)
    13335, 209242, 132892, 55095,
    # Akamai
    20940, 16625, 35994, 23454, 23455,
    # Fastly
    54113, 394536, 395973,
    # Edgecast / Verizon Digital Media / Edgio
    15133,
    # Limelight Networks
    22822,
    # Sucuri
    30148,
    # StackPath
    202623,
    # Apple (6185 is the Apple CDN via Akamai)
    6185, 714,
    # Netflix (2906 is Open Connect)
    2906, 36183, 40027,
}
# ========================= HOSTING =========================
(197695, "hosting", "RU", "Reg.ru Hosting", "Reg.ru", "reg.ru",
["193.32.162.0/24", "194.58.92.0/22"]),
(51167, "hosting", "DE", "Contabo GmbH", "Contabo Hosting", "contabo.de",
["78.46.0.0/15"]),
(46606, "hosting", "US", "Unified Layer", "Bluehost", "bluehost.com",
["162.241.0.0/16", "198.57.128.0/17"]),
(26496, "hosting", "US", "GoDaddy.com", "GoDaddy", "godaddy.com",
["184.168.0.0/16", "198.71.128.0/17"]),
# ASNs that classify_asn() labels "hosting"; listed in ascending ASN order.
KNOWN_HOSTING_ASNS: set[int] = {
    13768,   # Peer 1 Network
    15830,   # Telecity / Equinix EU
    18450,   # WebNX
    19871,   # Network Solutions
    19969,   # Joe's Datacenter
    20454,   # SecuredServers
    21844,   # ThePlanet / SoftLayer
    25369,   # Hydra Communications
    26347,   # DreamHost
    26496,   # GoDaddy
    29802,   # HIVELOCITY
    29854,   # Westhost
    32244,   # Liquid Web
    32475,   # SingleHop
    33182,   # HostDime
    36024,   # HostNOC
    36137,   # ServerMania
    36351,   # SoftLayer (also datacenter, but historically hosting)
    40034,   # Confluence Networks
    46475,   # Limestone Networks
    46606,   # Unified Layer / Bluehost
    47583,   # Hostinger
    53831,   # Squarespace
    62567,   # DigitalOcean (hosting arm)
    197695,  # Reg.ru
    395003,  # WPEngine
    398101,  # GoDaddy hosting
}
# ---------------------------------------------------------------------------
# Keyword-based heuristic classification
# ---------------------------------------------------------------------------
# The four lists below are consumed by classify_asn(), which tests each entry
# as a plain substring of the lower-cased AS name. Entries are therefore
# lower-case, and a leading/trailing space inside an entry (e.g. "ntt ",
# " isp") is part of the pattern — do not trim it.

# Substrings that mark an AS name as a consumer / residential ISP ("human").
HUMAN_KEYWORDS = [
    "telecom", "telcom", "telekom", "telefonica", "telecomunicacoes",
    "mobile", "broadband", "fiber", "fibre", "cable", "wireless",
    "residential", "communications", "comm ", " isp", "netcom",
    "internet service", "subscriber", "dsl", "adsl", "vdsl", "ftth",
    "fttb", "dial-up", "dialup", "cellular", "lte", "5g network",
    "wimax", "satellite", " tel ", "telco", "ptcl", "bsnl",
    "airtel", "jio", "reliance", "vodafone", "orange", "bouygues",
    "proximus", "swisscom", "telenor", "telia", "elisa", "dna oy",
    "rogers", "bell canada", "shaw", "telus", "optus", "tpg ",
    "internode", "comcast", "charter", "spectrum", "cox comm",
    "verizon", "at&t", "centurylink", "lumen", "frontier comm",
    "mediacom", "windstream", "consolidated comm", "t-mobile",
    "sprint", "cricket", "boost mobile", "virgin media",
    "bt group", "sky broadband", "sky uk", "plusnet", "talktalk",
    "ee limited", "three uk", "hutchison", "deutsche telekom",
    "1&1 versatel", "freenet", "unitymedia", "kabel deutschland",
    "kpn", "ziggo", "xs4all", "movistar", "masmovil", "yoigo",
    "tim s.p.a", "fastweb", "iliad", "free sas", "sfr",
    "numericable", "kddi", "ntt ", "softbank bb", "so-net",
    "biglobe", "kt corporation", "sk broadband", "lg uplus",
    "china telecom", "china unicom", "china mobile",
    "chunghwa telecom", "taiwan mobile", "far eastone",
    "pldt", "globe telecom", "true internet", "ais ", "dtac",
    "unifi", "maxis", "singtel", "starhub", "m1 limited",
    "viettel", "vnpt", "indosat", "telkomsel", "xl axiata",
    "turkcell", "turk telekom", "superonline",
    "rostelecom", "mts ", "beeline", "megafon",
    "etisalat", "du telecom", "stc ", "zain",
    "mtn ", "safaricom", "airtel africa", "rain ",
    "telmex", "claro", "vivo ", "oi s.a",
]
# Substrings that mark an AS name as cloud / infrastructure ("datacenter").
DATACENTER_KEYWORDS = [
    "cloud", "amazon", "aws", "google cloud", "gcp",
    "microsoft azure", "azure", "digitalocean", "digital ocean",
    "linode", "vultr", "hetzner", "ovhcloud", "ovh sas",
    "scaleway", "contabo", "kamatera", "upcloud",
    "oracle cloud", "alibaba cloud", "tencent cloud",
    "ibm cloud", "softlayer", "rackspace", "equinix",
    "leaseweb", "choopa", "data center", "datacenter",
    "colocation", "colo ", "baremetal", "bare metal",
    "infrastructure", "iaas", "paas",
]
# Substrings that mark an AS name as a web-hosting provider ("hosting").
HOSTING_KEYWORDS = [
    "hosting", "host ", "hoster", "webhost",
    "server farm", "vps", "virtual private",
    "dedicated server", "shared hosting", "managed hosting",
    "reseller", "cpanel", "plesk", "wordpress host",
    "godaddy", "namecheap", "hostinger", "bluehost",
    "siteground", "a2 hosting", "dreamhost", "hostgator",
    "ionos", "squarespace", "wix", "wpengine",
]
# Substrings that mark an AS name as a content-delivery network ("cdn").
CDN_KEYWORDS = [
    "cloudflare", "akamai", "fastly", "cdn ",
    "content delivery", "edgecast", "limelight",
    "stackpath", "sucuri", "keycdn", "bunnycdn",
    "jsdelivr", "cachefly", "imperva", "incapsula",
]
def classify_asn(asn_number: int, as_name: str) -> str:
    """Classify an ASN into a reputation category.

    Args:
        asn_number: Autonomous system number.
        as_name: Free-text AS description.

    Returns:
        One of "cdn", "hosting", "datacenter", "human" or "unknown".
    """
    # Hard-coded lookups first (highest priority). The order of these checks
    # decides the label for an ASN that appears in more than one set.
    if asn_number in KNOWN_CDN_ASNS:
        return "cdn"
    if asn_number in KNOWN_HOSTING_ASNS:
        return "hosting"
    if asn_number in KNOWN_DATACENTER_ASNS:
        return "datacenter"
    if asn_number in KNOWN_HUMAN_ASNS:
        return "human"

    # Keyword heuristics on the AS name.
    name_lower = as_name.strip().lower()

    # Skip "Not routed" or reserved placeholders.
    if name_lower in ("not routed", "none", "", "-"):
        return "unknown"

    # Several keywords use a leading/trailing space as a word boundary
    # ("ntt ", "cdn ", " isp"). Padding the name lets them also match when
    # the word is the first or last token of the description.
    padded = f" {name_lower} "

    # Most specific category first: CDN, then hosting, datacenter, human/ISP.
    keyword_tiers = (
        ("cdn", CDN_KEYWORDS),
        ("hosting", HOSTING_KEYWORDS),
        ("datacenter", DATACENTER_KEYWORDS),
        ("human", HUMAN_KEYWORDS),
    )
    for label, keywords in keyword_tiers:
        if any(kw in padded for kw in keywords):
            return label
    return "unknown"
def ranges_to_cidrs(start_ip: str, end_ip: str):
    """Return the CIDR networks that exactly cover start_ip..end_ip.

    Both bounds are inclusive IPv4 addresses in dotted-quad form. An empty
    list is returned when either bound cannot be parsed or when the range is
    reversed (start after end).
    """
    try:
        lo = ipaddress.IPv4Address(start_ip)
        hi = ipaddress.IPv4Address(end_ip)
        networks = [] if lo > hi else [*ipaddress.summarize_address_range(lo, hi)]
    except (ipaddress.AddressValueError, ValueError, TypeError):
        return []
    return networks
def download_iptoasn(output_dir: str) -> str:
    """Download and decompress ip2asn-v4.tsv.gz, return path to TSV.

    The decompressed data is written to a temporary sibling file and then
    moved over the cache with os.replace(), so a failed or interrupted write
    can never leave a truncated cache behind.

    Args:
        output_dir: Existing directory that holds the cached TSV.

    Returns:
        Path to the TSV, either freshly downloaded or the previous cache when
        the download failed.

    Exits the process with status 1 when the download fails and no cached
    copy exists.
    """
    tsv_path = os.path.join(output_dir, CACHED_TSV)
    tmp_path = tsv_path + ".tmp"
    print(f"[download] Fetching {IPTOASN_URL} ...")
    try:
        req = urllib.request.Request(IPTOASN_URL, headers={
            "User-Agent": "generate_asn_data/1.0"
        })
        with urllib.request.urlopen(req, timeout=120) as resp:
            compressed = resp.read()
        print(f"[download] Downloaded {len(compressed):,} bytes compressed")
        raw = gzip.decompress(compressed)
        print(f"[download] Decompressed to {len(raw):,} bytes")
        with open(tmp_path, "wb") as f:
            f.write(raw)
        # Atomic swap: the cache is either the old file or the complete new one.
        os.replace(tmp_path, tsv_path)
        print(f"[download] Saved to {tsv_path}")
    except Exception as e:
        # Deliberately broad: any network, gzip or filesystem failure should
        # fall back to the previous cache rather than abort the run.
        print(f"[error] Download failed: {e}", file=sys.stderr)
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # best-effort cleanup; the temp file may never have existed
        if os.path.exists(tsv_path):
            print(f"[info] Using cached {tsv_path}")
        else:
            print("[fatal] No cached TSV available, aborting.", file=sys.stderr)
            sys.exit(1)
    return tsv_path
def parse_tsv(tsv_path: str):
    """
    Parse ip2asn-v4.tsv.

    Format: range_start \\t range_end \\t AS_number \\t country_code \\t AS_description

    Blank lines, lines starting with "#", rows with fewer than five columns,
    rows whose AS number is not an integer, and rows with AS number 0 are
    skipped. A row whose AS_description column is empty is kept, with an
    empty name.

    Returns list of (start_ip, end_ip, asn, country, as_name) tuples.
    """
    entries = []
    with open(tsv_path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip() or line.lstrip().startswith("#"):
                continue
            # Strip only the line terminator before splitting: stripping all
            # whitespace would also eat a trailing tab and drop rows whose
            # last column is empty.
            parts = [field.strip() for field in line.rstrip("\r\n").split("\t")]
            if len(parts) < 5:
                continue
            start_ip, end_ip, asn_str, country, as_name = parts[:5]
            try:
                asn = int(asn_str)
            except ValueError:
                continue
            if asn == 0:
                continue
            entries.append((start_ip, end_ip, asn, country, as_name))
    return entries
def main():
    """Build asn_reputation.csv and iplocate-ip-to-asn.csv from the iptoasn dump.

    Steps: obtain the TSV (download or cache), parse it, classify every ASN,
    write both CSVs (no header row) into --output-dir, then print a summary.
    Exits with status 1 when --no-download is given but no cached TSV exists.
    """
    parser = argparse.ArgumentParser(
        description="Generate ASN reputation and IP-to-ASN CSVs from iptoasn.com data"
    )
    parser.add_argument(
        "--output-dir", default=".",
        help="Directory for output CSV files (default: current dir)"
    )
    parser.add_argument(
        "--no-download", action="store_true",
        help="Skip download, use cached ip2asn-v4.tsv in output-dir"
    )
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    # Step 1: Get TSV data
    tsv_path = os.path.join(args.output_dir, CACHED_TSV)
    if not args.no_download:
        tsv_path = download_iptoasn(args.output_dir)
    elif not os.path.exists(tsv_path):
        print(f"[fatal] --no-download specified but {tsv_path} not found",
              file=sys.stderr)
        sys.exit(1)

    # Step 2: Parse
    print("[parse] Reading TSV ...")
    entries = parse_tsv(tsv_path)
    print(f"[parse] {len(entries):,} IP range entries")

    # Step 3: Build ASN info map {asn: (country, as_name)}
    # The first occurrence of each ASN supplies its name and country.
    asn_info: dict[int, tuple[str, str]] = {}
    for _, _, asn, country, as_name in entries:
        if asn not in asn_info:
            asn_info[asn] = (country, as_name)

    # Step 4: Classify all ASNs
    asn_labels: dict[int, str] = {
        asn: classify_asn(asn, as_name)
        for asn, (_country, as_name) in asn_info.items()
    }

    # Step 5: Write asn_reputation.csv (sorted by ASN, no header)
    rep_path = os.path.join(args.output_dir, "asn_reputation.csv")
    sorted_asns = sorted(asn_labels)
    with open(rep_path, "w", encoding="utf-8") as f:
        f.writelines(f"{asn},{asn_labels[asn]}\n" for asn in sorted_asns)
    print(f"[output] Wrote {len(sorted_asns):,} ASNs to {rep_path}")

    # Step 6: Convert ranges to CIDRs and write iplocate-ip-to-asn.csv
    ip_path = os.path.join(args.output_dir, "iplocate-ip-to-asn.csv")
    seen_cidrs: set[tuple[ipaddress.IPv4Network, int]] = set()
    cidr_rows: list[tuple[ipaddress.IPv4Network, int, str, str]] = []
    print("[cidr] Converting IP ranges to CIDR notation ...")
    for start_ip, end_ip, asn, country, as_name in entries:
        for cidr in ranges_to_cidrs(start_ip, end_ip):
            key = (cidr, asn)
            if key not in seen_cidrs:
                seen_cidrs.add(key)
                cidr_rows.append((cidr, asn, country, as_name))

    # Sort by network address, then prefix length
    cidr_rows.sort(key=lambda r: (r[0].network_address, r[0].prefixlen))

    # The input is read as UTF-8, so write UTF-8 too: AS names may contain
    # non-ASCII characters that a locale-default encoding cannot represent.
    with open(ip_path, "w", encoding="utf-8") as f:
        for cidr, asn, country, as_name in cidr_rows:
            # Commas in AS names are replaced with spaces (rows are unquoted)
            safe_name = as_name.replace(",", " ")
            f.write(f"{cidr},{asn},{country},{safe_name}\n")
    cidr_count = len(cidr_rows)
    print(f"[output] Wrote {cidr_count:,} CIDRs to {ip_path}")

    # Step 7: Stats
    stats: dict[str, int] = {}
    for label in asn_labels.values():
        stats[label] = stats.get(label, 0) + 1
    print("\n=== Summary ===")
    print(f"Total ASNs: {len(sorted_asns):,}")
    for label in ["human", "datacenter", "hosting", "cdn", "unknown"]:
        count = stats.get(label, 0)
        pct = 100.0 * count / len(sorted_asns) if sorted_asns else 0
        print(f" {label:12s}: {count:>6,} ({pct:5.1f}%)")
    print(f"Total CIDRs: {cidr_count:,}")
if __name__ == "__main__":

View File

@ -1,25 +1,75 @@
#!/usr/bin/env python3
"""
generate_bot_ip.py — Generate bot_ip.csv from known scanner networks + Tor exit nodes.
generate_bot_ip.py — Generate bot_ip.csv from Anubis crawler data, scanner
networks, and Tor exit nodes.
Sources:
Tor exit nodes: downloaded list or hardcoded fallback
• Shodan: known scanner ranges (census.shodan.io, 2024)
• Censys: known scanner ranges (censys.io, 2024)
• Binaryedge, SecurityTrails, ZoomEye, Stretchoid: known ranges
GreyNoise: top mass-scanner IPs (manually curated)
Anubis crawler YAML files (TecharoHQ/anubis on GitHub)
- Search engines: Googlebot, Bingbot, Applebot, DuckDuckBot
- AI crawlers: GPTBot, PerplexityBot
- Cloud scrapers: Alibaba Cloud, Huawei Cloud, Tencent Cloud
Curated scanner networks: Shodan, Censys, BinaryEdge, Stretchoid,
SecurityTrails, ZoomEye, GreyNoise, Shadowserver
• Tor exit nodes: https://check.torproject.org/torbulkexitlist
Anubis UA-only bot references (no IP ranges, documented here for reference):
• ai-catchall.yaml patterns: bytespider, CCBot, ChatGLM-Spider, ClaudeBot,
Diffbot, FacebookBot, Google-Extended, ImagesiftBot, Kangaroo Bot,
Meta-ExternalAgent, PetalBot, Scrapy, Timpibot, VelenPublicWebCrawler,
Webzio-Extended, cohere-ai
• ai-robots-txt.yaml patterns: Amazonbot, anthropic-ai, Applebot-Extended,
Bytespider, CCBot, ChatGPT-User, Claude-Web, cohere-ai, Diffbot,
FacebookBot, Google-Extended, GPTBot, Meta-ExternalAgent, OAI-SearchBot,
omgili, PerplexityBot, Timpibot, YouBot
Output format (no header):
<ip_or_cidr>,<bot_name>
ip_or_cidr,bot_name
Usage:
python3 generate_bot_ip.py --output bot_ip.csv [--no-download]
"""
import argparse
import ipaddress
import os
import sys
import urllib.request
import urllib.error
# --- Known scanner networks (public, well-documented) ---
KNOWN_SCANNERS = {
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Revision of TecharoHQ/anubis that the crawler YAML files are fetched from.
ANUBIS_COMMIT = "dbd64e0f4f23c37476b40b332781c676ffa413ef"
# Raw-content URL of that revision's data/ directory (ends with a slash).
ANUBIS_BASE = (
    "https://raw.githubusercontent.com/TecharoHQ/anubis/"
    + ANUBIS_COMMIT
    + "/data/"
)

# Each row: (yaml path relative to data/, bot name written to the CSV, category)
ANUBIS_CRAWLERS = [
    # search engines
    ("crawlers/googlebot.yaml", "Googlebot", "search_engine"),
    ("crawlers/bingbot.yaml", "Bingbot", "search_engine"),
    ("crawlers/applebot.yaml", "Applebot", "search_engine"),
    ("crawlers/duckduckbot.yaml", "DuckDuckBot", "search_engine"),
    # AI crawlers
    ("crawlers/openai-gptbot.yaml", "GPTBot", "ai_crawler"),
    ("crawlers/perplexitybot.yaml", "PerplexityBot", "ai_crawler"),
    # cloud scrapers
    ("crawlers/alibaba-cloud.yaml", "Alibaba_Cloud_Scraper", "cloud_scraper"),
    ("crawlers/huawei-cloud.yaml", "Huawei_Cloud_Scraper", "cloud_scraper"),
    ("crawlers/tencent-cloud.yaml", "Tencent_Cloud_Scraper", "cloud_scraper"),
]

# Bulk list of Tor exit node addresses.
TOR_URL = "https://check.torproject.org/torbulkexitlist"
# Curated scanner networks (category: scanner)
SCANNER_RANGES = {
# Shodan — https://wiki.ipfire.org/dns/public-servers (census.shodan.io)
"Shodan_Scanner": [
"Shodan": [
"66.240.192.0/24", "66.240.205.0/24", "66.240.236.0/24",
"71.6.135.0/24", "71.6.146.0/24", "71.6.158.0/24", "71.6.165.0/24",
"80.82.77.0/24", "80.82.78.0/24",
@ -32,45 +82,45 @@ KNOWN_SCANNERS = {
"209.126.110.0/24",
],
# Censys — https://support.censys.io/hc/en-us/articles/360043177092
"Censys_Scanner": [
"Censys": [
"162.142.125.0/24", "167.248.133.0/24", "167.94.138.0/24",
"167.94.145.0/24", "167.94.146.0/24",
"192.35.168.0/23",
],
# BinaryEdge — https://docs.binaryedge.io/
"BinaryEdge_Scanner": [
"BinaryEdge": [
"154.89.5.0/24",
"45.143.200.0/22",
],
# Stretchoid — persistent scanner botnet
"Stretchoid_Scanner": [
"Stretchoid": [
"198.235.24.0/24",
"205.210.31.0/24",
],
# SecurityTrails (Recorded Future) crawlers
"SecurityTrails_Crawler": [
"SecurityTrails": [
"52.250.0.0/16",
],
# ZoomEye (Knownsec)
"ZoomEye_Scanner": [
"ZoomEye": [
"106.75.0.0/16",
],
# GreyNoise known mass-scanners (individual IPs)
"GreyNoise_MassScanner": [
"GreyNoise": [
"45.155.205.233/32", "45.155.205.220/32", "45.155.205.205/32",
"45.155.205.190/32", "45.155.205.175/32", "45.155.205.160/32",
"45.155.205.146/32", "45.155.205.131/32",
"193.32.162.10/32", "193.32.162.11/32", "193.32.162.25/32",
"193.32.162.30/32", "193.32.162.40/32",
],
# Netlab/Shadowserver known sinkholes used by malware
"Shadowserver_Sinkhole": [
# Netlab/Shadowserver known sinkholes
"Shadowserver": [
"74.82.47.0/24",
"184.105.139.0/24", "184.105.247.0/24",
],
}
# Fallback Tor exit nodes when download unavailable
# Fallback Tor exit nodes when download is unavailable
FALLBACK_TOR_IPS = [
"185.220.101.34", "185.220.101.35", "185.220.101.36", "185.220.101.37",
"185.220.101.38", "185.220.101.39", "185.220.101.40", "185.220.101.41",
@ -111,61 +161,192 @@ FALLBACK_TOR_IPS = [
]
def load_tor_ips(tor_file):
"""Load Tor exit node IPs from downloaded file."""
ips = set()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def fetch_url(url, timeout=30):
    """Download URL content as string. Returns None on failure.

    Args:
        url: Absolute URL to fetch.
        timeout: Timeout in seconds passed to urlopen().

    Returns:
        The response body decoded as UTF-8 (undecodable bytes replaced), or
        None when the request could not be built, sent, or fully read. A
        warning is printed to stderr in that case.
    """
    # Local import: http.client errors (e.g. IncompleteRead while reading the
    # body) are not OSError subclasses and would otherwise escape.
    import http.client

    try:
        req = urllib.request.Request(url, headers={"User-Agent": "ja4-bot-ip-gen/1.0"})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except (urllib.error.URLError, http.client.HTTPException,
            OSError, ValueError) as exc:
        print(f" [WARN] Failed to download {url}: {exc}", file=sys.stderr)
        return None
def _addresses_from_documents(docs):
    """Collect remote_addresses strings from a parsed YAML mapping or list of mappings."""
    candidates = docs if isinstance(docs, list) else [docs]
    found = []
    for doc in candidates:
        if not isinstance(doc, dict):
            continue
        value = doc.get("remote_addresses")
        if isinstance(value, str):
            # A single scalar address; do not iterate it character by character.
            value = [value]
        if isinstance(value, (list, tuple)):
            found.extend(item.strip() for item in value if isinstance(item, str))
        # Any other shape (missing key, null, mapping) contributes nothing.
    return found


def _addresses_from_text(text):
    """Collect remote_addresses entries with a line-based scan (no YAML parser)."""
    addrs = []
    in_block = False
    for line in text.splitlines():
        stripped = line.strip()
        if stripped.startswith("remote_addresses:"):
            in_block = True
            # Inline list on same line: remote_addresses: [...]
            rest = stripped.split(":", 1)[1].strip()
            if rest.startswith("["):
                for token in rest.strip("[]").split(","):
                    token = token.strip().strip('"').strip("'")
                    if token:
                        addrs.append(token)
                if rest.endswith("]"):
                    in_block = False
            continue
        if not in_block:
            continue
        if stripped.startswith("- "):
            # Block-style list item
            val = stripped[2:].strip().strip('"').strip("'")
            if val:
                addrs.append(val)
        elif stripped.startswith('"') or stripped.startswith("'"):
            # Quoted item of a multi-line flow list
            val = stripped.strip('",').strip("',").strip()
            if val:
                addrs.append(val)
        elif stripped == "]":
            in_block = False
        elif stripped and not stripped.startswith("#"):
            if stripped.startswith("["):
                continue
            # End of block when hitting a new YAML key ("/" rules out CIDRs)
            if ":" in stripped and "/" not in stripped:
                in_block = False
    return addrs


def parse_yaml_remote_addresses(text):
    """Extract remote_addresses from an Anubis crawler YAML file.

    Uses PyYAML if available, otherwise falls back to simple text parsing
    that looks for a ``remote_addresses`` list of CIDR strings. The text
    parser is also used when PyYAML is present but cannot parse the input.

    Args:
        text: Contents of the YAML file.

    Returns:
        List of address strings; empty when none are found.
    """
    if HAS_YAML:
        try:
            docs = yaml.safe_load(text)
        except yaml.YAMLError as exc:
            print(f" [WARN] YAML parse error, using text fallback: {exc}",
                  file=sys.stderr)
        else:
            return _addresses_from_documents(docs)
    # Fallback: manual parsing
    return _addresses_from_text(text)
def normalize_cidr(addr):
    """Return the canonical network string for an IP or CIDR.

    Surrounding whitespace is removed and host bits are masked off; a bare
    address gains its full-length prefix. Input that is not a valid network
    is returned as the stripped string.
    """
    cleaned = addr.strip()
    try:
        return str(ipaddress.ip_network(cleaned, strict=False))
    except ValueError:
        return cleaned
def sort_key(entry):
    """Build the ordering tuple (bot_name, ip_version, network_address).

    ``entry`` is a (cidr, bot_name) pair. A cidr that cannot be parsed sorts
    after all valid ones for the same bot (version 99, address 0).
    """
    cidr, bot_name = entry
    try:
        parsed = ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        return (bot_name, 99, 0)
    return (bot_name, parsed.version, int(parsed.network_address))
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Assemble bot_ip.csv from Anubis crawler ranges, scanner ranges and Tor exits.

    Sources are added in that order and the first bot name recorded for a
    network wins. Output rows are ``cidr,bot_name`` with no header, sorted by
    sort_key(). With --no-download only the built-in scanner ranges and the
    fallback Tor list are used.
    """
    parser = argparse.ArgumentParser(
        description="Generate bot_ip.csv from Anubis crawler data, scanners, and Tor"
    )
    parser.add_argument("--output", default="bot_ip.csv",
                        help="Output CSV path (default: bot_ip.csv)")
    parser.add_argument("--no-download", action="store_true",
                        help="Skip all network downloads; use only built-in data")
    args = parser.parse_args()

    # {cidr: bot_name} for dedup
    entries = {}
    # {category: count} for stats
    category_counts = {}

    def add_entry(cidr, bot_name, category):
        """Record cidr under bot_name unless that network is already present."""
        cidr = normalize_cidr(cidr)
        if cidr not in entries:
            entries[cidr] = bot_name
            category_counts[category] = category_counts.get(category, 0) + 1

    # ------------------------------------------------------------------
    # 1. Anubis crawler YAML files
    # ------------------------------------------------------------------
    if not args.no_download:
        print("[bot_ip] Downloading Anubis crawler data ...")
        for yaml_path, bot_name, category in ANUBIS_CRAWLERS:
            url = ANUBIS_BASE + yaml_path
            print(f" Fetching {yaml_path} ...", end=" ")
            text = fetch_url(url)
            if text is None:
                print("FAILED")
                continue
            addrs = parse_yaml_remote_addresses(text)
            print(f"{len(addrs)} addresses")
            for addr in addrs:
                add_entry(addr, bot_name, category)

    # ------------------------------------------------------------------
    # 2. Curated scanner networks
    # ------------------------------------------------------------------
    print("[bot_ip] Adding curated scanner networks ...")
    for bot_name, networks in SCANNER_RANGES.items():
        for net in networks:
            add_entry(net, bot_name, "scanner")

    # ------------------------------------------------------------------
    # 3. Tor exit nodes
    # ------------------------------------------------------------------
    tor_ips = set()
    if not args.no_download:
        print("[bot_ip] Downloading Tor exit node list ...")
        text = fetch_url(TOR_URL, timeout=30)
        if text:
            for line in text.splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                try:
                    ipaddress.ip_address(line)
                except ValueError:
                    continue  # not an IP address; ignore the line
                tor_ips.add(line)
            print(f" Downloaded {len(tor_ips)} Tor exit nodes")
    if not tor_ips:
        print(" Using fallback Tor exit node list")
        tor_ips = set(FALLBACK_TOR_IPS)
    for ip in tor_ips:
        # Pass the bare address: normalize_cidr() appends the host-length
        # prefix for the right IP version. A hard-coded "/32" would turn an
        # IPv6 address into a /32 IPv6 network.
        add_entry(ip, "tor_exit_node", "anonymizer")

    # ------------------------------------------------------------------
    # 4. Sort and write (entries is already deduplicated by network)
    # ------------------------------------------------------------------
    sorted_entries = sorted(entries.items(), key=sort_key)
    os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        f.writelines(f"{cidr},{bot_name}\n" for cidr, bot_name in sorted_entries)

    # ------------------------------------------------------------------
    # 5. Print stats
    # ------------------------------------------------------------------
    print(f"\n[bot_ip] Generated {len(sorted_entries)} entries → {args.output}")
    print("[bot_ip] Breakdown by category:")
    for category in sorted(category_counts):
        print(f" {category:20s} {category_counts[category]:>6d}")
if __name__ == "__main__":

View File

@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""
generate_browser_ja4.py — Generate browser_ja4.csv with known browser TLS fingerprints.
Sources:
• FoxIO official ja4plus-mapping.csv:
https://github.com/FoxIO-LLC/ja4/blob/main/ja4plus-mapping.csv
• ja4db.com public API: https://ja4db.com/api/read/
• JA4+ specification: https://github.com/FoxIO-LLC/ja4
JA4 fingerprints identify TLS libraries, not individual browser versions.
All Chromium-based browsers (Chrome, Edge, Brave, Opera, Vivaldi, Samsung Internet)
share BoringSSL and produce identical JA4 fingerprints.
Firefox uses NSS, Safari uses Apple SecureTransport/Network.framework.
Output format (no header):
<ja4_fingerprint>,<browser_family>,<tls_library>,<context>
"""
import argparse
import json
import os
import sys
import urllib.request
# =============================================================================
# Curated browser JA4 fingerprints from FoxIO official mapping
# Format: (ja4, browser_family, tls_library, context)
# =============================================================================
BROWSER_FINGERPRINTS = [
# --- Chromium (Chrome, Edge, Brave, Opera, Vivaldi, Samsung Internet) ---
# BoringSSL — all Chromium-based browsers produce identical JA4
("t13d1516h2_8daaf6152771_02713d6af862", "Chromium",
"BoringSSL", "TCP fresh connection"),
("t13d1517h2_8daaf6152771_b0da82dd1658", "Chromium",
"BoringSSL", "TCP with PSK extension"),
("t13d1517h2_8daaf6152771_b1ff8ab2d16f", "Chromium",
"BoringSSL", "TCP alt extension set"),
("t13i1515h2_8daaf6152771_02713d6af862", "Chromium",
"BoringSSL", "TCP to IP (no SNI)"),
("t13i1516h2_8daaf6152771_b0da82dd1658", "Chromium",
"BoringSSL", "TCP to IP with PSK"),
("t13i1516h2_8daaf6152771_b1ff8ab2d16f", "Chromium",
"BoringSSL", "TCP to IP alt ext"),
("q13d0312h3_55b375c5d22e_06cda9e17597", "Chromium",
"BoringSSL", "QUIC HTTP/3 to domain"),
("q13i0311h3_55b375c5d22e_06cda9e17597", "Chromium",
"BoringSSL", "QUIC HTTP/3 to IP"),
# --- Mozilla Firefox ---
# NSS library
("t13d1715h2_5b57614c22b0_7121afd63204", "Firefox",
"NSS", "TCP to domain"),
("t13i1714h2_5b57614c22b0_7121afd63204", "Firefox",
"NSS", "TCP to IP (no SNI)"),
# Firefox with different extension counts (minor NSS updates)
("t13d1715h2_5b57614c22b0_3c5d18be5765", "Firefox",
"NSS", "TCP domain (NSS variant)"),
("t13d1716h2_5b57614c22b0_7121afd63204", "Firefox",
"NSS", "TCP domain (16 ext)"),
# --- Apple Safari (macOS + iOS) ---
# Apple SecureTransport / Network.framework
("t13d2014h2_a09f3c656075_14788d8d241b", "Safari",
"SecureTransport", "TCP to domain"),
("t13i2013h2_a09f3c656075_14788d8d241b", "Safari",
"SecureTransport", "TCP to IP (no SNI)"),
# Safari variants with different extension hashes
("t13d1714h2_5b57614c22b0_14788d8d241b", "Safari",
"SecureTransport", "Safari/iOS via WebKit (hybrid)"),
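# --- Chrome on iOS ---
# Uses Apple's TLS stack, not BoringSSL (Apple enforces this on iOS).
# NOTE: this JA4 is identical to the Safari "hybrid" entry above; main() keeps only
# the first row per fingerprint, so this Chrome_iOS row never reaches the CSV.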
("t13d1714h2_5b57614c22b0_14788d8d241b", "Chrome_iOS",
"SecureTransport", "Chrome on iOS (Apple TLS)"),
# --- Tor Browser ---
# Based on Firefox ESR but with a hardened NSS configuration
("t13d1614h2_5b57614c22b0_7121afd63204", "Tor_Browser",
"NSS", "TCP (hardened Firefox ESR)"),
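# --- Common mobile browser patterns ---
# Android WebView uses BoringSSL (same as Chromium).
# NOTE: this JA4 duplicates the first Chromium entry in this list; main() keeps only
# the first row per fingerprint, so this Android_WebView row never reaches the CSV.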
("t13d1516h2_8daaf6152771_02713d6af862", "Android_WebView",
"BoringSSL", "Android WebView (same as Chromium)"),
# --- Older browser versions (TLS 1.2 fallback) ---
("t12d1516h2_8daaf6152771_02713d6af862", "Chromium_Legacy",
"BoringSSL", "Chromium TLS 1.2 only (old/restricted)"),
("t12d1715h2_5b57614c22b0_7121afd63204", "Firefox_Legacy",
"NSS", "Firefox TLS 1.2 only (old)"),
("t12d2014h2_a09f3c656075_14788d8d241b", "Safari_Legacy",
"SecureTransport", "Safari TLS 1.2 only (old)"),
]
# Unique cipher hashes that identify TLS libraries
CIPHER_HASH_FAMILIES = {
"8daaf6152771": "Chromium/BoringSSL",
"55b375c5d22e": "Chromium/BoringSSL (QUIC)",
"5b57614c22b0": "Firefox/NSS",
"a09f3c656075": "Safari/SecureTransport",
}
JA4DB_URL = "https://ja4db.com/api/read/"
FOXIO_URL = ("https://raw.githubusercontent.com/FoxIO-LLC/ja4/"
"main/ja4plus-mapping.csv")
# Valid JA4 fingerprint: protocol prefix + hex hashes separated by underscores
# Example: t13d1516h2_8daaf6152771_02713d6af862
import re
_JA4_RE = re.compile(r'^[tdq]\d{2}[di]\d{4}[a-z0-9]{2}_[0-9a-f]{12}_[0-9a-f]{12}$')
def _is_valid_ja4(s):
"""Validate that a string is a well-formed JA4 fingerprint (ASCII hex only)."""
return bool(_JA4_RE.match(s))
def download_ja4db():
    """Download the ja4db.com dump and extract the records that look like browsers.

    The payload is read as a JSON array of objects; the keys used are
    ``ja4_fingerprint``, ``application``, ``user_agent_string`` and ``verified``.
    A record is kept when its application name contains "browser", or, failing
    that, when its user-agent string is recognised by ``_classify_ua``.

    The download is best-effort: any network/decoding failure is reported on
    stderr and whatever was collected so far is returned. Individual malformed
    records are counted and skipped so that one bad record does not discard
    the remainder of the download.

    Returns:
        list[tuple[str, str, str]]: ``(ja4, browser_family, source)`` tuples.
    """
    entries = []
    rejected = 0
    try:
        req = urllib.request.Request(JA4DB_URL, headers={"User-Agent": "ja4-platform/1.0"})
        with urllib.request.urlopen(req, timeout=30) as r:
            data = json.loads(r.read().decode())
        if not isinstance(data, list):
            # e.g. an error object instead of the expected array of records
            raise ValueError(f"unexpected payload type {type(data).__name__}")
        for item in data:
            # Remote data is untrusted: validate the shape of every record.
            if not isinstance(item, dict):
                rejected += 1
                continue
            ja4 = item.get("ja4_fingerprint")
            if not ja4:
                # Records without a JA4 are simply irrelevant, not malformed.
                continue
            if not isinstance(ja4, str) or not _is_valid_ja4(ja4):
                rejected += 1
                continue
            # str() guards against non-string values in otherwise valid records.
            app = str(item.get("application") or "").strip()
            ua = str(item.get("user_agent_string") or "").strip()
            if app and "browser" in app.lower():
                source = "ja4db.com (verified)" if item.get("verified") else "ja4db.com"
                entries.append((ja4, app, source))
            elif ua:
                family = _classify_ua(ua)
                if family:
                    entries.append((ja4, family, f"ja4db.com UA: {ua[:60]}"))
        print(f"  [ja4db.com] Downloaded {len(entries)} browser fingerprints "
              f"(rejected {rejected} malformed)")
    except Exception as e:
        # Deliberate best-effort boundary: warn and fall back to what we have.
        print(f"  [ja4db.com] Warning: download failed ({e})", file=sys.stderr)
    return entries
def download_foxio_mapping():
    """Download FoxIO's official ja4plus-mapping.csv and extract browser rows.

    Columns used (0-based): 0 = application, 1 = library, 4 = JA4 fingerprint.
    A row is kept when its JA4 is well-formed and the application name contains
    "browser" or is exactly "Mozilla Firefox" / "Safari".

    The file is parsed with the ``csv`` module so that quoted fields containing
    commas do not shift the column positions.

    The download is best-effort: any failure is reported on stderr and the
    entries collected so far (possibly none) are returned.

    Returns:
        list[tuple[str, str, str]]: ``(ja4, application, source)`` tuples.
    """
    # Imported locally so this function does not rely on module-level imports.
    import csv
    import io

    entries = []
    try:
        req = urllib.request.Request(FOXIO_URL, headers={"User-Agent": "ja4-platform/1.0"})
        with urllib.request.urlopen(req, timeout=15) as r:
            text = r.read().decode()
        # newline="" lets the csv reader handle line endings (and quoted newlines) itself.
        rows = csv.reader(io.StringIO(text, newline=""))
        next(rows, None)  # skip header
        for parts in rows:
            if len(parts) < 5:
                continue
            app = parts[0].strip()
            lib = parts[1].strip()
            ja4 = parts[4].strip()
            if not ja4 or not _is_valid_ja4(ja4):
                continue
            if "browser" in app.lower() or app in ("Mozilla Firefox", "Safari"):
                entries.append((ja4, app, f"FoxIO official (lib: {lib})"))
        print(f"  [FoxIO] Downloaded {len(entries)} browser fingerprints")
    except Exception as e:
        # Deliberate best-effort boundary: warn and fall back to what we have.
        print(f"  [FoxIO] Warning: download failed ({e})", file=sys.stderr)
    return entries
def _classify_ua(ua):
"""Classify a user-agent string into a browser family."""
ua_lower = ua.lower()
# Order matters — check more specific strings first
if "crios/" in ua_lower:
return "Chrome_iOS"
if "fxios/" in ua_lower:
return "Firefox_iOS"
if "edg/" in ua_lower or "edge/" in ua_lower:
return "Edge"
if "opr/" in ua_lower or "opera" in ua_lower:
return "Opera"
if "brave" in ua_lower:
return "Brave"
if "vivaldi" in ua_lower:
return "Vivaldi"
if "samsungbrowser" in ua_lower:
return "Samsung_Internet"
if "firefox/" in ua_lower:
return "Firefox"
if "safari/" in ua_lower and "chrome/" not in ua_lower:
return "Safari"
if "chrome/" in ua_lower:
return "Chromium"
return None
def main():
    """Build browser_ja4.csv from the curated list plus optional remote sources.

    Priority when the same JA4 appears more than once (first one wins):
      1. ``BROWSER_FINGERPRINTS`` (curated, in list order),
      2. FoxIO mapping,
      3. ja4db.com.

    Output rows (no header): ``ja4, browser_family, tls_library, context``,
    sorted by fingerprint and written as UTF-8.
    """
    import csv

    parser = argparse.ArgumentParser(description="Generate browser_ja4.csv")
    parser.add_argument("--output", default="browser_ja4.csv",
                        help="Output CSV file path")
    parser.add_argument("--no-download", action="store_true",
                        help="Skip downloading from remote sources")
    args = parser.parse_args()

    # ja4 -> (browser_family, tls_library, context)
    seen = {}

    # 1. Start with curated list (highest priority)
    for ja4, family, lib, ctx in BROWSER_FINGERPRINTS:
        seen.setdefault(ja4, (family, lib, ctx))

    # 2. Download from remote sources; FoxIO is consulted before ja4db.com
    if not args.no_download:
        print("[browser_ja4] Downloading from remote sources...")
        for fetch in (download_foxio_mapping, download_ja4db):
            for ja4, family, source in fetch():
                if ja4 not in seen:
                    # Remote rows carry no library column; infer it from the cipher hash.
                    seen[ja4] = (family, _lib_from_cipher_hash(ja4), source)

    # 3. Write output. The csv module quotes fields containing commas
    #    (the context column may embed User-Agent text).
    os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)
    # Explicit UTF-8: context may hold non-ASCII characters and the locale
    # default encoding is not guaranteed to be able to encode them.
    with open(args.output, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        for ja4 in sorted(seen):
            family, lib, ctx = seen[ja4]
            writer.writerow([ja4, family, lib, ctx])

    families = {family for family, _, _ in seen.values()}
    print(f"[browser_ja4] Generated {len(seen)} unique fingerprints "
          f"covering {len(families)} browser families: {', '.join(sorted(families))}")
def _lib_from_cipher_hash(ja4):
    """Look up the TLS library label for a JA4 via its middle (cipher-hash) segment.

    The fingerprint is split on underscores and the second segment is looked up
    in ``CIPHER_HASH_FAMILIES``. Returns "unknown" when there is no second
    segment or the hash is not in the table.
    """
    segments = ja4.split("_")
    try:
        cipher_hash = segments[1]
    except IndexError:
        # No underscore at all: nothing to look up.
        return "unknown"
    return CIPHER_HASH_FAMILIES.get(cipher_hash, "unknown")
if __name__ == "__main__":
main()

View File

@ -18,6 +18,7 @@
DROP TABLE IF EXISTS ja4_processing.ml_all_scores;
DROP DICTIONARY IF EXISTS ja4_processing.dict_bot_ip;
DROP DICTIONARY IF EXISTS ja4_processing.dict_bot_ja4;
DROP DICTIONARY IF EXISTS ja4_processing.dict_browser_ja4;
DROP DICTIONARY IF EXISTS ja4_processing.dict_asn_reputation;
DROP TABLE IF EXISTS ja4_processing.ml_detected_anomalies;
DROP VIEW IF EXISTS ja4_processing.view_ip_recurrence;
@ -58,6 +59,10 @@ CREATE DICTIONARY ja4_processing.dict_asn_reputation (src_asn UInt64, label Stri
PRIMARY KEY src_asn SOURCE(FILE(path '/var/lib/clickhouse/user_files/asn_reputation.csv' format 'CSV'))
LAYOUT(HASHED()) LIFETIME(MIN 300 MAX 300);
-- Browser JA4 lookup dictionary: JA4 fingerprint -> browser_family / tls_library / context,
-- loaded from browser_ja4.csv (headerless CSV, columns in the order declared below).
-- The key is a String, so lookups pass it as a one-element tuple, e.g. tuple(a.ja4).
CREATE DICTIONARY ja4_processing.dict_browser_ja4 (ja4 String, browser_family String, tls_library String, context String)
PRIMARY KEY ja4 SOURCE(FILE(path '/var/lib/clickhouse/user_files/browser_ja4.csv' format 'CSV'))
LAYOUT(COMPLEX_KEY_HASHED()) LIFETIME(MIN 300 MAX 300);
-- ============================================================================
-- 3. TABLE D'AGRÉGATION COMPORTEMENTALE (L4 / L5 / L7)
-- ============================================================================
@ -291,6 +296,7 @@ WITH base_data AS (
nullIf(dictGetOrDefault('ja4_processing.dict_bot_ja4', 'bot_name', tuple(a.ja4), ''), ''),
''
) AS bot_name,
dictGetOrDefault('ja4_processing.dict_browser_ja4', 'browser_family', tuple(a.ja4), '') AS browser_family,
a.hits AS hits,
sum(a.hits) OVER (PARTITION BY a.src_ip) AS total_ip_hits,
a.correlated AS correlated, a.tcp_jitter_variance AS tcp_jitter_variance,
@ -437,6 +443,7 @@ WITH base_data AS (
nullIf(dictGetOrDefault('ja4_processing.dict_bot_ja4', 'bot_name', tuple(a.ja4), ''), ''),
''
) AS bot_name,
dictGetOrDefault('ja4_processing.dict_browser_ja4', 'browser_family', tuple(a.ja4), '') AS browser_family,
a.hits AS hits,
sum(a.hits) OVER (PARTITION BY a.src_ip) AS total_ip_hits,
a.correlated AS correlated, a.tcp_jitter_variance AS tcp_jitter_variance,

View File

@ -545,6 +545,43 @@ async def fingerprints() -> dict[str, Any]:
return {"ja4_stats": [], "bot_ja4": []}
# ---------------------------------------------------------------------------
# GET /api/browsers — Browser family distribution from JA4 fingerprints
# ---------------------------------------------------------------------------
@router.get("/browsers")
async def browsers() -> dict[str, Any]:
    """Browser identification via JA4 TLS fingerprint → browser_family dictionary.

    Reads ``view_ai_features_1h`` and returns a dict with three keys:

    * ``distribution`` — per browser family: row count (``sessions``), distinct
      source IPs (``unique_ips``) and summed ``hits`` (``total_hits``), most
      frequent family first.
    * ``unknown_sessions`` — number of rows with neither a browser family nor
      a bot name; 0 when the scalar query yields nothing.
    * ``top_ja4_by_browser`` — the most frequent JA4 fingerprints of *each*
      family (at most 5 per family).

    Any query failure is logged with its traceback and an empty payload of the
    same shape is returned, so the dashboard keeps rendering.
    """
    try:
        distribution = query(
            f"SELECT browser_family, count() AS sessions, "
            f"uniqExact(src_ip) AS unique_ips, sum(hits) AS total_hits "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE browser_family != '' "
            "GROUP BY browser_family ORDER BY sessions DESC"
        )
        # Also get unknown (no browser match)
        unknown = query_scalar(
            f"SELECT count() FROM {_DB}.view_ai_features_1h "
            "WHERE browser_family = '' AND bot_name = ''"
        )
        # Top JA4 per browser family.
        # LIMIT n BY caps the rows of each family separately; a plain LIMIT after
        # ORDER BY browser_family would only keep the alphabetically first families.
        top_ja4 = query(
            f"SELECT browser_family, ja4, count() AS sessions "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE browser_family != '' "
            "GROUP BY browser_family, ja4 ORDER BY browser_family, sessions DESC "
            "LIMIT 5 BY browser_family"
        )
        return {
            "distribution": distribution,
            "unknown_sessions": unknown or 0,
            "top_ja4_by_browser": top_ja4,
        }
    except Exception:
        # logger.exception already records the active exception and traceback.
        logger.exception("browsers query failed")
        return {"distribution": [], "unknown_sessions": 0, "top_ja4_by_browser": []}
# ---------------------------------------------------------------------------
# GET /api/behavior — Feature scatter + distributions
# ---------------------------------------------------------------------------

View File

@ -40,6 +40,19 @@ SOURCE(FILE(path '/var/lib/clickhouse/user_files/asn_reputation.csv' format 'CSV
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 300);
-- Browser JA4 lookup dictionary: JA4 fingerprint -> browser_family / tls_library / context.
-- Source is the headerless browser_ja4.csv; its columns must stay in the order declared here.
-- Dropped first so the statement pair can be re-run to recreate the dictionary.
DROP DICTIONARY IF EXISTS ja4_processing.dict_browser_ja4;
CREATE DICTIONARY ja4_processing.dict_browser_ja4
(
ja4 String,
browser_family String,
tls_library String,
context String
)
PRIMARY KEY ja4
SOURCE(FILE(path '/var/lib/clickhouse/user_files/browser_ja4.csv' format 'CSV'))
-- String key: looked up with a one-element tuple, e.g. tuple(a.ja4)
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 300);
-- -----------------------------------------------------------------------------
-- agg_host_ip_ja4_1h — behavioral aggregation (L4/L5/L7)

View File

@ -17,6 +17,7 @@ WITH base_data AS (
nullIf(dictGetOrDefault('ja4_processing.dict_bot_ja4', 'bot_name', tuple(a.ja4), ''), ''),
''
) AS bot_name,
dictGetOrDefault('ja4_processing.dict_browser_ja4', 'browser_family', tuple(a.ja4), '') AS browser_family,
-- Anubis: combined UA+IP priority logic > UA only > IP only > ASN > Country
CASE
WHEN dictGet('ja4_processing.dict_anubis_ua', 'has_ip', a.first_ua) = '1'

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff