"""Terrain visualization functions for LiDAR archaeological analysis. Each function takes (dem_file, basename, vis_dir, resolution) as explicit parameters and returns the path to the output GeoTIFF file, or None on error. When a SharedDEM object is provided via the `shared` parameter, pre-computed data (gradient, NaN mask, LRM) is reused across visualizations to avoid redundant I/O and computation. """ import logging import time import warnings from pathlib import Path import numpy as np import rasterio from scipy.ndimage import generic_filter from scipy.stats import binned_statistic_2d from .gpu import HAS_GPU, to_gpu, to_cpu, xp_gaussian_filter, xp_uniform_filter, xp_minimum_filter, xp_maximum_filter, gpu_cleanup logger = logging.getLogger("lidar") # Use CuPy array module when available if HAS_GPU: import cupy as cp xp = cp else: xp = np class SharedDEM: """Pre-computed DEM data shared across all visualizations. Reads the DEM once and pre-computes: - NaN mask and filled DEM (avoids 20+ calls to _fill_nans) - Gradient components (shared by hillshade, slope, aspect, curvature) - LRM at 15m kernel (shared by lrm + anomalies) """ def __init__(self, dem_file, resolution): dem_np, transform, crs = _read_dem(dem_file) self.dem_file = dem_file self.resolution = resolution self.transform = transform self.crs = crs self.nan_mask = np.isnan(dem_np) self.dem_np = dem_np.astype(np.float32) # Pre-fill NaNs once (saves ~20 calls to NearestNDInterpolator) self.filled, _ = _fill_nans(self.dem_np) # Initialize GPU lazy caches before any filter calls self._filled_gpu = None self._dem_gpu = None # Pre-compute gradient (shared by hillshade, slope, aspect, curvature) self.dy = np.gradient(self.filled, resolution, axis=0) self.dx = np.gradient(self.filled, resolution, axis=1) self.slope_rad = np.arctan(np.sqrt(self.dx**2 + self.dy**2)) self.slope_deg = np.degrees(self.slope_rad) self.aspect = np.mod(np.degrees(np.arctan2(self.dy, self.dx)), 360) # Pre-compute LRM at 15m (shared by lrm + anomalies) sigma_15 = 15.0 / resolution local_mean_15 = _filter_nanaware_from_filled(self, xp_gaussian_filter, sigma=sigma_15) self.lrm_15 = self.dem_np - local_mean_15 self.lrm_15[self.nan_mask] = np.nan @property def filled_gpu(self): """Lazy GPU copy of the filled DEM.""" if self._filled_gpu is None and HAS_GPU: self._filled_gpu = to_gpu(self.filled) return self._filled_gpu @property def dem_gpu(self): """Lazy GPU copy of the DEM.""" if self._dem_gpu is None and HAS_GPU: self._dem_gpu = to_gpu(self.dem_np) return self._dem_gpu def _filter_nanaware_from_filled(shared, filter_func, *args, **kwargs): """Apply filter on pre-filled DEM data (skips expensive _fill_nans). Uses the SharedDEM.filled array directly, then restores NaN mask. If GPU is available, uses the lazy GPU copy to avoid CPU↔GPU transfers. """ if HAS_GPU and shared.filled_gpu is not None: filled_gpu = to_gpu(shared.filled) result_gpu = filter_func(filled_gpu, *args, **kwargs) result = to_cpu(result_gpu) gpu_cleanup() else: result = filter_func(shared.filled, *args, **kwargs) result[shared.nan_mask] = np.nan return result def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodata=None, nan_mask=None): """Helper to save a 2D or 3D array as GeoTIFF. Args: nan_mask: Optional boolean mask (True=NaN) to apply before saving. Restores NaN zones in gradient-derived products that were computed on the filled DEM. """ if nan_mask is not None: data = np.array(data, dtype=dtype, copy=True) data[nan_mask] = np.nan # Auto-detect nodata for float types with NaN if nodata is None and dtype.startswith('float') and np.any(np.isnan(data)): nodata = float('nan') if data.ndim == 2: height, width = data.shape with rasterio.open( output_path, 'w', driver='GTiff', height=height, width=width, count=count, dtype=dtype, crs=crs, transform=transform, compress='deflate', nodata=nodata ) as dst: dst.write(data.astype(dtype), 1) elif data.ndim == 3: bands, height, width = data.shape with rasterio.open( output_path, 'w', driver='GTiff', height=height, width=width, count=bands, dtype=dtype, crs=crs, transform=transform, compress='deflate', nodata=nodata ) as dst: for i in range(bands): dst.write(data[i].astype(dtype), i + 1) def _read_dem(dem_file): """Read DEM file and return (data, transform, crs).""" with rasterio.open(dem_file) as src: return src.read(1), src.transform, src.crs def _fill_nans(arr): """Fill NaN values using nearest-neighbor interpolation. Returns (filled_array, nan_mask) so the caller can restore NaN after filtering. """ from scipy.interpolate import NearestNDInterpolator nan_mask = np.isnan(arr) if not np.any(nan_mask): return arr, nan_mask valid = ~nan_mask y_coords, x_coords = np.where(valid) if len(y_coords) == 0: return np.zeros_like(arr), nan_mask z_values = arr[valid] interp = NearestNDInterpolator( np.column_stack((y_coords, x_coords)), z_values ) y_missing, x_missing = np.where(nan_mask) filled = arr.copy() filled[y_missing, x_missing] = interp(y_missing, x_missing) return filled, nan_mask def _filter_nanaware(arr, filter_func, *args, use_gpu=True, **kwargs): """Apply a filter to an array while preserving NaN zones. 1. Fill NaN with nearest-neighbor interpolation 2. Apply the filter 3. Restore original NaN mask on the result Args: arr: Input array (numpy or cupy). filter_func: Function that takes (array, *args, **kwargs) and returns filtered array. use_gpu: If True, apply filter on GPU (send filled array to GPU first). Returns: Filtered array with original NaN positions preserved. """ is_gpu_arr = HAS_GPU and isinstance(arr, cp.ndarray) arr_np = to_cpu(arr) if is_gpu_arr else arr filled, nan_mask = _fill_nans(arr_np) if use_gpu and HAS_GPU: filled_gpu = to_gpu(filled) result_gpu = filter_func(filled_gpu, *args, **kwargs) result = to_cpu(result_gpu) else: result = filter_func(filled, *args, **kwargs) result[nan_mask] = np.nan return result # ============================================================ # Core terrain visualizations # ============================================================ def generate_hillshade(dem_file, basename, vis_dir, resolution, shared=None): """Generate multi-directional hillshade with slope shading — GPU if available. Combines 4-direction hillshade (NW, NE, SW, SE) with slope shading for improved micro-relief visibility on flat terrain. Result = 0.7 * hillshade + 0.3 * cos(slope). """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Hillshade multidirectionnel{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_hillshade_multi.tif" try: if shared: transform = shared.transform crs = shared.crs dem = to_gpu(shared.dem_np) dy = to_gpu(shared.dy) if HAS_GPU else shared.dy dx = to_gpu(shared.dx) if HAS_GPU else shared.dx slope = to_gpu(shared.slope_rad) if HAS_GPU else shared.slope_rad aspect = xp.arctan2(dy, dx) sin_slope = xp.sin(slope) cos_slope = xp.cos(slope) else: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) aspect = xp.arctan2(dy, dx) sin_slope = xp.sin(slope) cos_slope = xp.cos(slope) azimuts = [315, 45, 225, 135] altitude = 30 hillshades = [] alt_rad = xp.radians(xp.array(altitude)) sin_alt = xp.sin(alt_rad) cos_alt = xp.cos(alt_rad) for az in azimuts: az_rad = xp.radians(xp.array(az)) hs = sin_alt * sin_slope + cos_alt * cos_slope * xp.cos(az_rad - aspect) hillshades.append(xp.clip(hs, 0, 1)) combined_hillshade = xp.mean(xp.array(hillshades), axis=0) slope_shaded = cos_slope combined = 0.7 * combined_hillshade + 0.3 * slope_shaded nan_mask = shared.nan_mask if shared else np.isnan(to_cpu(dem_np)) _save_tif(output, to_cpu(combined), transform, crs, nan_mask=nan_mask) logger.info(f" ✓ Hillshade terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur hillshade: {e}", exc_info=True) return None def generate_slope(dem_file, basename, vis_dir, resolution, shared=None): """Generate slope map (degrees) — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Pente (Slope){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_slope.tif" try: if shared: transform = shared.transform crs = shared.crs slope = shared.slope_deg nan_mask = shared.nan_mask if HAS_GPU: slope = to_gpu(slope) else: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) * 180 / xp.pi nan_mask = np.isnan(dem_np) _save_tif(output, to_cpu(slope) if HAS_GPU else slope, transform, crs, nan_mask=nan_mask) logger.info(f" ✓ Pente terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur slope: {e}", exc_info=True) return None def generate_aspect(dem_file, basename, vis_dir, resolution, shared=None): """Generate aspect (slope orientation) map — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Aspect (Orientation){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_aspect.tif" try: if shared: transform = shared.transform crs = shared.crs aspect = shared.aspect nan_mask = shared.nan_mask if HAS_GPU: aspect = to_gpu(aspect) else: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) aspect = xp.arctan2(dy, dx) * 180 / xp.pi aspect = xp.mod(aspect, 360) nan_mask = np.isnan(dem_np) _save_tif(output, to_cpu(aspect) if HAS_GPU else aspect, transform, crs, nan_mask=nan_mask) logger.info(f" ✓ Aspect terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur aspect: {e}", exc_info=True) return None def generate_curvature(dem_file, basename, vis_dir, resolution, shared=None): """Generate curvature (terrain concavity/convexity) map — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Courbure (Curvature){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_curvature.tif" try: if shared: transform = shared.transform crs = shared.crs dx = shared.dx dy = shared.dy nan_mask = shared.nan_mask if HAS_GPU: dx = to_gpu(dx) dy = to_gpu(dy) else: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) nan_mask = np.isnan(dem_np) d2z_dx2 = xp.gradient(dx, axis=1) d2z_dy2 = xp.gradient(dy, axis=0) curvature = (d2z_dx2 + d2z_dy2) / 2 _save_tif(output, to_cpu(curvature), transform, crs, nan_mask=nan_mask) logger.info(f" ✓ Courbure terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur curvature: {e}", exc_info=True) return None # ============================================================ # GPU-accelerated visualizations # ============================================================ def generate_lrm(dem_file, basename, vis_dir, resolution, shared=None): """Local Relief Model - deviation from local mean (GPU if available).""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Local Relief Model{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_lrm.tif" try: if shared: transform = shared.transform crs = shared.crs lrm = shared.lrm_15.copy() else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15/resolution) lrm = dem_np - local_mean lrm[nan_mask] = np.nan _save_tif(output, lrm.astype(np.float32), transform, crs) logger.info(f" ✓ LRM terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur LRM: {e}", exc_info=True) return None def generate_svf(dem_file, basename, vis_dir, resolution, shared=None): """Sky-View Factor - ray-tracing on 16 azimuths (GPU if available). For each pixel, trace rays in N directions, find the max horizon angle in each direction, then SVF = (1/N) * sum(cos²(horizon_angle)). Valleys/crevices have low SVF (obstructed sky), ridges/peaks have high SVF. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Sky-View Factor (ray-tracing){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_svf.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np rows, cols = dem_np.shape res = resolution dem = to_gpu(shared.filled) if HAS_GPU else shared.filled nan_mask = shared.nan_mask else: dem_np, transform, crs = _read_dem(dem_file) rows, cols = dem_np.shape res = resolution nan_mask = np.isnan(dem_np) filled, _ = _fill_nans(dem_np) dem = to_gpu(filled) if HAS_GPU else filled n_dirs = 16 angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx_dir = np.cos(angles) dy_dir = np.sin(angles) max_dist = int(50 / res) padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan) svf = xp.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx_dir[d_idx], dy_dir[d_idx] horizon = xp.zeros_like(dem) # Pre-compute all valid steps for this direction valid_steps = [] for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue valid_steps.append((step, px, py, dist_m)) # Batch all shifts into a single array for vectorized max computation for step, px, py, dist_m in valid_steps: elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem angle = xp.arctan2(elev_diff, dist_m) horizon = xp.where(xp.isnan(angle), horizon, xp.maximum(horizon, xp.nan_to_num(angle, nan=0))) svf += xp.cos(xp.pi / 2 - horizon) ** 2 svf /= n_dirs svf_np = to_cpu(svf).astype(np.float32) svf_np[nan_mask] = np.nan _save_tif(output, svf_np, transform, crs) logger.info(f" ✓ SVF terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur SVF: {e}", exc_info=True) return None def generate_openness(dem_file, basename, vis_dir, resolution, positive=True, shared=None): """Positive/Negative Openness - true zenith/nadir angle computation (GPU if available). For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW): - Positive openness: max zenith angle (angle from vertical to highest visible terrain) - Negative openness: max nadir angle (angle from vertical down to lowest terrain) Result is averaged across all 8 directions. """ name = "positive_openness" if positive else "negative_openness" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → {name.replace('_', ' ').title()} (ray-tracing){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_{name}.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np rows, cols = dem_np.shape res = resolution dem = to_gpu(shared.filled) if HAS_GPU else shared.filled nan_mask = shared.nan_mask else: dem_np, transform, crs = _read_dem(dem_file) rows, cols = dem_np.shape res = resolution nan_mask = np.isnan(dem_np) filled, _ = _fill_nans(dem_np) dem = to_gpu(filled) if HAS_GPU else filled n_dirs = 8 angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx_dir = np.cos(angles) dy_dir = np.sin(angles) max_dist = int(50 / res) padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan) openness_sum = xp.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx_dir[d_idx], dy_dir[d_idx] max_angle = xp.zeros_like(dem) for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem if positive: angle = xp.arctan2(xp.maximum(elev_diff, 0), dist_m) else: angle = xp.arctan2(xp.maximum(-elev_diff, 0), dist_m) max_angle = xp.where(xp.isnan(angle), max_angle, xp.maximum(max_angle, xp.nan_to_num(angle, nan=0))) openness_sum += max_angle openness_result = to_cpu(xp.degrees(openness_sum / n_dirs)).astype(np.float32) openness_result[nan_mask] = np.nan _save_tif(output, openness_result, transform, crs) logger.info(f" ✓ {name} terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur openness: {e}", exc_info=True) return None def _compute_openness_both(dem, resolution, nan_mask, n_dirs=8, radius=50): """Compute positive and negative openness in one ray-tracing pass. Traces rays in n_dirs directions up to radius pixels, measuring: - positive openness: max angle above horizontal to visible terrain - negative openness: max angle below horizontal to visible terrain Returns (pos_open, neg_open) as float32 arrays in degrees. NaN mask is applied after computation. """ rows, cols = dem.shape res = resolution max_dist = int(radius / res) angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx_dir = np.cos(angles) dy_dir = np.sin(angles) padded = np.pad(dem, max_dist, mode='constant', constant_values=np.nanmax(dem[~nan_mask]) + 10000 if np.any(~nan_mask) else 0) pos_sum = np.zeros_like(dem) neg_sum = np.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx_dir[d_idx], dy_dir[d_idx] max_pos_angle = np.zeros_like(dem) max_neg_angle = np.zeros_like(dem) for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem pos_angle = np.arctan2(np.maximum(elev_diff, 0), dist_m) neg_angle = np.arctan2(np.maximum(-elev_diff, 0), dist_m) valid = ~np.isnan(elev_diff) max_pos_angle[valid] = np.maximum(max_pos_angle[valid], pos_angle[valid]) max_neg_angle[valid] = np.maximum(max_neg_angle[valid], neg_angle[valid]) pos_sum += max_pos_angle neg_sum += max_neg_angle pos_open = np.degrees(pos_sum / n_dirs).astype(np.float32) neg_open = np.degrees(neg_sum / n_dirs).astype(np.float32) pos_open[nan_mask] = np.nan neg_open[nan_mask] = np.nan return pos_open, neg_open def generate_rrim(dem_file, basename, vis_dir, resolution, shared=None, n_dirs=8, radius=50, pmin=2, pmax=98, contrast=1.5): """Red Relief Image Map — RGB composite for archaeological prospection. Combines slope, positive openness, and negative openness into a single false-color image where: Red = positive openness (ridges, elevated features) Green = inverted slope (flat = bright, steep = dark) Blue = negative openness (depressions, ditches) Each channel is normalized via percentiles and enhanced with a gamma curve. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → RRIM (Red Relief Image){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_rrim.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask slope_rad = shared.slope_rad dem_for_ray = to_gpu(shared.filled) if HAS_GPU else shared.filled else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) filled, _ = _fill_nans(dem_np) dem_for_ray = to_gpu(filled) if HAS_GPU else filled dy, dx = np.gradient(filled, resolution) slope_rad = np.arctan(np.sqrt(dx**2 + dy**2)) # Compute both openness values (ray-tracing on filled DEM) pos_open, neg_open = _compute_openness_both( to_cpu(dem_for_ray) if HAS_GPU else dem_for_ray, resolution, nan_mask, n_dirs=n_dirs, radius=radius ) # Normalize each component to [0, 1] using percentiles slope_deg = np.degrees(slope_rad) slope_deg[nan_mask] = np.nan def _normalize(arr, lo, hi): valid = arr[~np.isnan(arr)] if len(valid) == 0: return np.zeros_like(arr, dtype=np.float32) vlo = np.percentile(valid, lo) vhi = np.percentile(valid, hi) if vhi - vlo < 1e-6: return np.full_like(arr, 0.5, dtype=np.float32) norm = np.clip((arr - vlo) / (vhi - vlo), 0, 1) # Apply gamma for contrast norm = np.power(norm, 1.0 / contrast) return norm.astype(np.float32) r = _normalize(pos_open, pmin, pmax) # Red: positive openness (ridges) g = _normalize(90 - slope_deg, pmin, pmax) # Green: inverted slope (flat=bright) g[nan_mask] = np.nan b = _normalize(neg_open, pmin, pmax) # Blue: negative openness (ditches) # Assemble RGB (uint8) rgb = np.stack([r, g, b], axis=0) # (3, H, W) rgb = np.nan_to_num(rgb, nan=0.0) rgb_uint8 = (np.clip(rgb, 0, 1) * 255).astype(np.uint8) _save_tif(output, rgb_uint8, transform, crs, dtype='uint8', count=3) logger.info(f" ✓ RRIM terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur RRIM: {e}", exc_info=True) return None def generate_multi_hillshade(dem_file, basename, vis_dir, resolution, shared=None, azimuths=(315, 135, 45), altitude=30, blend_slope=0.3): """Multi-directional hillshade RGB composite — 3 azimuths mapped to R/G/B. Each azimuth produces a hillshade mapped to a color channel: Red = azimuth 315° (NW illumination) Green = azimuth 135° (SE illumination) Blue = azimuth 45° (NE illumination) Shadow direction reveals structure orientation through color. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Hillshade Composite RGB{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_multi_hillshade.tif" try: if shared: transform = shared.transform crs = shared.crs nan_mask = shared.nan_mask slope_rad = to_gpu(shared.slope_rad) if HAS_GPU else shared.slope_rad aspect = to_gpu(shared.aspect) if HAS_GPU else shared.aspect else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) filled, _ = _fill_nans(dem_np) dem = to_gpu(filled) if HAS_GPU else filled dy, dx = xp.gradient(dem, resolution) slope_rad = xp.arctan(xp.sqrt(dx**2 + dy**2)) aspect = xp.arctan2(dy, dx) alt_rad = xp.radians(xp.array(altitude)) sin_alt = xp.sin(alt_rad) cos_alt = xp.cos(alt_rad) cos_slope = xp.cos(slope_rad) channels = [] for az in azimuths: az_rad = xp.radians(xp.array(az)) hs = sin_alt * xp.sin(slope_rad) + cos_alt * cos_slope * xp.cos(az_rad - aspect) blended = (1 - blend_slope) * xp.clip(hs, 0, 1) + blend_slope * cos_slope channels.append(to_cpu(blended).astype(np.float32)) gpu_cleanup() # Assemble RGB (uint8) rgb = np.stack(channels, axis=0) # (3, H, W) rgb[:, nan_mask] = 0.0 rgb_uint8 = (np.clip(rgb, 0, 1) * 255).astype(np.uint8) _save_tif(output, rgb_uint8, transform, crs, dtype='uint8', count=3) logger.info(f" ✓ Hillshade Composite RGB terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur multi_hillshade: {e}", exc_info=True) return None def generate_local_dominance(dem_file, basename, vis_dir, resolution, shared=None, radius=15, pmin=2, pmax=98): """Local Dominance — proportion of neighborhood below center point. LD = (dem - local_min) / (local_max - local_min + epsilon) High values = locally dominant (peak, ridge) Low values = locally recessed (valley, pit) Uses minimum/maximum filters on the filled DEM, then restores NaN mask. Complements openness by measuring local height position rather than angular extent. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Dominance Locale (rayon {radius}m){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_local_dominance.tif" try: if shared: transform = shared.transform crs = shared.crs nan_mask = shared.nan_mask dem_np = shared.dem_np else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) radius_px = max(1, int(radius / resolution)) if radius_px % 2 == 0: radius_px += 1 local_min = _filter_nanaware_from_filled( shared, xp_minimum_filter, size=radius_px ) if shared else _filter_nanaware( dem_np, xp_minimum_filter, size=radius_px ) local_max_data = _filter_nanaware_from_filled( shared, xp_maximum_filter, size=radius_px ) if shared else _filter_nanaware( dem_np, xp_maximum_filter, size=radius_px ) # Local dominance ratio epsilon = 0.01 # Avoid division by zero on flat terrain local_range = local_max_data - local_min + epsilon dominance = (dem_np - local_min) / local_range dominance = np.clip(dominance, 0, 1) dominance[nan_mask] = np.nan _save_tif(output, dominance.astype(np.float32), transform, crs, nan_mask=nan_mask) logger.info(f" ✓ Dominance Locale terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur local_dominance: {e}", exc_info=True) return None def generate_mslrm(dem_file, basename, vis_dir, resolution, shared=None): """Multi-Scale Relief Model (MSRM) - LRM at 5 scales combined (GPU if available).""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Multi-Scale Relief Model (MSRM){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_mslrm.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) sigmas = [5, 10, 25, 50, 100] lrm_stack = [] for sigma in sigmas: sigma_px = sigma / resolution if shared: local_mean = _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_px) else: local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_px) lrm = dem_np - local_mean lrm[nan_mask] = np.nan valid_lrm = lrm[~nan_mask] lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01 lrm_norm = lrm / lrm_std lrm_stack.append(lrm_norm.astype(np.float32)) lrm_array = np.array(lrm_stack) with np.errstate(invalid='ignore', divide='ignore'): with warnings.catch_warnings(): warnings.filterwarnings('ignore', message='Mean of empty slice') mslrm = np.sqrt(np.nanmean(lrm_array ** 2, axis=0)) mslrm[nan_mask] = np.nan _save_tif(output, mslrm.astype(np.float32), transform, crs) logger.info(f" ✓ MSRM terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur MSRM: {e}", exc_info=True) return None def generate_tpi(dem_file, basename, vis_dir, resolution, shared=None): """Multi-Scale Topographic Position Index (GPU if available). TPI = elevation - mean(neighborhood). Computed at fine (5m) and broad (100m) scales. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → TPI multi-échelle{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_tpi.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) fine_size = int(5 / resolution) if fine_size % 2 == 0: fine_size += 1 if shared: fine_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=fine_size) else: fine_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=fine_size) tpi_fine = dem_np - fine_mean tpi_fine[nan_mask] = np.nan broad_size = int(100 / resolution) if broad_size % 2 == 0: broad_size += 1 if shared: broad_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=broad_size) else: broad_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=broad_size) tpi_broad = dem_np - broad_mean tpi_broad[nan_mask] = np.nan fine_std = max(np.nanstd(tpi_fine[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01 broad_std = max(np.nanstd(tpi_broad[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01 tpi_combined = 0.6 * (tpi_fine / fine_std) + 0.4 * (tpi_broad / broad_std) tpi_combined[nan_mask] = np.nan _save_tif(output, tpi_combined.astype(np.float32), transform, crs) logger.info(f" ✓ TPI terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur TPI: {e}", exc_info=True) return None # ============================================================ # SAILORE # ============================================================ def generate_sailore(dem_file, basename, vis_dir, resolution, shared=None): """SAILORE - Self-Adaptive Improved Local Relief Model (GPU if available). Kernel size adapts to local slope: flat areas get larger kernels, steep areas get smaller kernels. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → SAILORE (LRM adaptatif){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_sailore.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask slope_deg = shared.slope_deg else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) gy, gx = np.gradient(dem_np, resolution) slope = np.arctan(np.sqrt(gx**2 + gy**2)) slope_deg = np.degrees(slope) slope_deg[nan_mask] = np.nan sigma_min = 2.0 / resolution sigma_max = 25.0 / resolution slope_norm = np.clip(slope_deg / 30.0, 0, 1) if shared: lrm_fine = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_min) else: lrm_fine = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_min) lrm_fine[nan_mask] = np.nan if shared: lrm_medium = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2) else: lrm_medium = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2) lrm_medium[nan_mask] = np.nan if shared: lrm_coarse = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_max) else: lrm_coarse = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_max) lrm_coarse[nan_mask] = np.nan w_fine = slope_norm w_medium = 1 - 2 * np.abs(slope_norm - 0.5) w_coarse = 1 - slope_norm w_total = w_fine + w_medium + w_coarse w_total[w_total == 0] = 1 sailore = (w_fine * lrm_fine + w_medium * lrm_medium + w_coarse * lrm_coarse) / w_total sailore[nan_mask] = np.nan _save_tif(output, sailore.astype(np.float32), transform, crs) logger.info(f" ✓ SAILORE terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur SAILORE: {e}", exc_info=True) return None # ============================================================ # Roughness # ============================================================ def generate_roughness(dem_file, basename, vis_dir, resolution, shared=None): """Surface roughness - standard deviation of elevation in a window (GPU-accelerated).""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Rugosité de surface{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_roughness.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) window_size = int(5 / resolution) if window_size % 2 == 0: window_size += 1 # Vectorized std: sqrt(E[X²] - (E[X])²) via uniform_filter (NaN-aware) if shared: local_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=window_size) # For local_mean_sq, we need to filter filled², not filled local_mean_sq = _filter_nanaware(shared.filled.astype(np.float64)**2, xp_uniform_filter, size=window_size) local_mean_sq[shared.nan_mask] = np.nan else: local_mean = _filter_nanaware(dem_np.astype(np.float64), xp_uniform_filter, size=window_size) local_mean_sq = _filter_nanaware(dem_np.astype(np.float64)**2, xp_uniform_filter, size=window_size) roughness = np.sqrt(np.maximum(local_mean_sq - local_mean * local_mean, 0)) roughness[nan_mask] = np.nan roughness = to_cpu(roughness) _save_tif(output, roughness, transform, crs) logger.info(f" ✓ Rugosité terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur rugosité: {e}", exc_info=True) return None # ============================================================ # Anomalies # ============================================================ def generate_anomalies(dem_file, basename, vis_dir, resolution, shared=None): """Statistical anomaly detection - z-score of local relief + Local Moran's I — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Détection anomalies statistiques{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_anomalies.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask lrm = shared.lrm_15.copy() else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) lrm_mean_val = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15 / resolution) lrm = dem_np - lrm_mean_val lrm[nan_mask] = np.nan valid_lrm = lrm[~nan_mask] lrm_mean = np.nanmean(valid_lrm) if len(valid_lrm) > 0 else 0 lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01 z_score = (lrm - lrm_mean) / lrm_std window = int(10 / resolution) if window % 2 == 0: window += 1 if shared: local_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=window) else: local_mean = _filter_nanaware(z_score, xp_uniform_filter, size=window) z_mean = np.nanmean(valid_lrm) if len(valid_lrm) > 0 else 0 z_std = max(np.nanstd(z_score[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01 morans_i = z_score * (local_mean - z_mean) / z_std anomaly_score = np.abs(z_score) * np.sign(morans_i) anomaly_score[nan_mask] = np.nan _save_tif(output, anomaly_score.astype(np.float32), transform, crs) logger.info(f" ✓ Anomalies terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur anomalies: {e}", exc_info=True) return None # ============================================================ # Wavelet # ============================================================ def generate_wavelet(dem_file, basename, vis_dir, resolution, shared=None): """Mexican Hat wavelet multi-scale analysis (GPU if available). CWT 2D at multiple scales to detect circular features. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Ondelette Mexican Hat multi-échelle{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_wavelet.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nan_mask = shared.nan_mask filled = shared.filled.astype(np.float64) else: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) filled, _ = _fill_nans(dem_np.astype(np.float64)) scales = [2, 5, 10, 20, 50] wavelet_stack = [] for scale_m in scales: sigma_px = scale_m / resolution if HAS_GPU: from cupyx.scipy.ndimage import gaussian_laplace as gpu_gaussian_laplace response = -gpu_gaussian_laplace(to_gpu(filled), sigma=sigma_px) response = to_cpu(response) else: from scipy.ndimage import gaussian_laplace response = -gaussian_laplace(filled, sigma=sigma_px) response[nan_mask] = np.nan valid = response[~nan_mask] std_val = max(np.nanstd(valid), 0.01) if len(valid) > 0 else 0.01 response = response / std_val wavelet_stack.append(response) with np.errstate(invalid='ignore', divide='ignore'): with warnings.catch_warnings(): warnings.filterwarnings('ignore', message='Mean of empty slice') combined = np.sqrt(np.nanmean(np.array(wavelet_stack) ** 2, axis=0)) combined[nan_mask] = np.nan _save_tif(output, combined.astype(np.float32), transform, crs) logger.info(f" ✓ Ondelette terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur ondelette: {e}", exc_info=True) return None # ============================================================ # Flow accumulation # ============================================================ def _d8_accumulate_numba(flow_dir, nodata_mask, rows, cols): """JIT-compiled D8 flow accumulation loop. Uses numba for ~100x speedup over pure Python loop. Falls back to pure Python if numba is unavailable. """ try: from numba import njit @njit(cache=True) def _accumulate(flow_dir, nodata_mask, rows, cols): dx8 = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int8) dy8 = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int8) flow_acc = np.ones((rows, cols), dtype=np.float32) # Sort cells by elevation (high to low) — walk downhill # We use the fact that flow_dir already encodes steepest descent # Process from highest to lowest elevation for r in range(rows): for c in range(cols): if nodata_mask[r, c]: flow_acc[r, c] = 0.0 continue # Iterative accumulation: process cells in top-down order # Multiple passes until convergence for _pass in range(10): changed = 0 for r in range(rows): for c in range(cols): if nodata_mask[r, c]: continue d = flow_dir[r, c] if d < 0: continue nr = r + dy8[d] nc = c + dx8[d] if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]: old_acc = flow_acc[nr, nc] flow_acc[nr, nc] += flow_acc[r, c] if flow_acc[nr, nc] != old_acc: changed += 1 if changed == 0: break return flow_acc return _accumulate(flow_dir, nodata_mask, rows, cols) except ImportError: # Fallback: pure Python return None def _priority_flood(dem, nodata_mask): """Priority-flood algorithm for sink filling (Wang & Liu 2006). O(n log n) compared to 50 iterations of minimum_filter. Fills pits so water can flow downhill. """ import heapq rows, cols = dem.shape filled = dem.copy() closed = nodata_mask.copy() open_queue = [] # Initialize border cells for r in range(rows): for c in [0, cols - 1]: if not closed[r, c]: heapq.heappush(open_queue, (filled[r, c], r, c)) closed[r, c] = True for c in range(1, cols - 1): for r in [0, rows - 1]: if not closed[r, c]: heapq.heappush(open_queue, (filled[r, c], r, c)) closed[r, c] = True dx8 = [1, 1, 0, -1, -1, -1, 0, 1] dy8 = [0, 1, 1, 1, 0, -1, -1, -1] while open_queue: elev, r, c = heapq.heappop(open_queue) for d in range(8): nr, nc = r + dy8[d], c + dx8[d] if 0 <= nr < rows and 0 <= nc < cols and not closed[nr, nc]: if filled[nr, nc] < elev: filled[nr, nc] = elev # Fill the pit closed[nr, nc] = True heapq.heappush(open_queue, (filled[nr, nc], nr, nc)) return filled def generate_flow(dem_file, basename, vis_dir, resolution, shared=None): """Flow accumulation using D8 algorithm — priority-flood sink filling, accumulation via numba.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Accumulation de flux D8{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_flow.tif" try: if shared: transform = shared.transform crs = shared.crs dem_np = shared.dem_np nodata_mask = shared.nan_mask else: dem_np, transform, crs = _read_dem(dem_file) nodata_mask = np.isnan(dem_np) rows, cols = dem_np.shape # Sink filling — priority-flood (O(n log n), faster than 50× minimum_filter) dem_filled_np = _priority_flood(dem_np, nodata_mask) # D8 slope — vectorized dx8 = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32) dy8 = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int32) dist8 = np.array([1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2)]) flow_dir = np.full((rows, cols), -1, dtype=np.int8) max_slope = np.zeros((rows, cols), dtype=np.float64) padded = np.pad(dem_filled_np, 1, mode='constant', constant_values=np.nanmax(dem_filled_np[~np.isnan(dem_filled_np)]) + 10000) for d in range(8): nx = 1 + dx8[d] ny = 1 + dy8[d] neighbor_elev = padded[ny:ny + rows, nx:nx + cols] slope = (dem_filled_np - neighbor_elev) / (dist8[d] * resolution) slope[nodata_mask] = -1 better = slope > max_slope flow_dir[better] = d max_slope[better] = slope[better] # D8 accumulation — try numba first, fallback to Python result = _d8_accumulate_numba(flow_dir, nodata_mask.astype(np.bool_), rows, cols) if result is not None: flow_acc = result logger.info(f" Accumulation D8 via numba") else: # Pure Python fallback (slow for large DEMs) logger.info(f" Accumulation D8 via Python (installez numba pour accélérer)") flat_dem = dem_filled_np[~nodata_mask].flatten() valid_indices = np.where(~nodata_mask.flatten())[0] sort_order = valid_indices[np.argsort(-flat_dem)] flow_acc = np.ones((rows, cols), dtype=np.float32) flow_acc[nodata_mask] = 0 for idx in sort_order: r, c = divmod(idx, cols) d = flow_dir[r, c] if d < 0: continue nr, nc = r + dy8[d], c + dx8[d] if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]: flow_acc[nr, nc] += flow_acc[r, c] flow_log = np.log1p(flow_acc) _save_tif(output, flow_log, transform, crs) logger.info(f" ✓ Flux terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur flux: {e}", exc_info=True) return None