- Tooltip entier (intersect:false) + ligne verticale crosshair sur tous les graphes - Zoom molette/pinch sur l'axe X, pan souris, limites clamped 30-3000 keV - Toggle échelle log/linéaire onglet Background - Extraction continuum détecteur (isotope peaks subtracted + Gaussian smoothing) - Reprise snapshot précédent au démarrage capture_background.py - Suppression refs "Théorique" et "Bruit capteur" de l'interface - Plugin chartjs-plugin-zoom + hammerjs via CDN - Fix Chart constructor spread operator Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
147 lines
5.3 KiB
Python
147 lines
5.3 KiB
Python
#!/usr/bin/env python3
"""
Capture the detector background over 24 h (with no source present).
Handles the detector being unplugged and plugged back in.
Run separately, before starting the monitor:
    docker-compose run --rm detect python capture_background.py
"""
|
|
import numpy as np
|
|
import time
|
|
import json
|
|
import os
|
|
|
|
# --- Configuration (every value can be overridden through the environment) ---

# Seconds slept between two spectrum reads.
SAMPLE_INTERVAL = int(os.getenv("SAMPLE_INTERVAL", "60"))
# The detector spectrum is reset every RESET_INTERVAL seconds (default: 1 h).
RESET_INTERVAL = int(os.getenv("RESET_INTERVAL", "3600"))
# Capture length in seconds, compared against elapsed wall-clock time (default: 24 h).
TARGET_DURATION = int(os.getenv("TARGET_DURATION", "86400"))
# Destination of the final NumPy file and of the periodic JSON snapshot.
OUTPUT_PATH = os.getenv("BACKGROUND_PATH", "/data/background_24h.npy")
SNAPSHOT_PATH = os.getenv("SNAPSHOT_PATH", "/data/background_snapshot.json")

# --- Mutable capture state ---

# Accumulated background: one float64 counter per channel (1024 channels).
BG_COUNTS = np.zeros((1024,), dtype=np.float64)
# Accumulated detector live time, in seconds.
BG_LIVE_TIME = 0.0
# Detector handle; None means "not connected yet / connection lost".
device = None
# Previous read, used to compute per-read deltas; None means "no baseline".
last_counts = None
last_live_time = None
# Wall-clock time (time.time()) of the last detector spectrum reset.
last_reset_time = 0
|
|
|
|
# Resume from a previous snapshot if one exists, so that a restarted capture
# keeps the counts and live time accumulated so far.
if os.path.exists(SNAPSHOT_PATH):
    try:
        with open(SNAPSHOT_PATH, encoding="utf-8") as _f:
            _prev = json.load(_f)
        _prev_spectrum = _prev.get("spectrum", [])
        if len(_prev_spectrum) == 1024:
            # Parse both values before assigning either one: if the live time
            # is malformed, the counts must not be restored on their own
            # (that would leave counts paired with a zero live time).
            _prev_counts = np.array(_prev_spectrum, dtype=np.float64)
            _prev_live_time = float(_prev.get("live_time_s", 0))
            BG_COUNTS = _prev_counts
            BG_LIVE_TIME = _prev_live_time
            print(f"Snapshot anterieur charge : {BG_LIVE_TIME:.0f}s live, {BG_COUNTS.sum():.0f} coups")
        else:
            # A spectrum with an unexpected channel count cannot be merged
            # into the 1024-channel accumulator; say so instead of silently
            # starting from zero.
            print(
                f"Snapshot anterieur ignore : spectre de {len(_prev_spectrum)} canaux (1024 attendus)"
            )
    except Exception as _e:
        # Best effort: an unreadable snapshot must not prevent a new capture.
        print(f"Impossible de charger le snapshot anterieur : {_e}")
|
|
|
|
def save_snapshot():
    """Write a human-readable JSON snapshot of the background accumulated so far.

    Reads the module-level ``BG_COUNTS``, ``BG_LIVE_TIME`` and ``start`` and
    writes a JSON object to ``SNAPSHOT_PATH`` with these keys:

    - ``elapsed_hours``: wall-clock hours since ``start``, 2 decimals;
    - ``live_time_s``: accumulated live time in seconds, 1 decimal;
    - ``total_counts``: sum of all channels;
    - ``cps``: total counts / live time (0 when the live time is 0);
    - ``top_peaks``: at most 15 channels whose count exceeds 3 % of the
      highest channel, sorted by decreasing count;
    - ``spectrum``: every channel, rounded to 1 decimal.

    The snapshot is first written to a sibling ``.tmp`` file and then renamed
    over ``SNAPSHOT_PATH``, so an interruption in the middle of a write never
    leaves a truncated snapshot behind (the snapshot is what a restarted
    capture resumes from).

    Raises:
        OSError: if the temporary file cannot be written or renamed.
    """
    cps = BG_COUNTS.sum() / BG_LIVE_TIME if BG_LIVE_TIME > 0 else 0

    # Channels above 3 % of the strongest channel are reported as "peaks".
    # Approximate energy calibration for RC-103: E ≈ 0.33 + 2.97*ch
    peaks = []
    max_c = BG_COUNTS.max()
    if max_c > 0:
        threshold = max_c * 0.03
        peaks = [
            {
                "channel": i,
                "energy_kev": round(0.33 + 2.97 * i, 1),
                "counts": round(float(c), 1),
            }
            for i, c in enumerate(BG_COUNTS)
            if c > threshold
        ]

    snapshot = {
        "elapsed_hours": round((time.time() - start) / 3600, 2),
        "live_time_s": round(BG_LIVE_TIME, 1),
        "total_counts": round(float(BG_COUNTS.sum()), 0),
        "cps": round(cps, 2),
        "top_peaks": sorted(peaks, key=lambda x: -x["counts"])[:15],
        "spectrum": [round(float(c), 1) for c in BG_COUNTS],
    }

    # Write-then-rename: os.replace swaps the file in a single step, so a
    # reader (or a restart) sees either the old or the new snapshot, never a
    # partial one.
    tmp_path = f"{SNAPSHOT_PATH}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(snapshot, f, indent=2)
    os.replace(tmp_path, SNAPSHOT_PATH)
|
|
|
|
print(f"Capture du bruit de fond pendant {TARGET_DURATION/3600:.0f}h...")
print("Assurez-vous qu'aucune source radioactive n'est a proximite du detecteur.")
print()

# Seconds of capture already credited by a previous snapshot. Subtracting it
# from the start time makes a resumed run stop once the *total* elapsed time
# reaches TARGET_DURATION.
start_offset = 0
if os.path.exists(SNAPSHOT_PATH):
    try:
        with open(SNAPSHOT_PATH) as _f:
            _prev = json.load(_f)
        # Credit the earlier elapsed time only when the snapshot's spectrum is
        # usable (1024 channels, the same condition used when restoring the
        # counts). Otherwise the counts restart from zero and the capture must
        # run for the full duration.
        if len(_prev.get("spectrum", [])) == 1024:
            # A negative value would push the start into the future; clamp it.
            start_offset = max(float(_prev.get("elapsed_hours", 0)) * 3600, 0)
    except Exception as _e:
        # Best effort: fall back to a full-length capture, but report why.
        print(f"Impossible de lire le temps ecoule du snapshot anterieur : {_e}")

# `start` is also read by save_snapshot() to compute "elapsed_hours".
start = time.time() - start_offset
last_reset_time = time.time()
|
|
|
|
# Main acquisition loop: every SAMPLE_INTERVAL seconds, read the detector's
# spectrum and add what was acquired since the previous read to the
# accumulators. Any error drops the device handle so the next iteration
# reconnects (this is how unplug/replug is handled).
while (time.time() - start) < TARGET_DURATION:
    time.sleep(SAMPLE_INTERVAL)
    try:
        if device is None:
            # Imported lazily so the script can start (and keep retrying)
            # even while the detector is unavailable.
            from radiacode import RadiaCode

            device = RadiaCode()
            # Start from an empty on-device spectrum so that the first read
            # after a (re)connection contains only new counts.
            device.spectrum_reset()
            last_counts = None
            last_live_time = None
            last_reset_time = time.time()
            print("Radiacode connecte.")

        spectrum = device.spectrum()
        counts = np.array(spectrum.counts, dtype=np.float64)
        live_time = spectrum.duration.total_seconds()

        # Compute delta since last read (avoid double-counting)
        if last_counts is not None and last_live_time is not None:
            delta_counts = counts - last_counts
            delta_live_time = live_time - last_live_time
            # The on-device spectrum only grows between resets, so a decrease
            # in ANY channel (or in the live time) means it was reset
            # externally. Checking each channel rather than the sum avoids
            # subtracting counts from the accumulator when some channels
            # dropped but the total did not. After a reset the whole current
            # spectrum is new data.
            if (delta_counts < 0).any() or delta_live_time < 0:
                delta_counts = counts
                delta_live_time = live_time
            BG_COUNTS += delta_counts
            BG_LIVE_TIME += max(delta_live_time, 0)
        else:
            # No baseline (first read after a reset/reconnect): everything on
            # the device was acquired since that reset.
            BG_COUNTS += counts
            BG_LIVE_TIME += live_time

        last_counts = counts.copy()
        last_live_time = live_time

        # Only reset detector spectrum at RESET_INTERVAL
        now = time.time()
        if now - last_reset_time >= RESET_INTERVAL:
            device.spectrum_reset()
            # The baseline is meaningless after a reset.
            last_counts = None
            last_live_time = None
            last_reset_time = now
            print(f" → Reset détecteur (intervalle {RESET_INTERVAL}s)")

        elapsed = time.time() - start
        cps = BG_COUNTS.sum() / BG_LIVE_TIME if BG_LIVE_TIME > 0 else 0
        print(
            f"Background : {elapsed/3600:.1f}h / {TARGET_DURATION/3600:.1f}h "
            f"({BG_LIVE_TIME:.0f}s live, {BG_COUNTS.sum():.0f} coups, {cps:.1f} CPS)",
            flush=True,
        )
        save_snapshot()
    except Exception as e:
        # Typically the detector was unplugged: forget the handle and the
        # baseline, then retry on the next iteration.
        print(f"\nErreur : {e}, reconnexion...")
        device = None
        last_counts = None
        last_live_time = None
|
|
|
|
# Persist the final background. A bare file name has an empty dirname, in
# which case the current directory is used.
os.makedirs(os.path.dirname(OUTPUT_PATH) or ".", exist_ok=True)
# NOTE: np.save stores a dict as a pickled 0-d object array, so readers need
# np.load(OUTPUT_PATH, allow_pickle=True).item() to get the dict back.
np.save(
    OUTPUT_PATH,
    {
        "counts": BG_COUNTS,
        "duration": BG_LIVE_TIME,
        "timestamp": time.time(),
    },
)
print(f"\n\nBackground sauvegarde : {OUTPUT_PATH}")
print(f" Duree live : {BG_LIVE_TIME/3600:.1f}h")
print(f" Total coups : {BG_COUNTS.sum():.0f}")
# Guard against a capture that never acquired any live time (e.g. the
# detector was never reachable), consistent with the other CPS computations.
mean_cps = BG_COUNTS.sum() / BG_LIVE_TIME if BG_LIVE_TIME > 0 else 0
print(f" CPS moyen : {mean_cps:.1f}")