"""Command-line interface for the LiDAR archaeological pipeline. Handles argument parsing, logging configuration, and entry point. """ import argparse import logging import os import shutil import signal import sys from .pipeline import LidarArchaeoPipeline from .gpu import log_gpu_status logger = logging.getLogger("lidar") def setup_logging(verbose=False, debug=False): """Configure the 'lidar' logger. Args: verbose: If True, include timestamps and level names. debug: If True, set level to DEBUG and add file:line info. """ # Ensure UTF-8 output for French messages (é, è, ê, etc.) if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='replace') if hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(encoding='utf-8', errors='replace') if debug: level = logging.DEBUG fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s" elif verbose: level = logging.INFO fmt = "%(asctime)s %(levelname)-5s %(message)s" else: level = logging.INFO fmt = "%(message)s" handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S")) logger.setLevel(level) logger.handlers.clear() logger.addHandler(handler) logger.propagate = False # Prevent double logging via root logger # Also configure the root logger so worker processes log properly root_logger = logging.getLogger() root_logger.setLevel(level) if not root_logger.handlers: root_handler = logging.StreamHandler(sys.stdout) root_handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S")) root_logger.addHandler(root_handler) return logger def main(): """Entry point for the LiDAR archaeological pipeline.""" parser = argparse.ArgumentParser( description="Pipeline LiDAR pour détection archéologique", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ Exemples: Traitement standard: python -m lidar_pipeline /data/input -o /data/output Haute résolution avec accélération GPU: python -m lidar_pipeline /data/input -o /data/output -r 0.2 -g Mode verbeux (timestamps): python -m lidar_pipeline /data/input -o /data/output -v Mode debug (détails internes): python -m lidar_pipeline /data/input -o /data/output --debug Forcer la régénération de tous les fichiers: python -m lidar_pipeline /data/input -o /data/output --force Traiter un seul fichier (pour tests): python -m lidar_pipeline /data/input -o /data/output --file LHD_FXX_1000_6881_PTS_LAMB93_IGN69.copc Traitement parallèle (4 workers): python -m lidar_pipeline /data/input -o /data/output -w 4 """ ) parser.add_argument( "input", help="Dossier contenant les fichiers LAZ/LAS" ) parser.add_argument( "-o", "--output", default="/data/output", help="Dossier de sortie (défaut: /data/output)" ) parser.add_argument( "-r", "--resolution", type=float, default=0.5, help="Résolution en mètres par pixel (défaut: 0.5)" ) parser.add_argument( "-w", "--workers", type=int, default=1, help="Nombre de workers pour traitement parallèle (défaut: 1)" ) parser.add_argument( "-f", "--force", action="store_true", help="Régénérer tous les fichiers même si les WebP existent déjà" ) parser.add_argument( "--force-classification", action="store_true", help="Reclassifier le sol même si le fichier .las existe déjà" ) parser.add_argument( "--keep-tif", action="store_true", help="Conserver les fichiers TIFF (DTM + visualisations) pour pouvoir régénérer les WebP sans recalculer" ) parser.add_argument( "--ground-classification", choices=["auto", "smrf", "csf"], default="auto", help="Méthode de classification du sol : auto (détection), smrf, csf (défaut: auto)" ) parser.add_argument( "--quality", type=int, default=85, help="Qualité WebP (1-100, défaut: 85). Utilisez 100 pour lossless." ) parser.add_argument( "--lossless", action="store_true", help="Forcer la compression WebP lossless (équivalent à --quality 100)" ) parser.add_argument( "--only", nargs="+", type=str, default=None, help="Générer uniquement ces visualisations (ex: --only hillshade svf lrm)" ) parser.add_argument( "--skip", nargs="+", type=str, default=None, help="Exclure ces visualisations (ex: --skip ortho topo)" ) parser.add_argument( "--file", nargs="+", type=str, default=None, help="Traiter un ou plusieurs fichiers LAZ/LAS (nom complet sans extension, ex: LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)" ) parser.add_argument( "-v", "--verbose", action="store_true", help="Mode verbeux : affiche les timestamps et niveaux" ) parser.add_argument( "--debug", action="store_true", help="Mode debug : affiche les détails internes (fichier:ligne)" ) args = parser.parse_args() # Configure logging before any other output setup_logging(verbose=args.verbose, debug=args.debug) # Add file prefix filter for parallel processing from .pipeline import _file_filter logger.addFilter(_file_filter) logger.info("=" * 60) logger.info("Pipeline LiDAR Archéologique") logger.info("=" * 60) # Kill orphan PDAL processes on interrupt or termination signal.signal(signal.SIGINT, _kill_orphan_pdal) signal.signal(signal.SIGTERM, _kill_orphan_pdal) import atexit atexit.register(_kill_orphan_pdal) log_gpu_status() try: quality = 100 if args.lossless else args.quality pipeline = LidarArchaeoPipeline( input_dir=args.input, output_dir=args.output, resolution=args.resolution, workers=args.workers, force=args.force, ground_method=args.ground_classification, force_classify=args.force_classification, keep_tif=args.keep_tif, quality=quality, only_viz=args.only, skip_viz=args.skip, ) # If --file is specified, process only matching files if args.file: from pathlib import Path input_dir = Path(args.input) # Each pattern is the full filename without extension (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc) # Also supports bare name without .copc (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69) selected_files = [] for pattern in args.file: # Try exact filename first (e.g. LHD_FXX_...copc.laz) exact_match = input_dir / pattern if exact_match.exists() and exact_match.is_file(): matches = [exact_match] else: # Try with added extensions (e.g. pattern=LHD_FXX_...IGN69) matches = (list(input_dir.glob(f"{pattern}.laz")) + list(input_dir.glob(f"{pattern}.las")) + list(input_dir.glob(f"{pattern}.copc.laz")) + list(input_dir.glob(f"{pattern}.copc.las"))) # Remove duplicates matches = list(dict.fromkeys(matches)) if not matches: logger.warning(f"Aucun fichier trouvé pour: {pattern}") continue selected_files.extend(matches) # Remove duplicates across patterns seen = set() unique_files = [] for f in selected_files: if f not in seen: seen.add(f) unique_files.append(f) if not unique_files: logger.error("Aucun fichier trouvé pour les motifs spécifiés") sys.exit(1) logger.info(f"Traitement de {len(unique_files)} fichier(s) sélectionné(s)") for laz_file in unique_files: logger.info(f" → {laz_file.name}") for laz_file in unique_files: pipeline.process_file(laz_file) # Clean up temporary files logger.info("Nettoyage des fichiers temporaires...") try: if pipeline.temp_dir.exists(): shutil.rmtree(pipeline.temp_dir) temp_base = pipeline.output_dir / "temp" if temp_base.exists(): shutil.rmtree(temp_base) logger.info(" ✓ Fichiers temporaires supprimés") except Exception as e: logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}") else: pipeline.process_all() except Exception as e: logger.error(f"Erreur fatale: {e}", exc_info=True) sys.exit(1) def _kill_orphan_pdal(signum=None, frame=None): """Kill orphan PDAL processes on interrupt or exit.""" import subprocess try: subprocess.run(["pkill", "-f", "pdal"], capture_output=True, timeout=5) except Exception: pass if signum is not None: logger.info("Interruption — nettoyage des processus PDAL") sys.exit(130)