Files
radiacode/train/vega_ml/training/vega/model.py
Jacquin Antoine 745a64b342 Pipeline complet Radiacode 103 - identification automatique d'isotopes
- VegaModel CNN-FCNN 34.5M params, 82 isotopes, val acc 99.89%
- Generation 50k spectres synthetiques 1D (12-24h durees)
- Entrainement 100 epochs sur RTX 5060 Ti (CUDA 12.8, Blackwell)
- Detection continue avec soustraction du background
- Capture background 24h avec gestion deconnexion
- Docker Compose : conteneur train (GPU) + detect (CPU/USB)
- Modele entraine inclus (vega_best.pt, 395 Mo)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-19 12:29:56 +02:00

417 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Vega Model Architecture - CNN-FCNN with Multi-Task Heads
A hybrid Convolutional Neural Network with Fully Connected Neural Network
for gamma spectrum isotope identification. Based on peer-reviewed research
showing CNN-FCNN achieves state-of-the-art performance (99%+ accuracy).
Architecture:
Input: 1D gamma spectrum (1023 channels, 20-3000 keV)
Feature Extraction: 3 CNN modules with LeakyReLU, MaxPool, Dropout
Classification Head: Dense layers → raw logits (apply sigmoid at inference for multi-label isotope presence)
Regression Head: Dense layers → ReLU (activity estimation in Bq)
"""
import torch
import torch.nn as nn
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
@dataclass
class VegaConfig:
    """Hyper-parameters shared by the Vega model and its loss.

    Attributes:
        num_channels: Number of energy channels in one input spectrum.
        num_isotopes: Number of isotopes to classify (from the isotope
            database); sets the width of both output heads.
        conv_channels: Output channel count of each CNN block, in order.
        conv_kernel_size: Kernel size used by every convolution.
        pool_size: Max-pooling window applied at the end of each CNN block.
        fc_hidden_dims: Hidden layer widths of the fully connected heads.
        dropout_rate: Dropout probability inside the fully connected heads.
        spatial_dropout_rate: Channel-wise dropout probability inside the
            CNN blocks.
        leaky_relu_slope: Negative slope of every LeakyReLU.
        classification_weight: Weight of the classification loss term.
        regression_weight: Weight of the regression loss term.
        max_activity_bq: Scale used for activity normalization (Bq).
    """

    # --- Input -----------------------------------------------------------
    num_channels: int = 1023

    # --- Output ----------------------------------------------------------
    num_isotopes: int = 82

    # --- CNN backbone ----------------------------------------------------
    # default_factory gives every instance its own list object.
    conv_channels: List[int] = field(
        default_factory=lambda: [64, 128, 256]
    )
    conv_kernel_size: int = 7
    pool_size: int = 2

    # --- Fully connected heads -------------------------------------------
    fc_hidden_dims: List[int] = field(
        default_factory=lambda: [512, 256]
    )

    # --- Regularization --------------------------------------------------
    dropout_rate: float = 0.3
    spatial_dropout_rate: float = 0.1

    # --- Activation ------------------------------------------------------
    leaky_relu_slope: float = 0.1

    # --- Loss weighting --------------------------------------------------
    classification_weight: float = 1.0
    regression_weight: float = 0.1

    # --- Training --------------------------------------------------------
    max_activity_bq: float = 1000.0
class ConvBlock(nn.Module):
    """
    One CNN module: (Conv1d -> BatchNorm1d -> LeakyReLU) twice, followed by
    max-pooling and channel-wise dropout.

    Follows the Turner et al. (2021) design of stacking two convolutions
    per module before pooling.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        kernel_size: Convolution kernel size.
        pool_size: Max-pooling window.
        dropout_rate: Probability used by the channel-wise dropout.
        leaky_slope: Negative slope of the LeakyReLU activations.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 7,
        pool_size: int = 2,
        dropout_rate: float = 0.1,
        leaky_slope: float = 0.1
    ):
        super().__init__()

        # Half-kernel padding: with an odd kernel the sequence length is
        # unchanged by each convolution.
        half_kernel = kernel_size // 2

        # Stage 1: in_channels -> out_channels
        self.conv1 = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            padding=half_kernel,
        )
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.act1 = nn.LeakyReLU(leaky_slope)

        # Stage 2: out_channels -> out_channels
        self.conv2 = nn.Conv1d(
            out_channels,
            out_channels,
            kernel_size=kernel_size,
            padding=half_kernel,
        )
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.act2 = nn.LeakyReLU(leaky_slope)

        # Downsampling, then dropout that zeroes whole channels.
        self.pool = nn.MaxPool1d(pool_size)
        self.dropout = nn.Dropout1d(dropout_rate)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply both conv stages, then pool and drop out.

        Args:
            x: Feature map laid out as (batch, in_channels, length).

        Returns:
            Feature map with ``out_channels`` channels and the length
            reduced by the pooling window.
        """
        stage_one = self.act1(self.bn1(self.conv1(x)))
        stage_two = self.act2(self.bn2(self.conv2(stage_one)))
        return self.dropout(self.pool(stage_two))
class VegaModel(nn.Module):
    """
    Vega: CNN-FCNN Multi-Task Model for Isotope Identification

    Named after the bright star Vega (alpha Lyrae), which emits radiation
    across the electromagnetic spectrum - fitting for a gamma spectrum
    analyzer.

    The model performs two tasks on a shared CNN backbone:
        1. Multi-label classification: which isotopes are present?
        2. Activity regression: what is the activity of each isotope?

    ``forward`` returns raw classification logits; ``predict`` converts them
    to probabilities with a sigmoid before thresholding.
    """

    def __init__(self, config: "VegaConfig"):
        """Build backbone, both heads, and initialize all weights.

        Args:
            config: Hyper-parameters describing input size, layer widths,
                dropout rates and activation slope.
        """
        super().__init__()
        self.config = config

        # CNN feature extractor shared by both heads.
        self.backbone = self._build_backbone()

        # Width of the flattened backbone output; needed by the first
        # Linear layer of each head.
        self._flat_size = self._calculate_flat_size()

        # Multi-label classification head (raw logits).
        self.classifier = self._build_classifier()

        # Activity regression head (non-negative outputs).
        self.regressor = self._build_regressor()

        self._init_weights()

    def _build_backbone(self) -> nn.Sequential:
        """Build the CNN feature extraction backbone.

        One ``ConvBlock`` is created per entry of ``config.conv_channels``;
        the first block takes a single input channel.
        """
        layers = []
        in_channels = 1  # Input is a 1D spectrum with 1 channel
        for out_channels in self.config.conv_channels:
            layers.append(ConvBlock(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=self.config.conv_kernel_size,
                pool_size=self.config.pool_size,
                dropout_rate=self.config.spatial_dropout_rate,
                leaky_slope=self.config.leaky_relu_slope
            ))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def _calculate_flat_size(self) -> int:
        """Calculate the size of flattened features after the backbone.

        A zero-filled dummy spectrum of batch size 1 is pushed through the
        backbone and the number of output elements is returned.

        The probe runs in eval mode so that it does not update BatchNorm
        running statistics (or apply dropout); the previous train/eval
        mode is restored afterwards.
        """
        dummy = torch.zeros(1, 1, self.config.num_channels)
        was_training = self.backbone.training
        self.backbone.eval()
        try:
            with torch.no_grad():
                out = self.backbone(dummy)
        finally:
            self.backbone.train(was_training)
        # Batch size is 1, so numel() is exactly the per-sample feature count.
        return out.numel()

    def _build_classifier(self) -> nn.Sequential:
        """Build the classification head for isotope presence prediction.

        Outputs raw logits (not probabilities) for AMP compatibility.
        Use BCEWithLogitsLoss for training, apply sigmoid during inference.
        """
        layers = []
        in_features = self._flat_size
        # Hidden layers: Linear -> BatchNorm -> LeakyReLU -> Dropout
        for hidden_dim in self.config.fc_hidden_dims:
            layers.extend([
                nn.Linear(in_features, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.LeakyReLU(self.config.leaky_relu_slope),
                nn.Dropout(self.config.dropout_rate)
            ])
            in_features = hidden_dim
        # Output layer - raw logits, one per isotope
        layers.append(nn.Linear(in_features, self.config.num_isotopes))
        return nn.Sequential(*layers)

    def _build_regressor(self) -> nn.Sequential:
        """Build the regression head for activity estimation.

        Uses the same hidden-layer layout as the classifier (with its own
        weights) and ends with a ReLU so outputs are never negative.
        """
        layers = []
        in_features = self._flat_size
        # Hidden layers: Linear -> BatchNorm -> LeakyReLU -> Dropout
        for hidden_dim in self.config.fc_hidden_dims:
            layers.extend([
                nn.Linear(in_features, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.LeakyReLU(self.config.leaky_relu_slope),
                nn.Dropout(self.config.dropout_rate)
            ])
            in_features = hidden_dim
        # Output layer with ReLU for non-negative activity values
        layers.extend([
            nn.Linear(in_features, self.config.num_isotopes),
            nn.ReLU()
        ])
        return nn.Sequential(*layers)

    def _init_weights(self):
        """Initialize weights using He initialization for LeakyReLU.

        Conv1d and Linear weights get Kaiming-normal init (fan_out mode,
        slope taken from the config) and zero biases; BatchNorm1d layers
        get unit scale and zero shift.
        """
        for module in self.modules():
            if isinstance(module, (nn.Conv1d, nn.Linear)):
                nn.init.kaiming_normal_(
                    module.weight,
                    a=self.config.leaky_relu_slope,
                    mode='fan_out',
                    nonlinearity='leaky_relu'
                )
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            elif isinstance(module, nn.BatchNorm1d):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)

    def forward(
        self,
        x: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass through the model.

        Args:
            x: Input spectrum tensor of shape (batch, channels) or
                (batch, 1, channels). Values should be normalized [0, 1].

        Returns:
            Tuple of:
                - isotope_logits: Raw logits for each isotope
                  (batch, num_isotopes). Apply sigmoid to get
                  probabilities for inference.
                - activity_pred: Non-negative activity output for each
                  isotope (batch, num_isotopes). NOTE(review): predict()
                  multiplies this by config.max_activity_bq, so it is
                  presumably normalized rather than in Bq - confirm
                  against the training targets.
        """
        # Ensure input has an explicit channel dimension
        if x.dim() == 2:
            x = x.unsqueeze(1)  # (batch, channels) -> (batch, 1, channels)

        # Feature extraction
        features = self.backbone(x)
        features = features.flatten(start_dim=1)

        # Both heads read the same flattened features
        isotope_logits = self.classifier(features)
        activity_pred = self.regressor(features)
        return isotope_logits, activity_pred

    def predict(
        self,
        x: torch.Tensor,
        threshold: float = 0.5,
        return_all: bool = False
    ) -> Dict:
        """
        Make predictions with post-processing.

        Switches the model to eval mode (it is left in eval mode
        afterwards) and runs without gradient tracking.

        Args:
            x: Input spectrum tensor, same layout as for ``forward``.
            threshold: Probability threshold for isotope presence.
            return_all: Currently unused; accepted for API compatibility.

        Returns:
            Dictionary with:
                - 'probabilities': sigmoid of the classification logits
                - 'activities_bq': regression output, zeroed where the
                  isotope is not flagged present, scaled by
                  ``config.max_activity_bq``
                - 'present_mask': boolean tensor, probabilities >= threshold
                - 'threshold': the threshold that was applied
        """
        self.eval()
        with torch.no_grad():
            logits, activities = self(x)
            # forward() yields raw logits; convert to probabilities so the
            # threshold is compared on the [0, 1] scale it is defined on.
            probs = torch.sigmoid(logits)

        # Apply threshold
        present = probs >= threshold

        # Keep activities only for isotopes flagged as present
        masked_activities = activities * present.float()

        return {
            'probabilities': probs,
            'activities_bq': masked_activities * self.config.max_activity_bq,
            'present_mask': present,
            'threshold': threshold
        }

    def count_parameters(self) -> int:
        """Count total trainable parameters."""
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

    def summary(self) -> str:
        """Get a multi-line text summary of the model configuration."""
        lines = [
            "=" * 60,
            "VEGA Model - CNN-FCNN Multi-Task Isotope Identifier",
            "=" * 60,
            f"Input channels: {self.config.num_channels}",
            f"Output isotopes: {self.config.num_isotopes}",
            f"CNN channels: {self.config.conv_channels}",
            f"FC hidden dims: {self.config.fc_hidden_dims}",
            f"Dropout rate: {self.config.dropout_rate}",
            f"Total parameters: {self.count_parameters():,}",
            "=" * 60
        ]
        return "\n".join(lines)
class VegaLoss(nn.Module):
    """
    Weighted sum of the two Vega training objectives.

    Terms:
        - Binary cross-entropy on logits for multi-label isotope presence
        - Huber loss on activities (robust to outliers)
    """

    def __init__(
        self,
        classification_weight: float = 1.0,
        regression_weight: float = 0.1,
        huber_delta: float = 1.0
    ):
        """Store the term weights and create both criteria.

        Args:
            classification_weight: Multiplier of the BCE term.
            regression_weight: Multiplier of the Huber term.
            huber_delta: ``delta`` passed to ``nn.HuberLoss``.
        """
        super().__init__()
        self.classification_weight = classification_weight
        self.regression_weight = regression_weight
        # BCEWithLogitsLoss fuses sigmoid and BCE, which is AMP-safe.
        self.bce_loss = nn.BCEWithLogitsLoss()
        self.huber_loss = nn.HuberLoss(delta=huber_delta)

    def forward(
        self,
        pred_logits: torch.Tensor,
        pred_activities: torch.Tensor,
        target_presence: torch.Tensor,
        target_activities: torch.Tensor
    ) -> Tuple[torch.Tensor, Dict[str, float]]:
        """
        Compute the combined loss.

        Args:
            pred_logits: Predicted isotope logits (batch, num_isotopes)
            pred_activities: Predicted activities (batch, num_isotopes)
            target_presence: Ground truth presence labels (batch, num_isotopes)
            target_activities: Ground truth activities (batch, num_isotopes)

        Returns:
            Tuple of the total loss tensor and a dict holding the Python
            float value of the 'total', 'classification' and 'regression'
            terms.
        """
        presence = target_presence.float()

        # Classification term; the criterion applies the sigmoid itself.
        cls_loss = self.bce_loss(pred_logits, presence)

        # Regression term. Absent isotopes are zeroed on both sides so they
        # contribute no error; note the Huber mean is still taken over every
        # element, not only the present ones.
        if presence.sum() > 0:
            reg_loss = self.huber_loss(
                pred_activities * presence,
                target_activities * presence,
            )
        else:
            # No isotope present anywhere in the batch: constant zero term.
            reg_loss = torch.tensor(0.0, device=pred_activities.device)

        weighted_cls = self.classification_weight * cls_loss
        weighted_reg = self.regression_weight * reg_loss
        total_loss = weighted_cls + weighted_reg

        loss_dict = {
            'total': total_loss.item(),
            'classification': cls_loss.item(),
            'regression': reg_loss.item(),
        }
        return total_loss, loss_dict
if __name__ == "__main__":
    # Smoke test: build the default model, run one batch through it and
    # through the loss, printing shapes and loss values along the way.
    config = VegaConfig()
    model = VegaModel(config)
    print(model.summary())

    # Forward pass on a random batch. The first output holds raw logits,
    # even though the printed label below calls them "probs".
    batch_size = 4
    spectra = torch.randn(batch_size, config.num_channels)
    logits, activity_out = model(spectra)
    print(f"\nInput shape: {spectra.shape}")
    print(f"Output probs shape: {logits.shape}")
    print(f"Output activities shape: {activity_out.shape}")

    # Loss on random targets
    criterion = VegaLoss()
    target_shape = (batch_size, config.num_isotopes)
    target_presence = torch.randint(0, 2, target_shape)
    target_activities = torch.rand(batch_size, config.num_isotopes) * 100
    loss, loss_dict = criterion(
        logits, activity_out, target_presence, target_activities
    )
    print(f"\nLoss: {loss_dict}")