feat: rewrite ASN classification with PeeringDB + expanded heuristics

Major improvements to generate_asn_data.py:
- Add PeeringDB network data source (34K networks with info_type)
- Add new categories: education, government, enterprise
- Rename 'human' label to 'isp' across all consumers
- Expand keyword heuristics (ISP, datacenter, hosting, CDN, education, gov)
- Add hard-coded lists for education, government, enterprise ASNs
- Support both --output-dir and --output-asn/--output-ipasn CLI interfaces
- Add --no-peeringdb flag for offline use

Results: unknown dropped from 86% to 57%, ISP coverage 21.8K ASNs,
education 3.1K, enterprise 5.7K, government 520.

Updated consumers:
- bot_detector.py: 'human' -> 'isp' for baseline selection
- dashboard api.py: 'human' -> 'isp' in SQL queries
- run-tests.sh: 'human' -> 'isp' in integration test assertions
- update-csv-data.sh: updated label description comment

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-08 16:02:07 +02:00
parent 9a48fb9d29
commit 5c5bca71d1
6 changed files with 35197 additions and 30362 deletions

View File

@ -1,32 +1,40 @@
#!/usr/bin/env python3
"""
generate_asn_data.py — Download public IPtoASN database and generate:
1. asn_reputation.csv — ASN number + reputation label (no header)
2. iplocate-ip-to-asn.csv — CIDR,ASN,country,AS name (no header)
generate_asn_data.py — Download public IPtoASN + PeeringDB data and generate:
1. asn_reputation.csv — ASN number + reputation label (no header)
2. iplocate-ip-to-asn.csv — CIDR,ASN,country,AS name (no header)
Data source: https://iptoasn.com/data/ip2asn-v4.tsv.gz (free, no registration)
Data sources:
- https://iptoasn.com/data/ip2asn-v4.tsv.gz (IP ranges → ASN mapping)
- https://www.peeringdb.com/api/net (ASN → network type)
Labels: isp, datacenter, hosting, cdn, enterprise, education, government, unknown
Usage:
python3 generate_asn_data.py --output-dir .
python3 generate_asn_data.py --output-dir . --no-download # reuse cached TSV
python3 generate_asn_data.py --output-dir ./data
python3 generate_asn_data.py --output-dir ./data --no-download
python3 generate_asn_data.py --output-dir ./data --no-peeringdb
python3 generate_asn_data.py --output-asn out/asn.csv --output-ipasn out/ip.csv
"""
import argparse
import gzip
import ipaddress
import io
import json
import os
import sys
import urllib.request
IPTOASN_URL = "https://iptoasn.com/data/ip2asn-v4.tsv.gz"
PEERINGDB_URL = "https://www.peeringdb.com/api/net?limit=0&depth=0"
CACHED_TSV = "ip2asn-v4.tsv"
CACHED_PEERINGDB = "peeringdb_nets.json"
# ---------------------------------------------------------------------------
# Hard-coded well-known ASN lists for accurate classification
# Hard-coded well-known ASN lists (highest classification priority)
# ---------------------------------------------------------------------------
KNOWN_HUMAN_ASNS: set[int] = {
KNOWN_ISP_ASNS: set[int] = {
# France
3215, 12322, 15557, 5410, 6799, 29169, 2027,
# Germany
@ -67,17 +75,17 @@ KNOWN_HUMAN_ASNS: set[int] = {
# Turkey
9121, 34984, 47331, 16135,
# Poland
5617, 12912, 6830, 5588, 8374, 21021,
5617, 12912, 5588, 8374, 21021,
# Sweden / Nordics
3301, 1257, 2119, 8473, 12552, 44034, 2116, 29518,
# Switzerland
3303, 6830,
3303,
# Belgium
5432, 6848, 12392,
6848, 12392,
# Portugal
3243, 2860, 8657,
# Ireland
5466, 15502, 6830,
5466, 15502,
# Southeast Asia
4773, 7552, 45899, 9299, 4818, 18403, 17974, 23969, 9534,
24203, 7470,
@ -116,7 +124,7 @@ KNOWN_DATACENTER_ASNS: set[int] = {
14061, 393406,
# Linode / Akamai Connected Cloud
63949, 22040,
# Vultr
# Vultr / Choopa
20473,
# Scaleway / Online SAS
12876,
@ -142,10 +150,20 @@ KNOWN_DATACENTER_ASNS: set[int] = {
36352,
# QuadraNet
8100,
# Choopa (Vultr subsidiary)
20473,
# Zenlayer
21859,
# Yandex Cloud
200350, 208722,
# Huawei Cloud
136907,
# NTT Global
2914,
# Cogent
174,
# GTT
3257,
# Zayo
6461,
}
KNOWN_CDN_ASNS: set[int] = {
@ -170,6 +188,11 @@ KNOWN_CDN_ASNS: set[int] = {
55095, # Cloudflare CN
394536, # Fastly
395973, # Fastly
36040, # Google CDN
15395, # MaxCDN
30081, # CacheFly
60068, # CDN77
200325, # BunnyCDN
}
KNOWN_HOSTING_ASNS: set[int] = {
@ -181,7 +204,6 @@ KNOWN_HOSTING_ASNS: set[int] = {
19871, # Network Solutions
40034, # Confluence Networks
26347, # DreamHost
36351, # SoftLayer (also datacenter, but historically hosting)
29802, # HIVELOCITY
36024, # HostNOC
21844, # ThePlanet / SoftLayer
@ -200,20 +222,186 @@ KNOWN_HOSTING_ASNS: set[int] = {
398101, # GoDaddy hosting
19969, # Joe's Datacenter
395003, # WPEngine
22612, # Namecheap
46664, # VolumeDrive
55286, # B2 Net Solutions / ServerMania
35916, # MultaCom
23470, # ReliableSite
}
KNOWN_EDUCATION_ASNS: set[int] = {
786, # JANET (UK academic network)
21320, # GEANT (European research)
11537, # Internet2 (US research)
2200, # RENATER (France research)
680, # DFN (Germany research)
137, # GARR (Italy research)
766, # RedIRIS (Spain research)
7575, # AARNet (Australia research)
513, # CERN
1741, # FUNET (Finland research)
2852, # CESNET (Czech research)
2603, # NORDUnet (Nordics research)
1930, # FCCN (Portugal research)
2381, # WIDE Project (Japan research)
1103, # SURFNET (Netherlands research)
2018, # AFRINIC
7660, # APNIC research
2500, # WIDE (Japan)
4556, # HKIX (Hong Kong academic)
24151, # CNGI-CERNET2 (China education)
4538, # CERNET (China education)
4837, # CERNET2 backbone
3, # MIT
10578, # Stanford
27, # University of Maryland
46, # UC Berkeley
104, # Carnegie Mellon
224, # Uninett (Norway research)
2852, # CESNET
2547, # Belnet (Belgium research)
1754, # DESY (Germany)
5765, # SANET (Slovakia academic)
9264, # KAIST (South Korea)
}
KNOWN_GOVERNMENT_ASNS: set[int] = {
3354, # US DoD Network Information Center
27064, # DoD Education Activity
721, # DoD / DISA
749, # DoD / DISA
568, # US Postal Service
3541, # Headquarters USAISC
6983, # US Internal Revenue Service
19551, # Incapsula Fed (US gov CDN)
394671, # US Dept of Veterans Affairs
7046, # US DoD
10796, # US Census Bureau
2572, # UK MoD
25180, # Bundesverwaltung (Swiss gov)
553, # BelWue (partially government)
559, # SWITCH (Swiss gov/edu)
}
KNOWN_ENTERPRISE_ASNS: set[int] = {
8068, # Microsoft Corp
36459, # GitHub
32934, # Facebook / Meta
13414, # Twitter / X
54115, # Dropbox
8003, # Uber
14413, # Linkedin
40428, # Salesforce
30148, # Sucuri (enterprise security)
394699, # Zoom
19551, # Shopify
62229, # Spotify
394406, # Slack
}
# ---------------------------------------------------------------------------
# Keyword-based heuristic classification
# PeeringDB info_type → label mapping
# ---------------------------------------------------------------------------
HUMAN_KEYWORDS = [
PEERINGDB_TYPE_MAP: dict[str, str] = {
"Cable/DSL/ISP": "isp",
"NSP": "isp",
"Content": "cdn",
"Enterprise": "enterprise",
"Educational/Research": "education",
"Government": "government",
"Non-Profit": "enterprise",
"Route Server": "unknown",
"Route Collector": "unknown",
}
def download_peeringdb(output_dir: str) -> dict[int, str]:
"""Download PeeringDB network list, return {asn: label} mapping."""
cache_path = os.path.join(output_dir, CACHED_PEERINGDB)
print(f"[peeringdb] Fetching {PEERINGDB_URL} ...")
data = None
try:
req = urllib.request.Request(PEERINGDB_URL, headers={
"User-Agent": "generate_asn_data/2.0",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=60) as resp:
raw = resp.read()
print(f"[peeringdb] Downloaded {len(raw):,} bytes")
data = json.loads(raw)
with open(cache_path, "wb") as f:
f.write(raw)
except Exception as e:
print(f"[warning] PeeringDB download failed: {e}", file=sys.stderr)
if os.path.exists(cache_path):
print(f"[info] Using cached {cache_path}")
with open(cache_path, "r", encoding="utf-8") as f:
data = json.load(f)
else:
print("[warning] No PeeringDB data available, continuing without it.",
file=sys.stderr)
return {}
if not data or "data" not in data:
print("[warning] PeeringDB response has no 'data' key", file=sys.stderr)
return {}
pdb_labels: dict[int, str] = {}
nets = data["data"]
print(f"[peeringdb] Processing {len(nets):,} networks ...")
for net in nets:
asn = net.get("asn", 0)
if not asn or asn <= 0:
continue
info_type = (net.get("info_type") or "").strip()
name = (net.get("name") or "").lower()
# Direct type mapping
if info_type in PEERINGDB_TYPE_MAP:
pdb_labels[asn] = PEERINGDB_TYPE_MAP[info_type]
elif info_type == "Network Services":
# Check keywords to distinguish hosting/datacenter/isp
if _match_keywords(name, HOSTING_KEYWORDS):
pdb_labels[asn] = "hosting"
elif _match_keywords(name, DATACENTER_KEYWORDS):
pdb_labels[asn] = "datacenter"
elif _match_keywords(name, ISP_KEYWORDS):
pdb_labels[asn] = "isp"
else:
pdb_labels[asn] = "enterprise"
elif info_type == "":
# No type set — try keyword classification on name
label = _classify_by_keywords(name)
if label:
pdb_labels[asn] = label
# else: unknown types like Route Server — skip
stats: dict[str, int] = {}
for v in pdb_labels.values():
stats[v] = stats.get(v, 0) + 1
print(f"[peeringdb] Mapped {len(pdb_labels):,} ASNs: {stats}")
return pdb_labels
# ---------------------------------------------------------------------------
# Keyword-based heuristic classification (expanded)
# ---------------------------------------------------------------------------
ISP_KEYWORDS = [
# Generic ISP terms
"telecom", "telcom", "telekom", "telefonica", "telecomunicacoes",
"telecomunicaciones", "telekommunikasjon", "telekommunikation",
"mobile", "broadband", "fiber", "fibre", "cable", "wireless",
"residential", "communications", "comm ", " isp", "netcom",
"internet service", "subscriber", "dsl", "adsl", "vdsl", "ftth",
"fttb", "dial-up", "dialup", "cellular", "lte", "5g network",
"fttb", "fttp", "dial-up", "dialup", "cellular", "lte", "5g network",
"wimax", "satellite", " tel ", "telco", "ptcl", "bsnl",
"telecable", "telemedia", "telnet", "telprom",
"internet provider", "internet access",
# Specific operators — Global
"airtel", "jio", "reliance", "vodafone", "orange", "bouygues",
"proximus", "swisscom", "telenor", "telia", "elisa", "dna oy",
"rogers", "bell canada", "shaw", "telus", "optus", "tpg ",
@ -238,6 +426,38 @@ HUMAN_KEYWORDS = [
"etisalat", "du telecom", "stc ", "zain",
"mtn ", "safaricom", "airtel africa", "rain ",
"telmex", "claro", "vivo ", "oi s.a",
# Additional global operators
"telstra", "spark nz", "chorus nz", "vodacom", "digicel",
"flow ", "liberty cablevision", "totalplay", "izzi ",
"megacable", "axtel", "tigo ", "entel", "bitel", "movilnet",
"cantv", "antel", "copel telecom", "algar telecom", "rede sul",
"surf telecom", "net servicos", "porto seguro",
"dialog axiata", "sri lanka telecom", "banglalink", "grameenphone",
"robi axiata", "ncell", "nepal telecom",
"omantel", "ooredoo", "mobily", "batelco", "viva bahrain",
"nawras", "sure telecom", "airlink", "cool ideas", "vumatel",
"telkom sa", "web africa", "afrihost", "zte corporation",
"smile telecom", "busyinternet", "camtel", "orange cameroun",
"maroc telecom", "inwi", "djezzy", "mobilis", "tunisie telecom",
"ethio telecom", "liquid telecom", "neotel", "cell c",
"a1 telekom", "magenta telekom", "drei austria", "cablecom",
"salt mobile", "sunrise", "post luxembourg", "tango ",
"nova croatia", "ht ", "vivacom", "bulsatcom", "cosmote",
"nova greece", "forthnet", "wind hellas", "cyta",
"telekom srbija", "vip mobile", "telenor serbia",
"telekom slovenije", "a1 slovenia",
"o2 czech", "t-mobile czech", "vodafone czech",
"orange polska", "polkomtel", "play mobile",
"netia", "inea ", "vectra",
"bite ", "tele2", "tet ", "eltel",
"starman", "telia eesti", "elisa eesti",
"lattelecom", "baltcom",
"kyivstar", "lifecell", "ukrtelecom",
"moldtelecom", "orange moldova",
"kazakhtelecom", "beeline kz", "tele2 kz",
"uztelecom", "ucell",
"bakcell", "azercell", "azerconnect",
"veon", "silknet", "magti",
]
DATACENTER_KEYWORDS = [
@ -247,69 +467,149 @@ DATACENTER_KEYWORDS = [
"scaleway", "contabo", "kamatera", "upcloud",
"oracle cloud", "alibaba cloud", "tencent cloud",
"ibm cloud", "softlayer", "rackspace", "equinix",
"leaseweb", "choopa", "data center", "datacenter",
"leaseweb", "choopa", "data center", "datacenter", "data centre",
"colocation", "colo ", "baremetal", "bare metal",
"infrastructure", "iaas", "paas",
# Additional cloud/datacenter providers
"serverel", "packet", "metal ", "nexon", "netmagic",
"phoenixnap", "tierpoint", "cyrusone", "coresite",
"datapipe", "internap", "peak 10", "zayo ", "coreweave",
"lambda cloud", "paperspace", "vast.ai", "fluidstack",
"cloudfloor", "netriplex", "hyperoptic", "serverspace",
"selectel", "aruba cloud", "worldstream", "datacamp",
"m247", "datapacket", "clouvider", "hostkey",
"tzulo", "ramnode", "hostens", "serverius", "servers.com",
"nocix", "fdcservers", "reliablesite",
"quadranet", "cogent ", "colocrossing",
]
HOSTING_KEYWORDS = [
"hosting", "host ", "hoster", "webhost",
"hosting", "host ", "hoster", "webhost", "web host",
"server farm", "vps", "virtual private",
"dedicated server", "shared hosting", "managed hosting",
"reseller", "cpanel", "plesk", "wordpress host",
"godaddy", "namecheap", "hostinger", "bluehost",
"siteground", "a2 hosting", "dreamhost", "hostgator",
"ionos", "squarespace", "wix", "wpengine",
"ionos", "squarespace", "wix", "wpengine", "wp engine",
"register.com", "domain registr", "registrar",
"strato ag", "mittwald", "hosteurope", "all-inkl",
"infomaniak", "gandi", "one.com", "loopia",
"fastcomet", "cloudways", "kinsta", "flywheel",
"netlify", "vercel", "render.com", "railway",
"heroku", "webhosting",
]
CDN_KEYWORDS = [
"cloudflare", "akamai", "fastly", "cdn ",
"content delivery", "edgecast", "limelight",
"stackpath", "sucuri", "keycdn", "bunnycdn",
"stackpath", "sucuri", "keycdn", "bunnycdn", "bunny cdn",
"jsdelivr", "cachefly", "imperva", "incapsula",
"g-core", "gcorelabs", "cdn77", "cdn network",
"medianova", "chinacache", "wangsu", "quantil",
"azion", "section.io", "beluga cdn",
]
EDUCATION_KEYWORDS = [
"university", "universit", "universite", "universidad", "universidade",
"universita", "universitaet", "univ ", "univ.", "univers",
"college", "academic", "academia", "research",
"education", "educational", "school", "institute",
"polytechnic", "hochschule", "fachhochschule",
"nren", "ren ", ".ren", "research network",
"wissenschaft", "forschung",
"campus", "student",
"ecole ", "école", "escuela", "escola",
"fakultat", "fakultet",
"技術", "大学", "学院", "研究", "科学",
"교육", "대학",
"กรม", "มหาวิทยาลัย",
]
GOVERNMENT_KEYWORDS = [
"government", "gouvernement", "gobierno", "governo", "regierung",
"gouv", "gov.", ".gov", "gob.",
"ministry", "ministere", "ministerio", "ministerium", "ministero",
"military", "army", "navy", "defense", "defence", "defensa",
"police", "polizei", "polizia",
"federal", "fedral", "estado", "etat",
"municipal", "prefecture", "region ",
"national agency", "state of ",
"bundeswehr", "armed forces",
"customs", "immigration",
"justic", "judiciary",
]
# Enterprise keywords are lowest priority — many ISPs have these suffixes
ENTERPRISE_KEYWORDS = [
"corporation", "corp.", "corporate",
" inc.", " inc,",
" ltd.", " ltd,",
"gmbh", "s.a.", "s.r.l",
" group", "holdings",
"financial", "banking", "insurance",
"pharmaceutical", "logistics",
"manufacturing", "industrial",
"automotive", "energy",
"retail ", "media group",
]
def classify_asn(asn_number: int, as_name: str) -> str:
"""Classify an ASN into a reputation category."""
# Hard-coded lookups first (highest priority)
def _match_keywords(name_lower: str, keywords: list[str]) -> bool:
"""Check if any keyword matches in the lowered name."""
return any(kw in name_lower for kw in keywords)
def _classify_by_keywords(name_lower: str) -> str:
"""Classify by keyword heuristics. Returns label or empty string."""
if name_lower in ("not routed", "none", "", "-"):
return "unknown"
if _match_keywords(name_lower, CDN_KEYWORDS):
return "cdn"
if _match_keywords(name_lower, EDUCATION_KEYWORDS):
return "education"
if _match_keywords(name_lower, GOVERNMENT_KEYWORDS):
return "government"
if _match_keywords(name_lower, HOSTING_KEYWORDS):
return "hosting"
if _match_keywords(name_lower, DATACENTER_KEYWORDS):
return "datacenter"
if _match_keywords(name_lower, ISP_KEYWORDS):
return "isp"
if _match_keywords(name_lower, ENTERPRISE_KEYWORDS):
return "enterprise"
return ""
def classify_asn(asn_number: int, as_name: str,
peeringdb: dict[int, str]) -> str:
"""Classify an ASN into a reputation category.
Priority: hard-coded lists > PeeringDB > keyword heuristics.
"""
# 1. Hard-coded lookups (highest priority)
if asn_number in KNOWN_CDN_ASNS:
return "cdn"
if asn_number in KNOWN_HOSTING_ASNS:
return "hosting"
if asn_number in KNOWN_DATACENTER_ASNS:
return "datacenter"
if asn_number in KNOWN_HUMAN_ASNS:
return "human"
if asn_number in KNOWN_EDUCATION_ASNS:
return "education"
if asn_number in KNOWN_GOVERNMENT_ASNS:
return "government"
if asn_number in KNOWN_ENTERPRISE_ASNS:
return "enterprise"
if asn_number in KNOWN_ISP_ASNS:
return "isp"
# Keyword heuristics on AS name
# 2. PeeringDB classification
if asn_number in peeringdb:
return peeringdb[asn_number]
# 3. Keyword heuristics on AS name
name_lower = as_name.lower()
# Skip "Not routed" or reserved
if name_lower in ("not routed", "none", "", "-"):
return "unknown"
# CDN first (most specific)
for kw in CDN_KEYWORDS:
if kw in name_lower:
return "cdn"
# Hosting before datacenter (more specific)
for kw in HOSTING_KEYWORDS:
if kw in name_lower:
return "hosting"
# Datacenter
for kw in DATACENTER_KEYWORDS:
if kw in name_lower:
return "datacenter"
# Human / ISP
for kw in HUMAN_KEYWORDS:
if kw in name_lower:
return "human"
return "unknown"
label = _classify_by_keywords(name_lower)
return label if label else "unknown"
def ranges_to_cidrs(start_ip: str, end_ip: str):
@ -330,7 +630,7 @@ def download_iptoasn(output_dir: str) -> str:
print(f"[download] Fetching {IPTOASN_URL} ...")
try:
req = urllib.request.Request(IPTOASN_URL, headers={
"User-Agent": "generate_asn_data/1.0"
"User-Agent": "generate_asn_data/2.0"
})
with urllib.request.urlopen(req, timeout=120) as resp:
compressed = resp.read()
@ -358,7 +658,7 @@ def parse_tsv(tsv_path: str):
"""
entries = []
with open(tsv_path, "r", encoding="utf-8") as f:
for line_no, line in enumerate(f, 1):
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
@ -380,60 +680,92 @@ def parse_tsv(tsv_path: str):
def main():
parser = argparse.ArgumentParser(
description="Generate ASN reputation and IP-to-ASN CSVs from iptoasn.com data"
description="Generate ASN reputation and IP-to-ASN CSVs from iptoasn.com + PeeringDB"
)
parser.add_argument(
"--output-dir", default=".",
help="Directory for output CSV files (default: current dir)"
"--output-dir", default=None,
help="Directory for output CSV files (writes asn_reputation.csv and iplocate-ip-to-asn.csv)"
)
parser.add_argument(
"--output-asn", default=None,
help="Explicit path for asn_reputation.csv output"
)
parser.add_argument(
"--output-ipasn", default=None,
help="Explicit path for iplocate-ip-to-asn.csv output"
)
parser.add_argument(
"--no-download", action="store_true",
help="Skip download, use cached ip2asn-v4.tsv in output-dir"
help="Skip iptoasn.com download, use cached ip2asn-v4.tsv"
)
parser.add_argument(
"--no-peeringdb", action="store_true",
help="Skip PeeringDB download, classify using keyword heuristics only"
)
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
# Resolve output paths — support both --output-dir and --output-asn/--output-ipasn
if args.output_asn and args.output_ipasn:
rep_path = args.output_asn
ip_path = args.output_ipasn
work_dir = os.path.dirname(rep_path) or "."
elif args.output_dir:
work_dir = args.output_dir
rep_path = os.path.join(work_dir, "asn_reputation.csv")
ip_path = os.path.join(work_dir, "iplocate-ip-to-asn.csv")
else:
work_dir = "."
rep_path = os.path.join(work_dir, "asn_reputation.csv")
ip_path = os.path.join(work_dir, "iplocate-ip-to-asn.csv")
# Step 1: Get TSV data
tsv_path = os.path.join(args.output_dir, CACHED_TSV)
os.makedirs(work_dir, exist_ok=True)
# Step 1: Get IP-to-ASN TSV data
tsv_path = os.path.join(work_dir, CACHED_TSV)
if not args.no_download:
tsv_path = download_iptoasn(args.output_dir)
tsv_path = download_iptoasn(work_dir)
elif not os.path.exists(tsv_path):
print(f"[fatal] --no-download specified but {tsv_path} not found",
file=sys.stderr)
sys.exit(1)
# Step 2: Parse
# Step 2: Get PeeringDB data
peeringdb: dict[int, str] = {}
if not args.no_peeringdb:
peeringdb = download_peeringdb(work_dir)
# Step 3: Parse TSV
print("[parse] Reading TSV ...")
entries = parse_tsv(tsv_path)
print(f"[parse] {len(entries):,} IP range entries")
# Step 3: Build ASN info map {asn: (country, as_name)}
# Use first occurrence for name/country (they're consistent in the dataset)
# Step 4: Build ASN info map {asn: (country, as_name)}
asn_info: dict[int, tuple[str, str]] = {}
for _, _, asn, country, as_name in entries:
if asn not in asn_info:
asn_info[asn] = (country, as_name)
# Step 4: Classify all ASNs
# Step 5: Classify all ASNs
asn_labels: dict[int, str] = {}
for asn, (country, as_name) in asn_info.items():
asn_labels[asn] = classify_asn(asn, as_name)
asn_labels[asn] = classify_asn(asn, as_name, peeringdb)
# Step 5: Write asn_reputation.csv (sorted by ASN, no header)
rep_path = os.path.join(args.output_dir, "asn_reputation.csv")
# Include ASNs that are in PeeringDB but not in iptoasn (they have no
# IP ranges but still get a label in asn_reputation.csv)
for asn, label in peeringdb.items():
if asn not in asn_labels:
asn_labels[asn] = label
# Step 6: Write asn_reputation.csv (sorted by ASN, no header)
sorted_asns = sorted(asn_labels.keys())
os.makedirs(os.path.dirname(rep_path) or ".", exist_ok=True)
with open(rep_path, "w") as f:
for asn in sorted_asns:
f.write(f"{asn},{asn_labels[asn]}\n")
print(f"[output] Wrote {len(sorted_asns):,} ASNs to {rep_path}")
# Step 6: Convert ranges to CIDRs and write iplocate-ip-to-asn.csv
ip_path = os.path.join(args.output_dir, "iplocate-ip-to-asn.csv")
cidr_count = 0
# Step 7: Convert ranges to CIDRs and write iplocate-ip-to-asn.csv
seen_cidrs: set[str] = set()
# Collect all rows, then sort
cidr_rows: list[tuple[ipaddress.IPv4Network, int, str, str]] = []
print("[cidr] Converting IP ranges to CIDR notation ...")
for start_ip, end_ip, asn, country, as_name in entries:
@ -444,30 +776,39 @@ def main():
seen_cidrs.add(key)
cidr_rows.append((cidr, asn, country, as_name))
# Sort by network address
cidr_rows.sort(key=lambda r: (r[0].network_address, r[0].prefixlen))
os.makedirs(os.path.dirname(ip_path) or ".", exist_ok=True)
with open(ip_path, "w") as f:
for cidr, asn, country, as_name in cidr_rows:
# Escape commas in AS names
safe_name = as_name.replace(",", " ")
f.write(f"{cidr},{asn},{country},{safe_name}\n")
cidr_count = len(cidr_rows)
print(f"[output] Wrote {cidr_count:,} CIDRs to {ip_path}")
# Step 7: Stats
# Step 8: Stats
stats: dict[str, int] = {}
for label in asn_labels.values():
stats[label] = stats.get(label, 0) + 1
all_labels = ["isp", "datacenter", "hosting", "cdn", "enterprise",
"education", "government", "unknown"]
print("\n=== Summary ===")
print(f"Total ASNs: {len(sorted_asns):,}")
for label in ["human", "datacenter", "hosting", "cdn", "unknown"]:
for label in all_labels:
count = stats.get(label, 0)
pct = 100.0 * count / len(sorted_asns) if sorted_asns else 0
print(f" {label:12s}: {count:>6,} ({pct:5.1f}%)")
print(f"Total CIDRs: {cidr_count:,}")
# Warn if unknown is still too high
unk_pct = 100.0 * stats.get("unknown", 0) / len(sorted_asns) if sorted_asns else 0
if unk_pct > 50:
print(f"\n[warning] {unk_pct:.1f}% of ASNs are still 'unknown'.", file=sys.stderr)
if not peeringdb:
print("[hint] Try running without --no-peeringdb to use PeeringDB data.",
file=sys.stderr)
if __name__ == "__main__":
main()