feat(bot-detector): add XGBoost supervised third voice (#10)
Triple-voice ensemble architecture:
- EIF (unsupervised, zero-day anomalies)
- Autoencoder (unsupervised, non-linear correlations)
- XGBoost (supervised, known patterns + SOC feedback)

XGBoost implementation:
- Trained on historical ml_all_scores labels (NORMAL=0, HIGH/CRITICAL/DENY/KNOWN=1)
- Weekly retraining (XGB_RETRAIN_INTERVAL_H=168), min 100 labels required
- Score = predict_proba, combined via meta-learner: (1-β)*(EIF+AE) + β*xgb_prob
- Configurable: XGB_WEIGHT (β=0.20), XGB_MIN_LABELS, XGB_RETRAIN_INTERVAL_HOURS
- Graceful fallback: if xgboost unavailable or labels insufficient, EIF+AE only
- ClickHouse: xgb_prob column added to ml_all_scores
- Tests: 4 new tests (availability, train/predict, meta-learner, save/load)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@ -504,3 +504,86 @@ def test_ae_weight_combination():
|
||||
# Combined should be between min and max of components
|
||||
assert all(combined >= np.minimum(eif_norm, ae_norm) - 1e-9)
|
||||
assert all(combined <= np.maximum(eif_norm, ae_norm) + 1e-9)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
# XGBOOST TESTS
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def test_xgb_availability_flag():
    """Probe for xgboost and check the resulting flag is a plain bool.

    The import is attempted purely as an availability probe: a missing
    package must be absorbed as ``ImportError`` rather than failing the test.
    """
    try:
        import xgboost  # noqa: F401 -- imported only to probe availability
    except ImportError:
        xgb_present = False
    else:
        xgb_present = True

    assert isinstance(xgb_present, bool)
|
||||
|
||||
|
||||
def test_xgb_basic_train_and_predict():
    """Fit an XGBoost classifier on synthetic labels and sanity-check its output.

    Samples are standard-normal draws from a seeded generator; a sample is
    labelled positive when its first feature exceeds 1.5. The test checks the
    shape and [0, 1] range of the positive-class probabilities, and that
    samples with a large first feature score higher on average than samples
    with a strongly negative one. Skipped when xgboost is not installed.
    """
    try:
        import xgboost as xgb
    except ImportError:
        pytest.skip("xgboost not installed")

    sample_count, feature_count = 500, 10
    generator = np.random.default_rng(42)
    features = generator.normal(0, 1, (sample_count, feature_count))
    first_feature = features[:, 0]
    # Simple labelling rule: positive class when feature 0 is above 1.5.
    labels = (first_feature > 1.5).astype(int)

    classifier = xgb.XGBClassifier(
        n_estimators=50,
        max_depth=3,
        eval_metric='logloss',
        random_state=42,
    )
    classifier.fit(features, labels, verbose=False)

    positive_prob = classifier.predict_proba(features)[:, 1]
    assert positive_prob.shape == (sample_count,)
    assert 0 <= positive_prob.min() <= positive_prob.max() <= 1

    # Compare the two tails of feature 0; the comparison is only made when
    # both tails actually contain samples.
    upper_tail = first_feature > 2.0
    lower_tail = first_feature < -1.0
    if not (upper_tail.any() and lower_tail.any()):
        return
    assert np.mean(positive_prob[upper_tail]) > np.mean(positive_prob[lower_tail]), \
        "XGBoost should give higher prob to anomalous samples"
|
||||
|
||||
|
||||
def test_xgb_meta_learner_combination():
    """Check the blend ``(1 - beta) * eif_ae + beta * xgb_prob`` on fixed inputs.

    The blended vector is compared against hand-expanded values for
    ``beta = 0.20`` and must stay within [0, 1].
    """
    xgb_weight = 0.20  # XGB_WEIGHT default
    unsupervised_score = np.array([0.1, 0.5, 0.9])
    supervised_prob = np.array([0.2, 0.6, 0.8])

    blended = (1 - xgb_weight) * unsupervised_score + xgb_weight * supervised_prob

    # Same formula written out element by element with the weights expanded.
    hand_computed = np.array([
        0.1*0.8 + 0.2*0.2,
        0.5*0.8 + 0.6*0.2,
        0.9*0.8 + 0.8*0.2,
    ])
    np.testing.assert_allclose(blended, hand_computed, rtol=1e-7)

    assert np.all(blended >= 0) and np.all(blended <= 1)
|
||||
|
||||
|
||||
def test_xgb_save_load_model():
    """Round-trip an XGBoost classifier through a JSON file on disk.

    A small classifier is fitted on seeded synthetic data, saved with
    ``save_model``, reloaded into a fresh ``XGBClassifier`` with
    ``load_model``, and the positive-class probabilities before and after
    are required to agree to ``rtol=1e-5``. Skipped when xgboost is not
    installed.
    """
    try:
        import xgboost as xgb
    except ImportError:
        pytest.skip("xgboost not installed")
    import tempfile
    from pathlib import Path

    rng = np.random.default_rng(42)
    X = rng.normal(0, 1, (200, 5))
    y = (X[:, 0] > 1.0).astype(int)

    model = xgb.XGBClassifier(n_estimators=20, max_depth=3,
                              eval_metric='logloss', random_state=42)
    model.fit(X, y, verbose=False)
    probs_before = model.predict_proba(X)[:, 1]

    # A temporary directory used as a context manager removes the model file
    # on every exit path, so no manual try/finally cleanup is needed.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Keep the .json suffix so the model is written in JSON format.
        model_path = str(Path(tmp_dir) / "xgb_model.json")
        model.save_model(model_path)

        restored = xgb.XGBClassifier()
        restored.load_model(model_path)
        probs_after = restored.predict_proba(X)[:, 1]

    np.testing.assert_allclose(probs_before, probs_after, rtol=1e-5)
|
||||
|
||||
Reference in New Issue
Block a user