Files
lidar_rendu/lidar_pipeline/cli.py
Jacquin Antoine eac482874d Fix bugs and improve pipeline flexibility
- Fix gpu_cleanup import missing in visualizations.py (NameError in workers)
- Fix t_pdf referenced before assignment when PDF is skipped
- Skip classification+DTM when DTM exists regardless of --force
- --force now only regenerates WebP/PDF, not classification/DTM
- --force-classification forces reclassification when needed
- Add laspy repair fallback for corrupt LAZ files (EVLR errors)
- Keep DTM TIF by default for reuse (--no-keep-tif to delete)
- Increase space between image and bottom cartouche (0.12→0.19)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 00:08:25 +02:00

243 lines
8.3 KiB
Python

"""Command-line interface for the LiDAR archaeological pipeline.
Handles argument parsing, logging configuration, and entry point.
"""
import argparse
import logging
import os
import shutil
import signal
import sys
from .pipeline import LidarArchaeoPipeline
from .gpu import log_gpu_status
logger = logging.getLogger("lidar")
def setup_logging(verbose=False, debug=False):
"""Configure the 'lidar' logger.
Args:
verbose: If True, include timestamps and level names.
debug: If True, set level to DEBUG and add file:line info.
"""
# Ensure UTF-8 output for French messages (é, è, ê, etc.)
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
if debug:
level = logging.DEBUG
fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s"
elif verbose:
level = logging.INFO
fmt = "%(asctime)s %(levelname)-5s %(message)s"
else:
level = logging.INFO
fmt = "%(message)s"
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
logger.setLevel(level)
logger.handlers.clear()
logger.addHandler(handler)
logger.propagate = False # Prevent double logging via root logger
# Also configure the root logger so worker processes log properly
root_logger = logging.getLogger()
root_logger.setLevel(level)
if not root_logger.handlers:
root_handler = logging.StreamHandler(sys.stdout)
root_handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
root_logger.addHandler(root_handler)
return logger
def main():
"""Entry point for the LiDAR archaeological pipeline."""
parser = argparse.ArgumentParser(
description="Pipeline LiDAR pour détection archéologique",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Exemples:
Traitement standard:
python -m lidar_pipeline /data/input -o /data/output
Haute résolution avec accélération GPU:
python -m lidar_pipeline /data/input -o /data/output -r 0.2 -g
Mode verbeux (timestamps):
python -m lidar_pipeline /data/input -o /data/output -v
Mode debug (détails internes):
python -m lidar_pipeline /data/input -o /data/output --debug
Forcer la régénération de tous les fichiers:
python -m lidar_pipeline /data/input -o /data/output --force
Traiter un seul fichier (pour tests):
python -m lidar_pipeline /data/input -o /data/output --file LHD_FXX_1000_6881_PTS_LAMB93_IGN69.copc
Traitement parallèle (4 workers):
python -m lidar_pipeline /data/input -o /data/output -w 4
"""
)
parser.add_argument(
"input",
help="Dossier contenant les fichiers LAZ/LAS"
)
parser.add_argument(
"-o", "--output",
default="/data/output",
help="Dossier de sortie (défaut: /data/output)"
)
parser.add_argument(
"-r", "--resolution",
type=float,
default=0.5,
help="Résolution en mètres par pixel (défaut: 0.5)"
)
parser.add_argument(
"-w", "--workers",
type=int,
default=1,
help="Nombre de workers pour traitement parallèle (défaut: 1)"
)
parser.add_argument(
"-f", "--force",
action="store_true",
help="Régénérer tous les fichiers même si les WebP existent déjà"
)
parser.add_argument(
"--force-classification",
action="store_true",
help="Reclassifier le sol même si le fichier .las existe déjà"
)
parser.add_argument(
"--no-keep-tif",
action="store_true",
help="Supprimer les fichiers TIFF intermédiaires après conversion WebP (par défaut: conservés)"
)
parser.add_argument(
"--ground-classification",
choices=["auto", "smrf", "pmf", "csf"],
default="auto",
help="Méthode de classification du sol : auto (détection), smrf, pmf, csf (défaut: auto)"
)
parser.add_argument(
"--file",
nargs="+",
type=str,
default=None,
help="Traiter un ou plusieurs fichiers LAZ/LAS (nom complet sans extension, ex: LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Mode verbeux : affiche les timestamps et niveaux"
)
parser.add_argument(
"--debug",
action="store_true",
help="Mode debug : affiche les détails internes (fichier:ligne)"
)
args = parser.parse_args()
# Configure logging before any other output
setup_logging(verbose=args.verbose, debug=args.debug)
# Add file prefix filter for parallel processing
from .pipeline import _file_filter
logger.addFilter(_file_filter)
logger.info("=" * 60)
logger.info("Pipeline LiDAR Archéologique")
logger.info("=" * 60)
# Kill orphan PDAL processes on interrupt or termination
signal.signal(signal.SIGINT, _kill_orphan_pdal)
signal.signal(signal.SIGTERM, _kill_orphan_pdal)
import atexit
atexit.register(_kill_orphan_pdal)
log_gpu_status()
try:
pipeline = LidarArchaeoPipeline(
input_dir=args.input,
output_dir=args.output,
resolution=args.resolution,
workers=args.workers,
force=args.force,
ground_method=args.ground_classification,
force_classify=args.force_classification,
keep_tif=not args.no_keep_tif,
)
# If --file is specified, process only matching files
if args.file:
from pathlib import Path
input_dir = Path(args.input)
# Each pattern is the full filename without extension (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)
# Also supports bare name without .copc (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69)
selected_files = []
for pattern in args.file:
matches = (list(input_dir.glob(f"{pattern}.laz"))
+ list(input_dir.glob(f"{pattern}.las"))
+ list(input_dir.glob(f"{pattern}.copc.laz"))
+ list(input_dir.glob(f"{pattern}.copc.las")))
# Remove duplicates
matches = list(dict.fromkeys(matches))
if not matches:
logger.warning(f"Aucun fichier trouvé pour: {pattern}")
continue
selected_files.extend(matches)
# Remove duplicates across patterns
seen = set()
unique_files = []
for f in selected_files:
if f not in seen:
seen.add(f)
unique_files.append(f)
if not unique_files:
logger.error("Aucun fichier trouvé pour les motifs spécifiés")
sys.exit(1)
logger.info(f"Traitement de {len(unique_files)} fichier(s) sélectionné(s)")
for laz_file in unique_files:
logger.info(f"{laz_file.name}")
for laz_file in unique_files:
pipeline.process_file(laz_file)
# Clean up temporary files
logger.info("Nettoyage des fichiers temporaires...")
try:
if pipeline.temp_dir.exists():
shutil.rmtree(pipeline.temp_dir)
temp_base = pipeline.output_dir / "temp"
if temp_base.exists():
shutil.rmtree(temp_base)
logger.info(" ✓ Fichiers temporaires supprimés")
except Exception as e:
logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")
else:
pipeline.process_all()
except Exception as e:
logger.error(f"Erreur fatale: {e}", exc_info=True)
sys.exit(1)
def _kill_orphan_pdal(signum=None, frame=None):
"""Kill orphan PDAL processes on interrupt or exit."""
import subprocess
try:
subprocess.run(["pkill", "-f", "pdal"], capture_output=True, timeout=5)
except Exception:
pass
if signum is not None:
logger.info("Interruption — nettoyage des processus PDAL")
sys.exit(130)