Files
lidar_rendu/lidar_pipeline/cli.py
Jacquin Antoine 5b74322077 Skip ground classification when DTM already exists
If the DTM .tif exists and --force is not set, skip both ground
classification and DTM generation entirely. Previously, the pipeline
would spend 3+ minutes reclassifying ground even when the DTM was
already present and would be reused anyway.

Also includes: SharedDEM cache, enhanced WebP cartouche (compass rose,
adaptive scale bar, enriched info bar), removed COG/viewer, UTF-8
fix for parallel workers, skip logic for DTM and PDF.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-13 23:41:21 +02:00

243 lines
8.3 KiB
Python

"""Command-line interface for the LiDAR archaeological pipeline.
Handles argument parsing, logging configuration, and entry point.
"""
import argparse
import logging
import os
import shutil
import signal
import sys
from .pipeline import LidarArchaeoPipeline
from .gpu import log_gpu_status
logger = logging.getLogger("lidar")
def setup_logging(verbose=False, debug=False):
"""Configure the 'lidar' logger.
Args:
verbose: If True, include timestamps and level names.
debug: If True, set level to DEBUG and add file:line info.
"""
# Ensure UTF-8 output for French messages (é, è, ê, etc.)
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
if debug:
level = logging.DEBUG
fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s"
elif verbose:
level = logging.INFO
fmt = "%(asctime)s %(levelname)-5s %(message)s"
else:
level = logging.INFO
fmt = "%(message)s"
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
logger.setLevel(level)
logger.handlers.clear()
logger.addHandler(handler)
logger.propagate = False # Prevent double logging via root logger
# Also configure the root logger so worker processes log properly
root_logger = logging.getLogger()
root_logger.setLevel(level)
if not root_logger.handlers:
root_handler = logging.StreamHandler(sys.stdout)
root_handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
root_logger.addHandler(root_handler)
return logger
def main():
"""Entry point for the LiDAR archaeological pipeline."""
parser = argparse.ArgumentParser(
description="Pipeline LiDAR pour détection archéologique",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Exemples:
Traitement standard:
python -m lidar_pipeline /data/input -o /data/output
Haute résolution avec accélération GPU:
python -m lidar_pipeline /data/input -o /data/output -r 0.2 -g
Mode verbeux (timestamps):
python -m lidar_pipeline /data/input -o /data/output -v
Mode debug (détails internes):
python -m lidar_pipeline /data/input -o /data/output --debug
Forcer la régénération de tous les fichiers:
python -m lidar_pipeline /data/input -o /data/output --force
Traiter un seul fichier (pour tests):
python -m lidar_pipeline /data/input -o /data/output --file LHD_FXX_1000_6881_PTS_LAMB93_IGN69.copc
Traitement parallèle (4 workers):
python -m lidar_pipeline /data/input -o /data/output -w 4
"""
)
parser.add_argument(
"input",
help="Dossier contenant les fichiers LAZ/LAS"
)
parser.add_argument(
"-o", "--output",
default="/data/output",
help="Dossier de sortie (défaut: /data/output)"
)
parser.add_argument(
"-r", "--resolution",
type=float,
default=0.5,
help="Résolution en mètres par pixel (défaut: 0.5)"
)
parser.add_argument(
"-w", "--workers",
type=int,
default=1,
help="Nombre de workers pour traitement parallèle (défaut: 1)"
)
parser.add_argument(
"-f", "--force",
action="store_true",
help="Régénérer tous les fichiers même si les WebP existent déjà"
)
parser.add_argument(
"--force-classification",
action="store_true",
help="Reclassifier le sol même si le fichier .las existe déjà"
)
parser.add_argument(
"--keep-tif",
action="store_true",
help="Conserver les fichiers TIFF intermédiaires (sinon supprimés après conversion WebP)"
)
parser.add_argument(
"--ground-classification",
choices=["auto", "smrf", "pmf", "csf"],
default="auto",
help="Méthode de classification du sol : auto (détection), smrf, pmf, csf (défaut: auto)"
)
parser.add_argument(
"--file",
nargs="+",
type=str,
default=None,
help="Traiter un ou plusieurs fichiers LAZ/LAS (nom complet sans extension, ex: LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Mode verbeux : affiche les timestamps et niveaux"
)
parser.add_argument(
"--debug",
action="store_true",
help="Mode debug : affiche les détails internes (fichier:ligne)"
)
args = parser.parse_args()
# Configure logging before any other output
setup_logging(verbose=args.verbose, debug=args.debug)
# Add file prefix filter for parallel processing
from .pipeline import _file_filter
logger.addFilter(_file_filter)
logger.info("=" * 60)
logger.info("Pipeline LiDAR Archéologique")
logger.info("=" * 60)
# Kill orphan PDAL processes on interrupt or termination
signal.signal(signal.SIGINT, _kill_orphan_pdal)
signal.signal(signal.SIGTERM, _kill_orphan_pdal)
import atexit
atexit.register(_kill_orphan_pdal)
log_gpu_status()
try:
pipeline = LidarArchaeoPipeline(
input_dir=args.input,
output_dir=args.output,
resolution=args.resolution,
workers=args.workers,
force=args.force,
ground_method=args.ground_classification,
force_classify=args.force_classification,
keep_tif=args.keep_tif,
)
# If --file is specified, process only matching files
if args.file:
from pathlib import Path
input_dir = Path(args.input)
# Each pattern is the full filename without extension (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)
# Also supports bare name without .copc (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69)
selected_files = []
for pattern in args.file:
matches = (list(input_dir.glob(f"{pattern}.laz"))
+ list(input_dir.glob(f"{pattern}.las"))
+ list(input_dir.glob(f"{pattern}.copc.laz"))
+ list(input_dir.glob(f"{pattern}.copc.las")))
# Remove duplicates
matches = list(dict.fromkeys(matches))
if not matches:
logger.warning(f"Aucun fichier trouvé pour: {pattern}")
continue
selected_files.extend(matches)
# Remove duplicates across patterns
seen = set()
unique_files = []
for f in selected_files:
if f not in seen:
seen.add(f)
unique_files.append(f)
if not unique_files:
logger.error("Aucun fichier trouvé pour les motifs spécifiés")
sys.exit(1)
logger.info(f"Traitement de {len(unique_files)} fichier(s) sélectionné(s)")
for laz_file in unique_files:
logger.info(f"{laz_file.name}")
for laz_file in unique_files:
pipeline.process_file(laz_file)
# Clean up temporary files
logger.info("Nettoyage des fichiers temporaires...")
try:
if pipeline.temp_dir.exists():
shutil.rmtree(pipeline.temp_dir)
temp_base = pipeline.output_dir / "temp"
if temp_base.exists():
shutil.rmtree(temp_base)
logger.info(" ✓ Fichiers temporaires supprimés")
except Exception as e:
logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")
else:
pipeline.process_all()
except Exception as e:
logger.error(f"Erreur fatale: {e}", exc_info=True)
sys.exit(1)
def _kill_orphan_pdal(signum=None, frame=None):
"""Kill orphan PDAL processes on interrupt or exit."""
import subprocess
try:
subprocess.run(["pkill", "-f", "pdal"], capture_output=True, timeout=5)
except Exception:
pass
if signum is not None:
logger.info("Interruption — nettoyage des processus PDAL")
sys.exit(130)