feat(bot-detector): add parallel Autoencoder scorer (#9)

- TrafficAutoEncoder class: symmetric AE (n→64→32→16→32→64→n) with BatchNorm+ReLU
- Trained alongside EIF on human_baseline, saved/loaded with model versioning
- Score = per-sample MSE reconstruction error, combined with EIF via AE_WEIGHT (α=0.30)
- AE latent space (16-dim) used for HDBSCAN clustering instead of raw features
- Configurable: AE_WEIGHT, AE_EPOCHS, AE_LATENT_DIM, AE_LEARNING_RATE
- Graceful fallback: if torch unavailable or AE fails, EIF-only scoring continues
- ClickHouse: ae_recon_error column added to ml_all_scores
- Tests: 5 new tests (AE train/score, encode latent, state dict save/load, weight combination)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-08 02:40:39 +02:00
parent f6e2d3c0ca
commit 57cf6c3828
4 changed files with 378 additions and 12 deletions

View File

@ -333,3 +333,174 @@ def test_lag1_autocorrelation_bot_vs_human():
rho_human = 0.0
assert abs(rho_human) < 0.5, f"Human autocorrelation should be low, got {rho_human:.3f}"
# ═══════════════════════════════════════════════════════════════════════════════
# AUTOENCODER TESTS
# ═══════════════════════════════════════════════════════════════════════════════
def test_ae_torch_availability_flag():
"""Verify torch availability detection works without crashing."""
try:
import torch
avail = True
except ImportError:
avail = False
assert isinstance(avail, bool)
def _make_ae(n_features, latent_dim=4):
"""Build a standalone TrafficAutoEncoder for testing (avoids importing bot_detector module)."""
import torch
import torch.nn as nn
class _AE:
def __init__(self, n_feat, ldim):
self.n_features = n_feat
self.latent_dim = ldim
self.device = torch.device('cpu')
dim1 = min(64, max(n_feat, ldim + 4))
dim2 = min(32, max(dim1 // 2, ldim + 2))
self.encoder = nn.Sequential(
nn.Linear(n_feat, dim1), nn.BatchNorm1d(dim1), nn.ReLU(),
nn.Linear(dim1, dim2), nn.BatchNorm1d(dim2), nn.ReLU(),
nn.Linear(dim2, ldim),
)
self.decoder = nn.Sequential(
nn.Linear(ldim, dim2), nn.BatchNorm1d(dim2), nn.ReLU(),
nn.Linear(dim2, dim1), nn.BatchNorm1d(dim1), nn.ReLU(),
nn.Linear(dim1, n_feat), nn.Sigmoid(),
)
self._all_params = list(self.encoder.parameters()) + list(self.decoder.parameters())
self._scaler_min = None
self._scaler_range = None
def _to_tensor(self, X):
if self._scaler_min is not None:
X_n = (X - self._scaler_min) / (self._scaler_range + 1e-9)
else:
X_n = X
return torch.tensor(np.clip(X_n, 0, 1), dtype=torch.float32)
def fit(self, X, epochs=50, lr=1e-3, batch_size=256):
self._scaler_min = X.min(axis=0)
self._scaler_range = X.max(axis=0) - self._scaler_min
X_t = self._to_tensor(X)
dataset = torch.utils.data.TensorDataset(X_t)
loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
optimizer = torch.optim.Adam(self._all_params, lr=lr, weight_decay=1e-5)
criterion = nn.MSELoss()
self.encoder.train(); self.decoder.train()
losses = []
for _ in range(epochs):
epoch_loss = 0.0
for (batch,) in loader:
latent = self.encoder(batch)
recon = self.decoder(latent)
loss = criterion(recon, batch)
optimizer.zero_grad(); loss.backward(); optimizer.step()
epoch_loss += loss.item() * len(batch)
losses.append(epoch_loss / len(X_t))
return {'final_loss': losses[-1], 'epochs': epochs, 'n_samples': len(X)}
def score_samples(self, X):
self.encoder.eval(); self.decoder.eval()
X_t = self._to_tensor(X)
with torch.no_grad():
return ((self.decoder(self.encoder(X_t)) - X_t) ** 2).mean(dim=1).numpy()
def encode(self, X):
self.encoder.eval()
X_t = self._to_tensor(X)
with torch.no_grad():
return self.encoder(X_t).numpy()
def state_dict(self):
return {'encoder': self.encoder.state_dict(), 'decoder': self.decoder.state_dict(),
'scaler_min': self._scaler_min, 'scaler_range': self._scaler_range,
'n_features': self.n_features, 'latent_dim': self.latent_dim}
@classmethod
def load_state_dict(cls, state):
ae = cls(state['n_features'], state['latent_dim'])
ae._scaler_min = state['scaler_min']
ae._scaler_range = state['scaler_range']
ae.encoder.load_state_dict(state['encoder'])
ae.decoder.load_state_dict(state['decoder'])
return ae
return _AE(n_features, latent_dim)
def test_ae_class_train_and_score():
"""TrafficAutoEncoder trains on normal data and scores anomalies higher."""
try:
import torch
except ImportError:
pytest.skip("torch not installed")
rng = np.random.default_rng(42)
n_features = 10
X_normal = rng.normal(0.5, 0.1, (200, n_features)).clip(0, 1)
X_anomaly = rng.uniform(0.8, 1.0, (20, n_features))
ae = _make_ae(n_features, latent_dim=4)
stats = ae.fit(X_normal, epochs=30, lr=1e-3)
assert stats['final_loss'] > 0, "Loss should be positive"
assert stats['epochs'] == 30
assert stats['n_samples'] == 200
normal_scores = ae.score_samples(X_normal)
anomaly_scores = ae.score_samples(X_anomaly)
assert np.mean(anomaly_scores) > np.mean(normal_scores), \
f"Anomaly MSE ({np.mean(anomaly_scores):.4f}) should > normal MSE ({np.mean(normal_scores):.4f})"
def test_ae_encode_latent_space():
"""Autoencoder encode() returns correct dimensionality."""
try:
import torch
except ImportError:
pytest.skip("torch not installed")
rng = np.random.default_rng(42)
X = rng.normal(0.5, 0.1, (50, 8)).clip(0, 1)
ae = _make_ae(8, latent_dim=4)
ae.fit(X, epochs=5)
latent = ae.encode(X)
assert latent.shape == (50, 4), f"Latent shape should be (50, 4), got {latent.shape}"
def test_ae_state_dict_save_load():
"""Autoencoder can save and load state dict."""
try:
import torch
except ImportError:
pytest.skip("torch not installed")
rng = np.random.default_rng(42)
X = rng.normal(0.5, 0.1, (100, 6)).clip(0, 1)
ae = _make_ae(6, latent_dim=3)
ae.fit(X, epochs=10)
scores_before = ae.score_samples(X)
state = ae.state_dict()
ae2 = type(ae).load_state_dict(state)
scores_after = ae2.score_samples(X)
np.testing.assert_allclose(scores_before, scores_after, rtol=1e-5,
err_msg="Scores should be identical after load")
def test_ae_weight_combination():
"""Combined score should be weighted average of EIF and AE components."""
eif_norm = np.array([0.2, 0.8, 0.5])
ae_norm = np.array([0.3, 0.9, 0.4])
alpha = 0.30
combined = (1 - alpha) * eif_norm + alpha * ae_norm
expected = np.array([0.2*0.7 + 0.3*0.3, 0.8*0.7 + 0.9*0.3, 0.5*0.7 + 0.4*0.3])
np.testing.assert_allclose(combined, expected, rtol=1e-7)
# Combined should be between min and max of components
assert all(combined >= np.minimum(eif_norm, ae_norm) - 1e-9)
assert all(combined <= np.maximum(eif_norm, ae_norm) + 1e-9)