"""Terrain visualization functions for LiDAR archaeological analysis. Each function takes (dem_file, basename, vis_dir, resolution) as explicit parameters and returns the path to the output GeoTIFF file, or None on error. """ import logging import time import warnings from pathlib import Path import numpy as np import rasterio from scipy.ndimage import generic_filter from scipy.stats import binned_statistic_2d from .gpu import HAS_GPU, to_gpu, to_cpu, xp_gaussian_filter, xp_uniform_filter, xp_minimum_filter logger = logging.getLogger("lidar") # Use CuPy array module when available if HAS_GPU: import cupy as cp xp = cp else: xp = np def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodata=None): """Helper to save a 2D or 3D array as GeoTIFF.""" # Auto-detect nodata for float types with NaN if nodata is None and dtype.startswith('float') and np.any(np.isnan(data)): nodata = float('nan') if data.ndim == 2: height, width = data.shape with rasterio.open( output_path, 'w', driver='GTiff', height=height, width=width, count=count, dtype=dtype, crs=crs, transform=transform, compress='lzw', nodata=nodata ) as dst: dst.write(data.astype(dtype), 1) elif data.ndim == 3: bands, height, width = data.shape with rasterio.open( output_path, 'w', driver='GTiff', height=height, width=width, count=bands, dtype=dtype, crs=crs, transform=transform, compress='lzw', nodata=nodata ) as dst: for i in range(bands): dst.write(data[i].astype(dtype), i + 1) def _read_dem(dem_file): """Read DEM file and return (data, transform, crs).""" with rasterio.open(dem_file) as src: return src.read(1), src.transform, src.crs def _fill_nans(arr): """Fill NaN values using nearest-neighbor interpolation. Returns (filled_array, nan_mask) so the caller can restore NaN after filtering. """ from scipy.interpolate import NearestNDInterpolator nan_mask = np.isnan(arr) if not np.any(nan_mask): return arr, nan_mask valid = ~nan_mask y_coords, x_coords = np.where(valid) if len(y_coords) == 0: return np.zeros_like(arr), nan_mask z_values = arr[valid] interp = NearestNDInterpolator( np.column_stack((y_coords, x_coords)), z_values ) y_missing, x_missing = np.where(nan_mask) filled = arr.copy() filled[y_missing, x_missing] = interp(y_missing, x_missing) return filled, nan_mask def _filter_nanaware(arr, filter_func, *args, use_gpu=True, **kwargs): """Apply a filter to an array while preserving NaN zones. 1. Fill NaN with nearest-neighbor interpolation 2. Apply the filter 3. Restore original NaN mask on the result Args: arr: Input array (numpy or cupy). filter_func: Function that takes (array, *args, **kwargs) and returns filtered array. use_gpu: If True, apply filter on GPU (send filled array to GPU first). Returns: Filtered array with original NaN positions preserved. """ is_gpu_arr = HAS_GPU and isinstance(arr, cp.ndarray) arr_np = to_cpu(arr) if is_gpu_arr else arr filled, nan_mask = _fill_nans(arr_np) if use_gpu and HAS_GPU: filled_gpu = to_gpu(filled) result_gpu = filter_func(filled_gpu, *args, **kwargs) result = to_cpu(result_gpu) else: result = filter_func(filled, *args, **kwargs) result[nan_mask] = np.nan return result # ============================================================ # Core terrain visualizations # ============================================================ def generate_hillshade(dem_file, basename, vis_dir, resolution): """Generate multi-directional hillshade (NW, NE, SW, SE) — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Hillshade multidirectionnel{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_hillshade_multi.tif" try: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) azimuts = [315, 45, 225, 135] altitude = 30 hillshades = [] slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) aspect = xp.arctan2(dy, dx) sin_slope = xp.sin(slope) cos_slope = xp.cos(slope) alt_rad = xp.radians(xp.array(altitude)) sin_alt = xp.sin(alt_rad) cos_alt = xp.cos(alt_rad) for az in azimuts: az_rad = xp.radians(xp.array(az)) hs = sin_alt * sin_slope + cos_alt * cos_slope * xp.cos(az_rad - aspect) hillshades.append(xp.clip(hs, 0, 1)) combined = xp.mean(xp.array(hillshades), axis=0) _save_tif(output, to_cpu(combined), transform, crs) logger.info(f" ✓ Hillshade terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur hillshade: {e}", exc_info=True) return None def generate_slope(dem_file, basename, vis_dir, resolution): """Generate slope map (degrees) — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Pente (Slope){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_slope.tif" try: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) * 180 / xp.pi _save_tif(output, to_cpu(slope), transform, crs) logger.info(f" ✓ Pente terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur slope: {e}", exc_info=True) return None def generate_aspect(dem_file, basename, vis_dir, resolution): """Generate aspect (slope orientation) map — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Aspect (Orientation){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_aspect.tif" try: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dy, dx = xp.gradient(dem) aspect = xp.arctan2(dy, dx) * 180 / xp.pi aspect = xp.mod(aspect, 360) _save_tif(output, to_cpu(aspect), transform, crs) logger.info(f" ✓ Aspect terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur aspect: {e}", exc_info=True) return None def generate_curvature(dem_file, basename, vis_dir, resolution): """Generate curvature (terrain concavity/convexity) map — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Courbure (Curvature){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_curvature.tif" try: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) dz_dx = xp.gradient(dem, axis=1) dz_dy = xp.gradient(dem, axis=0) d2z_dx2 = xp.gradient(dz_dx, axis=1) d2z_dy2 = xp.gradient(dz_dy, axis=0) curvature = (d2z_dx2 + d2z_dy2) / 2 _save_tif(output, to_cpu(curvature), transform, crs) logger.info(f" ✓ Courbure terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur curvature: {e}", exc_info=True) return None # ============================================================ # GPU-accelerated visualizations # ============================================================ def generate_lrm(dem_file, basename, vis_dir, resolution): """Local Relief Model - deviation from local mean (GPU if available).""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Local Relief Model{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_lrm.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15/resolution) lrm = dem_np - local_mean lrm[nan_mask] = np.nan _save_tif(output, lrm.astype(np.float32), transform, crs) logger.info(f" ✓ LRM terminé ({time.time()-t0:.1f}s){gpu_tag}") return output logger.info(f" ✓ LRM terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur LRM: {e}", exc_info=True) return None def generate_svf(dem_file, basename, vis_dir, resolution): """Sky-View Factor - ray-tracing on 16 azimuths (GPU if available). For each pixel, trace rays in N directions, find the max horizon angle in each direction, then SVF = (1/N) * sum(cos²(horizon_angle)). Valleys/crevices have low SVF (obstructed sky), ridges/peaks have high SVF. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Sky-View Factor (ray-tracing){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_svf.tif" try: dem_np, transform, crs = _read_dem(dem_file) rows, cols = dem_np.shape res = resolution dem = to_gpu(dem_np) n_dirs = 16 angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx = np.cos(angles) dy = np.sin(angles) max_dist = int(50 / res) padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan) svf = xp.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx[d_idx], dy[d_idx] horizon = xp.zeros_like(dem) for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem angle = xp.arctan2(elev_diff, dist_m) horizon = xp.where(xp.isnan(angle), horizon, xp.maximum(horizon, xp.nan_to_num(angle, nan=0))) svf += xp.cos(xp.pi / 2 - horizon) ** 2 svf /= n_dirs svf_np = to_cpu(svf).astype(np.float32) _save_tif(output, svf_np, transform, crs) logger.info(f" ✓ SVF terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur SVF: {e}", exc_info=True) return None def generate_openness(dem_file, basename, vis_dir, resolution, positive=True): """Positive/Negative Openness - true zenith/nadir angle computation (GPU if available). For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW): - Positive openness: max zenith angle (angle from vertical to highest visible terrain) - Negative openness: max nadir angle (angle from vertical down to lowest terrain) Result is averaged across all 8 directions. """ name = "positive_openness" if positive else "negative_openness" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → {name.replace('_', ' ').title()} (ray-tracing){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_{name}.tif" try: dem_np, transform, crs = _read_dem(dem_file) rows, cols = dem_np.shape res = resolution dem = to_gpu(dem_np) n_dirs = 8 angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) dx = np.cos(angles) dy = np.sin(angles) max_dist = int(50 / res) padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan) openness_sum = xp.zeros_like(dem) for d_idx in range(n_dirs): ddx, ddy = dx[d_idx], dy[d_idx] max_angle = xp.zeros_like(dem) for step in range(1, max_dist + 1): px = int(round(ddx * step)) py = int(round(ddy * step)) dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) if dist_m < res * 0.5: continue elev_diff = padded[max_dist + py:max_dist + py + rows, max_dist + px:max_dist + px + cols] - dem if positive: angle = xp.arctan2(xp.maximum(elev_diff, 0), dist_m) else: angle = xp.arctan2(xp.maximum(-elev_diff, 0), dist_m) max_angle = xp.where(xp.isnan(angle), max_angle, xp.maximum(max_angle, xp.nan_to_num(angle, nan=0))) openness_sum += max_angle openness_result = to_cpu(xp.degrees(openness_sum / n_dirs)).astype(np.float32) _save_tif(output, openness_result, transform, crs) logger.info(f" ✓ {name} terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur openness: {e}", exc_info=True) return None def generate_mslrm(dem_file, basename, vis_dir, resolution): """Multi-Scale Relief Model (MSRM) - LRM at 5 scales combined (GPU if available).""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Multi-Scale Relief Model (MSRM){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_mslrm.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) sigmas = [5, 10, 25, 50, 100] lrm_stack = [] for sigma in sigmas: sigma_px = sigma / resolution local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_px) lrm = dem_np - local_mean lrm[nan_mask] = np.nan valid_lrm = lrm[~nan_mask] lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01 lrm_norm = lrm / lrm_std lrm_stack.append(lrm_norm.astype(np.float32)) lrm_array = np.array(lrm_stack) with np.errstate(invalid='ignore', divide='ignore'): with warnings.catch_warnings(): warnings.filterwarnings('ignore', message='Mean of empty slice') mslrm = np.sqrt(np.nanmean(lrm_array ** 2, axis=0)) mslrm[nan_mask] = np.nan _save_tif(output, mslrm.astype(np.float32), transform, crs) logger.info(f" ✓ MSRM terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur MSRM: {e}", exc_info=True) return None def generate_tpi(dem_file, basename, vis_dir, resolution): """Multi-Scale Topographic Position Index (GPU if available). TPI = elevation - mean(neighborhood). Computed at fine (5m) and broad (100m) scales. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → TPI multi-échelle{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_tpi.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) fine_size = int(5 / resolution) if fine_size % 2 == 0: fine_size += 1 fine_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=fine_size) tpi_fine = dem_np - fine_mean tpi_fine[nan_mask] = np.nan broad_size = int(100 / resolution) if broad_size % 2 == 0: broad_size += 1 broad_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=broad_size) tpi_broad = dem_np - broad_mean tpi_broad[nan_mask] = np.nan fine_std = max(np.nanstd(tpi_fine[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01 broad_std = max(np.nanstd(tpi_broad[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01 tpi_combined = 0.6 * (tpi_fine / fine_std) + 0.4 * (tpi_broad / broad_std) tpi_combined[nan_mask] = np.nan _save_tif(output, tpi_combined.astype(np.float32), transform, crs) logger.info(f" ✓ TPI terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur TPI: {e}", exc_info=True) return None # ============================================================ # Depression / hydrology # ============================================================ def generate_depressions(dem_file, basename, vis_dir, resolution): """Depression detection using hydrological sink filling — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Détection dépressions (hydrologique){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_depressions.tif" try: dem_np, transform, crs = _read_dem(dem_file) dem = to_gpu(dem_np) from scipy.ndimage import generate_binary_structure struct = generate_binary_structure(2, 2) dem_filled = xp.copy(dem) nodata_mask = xp.isnan(dem_filled) dem_filled[nodata_mask] = xp.nanmax(dem) + 1000 changed = True iterations = 0 max_iter = 100 while changed and iterations < max_iter: neighbor_min = xp_minimum_filter(dem_filled, footprint=struct) sinks = (dem_filled < neighbor_min) & ~nodata_mask if not xp.any(sinks): break new_dem = xp.maximum(dem_filled, neighbor_min) new_dem[nodata_mask] = xp.nan changed = bool(xp.any(new_dem != dem_filled)) dem_filled = new_dem iterations += 1 depressions = to_cpu(dem_filled - dem) depressions[to_cpu(nodata_mask)] = np.nan depressions = np.where(depressions > 0.01, depressions, 0) _save_tif(output, depressions, transform, crs) logger.info(f" ✓ Dépressions terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur dépressions: {e}", exc_info=True) return None # ============================================================ # SAILORE # ============================================================ def generate_sailore(dem_file, basename, vis_dir, resolution): """SAILORE - Self-Adaptive Improved Local Relief Model (GPU if available). Kernel size adapts to local slope: flat areas get larger kernels, steep areas get smaller kernels. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → SAILORE (LRM adaptatif){gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_sailore.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) gy, gx = np.gradient(dem_np, resolution) slope = np.arctan(np.sqrt(gx**2 + gy**2)) slope_deg = np.degrees(slope) slope_deg[nan_mask] = np.nan sigma_min = 2.0 / resolution sigma_max = 25.0 / resolution slope_norm = np.clip(slope_deg / 30.0, 0, 1) lrm_fine = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_min) lrm_fine[nan_mask] = np.nan lrm_medium = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2) lrm_medium[nan_mask] = np.nan lrm_coarse = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_max) lrm_coarse[nan_mask] = np.nan w_fine = slope_norm w_medium = 1 - 2 * np.abs(slope_norm - 0.5) w_coarse = 1 - slope_norm w_total = w_fine + w_medium + w_coarse w_total[w_total == 0] = 1 sailore = (w_fine * lrm_fine + w_medium * lrm_medium + w_coarse * lrm_coarse) / w_total sailore[nan_mask] = np.nan _save_tif(output, sailore.astype(np.float32), transform, crs) logger.info(f" ✓ SAILORE terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur SAILORE: {e}", exc_info=True) return None # ============================================================ # Roughness # ============================================================ def generate_roughness(dem_file, basename, vis_dir, resolution): """Surface roughness - standard deviation of elevation in a window (GPU-accelerated).""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Rugosité de surface{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_roughness.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) window_size = int(5 / resolution) if window_size % 2 == 0: window_size += 1 # Vectorized std: sqrt(E[X²] - (E[X])²) via uniform_filter (NaN-aware) local_mean = _filter_nanaware(dem_np.astype(np.float64), xp_uniform_filter, size=window_size) local_mean_sq = _filter_nanaware(dem_np.astype(np.float64)**2, xp_uniform_filter, size=window_size) roughness = np.sqrt(np.maximum(local_mean_sq - local_mean * local_mean, 0)) roughness[nan_mask] = np.nan roughness = to_cpu(roughness) _save_tif(output, roughness, transform, crs) logger.info(f" ✓ Rugosité terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur rugosité: {e}", exc_info=True) return None # ============================================================ # Anomalies # ============================================================ def generate_anomalies(dem_file, basename, vis_dir, resolution): """Statistical anomaly detection - z-score of local relief + Local Moran's I — GPU if available.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Détection anomalies statistiques{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_anomalies.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) lrm_mean_val = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15 / resolution) lrm = dem_np - lrm_mean_val lrm[nan_mask] = np.nan valid_lrm = lrm[~nan_mask] lrm_mean = np.nanmean(valid_lrm) if len(valid_lrm) > 0 else 0 lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01 z_score = (lrm - lrm_mean) / lrm_std window = int(10 / resolution) if window % 2 == 0: window += 1 local_mean = _filter_nanaware(z_score, xp_uniform_filter, size=window) z_mean = np.nanmean(valid_lrm) if len(valid_lrm) > 0 else 0 z_std = max(np.nanstd(z_score[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01 morans_i = z_score * (local_mean - z_mean) / z_std anomaly_score = np.abs(z_score) * np.sign(morans_i) anomaly_score[nan_mask] = np.nan _save_tif(output, anomaly_score.astype(np.float32), transform, crs) logger.info(f" ✓ Anomalies terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur anomalies: {e}", exc_info=True) return None # ============================================================ # Wavelet # ============================================================ def generate_wavelet(dem_file, basename, vis_dir, resolution): """Mexican Hat wavelet multi-scale analysis (GPU if available). CWT 2D at multiple scales to detect circular features. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Ondelette Mexican Hat multi-échelle{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_wavelet.tif" try: dem_np, transform, crs = _read_dem(dem_file) nan_mask = np.isnan(dem_np) scales = [2, 5, 10, 20, 50] wavelet_stack = [] for scale_m in scales: sigma_px = scale_m / resolution filled, _ = _fill_nans(dem_np.astype(np.float64)) if HAS_GPU: from cupyx.scipy.ndimage import gaussian_laplace as gpu_gaussian_laplace response = -gpu_gaussian_laplace(to_gpu(filled), sigma=sigma_px) response = to_cpu(response) else: from scipy.ndimage import gaussian_laplace response = -gaussian_laplace(filled, sigma=sigma_px) response[nan_mask] = np.nan valid = response[~nan_mask] std_val = max(np.nanstd(valid), 0.01) if len(valid) > 0 else 0.01 response = response / std_val wavelet_stack.append(response) with np.errstate(invalid='ignore', divide='ignore'): with warnings.catch_warnings(): warnings.filterwarnings('ignore', message='Mean of empty slice') combined = np.sqrt(np.nanmean(np.array(wavelet_stack) ** 2, axis=0)) combined[nan_mask] = np.nan _save_tif(output, combined.astype(np.float32), transform, crs) logger.info(f" ✓ Ondelette terminée ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur ondelette: {e}", exc_info=True) return None # ============================================================ # Flow accumulation # ============================================================ def generate_flow(dem_file, basename, vis_dir, resolution): """Flow accumulation using D8 algorithm — sink filling on GPU, accumulation on CPU.""" gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Accumulation de flux D8{gpu_tag}...") t0 = time.time() output = vis_dir / f"{basename}_flow.tif" try: dem_np, transform, crs = _read_dem(dem_file) rows, cols = dem_np.shape nodata_mask = np.isnan(dem_np) # Sink filling — GPU-accelerated dem_gpu = to_gpu(dem_np) nodata_mask_gpu = xp.isnan(dem_gpu) dem_filled = xp.copy(dem_gpu) dem_filled[nodata_mask_gpu] = xp.nanmax(dem_gpu) + 1000 from scipy.ndimage import generate_binary_structure struct = generate_binary_structure(2, 2) for _ in range(50): neighbor_min = xp_minimum_filter(dem_filled, footprint=struct) sinks = (dem_filled < neighbor_min) & ~nodata_mask_gpu if not xp.any(sinks): break dem_filled = xp.where(sinks, neighbor_min, dem_filled) dem_filled[nodata_mask_gpu] = xp.nan dem_filled_np = to_cpu(dem_filled) # D8 slope + accumulation — CPU (sequential by nature) dx8 = [1, 1, 0, -1, -1, -1, 0, 1] dy8 = [0, 1, 1, 1, 0, -1, -1, -1] dist8 = [1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2)] flow_dir = np.full((rows, cols), -1, dtype=np.int8) max_slope = np.zeros((rows, cols), dtype=np.float64) padded = np.pad(dem_filled_np, 1, mode='constant', constant_values=np.nanmax(dem_filled_np[~np.isnan(dem_filled_np)]) + 10000) for d in range(8): nx = 1 + dx8[d] ny = 1 + dy8[d] neighbor_elev = padded[ny:ny + rows, nx:nx + cols] slope = (dem_filled_np - neighbor_elev) / (dist8[d] * resolution) slope[nodata_mask] = -1 better = slope > max_slope flow_dir[better] = d max_slope[better] = slope[better] flat_dem = dem_filled_np[~nodata_mask].flatten() valid_indices = np.where(~nodata_mask.flatten())[0] sort_order = valid_indices[np.argsort(-flat_dem)] flow_acc = np.ones((rows, cols), dtype=np.float32) flow_acc[nodata_mask] = 0 for idx in sort_order: r, c = divmod(idx, cols) d = flow_dir[r, c] if d < 0: continue nr, nc = r + dy8[d], c + dx8[d] if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]: flow_acc[nr, nc] += flow_acc[r, c] flow_log = np.log1p(flow_acc) _save_tif(output, flow_log, transform, crs) logger.info(f" ✓ Flux terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: logger.error(f" ✗ Erreur flux: {e}", exc_info=True) return None