#!/usr/bin/env python3 """ Pipeline LiDAR pour détection archéologique Visualisations générées avec numpy/cupy + rasterio/matplotlib Support GPU via CuPy si disponible, fallback numpy sinon """ import os import sys import json import subprocess from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path import numpy as np import rasterio from rasterio.transform import from_bounds import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from matplotlib import rcParams from scipy import ndimage from scipy.stats import binned_statistic_2d from tqdm import tqdm try: import laspy except ImportError as e: print(f"Erreur import: {e}") sys.exit(1) try: from rasterio.warp import transform as warp_transform HAS_WARP = True except ImportError: HAS_WARP = False # GPU acceleration via CuPy try: import cupy as cp import cupyx.scipy.ndimage as cp_ndimage HAS_GPU = True _xp = cp # Default array module (GPU) _gpu_info = cp.cuda.runtime.getDeviceProperties(0) _gpu_name = _gpu_info['name'].decode() if isinstance(_gpu_info['name'], bytes) else str(_gpu_info['name']) print(f"✓ GPU détectée: {_gpu_name}") print(f" Mémoire GPU: {_gpu_info['totalGlobalMem'] // (1024**3)} Go") except (ImportError, Exception): HAS_GPU = False _xp = np # Fallback CPU def to_gpu(arr): """Send array to GPU if available.""" if HAS_GPU: return cp.asarray(arr.astype(np.float64)) return arr.astype(np.float64) def to_cpu(arr): """Bring array back to CPU (numpy).""" if HAS_GPU and isinstance(arr, cp.ndarray): return cp.asnumpy(arr) return arr def xp_gaussian_filter(arr, sigma): """Gaussian filter using GPU if available.""" if HAS_GPU and isinstance(arr, cp.ndarray): return cp_ndimage.gaussian_filter(arr, sigma) return ndimage.gaussian_filter(arr, sigma) def xp_uniform_filter(arr, size): """Uniform filter using GPU if available.""" if HAS_GPU and isinstance(arr, cp.ndarray): return cp_ndimage.uniform_filter(arr, size) return ndimage.uniform_filter(arr, size) def xp_minimum_filter(arr, footprint=None, size=None): """Minimum filter using GPU if available.""" if HAS_GPU and isinstance(arr, cp.ndarray): return cp_ndimage.minimum_filter(arr, footprint=footprint, size=size) return ndimage.minimum_filter(arr, footprint=footprint, size=size) rcParams['figure.dpi'] = 150 rcParams['savefig.dpi'] = 300 rcParams['font.size'] = 10 class LidarArchaeoPipeline: """Pipeline Python pur pour analyse archéologique LiDAR""" def __init__(self, input_dir, output_dir, resolution=0.5, workers=1): self.input_dir = Path(input_dir) self.output_dir = Path(output_dir) self.resolution = resolution self.workers = workers self.temp_dir = self.output_dir / "temp" if not self.input_dir.exists(): raise ValueError(f"Répertoire introuvable: {self.input_dir}") self.output_dir.mkdir(parents=True, exist_ok=True) self.temp_dir.mkdir(exist_ok=True) self.dtm_dir = self.output_dir / "DTM" self.vis_dir = self.output_dir / "visualisations" self.pdf_dir = self.output_dir / "rapports" for d in [self.dtm_dir, self.vis_dir, self.pdf_dir]: d.mkdir(exist_ok=True) print(f"✓ Pipeline initialisé (Python pur)") print(f" Entrée : {self.input_dir}") print(f" Sortie : {self.output_dir}") print(f" Résolution : {resolution}m/px") print(f" Workers : {workers}") def find_laz_files(self): """Trouve tous les fichiers LAZ/LAS""" files = list(self.input_dir.glob("*.laz")) + list(self.input_dir.glob("*.las")) print(f"✓ {len(files)} fichier(s) LiDAR trouvé(s)") return sorted(files) def check_tools(self): """Vérifie que les outils sont disponibles""" for name, cmd in [('pdal', 'pdal --version'), ('gdal', 'gdalinfo --version')]: try: subprocess.run(cmd.split(), capture_output=True, check=True) print(f" ✓ {name}") except (subprocess.CalledProcessError, FileNotFoundError): return False return True # ============ STEP 1: Classification ============ def create_pipeline_json(self, input_laz, output_las): """Create PDAL pipeline for ground classification""" pipeline = { "pipeline": [ str(input_laz), { "type": "filters.smrf", "ignore": "Classification[7:7]", "slope": 1.0, "window": 16.0, "threshold": 0.5, "scalar": 1.25 }, { "type": "filters.range", "limits": "Classification[2:2]" }, { "type": "writers.las", "filename": str(output_las), "extra_dims": "all" } ] } return json.dumps(pipeline) def classify_ground(self, laz_file): """Classify ground points using SMRF filter""" output_las = self.temp_dir / f"{laz_file.stem}_ground.las" if output_las.exists(): print(f" ✓ Classification déjà effectuée") return output_las pipeline_json = self.create_pipeline_json(laz_file, output_las) pipeline_file = self.temp_dir / "pipeline.json" with open(pipeline_file, 'w') as f: f.write(pipeline_json) try: subprocess.run( ["pdal", "pipeline", str(pipeline_file)], capture_output=True, check=True ) print(f" ✓ Classification sol terminée") return output_las except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() print(f" ✗ Erreur PDAL: {error_msg}") # If error is about ReturnNumber=0, try filtering those points if "ReturnNumber" in error_msg and "NumberOfReturns" in error_msg: print(f" → Tentative de filtrage des points ReturnNumber=0...") # Use filters.range to keep only valid points (ReturnNumber >= 1) filtered_pipeline = [ { "type": "readers.las", "filename": str(laz_file) }, { "type": "filters.range", "limits": "ReturnNumber[1:],NumberOfReturns[1:]" }, { "type": "filters.smrf", "scalar": 1.25 }, { "type": "filters.range", "limits": "Classification[2:2]" }, { "type": "writers.las", "filename": str(output_las), "extra_dims": "all" } ] filtered_json = json.dumps(filtered_pipeline) with open(pipeline_file, 'w') as f: f.write(filtered_json) try: subprocess.run( ["pdal", "pipeline", str(pipeline_file)], capture_output=True, check=True ) print(f" ✓ Classification sol terminée (points filtrés)") return output_las except subprocess.CalledProcessError as e2: print(f" ✗ Erreur même avec filtrage: {e2.stderr.decode()}") return None return None def run_pdal_pipeline(self, pipeline_json, output_file): """Exécute un pipeline PDAL""" pipeline_file = self.temp_dir / "temp_pipeline.json" with open(pipeline_file, 'w') as f: f.write(pipeline_json) try: subprocess.run( ["pdal", "pipeline", str(pipeline_file)], capture_output=True, check=True ) print(f" ✓ Pipeline terminé") return output_file except subprocess.CalledProcessError as e: print(f" ✗ Erreur PDAL: {e.stderr.decode()}") return None # ============ STEP 2: DTM Generation ============ def create_dtm_fast(self, las_file, basename): """Create DTM using fast binning method with gap filling""" print(f" → Génération DTM...") try: import laspy from scipy.ndimage import gaussian_filter # Read LAZ/LAS file las = laspy.read(str(las_file)) # Get bounds min_x, max_x = float(las.header.min[0]), float(las.header.max[0]) min_y, max_y = float(las.header.min[1]), float(las.header.max[1]) # Calculate grid dimensions width = int(np.ceil((max_x - min_x) / self.resolution)) height = int(np.ceil((max_y - min_y) / self.resolution)) print(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]") print(f" Grid: {width}x{height} pixels ({len(las.points):,} points)") # Fast binning method print(f" Rasterisation rapide...") stat = binned_statistic_2d( las.x, las.y, las.z, statistic='mean', bins=[width, height], range=[[min_x, max_x], [min_y, max_y]] ) dtm = stat.statistic.T # Flip Y axis so north is at the top (Y decreases from top to bottom in image) dtm = dtm[::-1, :] # Fill gaps using interpolation print(f" Remplissage des zones vides...") # Create mask of empty cells mask = np.isnan(dtm) if np.any(mask): from scipy.ndimage import distance_transform_edt # Calculate distance to nearest valid point for each cell dist_to_nearest = distance_transform_edt(mask) max_fill_distance = max(20, int(10 / self.resolution)) # Max 20px or 10m # Use nearest-neighbor interpolation for speed (memory efficient) # but apply strong Gaussian smoothing to remove artifacts from scipy import interpolate y_coords, x_coords = np.where(~mask) z_values = dtm[~mask] if len(y_coords) > 100: # Nearest neighbor interpolation (fast, memory-efficient) interp = interpolate.NearestNDInterpolator( np.column_stack((y_coords, x_coords)), z_values ) # Only interpolate where needed (memory-efficient) y_missing, x_missing = np.where(mask) dtm[y_missing, x_missing] = interp(y_missing, x_missing) # Apply Gaussian smoothing to entire DTM to remove blocky artifacts # Stronger smoothing in areas that were interpolated dtm_smooth = gaussian_filter(dtm, sigma=2.0) # Only use smoothed values for interpolated areas, keep original for valid data # Also only fill within max distance from valid data fill_mask = mask & (dist_to_nearest <= max_fill_distance) dtm[fill_mask] = dtm_smooth[fill_mask] # For cells beyond max distance, leave as NaN far_mask = mask & (dist_to_nearest > max_fill_distance) dtm[far_mask] = np.nan # Save as GeoTIFF output_tif = self.dtm_dir / f"{basename}_dtm.tif" transform = from_bounds(min_x, min_y, max_x, max_y, width, height) with rasterio.open( output_tif, 'w', driver='GTiff', height=height, width=width, count=1, dtype='float32', crs='EPSG:2154', transform=transform, compress='lzw' ) as dst: dst.write(dtm.astype('float32'), 1) print(f" ✓ DTM créé: {output_tif.name}") return output_tif except Exception as e: print(f" ✗ Erreur DTM: {e}") import traceback traceback.print_exc() return None # ============ STEP 3: Visualizations (Python) ============ def generate_hillshade(self, dem_file, basename): """Generate hillshade using numpy""" print(f" → Hillshade multidirectionnel...") output = self.vis_dir / f"{basename}_hillshade_multi.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate gradients dy, dx = np.gradient(dem) # Multi-directional hillshade (NW, NE, SW, SE) azimuts = [315, 45, 225, 135] altitude = 30 hillshades = [] for az in azimuts: az_rad = np.radians(az) alt_rad = np.radians(altitude) # Calculate slope and aspect slope = np.arctan(np.sqrt(dx**2 + dy**2)) aspect = np.arctan2(dy, dx) # Hillshade formula hs = np.sin(alt_rad) * np.sin(slope) + \ np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect) hillshades.append(np.clip(hs, 0, 1)) # Combine all directions combined = np.mean(hillshades, axis=0) # Save with rasterio.open( output, 'w', driver='GTiff', height=combined.shape[0], width=combined.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(combined.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur hillshade: {e}") return None def generate_hillshade_ne(self, dem_file, basename): """Generate hillshade from NW (azimuth 315deg) like topographic maps""" print(f" → Hillshade Nord-Est (carte topo)...") output = self.vis_dir / f"{basename}_hillshade_ne.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Single-direction hillshade from NW (315°) - standard for topo maps # Since DTM Y-axis is flipped (north at top, Y increases downward in image), # use +dy in aspect calculation (not -dy) azimuth = 315 altitude = 45 dy, dx = np.gradient(dem) slope = np.arctan(np.sqrt(dx**2 + dy**2)) # Use +dy because DTM is already flipped (north at top) aspect = np.arctan2(dy, dx) az_rad = np.radians(azimuth) alt_rad = np.radians(altitude) hs = np.sin(alt_rad) * np.sin(slope) + \ np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect) hs = np.clip(hs, 0, 1) # Save with rasterio.open( output, 'w', driver='GTiff', height=hs.shape[0], width=hs.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(hs.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur hillshade NE: {e}") return None def generate_slope(self, dem_file, basename): """Generate slope map""" print(f" → Pente (Slope)...") output = self.vis_dir / f"{basename}_slope.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate slope dy, dx = np.gradient(dem) slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi # Save with rasterio.open( output, 'w', driver='GTiff', height=slope.shape[0], width=slope.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(slope.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur slope: {e}") return None def generate_aspect(self, dem_file, basename): """Generate aspect (orientation of slopes) map""" print(f" → Aspect (Orientation)...") output = self.vis_dir / f"{basename}_aspect.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate aspect (direction of slope) dy, dx = np.gradient(dem) aspect = np.arctan2(dy, dx) * 180 / np.pi aspect = np.mod(aspect, 360) # Convert to 0-360 # Save with rasterio.open( output, 'w', driver='GTiff', height=aspect.shape[0], width=aspect.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(aspect.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur aspect: {e}") return None def generate_curvature(self, dem_file, basename): """Generate curvature (terrain concavity/convexity) map""" print(f" → Courbure (Curvature)...") output = self.vis_dir / f"{basename}_curvature.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate curvature using Laplacian dz_dx = np.gradient(dem, axis=1) dz_dy = np.gradient(dem, axis=0) d2z_dx2 = np.gradient(dz_dx, axis=1) d2z_dy2 = np.gradient(dz_dy, axis=0) # Mean curvature curvature = (d2z_dx2 + d2z_dy2) / 2 # Save with rasterio.open( output, 'w', driver='GTiff', height=curvature.shape[0], width=curvature.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(curvature.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur curvature: {e}") return None def generate_solar(self, dem_file, basename): """Generate solar irradiance simulation""" print(f" → Éclairage Solaire (Solar Irradiance)...") output = self.vis_dir / f"{basename}_solar.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Calculate gradients dy, dx = np.gradient(dem) # Calculate slope and aspect slope = np.arctan(np.sqrt(dx**2 + dy**2)) aspect = np.arctan2(dy, dx) # Solar irradiance (morning sun - azimuth 90, altitude 30) az_rad = np.radians(90) alt_rad = np.radians(30) # Solar radiation formula solar = np.sin(alt_rad) * np.sin(slope) + \ np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect) # Clip negative values (shadows) solar = np.clip(solar, 0, 1) # Save with rasterio.open( output, 'w', driver='GTiff', height=solar.shape[0], width=solar.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(solar.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur solar: {e}") return None def generate_lrm(self, dem_file, basename): """Local Relief Model - deviation from local mean""" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → Local Relief Model{gpu_tag}...") output = self.vis_dir / f"{basename}_lrm.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs dem = to_gpu(dem_np) local_mean = xp_gaussian_filter(dem, sigma=15/self.resolution) lrm = dem - local_mean lrm_np = to_cpu(lrm).astype(np.float32) # Save with rasterio.open( output, 'w', driver='GTiff', height=lrm_np.shape[0], width=lrm_np.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(lrm_np, 1) return output except Exception as e: print(f" ✗ Erreur LRM: {e}") return None def generate_svf(self, dem_file, basename): """Sky-View Factor - ray-tracing on 16 azimuths. For each pixel, trace rays in N directions, find the max horizon angle in each direction, then SVF = (1/N) * sum(cos²(horizon_angle)). Valleys/crevices have low SVF (obstructed sky), ridges/peaks have high SVF. Uses GPU (CuPy) if available for acceleration.""" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → Sky-View Factor (ray-tracing){gpu_tag}...") output = self.vis_dir / f"{basename}_svf.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs rows, cols = dem_np.shape res = self.resolution # Move to GPU if available dem = to_gpu(dem_np) # 16 azimuth directions n_dirs = 16 angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx = np.cos(angles) dy = np.sin(angles) max_dist = int(50 / res) # 50m search radius # Pad DEM with NaN for boundary handling xp = cp if HAS_GPU else np padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan) svf = xp.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx[d_idx], dy[d_idx] horizon = xp.zeros_like(dem) for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem angle = xp.arctan2(elev_diff, dist_m) horizon = xp.where(xp.isnan(angle), horizon, xp.maximum(horizon, xp.nan_to_num(angle, nan=0))) svf += xp.cos(xp.pi / 2 - horizon) ** 2 svf /= n_dirs # Bring back to CPU for saving svf_np = to_cpu(svf).astype(np.float32) with rasterio.open( output, 'w', driver='GTiff', height=svf_np.shape[0], width=svf_np.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(svf_np, 1) return output except Exception as e: print(f" ✗ Erreur SVF: {e}") return None return output except Exception as e: print(f" ✗ Erreur SVF: {e}") return None def generate_openness(self, dem_file, basename, positive=True): """Positive/Negative Openness - true zenith/nadir angle computation. For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW): - Positive openness: max zenith angle (angle from vertical to highest visible terrain) - Negative openness: max nadir angle (angle from vertical down to lowest terrain) Result is averaged across all 8 directions. Uses GPU (CuPy) if available for acceleration.""" name = "positive_openness" if positive else "negative_openness" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → {name.replace('_', ' ').title()} (ray-tracing){gpu_tag}...") output = self.vis_dir / f"{basename}_{name}.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs rows, cols = dem_np.shape res = self.resolution dem = to_gpu(dem_np) xp = cp if HAS_GPU else np n_dirs = 8 angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx = np.cos(angles) dy = np.sin(angles) max_dist = int(50 / res) padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan) openness_sum = xp.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx[d_idx], dy[d_idx] max_angle = xp.zeros_like(dem) for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem if positive: angle = xp.arctan2(xp.maximum(elev_diff, 0), dist_m) else: angle = xp.arctan2(xp.maximum(-elev_diff, 0), dist_m) max_angle = xp.where(xp.isnan(angle), max_angle, xp.maximum(max_angle, xp.nan_to_num(angle, nan=0))) openness_sum += max_angle # Average across all directions (convert to degrees) openness_result = to_cpu(xp.degrees(openness_sum / n_dirs)).astype(np.float32) with rasterio.open( output, 'w', driver='GTiff', height=openness_result.shape[0], width=openness_result.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(openness_result.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur openness: {e}") return None def generate_mslrm(self, dem_file, basename): """Multi-Scale Relief Model (MSRM) - LRM computed at multiple scales (sigma=5,10,25,50,100) and combined. Reveals features at all scales: small (walls, ditches), medium (enclosures, tumulus), large (landscapes). Uses GPU if available for acceleration.""" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → Multi-Scale Relief Model (MSRM){gpu_tag}...") output = self.vis_dir / f"{basename}_mslrm.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs dem = to_gpu(dem_np) xp = cp if HAS_GPU else np # Compute LRM at multiple scales sigmas = [5, 10, 25, 50, 100] lrm_stack = [] for sigma in sigmas: sigma_px = sigma / self.resolution local_mean = xp_gaussian_filter(dem, sigma=sigma_px) lrm = dem - local_mean lrm_norm = lrm / max(xp.nanstd(lrm), 0.01) lrm_stack.append(lrm_norm) # Combine: RMS of normalized LRM at all scales mslrm = xp.sqrt(xp.mean(xp.array(lrm_stack) ** 2, axis=0)) mslrm_np = to_cpu(mslrm).astype(np.float32) with rasterio.open( output, 'w', driver='GTiff', height=mslrm_np.shape[0], width=mslrm_np.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(mslrm_np, 1) return output except Exception as e: print(f" ✗ Erreur MSRM: {e}") return None def generate_tpi(self, dem_file, basename): """Multi-Scale Topographic Position Index. TPI = elevation - mean(neighborhood). Computed at fine (5m) and broad (100m) scales to identify ridges, valleys, and intermediate landforms. Uses GPU if available for acceleration.""" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → TPI multi-echelle{gpu_tag}...") output = self.vis_dir / f"{basename}_tpi.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs dem = to_gpu(dem_np) fine_size = int(5 / self.resolution) if fine_size % 2 == 0: fine_size += 1 tpi_fine = dem - xp_uniform_filter(dem, size=fine_size) broad_size = int(100 / self.resolution) if broad_size % 2 == 0: broad_size += 1 tpi_broad = dem - xp_uniform_filter(dem, size=broad_size) # Combine: fine-scale weighted more for archaeological features # Normalize each scale then weight: 0.6 fine + 0.4 broad xp = cp if HAS_GPU else np fine_std = max(float(xp.nanstd(tpi_fine)), 0.01) broad_std = max(float(xp.nanstd(tpi_broad)), 0.01) tpi_combined = 0.6 * (tpi_fine / fine_std) + 0.4 * (tpi_broad / broad_std) tpi_np = to_cpu(tpi_combined).astype(np.float32) with rasterio.open( output, 'w', driver='GTiff', height=tpi_np.shape[0], width=tpi_np.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(tpi_np, 1) return output except Exception as e: print(f" ✗ Erreur TPI: {e}") return None def generate_vat(self, dem_file, basename): """VAT composite (Visualisation for Archaeological Topography). Fuses hillshade (R), slope (G), and SVF (B) into an RGB composite that reveals all micro-topographic features in a single image.""" print(f" → VAT composite...") output = self.vis_dir / f"{basename}_vat.tif" try: # Generate required rasters if not already available with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs from scipy.ndimage import gaussian_filter rows, cols = dem.shape # Hillshade (luminance) for R channel gy, gx = np.gradient(dem, self.resolution) slope = np.arctan(np.sqrt(gx**2 + gy**2)) # Illumination from NW (azimuth 315°, altitude 45°) alt_rad = np.radians(45) az_rad = np.radians(315) shading = (np.sin(alt_rad) * np.cos(slope) + np.cos(alt_rad) * np.sin(slope) * np.cos(az_rad - np.arctan2(gy, gx))) hillshade = np.clip(shading, 0, 1) # Slope for G channel slope_deg = np.degrees(slope) slope_norm = np.clip(slope_deg / 45, 0, 1) # SVF (simplified fast version for composite) for B channel n_dirs_svf = 8 angles_svf = np.linspace(0, 2 * np.pi, n_dirs_svf, endpoint=False) dx_svf = np.cos(angles_svf) dy_svf = np.sin(angles_svf) max_dist_svf = int(30 / self.resolution) padded = np.pad(dem.astype(np.float64), max_dist_svf, mode='constant', constant_values=np.nan) svf = np.zeros_like(dem, dtype=np.float64) for d_idx in range(n_dirs_svf): horizon = np.zeros_like(dem, dtype=np.float64) for step in range(1, max_dist_svf + 1): px = int(round(dx_svf[d_idx] * step)) py = int(round(dy_svf[d_idx] * step)) dist_m = np.sqrt((dx_svf[d_idx] * step * self.resolution) ** 2 + (dy_svf[d_idx] * step * self.resolution) ** 2) if dist_m < self.resolution * 0.5: continue elev_diff = padded[max_dist_svf + py:max_dist_svf + py + rows, max_dist_svf + px:max_dist_svf + px + cols] - dem angle = np.arctan2(elev_diff, dist_m) horizon = np.where(np.isnan(angle), horizon, np.maximum(horizon, np.nan_to_num(angle, nan=0))) svf += np.cos(np.pi / 2 - horizon) ** 2 svf /= n_dirs_svf # Normalize each channel to 0-1 using percentile stretch def stretch(arr, p_low=2, p_high=98): valid = arr[~np.isnan(arr)] if len(valid) == 0: return arr lo, hi = np.percentile(valid, (p_low, p_high)) if hi - lo < 1e-10: return np.zeros_like(arr) return np.clip((arr - lo) / (hi - lo), 0, 1) r = stretch(hillshade, 1, 99) g = stretch(slope_norm, 2, 95) b = stretch(svf, 5, 95) # Stack as RGB (3-band GeoTIFF) rgb = np.stack([r, g, b], axis=0).astype('float32') with rasterio.open( output, 'w', driver='GTiff', height=rows, width=cols, count=3, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: for i in range(3): dst.write(rgb[i], i + 1) return output except Exception as e: print(f" ✗ Erreur VAT: {e}") return None def generate_depressions(self, dem_file, basename): """Depression detection using hydrological sink filling. Fills depressions in the DEM and computes the difference to identify dolines, sinkholes, cavities, and enclosed basins.""" print(f" → Detection depressions (hydrologique)...") output = self.vis_dir / f"{basename}_depressions.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Fill depressions using iterative approach (scipy) # Priority-flood algorithm approximation from scipy.ndimage import binary_dilation, generate_binary_structure # Replace NaN with a very high value for filling dem_filled = dem.copy() nodata_mask = np.isnan(dem_filled) dem_filled[nodata_mask] = np.nanmax(dem) + 1000 # Iterative depression filling # A pixel is a sink if it's lower than all its neighbors # Fill by raising sinks to the minimum of their neighbors changed = True iterations = 0 max_iter = 100 struct = generate_binary_structure(2, 2) # 8-connectivity while changed and iterations < max_iter: # Find local minima (sinks) from scipy.ndimage import minimum_filter neighbor_min = minimum_filter(dem_filled, footprint=struct) # A pixel is a sink if it's lower than the minimum of its neighbors sinks = (dem_filled < neighbor_min) & ~nodata_mask if not np.any(sinks): break # Fill sinks to the level of the lowest neighbor new_dem = np.maximum(dem_filled, neighbor_min) new_dem[nodata_mask] = np.nan changed = np.any(new_dem != dem_filled) dem_filled = new_dem iterations += 1 # Difference = depth of depressions # Only keep positive differences (actual depressions) depressions = dem_filled - dem depressions[nodata_mask] = np.nan # Only keep real depressions (positive depth) depressions = np.where(depressions > 0.01, depressions, 0) with rasterio.open( output, 'w', driver='GTiff', height=depressions.shape[0], width=depressions.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(depressions.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur depressions: {e}") return None def generate_sailore(self, dem_file, basename): """SAILORE - Self-Adaptive Improved Local Relief Model. Kernel size adapts to local slope: flat areas get larger kernels, steep areas get smaller kernels. Better than fixed LRM for heterogeneous terrain. Uses GPU if available for acceleration.""" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → SAILORE (LRM adaptatif){gpu_tag}...") output = self.vis_dir / f"{basename}_sailore.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs dem = to_gpu(dem_np) xp = cp if HAS_GPU else np gy, gx = xp.gradient(dem, self.resolution) slope = xp.arctan(xp.sqrt(gx**2 + gy**2)) slope_deg = xp.degrees(slope) sigma_min = 2.0 / self.resolution sigma_max = 25.0 / self.resolution slope_norm = xp.clip(slope_deg / 30.0, 0, 1) adaptive_sigma = sigma_max - slope_norm * (sigma_max - sigma_min) lrm_fine = dem - xp_gaussian_filter(dem, sigma=sigma_min) lrm_medium = dem - xp_gaussian_filter(dem, sigma=(sigma_min + sigma_max) / 2) lrm_coarse = dem - xp_gaussian_filter(dem, sigma=sigma_max) w_fine = slope_norm w_medium = 1 - 2 * xp.abs(slope_norm - 0.5) w_coarse = 1 - slope_norm w_total = w_fine + w_medium + w_coarse w_total[w_total == 0] = 1 sailore = (w_fine * lrm_fine + w_medium * lrm_medium + w_coarse * lrm_coarse) / w_total sailore_np = to_cpu(sailore).astype(np.float32) with rasterio.open( output, 'w', driver='GTiff', height=sailore_np.shape[0], width=sailore_np.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(sailore_np, 1) return output except Exception as e: print(f" ✗ Erreur SAILORE: {e}") return None def generate_geomorphons(self, dem_file, basename): """Geomorphons - classification of terrain into 10 landform types using ternary pattern of zenith/nadir angles in 8 directions. Types: flat, peak, ridge, shoulder, spur, slope, hollow, footslope, valley, pit.""" print(f" → Geomorphons (10 formes de terrain)...") output = self.vis_dir / f"{basename}_geomorphons.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs rows, cols = dem.shape # Flatness threshold in meters flat_threshold = 1.0 # 1m difference considered flat # 8 directions (N, NE, E, SE, S, SW, W, NW) n_dirs = 8 angles_g = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx_g = np.cos(angles_g) dy_g = np.sin(angles_g) # Lookup distance in pixels lookup = int(10 / self.resolution) # 10m lookup distance # Pad DEM padded = np.pad(dem.astype(np.float64), lookup, mode='edge') # For each direction, determine if higher (+1), flat (0), or lower (-1) pattern = np.zeros((n_dirs, rows, cols), dtype=np.int8) for d in range(n_dirs): px = int(round(dx_g[d] * lookup)) py = int(round(dy_g[d] * lookup)) neighbor = padded[lookup + py:lookup + py + rows, lookup + px:lookup + px + cols] diff = neighbor - dem pattern[d] = np.where(diff > flat_threshold, 1, np.where(diff < -flat_threshold, -1, 0)) # Map ternary patterns to 10 landform types # Using the standard geomorphon classification # Count positive, negative, and flat directions n_positive = np.sum(pattern == 1, axis=0) n_negative = np.sum(pattern == -1, axis=0) n_flat = np.sum(pattern == 0, axis=0) # Classification rules (standard geomorphons) geomorphons = np.full((rows, cols), 0, dtype=np.int8) # 0=flat, 1=peak, 2=ridge, 3=shoulder, 4=spur, # 5=slope, 6=hollow, 7=footslope, 8=valley, 9=pit # Flat: all 8 directions are flat geomorphons[n_flat == 8] = 0 # Peak: all directions look up (all positive) geomorphons[n_positive == 8] = 1 # Pit: all directions look down (all negative) geomorphons[n_negative == 8] = 9 # Ridge: at least 3 positive, no negative, not all positive geomorphons[(n_positive >= 3) & (n_negative == 0) & (n_positive < 8)] = 2 # Valley: at least 3 negative, no positive, not all negative geomorphons[(n_negative >= 3) & (n_positive == 0) & (n_negative < 8)] = 8 # Shoulder: mixed positive+flat, some negative geomorphons[(n_positive >= 2) & (n_negative >= 1) & (n_positive > n_negative)] = 3 # Spur: some positive, more flat than negative geomorphons[(n_positive >= 1) & (n_negative >= 1) & (n_positive > n_negative) & (geomorphons == 0)] = 4 # Slope: mostly flat with some negative or positive geomorphons[(n_flat >= 4) & (n_positive + n_negative <= 4) & (geomorphons == 0)] = 5 # Hollow: more negative than positive, some flat geomorphons[(n_negative >= 1) & (n_positive >= 1) & (n_negative > n_positive) & (geomorphons == 0)] = 6 # Footslope: mixed negative+flat, some positive geomorphons[(n_negative >= 2) & (n_positive >= 1) & (n_negative > n_positive) & (geomorphons == 0)] = 7 # Catch remaining as slope geomorphons[geomorphons == 0] = 5 # Mask nodata nodata_mask = np.isnan(dem) geomorphons[nodata_mask] = 0 with rasterio.open( output, 'w', driver='GTiff', height=rows, width=cols, count=1, dtype='int8', crs=crs, transform=transform, compress='lzw', nodata=0 ) as dst: dst.write(geomorphons.astype('int8'), 1) return output except Exception as e: print(f" ✗ Erreur Geomorphons: {e}") return None def generate_roughness(self, dem_file, basename): """Surface roughness - standard deviation of elevation in a window plus roughness index (ratio of surface area to planar area). Reveals anthropogenic surfaces (walls, paths) vs natural terrain.""" print(f" → Rugosite de surface...") output = self.vis_dir / f"{basename}_roughness.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs from scipy.ndimage import generic_filter # Standard deviation in 5m window window_size = int(5 / self.resolution) if window_size % 2 == 0: window_size += 1 # Compute std deviation (surface roughness) def std_filter(arr): return np.nanstd(arr) roughness = generic_filter(dem.astype(np.float64), std_filter, size=window_size, mode='nearest') with rasterio.open( output, 'w', driver='GTiff', height=roughness.shape[0], width=roughness.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(roughness.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur rugosite: {e}") return None def generate_anomalies(self, dem_file, basename): """Statistical anomaly detection - z-score of local relief plus Local Moran's I for spatial autocorrelation. Identifies statistically significant topographic anomalies.""" print(f" → Detection anomalies statistiques...") output = self.vis_dir / f"{basename}_anomalies.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Z-score of LRM (local relief) from scipy.ndimage import gaussian_filter lrm = dem - gaussian_filter(dem, sigma=15 / self.resolution) lrm_mean = np.nanmean(lrm) lrm_std = max(np.nanstd(lrm), 0.01) z_score = (lrm - lrm_mean) / lrm_std # Local Moran's I for spatial clustering # I_i = (x_i - x_mean) * sum(w_ij * (x_j - x_mean)) / (S0 * sigma²) # Simplified: use local mean of z-score as spatial lag from scipy.ndimage import uniform_filter window = int(10 / self.resolution) if window % 2 == 0: window += 1 local_mean = uniform_filter(z_score, size=window) local_var = uniform_filter(z_score ** 2, size=window) - local_mean ** 2 local_var = np.maximum(local_var, 0.01) # Moran's I approximation: high positive = cluster, high negative = outlier morans_i = z_score * (local_mean - np.nanmean(z_score)) / np.nanstd(z_score) # Combine: absolute z-score (anomaly magnitude) * sign of Moran's I (cluster type) # High positive = significant elevated cluster, high negative = significant depression cluster anomaly_score = np.abs(z_score) * np.sign(morans_i) with rasterio.open( output, 'w', driver='GTiff', height=anomaly_score.shape[0], width=anomaly_score.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(anomaly_score.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur anomalies: {e}") return None def generate_wavelet(self, dem_file, basename): """Mexican Hat wavelet (Ricker wavelet) multi-scale analysis. Continuous Wavelet Transform at multiple scales to detect circular/circular features like tumulus, ring ditches, enclosures. Uses GPU if available for acceleration.""" gpu_tag = " [GPU]" if HAS_GPU else "" print(f" → Ondelette Mexican Hat multi-echelle{gpu_tag}...") output = self.vis_dir / f"{basename}_wavelet.tif" try: with rasterio.open(dem_file) as src: dem_np = src.read(1) transform = src.transform crs = src.crs dem = to_gpu(dem_np) xp = cp if HAS_GPU else np scales = [2, 5, 10, 20, 50] # meters wavelet_stack = [] for scale_m in scales: sigma_px = scale_m / self.resolution # 2D Mexican Hat = Laplacian of Gaussian if HAS_GPU: from cupyx.scipy.ndimage import gaussian_laplace as gpu_gaussian_laplace response = -gpu_gaussian_laplace(dem, sigma=sigma_px) else: from scipy.ndimage import gaussian_laplace response = -gaussian_laplace(to_cpu(dem).astype(np.float64), sigma=sigma_px) response = to_gpu(response) response /= max(float(xp.nanstd(response)), 0.01) wavelet_stack.append(response) combined = xp.sqrt(xp.mean(xp.array(wavelet_stack) ** 2, axis=0)) combined_np = to_cpu(combined).astype(np.float32) with rasterio.open( output, 'w', driver='GTiff', height=combined.shape[0], width=combined.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(combined_np, 1) return output except Exception as e: print(f" ✗ Erreur ondelette: {e}") return None def generate_texture(self, dem_file, basename): """GLCM texture analysis on hillshade - contrast, entropy, homogeneity. Detects anthropogenic surfaces (plowing, paths, walls) vs natural terrain.""" print(f" → Texture GLCM...") output = self.vis_dir / f"{basename}_texture.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # First create a hillshade for texture analysis gy, gx = np.gradient(dem, self.resolution) slope = np.arctan(np.sqrt(gx**2 + gy**2)) alt_rad = np.radians(45) az_rad = np.radians(315) shading = (np.sin(alt_rad) * np.cos(slope) + np.cos(alt_rad) * np.sin(slope) * np.cos(az_rad - np.arctan2(gy, gx))) hillshade = np.clip(shading, 0, 1) # Quantize to 256 levels for GLCM valid = hillshade[~np.isnan(hillshade)] if len(valid) == 0: raise ValueError("No valid data for texture analysis") lo, hi = np.percentile(valid, (1, 99)) img = np.clip((hillshade - lo) / max(hi - lo, 0.001), 0, 1) img_uint8 = (img * 255).astype(np.uint8) # Compute GLCM texture using sklearn-style approach # Simplified GLCM: local variance (proxy for contrast) and local entropy from scipy.ndimage import generic_filter window = int(5 / self.resolution) if window % 2 == 0: window += 1 # Local variance (contrast proxy) def local_variance(arr): arr_f = arr.astype(np.float64) return np.var(arr_f) contrast = generic_filter(img_uint8.astype(np.float64), local_variance, size=window, mode='nearest') # Local entropy approximation def local_entropy(arr): arr_f = arr.astype(np.float64) hist, _ = np.histogram(arr_f, bins=16, range=(0, 256)) hist = hist / max(hist.sum(), 1) hist = hist[hist > 0] return -np.sum(hist * np.log2(hist)) entropy = generic_filter(img_uint8.astype(np.float64), local_entropy, size=window, mode='nearest') # Local homogeneity (inverse difference moment proxy) def local_homogeneity(arr): arr_f = arr.astype(np.float64) mean_val = np.mean(arr_f) return np.mean(1.0 / (1.0 + (arr_f - mean_val) ** 2)) homogeneity = generic_filter(img_uint8.astype(np.float64), local_homogeneity, size=window, mode='nearest') # Combine into composite texture index # Normalize each component def norm(arr): valid = arr[~np.isnan(arr)] if len(valid) == 0: return arr std = max(np.std(valid), 0.01) return (arr - np.mean(valid)) / std texture_combined = (0.4 * norm(contrast) + 0.4 * norm(entropy) - 0.2 * norm(homogeneity)) with rasterio.open( output, 'w', driver='GTiff', height=texture_combined.shape[0], width=texture_combined.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(texture_combined.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur texture GLCM: {e}") return None def generate_flow(self, dem_file, basename): """Flow accumulation using D8 algorithm. Identifies drainage patterns, ditches, and enclosure boundaries. Uses scipy for efficient pit-filling and flow routing.""" print(f" → Accumulation de flux D8...") output = self.vis_dir / f"{basename}_flow.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs rows, cols = dem.shape nodata_mask = np.isnan(dem) # Fill pits first for proper flow routing from scipy.ndimage import minimum_filter, generate_binary_structure # Simple pit-filling: iterative depression filling dem_filled = dem.copy() dem_filled[nodata_mask] = np.nanmax(dem) + 1000 struct = generate_binary_structure(2, 2) # 8-connectivity for _ in range(50): neighbor_min = minimum_filter(dem_filled, footprint=struct) sinks = (dem_filled < neighbor_min) & ~nodata_mask if not np.any(sinks): break dem_filled = np.where(sinks, neighbor_min, dem_filled) dem_filled[nodata_mask] = np.nan # D8 flow direction: each cell flows to steepest descent neighbor # 8 neighbors: E, SE, S, SW, W, NW, N, NE dx8 = [1, 1, 0, -1, -1, -1, 0, 1] dy8 = [0, 1, 1, 1, 0, -1, -1, -1] # Distances (cardinal=1, diagonal=sqrt(2)) dist8 = [1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2)] # Compute flow direction for each cell flow_dir = np.full((rows, cols), -1, dtype=np.int8) max_slope = np.full((rows, cols), 0.0) padded = np.pad(dem_filled, 1, mode='constant', constant_values=np.nanmax(dem_filled) + 10000) for d in range(8): # Neighbor elevation nx = 1 + dx8[d] ny = 1 + dy8[d] neighbor_elev = padded[ny:ny + rows, nx:nx + cols] # Slope to neighbor (positive = downhill) slope = (dem_filled - neighbor_elev) / (dist8[d] * self.resolution) slope[nodata_mask] = -1 # Update flow direction if this neighbor has steepest descent better = slope > max_slope flow_dir[better] = d max_slope[better] = slope[better] # Flow accumulation: count upstream cells # Process cells from highest to lowest elevation flat_dem = dem_filled[~nodata_mask].flatten() valid_indices = np.where(~nodata_mask.flatten())[0] # Sort by elevation (highest first = process upstream first) sort_order = valid_indices[np.argsort(-flat_dem)] flow_acc = np.ones((rows, cols), dtype=np.float32) flow_acc[nodata_mask] = 0 # Accumulate flow for idx in sort_order: r, c = divmod(idx, cols) d = flow_dir[r, c] if d < 0: continue # Downstream cell nr, nc = r + dy8[d], c + dx8[d] if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]: flow_acc[nr, nc] += flow_acc[r, c] # Log-scale for visualization flow_log = np.log1p(flow_acc) with rasterio.open( output, 'w', driver='GTiff', height=flow_log.shape[0], width=flow_log.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(flow_log.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur flux: {e}") return None """Detect cavities/depressions below ground level. Uses local relief (difference between smoothed ground and actual terrain) to highlight depressions, dolines, sinkholes and any below-ground features. Surface features (mounds, walls) are suppressed so only depressions show.""" print(f" → Detection cavites (gouffres)...") output = self.vis_dir / f"{basename}_cavity.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs from scipy.ndimage import uniform_filter, minimum_filter, gaussian_filter from scipy.stats import binned_statistic_2d # Method 1: Local Relief - compare terrain to smoothed reference # Large window smoothing captures the regional ground level # Small depressions (cavities, dolines) appear as negative values window_large = int(100 / self.resolution) # 100m window if window_large % 2 == 0: window_large += 1 # Regional ground level (smoothed over large area) regional_ground = uniform_filter(dem, size=window_large) # Local relief = actual terrain minus regional ground # Positive = above regional ground (hills, walls) # Negative = below regional ground (cavities, dolines, sinkholes) local_relief = dem - regional_ground # Method 2: Minimum filter to detect sharp local depressions # Compares each point to its minimum neighborhood window_small = int(20 / self.resolution) | 1 # 20m window local_min = minimum_filter(dem, size=window_small) # Difference between local minimum and the actual terrain # If terrain is much higher than local minimum, there's a deep pit nearby pit_depth = dem - local_min # Only keep negative values (terrain lower than neighbors = depression) # This detects sharp, small depressions # Combine both methods: # Method 1 catches large gentle depressions (dolines) # Method 2 catches sharp small depressions (sinkholes, wells) # Take the most negative (deepest) signal combined = np.minimum(local_relief, -pit_depth) # Only show depressions (negative values = below ground) # Set positive values (hills, walls) to 0 so they don't obscure cavities combined = np.where(combined < 0, -combined, 0) # Slight smoothing to reduce noise combined = gaussian_filter(combined, sigma=1.0) # Mask NaN areas combined = np.where(np.isnan(dem), np.nan, combined) # Save with rasterio.open( output, 'w', driver='GTiff', height=combined.shape[0], width=combined.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(combined.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur cavity detection: {e}") import traceback traceback.print_exc() return None def generate_nodata_mask(self, dem_file, basename): """Generate a map showing areas without LiDAR coverage""" print(f" → Carte couverture LiDAR...") output = self.vis_dir / f"{basename}_nodata.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs # Create binary mask: 1 = has data, 0 = no data (NaN) # Also create distance from data edge to show interpolated areas from scipy.ndimage import distance_transform_edt has_data = (~np.isnan(dem)).astype(np.float32) no_data = np.isnan(dem).astype(np.float32) # Distance to nearest data point (in pixels) if np.any(no_data.astype(bool)): dist_from_data = distance_transform_edt(no_data.astype(bool)) else: dist_from_data = np.zeros_like(dem) # Normalize: 0 = has data, increasing values = further from data # Cap at a reasonable distance max_dist = 50 # pixels coverage = np.clip(dist_from_data / max_dist, 0, 1) # Areas with data = 0, areas without data > 0 # For visualization, use negative values for data and positive for gaps result = np.where(np.isnan(dem), coverage, -0.1) # -0.1 for data areas # Save with rasterio.open( output, 'w', driver='GTiff', height=result.shape[0], width=result.shape[1], count=1, dtype='float32', crs=crs, transform=transform, compress='lzw' ) as dst: dst.write(result.astype('float32'), 1) return output except Exception as e: print(f" ✗ Erreur nodata mask: {e}") import traceback traceback.print_exc() return None def download_ign_tiles(self, min_x, max_x, min_y, max_y, layer, zoom_level=15): """Download IGN WMTS tiles for the given bounds using Web Mercator (PM). Converts Lambert 93 bounds to lat/lon then to Web Mercator tile coordinates.""" import urllib.request import io import math from PIL import Image as PILImage # Convert Lambert 93 bounds to WGS84 lat/lon try: l93_xs = [min_x, max_x] l93_ys = [max_y, min_y] # NW, SE lons, lats = warp_transform('EPSG:2154', 'EPSG:4326', l93_xs, l93_ys) nw_lat, nw_lon = lats[0], lons[0] se_lat, se_lon = lats[1], lons[1] except Exception: print(f" ✗ Impossible de convertir les coordonnees") return None # IGN WMTS endpoint (free, no API key needed) wmts_url = "https://data.geopf.fr/wmts" tile_matrix_set = "PM" # Web Mercator (native cache) tile_size = 256 # Web Mercator resolution at each zoom level (meters per pixel at equator) # and tile calculation n = 2 ** zoom_level # Number of tiles at this zoom level # Calculate tile range from lat/lon bounds # Web Mercator formulas def lat_lon_to_tile(lat, lon, zoom): n = 2 ** zoom col = int((lon + 180) / 360 * n) lat_rad = math.radians(lat) row = int((1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n) return col, row col_min, row_min = lat_lon_to_tile(nw_lat, nw_lon, zoom_level) col_max, row_max = lat_lon_to_tile(se_lat, se_lon, zoom_level) # Calculate Web Mercator pixel coordinates for compositing def lat_lon_to_px(lat, lon, zoom): n = 2 ** zoom px_x = (lon + 180) / 360 * n * tile_size lat_rad = math.radians(lat) px_y = (1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n * tile_size return px_x, px_y nw_px_x, nw_px_y = lat_lon_to_px(nw_lat, nw_lon, zoom_level) se_px_x, se_px_y = lat_lon_to_px(se_lat, se_lon, zoom_level) out_width = int(se_px_x - nw_px_x) out_height = int(se_px_y - nw_px_y) if out_width <= 0 or out_height <= 0 or out_width > 5000 or out_height > 5000: return None composite = np.full((out_height, out_width, 3), 255, dtype=np.uint8) # Download and assemble tiles tiles_downloaded = 0 fmt = "image/png" if 'PLAN' in layer else "image/jpeg" for col in range(col_min, col_max + 1): for row in range(row_min, row_max + 1): url = ( f"{wmts_url}?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile" f"&LAYER={layer}&STYLE=normal" f"&TILEMATRIXSET={tile_matrix_set}" f"&TILEMATRIX={zoom_level}&TILECOL={col}&TILEROW={row}" f"&FORMAT={fmt}" ) try: req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req, timeout=10) as response: tile_data = response.read() tile_img = PILImage.open(io.BytesIO(tile_data)).convert('RGB') tile_arr = np.array(tile_img) # Calculate tile position in Web Mercator pixels tile_origin_x = col * tile_size tile_origin_y = row * tile_size # Position in composite image (relative to NW corner) px_x = int(tile_origin_x - nw_px_x) px_y = int(tile_origin_y - nw_px_y) # Paste tile into composite x_off = max(0, -px_x) y_off = max(0, -px_y) dst_x_start = max(0, px_x) dst_y_start = max(0, px_y) dst_x_end = min(out_width, px_x + tile_size) dst_y_end = min(out_height, px_y + tile_size) src_x = x_off src_y = y_off src_w = dst_x_end - dst_x_start src_h = dst_y_end - dst_y_start if src_w > 0 and src_h > 0 and tile_arr.shape[0] >= src_y + src_h and tile_arr.shape[1] >= src_x + src_w: composite[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = \ tile_arr[src_y:src_y+src_h, src_x:src_x+src_w] tiles_downloaded += 1 except Exception as e: if tiles_downloaded == 0 and col == col_min and row == row_min: print(f" ✗ Erreur tuile IGN: {e}") continue print(f" → {tiles_downloaded} tuiles IGN telechargees ({layer})") if tiles_downloaded == 0: return None return composite def generate_ign_overlay(self, dem_file, basename, layer, title, legend_label, description, out_suffix): """Generate an IGN basemap overlay visualization""" print(f" → {title}...") output = self.vis_dir / f"{basename}_{out_suffix}.tif" try: with rasterio.open(dem_file) as src: dem = src.read(1) transform = src.transform crs = src.crs height, width = dem.shape top_left_x = transform.c top_left_y = transform.f pixel_size_x = transform.a pixel_size_y = abs(transform.e) min_x = top_left_x max_x = top_left_x + width * pixel_size_x max_y = top_left_y min_y = top_left_y - height * pixel_size_y # Choose zoom level based on resolution # At 0.5m/px, use zoom level 15 (3.19m/px IGN) zoom = 15 result = self.download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom) if result is None: print(f" ✗ Impossible de telecharger les tuiles IGN") return None ign_img = result # Resize IGN image to match our DTM dimensions from PIL import Image as PILImage ign_pil = PILImage.fromarray(ign_img) ign_resized = ign_pil.resize((width, height), PILImage.LANCZOS) ign_arr = np.array(ign_resized) # Save as GeoTIFF with rasterio.open( output, 'w', driver='GTiff', height=height, width=width, count=3, dtype='uint8', crs=crs, transform=transform, ) as dst: for i in range(3): dst.write(ign_arr[:, :, i], i + 1) return output except Exception as e: print(f" ✗ Erreur IGN overlay: {e}") import traceback traceback.print_exc() return None # ============ STEP 4: PNG Conversion ============ def tif_to_png(self, tif_file): """Convert GeoTIFF to visualization PNG with GPS coordinates, external legend/scale""" if not tif_file or not tif_file.exists(): return None jpg_file = self.vis_dir / f"{tif_file.stem}.webp" try: with rasterio.open(tif_file) as src: # Check if this is a 3-band RGB image (ortho/topo) is_rgb = src.count >= 3 and ('ortho' in str(tif_file) or 'topo' in str(tif_file)) if is_rgb: # Read all 3 bands for RGB data = src.read([1, 2, 3]) # Shape: (3, height, width) data = np.moveaxis(data, 0, -1) # Shape: (height, width, 3) else: data = src.read(1) nodata = src.nodata transform = src.transform crs = src.crs # Get geographic bounds from transform if is_rgb: height, width, _ = data.shape else: height, width = data.shape top_left_x = transform.c top_left_y = transform.f pixel_size_x = transform.a pixel_size_y = abs(transform.e) min_x = top_left_x max_x = top_left_x + width * pixel_size_x max_y = top_left_y min_y = top_left_y - height * pixel_size_y # Convert Lambert 93 corners to GPS (WGS84) gps_coords = {} if HAS_WARP and crs is not None: try: # Corner coordinates in Lambert 93 l93_xs = [min_x, max_x, min_x, max_x] l93_ys = [max_y, max_y, min_y, min_y] lons, lats = warp_transform(crs, 'EPSG:4326', l93_xs, l93_ys) gps_coords = { 'NW': (lats[0], lons[0]), 'NE': (lats[1], lons[1]), 'SW': (lats[2], lons[2]), 'SE': (lats[3], lons[3]), } # Also compute tick positions along edges n_ticks = 5 # Bottom edge (X axis): longitude ticks tick_l93_x = np.linspace(min_x, max_x, n_ticks) tick_l93_y_bottom = np.full(n_ticks, min_y) tick_lons, tick_lats = warp_transform(crs, 'EPSG:4326', tick_l93_x, tick_l93_y_bottom) gps_coords['x_ticks'] = list(zip(tick_lons, tick_lats)) # Left edge (Y axis): latitude ticks tick_l93_y = np.linspace(min_y, max_y, n_ticks) tick_l93_x_left = np.full(n_ticks, min_x) tick_lons_y, tick_lats_y = warp_transform(crs, 'EPSG:4326', tick_l93_x_left, tick_l93_y) gps_coords['y_ticks'] = list(zip(tick_lons_y, tick_lats_y)) except Exception: gps_coords = {} if nodata is not None and not is_rgb: data = np.ma.masked_where((data == nodata) | np.isnan(data), data) # For RGB images (ortho/topo), skip normalization and display directly if is_rgb: # RGB image: skip all normalization, display as-is pass else: # Get valid data for normalization valid_data = data.compressed() if hasattr(data, 'compressed') else data.flatten() valid_data = valid_data[~np.isnan(valid_data)] # Determine colormap, normalization, and legend info if is_rgb: # RGB images: no colormap needed, display directly if 'ortho' in str(tif_file): title = "Photographie Aerienne IGN" legend_label = "Orthophotographie\nImage aerienne" description = "Photographie aerienne IGN" elif 'topo' in str(tif_file): title = "Carte Topographique IGN" legend_label = "Carte IGN\nPlan topographique" description = "Carte topographique IGN (Plan IGN)" else: title = tif_file.stem.replace('_', ' ').title() legend_label = "Image RGB" description = "" elif 'hillshade' in str(tif_file): cmap = 'gray' vmin, vmax = 0, 1 data = np.clip(data, vmin, vmax) title = "Hillshade Multidirectionnel" legend_label = "Illumination combinee de 6 directions\nBlanc = Face eclairee | Noir = Zone d'ombre" description = "Ombres portees revelant micro-relief (murs, fosses, terrasses)" elif 'slope' in str(tif_file): cmap = 'inferno' vmin, vmax = 0, np.percentile(valid_data, 95) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Pente (Inclinaison du terrain)" legend_label = f"Inclinaison en degres\nMin: {vmin:.1f} | Max: {vmax:.1f}\nClair = Forte pente | Sombre = Terrain plat" description = "Murs, talus et bords ressortent en clair - terrain plat en sombre" elif 'aspect' in str(tif_file): cmap = 'hsv' vmin, vmax = 0, 360 data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Aspect (Direction des pentes)" legend_label = "Direction vers laquelle le terrain descend\nRouge=Nord | Vert=Est | Cyan=Sud | Bleu=Ouest" description = "Orientation des pentes - utile pour distinguer structures des formes naturelles" elif 'curvature' in str(tif_file): cmap = 'RdYlBu_r' vmax = max(abs(np.percentile(valid_data, 5)), abs(np.percentile(valid_data, 95)), 0.001) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Courbure (Convexite/Concavite du terrain)" legend_label = f"Changement de pente (1/m)\nRouge = Convexe (sommet de mur, levee)\nBleu = Concave (fond de fosse, depression)" description = "Detecte les ruptures de pente - utile pour bords de terrasses et levees" elif 'svf' in str(tif_file): cmap = 'viridis' vmin, vmax = np.percentile(valid_data, (5, 95)) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Sky-View Factor (Ray-tracing 16 directions)" legend_label = "Fraction de ciel visible depuis chaque point\nSombre = Encaisse (fosses, vallees, rues)\nClair = Degage (sommets, plateformes, plateaux)" description = "Ray-tracing sur 16 azimuts - elimine l'eclairage, detecte structures lineaires et enclos" elif 'mslrm' in str(tif_file): cmap = 'RdBu_r' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "MSRM - Multi-Scale Relief Model (5 echelles)" legend_label = f"Relief combine a 5 echelles (5m a 100m)\nRouge = Surelevation (mur, tumulus, levee)\nBleu = Depression (fosse, douve, fosse)\n\nDifference avec LRM:\nLRM = 1 echelle (15m)\nMSRM = 5 echelles combinees\nMSRM detecte du micro au macro" description = "Combine LRM a 5 echelles - detecte structures de 5m a 100m simultanement" elif 'lrm' in str(tif_file): cmap = 'RdBu_r' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "LRM - Local Relief Model (echelle unique 15m)" legend_label = f"Ecart local par rapport au terrain moyen (m)\nRouge = Surelevation (+{vmax:.2f}m)\nBleu = Depression ({vmin:.2f}m)\nNoyau gaussien unique de 15m" description = "Micro-relief a 15m seulement - voir MSRM pour toutes les echelles" elif 'positive' in str(tif_file): cmap = 'YlOrBr' vmin, vmax = np.percentile(valid_data, (10, 98)) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Openness Positive (ouverture vers le haut)" legend_label = f"Angle d'ouverture vers le haut (deg)\nClair = Vue degagee vers le ciel (sommets, plateaux)\nSombre = Vue bouchee (vallees encaissees)" description = "Ray-tracing 8 directions - complementaire de la negative pour detecter cretes" elif 'negative' in str(tif_file): cmap = 'GnBu_r' vmin, vmax = np.percentile(valid_data, (10, 98)) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Openness Negative (ouverture vers le bas)" legend_label = f"Angle d'ouverture vers le bas (deg)\nClair = Surplomb (bords de fosse, grottes)\nSombre = Terrain plat (fonds de vallee)\nMeilleur detecteur de cavites et dolines" description = "Ray-tracing 8 directions - detecte fosses, dolines, souterrains, dolines" elif 'tpi' in str(tif_file): cmap = 'BrBG' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "TPI - Topographic Position Index (2 echelles)" legend_label = "Position dans le paysage\nBrun/Sombre = Plus bas que le voisinage (fosse, vallee)\nVert/Clair = Plus haut que le voisinage (crete, plateau)\nCombine echelle fine 5m + large 100m" description = "Identifie la position topographique - utile pour repeter cretes vs vallees a grande echelle" elif 'depressions' in str(tif_file): cmap = 'YlOrRd' vmin, vmax = 0, max(np.percentile(valid_data, 99), 0.1) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Depressions (Remplissage hydrologique)" legend_label = f"Profondeur des cuvettes detectees (m)\nTransparent = Pas de depression\nJaune = Depression legere | Rouge = Depression profonde\nMax: {vmax:.2f}m" description = "Simule le remplissage d'eau - detecte dolines, sinkholes, cuvettes et zones inondables" elif 'sailore' in str(tif_file): cmap = 'seismic' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "SAILORE - LRM Auto-Adaptatif" legend_label = f"Relief local adaptatif (m)\nRouge = Surelevation | Bleu = Depression\n\nDifference avec LRM/MSRM:\nLRM = noyau fixe 15m\nMSRM = 5 noyaux fixes\nSAILORE = noyau adapte a la pente\nPlat=grand noyau | Pente=petit noyau" description = "Noyau qui s'adapte a la pente locale - terrain plat=grand noyau, pente=petit noyau" elif 'roughness' in str(tif_file): cmap = 'magma' vmin, vmax = 0, np.percentile(valid_data, 97) data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Rugosite de Surface (Ecart-type local 5m)" legend_label = f"Irregularite du terrain dans un voisinage de 5m\nSombre = Surface lisse (route, mur, sol plat)\nClair = Surface rugueuse (vegetation, ruines, pierres)\nMax: {vmax:.2f}m" description = "Mesure la variabilite locale - surfaces anthropiques lisses vs naturelles rugueuses" elif 'anomalies' in str(tif_file): cmap = 'coolwarm' vmax = max(abs(np.percentile(valid_data, 5)), abs(np.percentile(valid_data, 95)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Anomalies Statistiques (Z-score x Moran's I)" legend_label = "Anomalies topographiques significatives\nRouge vif = Surelevation anormale (mur, tumulus)\nBleu vif = Depression anormale (fosse, doline)\nBlanc/gris = Normal\n\nCombine z-score (intensite) et\nMoran's I (regroupement spatial)" description = "Detecte uniquement les anomalies statistiquement significatives - filtre le bruit de fond" elif 'wavelet' in str(tif_file): cmap = 'cividis' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Ondelette Mexican Hat (CWT multi-echelle)" legend_label = "Reponse de la transformee en ondelette a 5 echelles\nEchelles: 2m, 5m, 10m, 20m, 50m\n\nClair = Structure detectee a cette echelle\nSombre = Pas de structure\n\nOptimise pour formes circulaires:\ntumulus, enclos, fosses annulaires" description = "Transformee en ondelette 2D - excellente pour detecter structures circulaires" elif 'texture' in str(tif_file): cmap = 'inferno' vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1) vmin, vmax = -vmax, vmax data = np.clip((data - vmin) / (vmax - vmin), 0, 1) title = "Texture GLCM (Contraste + Entropie - Homogeneite)" legend_label = "Analyse de la texture du relief (fenetre 5m)\nClair = Texture heterogene (labour, ruines, sol perturbe)\nSombre = Texture homogene (sol nu, route, zone plate)\n\nCombine contraste, entropie et homogeneite" description = "Distingue surfaces anthropiques (labour, chemins) des naturelles" elif 'flow' in str(tif_file): cmap = 'Blues' vmin, vmax = 0, np.percentile(valid_data, 98) data = np.clip((data - vmin) / max(vmax - vmin, 0.01), 0, 1) title = "Accumulation de Flux Hydrologique (D8)" legend_label = f"Logarithme de l'accumulation d'eau\nBlanc = Pas de collecte (sommet, crete)\nBleu fonce = Collecte maximale (thalweg, fosse)\n\nSimule l'ecoulement de l'eau de pluie\nDetecte fosses d'enceinte, routes et drainage" description = "Algorithme D8 - simule le cheminement de l'eau pour detecter fosses et routes antiques" elif 'ortho' in str(tif_file): # Aerial photo - display RGB directly title = "Photographie Aerienne IGN" legend_label = "Orthophotographie\nImage aerienne" description = "Photographie aerienne IGN" # For 3-band RGB, display directly if data.ndim == 3: # Already RGB, just display pass else: cmap = 'gray' # Skip normalization for RGB elif 'topo' in str(tif_file): # Topographic map - display RGB directly title = "Carte Topographique IGN" legend_label = "Carte IGN\nPlan topographique" description = "Carte topographique IGN (Plan IGN)" # For 3-band RGB, display directly if data.ndim == 3: pass else: cmap = 'gray' else: cmap = 'terrain' p2, p98 = np.percentile(valid_data, (2, 98)) data = np.clip((data - p2) / (p98 - p2), 0, 1) title = tif_file.stem.replace('_', ' ').title() legend_label = "Altitude normalisee" description = "" # ---- Create figure with external margins ---- # Layout: map with colorbar, info bar below # Map aspect ratio determines the figure height dynamically from matplotlib.gridspec import GridSpec # Keep image width-based sizing, add room for colorbar and bottom info fig_width = 20 # Map height based on data aspect ratio, plus margins for title and info map_aspect = height / width fig = plt.figure(figsize=(fig_width, fig_width * map_aspect * 0.7 + 2.5), facecolor='white') gs = GridSpec(2, 1, height_ratios=[1.0, 0.06], hspace=0.04, figure=fig, left=0.06, right=0.88, top=0.93, bottom=0.08) # Map row (with title above) ax = fig.add_subplot(gs[0]) # Display data - RGB or single-band if is_rgb: im = ax.imshow(data, aspect='equal', origin='upper') else: im = ax.imshow(data, cmap=cmap, aspect='equal', origin='upper') # Title above the map ax.set_title(f"{title}\n{description}", fontsize=15, fontweight='bold', pad=10) # Colorbar on the right side (not for RGB images) if not is_rgb: cbar = plt.colorbar(im, ax=ax, pad=0.02, shrink=0.85, aspect=30) cbar.ax.tick_params(labelsize=9, width=1.5) cbar.outline.set_linewidth(1.5) cbar.set_label(legend_label, fontsize=10, fontweight='bold') else: # For other RGB images (ortho/topo), add legend label as text on the right ax.text(1.02, 0.5, legend_label, transform=ax.transAxes, fontsize=10, fontweight='bold', rotation=90, verticalalignment='center', horizontalalignment='left') # ---- GPS coordinate tick labels ---- if gps_coords and 'x_ticks' in gps_coords: # X-axis: longitude along bottom x_pixel_positions = np.linspace(0, width - 1, len(gps_coords['x_ticks'])) x_labels = [f"{lon:.5f}E" for lon, lat in gps_coords['x_ticks']] ax.set_xticks(x_pixel_positions) ax.set_xticklabels(x_labels, fontsize=7, rotation=30) ax.set_xlabel('Longitude', fontsize=9, fontweight='bold') # Y-axis: latitude along left y_pixel_positions = np.linspace(0, height - 1, len(gps_coords['y_ticks'])) y_labels = [f"{lat:.5f}N" for lon, lat in gps_coords['y_ticks']] ax.set_yticks(y_pixel_positions) ax.set_yticklabels(y_labels, fontsize=7) ax.set_ylabel('Latitude', fontsize=9, fontweight='bold') else: # Fallback to Lambert 93 coordinates x_ticks_count = 5 x_positions = np.linspace(0, width - 1, x_ticks_count) x_labels = [f"{(min_x + xp * pixel_size_x)/1000:.1f}" for xp in x_positions] ax.set_xticks(x_positions) ax.set_xticklabels(x_labels, fontsize=8) ax.set_xlabel('Est (km) - Lambert 93', fontsize=9, fontweight='bold') y_ticks_count = 5 y_positions = np.linspace(0, height - 1, y_ticks_count) y_labels = [f"{(max_y - yp * pixel_size_y)/1000:.1f}" for yp in y_positions] ax.set_yticks(y_positions) ax.set_yticklabels(y_labels, fontsize=8) ax.set_ylabel('Nord (km) - Lambert 93', fontsize=9, fontweight='bold') # Style axes ax.tick_params(axis='both', which='both', direction='out', length=3, width=0.8, colors='black') for spine in ax.spines.values(): spine.set_visible(True) spine.set_color('black') spine.set_linewidth(0.8) # ---- North arrow ---- from matplotlib.patches import Polygon as MplPolygon from mpl_toolkits.axes_grid1.inset_locator import inset_axes north_ax = inset_axes(ax, width="4%", height="7%", loc='upper right', bbox_to_anchor=(-0.03, 0.12, 1, 1), bbox_transform=ax.transAxes) north_ax.set_xlim(0, 1) north_ax.set_ylim(0, 1) north_ax.axis('off') north_ax.plot([0.5, 0.5], [0.1, 0.65], color='black', linewidth=2.5, zorder=10) north_ax.add_patch(MplPolygon([[0.5, 0.2], [0.35, 0.4], [0.5, 0.65], [0.65, 0.4]], closed=True, facecolor='black', edgecolor='black', zorder=9)) north_ax.text(0.5, 0.95, 'N', ha='center', va='top', fontsize=13, fontweight='bold', color='black', zorder=11) # ---- Bottom cartouche: info + scale bar OUTSIDE the map ---- info_ax = fig.add_subplot(gs[1]) info_ax.axis('off') # Altitude range (skip for RGB images) if is_rgb: alt_min = 0 alt_max = 0 else: alt_min = np.nanmin(valid_data) if len(valid_data) > 0 else 0 alt_max = np.nanmax(valid_data) if len(valid_data) > 0 else 0 extent_km_x = (max_x - min_x) / 1000 extent_km_y = (max_y - min_y) / 1000 # Build info text (left side of bottom row) if gps_coords: nw_lat, nw_lon = gps_coords['NW'] se_lat, se_lon = gps_coords['SE'] info_text = ( f"GPS: {nw_lat:.5f}N {nw_lon:.5f}E - {se_lat:.5f}N {se_lon:.5f}E | " f"EPSG:2154 | Res: {self.resolution}m/px | " f"Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km" ) else: info_text = ( f"EPSG:2154 | X: {min_x:.0f}-{max_x:.0f} Y: {min_y:.0f}-{max_y:.0f} | " f"Res: {self.resolution}m/px | Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km" ) info_ax.text(0.01, 0.5, info_text, transform=info_ax.transAxes, fontsize=8.5, verticalalignment='center', family='monospace', bbox=dict(boxstyle='round,pad=0.3', facecolor='#f0f0f0', edgecolor='#aaaaaa', alpha=0.95)) # Scale bar: always 100m (fixed scale) # 100m at 0.5m/px = 200 pixels scale_m = 100 scale_label = "100 m" pixels_per_meter = 1.0 / pixel_size_x scale_px = int(scale_m * pixels_per_meter) # Convert pixel length to fraction of the axis width # Scale bar as fraction of the info axis (which matches map width) scale_bar_frac = scale_px / width # Draw scale bar as simple black bar with text from matplotlib.patches import Rectangle as RectPatch bar_left = 0.80 bar_bottom = 0.15 bar_width_frac = min(scale_bar_frac, 0.15) # Cap at 15% of width bar_height = 0.35 info_ax.add_patch(RectPatch((bar_left, bar_bottom), bar_width_frac, bar_height, facecolor='black', edgecolor='black', linewidth=0.5, transform=info_ax.transAxes, clip_on=False)) info_ax.text(bar_left + bar_width_frac / 2, bar_bottom + bar_height + 0.12, scale_label, ha='center', va='bottom', fontsize=9, fontweight='bold', transform=info_ax.transAxes) fig.patch.set_facecolor('white') # Save as PNG first (matplotlib doesn't support WebP well) png_temp = self.vis_dir / f"{tif_file.stem}_temp.png" plt.savefig(png_temp, dpi=150, bbox_inches='tight', pad_inches=0.15, facecolor='white', format='png') plt.close() # Convert PNG to lossless WebP using PIL from PIL import Image as PILImage img = PILImage.open(str(png_temp)) img.save(str(jpg_file), format='WEBP', lossless=True) png_temp.unlink() # Delete temp PNG # Delete the source TIFF file to save space tif_file.unlink() return jpg_file except Exception as e: print(f" Erreur conversion PNG: {e}") import traceback traceback.print_exc() return None def generate_all_visualizations(self, dtm_file, basename): """Generate all archaeological visualizations""" print(f"\n Génération visualisations:") # Create per-file subdirectory file_vis_dir = self.vis_dir / basename file_vis_dir.mkdir(exist_ok=True) # Temporarily override vis_dir for this file original_vis_dir = self.vis_dir self.vis_dir = file_vis_dir vis_results = {} # Core visualizations vis_results['hillshade'] = self.generate_hillshade(dtm_file, basename) vis_results['slope'] = self.generate_slope(dtm_file, basename) vis_results['aspect'] = self.generate_aspect(dtm_file, basename) vis_results['curvature'] = self.generate_curvature(dtm_file, basename) vis_results['svf'] = self.generate_svf(dtm_file, basename) vis_results['lrm'] = self.generate_lrm(dtm_file, basename) vis_results['pos_open'] = self.generate_openness(dtm_file, basename, positive=True) vis_results['neg_open'] = self.generate_openness(dtm_file, basename, positive=False) # Advanced visualizations vis_results['mslrm'] = self.generate_mslrm(dtm_file, basename) vis_results['tpi'] = self.generate_tpi(dtm_file, basename) vis_results['depressions'] = self.generate_depressions(dtm_file, basename) vis_results['sailore'] = self.generate_sailore(dtm_file, basename) vis_results['roughness'] = self.generate_roughness(dtm_file, basename) vis_results['anomalies'] = self.generate_anomalies(dtm_file, basename) vis_results['wavelet'] = self.generate_wavelet(dtm_file, basename) vis_results['texture'] = self.generate_texture(dtm_file, basename) vis_results['flow'] = self.generate_flow(dtm_file, basename) # IGN overlays vis_results['ortho'] = self.generate_ign_overlay( dtm_file, basename, layer='ORTHOIMAGERY.ORTHOPHOTOS', title='Photographie Aerienne IGN', legend_label='Orthophotographie\nImage aerienne', description='Photographie aerienne IGN (Orthophoto)', out_suffix='ortho' ) vis_results['topo'] = self.generate_ign_overlay( dtm_file, basename, layer='GEOGRAPHICALGRIDSYSTEMS.PLANIGNV2', title='Carte Topographique IGN', legend_label='Carte IGN\nPlan topographique', description='Carte topographique IGN (Plan IGN)', out_suffix='topo' ) # Convert to PNG print(f"\n Conversion images PNG:") for name, tif_file in vis_results.items(): if tif_file: jpg_file = self.tif_to_png(tif_file) if jpg_file: print(f" ✓ {jpg_file.name}") # Restore original vis_dir self.vis_dir = original_vis_dir return vis_results # ============ STEP 5: PDF Report ============ def generate_pdf_report(self, basename): """Generate A3 PDF report for a LiDAR file with all visualizations. Page 1: Mise en situation (ortho + topo IGN side by side) Pages 2+: Other visualizations (2 per page)""" from matplotlib.backends.backend_pdf import PdfPages pdf_file = self.pdf_dir / f"{basename}_rapport.pdf" print(f" → Génération rapport PDF A3: {pdf_file.name}") # Look for PNGs in per-file subdirectory first, then fallback to main dir file_vis_dir = self.vis_dir / basename if file_vis_dir.exists(): png_files = sorted(file_vis_dir.glob("*.webp")) else: png_files = sorted(self.vis_dir.glob(f"{basename}_*.webp")) if not png_files: print(f" ✗ Aucune image PNG trouvée pour {basename}") return None # Categorize files situ_files = [] # Page 1: situation maps analysis_files = [] # Pages 2+: analysis maps for f in png_files: name = f.stem.lower() if 'ortho' in name: situ_files.insert(0, f) # Ortho first elif 'topo' in name: situ_files.append(f) # Topo second else: analysis_files.append(f) # Define display order for analysis maps order = ['mslrm', 'svf', 'negative_openness', 'positive_openness', 'sailore', 'depressions', 'hillshade_multi', 'lrm', 'tpi', 'slope', 'curvature', 'aspect', 'roughness', 'anomalies', 'wavelet', 'texture', 'flow'] def sort_key(f): name = f.stem.lower() for i, key in enumerate(order): if key in name: return i return len(order) analysis_files.sort(key=sort_key) # A3 landscape dimensions in inches (420mm x 297mm) a3_w, a3_h = 16.54, 11.69 try: with PdfPages(str(pdf_file)) as pdf: # ============ PAGE 1: Mise en situation ============ if situ_files: fig = plt.figure(figsize=(a3_w, a3_h), facecolor='white') n_situ = len(situ_files) if n_situ == 2: # Side by side gs = fig.add_gridspec(1, 2, wspace=0.05, left=0.03, right=0.97, top=0.92, bottom=0.06) else: # Single or more gs = fig.add_gridspec(1, max(n_situ, 1), wspace=0.05, left=0.03, right=0.97, top=0.92, bottom=0.06) fig.text(0.5, 0.97, f"Mise en situation - {basename}", fontsize=20, fontweight='bold', ha='center', va='top') for i, f in enumerate(situ_files): ax = fig.add_subplot(gs[0, i]) img = plt.imread(str(f)) ax.imshow(img) ax.axis('off') # Add title from filename title = f.stem.replace(basename + '_', '').replace('_', ' ').title() ax.set_title(title, fontsize=12, fontweight='bold', pad=5) pdf.savefig(fig, dpi=150) plt.close(fig) # ============ PAGES 2+: Analysis maps (2 per page) ============ # Group analysis files 2 per page for page_start in range(0, len(analysis_files), 2): page_files = analysis_files[page_start:page_start + 2] fig = plt.figure(figsize=(a3_w, a3_h), facecolor='white') if len(page_files) == 2: gs = fig.add_gridspec(1, 2, wspace=0.08, left=0.03, right=0.97, top=0.93, bottom=0.05) else: gs = fig.add_gridspec(1, 1, left=0.05, right=0.95, top=0.93, bottom=0.05) for i, f in enumerate(page_files): ax = fig.add_subplot(gs[0, i]) img = plt.imread(str(f)) ax.imshow(img) ax.axis('off') # Title from filename title = f.stem.replace(basename + '_', '').replace('_', ' ').title() ax.set_title(title, fontsize=11, fontweight='bold', pad=3) # Page number page_num = (page_start // 2) + 2 # Start from page 2 fig.text(0.99, 0.01, f"Page {page_num}", fontsize=8, ha='right', va='bottom', color='gray') pdf.savefig(fig, dpi=150) plt.close(fig) print(f" ✓ Rapport PDF: {pdf_file.name}") return pdf_file except Exception as e: print(f" ✗ Erreur PDF: {e}") import traceback traceback.print_exc() return None # ============ Complete Pipeline ============ def process_file(self, laz_file): """Process a single LAZ file""" basename = laz_file.stem print(f"\n{'='*60}") print(f" FICHIER : {basename}") print(f"{'='*60}") # Step 1: Ground classification print(f"\n[1/3] Classification du sol...") las_file = self.classify_ground(laz_file) if not las_file: print(f" ✗ Échec classification") return False # Step 2: Generate DTM print(f"\n[2/3] Génération DTM...") dtm_file = self.create_dtm_fast(las_file, basename) if not dtm_file: print(f" ✗ Échec DTM") return False # Step 3: Visualizations print(f"\n[3/4] Visualisations archéologiques...") vis = self.generate_all_visualizations(dtm_file, basename) # Step 4: PDF report print(f"\n[4/4] Rapport PDF A3...") self.generate_pdf_report(basename) print(f"\n✓ {basename} traité avec succès !") return True def process_all(self): """Process all LAZ files in input directory""" files = self.find_laz_files() if not files: print("Aucun fichier LAZ/LAS trouvé !") return print(f"\n{'='*60}") print(f" PIPELINE ARCHÉOLOGIQUE LiDAR") print(f"{'='*60}") # Check tools print(f"\nVérification des outils...") if not self.check_tools(): print("Erreur: Certains outils ne sont pas disponibles") return results = {} if self.workers > 1 and len(files) > 1: # Process multiple files in parallel print(f"\n Traitement parallèle avec {self.workers} workers...") with ProcessPoolExecutor(max_workers=self.workers) as executor: future_to_file = { executor.submit(self._process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution): laz_file for laz_file in files } for future in as_completed(future_to_file): laz_file = future_to_file[future] try: success = future.result() results[laz_file.name] = success status = "✓" if success else "✗" print(f" {status} {laz_file.name}") except Exception as e: print(f"\n✗ Erreur traitement {laz_file.name}: {e}") results[laz_file.name] = False else: # Sequential processing for laz_file in files: try: results[laz_file.name] = self.process_file(laz_file) except Exception as e: print(f"\n✗ Erreur traitement: {e}") import traceback traceback.print_exc() results[laz_file.name] = False # Summary print(f"\n{'='*60}") print(f" RÉSUMÉ") print(f"{'='*60}") success_count = sum(results.values()) print(f"✓ {success_count}/{len(results)} fichiers traités avec succès") print(f"\nRésultats dans: {self.output_dir}") print(f" • DTM: {self.dtm_dir}") print(f" • Visualisations PNG: {self.vis_dir}") print(f" • Rapports PDF: {self.pdf_dir}") # Clean up temporary files to save space print(f"\nNettoyage des fichiers temporaires...") try: import shutil if self.temp_dir.exists(): shutil.rmtree(self.temp_dir) print(f" ✓ Fichiers temporaires supprimés ({self.temp_dir})") except Exception as e: print(f" Note: Impossible de supprimer les fichiers temporaires") @staticmethod def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution): """Standalone function for multiprocessing - creates its own pipeline instance.""" pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1) laz_file = Path(laz_file_str) return pipeline.process_file(laz_file) def main(): import argparse parser = argparse.ArgumentParser( description="Pipeline LiDAR pour détection archéologique (Python pur)" ) parser.add_argument( "input", help="Dossier contenant les fichiers LAZ/LAS" ) parser.add_argument( "-o", "--output", default="/data/output", help="Dossier de sortie" ) parser.add_argument( "-r", "--resolution", type=float, default=0.5, help="Résolution en mètres" ) parser.add_argument( "-w", "--workers", type=int, default=1, help="Nombre de workers pour traitement parallèle (défaut: 1)" ) args = parser.parse_args() try: pipeline = LidarArchaeoPipeline( input_dir=args.input, output_dir=args.output, resolution=args.resolution, workers=args.workers ) pipeline.process_all() except Exception as e: print(f"Erreur: {e}") sys.exit(1) if __name__ == "__main__": main()