Pipeline complet Radiacode 103 - identification automatique d'isotopes

- VegaModel CNN-FCNN 34.5M params, 82 isotopes, val acc 99.89%
- Generation 50k spectres synthetiques 1D (12-24h durees)
- Entrainement 100 epochs sur RTX 5060 Ti (CUDA 12.8, Blackwell)
- Detection continue avec soustraction du background
- Capture background 24h avec gestion deconnexion
- Docker Compose : conteneur train (GPU) + detect (CPU/USB)
- Modele entraine inclus (vega_best.pt, 395 Mo)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jacquin Antoine
2026-05-19 12:29:56 +02:00
commit 745a64b342
52 changed files with 17558 additions and 0 deletions

View File

@ -0,0 +1,18 @@
"""
Synthetic Gamma Spectra Generation Module
This module provides tools for generating realistic synthetic gamma spectra
for training isotope identification models. It simulates detector responses
compatible with Radiacode devices (101, 102, 103, 103G, 110).
Detector Specifications:
- Energy Range: 20 keV to 3000 keV (0.02 - 3 MeV)
- Channels: 1024 (usable: 1023)
- FWHM Resolution: 7.4% - 9.5% @ 662 keV (model dependent)
- Detector Types: CsI(Tl) and GAGG(Ce) scintillators
"""
__version__ = "0.1.0"
__author__ = "Isotope ID ML Project"
from .config import DetectorConfig, RADIACODE_CONFIGS

View File

@ -0,0 +1,142 @@
"""
Detector Configuration Module
Contains configuration parameters for Radiacode gamma spectrometers
and other detector settings.
"""
from dataclasses import dataclass, field
from typing import Dict, Optional
import numpy as np
@dataclass
class DetectorConfig:
    """Configuration for a gamma spectrometer detector.

    The energy axis is modelled as ``num_channels`` equal-width raw channels
    spanning ``energy_min_kev``..``energy_max_kev``. When
    ``skip_first_channel`` is true, raw channel 0 is dropped and "usable"
    channel index ``i`` corresponds to raw channel ``i + 1``.
    """

    name: str
    # Energy range in keV
    energy_min_kev: float = 20.0
    energy_max_kev: float = 3000.0
    # Number of raw channels
    num_channels: int = 1024
    # Some devices/software workflows treat channel 0 as unreliable/noisy.
    # This project models "usable" channels by skipping the first raw channel.
    skip_first_channel: bool = True
    # Relative FWHM at 662 keV (Cs-137 reference), as a fraction of the energy
    fwhm_at_662: float = 0.084  # 8.4%
    fwhm_uncertainty: float = 0.003  # ±0.3%
    # Detector crystal type
    crystal_type: str = "CsI(Tl)"
    # Sensitivity: counts per second at 1 μSv/h for Cs-137
    sensitivity_cps_per_usvh: float = 30.0
    # Detector volume in cm³
    detector_volume_cm3: float = 1.0

    def get_channel_width_kev(self) -> float:
        """Get the width of each channel in keV."""
        return (self.energy_max_kev - self.energy_min_kev) / self.num_channels

    def get_energy_bins(self) -> np.ndarray:
        """Get array of energy bin centers (keV) for the modeled usable channels."""
        channel_width = self.get_channel_width_kev()
        # Raw device channels are assumed to be 0..num_channels-1 with centers:
        #   E_center(k) = E_min + (k + 0.5) * channel_width
        # If we skip the first raw channel (k=0), we model usable channels
        # k=1..num_channels-1.
        start_raw_channel = 1 if self.skip_first_channel else 0
        raw_channels = np.arange(start_raw_channel, self.num_channels, dtype=np.float64)
        return self.energy_min_kev + (raw_channels + 0.5) * channel_width

    def get_fwhm_at_energy(self, energy_kev: float) -> float:
        """
        Calculate the absolute FWHM (in keV) at a given energy.

        ``fwhm_at_662`` is the *relative* resolution R at 662 keV. For
        scintillators the relative resolution scales approximately as
        1/sqrt(E), so:

            R(E)    = R_662 * sqrt(662 / E)
            FWHM(E) = R(E) * E = R_662 * sqrt(662 * E)    [keV]

        The product form is used so that E = 0 yields 0 instead of dividing
        by zero.
        """
        return self.fwhm_at_662 * np.sqrt(662.0 * energy_kev)

    def get_sigma_at_energy(self, energy_kev: float) -> float:
        """
        Get Gaussian sigma (keV) at a given energy.

        sigma = FWHM / (2 * sqrt(2 * ln(2))) ≈ FWHM / 2.355
        """
        fwhm = self.get_fwhm_at_energy(energy_kev)
        return fwhm / 2.355

    def energy_to_channel(self, energy_kev: float) -> int:
        """Convert energy in keV to modeled usable channel index.

        The result is clamped to the valid usable-index range, so energies
        outside the detector range map to the first or last usable channel.
        """
        channel_width = self.get_channel_width_kev()
        raw_channel = int((energy_kev - self.energy_min_kev) / channel_width)
        if self.skip_first_channel:
            # Usable index 0 is raw channel 1.
            channel = raw_channel - 1
            max_channel = self.num_channels - 2
        else:
            channel = raw_channel
            max_channel = self.num_channels - 1
        return max(0, min(max_channel, channel))

    def channel_to_energy(self, channel: int) -> float:
        """Convert modeled usable channel index to energy bin center (keV)."""
        channel_width = self.get_channel_width_kev()
        raw_channel = channel + (1 if self.skip_first_channel else 0)
        # Clamp to the raw channel range before computing the bin center.
        raw_channel = max(0, min(self.num_channels - 1, int(raw_channel)))
        return self.energy_min_kev + (raw_channel + 0.5) * channel_width
# Pre-defined configurations for Radiacode devices.
# Keys are the identifiers used to select a detector; each value overrides
# only the DetectorConfig fields listed for that device, all other fields keep
# the DetectorConfig defaults.
RADIACODE_CONFIGS: Dict[str, DetectorConfig] = {
    "radiacode_101": DetectorConfig(
        name="Radiacode 101",
        fwhm_at_662=0.095,  # 9.5% (original model, similar to 102)
        fwhm_uncertainty=0.004,
        crystal_type="CsI(Tl)",
        sensitivity_cps_per_usvh=30.0,
        detector_volume_cm3=1.0,
    ),
    "radiacode_102": DetectorConfig(
        name="Radiacode 102",
        fwhm_at_662=0.095,  # 9.5%
        fwhm_uncertainty=0.004,
        crystal_type="CsI(Tl)",
        sensitivity_cps_per_usvh=30.0,
        detector_volume_cm3=1.0,
    ),
    "radiacode_103": DetectorConfig(
        name="Radiacode 103",
        fwhm_at_662=0.084,  # 8.4%
        fwhm_uncertainty=0.003,
        crystal_type="CsI(Tl)",
        sensitivity_cps_per_usvh=30.0,
        detector_volume_cm3=1.0,
    ),
    "radiacode_103g": DetectorConfig(
        name="Radiacode 103G",
        energy_min_kev=25.0,  # Tech spec lists 0.025…3 MeV
        fwhm_at_662=0.074,  # 7.4% (GAGG crystal - better resolution)
        fwhm_uncertainty=0.003,
        crystal_type="GAGG(Ce)",
        sensitivity_cps_per_usvh=40.0,
        detector_volume_cm3=1.0,
    ),
    "radiacode_110": DetectorConfig(
        name="Radiacode 110",
        fwhm_at_662=0.084,  # 8.4%
        fwhm_uncertainty=0.003,
        crystal_type="CsI(Tl)",
        sensitivity_cps_per_usvh=77.0,  # Higher sensitivity
        detector_volume_cm3=2.5,  # Larger crystal
    ),
}
def get_default_config() -> DetectorConfig:
    """Return the project-default detector configuration (Radiacode 103).

    The object returned is the registry entry itself, not a copy.
    """
    default_key = "radiacode_103"
    return RADIACODE_CONFIGS[default_key]

View File

@ -0,0 +1,418 @@
"""
Synthetic Spectra Generation Script
This script generates synthetic gamma spectra for training isotope identification models.
Usage:
python generate_spectra.py --num_samples 10 --output_dir ./data/synthetic
Output:
- data/synthetic/spectra/*.npy - Spectrum arrays (time x 1023 channels)
- data/synthetic/spectra/*.png - Visual representations (optional)
- data/synthetic/labels.json - Annotations for all samples
"""
import argparse
import sys
from pathlib import Path
import json
from datetime import datetime
import numpy as np
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from synthetic_spectra.generator import (
SpectrumGenerator,
SpectrumConfig,
IsotopeSource,
GeneratedSpectrum,
save_spectrum,
generate_labels_json,
)
from synthetic_spectra.config import RADIACODE_CONFIGS
from synthetic_spectra.ground_truth import (
get_all_isotopes,
get_isotopes_by_category,
IsotopeCategory,
DECAY_CHAINS,
)
def get_common_isotope_pool() -> list:
    """Return the commonly encountered isotopes that can actually be simulated.

    Candidates are kept, in their listed order, only if the ground-truth
    database has an entry for them with at least one gamma line.
    """
    # Deferred import of the database lookup, as in the rest of this script.
    from synthetic_spectra.ground_truth import get_isotope

    candidates = (
        # Calibration sources (very common in spectra)
        "Cs-137", "Co-60", "Am-241", "Ba-133", "Eu-152", "Na-22", "Co-57",
        # Medical isotopes (occasionally encountered)
        "Tc-99m", "I-131", "I-123", "F-18", "Ga-67", "In-111", "Lu-177",
        # Natural background (always present to some degree)
        "K-40", "Pb-214", "Bi-214", "Pb-212", "Bi-212", "Tl-208", "Ac-228",
        # Industrial sources
        "Ir-192", "Se-75", "Mn-54", "Zn-65",
        # Uranium/Thorium (NORM)
        "U-235", "Ra-226", "Th-232",
        # Reactor/Fallout
        "Cs-134", "Sb-125", "Ce-144", "Co-58",
    )

    def _has_gamma_lines(isotope_name):
        record = get_isotope(isotope_name)
        return bool(record) and len(record.gamma_lines) > 0

    return [isotope_name for isotope_name in candidates if _has_gamma_lines(isotope_name)]
def generate_single_isotope_sample(
    generator: SpectrumGenerator,
    isotope_name: str,
    activity_bq: float,
    duration_seconds: float,
    **kwargs
) -> GeneratedSpectrum:
    """Generate one spectrum containing a single isotope source.

    Daughter products are always requested for the source. Any extra keyword
    arguments are forwarded unchanged to ``SpectrumConfig``.
    """
    lone_source = IsotopeSource(
        isotope_name=isotope_name,
        activity_bq=activity_bq,
        include_daughters=True,
    )
    spectrum_config = SpectrumConfig(
        duration_seconds=duration_seconds,
        sources=[lone_source],
        **kwargs
    )
    return generator.generate_spectrum(spectrum_config)
def generate_mixed_isotope_sample(
    generator: SpectrumGenerator,
    isotope_names: list,
    activities_bq: list,
    duration_seconds: float,
    **kwargs
) -> GeneratedSpectrum:
    """Generate one spectrum blending several isotope sources.

    Names and activities are paired positionally; pairing stops at the shorter
    of the two lists. Daughter products are requested for every source, and
    extra keyword arguments are forwarded unchanged to ``SpectrumConfig``.
    """
    blended_sources = []
    for nuclide, activity in zip(isotope_names, activities_bq):
        blended_sources.append(
            IsotopeSource(
                isotope_name=nuclide,
                activity_bq=activity,
                include_daughters=True,
            )
        )
    spectrum_config = SpectrumConfig(
        duration_seconds=duration_seconds,
        sources=blended_sources,
        **kwargs
    )
    return generator.generate_spectrum(spectrum_config)
def generate_training_batch(
    num_samples: int,
    output_dir: Path,
    detector_name: str = "radiacode_103",
    duration_range: tuple = (60, 300),
    activity_range: tuple = (1.0, 100.0),
    single_isotope_fraction: float = 0.4,
    dual_isotope_fraction: float = 0.3,
    multi_isotope_fraction: float = 0.2,
    background_only_fraction: float = 0.1,
    save_png: bool = False,
    random_seed: int = None,
) -> list:
    """
    Generate a batch of training samples with various configurations.

    Samples are written to ``output_dir / "spectra"`` one at a time and are
    not kept in memory.

    Args:
        num_samples: Total number of samples to generate
        output_dir: Output directory for spectra and labels
        detector_name: Radiacode device to simulate
        duration_range: (min, max) duration in seconds
        activity_range: (min, max) source activity in Bq
        single_isotope_fraction: Fraction of single-isotope samples
        dual_isotope_fraction: Fraction of two-isotope samples
        multi_isotope_fraction: Fraction of 3+ isotope samples
        background_only_fraction: Fraction of background-only samples
        save_png: Whether to also save PNG images
        random_seed: Random seed for reproducibility

    Returns:
        List of sample IDs, one per saved spectrum, in generation order.

    Raises:
        ValueError: If multi-isotope samples are requested but the isotope
            pool holds fewer than three isotopes.
    """
    # NOTE(review): save_png is accepted but not forwarded to save_spectrum;
    # every sample is saved with image_format='npy'. Confirm the intended
    # save_spectrum arguments before wiring it up.
    if random_seed is not None:
        np.random.seed(random_seed)

    # Create output directories
    output_dir = Path(output_dir)
    spectra_dir = output_dir / "spectra"
    spectra_dir.mkdir(parents=True, exist_ok=True)

    # Initialize generator
    generator = SpectrumGenerator(
        detector_config=RADIACODE_CONFIGS.get(detector_name),
        random_seed=random_seed
    )

    # Get isotope pool
    isotope_pool = get_common_isotope_pool()
    print(f"Using isotope pool with {len(isotope_pool)} isotopes")

    # Calculate sample counts for each category
    n_single = int(num_samples * single_isotope_fraction)
    n_dual = int(num_samples * dual_isotope_fraction)
    n_multi = int(num_samples * multi_isotope_fraction)
    n_background = int(num_samples * background_only_fraction)
    # Rounding leftovers go to the single-isotope category so that the total
    # is exactly num_samples.
    remaining = num_samples - (n_single + n_dual + n_multi + n_background)
    n_single += remaining

    if n_multi > 0 and len(isotope_pool) < 3:
        raise ValueError(
            "Multi-isotope samples need at least 3 isotopes in the pool, "
            f"but only {len(isotope_pool)} are available"
        )
    # Mixes contain 3..5 isotopes, capped by the pool size (inclusive).
    max_mix_size = min(5, len(isotope_pool))

    print(f"\nGenerating {num_samples} synthetic spectra:")
    print(f" - Single isotope: {n_single}")
    print(f" - Dual isotope: {n_dual}")
    print(f" - Multi isotope (3+): {n_multi}")
    print(f" - Background only: {n_background}")
    print()

    sample_ids = []

    def _store(spectrum) -> None:
        """Save one spectrum, record its ID and report progress every 100."""
        save_spectrum(
            spectrum,
            spectra_dir,
            save_image=True,
            image_format='npy'
        )
        sample_ids.append(spectrum.sample_id)
        if len(sample_ids) % 100 == 0:
            print(f" Generated {len(sample_ids)}/{num_samples} samples...")

    # The order of the random draws inside each loop is kept fixed so that a
    # given seed keeps producing the same sequence of samples.

    # Generate single isotope samples
    print("Generating single-isotope samples...")
    for _ in range(n_single):
        isotope = np.random.choice(isotope_pool)
        activity = np.random.uniform(*activity_range)
        duration = np.random.uniform(*duration_range)
        _store(generate_single_isotope_sample(
            generator,
            isotope,
            activity,
            duration,
            detector_name=detector_name,
            include_background=True,
        ))

    # Generate dual isotope samples
    print("Generating dual-isotope samples...")
    for _ in range(n_dual):
        isotopes = np.random.choice(isotope_pool, size=2, replace=False)
        activities = [np.random.uniform(*activity_range) for _ in range(2)]
        duration = np.random.uniform(*duration_range)
        _store(generate_mixed_isotope_sample(
            generator,
            list(isotopes),
            activities,
            duration,
            detector_name=detector_name,
            include_background=True,
        ))

    # Generate multi-isotope samples
    print("Generating multi-isotope samples...")
    for _ in range(n_multi):
        # randint's upper bound is exclusive, hence the +1.
        num_isotopes = np.random.randint(3, max_mix_size + 1)
        isotopes = np.random.choice(isotope_pool, size=num_isotopes, replace=False)
        activities = [np.random.uniform(*activity_range) for _ in range(num_isotopes)]
        duration = np.random.uniform(*duration_range)
        _store(generate_mixed_isotope_sample(
            generator,
            list(isotopes),
            activities,
            duration,
            detector_name=detector_name,
            include_background=True,
        ))

    # Generate background-only samples
    print("Generating background-only samples...")
    for _ in range(n_background):
        duration = np.random.uniform(*duration_range)
        config = SpectrumConfig(
            duration_seconds=duration,
            sources=[],  # No additional sources
            include_background=True,
            detector_name=detector_name,
        )
        _store(generator.generate_spectrum(config))

    total_generated = len(sample_ids)
    print(f"\nGenerated {total_generated} samples total")
    return sample_ids
def main():
    """Parse the command line, run the batch generation and report the file count."""
    parser = argparse.ArgumentParser(
        description="Generate synthetic gamma spectra for ML training"
    )
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = [
        ("--num_samples", dict(
            type=int,
            default=10,
            help="Number of samples to generate (default: 10)",
        )),
        ("--output_dir", dict(
            type=str,
            default="O:/master_data_collection/isotopev2",
            help="Output directory (default: O:/master_data_collection/isotopev2)",
        )),
        ("--detector", dict(
            type=str,
            default="radiacode_103",
            choices=list(RADIACODE_CONFIGS.keys()),
            help="Detector to simulate (default: radiacode_103)",
        )),
        ("--min_duration", dict(
            type=float,
            default=60,
            help="Minimum spectrum duration in seconds (default: 60)",
        )),
        ("--max_duration", dict(
            type=float,
            default=300,
            help="Maximum spectrum duration in seconds (default: 300)",
        )),
        ("--min_activity", dict(
            type=float,
            default=1.0,
            help="Minimum source activity in Bq (default: 1.0)",
        )),
        ("--max_activity", dict(
            type=float,
            default=100.0,
            help="Maximum source activity in Bq (default: 100.0)",
        )),
        ("--save_png", dict(
            action="store_true",
            help="Also save PNG images of spectra",
        )),
        ("--seed", dict(
            type=int,
            default=None,
            help="Random seed for reproducibility",
        )),
    ]
    for flag, spec in option_table:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    rule = "=" * 60
    print(rule)
    print("Synthetic Gamma Spectra Generator")
    print(rule)
    print(f"Samples to generate: {args.num_samples}")
    print(f"Output directory: {args.output_dir}")
    print(f"Detector: {args.detector}")
    print(f"Duration range: {args.min_duration}-{args.max_duration} seconds")
    print(f"Activity range: {args.min_activity}-{args.max_activity} Bq")
    print(f"Random seed: {args.seed}")
    print(rule)

    target_dir = Path(args.output_dir)
    generate_training_batch(
        num_samples=args.num_samples,
        output_dir=target_dir,
        detector_name=args.detector,
        duration_range=(args.min_duration, args.max_duration),
        activity_range=(args.min_activity, args.max_activity),
        save_png=args.save_png,
        random_seed=args.seed,
    )

    print("\n" + rule)
    print("Generation complete!")
    print(rule)

    # Count the spectrum files now present in the output folder.
    saved_count = sum(1 for _ in (target_dir / "spectra").glob("spectrum_*.npy"))
    print(f"\nTotal samples generated: {saved_count}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,526 @@
"""
Synthetic Spectra Generation Script v2
Improvements over v1:
- Parallel generation using multiprocessing for 10x+ speedup
- Class-balanced isotope sampling to ensure all isotopes are represented
- More variable background noise (intensity, composition)
- Memory efficient - doesn't accumulate spectra in memory
- Progress bar with ETA
Usage:
python -m synthetic_spectra.generate_spectra_v2 --num_samples 100000 --workers 8
"""
import argparse
import sys
from pathlib import Path
import json
from datetime import datetime
import numpy as np
from multiprocessing import Pool, cpu_count
from functools import partial
import time
from typing import List, Tuple, Dict, Optional
import os
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from synthetic_spectra.generator import (
SpectrumGenerator,
SpectrumConfig,
IsotopeSource,
GeneratedSpectrum,
save_spectrum,
)
from synthetic_spectra.config import RADIACODE_CONFIGS
from synthetic_spectra.ground_truth import get_isotope
# =============================================================================
# ISOTOPE POOL WITH CATEGORIES FOR BALANCED SAMPLING
# =============================================================================
# Candidate isotope names grouped by category.
# A name may appear in more than one category ("I-131" is listed under both
# "medical" and "reactor_fallout").
ISOTOPE_CATEGORIES = {
    "calibration": [
        "Cs-137", "Co-60", "Am-241", "Ba-133", "Eu-152", "Na-22", "Co-57", "Mn-54"
    ],
    "medical": [
        "Tc-99m", "I-131", "I-123", "F-18", "Ga-67", "Ga-68", "In-111", "Lu-177", "Tl-201"
    ],
    "industrial": [
        "Ir-192", "Se-75", "Zn-65", "Co-58", "Cd-109"
    ],
    "natural_background": [
        "K-40", "Ra-226", "U-235", "U-238", "Th-232"
    ],
    "decay_chain_u238": [
        "Pb-214", "Bi-214", "Pb-210"
    ],
    "decay_chain_th232": [
        "Pb-212", "Bi-212", "Tl-208", "Ac-228", "Ra-224"
    ],
    "reactor_fallout": [
        "Cs-134", "I-131", "Sr-90", "Zr-95", "Nb-95", "Ru-103", "Ce-141", "Ce-144", "Sb-125"
    ],
}
def get_valid_isotope_pool() -> Tuple[List[str], Dict[str, List[str]]]:
    """
    Collect the isotopes that have gamma lines, both flat and per category.

    Returns:
        Tuple of (flat_list, category_dict). ``flat_list`` holds each valid
        isotope once, in first-seen order; ``category_dict`` maps every
        category to its valid isotopes (possibly an empty list).
    """
    flat_pool: List[str] = []
    pools_by_category: Dict[str, List[str]] = {}

    for category in ISOTOPE_CATEGORIES:
        usable = []
        for candidate in ISOTOPE_CATEGORIES[category]:
            record = get_isotope(candidate)
            if not record or len(record.gamma_lines) == 0:
                continue
            usable.append(candidate)
            # The flat pool keeps only the first occurrence of a name.
            if candidate not in flat_pool:
                flat_pool.append(candidate)
        pools_by_category[category] = usable

    return flat_pool, pools_by_category
# =============================================================================
# BACKGROUND VARIATION
# =============================================================================
class BackgroundConfig:
    """Parameter ranges from which a randomized background is drawn."""

    def __init__(
        self,
        intensity_min: float = 0.3,
        intensity_max: float = 3.0,
        k40_prob: float = 0.95,  # Almost always present
        radon_prob: float = 0.8,  # Usually present indoors
        thorium_prob: float = 0.6,  # Sometimes present
    ):
        self.intensity_min, self.intensity_max = intensity_min, intensity_max
        self.k40_prob, self.radon_prob, self.thorium_prob = (
            k40_prob,
            radon_prob,
            thorium_prob,
        )

    def sample(self, rng: np.random.Generator) -> dict:
        """Draw one background configuration from ``rng``.

        Four values are drawn, in this order: the intensity multiplier, then
        the K-40, radon and thorium inclusion decisions.
        """
        multiplier = rng.uniform(self.intensity_min, self.intensity_max)
        with_k40 = rng.random() < self.k40_prob
        with_radon = rng.random() < self.radon_prob
        with_thorium = rng.random() < self.thorium_prob
        return {
            'background_cps': multiplier * 5.0,
            'include_k40': with_k40,
            'include_radon': with_radon,
            'include_thorium': with_thorium,
        }
# =============================================================================
# SINGLE SAMPLE GENERATION (for parallel workers)
# =============================================================================
def generate_single_sample(
    args: Tuple[int, dict]
) -> Optional[str]:
    """
    Generate a single sample. Designed to be called by worker processes.

    All randomness is derived from ``config['base_seed'] + sample_idx``: the
    scenario parameters come from a per-sample ``np.random.Generator`` and the
    spectrum generator receives its own seed derived from the same value.

    Args:
        args: Tuple of (sample_index, config_dict)

    Returns:
        Sample ID if successful, None if failed (the error is printed)
    """
    sample_idx, config = args
    try:
        # Create RNG with unique seed per sample
        sample_seed = config['base_seed'] + sample_idx
        rng = np.random.default_rng(sample_seed)

        # Seed the spectrum generator as well. Without an explicit seed its
        # output is not tied to base_seed. The seed is derived through
        # SeedSequence so its stream differs from `rng`'s.
        generator_seed = int(
            np.random.SeedSequence([sample_seed, 1]).generate_state(1)[0]
        )

        # Initialize generator (each worker creates its own)
        detector_config = RADIACODE_CONFIGS.get(config['detector_name'])
        generator = SpectrumGenerator(
            detector_config=detector_config,
            random_seed=generator_seed,
        )

        # Determine sample type based on distribution
        sample_type = config['sample_types'][sample_idx % len(config['sample_types'])]

        # Get isotopes for this sample
        isotope_pool = config['isotope_pool']
        category_pools = config['category_pools']

        # Sample background configuration
        bg_config = BackgroundConfig(
            intensity_min=config.get('bg_intensity_min', 0.3),
            intensity_max=config.get('bg_intensity_max', 3.0),
        )
        bg_params = bg_config.sample(rng)

        # Random duration
        duration = rng.uniform(*config['duration_range'])

        # Build sources based on sample type
        sources = []
        if sample_type == 'single':
            # For class balance, cycle through isotopes
            isotope_idx = sample_idx % len(isotope_pool)
            isotope = isotope_pool[isotope_idx]
            activity = rng.uniform(*config['activity_range'])
            sources.append(IsotopeSource(
                isotope_name=isotope,
                activity_bq=activity,
                include_daughters=True
            ))
        elif sample_type == 'dual':
            # Pick from different categories for variety
            categories = list(category_pools.keys())
            cat1, cat2 = rng.choice(categories, size=2, replace=True)
            iso1 = rng.choice(category_pools[cat1]) if category_pools[cat1] else rng.choice(isotope_pool)
            iso2 = rng.choice(category_pools[cat2]) if category_pools[cat2] else rng.choice(isotope_pool)
            # Ensure different isotopes. The re-draw can only terminate when
            # the pool holds a second distinct isotope.
            if iso2 == iso1 and len(set(isotope_pool)) < 2:
                raise ValueError(
                    "dual-isotope sample needs at least two distinct isotopes in the pool"
                )
            while iso2 == iso1:
                iso2 = rng.choice(isotope_pool)
            for iso in [iso1, iso2]:
                activity = rng.uniform(*config['activity_range'])
                sources.append(IsotopeSource(
                    isotope_name=iso,
                    activity_bq=activity,
                    include_daughters=True
                ))
        elif sample_type == 'multi':
            # 3-5 isotopes from various categories
            num_isotopes = rng.integers(3, 6)
            selected = set()
            for _ in range(num_isotopes):
                cat = rng.choice(list(category_pools.keys()))
                pool = category_pools[cat] if category_pools[cat] else isotope_pool
                iso = rng.choice(pool)
                # Avoid duplicates: re-draw from the full pool a bounded
                # number of times, then give up on this slot.
                attempts = 0
                while iso in selected and attempts < 10:
                    iso = rng.choice(isotope_pool)
                    attempts += 1
                if iso not in selected:
                    selected.add(iso)
                    activity = rng.uniform(*config['activity_range'])
                    sources.append(IsotopeSource(
                        isotope_name=iso,
                        activity_bq=activity,
                        include_daughters=True
                    ))
        # elif sample_type == 'background': sources stays empty

        # Create spectrum config
        spec_config = SpectrumConfig(
            duration_seconds=duration,
            sources=sources,
            include_background=True,
            background_cps=bg_params['background_cps'],
            include_k40=bg_params['include_k40'],
            include_radon=bg_params['include_radon'],
            include_thorium=bg_params['include_thorium'],
            detector_name=config['detector_name'],
        )

        # Generate spectrum
        spectrum = generator.generate_spectrum(spec_config)

        # Save spectrum
        output_dir = Path(config['output_dir']) / "spectra"
        save_spectrum(
            spectrum,
            output_dir,
            save_image=True,
            image_format='npy'  # Skip PNG for speed
        )
        return spectrum.sample_id
    except Exception as e:
        # Worker boundary: report the failure and let the batch continue.
        print(f"Error generating sample {sample_idx}: {e}")
        return None
# =============================================================================
# MAIN BATCH GENERATION
# =============================================================================
def generate_training_batch_parallel(
    num_samples: int,
    output_dir: Path,
    detector_name: str = "radiacode_103",
    duration_range: Tuple[float, float] = (60, 300),
    activity_range: Tuple[float, float] = (1.0, 100.0),
    single_isotope_fraction: float = 0.40,
    dual_isotope_fraction: float = 0.30,
    multi_isotope_fraction: float = 0.20,
    background_only_fraction: float = 0.10,
    bg_intensity_range: Tuple[float, float] = (0.3, 3.0),
    num_workers: int = None,
    random_seed: int = None,
    chunk_size: int = 100,
) -> int:
    """
    Generate training samples in parallel.

    Args:
        num_samples: Total number of samples to generate
        output_dir: Output directory
        detector_name: Detector to simulate
        duration_range: (min, max) duration in seconds
        activity_range: (min, max) activity in Bq
        single_isotope_fraction: Fraction of single-isotope samples
        dual_isotope_fraction: Fraction of dual-isotope samples
        multi_isotope_fraction: Fraction of multi-isotope samples
        background_only_fraction: Fraction of background-only samples
        bg_intensity_range: (min, max) background intensity multiplier
        num_workers: Number of parallel workers (default: CPU count - 1)
        random_seed: Base random seed (default: current Unix time)
        chunk_size: Number of samples submitted to the pool between two
            progress reports

    Returns:
        Number of successfully generated samples

    Raises:
        ValueError: If chunk_size is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    if num_workers is None:
        num_workers = max(1, cpu_count() - 1)
    if random_seed is None:
        random_seed = int(time.time())

    # Create output directory
    output_dir = Path(output_dir)
    spectra_dir = output_dir / "spectra"
    spectra_dir.mkdir(parents=True, exist_ok=True)

    # Get isotope pools
    isotope_pool, category_pools = get_valid_isotope_pool()
    print(f"Isotope pool: {len(isotope_pool)} isotopes across {len(category_pools)} categories")

    # Calculate sample counts
    n_single = int(num_samples * single_isotope_fraction)
    n_dual = int(num_samples * dual_isotope_fraction)
    n_multi = int(num_samples * multi_isotope_fraction)
    n_background = int(num_samples * background_only_fraction)
    # Rounding leftovers go to the single-isotope category.
    remaining = num_samples - (n_single + n_dual + n_multi + n_background)
    n_single += remaining

    # Create sample type list (shuffled for variety in batches)
    sample_types = (
        ['single'] * n_single +
        ['dual'] * n_dual +
        ['multi'] * n_multi +
        ['background'] * n_background
    )
    np.random.seed(random_seed)
    np.random.shuffle(sample_types)

    print(f"\nGenerating {num_samples} samples with {num_workers} workers:")
    print(f" - Single isotope: {n_single} ({single_isotope_fraction*100:.0f}%)")
    print(f" - Dual isotope: {n_dual} ({dual_isotope_fraction*100:.0f}%)")
    print(f" - Multi isotope: {n_multi} ({multi_isotope_fraction*100:.0f}%)")
    print(f" - Background only: {n_background} ({background_only_fraction*100:.0f}%)")
    print(f" - Background intensity: {bg_intensity_range[0]:.1f}x - {bg_intensity_range[1]:.1f}x")
    print()

    # Settings common to every task. The per-sample type is added per task
    # (below) instead of shipping the whole num_samples-long list to the
    # workers with every task.
    shared_config = {
        'detector_name': detector_name,
        'output_dir': str(output_dir),
        'duration_range': duration_range,
        'activity_range': activity_range,
        'bg_intensity_min': bg_intensity_range[0],
        'bg_intensity_max': bg_intensity_range[1],
        'base_seed': random_seed,
        'isotope_pool': isotope_pool,
        'category_pools': category_pools,
    }
    n_types = len(sample_types)

    def _task(index: int) -> Tuple[int, dict]:
        # A one-element 'sample_types' list makes the worker's
        # `sample_types[sample_idx % len(sample_types)]` lookup resolve to
        # exactly this sample's type.
        own_type = sample_types[index % n_types]
        return (index, {**shared_config, 'sample_types': [own_type]})

    # Generate samples in parallel
    start_time = time.time()
    successful = 0
    processed = 0

    # Use multiprocessing pool
    with Pool(processes=num_workers) as pool:
        # Process in chunks and report progress
        for i in range(0, num_samples, chunk_size):
            chunk_end = min(i + chunk_size, num_samples)
            chunk_args = [_task(index) for index in range(i, chunk_end)]
            results = pool.map(generate_single_sample, chunk_args)
            chunk_success = sum(1 for r in results if r is not None)
            successful += chunk_success
            processed = chunk_end

            # Progress report. Rate and ETA are based on processed samples so
            # that failed samples do not inflate the remaining-time estimate.
            elapsed = time.time() - start_time
            rate = processed / elapsed if elapsed > 0 else 0
            eta = (num_samples - processed) / rate if rate > 0 else 0
            print(f" Progress: {successful}/{num_samples} ({100*successful/num_samples:.1f}%) | "
                  f"Rate: {rate:.1f} samples/s | ETA: {eta/60:.1f} min")

    total_time = time.time() - start_time
    average_rate = successful / total_time if total_time > 0 else 0.0
    print(f"\n{'='*60}")
    print(f"Generation complete!")
    print(f" Total samples: {successful}/{num_samples}")
    print(f" Total time: {total_time/60:.1f} minutes")
    print(f" Average rate: {average_rate:.1f} samples/second")
    print(f"{'='*60}")
    return successful
def main():
    """Parse the command line and run the parallel batch generation."""
    parser = argparse.ArgumentParser(
        description="Generate synthetic gamma spectra (v2 - parallel, balanced)"
    )
    parser.add_argument(
        "--num_samples", "-n",
        type=int,
        default=100000,
        help="Number of samples to generate (default: 100000)"
    )
    parser.add_argument(
        "--output_dir", "-o",
        type=str,
        default="O:/master_data_collection/isotopev2",
        help="Output directory (default: O:/master_data_collection/isotopev2)"
    )
    parser.add_argument(
        "--detector",
        type=str,
        default="radiacode_103",
        choices=list(RADIACODE_CONFIGS.keys()),
        help="Detector to simulate (default: radiacode_103)"
    )
    parser.add_argument(
        "--workers", "-w",
        type=int,
        default=None,
        help="Number of parallel workers (default: CPU count - 1)"
    )
    parser.add_argument(
        "--min_duration",
        type=float,
        default=60,
        help="Minimum duration in seconds (default: 60)"
    )
    parser.add_argument(
        "--max_duration",
        type=float,
        default=300,
        help="Maximum duration in seconds (default: 300)"
    )
    parser.add_argument(
        "--min_activity",
        type=float,
        default=1.0,
        help="Minimum activity in Bq (default: 1.0)"
    )
    parser.add_argument(
        "--max_activity",
        type=float,
        default=100.0,
        help="Maximum activity in Bq (default: 100.0)"
    )
    parser.add_argument(
        "--bg_min",
        type=float,
        default=0.3,
        help="Minimum background intensity multiplier (default: 0.3)"
    )
    parser.add_argument(
        "--bg_max",
        type=float,
        default=3.0,
        help="Maximum background intensity multiplier (default: 3.0)"
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=None,
        help="Random seed for reproducibility"
    )
    parser.add_argument(
        "--chunk_size",
        type=int,
        default=100,
        help="Samples per progress update (default: 100)"
    )
    # Sample type fractions
    parser.add_argument("--single_frac", type=float, default=0.40)
    parser.add_argument("--dual_frac", type=float, default=0.30)
    parser.add_argument("--multi_frac", type=float, default=0.20)
    parser.add_argument("--bg_frac", type=float, default=0.10)
    args = parser.parse_args()

    # Show the worker count that will actually be used: when --workers is not
    # given, generate_training_batch_parallel falls back to
    # max(1, cpu_count() - 1), which is never 0.
    if args.workers is not None:
        shown_workers = args.workers
    else:
        shown_workers = max(1, cpu_count() - 1)

    print("=" * 60)
    print("Synthetic Gamma Spectra Generator v2")
    print(" - Parallel processing")
    print(" - Class-balanced sampling")
    print(" - Variable background")
    print("=" * 60)
    print(f"Samples: {args.num_samples:,}")
    print(f"Workers: {shown_workers}")
    print(f"Output: {args.output_dir}")
    print(f"Detector: {args.detector}")
    print(f"Duration: {args.min_duration}-{args.max_duration}s")
    print(f"Activity: {args.min_activity}-{args.max_activity} Bq")
    print(f"Background: {args.bg_min}x-{args.bg_max}x")
    print("=" * 60)

    generate_training_batch_parallel(
        num_samples=args.num_samples,
        output_dir=Path(args.output_dir),
        detector_name=args.detector,
        duration_range=(args.min_duration, args.max_duration),
        activity_range=(args.min_activity, args.max_activity),
        single_isotope_fraction=args.single_frac,
        dual_isotope_fraction=args.dual_frac,
        multi_isotope_fraction=args.multi_frac,
        background_only_fraction=args.bg_frac,
        bg_intensity_range=(args.bg_min, args.bg_max),
        num_workers=args.workers,
        random_seed=args.seed,
        chunk_size=args.chunk_size,
    )
if __name__ == "__main__":
main()

View File

@ -0,0 +1,577 @@
"""
Synthetic Spectra Generation Script v3
Optimized for 2D model training with:
- Fixed 60-second duration (60 time intervals)
- Better isotope combinations including decay chain scenarios
- Enhanced background-only samples
- More diverse mixing scenarios
Usage:
python -m synthetic_spectra.generate_spectra_v3 --num_samples 200000 --workers 8
"""
import argparse
import sys
from pathlib import Path
import json
from datetime import datetime
import numpy as np
from multiprocessing import Pool, cpu_count
from functools import partial
import time
from typing import List, Tuple, Dict, Optional
import os
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from synthetic_spectra.generator import (
SpectrumGenerator,
SpectrumConfig,
IsotopeSource,
GeneratedSpectrum,
save_spectrum,
)
from synthetic_spectra.config import RADIACODE_CONFIGS
from synthetic_spectra.ground_truth import get_isotope
# =============================================================================
# ISOTOPE POOLS - Organized for realistic scenarios
# =============================================================================
# Raw candidate pools, as isotope names. They are filtered against the
# ground-truth database further down before being used.

# Calibration/check sources (individual isotopes)
CALIBRATION_ISOTOPES = [
    "Cs-137", "Co-60", "Am-241", "Ba-133", "Eu-152", "Na-22", "Co-57", "Mn-54"
]
# Medical isotopes (often found individually)
MEDICAL_ISOTOPES = [
    "Tc-99m", "I-131", "I-123", "F-18", "Ga-67", "Ga-68", "In-111", "Lu-177", "Tl-201"
]
# Industrial sources
INDUSTRIAL_ISOTOPES = [
    "Ir-192", "Se-75", "Zn-65", "Co-58", "Cd-109"
]
# Natural decay chains - these ALWAYS appear together in nature
URANIUM_238_CHAIN = ["U-238", "Ra-226", "Pb-214", "Bi-214"]  # Secular equilibrium
THORIUM_232_CHAIN = ["Th-232", "Ac-228", "Pb-212", "Bi-212", "Tl-208"]
URANIUM_235_CHAIN = ["U-235"]  # Daughters have low gamma yield
# Fallout/contamination (often appear in specific combinations)
CHERNOBYL_FUKUSHIMA = ["Cs-137", "Cs-134"]  # Classic reactor fallout signature
FRESH_FALLOUT = ["I-131", "Cs-137", "Cs-134", "Zr-95", "Nb-95"]
OLDER_FALLOUT = ["Cs-137", "Sr-90"]  # Long-lived only
# Natural background (what you'd see with no source)
NATURAL_BACKGROUND = ["K-40"]  # Potassium in environment
# NORM - Naturally Occurring Radioactive Material
NORM_MATERIALS = ["K-40", "Ra-226", "Th-232", "U-238"]
def get_valid_isotopes(isotope_list: List[str]) -> List[str]:
    """
    Keep only the isotope names that can contribute gamma lines.

    Args:
        isotope_list: Candidate isotope names.

    Returns:
        The names, in their original order, for which ``get_isotope`` returns
        a record with at least one entry in ``gamma_lines``.
    """
    def _has_gamma_lines(candidate: str) -> bool:
        # Unknown names (falsy lookup result) are rejected as well.
        record = get_isotope(candidate)
        return bool(record and len(record.gamma_lines) > 0)

    return [candidate for candidate in isotope_list if _has_gamma_lines(candidate)]
# Pre-validate all pools once at import time so the scenarios only ever draw
# isotopes that get_isotope() knows and that have gamma lines.
VALID_CALIBRATION = get_valid_isotopes(CALIBRATION_ISOTOPES)
VALID_MEDICAL = get_valid_isotopes(MEDICAL_ISOTOPES)
VALID_INDUSTRIAL = get_valid_isotopes(INDUSTRIAL_ISOTOPES)
VALID_U238_CHAIN = get_valid_isotopes(URANIUM_238_CHAIN)
VALID_TH232_CHAIN = get_valid_isotopes(THORIUM_232_CHAIN)
VALID_FALLOUT = get_valid_isotopes(CHERNOBYL_FUKUSHIMA + FRESH_FALLOUT)
VALID_NORM = get_valid_isotopes(NORM_MATERIALS)

# All valid isotopes for random selection (duplicates across pools removed).
# Sorted on purpose: the iteration order of a set of strings changes between
# interpreter runs (hash randomization), which would make a seeded
# rng.choice(ALL_VALID_ISOTOPES) pick different isotopes from run to run.
ALL_VALID_ISOTOPES = sorted(set(
    VALID_CALIBRATION + VALID_MEDICAL + VALID_INDUSTRIAL +
    VALID_U238_CHAIN + VALID_TH232_CHAIN + VALID_FALLOUT + VALID_NORM
))
# =============================================================================
# SAMPLE SCENARIOS
# =============================================================================
class SampleScenario:
    """
    Base class for one kind of training sample.

    Attributes:
        name: Scenario identifier (printed in the generation summary).
        fraction: Relative weight used when a scenario is drawn per sample.
    """

    def __init__(self, name: str, fraction: float):
        self.name, self.fraction = name, fraction

    def generate_sources(self, rng: np.random.Generator, activity_range: Tuple[float, float]) -> List[IsotopeSource]:
        """
        Build the isotope sources for one sample of this scenario.

        Subclasses must override this; the base implementation always raises.

        Args:
            rng: Random generator to draw from.
            activity_range: (min, max) source activity.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError
class BackgroundOnlyScenario(SampleScenario):
    """Scenario with no sources at all, so the sample contains background only."""

    def __init__(self, fraction: float = 0.15):
        super().__init__(name="background_only", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """Return an empty source list; neither argument is consulted."""
        no_sources: List[IsotopeSource] = []
        return no_sources
class SingleCalibrationScenario(SampleScenario):
    """Single calibration/check source drawn from ``VALID_CALIBRATION``."""

    def __init__(self, fraction: float = 0.20):
        super().__init__("single_calibration", fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Pick one calibration isotope with a uniformly drawn activity.

        Args:
            rng: numpy random Generator used for both draws.
            activity_range: (min, max) activity passed to ``rng.uniform``.

        Returns:
            A one-element list, or an empty list when no calibration isotope
            passed validation.
        """
        # Same guard as the medical/industrial scenarios: rng.choice raises
        # on an empty pool, so degrade to "no sources" instead.
        if not VALID_CALIBRATION:
            return []
        isotope = rng.choice(VALID_CALIBRATION)
        activity = rng.uniform(*activity_range)
        return [IsotopeSource(isotope, activity, include_daughters=True)]
class SingleMedicalScenario(SampleScenario):
    """One medical isotope drawn from ``VALID_MEDICAL``."""

    def __init__(self, fraction: float = 0.10):
        super().__init__(name="single_medical", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Return a single medical source, or no sources if the pool is empty.

        The isotope is drawn first, then its activity from ``activity_range``.
        """
        if VALID_MEDICAL:
            picked = rng.choice(VALID_MEDICAL)
            strength = rng.uniform(*activity_range)
            return [IsotopeSource(picked, strength, include_daughters=True)]
        return []
class SingleIndustrialScenario(SampleScenario):
    """One industrial isotope drawn from ``VALID_INDUSTRIAL``."""

    def __init__(self, fraction: float = 0.05):
        super().__init__(name="single_industrial", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Return a single industrial source, or no sources if the pool is empty.

        The isotope is drawn first, then its activity from ``activity_range``.
        """
        if VALID_INDUSTRIAL:
            picked = rng.choice(VALID_INDUSTRIAL)
            strength = rng.uniform(*activity_range)
            return [IsotopeSource(picked, strength, include_daughters=True)]
        return []
class UraniumChainScenario(SampleScenario):
    """Natural uranium: every validated U-238 chain member at a similar activity."""

    def __init__(self, fraction: float = 0.08):
        super().__init__(name="uranium_chain", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Emit one source per member of ``VALID_U238_CHAIN``.

        A shared base activity is drawn once; each member then gets that base
        scaled by an independent factor in [0.8, 1.2) to mimic imperfect
        equilibrium. Daughters are not auto-expanded because the chain
        members are listed explicitly.
        """
        shared_level = rng.uniform(*activity_range)
        return [
            IsotopeSource(member, shared_level * rng.uniform(0.8, 1.2), include_daughters=False)
            for member in VALID_U238_CHAIN
        ]
class ThoriumChainScenario(SampleScenario):
    """Natural thorium: every validated Th-232 chain member at a similar activity."""

    def __init__(self, fraction: float = 0.08):
        super().__init__(name="thorium_chain", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Emit one source per member of ``VALID_TH232_CHAIN``.

        A shared base activity is drawn once; each member then gets that base
        scaled by an independent factor in [0.8, 1.2). Daughters are not
        auto-expanded because the chain members are listed explicitly.
        """
        shared_level = rng.uniform(*activity_range)
        return [
            IsotopeSource(member, shared_level * rng.uniform(0.8, 1.2), include_daughters=False)
            for member in VALID_TH232_CHAIN
        ]
class NORMScenario(SampleScenario):
    """NORM sample: several naturally occurring isotopes drawn from ``VALID_NORM``."""

    def __init__(self, fraction: float = 0.08):
        super().__init__(name="norm", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Draw 2-4 distinct NORM isotopes, each with its own activity.

        The count is capped at the number of validated NORM isotopes so the
        draw without replacement stays valid.
        """
        wanted = rng.integers(2, 5)  # upper bound is exclusive: 2, 3 or 4
        how_many = min(wanted, len(VALID_NORM))
        chosen = rng.choice(VALID_NORM, size=how_many, replace=False)
        return [
            IsotopeSource(name, rng.uniform(*activity_range), include_daughters=True)
            for name in chosen
        ]
class FalloutScenario(SampleScenario):
    """Reactor fallout signature built around the Cs-137 / Cs-134 pair."""

    def __init__(self, fraction: float = 0.06):
        super().__init__(name="fallout", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Build a fallout mix whose Cs-134/Cs-137 ratio encodes its age.

        Cs-137 gets an activity from ``activity_range``; Cs-134 gets that
        value times a freshness factor in [0.1, 1.0) (near 1 is fresh, lower
        is aged, since Cs-134 decays faster). Cs-134 is dropped when its
        activity is not above 0.5. I-131 is added in roughly 30% of samples
        with an activity in [1, 50). Each isotope is only emitted if it is
        present in ``VALID_FALLOUT``.
        """
        cs137_bq = rng.uniform(*activity_range)
        freshness = rng.uniform(0.1, 1.0)
        cs134_bq = cs137_bq * freshness

        mix = []
        if "Cs-137" in VALID_FALLOUT:
            mix.append(IsotopeSource("Cs-137", cs137_bq, include_daughters=True))
        if cs134_bq > 0.5 and "Cs-134" in VALID_FALLOUT:
            mix.append(IsotopeSource("Cs-134", cs134_bq, include_daughters=True))

        # The roll is always consumed, even when I-131 is unavailable, so the
        # random stream does not depend on the database contents.
        iodine_roll = rng.random()
        if iodine_roll < 0.3 and "I-131" in VALID_FALLOUT:
            iodine_bq = rng.uniform(1, 50)
            mix.append(IsotopeSource("I-131", iodine_bq, include_daughters=True))
        return mix
class MixedSourcesScenario(SampleScenario):
    """Random mix of 2-3 distinct isotopes drawn from ``ALL_VALID_ISOTOPES``."""

    def __init__(self, fraction: float = 0.10):
        super().__init__("mixed", fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Draw 2-3 distinct isotopes, each with an independent activity.

        Args:
            rng: numpy random Generator used for all draws.
            activity_range: (min, max) activity passed to ``rng.uniform``.

        Returns:
            One source per selected isotope (fewer than requested only when
            the pool itself is smaller).
        """
        num_isotopes = rng.integers(2, 4)  # upper bound exclusive: 2 or 3
        # Cap at the pool size (as NORMScenario does): sampling without
        # replacement raises when more items are requested than exist.
        num_isotopes = min(num_isotopes, len(ALL_VALID_ISOTOPES))
        selected = rng.choice(ALL_VALID_ISOTOPES, size=num_isotopes, replace=False)
        return [
            IsotopeSource(iso, rng.uniform(*activity_range), include_daughters=True)
            for iso in selected
        ]
class ComplexMixScenario(SampleScenario):
    """Complex scenario: 4-6 isotopes from various categories."""

    def __init__(self, fraction: float = 0.05):
        super().__init__("complex_mix", fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Select 4-6 distinct isotopes, favouring variety across pools.

        One isotope is drawn from each category pool in turn until enough are
        selected; any shortfall is filled from ``ALL_VALID_ISOTOPES``.

        Args:
            rng: numpy random Generator used for all draws.
            activity_range: (min, max) activity passed to ``rng.uniform``.

        Returns:
            One source per selected isotope, in selection order.
        """
        # Cap at the number of distinct isotopes available, otherwise the
        # fill loop below could never reach its target and would spin forever.
        num_isotopes = min(rng.integers(4, 7), len(ALL_VALID_ISOTOPES))

        # A dict is used as an insertion-ordered set: iterating a real set of
        # strings follows hash order, which changes between interpreter runs
        # and would pair the drawn activities with different isotopes.
        selected = {}

        # Try to get variety from different pools
        pools = [VALID_CALIBRATION, VALID_MEDICAL, VALID_INDUSTRIAL, VALID_U238_CHAIN, VALID_TH232_CHAIN]
        for pool in pools:
            if len(selected) >= num_isotopes:
                break
            if pool:
                selected[rng.choice(pool)] = None

        # Fill remaining with random picks; duplicates simply do not grow the dict.
        while len(selected) < num_isotopes:
            selected[rng.choice(ALL_VALID_ISOTOPES)] = None

        return [
            IsotopeSource(iso, rng.uniform(*activity_range), include_daughters=True)
            for iso in selected
        ]
class WeakSourceScenario(SampleScenario):
    """A single, very weak source meant to sit near the detection limit."""

    def __init__(self, fraction: float = 0.05):
        super().__init__(name="weak_source", fraction=fraction)

    def generate_sources(self, rng, activity_range) -> List[IsotopeSource]:
        """
        Return one isotope from ``ALL_VALID_ISOTOPES`` with activity in [0.1, 5.0).

        ``activity_range`` is deliberately ignored: this scenario uses its own,
        much lower range.
        """
        faint_low, faint_high = 0.1, 5.0
        picked = rng.choice(ALL_VALID_ISOTOPES)
        strength = rng.uniform(faint_low, faint_high)
        return [IsotopeSource(picked, strength, include_daughters=True)]
# Default scenario mix. The fractions below override the per-class defaults
# and add up to 1.0; they are used as selection weights per sample.
DEFAULT_SCENARIOS = [
    # 15% - important for "no detection" cases
    BackgroundOnlyScenario(fraction=0.15),
    # 20% - common check sources
    SingleCalibrationScenario(fraction=0.20),
    # 8% - medical isotopes
    SingleMedicalScenario(fraction=0.08),
    # 5% - industrial sources
    SingleIndustrialScenario(fraction=0.05),
    # 10% - natural uranium + daughters
    UraniumChainScenario(fraction=0.10),
    # 10% - natural thorium + daughters
    ThoriumChainScenario(fraction=0.10),
    # 7% - NORM materials
    NORMScenario(fraction=0.07),
    # 5% - reactor fallout signature
    FalloutScenario(fraction=0.05),
    # 10% - random 2-3 isotope mixes
    MixedSourcesScenario(fraction=0.10),
    # 5% - complex 4-6 isotope scenarios
    ComplexMixScenario(fraction=0.05),
    # 5% - near-detection-limit sources
    WeakSourceScenario(fraction=0.05),
]
# =============================================================================
# BACKGROUND VARIATION
# =============================================================================
class BackgroundConfig:
    """
    Randomised background settings for one sample.

    Attributes:
        intensity_min / intensity_max: Bounds of the intensity multiplier.
        k40_prob / radon_prob / thorium_prob: Probability that the matching
            background component is switched on.
    """

    def __init__(
        self,
        intensity_min: float = 0.3,
        intensity_max: float = 3.0,
        k40_prob: float = 0.95,
        radon_prob: float = 0.8,
        thorium_prob: float = 0.6,
    ):
        self.intensity_min, self.intensity_max = intensity_min, intensity_max
        self.k40_prob, self.radon_prob, self.thorium_prob = k40_prob, radon_prob, thorium_prob

    def sample(self, rng: np.random.Generator) -> dict:
        """
        Draw one background configuration.

        Returns:
            Dict with ``background_cps`` (intensity multiplier times 5.0) and
            the three ``include_*`` flags, each decided by an independent
            draw against its probability.
        """
        # Draws happen in this fixed order so a seeded rng is reproducible.
        multiplier = rng.uniform(self.intensity_min, self.intensity_max)
        with_k40 = rng.random() < self.k40_prob
        with_radon = rng.random() < self.radon_prob
        with_thorium = rng.random() < self.thorium_prob
        return dict(
            background_cps=multiplier * 5.0,
            include_k40=with_k40,
            include_radon=with_radon,
            include_thorium=with_thorium,
        )
# =============================================================================
# SAMPLE GENERATION
# =============================================================================
def generate_single_sample(args: Tuple[int, dict]) -> Optional[str]:
    """
    Generate and save a single sample (worker entry point for the pool).

    Args:
        args: Tuple of (sample_index, config_dict). The config dict must
            provide 'base_seed', 'detector_name', 'scenarios',
            'activity_range' and 'output_dir'; 'bg_intensity_min' and
            'bg_intensity_max' are optional.

    Returns:
        Sample ID if successful, None if failed. Failures are printed with a
        traceback and never propagate, so one bad sample cannot abort the pool.
    """
    sample_idx, config = args
    try:
        # Unique, reproducible seed per sample
        sample_seed = config['base_seed'] + sample_idx
        rng = np.random.default_rng(sample_seed)

        # Initialize generator.
        # SpectrumGenerator draws from the legacy global np.random state
        # (activity variation, sample-id salt; presumably the noise helpers
        # too - TODO confirm in physics.py). Forked workers all inherit the
        # same global state, so it is reseeded per sample through the
        # generator's own random_seed hook. np.random.seed only accepts
        # values below 2**32, hence the modulo.
        detector_config = RADIACODE_CONFIGS.get(config['detector_name'])
        generator = SpectrumGenerator(
            detector_config=detector_config,
            random_seed=sample_seed % (2 ** 32),
        )

        # Select scenario, weighted by the (already normalized) fractions
        scenarios = config['scenarios']
        scenario_probs = [s.fraction for s in scenarios]
        scenario = rng.choice(scenarios, p=scenario_probs)

        # Generate sources for this scenario
        sources = scenario.generate_sources(rng, config['activity_range'])

        # Background configuration
        bg_config = BackgroundConfig(
            intensity_min=config.get('bg_intensity_min', 0.3),
            intensity_max=config.get('bg_intensity_max', 3.0),
        )
        bg_params = bg_config.sample(rng)

        # FIXED 60-second duration for 2D model
        duration = 60.0

        # Create spectrum config
        spec_config = SpectrumConfig(
            duration_seconds=duration,
            time_interval_seconds=1.0,  # 1 second per interval = 60 intervals
            sources=sources,
            include_background=True,
            background_cps=bg_params['background_cps'],
            include_k40=bg_params['include_k40'],
            include_radon=bg_params['include_radon'],
            include_thorium=bg_params['include_thorium'],
            detector_name=config['detector_name'],
        )

        # Generate spectrum
        spectrum = generator.generate_spectrum(spec_config)

        # Save spectrum (array + per-sample JSON label) under <output_dir>/spectra
        output_dir = Path(config['output_dir']) / "spectra"
        save_spectrum(
            spectrum,
            output_dir,
            save_image=True,  # Save NPY file
            image_format='npy'  # Skip PNG for speed
        )
        return spectrum.sample_id
    except Exception as e:
        # Worker boundary: report and signal failure via None.
        print(f"Error generating sample {sample_idx}: {e}")
        import traceback
        traceback.print_exc()
        return None
def generate_training_data_v3(
    num_samples: int,
    output_dir: Path,
    detector_name: str = "radiacode_103",
    activity_range: Tuple[float, float] = (1.0, 100.0),
    bg_intensity_range: Tuple[float, float] = (0.3, 3.0),
    scenarios: Optional[List[SampleScenario]] = None,
    num_workers: Optional[int] = None,
    random_seed: Optional[int] = None,
) -> int:
    """
    Generate training samples in parallel.

    Args:
        num_samples: Total number of samples to generate
        output_dir: Output directory (spectra go into its "spectra" subfolder)
        detector_name: Detector to simulate
        activity_range: (min, max) activity in Bq
        bg_intensity_range: Background intensity multiplier range
        scenarios: List of SampleScenario objects (default: DEFAULT_SCENARIOS).
            Their fractions are treated as relative weights; the objects
            passed in are not modified.
        num_workers: Number of parallel workers (default: CPU count - 1, min 1)
        random_seed: Base random seed (default: current Unix time)

    Returns:
        Number of successfully generated samples

    Raises:
        ValueError: If the scenario fractions do not sum to a positive value.
    """
    import copy  # local import: only needed for the scenario copies below

    if num_workers is None:
        num_workers = max(1, cpu_count() - 1)
    if random_seed is None:
        random_seed = int(time.time())
    if scenarios is None:
        scenarios = DEFAULT_SCENARIOS

    # Normalize scenario fractions on shallow copies, so neither the caller's
    # objects nor the shared DEFAULT_SCENARIOS list are mutated.
    total_fraction = sum(s.fraction for s in scenarios)
    if total_fraction <= 0:
        raise ValueError("Scenario fractions must sum to a positive value")
    normalized_scenarios = []
    for s in scenarios:
        scenario_copy = copy.copy(s)
        scenario_copy.fraction = s.fraction / total_fraction
        normalized_scenarios.append(scenario_copy)
    scenarios = normalized_scenarios

    # Create output directory
    output_dir = Path(output_dir)
    spectra_dir = output_dir / "spectra"
    spectra_dir.mkdir(parents=True, exist_ok=True)

    print("=" * 70)
    print("SYNTHETIC SPECTRA GENERATION v3 - Optimized for 2D Model")
    print("=" * 70)
    print("\nConfiguration:")
    print(f" Samples: {num_samples:,}")
    print(f" Output: {output_dir}")
    print(f" Detector: {detector_name}")
    print(" Duration: 60 seconds (fixed)")
    print(f" Activity range: {activity_range[0]:.1f} - {activity_range[1]:.1f} Bq")
    print(f" Workers: {num_workers}")
    print("\nScenario distribution:")
    for s in scenarios:
        count = int(num_samples * s.fraction)
        print(f" {s.name}: {s.fraction*100:.1f}% (~{count:,} samples)")
    print()

    # Shared config for all workers
    shared_config = {
        'detector_name': detector_name,
        'output_dir': str(output_dir),
        'activity_range': activity_range,
        'bg_intensity_min': bg_intensity_range[0],
        'bg_intensity_max': bg_intensity_range[1],
        'base_seed': random_seed,
        'scenarios': scenarios,
    }

    # Create work items; the sample index doubles as the per-sample seed offset
    work_items = [(i, shared_config) for i in range(num_samples)]

    # Progress tracking
    start_time = time.time()
    completed = 0
    failed = 0
    last_report = 0
    # Report roughly every 1% (every sample when there are fewer than 100)
    report_every = num_samples // 100

    print("Starting generation...")

    # Generate in parallel; results arrive in completion order
    with Pool(num_workers) as pool:
        for result in pool.imap_unordered(generate_single_sample, work_items, chunksize=100):
            if result is not None:
                completed += 1
            else:
                failed += 1
            total = completed + failed

            if total - last_report >= report_every or total == num_samples:
                elapsed = time.time() - start_time
                rate = completed / elapsed if elapsed > 0 else 0
                eta = (num_samples - total) / rate if rate > 0 else 0
                print(f"\r Progress: {total:,}/{num_samples:,} ({100*total/num_samples:.1f}%) | "
                      f"Rate: {rate:.1f}/s | "
                      f"ETA: {eta/60:.1f}m | "
                      f"Failed: {failed}", end="", flush=True)
                last_report = total

    total_time = time.time() - start_time
    # Guard the final rate against a zero-length run (e.g. coarse clocks)
    final_rate = completed / total_time if total_time > 0 else 0.0

    print("\n\nGeneration complete!")
    print(f" Total time: {total_time/60:.1f} minutes")
    print(f" Successful: {completed:,}")
    print(f" Failed: {failed}")
    print(f" Rate: {final_rate:.1f} samples/second")
    return completed
def main():
    """Command-line entry point: parse the options and run the v3 generation."""
    cli = argparse.ArgumentParser(description='Generate synthetic gamma spectra v3')

    # (flags, add_argument keyword options) for every supported option
    option_table = (
        (('--num_samples', '-n'), dict(type=int, default=200000,
                                       help='Number of samples to generate')),
        (('--output_dir', '-o'), dict(type=str, default='data/synthetic',
                                      help='Output directory')),
        (('--detector', '-d'), dict(type=str, default='radiacode_103',
                                    help='Detector type')),
        (('--workers', '-w'), dict(type=int, default=None,
                                   help='Number of parallel workers')),
        (('--seed', '-s'), dict(type=int, default=None,
                                help='Random seed')),
        (('--activity_min',), dict(type=float, default=1.0,
                                   help='Minimum activity in Bq')),
        (('--activity_max',), dict(type=float, default=100.0,
                                   help='Maximum activity in Bq')),
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)

    opts = cli.parse_args()
    activity_bounds = (opts.activity_min, opts.activity_max)
    generate_training_data_v3(
        num_samples=opts.num_samples,
        output_dir=Path(opts.output_dir),
        detector_name=opts.detector,
        activity_range=activity_bounds,
        num_workers=opts.workers,
        random_seed=opts.seed,
    )


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

View File

@ -0,0 +1,474 @@
"""
Synthetic Spectrum Generator
Main class for generating synthetic gamma spectra images
with various isotope combinations and configurations.
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Any
import json
from pathlib import Path
from datetime import datetime
import hashlib
from .config import DetectorConfig, get_default_config, RADIACODE_CONFIGS
from .ground_truth import (
ISOTOPE_DATABASE,
Isotope,
get_isotope,
get_all_isotopes,
DECAY_CHAINS,
get_chain_daughters,
infer_parent_from_daughters,
)
from .physics import (
PeakParameters,
generate_peak_spectrum,
generate_environmental_background,
apply_poisson_noise,
apply_electronic_noise,
normalize_spectrum,
)
@dataclass
class IsotopeSource:
    """
    Definition of an isotope source for spectrum generation.

    SpectrumGenerator.generate_single_interval looks the isotope up by name
    and adds one peak per gamma line, scaled by the activity.
    """
    # Isotope identifier, resolved through get_isotope() during generation
    isotope_name: str
    # Source activity in Bq
    activity_bq: float
    # Optional: if part of a decay chain, include daughters. Daughters listed
    # on the isotope record are added at the parent's activity.
    include_daughters: bool = True
    # Activity can vary by this factor for augmentation: when > 0 the activity
    # is multiplied by 1 + uniform(-activity_variation, +activity_variation).
    activity_variation: float = 0.0
@dataclass
class SpectrumConfig:
    """
    Configuration for a single spectrum generation.

    Consumed by SpectrumGenerator.generate_spectrum.
    """
    # Time parameters
    # Total acquisition time; passed as the live time of the whole spectrum
    duration_seconds: float = 60.0
    # Each row in the spectrogram.
    # NOTE(review): generate_spectrum in this module does not read this field
    # (it builds the full duration at once) - confirm whether it is still needed.
    time_interval_seconds: float = 1.0
    # Sources to include
    sources: List[IsotopeSource] = field(default_factory=list)
    # Background options (forwarded to generate_environmental_background)
    include_background: bool = True
    background_cps: float = 5.0
    include_k40: bool = True
    include_radon: bool = True
    include_thorium: bool = True
    # Detector configuration: a key of RADIACODE_CONFIGS. Unknown names leave
    # the generator's current detector configuration in place.
    detector_name: str = "radiacode_103"
    # Noise options
    apply_poisson: bool = True
    apply_electronic: bool = False
    # Only used when apply_electronic is True
    electronic_noise_sigma: float = 0.5
    # Normalization (method is forwarded to normalize_spectrum)
    normalize: bool = True
    normalization_method: str = "max"  # max, sum, log, sqrt
@dataclass
class GeneratedSpectrum:
    """Result of spectrum generation: the data plus everything needed to label it."""
    # The spectrum data. SpectrumGenerator.generate_spectrum stores a 1D array
    # with one value per energy channel (not a 2D time x channels spectrogram).
    data: np.ndarray
    # Metadata
    # The configuration the spectrum was generated from
    config: SpectrumConfig
    # De-duplicated names of source isotopes (including added daughters)
    isotopes_present: List[str]
    # De-duplicated names reported by the background generator
    background_isotopes: List[str]
    # For labels/annotations (written to the per-sample JSON by save_spectrum)
    labels: Dict[str, Any] = field(default_factory=dict)
    # Unique identifier (used in the output file names)
    sample_id: str = ""
    # Generation timestamp (local time, ISO 8601), set when the object is created
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class SpectrumGenerator:
    """
    Main class for generating synthetic gamma spectra.

    Combines per-isotope peak shapes and the environmental background from
    the ``physics`` helpers, applies noise and normalisation, and attaches
    labels.

    ``generate_spectrum`` returns a cumulative 1D spectrum with one value per
    energy channel of the configured detector.

    NOTE: all randomness in this class comes from the legacy global
    ``np.random`` state, so ``random_seed`` seeds that global state.
    """

    def __init__(
        self,
        detector_config: Optional[DetectorConfig] = None,
        random_seed: Optional[int] = None
    ):
        """
        Initialize the spectrum generator.

        Args:
            detector_config: Detector configuration (default: the result of
                ``get_default_config()``)
            random_seed: Random seed for reproducibility; when given, the
                global ``np.random`` state is seeded with it
        """
        if detector_config is None:
            detector_config = get_default_config()
        self.detector_config = detector_config
        self.energy_bins = detector_config.get_energy_bins()
        self.num_channels = len(self.energy_bins)
        if random_seed is not None:
            np.random.seed(random_seed)

    def _add_isotope_peaks(
        self,
        spectrum: np.ndarray,
        isotope: Isotope,
        activity: float,
        interval_duration: float
    ) -> None:
        """Add, in place, one detector-shaped peak per gamma line of ``isotope``."""
        for gamma_line in isotope.gamma_lines:
            peak_params = PeakParameters(
                energy_kev=gamma_line.energy_kev,
                intensity=gamma_line.intensity,
                activity_bq=activity,
                live_time_s=interval_duration
            )
            peak = generate_peak_spectrum(
                self.energy_bins,
                peak_params,
                self.detector_config
            )
            spectrum += peak  # in-place: the caller's array is updated

    def generate_single_interval(
        self,
        sources: List[IsotopeSource],
        interval_duration: float,
        include_background: bool = True,
        background_config: Optional[Dict] = None
    ) -> Tuple[np.ndarray, List[str], List[str]]:
        """
        Generate a single time interval spectrum (before noise/normalisation).

        Args:
            sources: List of isotope sources
            interval_duration: Duration in seconds
            include_background: Whether to include environmental background
            background_config: Background configuration options; recognised
                keys are 'background_cps', 'include_k40', 'include_radon'
                and 'include_thorium'

        Returns:
            Tuple of (spectrum, source_isotopes, background_isotopes).
            ``source_isotopes`` is de-duplicated and keeps first-seen order.
        """
        spectrum = np.zeros(self.num_channels)
        source_isotopes = []
        background_isotopes = []

        # Add background
        if include_background:
            if background_config is None:
                background_config = {}
            bg_spectrum, bg_isotopes = generate_environmental_background(
                self.energy_bins,
                interval_duration,
                background_cps=background_config.get('background_cps', 5.0),
                include_k40=background_config.get('include_k40', True),
                include_radon=background_config.get('include_radon', True),
                include_thorium=background_config.get('include_thorium', True),
                detector_config=self.detector_config
            )
            spectrum += bg_spectrum
            background_isotopes = bg_isotopes

        # Add source isotopes
        for source in sources:
            isotope = get_isotope(source.isotope_name)
            if isotope is None:
                print(f"Warning: Unknown isotope {source.isotope_name}")
                continue

            # Apply activity variation if specified
            activity = source.activity_bq
            if source.activity_variation > 0:
                variation = 1 + np.random.uniform(
                    -source.activity_variation,
                    source.activity_variation
                )
                activity *= variation

            # Add gamma lines from this isotope
            self._add_isotope_peaks(spectrum, isotope, activity, interval_duration)
            source_isotopes.append(source.isotope_name)

            # Include daughters if requested (secular equilibrium assumed:
            # each daughter gets the parent's activity)
            if source.include_daughters and isotope.daughters:
                for daughter_name in isotope.daughters:
                    daughter = get_isotope(daughter_name)
                    if daughter:
                        self._add_isotope_peaks(
                            spectrum, daughter, activity, interval_duration
                        )
                        source_isotopes.append(daughter_name)

        # dict.fromkeys de-duplicates while keeping insertion order; a set
        # would yield a hash-dependent order that differs between runs.
        return spectrum, list(dict.fromkeys(source_isotopes)), background_isotopes

    def generate_spectrum(
        self,
        config: SpectrumConfig
    ) -> GeneratedSpectrum:
        """
        Generate a cumulative 1D spectrum (sum over time).

        Instead of creating a 2D spectrogram (time x channels), this produces
        a 1D spectrum by generating the full duration at once, matching how
        a real detector accumulates counts. This avoids massive memory usage
        with long durations.

        Args:
            config: Spectrum configuration

        Returns:
            GeneratedSpectrum object with 1D data (num_channels,)
        """
        # Switch detector if the config names a known one; unknown names keep
        # the current detector configuration.
        if config.detector_name in RADIACODE_CONFIGS:
            self.detector_config = RADIACODE_CONFIGS[config.detector_name]
            self.energy_bins = self.detector_config.get_energy_bins()
            self.num_channels = len(self.energy_bins)

        # Generate the full-duration spectrum at once (like a real detector)
        spectrum, src_iso, bg_iso = self.generate_single_interval(
            config.sources,
            config.duration_seconds,  # Full duration, not per-interval
            config.include_background,
            background_config={
                'background_cps': config.background_cps,
                'include_k40': config.include_k40,
                'include_radon': config.include_radon,
                'include_thorium': config.include_thorium,
            }
        )

        # Apply noise
        if config.apply_poisson:
            spectrum = apply_poisson_noise(spectrum)
        if config.apply_electronic:
            spectrum = apply_electronic_noise(
                spectrum,
                config.electronic_noise_sigma
            )

        # Normalize if requested
        if config.normalize:
            spectrum = normalize_spectrum(spectrum, config.normalization_method)

        # Generate unique sample ID
        sample_id = self._generate_sample_id(config)

        # Determine isotopes present (order-preserving de-duplication so the
        # saved labels are stable across runs)
        isotopes_present = list(dict.fromkeys(src_iso))
        background_isotopes = list(dict.fromkeys(bg_iso))

        # Create labels
        labels = {
            'isotopes': isotopes_present,
            'background_isotopes': background_isotopes,
            'source_activities_bq': {
                s.isotope_name: s.activity_bq for s in config.sources
            },
            'duration_seconds': config.duration_seconds,
            'detector': config.detector_name,
            'normalized': config.normalize,
            'normalization_method': config.normalization_method if config.normalize else None,
        }

        return GeneratedSpectrum(
            data=spectrum,  # 1D array (num_channels,)
            config=config,
            isotopes_present=isotopes_present,
            background_isotopes=background_isotopes,
            labels=labels,
            sample_id=sample_id
        )

    def _generate_sample_id(self, config: SpectrumConfig) -> str:
        """
        Generate a sample ID: the first 12 hex digits of an MD5 digest.

        The digest input mixes the current timestamp, the duration, the
        source names and a random salt, so IDs are not reproducible.
        """
        hash_input = f"{datetime.now().timestamp()}"
        hash_input += f"_{config.duration_seconds}"
        hash_input += f"_{','.join(s.isotope_name for s in config.sources)}"
        hash_input += f"_{np.random.randint(0, 1000000)}"
        return hashlib.md5(hash_input.encode()).hexdigest()[:12]

    def generate_random_spectrum(
        self,
        duration_range: Tuple[float, float] = (60, 300),
        num_isotopes_range: Tuple[int, int] = (1, 3),
        activity_range: Tuple[float, float] = (1.0, 100.0),
        isotope_pool: Optional[List[str]] = None,
        **kwargs
    ) -> GeneratedSpectrum:
        """
        Generate a spectrum with random parameters.

        Args:
            duration_range: (min, max) duration in seconds
            num_isotopes_range: (min, max) number of isotopes to include
                (both ends inclusive)
            activity_range: (min, max) activity in Bq
            isotope_pool: List of isotope names to choose from (default: all
                isotopes with at least one gamma line of intensity > 0.01)
            **kwargs: Additional arguments passed to SpectrumConfig

        Returns:
            GeneratedSpectrum with random configuration
        """
        # Choose duration
        duration = np.random.uniform(*duration_range)

        # Choose number of isotopes (randint's upper bound is exclusive)
        num_isotopes = np.random.randint(num_isotopes_range[0], num_isotopes_range[1] + 1)

        # Build isotope pool if not provided
        if isotope_pool is None:
            isotope_pool = [
                iso.name for iso in get_all_isotopes()
                if len(iso.gamma_lines) > 0 and
                any(line.intensity > 0.01 for line in iso.gamma_lines)
            ]

        # Select random isotopes (capped at the pool size)
        selected = np.random.choice(isotope_pool, size=min(num_isotopes, len(isotope_pool)), replace=False)

        # Create sources with random activities; daughters are included for
        # roughly 70% of the sources
        sources = []
        for isotope_name in selected:
            activity = np.random.uniform(*activity_range)
            sources.append(IsotopeSource(
                isotope_name=isotope_name,
                activity_bq=activity,
                include_daughters=np.random.random() > 0.3
            ))

        # Create config
        config = SpectrumConfig(
            duration_seconds=duration,
            sources=sources,
            **kwargs
        )
        return self.generate_spectrum(config)
def save_spectrum(
    spectrum: GeneratedSpectrum,
    output_dir: Path,
    save_image: bool = True,
    image_format: str = 'npy',
    save_individual_label: bool = True
) -> Dict[str, str]:
    """
    Write a generated spectrum (and optionally its labels) to disk.

    Files are named ``spectrum_<sample_id>.<ext>`` inside ``output_dir``,
    which is created if missing.

    Args:
        spectrum: GeneratedSpectrum to save
        output_dir: Output directory path
        save_image: Whether to save the spectrum data at all
        image_format: 'npy', 'png' or 'both'
        save_individual_label: Whether to write the labels as a JSON file

    Returns:
        Mapping of what was written: 'npy' / 'png' / 'json' paths (when
        produced) plus 'sample_id'.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    written = {}
    stem = f"spectrum_{spectrum.sample_id}"
    wants_npy = save_image and image_format in ('npy', 'both')
    wants_png = save_image and image_format in ('png', 'both')

    # Raw array dump
    if wants_npy:
        npy_path = target_dir / f"{stem}.npy"
        np.save(npy_path, spectrum.data)
        written['npy'] = str(npy_path)

    # 8-bit grayscale rendering; skipped with a warning when PIL is missing
    if wants_png:
        try:
            from PIL import Image
            pixels = spectrum.data
            peak_value = pixels.max()
            if peak_value > 0:
                pixels = pixels / pixels.max()
            as_bytes = (pixels * 255).astype(np.uint8)
            picture = Image.fromarray(as_bytes, mode='L')
            png_path = target_dir / f"{stem}.png"
            picture.save(png_path)
            written['png'] = str(png_path)
        except ImportError:
            print("Warning: PIL not installed, skipping PNG save")

    # Per-sample label file (for efficient loading)
    if save_individual_label:
        json_path = target_dir / f"{stem}.json"
        with open(json_path, 'w') as handle:
            json.dump(spectrum.labels, handle, indent=2)
        written['json'] = str(json_path)

    written['sample_id'] = spectrum.sample_id
    return written
def generate_labels_json(
    spectra: List[GeneratedSpectrum],
    output_path: Path
) -> None:
    """
    Write one combined JSON file holding the labels of all spectra.

    Kept for backward compatibility; for large datasets the per-sample JSON
    files are more efficient.

    The file has a 'metadata' section (generation time, sample count, and the
    fixed values 1023 channels / 20-3000 keV) and a 'samples' section mapping
    each sample ID to its labels.

    Args:
        spectra: List of generated spectra
        output_path: Path to save labels JSON
    """
    metadata = {
        'generated_at': datetime.now().isoformat(),
        'num_samples': len(spectra),
        'channels': 1023,
        'energy_range_kev': [20, 3000],
    }
    # Later spectra overwrite earlier ones that share a sample ID
    per_sample = {item.sample_id: item.labels for item in spectra}
    payload = {'metadata': metadata, 'samples': per_sample}

    with open(output_path, 'w') as handle:
        json.dump(payload, handle, indent=2)

View File

@ -0,0 +1,29 @@
"""
Ground Truth Module
Contains isotope data, decay chains, and chain signatures for
synthetic spectra generation.
"""
from .isotope_data import (
ISOTOPE_DATABASE,
Isotope,
GammaLine,
IsotopeCategory,
get_isotope,
get_all_isotopes,
get_isotope_names,
get_isotopes_by_category,
get_isotopes_with_gamma_in_range,
SECOND, MINUTE, HOUR, DAY, YEAR, STABLE
)
from .decay_chains import (
DECAY_CHAINS,
CHAIN_SIGNATURES,
DecayChain,
ChainSignature,
get_decay_chain,
get_chain_daughters,
infer_parent_from_daughters,
)

View File

@ -0,0 +1,320 @@
"""
Decay Chain Definitions
Defines radioactive decay chains and their relationships, including:
- U-238 decay chain (Uranium series)
- Th-232 decay chain (Thorium series)
- U-235 decay chain (Actinium series)
Also includes chain signatures - groups of isotopes that commonly
appear together and indicate parent isotopes.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional, Tuple
from .isotope_data import ISOTOPE_DATABASE, Isotope
@dataclass
class DecayChainMember:
    """A member of a decay chain with branching ratio."""
    # Isotope name; DecayChain looks it up in ISOTOPE_DATABASE
    isotope_name: str
    # Fraction of decays following this path (1.0 = no branching recorded)
    branching_ratio: float = 1.0
    # Free-text decay mode label, e.g. "alpha" or "beta-"; empty if unspecified
    decay_mode: str = ""
@dataclass
class DecayChain:
    """
    A complete decay chain: its parent plus the ordered list of members.

    Attributes:
        name: Human-readable chain name.
        parent: Name of the parent isotope.
        members: Chain members in the order they were listed.
        description: Optional free-text description.
    """
    name: str
    parent: str
    members: List[DecayChainMember]
    description: str = ""

    def get_member_names(self) -> List[str]:
        """Return the isotope name of every member, in chain order."""
        return [entry.isotope_name for entry in self.members]

    def get_gamma_emitters(self) -> List[str]:
        """
        Return the members with significant gamma emission.

        A member qualifies when ISOTOPE_DATABASE has a record for it and at
        least one of its gamma lines has an intensity above 0.01.
        """
        def _emits_significantly(isotope_name: str) -> bool:
            record = ISOTOPE_DATABASE.get(isotope_name)
            if not record or len(record.gamma_lines) == 0:
                return False
            return any(line.intensity > 0.01 for line in record.gamma_lines)

        return [
            entry.isotope_name
            for entry in self.members
            if _emits_significantly(entry.isotope_name)
        ]
@dataclass
class ChainSignature:
    """
    Signature pattern of isotopes that indicate presence of a parent.

    When these daughter isotopes appear together in a spectrum,
    it strongly indicates the presence of the parent isotope
    (even if parent has weak/no gamma emissions).
    """
    # Human-readable signature name
    name: str
    parent_chain: str  # Name of the decay chain
    inferred_parent: str  # Parent isotope that is indicated
    required_daughters: Set[str]  # Must see all of these
    optional_daughters: Set[str] = field(default_factory=set)  # May also see
    # Optional free-text description
    description: str = ""
# =============================================================================
# DECAY CHAINS
# =============================================================================
# Registry of decay chains, keyed by parent isotope name.
DECAY_CHAINS: Dict[str, DecayChain] = {
    # U-238 DECAY CHAIN (Uranium Series)
    # U-238 -> Th-234 -> Pa-234m -> U-234 -> Th-230 -> Ra-226 -> Rn-222 ->
    # Po-218 -> Pb-214 -> Bi-214 -> Po-214 -> Pb-210 -> Bi-210 -> Po-210 -> Pb-206
    "U-238": DecayChain(
        name="U-238 Decay Chain (Uranium Series)",
        parent="U-238",
        description="14 step decay chain ending at stable Pb-206",
        members=[
            DecayChainMember("U-238", decay_mode="alpha"),
            DecayChainMember("Th-234", decay_mode="beta-"),
            DecayChainMember("Pa-234m", branching_ratio=0.998, decay_mode="beta-"),
            DecayChainMember("U-234", decay_mode="alpha"),
            DecayChainMember("Th-230", decay_mode="alpha"),
            DecayChainMember("Ra-226", decay_mode="alpha"),
            DecayChainMember("Rn-222", decay_mode="alpha"),
            DecayChainMember("Po-218", decay_mode="alpha"),
            DecayChainMember("Pb-214", decay_mode="beta-"),
            DecayChainMember("Bi-214", branching_ratio=0.9998, decay_mode="beta-"),
            DecayChainMember("Po-214", decay_mode="alpha"),
            DecayChainMember("Pb-210", decay_mode="beta-"),
            DecayChainMember("Bi-210", decay_mode="beta-"),
            DecayChainMember("Po-210", decay_mode="alpha"),
        ],
    ),
    # TH-232 DECAY CHAIN (Thorium Series)
    # Th-232 -> Ra-228 -> Ac-228 -> Th-228 -> Ra-224 -> Rn-220 ->
    # Po-216 -> Pb-212 -> Bi-212 -> (Tl-208 or Po-212) -> Pb-208
    "Th-232": DecayChain(
        name="Th-232 Decay Chain (Thorium Series)",
        parent="Th-232",
        description="10+ step decay chain ending at stable Pb-208",
        members=[
            DecayChainMember("Th-232", decay_mode="alpha"),
            DecayChainMember("Ra-228", decay_mode="beta-"),
            DecayChainMember("Ac-228", decay_mode="beta-"),
            DecayChainMember("Th-228", decay_mode="alpha"),
            DecayChainMember("Ra-224", decay_mode="alpha"),
            DecayChainMember("Rn-220", decay_mode="alpha"),
            DecayChainMember("Po-216", decay_mode="alpha"),
            DecayChainMember("Pb-212", decay_mode="beta-"),
            DecayChainMember("Bi-212", decay_mode="beta-/alpha"),
            # Bi-212 branches: the two entries below carry the split.
            DecayChainMember("Tl-208", branching_ratio=0.3594, decay_mode="beta-"),
            DecayChainMember("Po-212", branching_ratio=0.6406, decay_mode="alpha"),
        ],
    ),
    # U-235 DECAY CHAIN (Actinium Series)
    # U-235 -> Th-231 -> Pa-231 -> Ac-227 -> (complex branching) -> Pb-207
    # Only a subset of the series is listed here.
    "U-235": DecayChain(
        name="U-235 Decay Chain (Actinium Series)",
        parent="U-235",
        description="11+ step decay chain ending at stable Pb-207",
        members=[
            DecayChainMember("U-235", decay_mode="alpha"),
            DecayChainMember("Th-231", decay_mode="beta-"),
            DecayChainMember("Pa-231", decay_mode="alpha"),
            DecayChainMember("Ac-227", decay_mode="beta-/alpha"),
            DecayChainMember("Pb-211", decay_mode="beta-"),
            DecayChainMember("Bi-211", decay_mode="alpha"),
            DecayChainMember("Tl-207", decay_mode="beta-"),
        ],
    ),
    # Cs-137 -> Ba-137m (simple 2-step)
    "Cs-137": DecayChain(
        name="Cs-137 Decay",
        parent="Cs-137",
        description="Cs-137 beta decay to Ba-137m metastable state",
        members=[
            DecayChainMember("Cs-137", decay_mode="beta-"),
            DecayChainMember("Ba-137m", decay_mode="IT"),
        ],
    ),
}
# =============================================================================
# CHAIN SIGNATURES
# =============================================================================
# Registry of daughter-isotope signatures, keyed by signature identifier.
CHAIN_SIGNATURES: Dict[str, ChainSignature] = {
    # Radon-222 progeny (from U-238 chain via Ra-226)
    # Seeing Pb-214 + Bi-214 together indicates radon presence
    "Rn-222_progeny": ChainSignature(
        name="Radon-222 Progeny",
        parent_chain="U-238",
        inferred_parent="Rn-222",
        required_daughters={"Pb-214", "Bi-214"},
        optional_daughters={"Po-214"},
        description="Pb-214 + Bi-214 indicates airborne Rn-222 (radon) daughters",
    ),
    # Extended U-238 chain indicator
    "Ra-226_equilibrium": ChainSignature(
        name="Ra-226 Secular Equilibrium",
        parent_chain="U-238",
        inferred_parent="Ra-226",
        required_daughters={"Pb-214", "Bi-214"},
        optional_daughters={"Rn-222", "Po-214", "Pb-210"},
        description="Indicates Ra-226 or U-238 in secular equilibrium",
    ),
    # Thoron progeny (from Th-232 chain)
    # Seeing Pb-212 + Bi-212 (optionally Tl-208) indicates thoron/thorium
    "Rn-220_progeny": ChainSignature(
        name="Thoron (Rn-220) Progeny",
        parent_chain="Th-232",
        inferred_parent="Rn-220",
        required_daughters={"Pb-212", "Bi-212"},
        optional_daughters={"Tl-208", "Po-212"},
        description="Pb-212 + Bi-212 indicates Rn-220 (thoron) daughters",
    ),
    # Th-232 chain indicator (Ac-228 is key)
    "Th-232_equilibrium": ChainSignature(
        name="Th-232 Secular Equilibrium",
        parent_chain="Th-232",
        inferred_parent="Th-232",
        required_daughters={"Ac-228", "Pb-212", "Tl-208"},
        optional_daughters={"Bi-212", "Ra-224"},
        description="Ac-228 + Pb-212 + Tl-208 indicates Th-232 chain in equilibrium",
    ),
    # U-235 presence (direct gamma)
    "U-235_direct": ChainSignature(
        name="U-235 Direct",
        parent_chain="U-235",
        inferred_parent="U-235",
        required_daughters={"U-235"},  # U-235 has direct 185.7 keV line
        optional_daughters={"Th-231", "Pa-231"},
        description="U-235 directly visible via 185.7 keV line",
    ),
}
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def get_decay_chain(name: str) -> Optional[DecayChain]:
    """Look up a decay chain by its parent isotope name.

    Args:
        name: Key into DECAY_CHAINS (e.g., "U-238")

    Returns:
        The registered DecayChain, or None when no chain uses that key
    """
    try:
        return DECAY_CHAINS[name]
    except KeyError:
        return None
def get_chain_daughters(parent: str, include_parent: bool = True) -> List[str]:
    """
    List the isotopes of the decay chain registered under ``parent``.

    Args:
        parent: Parent isotope name (e.g., "U-238")
        include_parent: Whether to keep the parent itself in the result

    Returns:
        Member names in chain order. When no chain is registered for
        ``parent`` the result is ``[parent]`` (or ``[]`` if the parent is
        excluded).
    """
    chain = DECAY_CHAINS.get(parent)
    if chain is None:
        # Unknown chain: the isotope stands on its own.
        return [parent] if include_parent else []

    names = chain.get_member_names()
    # The parent is only dropped when it is actually the first member.
    drop_head = (not include_parent) and bool(names) and names[0] == parent
    return names[1:] if drop_head else names
def infer_parent_from_daughters(
    detected_isotopes: Set[str]
) -> List[Tuple[str, ChainSignature, float]]:
    """
    Given the isotopes detected in a spectrum, infer possible parent isotopes.

    Every signature in CHAIN_SIGNATURES with at least one required daughter
    among the detected isotopes produces one result entry.

    Args:
        detected_isotopes: Isotope names detected in the spectrum. A set is
            expected; any other iterable of names (list, tuple, ...) is
            accepted and converted to a set.

    Returns:
        List of (parent_name, signature, confidence) tuples sorted by
        confidence, highest first. Confidence is the fraction of required
        daughters detected, plus a bonus of up to 0.1 proportional to the
        fraction of optional daughters detected, capped at 1.0.
    """
    # Coerce once so that lists/tuples work with the set intersections below.
    detected = set(detected_isotopes)

    results = []
    for signature in CHAIN_SIGNATURES.values():
        required_found = detected & signature.required_daughters
        if not required_found:
            continue

        confidence = len(required_found) / len(signature.required_daughters)

        # Boost confidence slightly if optional daughters were also found.
        if signature.optional_daughters:
            optional_found = detected & signature.optional_daughters
            bonus = 0.1 * len(optional_found) / len(signature.optional_daughters)
            confidence = min(1.0, confidence + bonus)

        results.append((signature.inferred_parent, signature, confidence))

    # Highest confidence first; ties keep CHAIN_SIGNATURES order (stable sort).
    results.sort(key=lambda item: item[2], reverse=True)
    return results
def get_equilibrium_ratios(chain_name: str) -> Dict[str, float]:
    """
    Return secular-equilibrium activity ratios for a decay chain.

    Under secular equilibrium every daughter activity equals the parent
    activity, so each member of the chain maps to 1.0. Time-dependent
    (non-equilibrium) ratios are not modelled here.

    Args:
        chain_name: Key into DECAY_CHAINS

    Returns:
        Dict mapping member isotope name to 1.0, or an empty dict when the
        chain is not registered
    """
    chain = DECAY_CHAINS.get(chain_name)
    if chain is None:
        return {}
    return dict.fromkeys((member.isotope_name for member in chain.members), 1.0)
def get_visible_chain_gammas(
    chain_name: str,
    min_intensity: float = 0.01
) -> Dict[str, List[Tuple[float, float]]]:
    """
    Collect the gamma lines of a decay chain that pass an intensity threshold.

    The threshold is applied to each line's own emission intensity; the
    intensity reported in the result is that value multiplied by the chain
    member's branching ratio.

    Args:
        chain_name: Name of the decay chain parent
        min_intensity: Minimum emission intensity to include

    Returns:
        Dict mapping isotope name to list of (energy_keV, intensity) tuples.
        Members without an ISOTOPE_DATABASE entry or without a qualifying
        line are omitted. Empty dict when the chain is not registered.
    """
    visible = {}
    chain = DECAY_CHAINS.get(chain_name)
    if chain is None:
        return visible

    for member in chain.members:
        isotope = ISOTOPE_DATABASE.get(member.isotope_name)
        if not isotope:
            continue
        kept = []
        for line in isotope.gamma_lines:
            if line.intensity >= min_intensity:
                kept.append((line.energy_kev, line.intensity * member.branching_ratio))
        if kept:
            visible[member.isotope_name] = kept
    return visible

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,26 @@
"""
Physics Module
Contains spectrum generation physics including:
- Peak shape modeling
- Background generation
- Detector response
- Counting statistics
"""
from .spectrum_physics import (
PeakParameters,
gaussian_peak,
calculate_fwhm,
fwhm_to_sigma,
detector_efficiency,
calculate_expected_counts,
generate_peak_spectrum,
generate_compton_continuum,
generate_exponential_background,
generate_polynomial_background,
generate_environmental_background,
apply_poisson_noise,
apply_electronic_noise,
normalize_spectrum,
)

View File

@ -0,0 +1,553 @@
"""
Spectrum Physics Module
Implements the physics of gamma spectrum generation including:
- Peak shape modeling (Gaussian with detector response)
- Background continuum generation
- Counting statistics (Poisson sampling)
- Detector efficiency modeling
"""
import numpy as np
from scipy import special
from typing import Optional, Tuple, List
from dataclasses import dataclass
from ..config import DetectorConfig, get_default_config
@dataclass
class PeakParameters:
    """Inputs describing one gamma line of a source during an acquisition."""

    # Gamma line energy (keV)
    energy_kev: float
    # Emission probability (photons/decay)
    intensity: float
    # Source activity in Becquerels
    activity_bq: float
    # Acquisition time in seconds
    live_time_s: float
def gaussian_peak(
    energy_bins: np.ndarray,
    peak_energy: float,
    sigma: float,
    amplitude: float
) -> np.ndarray:
    """
    Evaluate a Gaussian peak on an energy grid.

    The Gaussian density is sampled at each bin center and multiplied by the
    bin width, taken from the spacing of the first two bins (1.0 when there
    is a single bin).

    Args:
        energy_bins: Array of energy bin centers (keV)
        peak_energy: Center energy of peak (keV)
        sigma: Standard deviation (keV)
        amplitude: Peak area (total counts)

    Returns:
        Array of counts in each bin
    """
    # Width of one bin, assuming the spacing of the first two bins holds throughout.
    bin_width = energy_bins[1] - energy_bins[0] if len(energy_bins) > 1 else 1.0

    # Normalized Gaussian probability density at the bin centers.
    z = (energy_bins - peak_energy) / sigma
    density = np.exp(-0.5 * z ** 2)
    density /= (sigma * np.sqrt(2 * np.pi))

    return amplitude * density * bin_width
def calculate_fwhm(energy_kev: float, fwhm_at_662: float = 0.084) -> float:
    """
    Calculate the FWHM at a given energy for scintillator detectors.

    The absolute FWHM is scaled as sqrt(E), anchored at the 662 keV
    (Cs-137) reference:

        FWHM(E) = fwhm_at_662 * 662 * sqrt(E / 662)

    Equivalently, the relative resolution FWHM(E)/E scales as 1/sqrt(E):
    it is larger (worse) below 662 keV and smaller (better) above it.
    At E = 662 keV the result is fwhm_at_662 * 662 (about 55.6 keV for the
    default 8.4%).

    Args:
        energy_kev: Energy in keV
        fwhm_at_662: FWHM at 662 keV as fraction (e.g., 0.084 for 8.4%)

    Returns:
        FWHM in keV at the given energy
    """
    reference_kev = 662.0
    # Convert the fractional resolution at the reference energy to keV,
    # then apply the sqrt(E / 662) scaling.
    return (fwhm_at_662 * reference_kev) * np.sqrt(energy_kev / reference_kev)
def fwhm_to_sigma(fwhm: float) -> float:
    """Convert a full width at half maximum to the Gaussian standard deviation."""
    # FWHM = 2 * sqrt(2 * ln 2) * sigma  (factor ≈ 2.355)
    fwhm_per_sigma = 2.0 * np.sqrt(2.0 * np.log(2.0))
    return fwhm / fwhm_per_sigma
def detector_efficiency(
    energy_kev: float,
    detector_config: Optional[DetectorConfig] = None
) -> float:
    """
    Estimate the detector full-energy peak efficiency.

    This is a simplified phenomenological model, not a calibrated curve:
    real efficiency should be measured for each detector. The model is the
    product of a low-energy rise ``1 - exp(-E/50)`` and a high-energy
    falloff ``exp(-E/2000)``, scaled by 0.8, which peaks at roughly
    190 keV. Energies below 20 keV yield 0.

    Args:
        energy_kev: Gamma energy in keV
        detector_config: Detector configuration (the default configuration
            is used when omitted)

    Returns:
        Efficiency as fraction (0-1)
    """
    if detector_config is None:
        detector_config = get_default_config()

    # Hard cutoff below the modelled energy range.
    if energy_kev < 20:
        return 0.0

    # Low-energy rise (absorption in the housing suppresses low energies).
    absorption_term = 1.0 - np.exp(-energy_kev / 50.0)
    # High-energy falloff (photons increasingly escape a small crystal).
    escape_term = np.exp(-energy_kev / 2000.0)
    efficiency = 0.8 * absorption_term * escape_term

    # Cube-root scaling relative to a 1 cm³ crystal; larger crystals are
    # capped at a factor of 1 so the scaling can only reduce the efficiency.
    volume_factor = (detector_config.detector_volume_cm3 / 1.0) ** (1/3)
    efficiency *= min(1.0, volume_factor)

    return max(0.0, min(1.0, efficiency))
def calculate_expected_counts(
    peak_params: PeakParameters,
    detector_config: Optional[DetectorConfig] = None
) -> float:
    """
    Calculate the expected number of counts in a photopeak.

    λ = A * t * I * ε

    Where:
        A = activity (decays/s)
        t = live time (s)
        I = emission probability (photons/decay)
        ε = detector efficiency at the line energy

    No transmission/attenuation factor is applied (it is taken as 1).

    Args:
        peak_params: Peak parameters
        detector_config: Detector configuration (the default configuration
            is used when omitted)

    Returns:
        Expected number of counts in the photopeak
    """
    if detector_config is None:
        detector_config = get_default_config()

    efficiency = detector_efficiency(peak_params.energy_kev, detector_config)
    decays_times_yield = (
        peak_params.activity_bq * peak_params.live_time_s * peak_params.intensity
    )
    return decays_times_yield * efficiency
def generate_peak_spectrum(
    energy_bins: np.ndarray,
    peak_params: PeakParameters,
    detector_config: Optional[DetectorConfig] = None
) -> np.ndarray:
    """
    Generate a single gamma peak broadened by the detector resolution.

    Args:
        energy_bins: Array of energy bin centers (keV)
        peak_params: Peak parameters
        detector_config: Detector configuration (the default configuration
            is used when omitted)

    Returns:
        Array of expected counts in each bin (not yet Poisson sampled);
        all zeros when the expected photopeak counts are not positive
    """
    if detector_config is None:
        detector_config = get_default_config()

    # Peak area from activity, live time, emission probability and efficiency.
    peak_area = calculate_expected_counts(peak_params, detector_config)
    if peak_area <= 0:
        return np.zeros_like(energy_bins)

    # Energy-dependent width, converted from FWHM to Gaussian sigma.
    sigma = fwhm_to_sigma(
        calculate_fwhm(peak_params.energy_kev, detector_config.fwhm_at_662)
    )
    return gaussian_peak(energy_bins, peak_params.energy_kev, sigma, peak_area)
def generate_compton_continuum(
    energy_bins: np.ndarray,
    peak_energy: float,
    peak_counts: float,
    compton_to_peak_ratio: float = 0.5
) -> np.ndarray:
    """
    Generate a simplified Compton continuum for a gamma line.

    The continuum is flat from the lowest bin up to the Compton edge,
        E_edge = E * 2α / (1 + 2α),   α = E / 511 keV,
    with a Gaussian-shaped enhancement applied to the bins in the top 20%
    below the edge. The flat level distributes
    ``peak_counts * compton_to_peak_ratio`` evenly over the bins below the
    edge; the enhancement is multiplied on top of that level, so the total
    is somewhat larger than that product.

    Args:
        energy_bins: Array of energy bin centers (keV). Integer arrays are
            accepted; the result is always floating point.
        peak_energy: Energy of the gamma line (keV)
        peak_counts: Total counts in the photopeak
        compton_to_peak_ratio: Ratio of (flat) Compton counts to peak counts

    Returns:
        Array of Compton continuum counts
    """
    # Work in floating point: an integer accumulator would truncate the flat
    # level and make the in-place enhancement below fail on dtype casting.
    energy_bins = np.asarray(energy_bins)
    if not np.issubdtype(energy_bins.dtype, np.floating):
        energy_bins = energy_bins.astype(np.float64)

    # Compton edge energy
    alpha = peak_energy / 511.0  # E / m_e c²
    compton_edge = peak_energy * (2 * alpha) / (1 + 2 * alpha)

    continuum = np.zeros_like(energy_bins)
    below_edge = energy_bins < compton_edge
    if np.any(below_edge):
        # Flat base level shared equally by all bins below the edge.
        continuum[below_edge] = peak_counts * compton_to_peak_ratio / np.sum(below_edge)

        # Edge enhancement (Klein-Nishina-like bump just below the edge).
        edge_region = (energy_bins > 0.8 * compton_edge) & below_edge
        if np.any(edge_region):
            offset = (energy_bins[edge_region] - compton_edge) / (0.05 * compton_edge)
            enhancement = 1.5 * np.exp(-offset ** 2)
            continuum[edge_region] *= (1 + enhancement)

    return continuum
# =============================================================================
# BACKGROUND GENERATION
# =============================================================================
def generate_exponential_background(
    energy_bins: np.ndarray,
    amplitude: float = 100.0,
    decay_constant: float = 0.003
) -> np.ndarray:
    """
    Generate an exponentially falling background continuum.

    B(E) = A * exp(-b * E)

    Args:
        energy_bins: Array of energy bin centers (keV)
        amplitude: Background amplitude at E=0
        decay_constant: Exponential decay constant (1/keV)

    Returns:
        Array of background counts
    """
    falloff = np.exp(-decay_constant * energy_bins)
    return amplitude * falloff
def generate_polynomial_background(
    energy_bins: np.ndarray,
    coefficients: Optional[List[float]] = None
) -> np.ndarray:
    """
    Generate a polynomial background, clipped at zero.

    B(E) = max(0, Σ c_m * E^m)

    Args:
        energy_bins: Array of energy bin centers (keV). Integer arrays are
            accepted; the result is always floating point.
        coefficients: Polynomial coefficients [c0, c1, c2, ...], lowest
            order first. Defaults to the quadratic [10.0, -0.005, 1e-6].

    Returns:
        Array of background counts (never negative)
    """
    if coefficients is None:
        coefficients = [10.0, -0.005, 1e-6]  # Default quadratic

    # Work in floating point: with integer bins the accumulator would be an
    # integer array and the in-place additions below would fail on casting.
    energy_bins = np.asarray(energy_bins)
    if not np.issubdtype(energy_bins.dtype, np.floating):
        energy_bins = energy_bins.astype(np.float64)

    background = np.zeros_like(energy_bins)
    for power, coeff in enumerate(coefficients):
        background += coeff * (energy_bins ** power)

    return np.maximum(0, background)
def generate_environmental_background(
    energy_bins: np.ndarray,
    duration_seconds: float,
    background_cps: float = 5.0,
    include_k40: bool = True,
    include_radon: bool = True,
    include_thorium: bool = True,
    detector_config: Optional[DetectorConfig] = None
) -> Tuple[np.ndarray, List[str]]:
    """
    Generate an environmental background spectrum (expected counts).

    Components:
    - Exponential continuum carrying 70% of ``background_cps * duration_seconds``
    - K-40 peak (1460.83 keV)
    - Radon daughters: Pb-214 and Bi-214 lines
    - Thorium daughters: Ac-228, Pb-212 and Tl-208 lines

    Each enabled group draws one activity from ``np.random.uniform`` (global
    NumPy RNG), so the output is random unless the RNG is seeded by the caller.

    Args:
        energy_bins: Array of energy bin centers (keV)
        duration_seconds: Acquisition time
        background_cps: Average background count rate (cps)
        include_k40: Include potassium-40 peak
        include_radon: Include radon daughter peaks
        include_thorium: Include thorium daughter peaks
        detector_config: Detector configuration (the default configuration
            is used when omitted)

    Returns:
        Tuple of (background_spectrum, list_of_background_isotopes)
    """
    if detector_config is None:
        detector_config = get_default_config()

    def line_peak(energy: float, intensity: float, activity: float) -> np.ndarray:
        """Expected-count peak for one gamma line at the given activity."""
        return generate_peak_spectrum(
            energy_bins,
            PeakParameters(
                energy_kev=energy,
                intensity=intensity,
                activity_bq=activity,
                live_time_s=duration_seconds
            ),
            detector_config
        )

    background_isotopes = []

    # Exponential continuum; its shape is generated first and then rescaled
    # so that it sums to the target continuum counts.
    total_continuum_counts = background_cps * duration_seconds * 0.7
    background = generate_exponential_background(
        energy_bins,
        amplitude=total_continuum_counts / 500,
        decay_constant=0.002
    )
    if background.sum() > 0:
        background *= (total_continuum_counts / background.sum())

    # K-40 peak (very common)
    if include_k40:
        k40_activity = np.random.uniform(0.5, 5.0)  # Bq
        background += line_peak(1460.83, 0.1066, k40_activity)
        background_isotopes.append("K-40")

    # Radon daughters, all at one shared activity
    if include_radon:
        radon_activity = np.random.uniform(0.1, 2.0)  # Bq
        radon_lines = [
            # Pb-214 lines
            (295.22, 0.1842),
            (351.93, 0.356),
            # Bi-214 lines
            (609.31, 0.4549),
            (1120.29, 0.1492),
            (1764.49, 0.1531),
        ]
        for energy, intensity in radon_lines:
            background += line_peak(energy, intensity, radon_activity)
        background_isotopes.extend(["Pb-214", "Bi-214"])

    # Thorium daughters, all at one shared activity
    if include_thorium:
        thorium_activity = np.random.uniform(0.05, 1.0)  # Bq
        thorium_lines = [
            # Ac-228 line
            (911.20, 0.258),
            # Pb-212 line
            (238.63, 0.436),
            # Tl-208 lines, scaled by the 36% branching ratio of the Tl-208 path
            (583.19, 0.845 * 0.36),
            (2614.51, 0.998 * 0.36),
        ]
        for energy, intensity in thorium_lines:
            background += line_peak(energy, intensity, thorium_activity)
        background_isotopes.extend(["Ac-228", "Pb-212", "Tl-208"])

    return background, background_isotopes
def apply_poisson_noise(spectrum: np.ndarray) -> np.ndarray:
    """
    Apply Poisson counting statistics to a spectrum.

    Every bin is replaced by a draw from a Poisson distribution whose mean
    is the expected count in that bin. Negative expectations are clipped to
    zero first. Uses the global NumPy random state.

    Args:
        spectrum: Array of expected counts (can be float)

    Returns:
        Array of sampled counts (integer-valued, stored as float64)
    """
    # Negative expectations should not occur, but Poisson sampling rejects them.
    expected = np.maximum(0, spectrum)
    sampled = np.random.poisson(expected)
    return sampled.astype(np.float64)
def apply_electronic_noise(
    spectrum: np.ndarray,
    sigma: float = 0.5
) -> np.ndarray:
    """
    Add small zero-mean Gaussian electronic noise, clipping the result at zero.

    Uses the global NumPy random state.

    Args:
        spectrum: Count spectrum
        sigma: Standard deviation of electronic noise (counts)

    Returns:
        Spectrum with added electronic noise (never negative)
    """
    noisy = spectrum + np.random.normal(0, sigma, spectrum.shape)
    return np.maximum(0, noisy)
# =============================================================================
# NORMALIZATION
# =============================================================================
def normalize_spectrum(
    spectrum: np.ndarray,
    method: str = "max"
) -> np.ndarray:
    """
    Normalize a spectrum for ML training.

    Args:
        spectrum: Raw count spectrum
        method: Normalization method
            - "max": Divide by maximum value (range 0-1)
            - "sum": Divide by total counts (probability distribution)
            - "log": log(1 + x) transform then max normalize
            - "sqrt": Square root transform then max normalize

    Returns:
        Normalized spectrum. When the scale (maximum or total) is not
        positive, or the spectrum is empty, the (transformed) spectrum is
        returned without scaling.

    Raises:
        ValueError: If ``method`` is not one of the supported names
    """
    # Step 1: optional element-wise transform.
    if method in ("max", "sum"):
        transformed = spectrum
    elif method == "log":
        # log1p handles zero counts without -inf.
        transformed = np.log1p(spectrum)
    elif method == "sqrt":
        transformed = np.sqrt(spectrum)
    else:
        raise ValueError(f"Unknown normalization method: {method}")

    # An empty array has no maximum; return it as-is for every method
    # (previously only "sum" tolerated empty input).
    if transformed.size == 0:
        return transformed

    # Step 2: scale by the total ("sum") or by the maximum (all others).
    scale = transformed.sum() if method == "sum" else transformed.max()
    if scale > 0:
        return transformed / scale
    return transformed

View File

@ -0,0 +1,477 @@
"""
Spectrum Viewer Application
A simple GUI application to browse and visualize generated synthetic spectra.
Randomly samples from the available spectra to avoid loading all files at once.
Usage:
python -m synthetic_spectra.spectrum_viewer
Or with options:
python -m synthetic_spectra.spectrum_viewer --num_samples 200 --data_dir ./data/synthetic/spectra
"""
import tkinter as tk
from tkinter import ttk
import numpy as np
import json
from pathlib import Path
import random
from typing import Optional, List, Dict, Any
from .config import RADIACODE_CONFIGS, get_default_config
# Try to import matplotlib for plotting
try:
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
from matplotlib.figure import Figure
HAS_MATPLOTLIB = True
except ImportError:
HAS_MATPLOTLIB = False
print("Warning: matplotlib not found. Install with: pip install matplotlib")
class SpectrumViewer:
"""
GUI application for viewing synthetic gamma spectra.
"""
def __init__(
    self,
    data_dir: str = "./data/synthetic/spectra",
    num_samples: int = 100,
    random_seed: Optional[int] = None
):
    """
    Build the viewer: pick spectrum files, create the GUI, show the first spectrum.

    Args:
        data_dir: Directory containing spectrum .npy and .json files
        num_samples: Number of random samples to load (for performance)
        random_seed: Random seed for reproducible sample selection; when
            given it seeds the global ``random`` module

    Raises:
        ValueError: If no spectrum files are found in ``data_dir``
    """
    self.data_dir = Path(data_dir)
    self.num_samples = num_samples

    if random_seed is not None:
        random.seed(random_seed)

    # Find and sample spectrum files; without any there is nothing to show.
    selected_files = self._discover_and_sample_files()
    self.spectrum_files = selected_files
    if not selected_files:
        raise ValueError(f"No spectrum files found in {self.data_dir}")
    print(f"Loaded {len(self.spectrum_files)} spectrum samples")

    # Current selection state
    self.current_index = 0
    self.current_spectrum: Optional[np.ndarray] = None
    self.current_metadata: Optional[Dict[str, Any]] = None

    # Create the widgets, then display the first spectrum.
    self._setup_gui()
    self._load_current_spectrum()
def _discover_and_sample_files(self) -> List[Path]:
"""Find all spectrum files and randomly sample them."""
# Find all .npy files
all_npy_files = list(self.data_dir.glob("spectrum_*.npy"))
if not all_npy_files:
# Try without prefix
all_npy_files = list(self.data_dir.glob("*.npy"))
print(f"Found {len(all_npy_files)} total spectrum files")
# Randomly sample if we have more than requested
if len(all_npy_files) > self.num_samples:
sampled = random.sample(all_npy_files, self.num_samples)
else:
sampled = all_npy_files
# Sort by name for consistent ordering in dropdown
return sorted(sampled, key=lambda p: p.stem)
def _setup_gui(self):
    """Create the tkinter window: selection controls, plot area and metadata panel.

    Layout (rows of the main frame):
        0 - spectrum dropdown, Prev/Next/Random buttons, sample count label
        1 - matplotlib canvas with toolbar (or a notice if matplotlib is missing)
        2 - scrollable text panel for spectrum metadata
    """
    self.root = tk.Tk()
    self.root.title("Spectrum Viewer - Synthetic Gamma Spectra")
    self.root.geometry("1200x800")

    # Main container
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.grid(row=0, column=0, sticky="nsew")

    # Configure grid weights for resizing: only the plot row stretches.
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    main_frame.columnconfigure(0, weight=1)
    main_frame.rowconfigure(1, weight=1)

    # === Top controls ===
    controls_frame = ttk.Frame(main_frame)
    controls_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    controls_frame.columnconfigure(1, weight=1)

    # Dropdown for spectrum selection
    ttk.Label(controls_frame, text="Select Spectrum:").grid(row=0, column=0, padx=(0, 10))
    self.spectrum_var = tk.StringVar()
    self.spectrum_dropdown = ttk.Combobox(
        controls_frame,
        textvariable=self.spectrum_var,
        values=[f.stem for f in self.spectrum_files],
        state="readonly",
        width=50
    )
    self.spectrum_dropdown.grid(row=0, column=1, sticky="ew", padx=(0, 10))
    self.spectrum_dropdown.bind("<<ComboboxSelected>>", self._on_spectrum_selected)
    self.spectrum_dropdown.current(0)

    # Navigation buttons
    nav_frame = ttk.Frame(controls_frame)
    nav_frame.grid(row=0, column=2)
    ttk.Button(nav_frame, text="◀ Prev", command=self._prev_spectrum).pack(side="left", padx=2)
    ttk.Button(nav_frame, text="Next ▶", command=self._next_spectrum).pack(side="left", padx=2)
    ttk.Button(nav_frame, text="🎲 Random", command=self._random_spectrum).pack(side="left", padx=2)

    # Sample count label
    self.count_label = ttk.Label(
        controls_frame,
        text=f"Showing {len(self.spectrum_files)} of available spectra"
    )
    self.count_label.grid(row=0, column=3, padx=(10, 0))

    # === Plotting area ===
    plot_frame = ttk.Frame(main_frame)
    plot_frame.grid(row=1, column=0, sticky="nsew")
    plot_frame.columnconfigure(0, weight=1)
    plot_frame.rowconfigure(0, weight=1)

    if HAS_MATPLOTLIB:
        # Create matplotlib figure with 2 subplots
        self.fig = Figure(figsize=(12, 6), dpi=100)

        # 2D spectrogram (heatmap)
        self.ax_2d = self.fig.add_subplot(121)
        self.ax_2d.set_title("2D Spectrogram (Time vs Energy)")
        self.ax_2d.set_xlabel("Energy Channel")
        self.ax_2d.set_ylabel("Time Interval (s)")

        # 1D summed spectrum
        self.ax_1d = self.fig.add_subplot(122)
        self.ax_1d.set_title("Summed Spectrum")
        self.ax_1d.set_xlabel("Energy (keV)")
        self.ax_1d.set_ylabel("Counts (normalized)")

        self.fig.tight_layout()

        # Embed in tkinter
        self.canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().grid(row=0, column=0, sticky="nsew")

        # Toolbar
        toolbar_frame = ttk.Frame(plot_frame)
        toolbar_frame.grid(row=1, column=0, sticky="ew")
        self.toolbar = NavigationToolbar2Tk(self.canvas, toolbar_frame)
        self.toolbar.update()
    else:
        ttk.Label(
            plot_frame,
            text="matplotlib not installed. Install with: pip install matplotlib",
            font=("Arial", 14)
        ).grid(row=0, column=0, pady=50)

    # === Metadata panel ===
    metadata_frame = ttk.LabelFrame(main_frame, text="Spectrum Metadata", padding="10")
    metadata_frame.grid(row=2, column=0, sticky="ew", pady=(10, 0))

    self.metadata_text = tk.Text(
        metadata_frame,
        height=10,
        wrap="word",
        font=("Consolas", 10)
    )

    # Scrollbar for metadata. It is packed BEFORE the text widget: the packer
    # hands out space in packing order, so packing the expanding text first
    # would leave the scrollbar only the strip below the text instead of the
    # full-height strip on its right.
    scrollbar = ttk.Scrollbar(metadata_frame, orient="vertical", command=self.metadata_text.yview)
    scrollbar.pack(side="right", fill="y")
    self.metadata_text.configure(yscrollcommand=scrollbar.set)
    self.metadata_text.pack(side="left", fill="both", expand=True)
def _load_current_spectrum(self):
    """Load the selected spectrum and its sidecar metadata, then refresh the display.

    The spectrum is read from the ``.npy`` file at ``self.current_index``; the
    metadata from the ``.json`` file with the same stem, if it exists. A load
    failure is printed and leaves the corresponding attribute set to None.
    """
    if not self.spectrum_files:
        return

    spectrum_path = self.spectrum_files[self.current_index]
    json_path = spectrum_path.with_suffix(".json")

    # Load numpy array
    try:
        self.current_spectrum = np.load(spectrum_path)
        print(f"Loaded spectrum: {spectrum_path.name}, shape: {self.current_spectrum.shape}")
    except Exception as e:
        print(f"Error loading spectrum: {e}")
        self.current_spectrum = None

    # Load metadata JSON (optional sidecar file)
    metadata = None
    if json_path.exists():
        try:
            with open(json_path, 'r') as f:
                metadata = json.load(f)
        except Exception as e:
            print(f"Error loading metadata: {e}")
            metadata = None
    self.current_metadata = metadata

    # Update display
    self._update_plot()
    self._update_metadata()
def _update_plot(self):
    """Redraw both plots for the currently loaded spectrum.

    A 2-D array is shown as a heatmap (left) with a colorbar and its sum over
    axis 0 as a line plot (right); any other array is plotted directly on the
    right and the left panel shows a placeholder text. Up to five reference
    peak lines from the metadata are annotated on the right plot.

    Does nothing when matplotlib is unavailable or no spectrum is loaded.
    """
    if not HAS_MATPLOTLIB or self.current_spectrum is None:
        return

    # Clear previous plots
    self.ax_2d.clear()
    self.ax_1d.clear()

    spectrum = self.current_spectrum
    num_channels = spectrum.shape[1] if len(spectrum.shape) > 1 else len(spectrum)

    # Energy axis: use the same mapping as generation whenever possible.
    # The detector name may sit under several metadata keys.
    detector_name = None
    if isinstance(self.current_metadata, dict):
        detector_name = (
            self.current_metadata.get('detector')
            or self.current_metadata.get('detector_name')
            or (self.current_metadata.get('config') or {}).get('detector_name')
        )
    detector_config = RADIACODE_CONFIGS.get(detector_name, get_default_config())
    energy_bins = detector_config.get_energy_bins()
    if len(energy_bins) != num_channels:
        # Fallback: linear mapping for the available channel count.
        energy_bins = np.linspace(
            detector_config.energy_min_kev,
            detector_config.energy_max_kev,
            num_channels,
            dtype=np.float64
        )
    energy_min = float(energy_bins[0])
    energy_max = float(energy_bins[-1])

    if len(spectrum.shape) == 2:
        # 2D spectrogram
        num_intervals = spectrum.shape[0]

        # Plot 2D heatmap
        im = self.ax_2d.imshow(
            spectrum,
            aspect='auto',
            origin='lower',
            extent=[energy_min, energy_max, 0, num_intervals],
            cmap='viridis'
        )
        self.ax_2d.set_title(f"2D Spectrogram ({num_intervals} time intervals)")
        self.ax_2d.set_xlabel("Energy (keV)")
        self.ax_2d.set_ylabel("Time Interval (s)")

        # Add colorbar - use a dedicated axes to avoid removal issues
        if getattr(self, '_cbar_ax', None) is None:
            # Create a dedicated colorbar axes on first use
            self._cbar_ax = self.fig.add_axes([0.46, 0.55, 0.01, 0.35])
        else:
            self._cbar_ax.clear()
        # The axes may have been hidden by an earlier 1-D spectrum.
        self._cbar_ax.set_visible(True)
        self._colorbar = self.fig.colorbar(im, cax=self._cbar_ax, label='Counts')

        # Sum across time for 1D spectrum
        summed_spectrum = spectrum.sum(axis=0)
    else:
        # 1D spectrum: hide a colorbar left over from a previous 2-D
        # spectrum so it does not describe a heatmap that is no longer shown.
        stale_cbar_ax = getattr(self, '_cbar_ax', None)
        if stale_cbar_ax is not None:
            stale_cbar_ax.set_visible(False)

        self.ax_2d.text(
            0.5, 0.5, "1D Spectrum\n(No time dimension)",
            ha='center', va='center', transform=self.ax_2d.transAxes
        )
        summed_spectrum = spectrum

    # Plot 1D summed spectrum
    self.ax_1d.plot(energy_bins, summed_spectrum, 'b-', linewidth=0.8)
    self.ax_1d.fill_between(energy_bins, 0, summed_spectrum, alpha=0.3)
    self.ax_1d.set_title("Summed Spectrum")
    self.ax_1d.set_xlabel("Energy (keV)")
    self.ax_1d.set_ylabel("Counts (normalized)")
    self.ax_1d.set_xlim(energy_min, energy_max)
    self.ax_1d.set_ylim(0, None)
    self.ax_1d.grid(True, alpha=0.3)

    # Add vertical lines for common peaks if metadata available
    if self.current_metadata:
        isotopes = self.current_metadata.get('isotopes', [])
        if isotopes:
            peak_energies = self._get_peak_energies_from_metadata()
            for energy, label in peak_energies[:5]:  # Show the first 5 peaks
                if energy_min < energy < energy_max:
                    self.ax_1d.axvline(x=energy, color='red', linestyle='--', alpha=0.5, linewidth=0.8)
                    self.ax_1d.annotate(
                        label,
                        xy=(energy, self.ax_1d.get_ylim()[1] * 0.95),
                        fontsize=8,
                        rotation=90,
                        ha='right',
                        va='top'
                    )

    # Use subplots_adjust instead of tight_layout to avoid colorbar axes conflict
    self.fig.subplots_adjust(left=0.08, right=0.95, top=0.92, bottom=0.12, wspace=0.3)
    self.canvas.draw()
def _get_peak_energies_from_metadata(self) -> List[tuple]:
"""Extract key peak energies from metadata for annotation."""
peaks = []
if not self.current_metadata:
return peaks
isotopes = self.current_metadata.get('isotopes', [])
# Common isotope peak energies
isotope_peaks = {
'Cs-137': [(661.66, 'Cs-137')],
'Co-60': [(1173.23, 'Co-60'), (1332.49, 'Co-60')],
'Am-241': [(59.54, 'Am-241')],
'Ba-133': [(356.0, 'Ba-133'), (81.0, 'Ba-133')],
'Na-22': [(511.0, 'Na-22'), (1274.54, 'Na-22')],
'K-40': [(1460.83, 'K-40')],
'Eu-152': [(344.28, 'Eu-152'), (1408.0, 'Eu-152')],
'I-131': [(364.49, 'I-131')],
'Tc-99m': [(140.51, 'Tc-99m')],
'Co-57': [(122.06, 'Co-57')],
}
for iso_info in isotopes:
iso_name = iso_info.get('name', '') if isinstance(iso_info, dict) else str(iso_info)
if iso_name in isotope_peaks:
peaks.extend(isotope_peaks[iso_name])
return peaks
def _update_metadata(self):
    """Rewrite the metadata text widget for the currently loaded spectrum.

    The widget is cleared, a short numeric summary of ``current_spectrum``
    is written when a spectrum is loaded, and then either the metadata
    (as indented JSON) or a "not found" notice is appended.
    """
    widget = self.metadata_text
    widget.delete(1.0, tk.END)

    spectrum = self.current_spectrum
    if spectrum is not None:
        summary_parts = [
            f"Spectrum Shape: {spectrum.shape}\n",
            f"Data type: {spectrum.dtype}\n",
            f"Value range: [{spectrum.min():.4f}, {spectrum.max():.4f}]\n",
            f"Mean value: {spectrum.mean():.4f}\n",
            "\n" + "="*50 + "\n\n",
        ]
        widget.insert(tk.END, "".join(summary_parts))

    if not self.current_metadata:
        widget.insert(tk.END, "No metadata JSON file found for this spectrum.")
        return

    # Pretty-print the metadata exactly as stored.
    widget.insert(tk.END, json.dumps(self.current_metadata, indent=2))
def _on_spectrum_selected(self, event=None):
"""Handle spectrum selection from dropdown."""
selection = self.spectrum_var.get()
for i, f in enumerate(self.spectrum_files):
if f.stem == selection:
self.current_index = i
break
self._load_current_spectrum()
def _prev_spectrum(self):
"""Go to previous spectrum."""
self.current_index = (self.current_index - 1) % len(self.spectrum_files)
self.spectrum_dropdown.current(self.current_index)
self._load_current_spectrum()
def _next_spectrum(self):
"""Go to next spectrum."""
self.current_index = (self.current_index + 1) % len(self.spectrum_files)
self.spectrum_dropdown.current(self.current_index)
self._load_current_spectrum()
def _random_spectrum(self):
"""Jump to a random spectrum."""
self.current_index = random.randint(0, len(self.spectrum_files) - 1)
self.spectrum_dropdown.current(self.current_index)
self._load_current_spectrum()
def run(self):
    """Enter the GUI event loop by calling ``mainloop()`` on the root window."""
    window = self.root
    window.mainloop()
def main():
    """Parse the command line and launch the spectrum viewer.

    Prints an installation hint and returns early when matplotlib is not
    available. A ``ValueError`` raised while building or running the viewer
    is reported and swallowed; any other exception is reported and re-raised.
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Visualize synthetic gamma spectra"
    )
    # (flag, type, default, help) for every supported option.
    option_specs = (
        (
            "--data_dir",
            str,
            "./data/synthetic/spectra",
            "Directory containing spectrum files (default: ./data/synthetic/spectra)",
        ),
        (
            "--num_samples",
            int,
            100,
            "Number of random samples to load (default: 100)",
        ),
        (
            "--seed",
            int,
            None,
            "Random seed for reproducible sample selection",
        ),
    )
    for flag, value_type, default_value, help_text in option_specs:
        cli.add_argument(flag, type=value_type, default=default_value, help=help_text)
    options = cli.parse_args()

    if not HAS_MATPLOTLIB:
        print("ERROR: matplotlib is required for visualization.")
        print("Install with: pip install matplotlib")
        return

    print("Starting Spectrum Viewer...")
    print(f"Data directory: {options.data_dir}")
    print(f"Loading up to {options.num_samples} random samples...")

    try:
        app = SpectrumViewer(
            data_dir=options.data_dir,
            num_samples=options.num_samples,
            random_seed=options.seed
        )
        app.run()
    except ValueError as err:
        print(f"Error: {err}")
    except Exception as err:
        # Report, then let the traceback surface for debugging.
        print(f"Unexpected error: {err}")
        raise
# Launch the viewer only when this file is executed as a script.
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,946 @@
"""
Training Data Visualization Script
Generates an interactive HTML dashboard with Plotly visualizations to explore
the synthetic training data distribution, isotope combinations, activities,
durations, and sample spectra.
Usage:
python -m synthetic_spectra.visualize_training_data
python -m synthetic_spectra.visualize_training_data --data-dir data/synthetic/spectra
python -m synthetic_spectra.visualize_training_data --output report.html --max-samples 1000
Output:
An interactive HTML file that can be opened in any browser.
"""
import argparse
import json
import sys
from pathlib import Path
from collections import Counter, defaultdict
from itertools import combinations
from typing import Dict, List, Tuple, Optional
import numpy as np
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
except ImportError:
print("Error: Plotly is required. Install it with: pip install plotly")
sys.exit(1)
from synthetic_spectra.ground_truth.isotope_data import (
ISOTOPE_DATABASE,
IsotopeCategory,
get_isotopes_by_category,
)
def load_all_metadata(data_dir: Path, max_samples: Optional[int] = None) -> List[Dict]:
    """Load the JSON metadata files found directly inside ``data_dir``.

    Args:
        data_dir: Directory searched (non-recursively) for ``*.json`` files.
        max_samples: Optional cap on the number of files read. When more
            files exist, a fixed-seed random subset is taken so repeated
            runs analyse the same samples.

    Returns:
        One dict per successfully loaded file, in sorted filename order,
        each with an added ``'_filename'`` key holding the file stem. Files
        that cannot be read, parsed, or that do not contain a JSON object
        are reported with a warning and skipped.
    """
    json_files = sorted(data_dir.glob("*.json"))

    if max_samples is not None and len(json_files) > max_samples:
        # Use a private generator seeded with 42: this draws the same subset
        # as seeding the global RNG would, without clobbering NumPy's global
        # random state for the rest of the process.
        rng = np.random.RandomState(42)
        indices = rng.choice(len(json_files), max_samples, replace=False)
        json_files = [json_files[i] for i in sorted(indices)]

    metadata_list = []
    total_files = len(json_files)
    print(f"Loading {total_files} metadata files...")

    for loaded, json_file in enumerate(json_files, start=1):
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise TypeError(
                    f"expected a JSON object, got {type(data).__name__}"
                )
            data['_filename'] = json_file.stem
        except Exception as e:
            # Best effort: one bad file must not abort the whole report.
            print(f"  Warning: Could not load {json_file}: {e}")
        else:
            metadata_list.append(data)

        if loaded % 1000 == 0:
            print(f"  Loaded {loaded}/{total_files} files...")

    print(f"Loaded {len(metadata_list)} samples successfully.")
    return metadata_list
def load_sample_spectra(data_dir: Path, sample_ids: List[str]) -> Dict[str, np.ndarray]:
    """Load the ``.npy`` spectrum belonging to each requested sample id.

    Ids without a matching ``<id>.npy`` file in ``data_dir`` are silently
    skipped; files that exist but fail to load are reported and skipped.
    """
    loaded = {}
    for sid in sample_ids:
        path = data_dir / f"{sid}.npy"
        if not path.exists():
            continue
        try:
            loaded[sid] = np.load(path)
        except Exception as err:
            print(f"  Warning: Could not load spectrum {path}: {err}")
    return loaded
def compute_statistics(metadata_list: List[Dict]) -> Dict:
    """Aggregate per-sample metadata into dataset-level statistics.

    Returns a dict with the sample total, per-isotope and per-category
    counts, pairwise isotope co-occurrence counts, the distribution of
    isotopes-per-sample (plus the filenames in each bucket), the list of
    durations, per-isotope activity lists and per-detector counts.
    """
    isotope_counts = Counter()
    cooccurrence = defaultdict(int)
    complexity = Counter()
    durations = []
    activities = defaultdict(list)
    detectors = Counter()
    category_counts = Counter()
    samples_by_complexity = defaultdict(list)

    for meta in metadata_list:
        isotopes = meta.get('isotopes', [])
        per_source = meta.get('source_activities_bq', {})

        # Per-isotope tallies, plus the category when the isotope is known.
        for iso in isotopes:
            isotope_counts[iso] += 1
            if iso in ISOTOPE_DATABASE:
                category_counts[ISOTOPE_DATABASE[iso].category.value] += 1

        # Sorting first makes each unordered pair map to a single key.
        for pair in combinations(sorted(isotopes), 2):
            cooccurrence[pair] += 1

        isotope_total = len(isotopes)
        complexity[isotope_total] += 1
        samples_by_complexity[isotope_total].append(meta['_filename'])

        durations.append(meta.get('duration_seconds', 0))

        for iso, activity in per_source.items():
            activities[iso].append(activity)

        detectors[meta.get('detector', 'unknown')] += 1

    return {
        'total_samples': len(metadata_list),
        'isotope_counts': isotope_counts,
        'isotope_cooccurrence': cooccurrence,
        'num_isotopes_distribution': complexity,
        'durations': durations,
        'activities': activities,
        'detectors': detectors,
        'category_counts': category_counts,
        'samples_by_num_isotopes': samples_by_complexity,
    }
def create_isotope_frequency_chart(stats: Dict) -> go.Figure:
    """Build a bar chart of how many samples contain each isotope.

    Bars are ordered by descending count and coloured by the isotope's
    category in ``ISOTOPE_DATABASE``; isotopes that are not in the database
    (or whose category has no assigned colour) get a neutral grey.
    """
    category_colors = {
        'natural_background': '#2ecc71',
        'primordial': '#27ae60',
        'cosmogenic': '#1abc9c',
        'u238_chain': '#e74c3c',
        'th232_chain': '#c0392b',
        'u235_chain': '#d35400',
        'calibration': '#3498db',
        'industrial': '#9b59b6',
        'medical': '#f1c40f',
        'reactor_fallout': '#e67e22',
        'activation': '#95a5a6',
    }
    fallback_color = '#7f8c8d'

    def bar_color(name):
        """Return the category colour for ``name``, or the fallback grey."""
        if name not in ISOTOPE_DATABASE:
            return fallback_color
        category = ISOTOPE_DATABASE[name].category.value
        return category_colors.get(category, fallback_color)

    ranked = sorted(
        stats['isotope_counts'].items(),
        key=lambda item: item[1],
        reverse=True,
    )
    names = [name for name, _ in ranked]
    totals = [total for _, total in ranked]

    chart = go.Figure(data=[
        go.Bar(
            x=names,
            y=totals,
            marker_color=[bar_color(name) for name in names],
            hovertemplate="<b>%{x}</b><br>Count: %{y}<extra></extra>"
        )
    ])
    chart.update_layout(
        title="Isotope Frequency Distribution",
        xaxis_title="Isotope",
        yaxis_title="Number of Samples",
        xaxis_tickangle=-45,
        height=500,
        showlegend=False
    )
    return chart
def create_category_pie_chart(stats: Dict) -> go.Figure:
    """Build a donut chart of isotope occurrences grouped by category.

    Returns a figure carrying only a "no data" annotation when the
    statistics contain no category counts.
    """
    counts_by_category = stats['category_counts']
    if not counts_by_category:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="No category data available",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    # Human-readable names; unknown categories fall back to their raw key.
    display_names = {
        'natural_background': 'Natural Background',
        'primordial': 'Primordial',
        'cosmogenic': 'Cosmogenic',
        'u238_chain': 'U-238 Chain',
        'th232_chain': 'Th-232 Chain',
        'u235_chain': 'U-235 Chain',
        'calibration': 'Calibration',
        'industrial': 'Industrial',
        'medical': 'Medical',
        'reactor_fallout': 'Reactor/Fallout',
        'activation': 'Activation Products',
    }
    slice_labels = [display_names.get(key, key) for key in counts_by_category]
    slice_values = list(counts_by_category.values())

    chart = go.Figure(data=[
        go.Pie(
            labels=slice_labels,
            values=slice_values,
            hole=0.4,
            hovertemplate="<b>%{label}</b><br>Count: %{value}<br>%{percent}<extra></extra>"
        )
    ])
    chart.update_layout(
        title="Isotope Categories Distribution",
        height=450,
    )
    return chart
def create_num_isotopes_histogram(stats: Dict) -> go.Figure:
    """Build a bar chart of how many samples contain 0, 1, 2, ... isotopes.

    Each bar is labelled with its share of all samples as a percentage.
    """
    distribution = stats['num_isotopes_distribution']
    levels = sorted(distribution)
    sample_counts = [distribution[level] for level in levels]

    grand_total = sum(sample_counts)
    share_labels = [f"{(count/grand_total)*100:.1f}%" for count in sample_counts]

    chart = go.Figure(data=[
        go.Bar(
            x=[str(level) for level in levels],
            y=sample_counts,
            text=share_labels,
            textposition='auto',
            marker_color='#3498db',
            hovertemplate="<b>%{x} isotopes</b><br>Count: %{y}<br>%{text}<extra></extra>"
        )
    ])
    chart.update_layout(
        title="Sample Complexity (Number of Isotopes per Sample)",
        xaxis_title="Number of Source Isotopes",
        yaxis_title="Number of Samples",
        height=400,
    )
    return chart
def create_duration_histogram(stats: Dict) -> go.Figure:
    """Build a 50-bin histogram of measurement durations.

    A boxed annotation in the top-right corner reports the mean, standard
    deviation and range. Returns a figure carrying only a "no data"
    annotation when the statistics hold no durations.
    """
    durations = stats['durations']
    if not durations:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="No duration data available",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    summary_text = (
        f"Mean: {np.mean(durations):.1f}s | Std: {np.std(durations):.1f}s | "
        f"Range: [{np.min(durations):.1f}, {np.max(durations):.1f}]s"
    )

    chart = go.Figure(data=[
        go.Histogram(
            x=durations,
            nbinsx=50,
            marker_color='#9b59b6',
            hovertemplate="Duration: %{x:.1f}s<br>Count: %{y}<extra></extra>"
        )
    ])
    chart.update_layout(
        title="Measurement Duration Distribution",
        xaxis_title="Duration (seconds)",
        yaxis_title="Number of Samples",
        height=400,
    )
    chart.add_annotation(
        text=summary_text,
        xref="paper", yref="paper",
        x=0.98, y=0.98,
        xanchor='right', yanchor='top',
        showarrow=False,
        bgcolor="white",
        bordercolor="black",
        borderwidth=1,
        font=dict(size=11)
    )
    return chart
def create_activity_boxplot(stats: Dict) -> go.Figure:
    """Build per-isotope box plots of source activity.

    Only the 30 isotopes with the highest median activity are shown, in
    descending order of that median. Returns a figure carrying only a
    "no data" annotation when no activities were recorded.
    """
    activities = stats['activities']
    if not activities:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="No activity data available",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    def median_activity(name):
        """Median activity for ``name``; 0 when its list is empty."""
        values = activities[name]
        return np.median(values) if values else 0

    ranked = sorted(activities.keys(), key=median_activity, reverse=True)

    chart = go.Figure()
    # Cap the number of boxes so the x axis stays readable.
    for name in ranked[:30]:
        chart.add_trace(go.Box(
            y=activities[name],
            name=name,
            boxpoints='outliers',
            hovertemplate=f"<b>{name}</b><br>Activity: %{{y:.2f}} Bq<extra></extra>"
        ))
    chart.update_layout(
        title="Activity Distribution by Isotope (Top 30)",
        xaxis_title="Isotope",
        yaxis_title="Activity (Bq)",
        xaxis_tickangle=-45,
        height=500,
        showlegend=False
    )
    return chart
def create_cooccurrence_heatmap(stats: Dict, top_n: int = 20) -> go.Figure:
    """Build a symmetric heatmap of pairwise isotope co-occurrence counts.

    Args:
        stats: Statistics dict providing ``'isotope_cooccurrence'`` (counts
            keyed by sorted isotope pairs) and ``'isotope_counts'``.
        top_n: Number of most frequent isotopes to include on each axis.

    Returns:
        The heatmap figure, or a figure carrying only a "no data"
        annotation when there are no co-occurrence counts. The diagonal is
        left at zero.
    """
    pair_counts = stats['isotope_cooccurrence']
    if not pair_counts:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="No co-occurrence data (need multi-isotope samples)",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    axis_isotopes = [name for name, _ in stats['isotope_counts'].most_common(top_n)]
    size = len(axis_isotopes)
    grid = np.zeros((size, size))

    # Visit each unordered pair once and mirror it across the diagonal.
    for (row, first), (col, second) in combinations(enumerate(axis_isotopes), 2):
        key = tuple(sorted([first, second]))
        count = pair_counts.get(key, 0)
        grid[row, col] = count
        grid[col, row] = grid[row, col]

    chart = go.Figure(data=go.Heatmap(
        z=grid,
        x=axis_isotopes,
        y=axis_isotopes,
        colorscale='Blues',
        hovertemplate="<b>%{x}</b> + <b>%{y}</b><br>Co-occurrences: %{z}<extra></extra>"
    ))
    chart.update_layout(
        title=f"Isotope Co-occurrence Matrix (Top {top_n} Isotopes)",
        xaxis_tickangle=-45,
        height=600,
        width=700,
    )
    return chart
def create_activity_vs_duration_scatter(metadata_list: List[Dict]) -> go.Figure:
    """Build a scatter plot of summed source activity against duration.

    Samples with a non-positive duration or without any source activities
    are left out. Markers are coloured by the number of isotopes in the
    sample. Returns a figure carrying only a "no data" annotation when no
    sample qualifies.
    """
    rows = []
    for meta in metadata_list:
        seconds = meta.get('duration_seconds', 0)
        per_source = meta.get('source_activities_bq', {})
        if not (seconds > 0 and per_source):
            continue
        rows.append((
            seconds,
            sum(per_source.values()),
            len(meta.get('isotopes', [])),
            meta['_filename'],
        ))

    if not rows:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="No data available",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    # Transpose the row tuples into one list per plotted quantity.
    durations, total_activities, isotope_totals, sample_ids = (
        list(column) for column in zip(*rows)
    )

    chart = go.Figure(data=go.Scatter(
        x=durations,
        y=total_activities,
        mode='markers',
        marker=dict(
            size=6,
            color=isotope_totals,
            colorscale='Viridis',
            colorbar=dict(title="# Isotopes"),
            opacity=0.6
        ),
        text=sample_ids,
        hovertemplate="<b>%{text}</b><br>Duration: %{x:.1f}s<br>Total Activity: %{y:.2f} Bq<extra></extra>"
    ))
    chart.update_layout(
        title="Total Source Activity vs Measurement Duration",
        xaxis_title="Duration (seconds)",
        yaxis_title="Total Activity (Bq)",
        height=500,
    )
    return chart
def create_sample_spectrum_plot(spectra: Dict[str, np.ndarray], metadata_list: List[Dict]) -> go.Figure:
    """Build an overlay line plot of up to six sample spectra.

    Args:
        spectra: Mapping of sample id to spectrum array. 2D arrays are
            summed over their first axis before plotting; other arrays are
            plotted as they are.
        metadata_list: Metadata dicts (each with a ``'_filename'`` key) used
            to label every trace with the sample's isotopes.

    Returns:
        The line plot, or a figure carrying only a "no data" annotation
        when ``spectra`` is empty.
    """
    if not spectra:
        return go.Figure().add_annotation(text="No spectrum data loaded",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    meta_lookup = {m['_filename']: m for m in metadata_list}

    fig = go.Figure()
    colors = px.colors.qualitative.Set2

    for i, (sample_id, spectrum) in enumerate(list(spectra.items())[:6]):
        # Collapse the leading axis of a 2D array to get one total spectrum.
        total_spectrum = spectrum.sum(axis=0) if spectrum.ndim == 2 else spectrum

        # Build the 20-3000 keV axis from this spectrum's own channel count
        # rather than a fixed 1023, so x and y always have matching lengths
        # (e.g. for 1024-channel files). For 1023 channels the axis is
        # unchanged.
        energy = np.linspace(20, 3000, total_spectrum.shape[-1])

        meta = meta_lookup.get(sample_id, {})
        isotopes = meta.get('isotopes', ['Unknown'])
        label = f"{sample_id[-6:]}: {', '.join(isotopes)}"

        fig.add_trace(go.Scatter(
            x=energy,
            y=total_spectrum,
            mode='lines',
            name=label,
            line=dict(color=colors[i % len(colors)], width=1),
            hovertemplate=f"<b>{label}</b><br>Energy: %{{x:.1f}} keV<br>Counts: %{{y:.2f}}<extra></extra>"
        ))

    fig.update_layout(
        title="Sample Spectra (Time-Integrated)",
        xaxis_title="Energy (keV)",
        yaxis_title="Normalized Counts",
        height=500,
        legend=dict(yanchor="top", y=0.99, xanchor="right", x=0.99),
        hovermode='closest'
    )
    return fig
def create_3d_spectrum_surface(spectrum: np.ndarray, sample_id: str) -> go.Figure:
    """Build a 3D surface (row index vs energy vs counts) for one 2D spectrum.

    Arrays with more than 100 rows or more than 256 columns are thinned by
    an integer stride along that axis to keep the figure light. Returns a
    figure carrying only an explanatory annotation when the array is not 2D.
    """
    if spectrum.ndim != 2:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="Spectrum must be 2D",
                                          xref="paper", yref="paper", x=0.5, y=0.5)

    row_count, column_count = spectrum.shape

    # A stride of 1 leaves an axis untouched.
    row_stride = row_count // 100 if row_count > 100 else 1
    column_stride = column_count // 256 if column_count > 256 else 1

    time_axis = np.arange(row_count)[::row_stride]
    energy_axis = np.linspace(20, 3000, column_count)[::column_stride]
    surface_values = spectrum[::row_stride, ::column_stride]

    chart = go.Figure(data=[
        go.Surface(
            z=surface_values,
            x=energy_axis,
            y=time_axis,
            colorscale='Viridis',
            hovertemplate="Time: %{y}s<br>Energy: %{x:.1f} keV<br>Counts: %{z:.3f}<extra></extra>"
        )
    ])
    chart.update_layout(
        title=f"3D Spectrum View: {sample_id}",
        scene=dict(
            xaxis_title="Energy (keV)",
            yaxis_title="Time (s)",
            zaxis_title="Counts",
        ),
        height=600,
    )
    return chart
def create_summary_table(stats: Dict) -> str:
    """Render the headline dataset statistics as an HTML table fragment.

    Every figure falls back to 0 when the underlying data is empty, so the
    function is safe to call on statistics computed from no samples.
    """
    sample_total = stats['total_samples']
    isotope_kinds = len(stats['isotope_counts'])

    if sample_total:
        weighted_sum = sum(k * v for k, v in stats['num_isotopes_distribution'].items())
        mean_isotopes = weighted_sum / sample_total
    else:
        mean_isotopes = 0

    durations = stats['durations']
    if durations:
        shortest = min(durations)
        longest = max(durations)
        mean_duration = np.mean(durations)
    else:
        shortest = longest = mean_duration = 0

    # Pool the activities of every isotope for an overall range.
    pooled = [value for values in stats['activities'].values() for value in values]
    if pooled:
        weakest = min(pooled)
        strongest = max(pooled)
    else:
        weakest = strongest = 0

    detector_names = ', '.join(stats['detectors'].keys())

    return f"""
<div style="padding: 20px; background: #f8f9fa; border-radius: 10px; margin: 20px 0;">
<h3 style="margin-top: 0; color: #2c3e50;">📊 Dataset Summary</h3>
<table style="width: 100%; border-collapse: collapse; font-size: 14px;">
<tr style="border-bottom: 1px solid #ddd;">
<td style="padding: 8px;"><strong>Total Samples</strong></td>
<td style="padding: 8px;">{sample_total:,}</td>
</tr>
<tr style="border-bottom: 1px solid #ddd;">
<td style="padding: 8px;"><strong>Unique Isotopes</strong></td>
<td style="padding: 8px;">{isotope_kinds}</td>
</tr>
<tr style="border-bottom: 1px solid #ddd;">
<td style="padding: 8px;"><strong>Avg Isotopes per Sample</strong></td>
<td style="padding: 8px;">{mean_isotopes:.2f}</td>
</tr>
<tr style="border-bottom: 1px solid #ddd;">
<td style="padding: 8px;"><strong>Duration Range</strong></td>
<td style="padding: 8px;">{shortest:.1f}s - {longest:.1f}s</td>
</tr>
<tr style="border-bottom: 1px solid #ddd;">
<td style="padding: 8px;"><strong>Mean Duration</strong></td>
<td style="padding: 8px;">{mean_duration:.1f}s</td>
</tr>
<tr style="border-bottom: 1px solid #ddd;">
<td style="padding: 8px;"><strong>Activity Range</strong></td>
<td style="padding: 8px;">{weakest:.2f} - {strongest:.2f} Bq</td>
</tr>
<tr>
<td style="padding: 8px;"><strong>Detectors</strong></td>
<td style="padding: 8px;">{detector_names}</td>
</tr>
</table>
</div>
"""
def create_isotope_database_summary() -> go.Figure:
    """Build a sunburst chart of ``ISOTOPE_DATABASE`` grouped by category.

    The hierarchy has three levels: a single root, one node per category
    (labelled with its isotope count) and one leaf per isotope.
    """
    root = "Isotope Database"
    display_names = {
        'natural_background': 'Natural Background',
        'primordial': 'Primordial',
        'cosmogenic': 'Cosmogenic',
        'u238_chain': 'U-238 Chain',
        'th232_chain': 'Th-232 Chain',
        'u235_chain': 'U-235 Chain',
        'calibration': 'Calibration',
        'industrial': 'Industrial',
        'medical': 'Medical',
        'reactor_fallout': 'Reactor/Fallout',
        'activation': 'Activation',
    }

    # Group isotope names under their category key.
    members_by_category = defaultdict(list)
    for isotope_name, record in ISOTOPE_DATABASE.items():
        members_by_category[record.category.value].append(isotope_name)

    # Parallel lists describing every node, starting with the root.
    node_ids = [root]
    node_labels = [root]
    node_parents = [""]
    node_values = [len(ISOTOPE_DATABASE)]

    for category_key, members in members_by_category.items():
        category_label = display_names.get(category_key, category_key)
        member_count = len(members)

        node_ids.append(category_label)
        node_labels.append(f"{category_label} ({member_count})")
        node_parents.append(root)
        node_values.append(member_count)

        for isotope_name in members:
            # Prefix with the category so leaf ids cannot clash.
            node_ids.append(f"{category_label}/{isotope_name}")
            node_labels.append(isotope_name)
            node_parents.append(category_label)
            node_values.append(1)

    chart = go.Figure(go.Sunburst(
        ids=node_ids,
        labels=node_labels,
        parents=node_parents,
        values=node_values,
        branchvalues="total",
        hovertemplate="<b>%{label}</b><extra></extra>"
    ))
    chart.update_layout(
        title=f"Isotope Database Structure ({len(ISOTOPE_DATABASE)} isotopes)",
        height=600,
    )
    return chart
def generate_html_report(
    data_dir: Path,
    output_file: Path,
    max_samples: Optional[int] = None
):
    """Generate the complete interactive HTML report.

    Loads the metadata in ``data_dir``, computes dataset statistics, builds
    every Plotly figure and writes a single UTF-8 HTML page to
    ``output_file``.

    Args:
        data_dir: Directory containing the ``.json`` metadata files and the
            matching ``.npy`` spectra.
        output_file: Path of the HTML file to write.
        max_samples: Optional cap on the number of metadata files analysed.

    Returns:
        None. When no metadata can be loaded an error is printed and no
        file is written.
    """
    print("=" * 60)
    print("Training Data Visualization Report Generator")
    print("=" * 60)

    # Load all metadata
    metadata_list = load_all_metadata(data_dir, max_samples)
    if not metadata_list:
        print("Error: No metadata files found!")
        return

    # Compute statistics
    print("\nComputing statistics...")
    stats = compute_statistics(metadata_list)

    # Load a few sample spectra
    print("\nLoading sample spectra for visualization...")
    sample_ids = [m['_filename'] for m in metadata_list[:10]]
    spectra = load_sample_spectra(data_dir, sample_ids)

    print("\nGenerating visualizations...")
    figures = {
        'isotope_freq': create_isotope_frequency_chart(stats),
        'category_pie': create_category_pie_chart(stats),
        'num_isotopes': create_num_isotopes_histogram(stats),
        'duration_hist': create_duration_histogram(stats),
        'activity_box': create_activity_boxplot(stats),
        'cooccurrence': create_cooccurrence_heatmap(stats),
        'activity_duration': create_activity_vs_duration_scatter(metadata_list),
        'sample_spectra': create_sample_spectrum_plot(spectra, metadata_list),
        'isotope_db': create_isotope_database_summary(),
    }

    # Add 3D spectrum if we have data
    if spectra:
        first_id = next(iter(spectra))
        figures['spectrum_3d'] = create_3d_spectrum_surface(spectra[first_id], first_id)

    def embed(name, load_plotlyjs=False):
        """Return the HTML fragment for ``figures[name]``.

        With ``load_plotlyjs`` the fragment also carries the CDN script tag
        for the plotly.js release that matches the installed plotly.py.
        """
        return figures[name].to_html(
            full_html=False,
            include_plotlyjs='cdn' if load_plotlyjs else False,
        )

    # Create HTML
    print("\nBuilding HTML report...")
    html_parts = [
        # The page is written as UTF-8 and contains emoji, so declare the
        # charset. plotly.js is NOT loaded from "plotly-latest.min.js" here:
        # that alias is frozen at an old release. The first embedded figure
        # brings in the matching CDN script instead (see ``embed``).
        """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Synthetic Training Data Visualization</title>
<style>
body {
    font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, sans-serif;
    margin: 0;
    padding: 20px;
    background: #ecf0f1;
    color: #2c3e50;
}
.container {
    max-width: 1400px;
    margin: 0 auto;
}
h1 {
    text-align: center;
    color: #2c3e50;
    padding: 20px;
    background: white;
    border-radius: 10px;
    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h2 {
    color: #34495e;
    border-bottom: 2px solid #3498db;
    padding-bottom: 10px;
    margin-top: 40px;
}
.chart-container {
    background: white;
    padding: 20px;
    border-radius: 10px;
    margin: 20px 0;
    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.row {
    display: flex;
    gap: 20px;
    flex-wrap: wrap;
}
.col-6 {
    flex: 1;
    min-width: 400px;
}
.col-12 {
    width: 100%;
}
.toc {
    background: white;
    padding: 20px;
    border-radius: 10px;
    margin: 20px 0;
    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.toc ul {
    list-style: none;
    padding-left: 0;
}
.toc li {
    margin: 10px 0;
}
.toc a {
    color: #3498db;
    text-decoration: none;
}
.toc a:hover {
    text-decoration: underline;
}
.info-box {
    background: #e8f6ff;
    border-left: 4px solid #3498db;
    padding: 15px;
    margin: 20px 0;
    border-radius: 0 10px 10px 0;
}
</style>
</head>
<body>
<div class="container">
<h1>🔬 Synthetic Gamma Spectra Training Data Analysis</h1>
""",
        create_summary_table(stats),
        """
<div class="toc">
<h3>📑 Table of Contents</h3>
<ul>
<li><a href="#isotope-distribution">1. Isotope Distribution</a></li>
<li><a href="#sample-complexity">2. Sample Complexity</a></li>
<li><a href="#temporal-activity">3. Temporal &amp; Activity Analysis</a></li>
<li><a href="#cooccurrence">4. Isotope Co-occurrence</a></li>
<li><a href="#sample-spectra">5. Sample Spectra</a></li>
<li><a href="#database-overview">6. Isotope Database Overview</a></li>
</ul>
</div>
<h2 id="isotope-distribution">1. Isotope Distribution</h2>
<div class="info-box">
<strong>What this shows:</strong> The frequency of each isotope across all training samples.
Imbalanced distributions may lead to model bias towards common isotopes.
</div>
<div class="row">
<div class="col-6 chart-container">
""",
        # First figure in document order: it also loads plotly.js, which
        # every later fragment (embedded without the library) relies on.
        embed('isotope_freq', load_plotlyjs=True),
        """
</div>
<div class="col-6 chart-container">
""",
        embed('category_pie'),
        """
</div>
</div>
<h2 id="sample-complexity">2. Sample Complexity</h2>
<div class="info-box">
<strong>What this shows:</strong> Distribution of how many source isotopes are present per sample.
Mix of single and multi-isotope samples helps the model handle real-world complexity.
</div>
<div class="chart-container">
""",
        embed('num_isotopes'),
        """
</div>
<h2 id="temporal-activity">3. Temporal &amp; Activity Analysis</h2>
<div class="info-box">
<strong>What this shows:</strong> Distribution of measurement durations and source activities.
Varied durations simulate different counting scenarios.
</div>
<div class="row">
<div class="col-6 chart-container">
""",
        embed('duration_hist'),
        """
</div>
<div class="col-6 chart-container">
""",
        embed('activity_duration'),
        """
</div>
</div>
<div class="chart-container">
""",
        embed('activity_box'),
        """
</div>
<h2 id="cooccurrence">4. Isotope Co-occurrence</h2>
<div class="info-box">
<strong>What this shows:</strong> Which isotopes frequently appear together in training samples.
This helps understand potential confusion pairs and realistic combinations.
</div>
<div class="chart-container">
""",
        embed('cooccurrence'),
        """
</div>
<h2 id="sample-spectra">5. Sample Spectra Visualization</h2>
<div class="info-box">
<strong>What this shows:</strong> Actual spectrum shapes from the training data.
Each peak corresponds to gamma emission lines from the source isotopes.
</div>
<div class="chart-container">
""",
        embed('sample_spectra'),
        """
</div>
"""
    ]

    # Add 3D spectrum if available
    if 'spectrum_3d' in figures:
        html_parts.append("""
<div class="chart-container">
<h3>3D Time-Energy-Counts View</h3>
""")
        html_parts.append(embed('spectrum_3d'))
        html_parts.append("</div>")

    html_parts.append("""
<h2 id="database-overview">6. Isotope Database Overview</h2>
<div class="info-box">
<strong>What this shows:</strong> The complete isotope database structure organized by category.
Click to explore the hierarchy.
</div>
<div class="chart-container">
""")
    html_parts.append(embed('isotope_db'))
    html_parts.append("""
</div>
<footer style="text-align: center; padding: 40px; color: #7f8c8d;">
<p>Generated by ML for Isotope Identification Training Data Analyzer</p>
</footer>
</div>
</body>
</html>
""")

    # Write HTML file
    html_content = ''.join(html_parts)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_content)

    print("\n✅ Report generated successfully!")
    print(f"   Output: {output_file.absolute()}")
    print("\nOpen in your browser to view the interactive visualizations.")
def main():
    """Parse the command line and generate the HTML report.

    Exits with status 1 when the requested data directory does not exist.
    """
    cli = argparse.ArgumentParser(
        description="Generate interactive HTML visualization of training data"
    )
    # (flag, type, default, help) for every supported option.
    option_specs = (
        (
            '--data-dir',
            str,
            'data/synthetic/spectra',
            'Directory containing spectrum .json and .npy files',
        ),
        (
            '--output',
            str,
            'training_data_report.html',
            'Output HTML file name',
        ),
        (
            '--max-samples',
            int,
            None,
            'Maximum number of samples to analyze (for faster generation)',
        ),
    )
    for flag, value_type, default_value, help_text in option_specs:
        cli.add_argument(flag, type=value_type, default=default_value, help=help_text)
    options = cli.parse_args()

    source_dir = Path(options.data_dir)
    report_path = Path(options.output)

    if not source_dir.exists():
        print(f"Error: Data directory not found: {source_dir}")
        sys.exit(1)

    generate_html_report(source_dir, report_path, options.max_samples)
# Generate the report only when this file is executed as a script.
if __name__ == "__main__":
    main()