#!/usr/bin/env python3 """ Pipeline LiDAR pour détection archéologique - Version Python pur Visualisations générées avec numpy/rasterio/matplotlib """ import os import sys import json import subprocess from pathlib import Path import numpy as np import rasterio from rasterio.transform import from_bounds import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from matplotlib import rcParams from scipy import ndimage from scipy.stats import binned_statistic_2d from tqdm import tqdm try: import laspy except ImportError as e: print(f"Erreur import: {e}") sys.exit(1) rcParams['figure.dpi'] = 150 rcParams['savefig.dpi'] = 300 rcParams['font.size'] = 10 class LidarArchaeoPipeline: """Pipeline Python pur pour analyse archéologique LiDAR""" def __init__(self, input_dir, output_dir, resolution=0.5): self.input_dir = Path(input_dir) self.output_dir = Path(output_dir) self.resolution = resolution self.temp_dir = self.output_dir / "temp" if not self.input_dir.exists(): raise ValueError(f"Répertoire introuvable: {self.input_dir}") self.output_dir.mkdir(parents=True, exist_ok=True) self.temp_dir.mkdir(exist_ok=True) self.dtm_dir = self.output_dir / "DTM" self.vis_dir = self.output_dir / "visualisations" self.report_dir = self.output_dir / "rapports" for d in [self.dtm_dir, self.vis_dir, self.report_dir]: d.mkdir(exist_ok=True) print(f"✓ Pipeline initialisé (Python pur)") print(f" Entrée : {self.input_dir}") print(f" Sortie : {self.output_dir}") print(f" Résolution : {resolution}m/px") def find_laz_files(self): """Trouve tous les fichiers LAZ/LAS""" files = list(self.input_dir.glob("*.laz")) + list(self.input_dir.glob("*.las")) print(f"✓ {len(files)} fichier(s) LiDAR trouvé(s)") return sorted(files) def check_tools(self): """Vérifie que les outils sont disponibles""" for name, cmd in [('pdal', 'pdal --version'), ('gdal', 'gdalinfo --version')]: try: subprocess.run(cmd.split(), capture_output=True, check=True) print(f" ✓ {name}") except (subprocess.CalledProcessError, FileNotFoundError): return False return True # ============ STEP 1: Classification ============ def create_pipeline_json(self, input_laz, output_las): """Create PDAL pipeline for ground classification""" pipeline = { "pipeline": [ str(input_laz), { "type": "filters.smrf", "ignore": "Classification[7:7]", "slope": 1.0, "window": 16.0, "threshold": 0.5, "scalar": 1.25 }, { "type": "filters.range", "limits": "Classification[2:2]" }, { "type": "writers.las", "filename": str(output_las), "extra_dims": "all" } ] } return json.dumps(pipeline) def classify_ground(self, laz_file): """Classify ground points using SMRF filter""" output_las = self.temp_dir / f"{laz_file.stem}_ground.las" if output_las.exists(): print(f" ✓ Classification déjà effectuée") return output_las pipeline_json = self.create_pipeline_json(laz_file, output_las) pipeline_file = self.temp_dir / "pipeline.json" with open(pipeline_file, 'w') as f: f.write(pipeline_json) try: subprocess.run( ["pdal", "pipeline", str(pipeline_file)], capture_output=True, check=True ) print(f" ✓ Classification sol terminée") return output_las except subprocess.CalledProcessError as e: print(f" ✗ Erreur PDAL: {e.stderr.decode()}") return None # ============ STEP 2: DTM Generation ============ def create_dtm_fast(self, las_file, basename): """Create DTM using fast binning method with gap filling""" print(f" → Génération DTM...") try: import laspy from scipy.ndimage import gaussian_filter # Read LAZ/LAS file las = laspy.read(str(las_file)) # Get bounds min_x, max_x = float(las.header.min[0]), float(las.header.max[0]) min_y, max_y = float(las.header.min[1]), float(las.header.max[1]) # Calculate grid dimensions width = int(np.ceil((max_x - min_x) / self.resolution)) height = int(np.ceil((max_y - min_y) / self.resolution)) print(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]") print(f" Grid: {width}x{height} pixels ({len(las.points):,} points)") # Fast binning method print(f" Rasterisation rapide...") stat = binned_statistic_2d( las.x, las.y, las.z, statistic='mean', bins=[width, height], range=[[min_x, max_x], [min_y, max_y]] ) dtm = stat.statistic.T # Fill gaps using interpolation print(f" Remplissage des zones vides...") from scipy.ndimage import distance_transform_edt # Create mask of empty cells mask = np.isnan(dtm) if np.any(mask): # Use inpainting to fill gaps from scipy import interpolate # Get coordinates of valid points y_coords, x_coords = np.where(~mask) z_values = dtm[~mask] # Create interpolation function if len(y_coords) > 100: # Only interpolate if enough points # Create grid for interpolation y_all, x_all = np.mgrid[0:dtm.shape[0], 0:dtm.shape[1]] # Use nearest neighbor interpolation for speed interp = interpolate.NearestNDInterpolator( np.column_stack((y_coords, x_coords)), z_values ) # Fill missing values dtm_filled = interp(y_all, x_all) # Apply slight smoothing to blend filled areas dtm_filled = gaussian_filter(dtm_filled, sigma=0.5) # Replace NaN values with filled values dtm[mask] = dtm_filled[mask] # Save as GeoTIFF output_tif = self.dtm_dir / f"{basename}_dtm.tif" transform = from_bounds(min_x, min_y, max_x, max_y, width, height) with rasterio.open( output_tif, 'w', driver='GTiff', height=height, width=width, count=1, dtype='float32', crs='EPSG:2154', transform=transform, compress='lzw' ) as dst: dst.write(dtm.astype('float32'), 1) print(f" ✓ DTM créé: {output_tif.name}") return output_tif except Exception as e: print(f" ✗ Erreur DTM: {e}") import traceback traceback.print_exc() return None # ============ STEP 3: Visualizations (Python) ============ def generate_hillshade(self, dem_file, basename): """Generate hillshade using numpy""" print(f" → Hillshade multidirectionnel...") output = self.vis_dir / f"{basename}_hillshade_multi.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate gradients dy, dx = np.gradient(dem) # Multi-directional hillshade (NW, NE, SW, SE) azimuts = [315, 45, 225, 135] altitude = 30 hillshades = [] for az in azimuts: az_rad = np.radians(az) alt_rad = np.radians(altitude) # Calculate slope and aspect slope = np.arctan(np.sqrt(dx**2 + dy**2)) aspect = np.arctan2(-dy, dx) # Hillshade formula hs = np.sin(alt_rad) * np.sin(slope) + \ np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect) hillshades.append(np.clip(hs, 0, 1)) # Combine all directions combined = np.mean(hillshades, axis=0) # Save with rasterio.open( output, 'w', driver='GTiff', height=combined.shape[0], width=combined.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(combined.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur hillshade: {e}") return None def generate_slope(self, dem_file, basename): """Generate slope map""" print(f" → Pente (Slope)...") output = self.vis_dir / f"{basename}_slope.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate slope dy, dx = np.gradient(dem) slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi # Save with rasterio.open( output, 'w', driver='GTiff', height=slope.shape[0], width=slope.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(slope.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur slope: {e}") return None def generate_lrm(self, dem_file, basename): """Local Relief Model - deviation from local mean""" print(f" → Local Relief Model...") output = self.vis_dir / f"{basename}_lrm.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate local mean with gaussian filter from scipy.ndimage import gaussian_filter local_mean = gaussian_filter(dem, sigma=15/self.resolution) # Deviation from mean lrm = dem - local_mean # Save with rasterio.open( output, 'w', driver='GTiff', height=lrm.shape[0], width=lrm.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(lrm.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur LRM: {e}") return None def generate_svf(self, dem_file, basename): """Sky-View Factor approximation""" print(f" → Sky-View Factor...") output = self.vis_dir / f"{basename}_svf.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Simplified SVF using relief inverted # Normalize DEM dem_norm = (dem - np.nanmin(dem)) / (np.nanmax(dem) - np.nanmin(dem)) # Apply inverted relief (approximation of SVF) svf = 1 - dem_norm # Save with rasterio.open( output, 'w', driver='GTiff', height=svf.shape[0], width=svf.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(svf.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur SVF: {e}") return None def generate_openness(self, dem_file, basename, positive=True): """Positive/Negative Openness approximation""" name = "positive_openness" if positive else "negative_openness" print(f" → {name.replace('_', ' ').title()}...") output = self.vis_dir / f"{basename}_{name}.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs if positive: # Positive openness: high points from scipy.ndimage import maximum_filter openness = maximum_filter(dem, size=int(10/self.resolution)) else: # Negative openness: low points (cavities!) from scipy.ndimage import minimum_filter openness = minimum_filter(dem, size=int(10/self.resolution)) # Calculate difference result = openness - dem if positive else dem - openness # Save with rasterio.open( output, 'w', driver='GTiff', height=result.shape[0], width=result.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(result.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur openness: {e}") return None # ============ STEP 4: PNG Conversion ============ def tif_to_png(self, tif_file): """Convert GeoTIFF to visualization JPEG with explicit legend""" if not tif_file or not tif_file.exists(): return None jpg_file = self.vis_dir / f"{tif_file.stem}.jpg" try: with rasterio.open(tif_file) as src: data = src.read(1) nodata = src.nodata if nodata is not None: data = np.ma.masked_where((data == nodata) | np.isnan(data), data) # Get valid data for normalization valid_data = data.compressed() if hasattr(data, 'compressed') else data.flatten() valid_data = valid_data[~np.isnan(valid_data)] # Determine colormap, normalization, and legend info if 'hillshade' in str(tif_file): cmap = 'gray' vmin, vmax = 0, 1 data = np.clip(data, vmin, vmax) title = "Hillshade Multidirectionnel" legend_label = "Illumination (0-1)" description = "Ombres → Structures, murs, terrasses visibles" elif 'slope' in str(tif_file): cmap = 'inferno' vmin, vmax = 0, np.percentile(valid_data, 95) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Pente (Slope)" legend_label = f"Pente (°)\nMin: {vmin:.1f}° | Max: {vmax:.1f}°" description = "Orange/Clair = Forte pente (murs, talus) | Foncé = Faible pente" elif 'svf' in str(tif_file): cmap = 'viridis' vmin, vmax = np.percentile(valid_data, (5, 95)) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Sky-View Factor" legend_label = "Ouverture vers le ciel\nFoncé = Cuvette | Clair = Sommet" description = "Excellent pour structures, tumulus, fondations" elif 'lrm' in str(tif_file): cmap = 'RdBu_r' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Local Relief Model (LRM)" legend_label = f"Relief local (m)\nRouge = +{vmax:.1f}m (élévation)\nBleu = {vmin:.1f}m (dépression)" description = "Rouge = Surélévation | Bleu = Creux, fossé" elif 'positive' in str(tif_file): cmap = 'YlOrBr' vmin, vmax = np.percentile(valid_data, (10, 98)) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Positive Openness" legend_label = f"Ouverture positive (m)\nMin: {vmin:.1f}m | Max: {vmax:.1f}m" description = "Orange = Zones élevées (tumulus, collines)" elif 'negative' in str(tif_file): cmap = 'GnBu_r' vmin, vmax = np.percentile(valid_data, (10, 98)) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Negative Openness" legend_label = f"Ouverture négative (m)\nMin: {vmin:.1f}m | Max: {vmax:.1f}m" description = "Vert/Bleu = Zones basses (cavités, fossés, souterrains)" else: cmap = 'terrain' p2, p98 = np.percentile(valid_data, (2, 98)) data = np.clip((data - p2) / (p98 - p2), 0, 1) title = tif_file.stem.replace('_', ' ').title() legend_label = "Altitude normalisée" description = "" # Create figure with white background for JPEG fig, ax = plt.subplots(figsize=(18, 12), facecolor='white') # Display data im = ax.imshow(data, cmap=cmap, aspect='equal') # Enhanced colorbar with explicit values cbar = plt.colorbar(im, ax=ax, pad=0.03, shrink=0.75, aspect=30) cbar.ax.tick_params(labelsize=10, width=1.5) cbar.outline.set_linewidth(1.5) # Add colorbar label cbar.set_label(legend_label, fontsize=11, fontweight='bold') # Enhanced title with description full_title = f"{title}\n{description}" ax.set_title(full_title, fontsize=16, fontweight='bold', pad=15) # Add coordinates info ax.text(0.02, 0.98, f"Résolution: {self.resolution}m/px", transform=ax.transAxes, fontsize=9, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8)) ax.axis('off') fig.patch.set_facecolor('white') plt.tight_layout() # Save as JPEG plt.savefig(jpg_file, dpi=150, bbox_inches='tight', facecolor='white', format='jpg') plt.close() # Delete the source TIFF file to save space tif_file.unlink() return jpg_file except Exception as e: print(f" ✗ Erreur conversion JPEG: {e}") import traceback traceback.print_exc() return None def generate_all_visualizations(self, dtm_file, basename): """Generate all archaeological visualizations""" print(f"\n Génération visualisations:") vis_results = {} # Generate rasters vis_results['hillshade'] = self.generate_hillshade(dtm_file, basename) vis_results['slope'] = self.generate_slope(dtm_file, basename) vis_results['svf'] = self.generate_svf(dtm_file, basename) vis_results['lrm'] = self.generate_lrm(dtm_file, basename) vis_results['pos_open'] = self.generate_openness(dtm_file, basename, positive=True) vis_results['neg_open'] = self.generate_openness(dtm_file, basename, positive=False) # Convert to PNG print(f"\n Conversion images PNG:") for name, tif_file in vis_results.items(): png_file = self.tif_to_png(tif_file) if png_file: print(f" ✓ {png_file.name}") return vis_results def create_overview(self, basename): """Create overview image with all visualizations (JPEG)""" patterns = { 'Hillshade': '*_hillshade_multi.jpg', 'Pente': '*_slope.jpg', 'Sky-View Factor': '*_svf.jpg', 'Local Relief': '*_lrm.jpg', 'Pos. Openness': '*_positive_openness.jpg', 'Neg. Openness': '*_negative_openness.jpg' } images = {} for name, pattern in patterns.items(): files = list(self.vis_dir.glob(pattern)) if files: images[name] = str(files[0]) if len(images) < 2: return None # 2x3 grid with white background fig, axes = plt.subplots(2, 3, figsize=(20, 14), facecolor='white') axes = axes.flatten() for idx, (name, img_path) in enumerate(images.items()): if idx >= 6: break img = plt.imread(img_path) axes[idx].imshow(img) axes[idx].set_title(name, fontsize=12, fontweight='bold') axes[idx].axis('off') # Hide unused subplots for idx in range(len(images), 6): axes[idx].axis('off') # Title fig.suptitle( f"Analyse Archéologique LiDAR - {basename}", fontsize=16, fontweight='bold', y=0.98 ) plt.tight_layout() output = self.report_dir / f"{basename}_overview.jpg" plt.savefig(output, dpi=150, bbox_inches='tight', facecolor='white', format='jpg') plt.close() return output # ============ Complete Pipeline ============ def process_file(self, laz_file): """Process a single LAZ file""" basename = laz_file.stem print(f"\n{'='*60}") print(f" FICHIER : {basename}") print(f"{'='*60}") # Step 1: Ground classification print(f"\n[1/3] Classification du sol...") las_file = self.classify_ground(laz_file) if not las_file: print(f" ✗ Échec classification") return False # Step 2: Generate DTM print(f"\n[2/3] Génération DTM...") dtm_file = self.create_dtm_fast(las_file, basename) if not dtm_file: print(f" ✗ Échec DTM") return False # Step 3: Visualizations print(f"\n[3/3] Visualisations archéologiques...") vis = self.generate_all_visualizations(dtm_file, basename) # Overview overview = self.create_overview(basename) if overview: print(f"\n ✓ Vue synthétique: {overview}") print(f"\n✓ {basename} traité avec succès !") return True def process_all(self): """Process all LAZ files in input directory""" files = self.find_laz_files() if not files: print("Aucun fichier LAZ/LAS trouvé !") return print(f"\n{'='*60}") print(f" PIPELINE ARCHÉOLOGIQUE LiDAR") print(f"{'='*60}") # Check tools print(f"\nVérification des outils...") if not self.check_tools(): print("Erreur: Certains outils ne sont pas disponibles") return results = {} for laz_file in files: try: results[laz_file.name] = self.process_file(laz_file) except Exception as e: print(f"\n✗ Erreur traitement: {e}") import traceback traceback.print_exc() results[laz_file.name] = False # Summary print(f"\n{'='*60}") print(f" RÉSUMÉ") print(f"{'='*60}") success_count = sum(results.values()) print(f"✓ {success_count}/{len(results)} fichiers traités avec succès") print(f"\nRésultats dans: {self.output_dir}") print(f" • DTM: {self.dtm_dir}") print(f" • Visualisations JPEG: {self.vis_dir}") print(f" • Rapports: {self.report_dir}") # Clean up temporary files to save space print(f"\nNettoyage des fichiers temporaires...") try: import shutil if self.temp_dir.exists(): shutil.rmtree(self.temp_dir) print(f" ✓ Fichiers temporaires supprimés ({self.temp_dir})") except Exception as e: print(f" Note: Impossible de supprimer les fichiers temporaires") def main(): import argparse parser = argparse.ArgumentParser( description="Pipeline LiDAR pour détection archéologique (Python pur)" ) parser.add_argument( "input", help="Dossier contenant les fichiers LAZ/LAS" ) parser.add_argument( "-o", "--output", default="/data/output", help="Dossier de sortie" ) parser.add_argument( "-r", "--resolution", type=float, default=0.5, help="Résolution en mètres" ) args = parser.parse_args() try: pipeline = LidarArchaeoPipeline( input_dir=args.input, output_dir=args.output, resolution=args.resolution ) pipeline.process_all() except Exception as e: print(f"Erreur: {e}") sys.exit(1) if __name__ == "__main__": main()