feat(dashboard): page Listes de référence — visualisation CSV/dictionnaires

Nouvelle page /reflists pour visualiser les 9 dictionnaires ClickHouse :
- bot_ip (3.5K entrées) : IP/CIDR de bots connus
- bot_ja4 (31) : fingerprints JA4 de bots
- browser_ja4 (1.2K) : fingerprints JA4 navigateurs → famille, lib TLS
- asn_reputation (82.5K) : ASN → réputation (isp, datacenter, cdn…)
- iplocate_asn (714K) : géolocalisation IP → ASN, pays, nom
- anubis_ua_rules, anubis_ip_rules, anubis_asn_rules, anubis_country_rules

Fonctionnalités :
- 9 onglets de navigation entre les listes
- Recherche textuelle avec filtrage côté ClickHouse
- Pagination (200 entrées/page)
- Tri par colonne (ASC/DESC)
- Graphique de répartition (ECharts) par catégorie
- KPIs dictionnaires en haut de page
- Infobulles de documentation

API : /api/dictionaries, /api/reflist/{name}, /api/reflist/{name}/stats
Helpers : esc() (HTML escape) ajouté à base.html

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-09 14:56:54 +02:00
parent 039086a0b3
commit 98abbc80c7
4 changed files with 531 additions and 0 deletions

View File

@ -1470,3 +1470,180 @@ async def cluster_detail(cid: int) -> dict[str, Any]:
except Exception as exc:
logger.exception("cluster detail query failed for %s", cid)
raise HTTPException(status_code=500, detail=str(exc))
# ═══════════════════════════════════════════════════════════════════════════════
# Listes de référence (CSV / dictionnaires ClickHouse)
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/dictionaries")
async def dictionaries_meta():
"""Métadonnées de tous les dictionnaires ClickHouse."""
try:
rows = query(
"SELECT name, type, status, element_count, "
" arrayStringConcat(attribute.names, ', ') AS attributes "
"FROM system.dictionaries "
f"WHERE database = '{_DB}' "
"ORDER BY name",
)
return {"dictionaries": rows}
except Exception as exc:
logger.exception("dictionaries meta query failed")
raise HTTPException(status_code=500, detail=str(exc))
_REFLIST_SORT = {
"bot_ip": {"prefix", "bot_name"},
"bot_ja4": {"ja4", "bot_name"},
"browser_ja4": {"ja4", "browser_family", "tls_library"},
"asn_reputation": {"src_asn", "label"},
"iplocate_asn": {"asn", "country_code", "name", "network"},
"anubis_ua_rules": {"id", "regexp", "bot_name", "action", "category"},
"anubis_ip_rules": {"prefix", "bot_name", "action", "category"},
"anubis_asn_rules": {"asn", "bot_name", "action", "category"},
"anubis_country_rules": {"country_code", "bot_name", "action", "category"},
}
_REFLIST_SEARCH_COLS: dict[str, list[str]] = {
"bot_ip": ["prefix", "bot_name"],
"bot_ja4": ["ja4", "bot_name"],
"browser_ja4": ["ja4", "browser_family", "tls_library", "context"],
"asn_reputation": ["toString(src_asn)", "label"],
"iplocate_asn": ["network", "toString(asn)", "country_code", "name"],
"anubis_ua_rules": ["regexp", "bot_name", "action", "category"],
"anubis_ip_rules": ["prefix", "bot_name", "action", "category"],
"anubis_asn_rules": ["toString(asn)", "bot_name", "action", "category"],
"anubis_country_rules": ["country_code", "bot_name", "action", "category"],
}
_REFLIST_QUERIES: dict[str, str] = {
"bot_ip": f"SELECT prefix, bot_name FROM dictionary('{_DB}.dict_bot_ip')",
"bot_ja4": f"SELECT ja4, bot_name FROM dictionary('{_DB}.dict_bot_ja4')",
"browser_ja4": (
f"SELECT ja4, browser_family, tls_library, context "
f"FROM dictionary('{_DB}.dict_browser_ja4')"
),
"asn_reputation": (
f"SELECT src_asn, label FROM dictionary('{_DB}.dict_asn_reputation')"
),
"iplocate_asn": (
f"SELECT network, asn, country_code, name "
f"FROM dictionary('{_DB}.dict_iplocate_asn')"
),
"anubis_ua_rules": (
f"SELECT id, parent_id, regexp, "
f" arrayElement(values, indexOf(keys, 'bot_name')) AS bot_name, "
f" arrayElement(values, indexOf(keys, 'action')) AS action, "
f" arrayElement(values, indexOf(keys, 'category')) AS category "
f"FROM {_DB}.anubis_ua_rules"
),
"anubis_ip_rules": (
f"SELECT prefix, bot_name, action, category FROM {_DB}.anubis_ip_rules"
),
"anubis_asn_rules": (
f"SELECT asn, bot_name, action, category FROM {_DB}.anubis_asn_rules"
),
"anubis_country_rules": (
f"SELECT country_code, bot_name, action, category FROM {_DB}.anubis_country_rules"
),
}
@router.get("/reflist/{name}")
async def reflist(
name: str,
limit: int = Query(default=200, ge=1, le=10000),
offset: int = Query(default=0, ge=0),
sort: str = Query(default=""),
order: str = Query(default="ASC"),
search: str = Query(default=""),
):
"""Contenu paginé d'une liste de référence / dictionnaire."""
if name not in _REFLIST_QUERIES:
raise HTTPException(status_code=404, detail=f"Unknown reflist: {name}")
base_q = _REFLIST_QUERIES[name]
order_clause = ""
if sort and sort in _REFLIST_SORT.get(name, set()):
direction = "DESC" if order.upper() == "DESC" else "ASC"
order_clause = f" ORDER BY {sort} {direction}"
where_clause = ""
params: dict = {}
if search:
params["_q"] = f"%{search}%"
cols = _REFLIST_SEARCH_COLS.get(name, [])
if cols:
conditions = " OR ".join(f"{c} LIKE {{_q:String}}" for c in cols)
where_clause = f" WHERE ({conditions})"
try:
wrapped = f"SELECT * FROM ({base_q}){where_clause}"
count_q = f"SELECT count() AS total FROM ({wrapped})"
total_row = query(count_q, params or None)
total = total_row[0]["total"] if total_row else 0
data_q = f"{wrapped}{order_clause} LIMIT {int(limit)} OFFSET {int(offset)}"
rows = query(data_q, params or None)
return {"name": name, "total": total, "limit": limit, "offset": offset, "rows": rows}
except Exception as exc:
logger.exception("reflist query failed for %s", name)
raise HTTPException(status_code=500, detail=str(exc))
@router.get("/reflist/{name}/stats")
async def reflist_stats(name: str):
"""Statistiques agrégées pour une liste de référence."""
if name not in _REFLIST_QUERIES:
raise HTTPException(status_code=404, detail=f"Unknown reflist: {name}")
base_q = _REFLIST_QUERIES[name]
try:
count_q = f"SELECT count() AS total FROM ({base_q})"
total_row = query(count_q)
total = total_row[0]["total"] if total_row else 0
agg: list = []
if name == "bot_ip":
agg = query(
f"SELECT bot_name, count() AS cnt FROM ({base_q}) "
"GROUP BY bot_name ORDER BY cnt DESC LIMIT 20"
)
elif name == "bot_ja4":
agg = query(
f"SELECT bot_name, count() AS cnt FROM ({base_q}) "
"GROUP BY bot_name ORDER BY cnt DESC LIMIT 20"
)
elif name == "browser_ja4":
agg = query(
f"SELECT browser_family, count() AS cnt FROM ({base_q}) "
"GROUP BY browser_family ORDER BY cnt DESC LIMIT 20"
)
elif name == "asn_reputation":
agg = query(
f"SELECT label, count() AS cnt FROM ({base_q}) "
"GROUP BY label ORDER BY cnt DESC"
)
elif name == "iplocate_asn":
agg = query(
f"SELECT country_code, count() AS cnt FROM ({base_q}) "
"GROUP BY country_code ORDER BY cnt DESC LIMIT 20"
)
elif name == "anubis_ip_rules":
agg = query(
f"SELECT action, count() AS cnt FROM ({base_q}) "
"GROUP BY action ORDER BY cnt DESC"
)
elif name == "anubis_asn_rules":
agg = query(
f"SELECT action, count() AS cnt FROM ({base_q}) "
"GROUP BY action ORDER BY cnt DESC"
)
return {"name": name, "total": total, "breakdown": agg}
except Exception as exc:
logger.exception("reflist stats query failed for %s", name)
raise HTTPException(status_code=500, detail=str(exc))

View File

@ -76,3 +76,8 @@ async def cluster_detail_page(request: Request, cid: int):
@router.get("/tactics")
async def tactics_page(request: Request):
return templates.TemplateResponse("tactics.html", _ctx(request, "tactics"))
@router.get("/reflists")
async def reflists_page(request: Request):
return templates.TemplateResponse("reflists.html", _ctx(request, "reflists"))