maj cumulative

This commit is contained in:
SOC Analyst
2026-03-18 13:56:39 +01:00
parent 32a96966dd
commit c887846af5
18 changed files with 986 additions and 686 deletions

View File

@ -19,7 +19,8 @@ async def get_detections(
asn_number: Optional[str] = Query(None, description="Filtrer par ASN"),
search: Optional[str] = Query(None, description="Recherche texte (IP, JA4, Host)"),
sort_by: str = Query("detected_at", description="Trier par"),
sort_order: str = Query("DESC", description="Ordre (ASC/DESC)")
sort_order: str = Query("DESC", description="Ordre (ASC/DESC)"),
group_by_ip: bool = Query(False, description="Grouper par IP (first_seen/last_seen agrégés)")
):
"""
Récupère la liste des détections avec pagination et filtres
@ -47,7 +48,7 @@ async def get_detections(
if search:
where_clauses.append(
"(src_ip ILIKE %(search)s OR ja4 ILIKE %(search)s OR host ILIKE %(search)s)"
"(ilike(toString(src_ip), %(search)s) OR ilike(ja4, %(search)s) OR ilike(host, %(search)s))"
)
params["search"] = f"%{search}%"
@ -66,6 +67,124 @@ async def get_detections(
# Requête principale
offset = (page - 1) * page_size
sort_order = "DESC" if sort_order.upper() == "DESC" else "ASC"
# ── Mode groupé par IP (first_seen / last_seen depuis la DB) ────────────
if group_by_ip:
valid_sort_grouped = ["anomaly_score", "hits", "hit_velocity", "first_seen", "last_seen", "src_ip", "detected_at"]
grouped_sort = sort_by if sort_by in valid_sort_grouped else "last_seen"
# detected_at → last_seen (max(detected_at) dans le GROUP BY)
if grouped_sort == "detected_at":
grouped_sort = "last_seen"
# In outer query, min_score is exposed as anomaly_score — keep the alias
outer_sort = "min_score" if grouped_sort == "anomaly_score" else grouped_sort
# Count distinct IPs
count_ip_query = f"""
SELECT uniq(src_ip)
FROM ml_detected_anomalies
WHERE {where_clause}
"""
cr = db.query(count_ip_query, params)
total = cr.result_rows[0][0] if cr.result_rows else 0
grouped_query = f"""
SELECT
ip_data.src_ip,
ip_data.first_seen,
ip_data.last_seen,
ip_data.detection_count,
ip_data.unique_ja4s,
ip_data.unique_hosts,
ip_data.min_score AS anomaly_score,
ip_data.threat_level,
ip_data.model_name,
ip_data.country_code,
ip_data.asn_number,
ip_data.asn_org,
ip_data.hit_velocity,
ip_data.hits,
ip_data.asn_label,
ar.label AS asn_rep_label
FROM (
SELECT
src_ip,
min(detected_at) AS first_seen,
max(detected_at) AS last_seen,
count() AS detection_count,
groupUniqArray(5)(ja4) AS unique_ja4s,
groupUniqArray(5)(host) AS unique_hosts,
min(anomaly_score) AS min_score,
argMin(threat_level, anomaly_score) AS threat_level,
argMin(model_name, anomaly_score) AS model_name,
any(country_code) AS country_code,
any(asn_number) AS asn_number,
any(asn_org) AS asn_org,
max(hit_velocity) AS hit_velocity,
sum(hits) AS hits,
any(asn_label) AS asn_label
FROM ml_detected_anomalies
WHERE {where_clause}
GROUP BY src_ip
) ip_data
LEFT JOIN mabase_prod.asn_reputation ar
ON ar.src_asn = toUInt32OrZero(ip_data.asn_number)
ORDER BY {outer_sort} {sort_order}
LIMIT %(limit)s OFFSET %(offset)s
"""
params["limit"] = page_size
params["offset"] = offset
gresult = db.query(grouped_query, params)
def _label_to_score(label: str) -> float | None:
if not label: return None
mapping = {'human': 0.9, 'bot': 0.05, 'proxy': 0.25, 'vpn': 0.3,
'tor': 0.1, 'datacenter': 0.4, 'scanner': 0.05, 'malicious': 0.05}
return mapping.get(label.lower(), 0.5)
detections = []
for row in gresult.result_rows:
# row: src_ip, first_seen, last_seen, detection_count, unique_ja4s, unique_hosts,
# anomaly_score, threat_level, model_name, country_code, asn_number, asn_org,
# hit_velocity, hits, asn_label, asn_rep_label
ja4s = list(row[4]) if row[4] else []
hosts = list(row[5]) if row[5] else []
detections.append(Detection(
detected_at=row[1],
src_ip=str(row[0]),
ja4=ja4s[0] if ja4s else "",
host=hosts[0] if hosts else "",
bot_name="",
anomaly_score=float(row[6]) if row[6] else 0.0,
threat_level=row[7] or "LOW",
model_name=row[8] or "",
recurrence=int(row[3] or 0),
asn_number=str(row[10]) if row[10] else "",
asn_org=row[11] or "",
asn_detail="",
asn_domain="",
country_code=row[9] or "",
asn_label=row[14] or "",
hits=int(row[13] or 0),
hit_velocity=float(row[12]) if row[12] else 0.0,
fuzzing_index=0.0,
post_ratio=0.0,
reason="",
asn_rep_label=row[15] or "",
asn_score=_label_to_score(row[15] or ""),
first_seen=row[1],
last_seen=row[2],
unique_ja4s=ja4s,
unique_hosts=hosts,
))
total_pages = (total + page_size - 1) // page_size
return DetectionsListResponse(
items=detections, total=total, page=page,
page_size=page_size, total_pages=total_pages
)
# ── Mode individuel (comportement original) ──────────────────────────────
# Validation du tri
valid_sort_columns = [
"detected_at", "src_ip", "threat_level", "anomaly_score",
@ -74,8 +193,6 @@ async def get_detections(
if sort_by not in valid_sort_columns:
sort_by = "detected_at"
sort_order = "DESC" if sort_order.upper() == "DESC" else "ASC"
main_query = f"""
SELECT
detected_at,

View File

@ -33,8 +33,8 @@ async def get_top_anomalies(limit: int = Query(50, ge=1, le=500)):
any(a.ja4) AS ja4,
any(a.host) AS host,
sum(a.hits) AS hits,
round(max(uniqMerge(a.uniq_query_params))
/ greatest(max(uniqMerge(a.uniq_paths)), 1), 4) AS fuzzing_index,
round(uniqMerge(a.uniq_query_params)
/ greatest(uniqMerge(a.uniq_paths), 1), 4) AS fuzzing_index,
round(sum(a.hits)
/ greatest(dateDiff('second', min(a.first_seen), max(a.last_seen)), 1), 2) AS hit_velocity,
round(sum(a.count_head) / greatest(sum(a.hits), 1), 4) AS head_ratio,
@ -378,16 +378,27 @@ async def get_ml_scatter(limit: int = Query(200, ge=1, le=1000)):
try:
sql = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
any(ja4) AS ja4,
round(max(uniqMerge(uniq_query_params)) / greatest(max(uniqMerge(uniq_paths)), 1), 4) AS fuzzing_index,
round(sum(hits) / greatest(dateDiff('second', min(first_seen), max(last_seen)), 1), 2) AS hit_velocity,
sum(hits) AS hits,
round(sum(count_head) / greatest(sum(hits), 1), 4) AS head_ratio,
max(correlated_raw) AS correlated
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY src_ip
ip,
ja4,
round(fuzzing_index, 4) AS fuzzing_index,
round(total_hits / greatest(dateDiff('second', min_first, max_last), 1), 2) AS hit_velocity,
total_hits AS hits,
round(total_count_head / greatest(total_hits, 1), 4) AS head_ratio,
correlated
FROM (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
any(ja4) AS ja4,
uniqMerge(uniq_query_params) / greatest(uniqMerge(uniq_paths), 1) AS fuzzing_index,
sum(hits) AS total_hits,
min(first_seen) AS min_first,
max(last_seen) AS max_last,
sum(count_head) AS total_count_head,
max(correlated_raw) AS correlated
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY src_ip
)
ORDER BY fuzzing_index DESC
LIMIT %(limit)s
"""

129
backend/routes/search.py Normal file
View File

@ -0,0 +1,129 @@
"""
Endpoint de recherche globale rapide — utilisé par la barre Cmd+K
"""
from fastapi import APIRouter, Query
from ..database import db
router = APIRouter(prefix="/api/search", tags=["search"])
IP_RE = r"^(\d{1,3}\.){0,3}\d{1,3}$"
@router.get("/quick")
async def quick_search(q: str = Query(..., min_length=1, max_length=100)):
    """
    Unified quick search across IPs, JA4 fingerprints, hosts and ASNs
    (backs the Cmd+K palette).

    Returns up to 5 results per category, scoped to detections from the
    last 24 hours. Response shape: {"query": q, "results": [...]} where
    each result carries type/value/label/meta/url (and, for IPs, an
    investigation_url).
    """
    q = q.strip()
    # min_length=1 is enforced by FastAPI BEFORE stripping, so a
    # whitespace-only query would reach here as "" and the resulting
    # "%%" pattern would ILIKE-match every row in all four queries.
    if not q:
        return {"query": q, "results": []}
    pattern = f"%{q}%"
    results = []
    # ── IPs ──────────────────────────────────────────────────────────────────
    # ::ffff: prefix is stripped so IPv4-mapped IPv6 addresses display cleanly.
    ip_rows = db.query(
        """
        SELECT
            replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
            count() AS hits,
            max(detected_at) AS last_seen,
            any(threat_level) AS threat_level
        FROM ml_detected_anomalies
        WHERE ilike(toString(src_ip), %(p)s)
          AND detected_at >= now() - INTERVAL 24 HOUR
        GROUP BY clean_ip
        ORDER BY hits DESC
        LIMIT 5
        """,
        {"p": pattern},
    )
    for r in ip_rows.result_rows:
        ip = str(r[0])
        results.append({
            "type": "ip",
            "value": ip,
            "label": ip,
            "meta": f"{r[1]} détections · {r[3]}",
            "url": f"/detections/ip/{ip}",
            "investigation_url": f"/investigation/{ip}",
        })
    # ── JA4 fingerprints ─────────────────────────────────────────────────────
    ja4_rows = db.query(
        """
        SELECT
            ja4,
            count() AS hits,
            uniq(src_ip) AS unique_ips
        FROM ml_detected_anomalies
        WHERE ilike(ja4, %(p)s)
          AND ja4 != ''
          AND detected_at >= now() - INTERVAL 24 HOUR
        GROUP BY ja4
        ORDER BY hits DESC
        LIMIT 5
        """,
        {"p": pattern},
    )
    for r in ja4_rows.result_rows:
        results.append({
            "type": "ja4",
            "value": str(r[0]),
            "label": str(r[0]),
            "meta": f"{r[1]} détections · {r[2]} IPs",
            "url": f"/investigation/ja4/{r[0]}",
        })
    # ── Hosts ─────────────────────────────────────────────────────────────────
    host_rows = db.query(
        """
        SELECT
            host,
            count() AS hits,
            uniq(src_ip) AS unique_ips
        FROM ml_detected_anomalies
        WHERE ilike(host, %(p)s)
          AND host != ''
          AND detected_at >= now() - INTERVAL 24 HOUR
        GROUP BY host
        ORDER BY hits DESC
        LIMIT 5
        """,
        {"p": pattern},
    )
    for r in host_rows.result_rows:
        results.append({
            "type": "host",
            "value": str(r[0]),
            "label": str(r[0]),
            "meta": f"{r[1]} hits · {r[2]} IPs",
            # NOTE(review): host is interpolated raw into the query string;
            # assumes hosts never contain characters needing URL-encoding —
            # confirm, or urlencode on the frontend.
            "url": f"/detections?search={r[0]}",
        })
    # ── ASN ───────────────────────────────────────────────────────────────────
    # Matches either the org name or the ASN number; both must be non-empty.
    asn_rows = db.query(
        """
        SELECT
            asn_org,
            asn_number,
            count() AS hits,
            uniq(src_ip) AS unique_ips
        FROM ml_detected_anomalies
        WHERE (ilike(asn_org, %(p)s) OR ilike(asn_number, %(p)s))
          AND asn_org != '' AND asn_number != ''
          AND detected_at >= now() - INTERVAL 24 HOUR
        GROUP BY asn_org, asn_number
        ORDER BY hits DESC
        LIMIT 5
        """,
        {"p": pattern},
    )
    for r in asn_rows.result_rows:
        results.append({
            "type": "asn",
            "value": str(r[1]),
            # Separator added: previously rendered as e.g. "AS15169Google LLC".
            # " · " matches the separator used in the meta strings above.
            "label": f"AS{r[1]} · {r[0]}",
            "meta": f"{r[2]} hits · {r[3]} IPs",
            "url": f"/detections?asn={r[1]}",
        })
    return {"query": q, "results": results}

View File

@ -44,17 +44,22 @@ async def get_associated_ips(
column = type_column_map[attr_type]
query = f"""
SELECT DISTINCT src_ip
SELECT src_ip, count() AS hit_count
FROM ml_detected_anomalies
WHERE {column} = %(value)s
AND detected_at >= now() - INTERVAL 24 HOUR
ORDER BY src_ip
GROUP BY src_ip
ORDER BY hit_count DESC
LIMIT %(limit)s
"""
result = db.query(query, {"value": value, "limit": limit})
ips = [str(row[0]) for row in result.result_rows]
total_hits = sum(row[1] for row in result.result_rows) or 1
ips = [
{"ip": str(row[0]), "count": row[1], "percentage": round(row[1] * 100.0 / total_hits, 2)}
for row in result.result_rows
]
# Compter le total
count_query = f"""
@ -491,42 +496,77 @@ async def get_variability(attr_type: str, value: str):
first_seen = stats_row[2]
last_seen = stats_row[3]
# User-Agents via view_dashboard_user_agents (source principale pour les UAs)
# Colonnes disponibles: src_ip, ja4, hour, log_date, user_agents, requests
# User-Agents depuis http_logs pour des comptes exacts par requête
# (view_dashboard_user_agents déduplique par heure, ce qui sous-compte les hits)
_ua_params: dict = {"value": value}
if attr_type == "ip":
_ua_where = "toString(src_ip) = %(value)s"
_ua_params: dict = {"value": value}
_ua_logs_where = "src_ip = toIPv4(%(value)s)"
ua_query_simple = f"""
SELECT
header_user_agent AS user_agent,
count() AS count,
round(count() * 100.0 / (
SELECT count() FROM mabase_prod.http_logs
WHERE {_ua_logs_where} AND time >= now() - INTERVAL 24 HOUR
), 2) AS percentage,
min(time) AS first_seen,
max(time) AS last_seen
FROM mabase_prod.http_logs
WHERE {_ua_logs_where}
AND time >= now() - INTERVAL 24 HOUR
AND header_user_agent != '' AND header_user_agent IS NOT NULL
GROUP BY user_agent
ORDER BY count DESC
"""
ua_result = db.query(ua_query_simple, _ua_params)
user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows]
elif attr_type == "ja4":
_ua_where = "ja4 = %(value)s"
_ua_params = {"value": value}
_ua_logs_where = "ja4 = %(value)s"
ua_query_simple = f"""
SELECT
header_user_agent AS user_agent,
count() AS count,
round(count() * 100.0 / (
SELECT count() FROM mabase_prod.http_logs
WHERE {_ua_logs_where} AND time >= now() - INTERVAL 24 HOUR
), 2) AS percentage,
min(time) AS first_seen,
max(time) AS last_seen
FROM mabase_prod.http_logs
WHERE {_ua_logs_where}
AND time >= now() - INTERVAL 24 HOUR
AND header_user_agent != '' AND header_user_agent IS NOT NULL
GROUP BY user_agent
ORDER BY count DESC
LIMIT 20
"""
ua_result = db.query(ua_query_simple, _ua_params)
user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows]
else:
# country / asn / host: pivot via ml_detected_anomalies → IPs
# country / asn / host: pivot via ml_detected_anomalies → IPs, puis view UA
_ua_where = f"""toString(src_ip) IN (
SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '')
FROM ml_detected_anomalies
WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR
)"""
_ua_params = {"value": value}
ua_query_simple = f"""
SELECT
ua AS user_agent,
sum(requests) AS count,
round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage,
min(log_date) AS first_seen,
max(log_date) AS last_seen
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {_ua_where}
AND hour >= now() - INTERVAL 24 HOUR
AND ua != ''
GROUP BY user_agent
ORDER BY count DESC
LIMIT 10
"""
ua_result = db.query(ua_query_simple, _ua_params)
user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows]
ua_query_simple = f"""
SELECT
ua AS user_agent,
sum(requests) AS count,
round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage,
min(log_date) AS first_seen,
max(log_date) AS last_seen
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {_ua_where}
AND hour >= now() - INTERVAL 24 HOUR
AND ua != ''
GROUP BY user_agent
ORDER BY count DESC
LIMIT 20
"""
ua_result = db.query(ua_query_simple, _ua_params)
user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows]
# JA4 fingerprints
ja4_query = f"""