Files
lidar_rendu/lidar_pipeline/pipeline.py
Jacquin Antoine e2bd6b2536 Remove RRIM and Multi-Hillshade RGB, fix DTM resolution reuse bug, add --init to docker run
- Remove generate_rrim, generate_multi_hillshade, _compute_openness_both
- Remove corresponding VIZ_STEPS entries, COLORMAPS, RGB_LEGENDS, and tests
- Fix DTM resolution mismatch: existing DTM at different resolution is now
  regenerated instead of silently reused
- Propagate actual DTM resolution to visualizations and rendering
- Add --init to docker run commands for proper signal handling on Ctrl+C
- Add .playwright-mcp/ to .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 02:19:42 +02:00

468 lines
20 KiB
Python

"""Pipeline orchestration for LiDAR archaeological analysis.
LidarArchaeoPipeline coordinates the full processing chain:
1. Ground classification (PDAL/SMRF)
2. DTM generation
3. Visualization generation (17 products)
4. Rendering (WebP + PDF report)
"""
import logging
import multiprocessing
import shutil
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import subprocess
# Forked children would inherit (and corrupt) the parent's CUDA context, so
# worker processes are started with 'spawn'. The start method can only be
# chosen once per process; if something (tests, an earlier import) already
# fixed it, leave it untouched instead of triggering a RuntimeError.
if multiprocessing.get_start_method(allow_none=True) is None:
    multiprocessing.set_start_method('spawn')
logger = logging.getLogger("lidar")
def _file_basename(path):
"""Extract base name from a LAZ/LAS file, removing all known extensions.
Handles double extensions like .copc.laz correctly:
'file.copc.laz' -> 'file', not 'file.copc'
"""
name = Path(path).name
# Remove known LiDAR extensions (order matters: longest first)
for ext in ['.copc.laz', '.copc.las', '.laz', '.las']:
if name.lower().endswith(ext):
return name[:-len(ext)]
return Path(path).stem
class FilePrefixFilter(logging.Filter):
"""Adds a file prefix to log messages when processing a specific file."""
def __init__(self):
super().__init__()
self.basename = None
def filter(self, record):
if self.basename:
record.msg = f"[{self.basename}] {record.msg}"
return True
# Module-level filter instance so process_file can set it
_file_filter = FilePrefixFilter()
from .dtm import classify_ground, create_dtm_fast
from .visualizations import (
SharedDEM,
generate_hillshade, generate_slope, generate_aspect, generate_curvature,
generate_lrm, generate_svf, generate_openness,
generate_mslrm, generate_tpi, generate_sailore,
generate_roughness, generate_anomalies, generate_wavelet,
generate_flow, generate_local_dominance,
)
from .gpu import gpu_cleanup
from .ign import generate_ign_overlay
from .rendering import tif_to_png, generate_pdf_report
def _ign_overlay_step(layer, title, legend_label, description, out_suffix):
    """Return a VIZ_STEPS callable that renders one IGN layer via generate_ign_overlay.

    The returned callable accepts (and ignores) the ``shared`` keyword so that
    every VIZ_STEPS entry can be called with the same signature; IGN overlays
    download external data and do not use SharedDEM.
    """
    def step(dtm_file, basename, vis_dir, resolution, shared=None):
        return generate_ign_overlay(
            dtm_file, basename, vis_dir, resolution,
            layer=layer,
            title=title,
            legend_label=legend_label,
            description=description,
            out_suffix=out_suffix,
        )
    return step


# Ordered list of visualization steps, run in this order for every DTM.
# Each entry: (name, callable). Every callable can be invoked as
#   func(dtm_file, basename, vis_dir, resolution, shared=shared)
# and returns the output file path (typically a GeoTIFF later converted to
# WebP) or a falsy value when nothing was produced.
# Adding a new visualization = add a generate_* function + register here.
VIZ_STEPS = [
    ('hillshade', generate_hillshade),
    ('slope', generate_slope),
    ('aspect', generate_aspect),
    ('curvature', generate_curvature),
    ('svf', generate_svf),
    ('lrm', generate_lrm),
    ('pos_open', lambda d, b, v, r, shared=None: generate_openness(d, b, v, r, positive=True, shared=shared)),
    ('neg_open', lambda d, b, v, r, shared=None: generate_openness(d, b, v, r, positive=False, shared=shared)),
    ('mslrm', generate_mslrm),
    ('tpi', generate_tpi),
    ('sailore', generate_sailore),
    ('roughness', generate_roughness),
    ('anomalies', generate_anomalies),
    ('wavelet', generate_wavelet),
    ('flow', generate_flow),
    ('local_dominance', generate_local_dominance),
    ('ortho', _ign_overlay_step(
        layer='ORTHOIMAGERY.ORTHOPHOTOS',
        title='Photographie Aérienne IGN',
        legend_label='Orthophotographie\nImage aérienne',
        description='Photographie aérienne IGN (Orthophoto)',
        out_suffix='ortho')),
    ('topo', _ign_overlay_step(
        layer='GEOGRAPHICALGRIDSYSTEMS.PLANIGNV2',
        title='Carte Topographique IGN',
        legend_label='Carte IGN\nPlan topographique',
        description='Carte topographique IGN (Plan IGN)',
        out_suffix='topo')),
]
class LidarArchaeoPipeline:
    """Orchestrates the LiDAR archaeological analysis pipeline.

    For each LAZ/LAS file in ``input_dir``: ground classification, DTM
    generation, the visualizations registered in ``VIZ_STEPS`` (converted to
    WebP) and a PDF report. Outputs are written under ``output_dir`` in
    ``DTM/``, ``visualisations/<basename>/`` and ``rapports/``; intermediate
    files go to ``output_dir/temp``.
    """

    # File extensions picked up by find_laz_files, compared lower-cased.
    _LIDAR_SUFFIXES = ('.laz', '.las')

    # Visualization steps whose output files are not named after the step.
    _VIZ_FILE_STEMS = {
        'pos_open': 'positive_openness',
        'neg_open': 'negative_openness',
        'hillshade': 'hillshade_multi',
    }

    # IGN overlay steps download external data and are called without SharedDEM.
    _IGN_STEPS = ('ortho', 'topo')

    # Resolutions (m/px) differing by no more than this are treated as equal.
    _RES_TOLERANCE = 0.01

    def __init__(self, input_dir, output_dir, resolution=0.5, workers=1, force=False, ground_method='auto', force_classify=False, keep_tif=False):
        """Validate paths, create the output tree and log the configuration.

        Args:
            input_dir: Directory containing the LAZ/LAS files.
            output_dir: Root output directory (created if missing).
            resolution: Requested DTM resolution in m/px.
            workers: Number of worker processes used by process_all.
            force: Regenerate visualizations and PDF reports even if present.
            ground_method: Ground classification method passed to classify_ground.
            force_classify: Force a new ground classification (and thus a new DTM).
            keep_tif: Keep intermediate visualization GeoTIFFs.

        Raises:
            ValueError: If ``input_dir`` does not exist.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.resolution = resolution
        self.workers = workers
        self.force = force
        self.ground_method = ground_method
        self.force_classify = force_classify
        self.keep_tif = keep_tif
        self.temp_dir = self.output_dir / "temp"
        if not self.input_dir.exists():
            raise ValueError(f"Répertoire introuvable: {self.input_dir}")
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.temp_dir.mkdir(exist_ok=True)
        self.dtm_dir = self.output_dir / "DTM"
        self.vis_dir = self.output_dir / "visualisations"
        self.pdf_dir = self.output_dir / "rapports"
        for d in [self.dtm_dir, self.vis_dir, self.pdf_dir]:
            d.mkdir(exist_ok=True)
        logger.info("Pipeline initialisé")
        logger.info(f" Entrée : {self.input_dir}")
        logger.info(f" Sortie : {self.output_dir}")
        logger.info(f" Résolution : {resolution}m/px")
        logger.info(f" Workers : {workers}")
        logger.info(f" Force : {'OUI' if self.force else 'non (skip existing)'}")
        logger.info(f" Classification sol : {self.ground_method}")
        logger.info(f" Force classif.: {'OUI' if self.force_classify else 'non'}")
        logger.info(f" Keep TIFF : {'OUI' if self.keep_tif else 'non'}")

    def find_laz_files(self):
        """Return the sorted LAZ/LAS files found directly in ``input_dir``.

        Extensions are matched case-insensitively: ``Path.glob`` is
        case-sensitive on POSIX, so a '*.laz' pattern would miss 'TILE.LAZ'.
        """
        files = sorted(
            p for p in self.input_dir.glob("*")
            if p.is_file() and p.suffix.lower() in self._LIDAR_SUFFIXES
        )
        logger.info(f"{len(files)} fichier(s) LiDAR trouvé(s)")
        for f in files:
            logger.debug(f" {f.name}")
        return files

    def check_tools(self):
        """Check that required external tools (PDAL, GDAL) are available.

        Returns:
            True if every tool ran successfully, False at the first failure.
        """
        for name, cmd in [('pdal', 'pdal --version'), ('gdal', 'gdalinfo --version')]:
            try:
                result = subprocess.run(cmd.split(), capture_output=True, check=True, text=True)
                version = result.stdout.strip().split('\n')[0]
                logger.info(f"{name}: {version}")
            except (subprocess.CalledProcessError, OSError):
                # OSError covers a missing binary (FileNotFoundError) as well
                # as one that cannot be executed (PermissionError).
                logger.error(f"{name} non disponible")
                return False
        return True

    @classmethod
    def _viz_output_stem(cls, name):
        """Return the file-name stem of a step's outputs ('<basename>_<stem>.webp')."""
        return cls._VIZ_FILE_STEMS.get(name, name)

    @staticmethod
    def _build_shared_dem(dtm_file, resolution):
        """Pre-compute DEM data shared by the DEM-based visualizations (gradient, NaN mask, LRM)."""
        logger.info(" Pré-calcul données partagées (gradient, LRM)...")
        t_shared = time.time()
        shared = SharedDEM(dtm_file, resolution)
        logger.info(f" ✓ Données partagées prêtes ({time.time()-t_shared:.1f}s)")
        return shared

    def generate_all_visualizations(self, dtm_file, basename, resolution=None):
        """Generate every VIZ_STEPS visualization for one DTM and convert them to WebP.

        Args:
            dtm_file: Path to the DTM GeoTIFF.
            basename: Tile base name; outputs go to ``vis_dir/<basename>/``.
            resolution: Actual resolution from DTM geotransform. If None, uses
                self.resolution.

        Returns:
            dict mapping step name to the generator's return value, the
            existing WebP path for skipped steps, or None when the step raised.
        """
        if resolution is None:
            resolution = self.resolution
        logger.info(" Génération visualisations:")
        # Per-file output subdirectory
        file_vis_dir = self.vis_dir / basename
        file_vis_dir.mkdir(exist_ok=True)
        # Shared DEM data is computed once, lazily, by the first step that
        # actually needs it: a re-run where all outputs exist skips it.
        shared = None
        vis_results = {}
        total = len(VIZ_STEPS)
        for idx, (name, func) in enumerate(VIZ_STEPS, 1):
            stem = self._viz_output_stem(name)
            if self.force:
                # --force: delete existing TIFs (step-named and output-named
                # variants) to ensure a clean regeneration. Paths are built
                # directly rather than globbed so basenames containing glob
                # metacharacters ('[', '*', '?') are matched literally.
                for candidate in {name, stem}:
                    (file_vis_dir / f"{basename}_{candidate}.tif").unlink(missing_ok=True)
            else:
                # Skip steps whose WebP output already exists
                expected_webp = file_vis_dir / f"{basename}_{stem}.webp"
                if expected_webp.exists():
                    logger.info(f" [{idx}/{total}] {name}: déjà existant, ignoré")
                    vis_results[name] = expected_webp  # Track as existing file
                    continue
            is_ign = name in self._IGN_STEPS
            if shared is None and not is_ign:
                # Built outside the per-step try: a SharedDEM failure aborts
                # the whole DTM instead of failing every step one by one.
                shared = self._build_shared_dem(dtm_file, resolution)
            logger.info(f" [{idx}/{total}] {name}...")
            t0 = time.time()
            try:
                if is_ign:
                    result = func(dtm_file, basename, file_vis_dir, resolution)
                else:
                    result = func(dtm_file, basename, file_vis_dir, resolution, shared=shared)
                vis_results[name] = result
                elapsed = time.time() - t0
                if result:
                    logger.info(f" [{idx}/{total}] ✓ {name} ({elapsed:.1f}s)")
                else:
                    logger.warning(f" [{idx}/{total}] ✗ {name} — no output ({elapsed:.1f}s)")
            except Exception as e:
                vis_results[name] = None
                logger.error(f" [{idx}/{total}] ✗ {name}: {e}", exc_info=True)
            # Free GPU memory between visualizations to prevent OOM
            gpu_cleanup()
        # Convert to WebP (only newly generated TIFs, not skipped ones)
        logger.info(" Conversion images WebP:")
        source_info = {
            'method': self.ground_method,
            'date': datetime.now().strftime('%Y-%m-%d'),
            'basename': basename,
        }
        for name, tif_file in vis_results.items():
            if tif_file and isinstance(tif_file, Path) and tif_file.suffix == '.tif' and tif_file.exists():
                webp_file = tif_to_png(tif_file, file_vis_dir, resolution, keep_tif=self.keep_tif, source_info=source_info)
                if webp_file:
                    logger.info(f"{webp_file.name}")
        # Clean up remaining TIF files unless --keep-tif
        if not self.keep_tif:
            for tif in file_vis_dir.glob("*.tif"):
                tif.unlink(missing_ok=True)
        return vis_results

    @staticmethod
    def _raster_resolution(path):
        """Return abs(transform.a) of a raster, i.e. its pixel size (m/px in this pipeline)."""
        import rasterio
        with rasterio.open(path) as src:
            return abs(src.transform.a)

    def _reusable_dtm(self, dtm_path):
        """Return ``dtm_path`` if the existing DTM can be reused, else None.

        A DTM is reused only when it exists, reclassification is not forced,
        it is readable and its resolution matches ``self.resolution``.
        Unusable DTMs are deleted so they are regenerated from scratch. The
        raster is closed before any deletion.
        """
        if not dtm_path.exists():
            return None
        if self.force_classify:
            logger.info("[1/5] Reclassification forcée — DTM existant ignoré → régénération")
            dtm_path.unlink(missing_ok=True)
            return None
        try:
            existing_res = self._raster_resolution(dtm_path)
        except Exception:
            logger.warning("Impossible de lire le DTM existant — régénération")
            dtm_path.unlink(missing_ok=True)
            return None
        if abs(existing_res - self.resolution) > self._RES_TOLERANCE:
            logger.info(f"[1/5] DTM existant à {existing_res}m/px — résolution demandée {self.resolution}m/px → régénération")
            dtm_path.unlink(missing_ok=True)
            return None
        logger.info(f"[1/5] Classification du sol — sautée (DTM existant à {existing_res}m/px)")
        logger.info("[2/5] Génération DTM — sautée (DTM existant)")
        return dtm_path

    def _build_dtm(self, laz_file, basename):
        """Run ground classification then DTM generation for one file.

        Returns:
            (dtm_file, t_classif, t_dtm); dtm_file is None when either step
            failed.
        """
        # Step 1: Ground classification
        logger.info("[1/5] Classification du sol...")
        t1 = time.time()
        las_file = classify_ground(laz_file, self.temp_dir, method=self.ground_method, force=self.force_classify)
        t_classif = time.time() - t1
        if not las_file:
            logger.error(f" ✗ Échec classification ({t_classif:.1f}s)")
            return None, t_classif, 0
        logger.info(f" ✓ Classification terminée ({t_classif:.1f}s)")
        # Step 2: Generate DTM
        logger.info("[2/5] Génération DTM...")
        t2 = time.time()
        dtm_file = create_dtm_fast(las_file, basename, self.dtm_dir, self.resolution)
        t_dtm = time.time() - t2
        if not dtm_file:
            logger.error(f" ✗ Échec DTM ({t_dtm:.1f}s)")
            return None, t_classif, t_dtm
        logger.info(f" ✓ DTM terminé ({t_dtm:.1f}s)")
        return dtm_file, t_classif, t_dtm

    def process_file(self, laz_file):
        """Process a single LAZ/LAS file through the full pipeline.

        Classification and DTM generation are skipped when a reusable DTM
        exists (see ``_reusable_dtm``). --force only affects visualizations
        and the PDF; --force-classification forces a new classification and
        therefore a new DTM.

        Returns:
            False if ground classification or DTM generation failed, True
            otherwise (individual visualization failures are only logged).
        """
        basename = _file_basename(laz_file)
        _file_filter.basename = basename
        try:
            t_start = time.time()
            logger.info("=" * 60)
            logger.info(f"FICHIER : {basename}")
            logger.info("=" * 60)
            # Steps 1-2: reuse the existing DTM or build a new one
            dtm_path = self.dtm_dir / f"{basename}_dtm.tif"
            t_classif = t_dtm = 0
            dtm_file = self._reusable_dtm(dtm_path)
            if dtm_file is None:
                dtm_file, t_classif, t_dtm = self._build_dtm(laz_file, basename)
                if dtm_file is None:
                    return False
            # Step 3: Visualizations — use actual resolution from DTM, which
            # may differ from the requested one
            actual_res = self._raster_resolution(dtm_file)
            if abs(actual_res - self.resolution) > self._RES_TOLERANCE:
                logger.info(f" Résolution DTM: {actual_res}m/px (demandée: {self.resolution}m/px)")
            self.generate_all_visualizations(dtm_file, basename, actual_res)
            # Step 4: PDF report
            t_pdf = 0
            file_vis_dir = self.vis_dir / basename
            pdf_file = self.pdf_dir / f"{basename}_rapport.pdf"
            if pdf_file.exists() and not self.force:
                logger.info(f"[4/5] Rapport PDF déjà existant — ignoré: {pdf_file.name}")
            else:
                logger.info("[4/5] Rapport PDF A3...")
                t4 = time.time()
                generate_pdf_report(basename, file_vis_dir, self.pdf_dir, actual_res)
                t_pdf = time.time() - t4
                logger.info(f" ✓ Rapport PDF terminé ({t_pdf:.1f}s)")
            t_total = time.time() - t_start
            logger.info(f"{basename} terminé en {t_total:.1f}s")
            logger.debug(f" Détails: classification={t_classif:.1f}s, DTM={t_dtm:.1f}s, PDF={t_pdf:.1f}s")
            return True
        finally:
            # Always clear the prefix (early return or exception included) so
            # later log lines (next file, summary) are not mislabelled.
            _file_filter.basename = None

    def process_all(self):
        """Process every LAZ/LAS file in ``input_dir`` and log a summary.

        Files are processed in a ProcessPoolExecutor when ``workers > 1`` and
        there is more than one file, sequentially otherwise. A failure (False
        result or exception) is logged and does not stop the other files.
        Temporary directories are removed at the end.
        """
        files = self.find_laz_files()
        if not files:
            logger.error("Aucun fichier LAZ/LAS trouvé !")
            return
        logger.info("=" * 60)
        logger.info("PIPELINE ARCHÉOLOGIQUE LiDAR")
        logger.info("=" * 60)
        logger.info("Vérification des outils...")
        if not self.check_tools():
            logger.error("Outils manquants — abandon")
            return
        results = {}
        t_pipeline_start = time.time()
        if self.workers > 1 and len(files) > 1:
            logger.info(f"Traitement parallèle avec {self.workers} workers...")
            logger.info(f"Fichiers: {len(files)}")
            with ProcessPoolExecutor(max_workers=self.workers) as executor:
                future_to_file = {
                    executor.submit(_process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution, self.force, self.ground_method, self.force_classify, self.keep_tif): laz_file
                    for laz_file in files
                }
                done = 0
                for future in as_completed(future_to_file):
                    laz_file = future_to_file[future]
                    done += 1
                    try:
                        success = future.result()
                        results[laz_file.name] = success
                        # Same markers as the per-step logs
                        status = "✓" if success else "✗"
                        logger.info(f" [{done}/{len(files)}] {status} {laz_file.name}")
                    except Exception as e:
                        logger.error(f" [{done}/{len(files)}] ✗ {laz_file.name}: {e}")
                        logger.debug(" Traceback:", exc_info=True)
                        results[laz_file.name] = False
        else:
            total = len(files)
            for idx, laz_file in enumerate(files, 1):
                logger.info(f"--- Fichier {idx}/{total} ---")
                try:
                    results[laz_file.name] = self.process_file(laz_file)
                except Exception as e:
                    logger.error(f"✗ Erreur traitement {laz_file.name}: {e}")
                    logger.debug("Traceback:", exc_info=True)
                    results[laz_file.name] = False
        # Summary
        t_pipeline_total = time.time() - t_pipeline_start
        success_count = sum(1 for v in results.values() if v)
        fail_count = len(results) - success_count
        logger.info("=" * 60)
        logger.info("RÉSUMÉ")
        logger.info("=" * 60)
        logger.info(f" Succès : {success_count}/{len(results)}")
        if fail_count:
            logger.info(f" Échecs : {fail_count}/{len(results)}")
            for name, ok in results.items():
                if not ok:
                    logger.info(f"{name}")
        logger.info(f" Durée totale : {t_pipeline_total:.1f}s ({t_pipeline_total/60:.1f}min)")
        logger.info(f"\nRésultats dans: {self.output_dir}")
        logger.info(f" • DTM : {self.dtm_dir}")
        logger.info(f" • Visualisations: {self.vis_dir}")
        logger.info(f" • Rapports PDF : {self.pdf_dir}")
        # Clean up temporary files
        logger.info("Nettoyage des fichiers temporaires...")
        try:
            # self.temp_dir is normally output_dir/temp itself; both are
            # checked in case it was redirected elsewhere. Worker per-file
            # temp dirs live under output_dir/temp.
            for tmp_dir in (self.temp_dir, self.output_dir / "temp"):
                if tmp_dir.exists():
                    shutil.rmtree(tmp_dir)
            logger.info(" ✓ Fichiers temporaires supprimés")
        except Exception as e:
            logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, force=False, ground_method='auto', force_classify=False, keep_tif=False):
    """Standalone function for multiprocessing — creates its own pipeline instance.

    Each worker gets its own temp directory (``output_dir/temp/<basename>``)
    to avoid file conflicts between parallel workers. That directory is
    removed afterwards, even if processing raised.

    Returns:
        The result of LidarArchaeoPipeline.process_file for this file.
    """
    import sys
    # 'spawn' workers start from a fresh interpreter: the parent's logging
    # configuration is not inherited and stdio may default to ASCII.
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, 'reconfigure'):
            stream.reconfigure(encoding='utf-8', errors='replace')
    worker_logger = logging.getLogger("lidar")
    if not worker_logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter("%(message)s"))
        worker_logger.setLevel(logging.INFO)
        worker_logger.addHandler(handler)
    # addFilter ignores a filter that is already attached, so this is safe
    # when the pool reuses this worker process for another file.
    worker_logger.addFilter(_file_filter)
    pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1, force=force, ground_method=ground_method, force_classify=force_classify, keep_tif=keep_tif)
    basename = _file_basename(laz_file_str)
    pipeline.temp_dir = pipeline.output_dir / "temp" / basename
    pipeline.temp_dir.mkdir(exist_ok=True)
    try:
        return pipeline.process_file(Path(laz_file_str))
    finally:
        # Best-effort cleanup of the per-file temp directory; any leftover is
        # removed by process_all's final cleanup of output_dir/temp.
        shutil.rmtree(pipeline.temp_dir, ignore_errors=True)