""" CsI(Tl) detector response continuum calibration for Radiacode 103. Models ONLY the detector's noise continuum. Photopeaks from environmental isotopes depend on measurement location and are NOT part of this model. Uses two approaches: 1. Spline-based: non-parametric, automatically fits any shape 2. Parametric: for the /fit endpoint (comparison with measured data) The spline approach is preferred — it uses scipy's smoothing spline with Generalized Cross-Validation to automatically find the right smoothness, after iterative peak subtraction. """ import json import numpy as np from pathlib import Path from scipy.interpolate import make_smoothing_spline from scipy.signal import savgol_filter from app.config import ENERGY_OFFSET, ENERGY_SLOPE, NUM_CHANNELS PHOTOPEAK_LINES = [ (295.22, 0.1842), (351.93, 0.3560), (609.31, 0.4549), (911.20, 0.2580), (968.97, 0.1580), (1120.29, 0.1492), (1460.83, 0.1066), (1764.49, 0.1531), (2614.51, 0.3586), ] def _sigma_keV(E): return max(12.0, 23.6 * np.sqrt(max(E, 1.0) / 662.0)) def _smooth(y): window = min(51, len(y) // 10 * 2 + 1) if window < 5: window = 5 return savgol_filter(y, window_length=window, polyorder=3) def _subtract_peaks(energy_axis, smoothed_cps): """Iteratively estimate and subtract photopeak contributions.""" continuum = smoothed_cps.copy() peak_amplitudes = [] for line_energy, _ in PHOTOPEAK_LINES: sig = _sigma_keV(line_energy) idx = np.argmin(np.abs(energy_axis - line_energy)) n_sigma = max(int(2 * sig / 2.97), 3) off_lo = continuum[max(0, idx - 3 * n_sigma):max(1, idx - n_sigma)] off_hi = continuum[min(len(continuum), idx + n_sigma):min(len(continuum), idx + 3 * n_sigma)] off_peak = np.concatenate([off_lo, off_hi]) local_bg = np.median(off_peak) if len(off_peak) > 0 else 0 peak_height = continuum[idx] - local_bg if peak_height > 0: amplitude = peak_height * sig * np.sqrt(2 * np.pi) gaussian = amplitude * np.exp(-0.5 * ((energy_axis - line_energy) / sig) ** 2) / (sig * np.sqrt(2 * np.pi)) continuum -= gaussian continuum = np.maximum(continuum, 0) peak_amplitudes.append({"energy_keV": line_energy, "amplitude": float(max(0, peak_height) * sig * np.sqrt(2 * np.pi)) if peak_height > 0 else 0.0}) return continuum, peak_amplitudes def calibrate_spline(measured_cps, energy_axis): """ Fit a smoothing spline to the peak-subtracted continuum. Uses scipy's make_smoothing_spline with GCV (Generalized Cross-Validation) to automatically find the optimal smoothing parameter. Returns a dict with the fitted spline evaluated at all channels. """ E = energy_axis y_smooth = _smooth(measured_cps) continuum, peak_amplitudes = _subtract_peaks(E, y_smooth) # Ensure positive values for spline fitting continuum = np.maximum(continuum, 0) # Use log-space for better fit at low-signal high-energy region # Add small offset to avoid log(0) offset = continuum[continuum > 0].min() * 0.1 if (continuum > 0).any() else 1e-6 log_continuum = np.log(continuum + offset) # Fit smoothing spline in log-space (GCV auto-selects lambda) try: spline = make_smoothing_spline(E, log_continuum) log_fit = spline(E) # Convert back from log-space fit_cps = np.exp(log_fit) - offset fit_cps = np.maximum(fit_cps, 0) except Exception as e: return {"error": str(e)} # Quality metrics residuals = continuum - fit_cps ss_res = np.sum(residuals ** 2) ss_tot = np.sum((continuum - continuum.mean()) ** 2) r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0 return { "continuum_cps": fit_cps, "peak_amplitudes": peak_amplitudes, "r_squared": float(r_squared), "residuals_rms": float(np.sqrt(np.mean(residuals ** 2))), } def calibrate_background(measured_cps, energy_axis): """ Fit the continuum model using smoothing spline. Returns both spline-based fit and parameters for the /fit endpoint. """ result = calibrate_spline(measured_cps, energy_axis) if "error" in result: return result # The spline result is the continuum CPS array return { "params": {}, # Non-parametric model, no params "continuum_cps": result["continuum_cps"], "peak_amplitudes": result["peak_amplitudes"], "r_squared": result["r_squared"], "residuals_rms": result["residuals_rms"], "method": "smoothing_spline_gcv", } def build_calibrated_continuum(energy_axis, total_counts, params): """Build the continuum from calibrated parameters.""" if "continuum_cps" in params: # Spline-based: already have the CPS array cps = np.array(params["continuum_cps"]) if cps.sum() > 0: return cps * total_counts / cps.sum() return cps return np.zeros(len(energy_axis)) # Cached calibration _cached_result = None _CALIBRATION_PATH = Path("/data/bg_calibration.json") def load_or_calibrate(): """Load cached calibration or fit from measured data.""" global _cached_result if _cached_result is not None: return _cached_result if _CALIBRATION_PATH.exists(): try: with open(_CALIBRATION_PATH) as f: _cached_result = json.load(f) return _cached_result except Exception: pass from app.config import BACKGROUND_PATH, BACKGROUND_SNAPSHOT_PATH measured_counts = None live_time = 0 if BACKGROUND_PATH.exists(): try: bg_data = np.load(str(BACKGROUND_PATH), allow_pickle=True).item() measured_counts = bg_data["counts"].astype(np.float64)[:NUM_CHANNELS] live_time = float(bg_data["duration"]) except Exception: pass if measured_counts is None and BACKGROUND_SNAPSHOT_PATH.exists(): try: with open(BACKGROUND_SNAPSHOT_PATH) as f: snapshot = json.load(f) measured_counts = np.array(snapshot.get("spectrum", [])[:NUM_CHANNELS], dtype=np.float64) live_time = float(snapshot.get("live_time_s", 0)) except Exception: pass if measured_counts is None or live_time < 600: return None channels = np.arange(NUM_CHANNELS, dtype=np.float64) e_axis = ENERGY_OFFSET + ENERGY_SLOPE * channels measured_cps = measured_counts / live_time result = calibrate_background(measured_cps, e_axis) if "error" in result: return None _cached_result = { "continuum_cps": [round(float(c), 6) for c in result["continuum_cps"]], "method": result["method"], "r_squared": result["r_squared"], } _CALIBRATION_PATH.parent.mkdir(parents=True, exist_ok=True) tmp = _CALIBRATION_PATH.with_suffix(".tmp") with open(tmp, "w") as f: json.dump(_cached_result, f, indent=2) tmp.replace(_CALIBRATION_PATH) return _cached_result