From e2bd6b25363133ae6a8aab224fceaa1a184a0486 Mon Sep 17 00:00:00 2001 From: Jacquin Antoine Date: Thu, 14 May 2026 02:19:42 +0200 Subject: [PATCH] Remove RRIM and Multi-Hillshade RGB, fix DTM resolution reuse bug, add --init to docker run - Remove generate_rrim, generate_multi_hillshade, _compute_openness_both - Remove corresponding VIZ_STEPS entries, COLORMAPS, RGB_LEGENDS, and tests - Fix DTM resolution mismatch: existing DTM at different resolution is now regenerated instead of silently reused - Propagate actual DTM resolution to visualizations and rendering - Add --init to docker run commands for proper signal handling on Ctrl+C - Add .playwright-mcp/ to .gitignore Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + lidar_pipeline/pipeline.py | 58 ++++-- lidar_pipeline/rendering.py | 12 +- lidar_pipeline/tests/test_visualizations.py | 65 ------ lidar_pipeline/visualizations.py | 212 ++------------------ run.sh | 4 +- 6 files changed, 60 insertions(+), 292 deletions(-) diff --git a/.gitignore b/.gitignore index 0871d3e..e2fde0d 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ htmlcov/ # Éventuels fichiers de cache matplotlib matplotlibrc +.playwright-mcp/ diff --git a/lidar_pipeline/pipeline.py b/lidar_pipeline/pipeline.py index 400fdfc..bd21759 100644 --- a/lidar_pipeline/pipeline.py +++ b/lidar_pipeline/pipeline.py @@ -61,7 +61,7 @@ from .visualizations import ( generate_lrm, generate_svf, generate_openness, generate_mslrm, generate_tpi, generate_sailore, generate_roughness, generate_anomalies, generate_wavelet, - generate_flow, generate_rrim, generate_multi_hillshade, generate_local_dominance, + generate_flow, generate_local_dominance, ) from .gpu import gpu_cleanup from .ign import generate_ign_overlay @@ -87,8 +87,6 @@ VIZ_STEPS = [ ('anomalies', generate_anomalies), ('wavelet', generate_wavelet), ('flow', generate_flow), - ('rrim', lambda d, b, v, r, shared=None: generate_rrim(d, b, v, r, shared=shared)), - ('multi_hillshade', lambda d, b, v, r, shared=None: generate_multi_hillshade(d, b, v, r, shared=shared)), ('local_dominance', generate_local_dominance), ('ortho', lambda d, b, v, r: generate_ign_overlay( d, b, v, r, @@ -164,11 +162,14 @@ class LidarArchaeoPipeline: return False return True - def generate_all_visualizations(self, dtm_file, basename): + def generate_all_visualizations(self, dtm_file, basename, resolution=None): """Generate all archaeological visualizations for one DTM file. - Returns a dict of {name: tif_path} for successful generations. + Args: + resolution: Actual resolution from DTM geotransform. If None, uses self.resolution. """ + if resolution is None: + resolution = self.resolution logger.info(" Génération visualisations:") # Create per-file subdirectory @@ -178,7 +179,7 @@ class LidarArchaeoPipeline: # Pre-compute shared DEM data (gradient, NaN mask, LRM) once for all visualizations logger.info(" Pré-calcul données partagées (gradient, LRM)...") t_shared = time.time() - shared = SharedDEM(dtm_file, self.resolution) + shared = SharedDEM(dtm_file, resolution) logger.info(f" ✓ Données partagées prêtes ({time.time()-t_shared:.1f}s)") vis_results = {} @@ -225,9 +226,9 @@ class LidarArchaeoPipeline: try: # IGN overlays don't use SharedDEM (they download external data) if name in ('ortho', 'topo'): - result = func(dtm_file, basename, file_vis_dir, self.resolution) + result = func(dtm_file, basename, file_vis_dir, resolution) else: - result = func(dtm_file, basename, file_vis_dir, self.resolution, shared=shared) + result = func(dtm_file, basename, file_vis_dir, resolution, shared=shared) vis_results[name] = result elapsed = time.time() - t0 if result: @@ -250,7 +251,7 @@ class LidarArchaeoPipeline: } for name, tif_file in vis_results.items(): if tif_file and isinstance(tif_file, Path) and tif_file.suffix == '.tif' and tif_file.exists(): - webp_file = tif_to_png(tif_file, file_vis_dir, self.resolution, keep_tif=self.keep_tif, source_info=source_info) + webp_file = tif_to_png(tif_file, file_vis_dir, resolution, keep_tif=self.keep_tif, source_info=source_info) if webp_file: logger.info(f" ✓ {webp_file.name}") @@ -271,17 +272,30 @@ class LidarArchaeoPipeline: logger.info(f"FICHIER : {basename}") logger.info("=" * 60) - # Skip ground classification + DTM if DTM already exists + # Skip ground classification + DTM if DTM already exists with matching resolution # --force only affects visualizations/PDF, not classification/DTM # Use --force-classification to force reclassification dtm_path = self.dtm_dir / f"{basename}_dtm.tif" if dtm_path.exists(): - logger.info("[1/5] Classification du sol — sautée (DTM existant)") - logger.info("[2/5] Génération DTM — sautée (DTM existant)") - dtm_file = dtm_path - t_classif = 0 - t_dtm = 0 - else: + # Check that existing DTM resolution matches requested resolution + import rasterio + try: + with rasterio.open(dtm_path) as src: + existing_res = abs(src.transform.a) + if abs(existing_res - self.resolution) > 0.01: + logger.info(f"[1/5] DTM existant à {existing_res}m/px — résolution demandée {self.resolution}m/px → régénération") + dtm_path.unlink() + else: + logger.info(f"[1/5] Classification du sol — sautée (DTM existant à {existing_res}m/px)") + logger.info("[2/5] Génération DTM — sautée (DTM existant)") + dtm_file = dtm_path + t_classif = 0 + t_dtm = 0 + except Exception: + logger.warning(f"Impossible de lire le DTM existant — régénération") + dtm_path.unlink() + + if not dtm_path.exists(): # Step 1: Ground classification logger.info("[1/5] Classification du sol...") t1 = time.time() @@ -302,9 +316,13 @@ class LidarArchaeoPipeline: return False logger.info(f" ✓ DTM terminé ({t_dtm:.1f}s)") - # Step 3: Visualizations - logger.info("[3/5] Visualisations archéologiques...") - self.generate_all_visualizations(dtm_file, basename) + # Step 3: Visualizations — use actual resolution from DTM + import rasterio + with rasterio.open(dtm_file) as src: + actual_res = abs(src.transform.a) + if abs(actual_res - self.resolution) > 0.01: + logger.info(f" Résolution DTM: {actual_res}m/px (demandée: {self.resolution}m/px)") + self.generate_all_visualizations(dtm_file, basename, actual_res) # Step 4: PDF report t_pdf = 0 @@ -315,7 +333,7 @@ class LidarArchaeoPipeline: else: logger.info("[4/5] Rapport PDF A3...") t4 = time.time() - generate_pdf_report(basename, file_vis_dir, self.pdf_dir, self.resolution) + generate_pdf_report(basename, file_vis_dir, self.pdf_dir, actual_res) t_pdf = time.time() - t4 logger.info(f" ✓ Rapport PDF terminé ({t_pdf:.1f}s)") diff --git a/lidar_pipeline/rendering.py b/lidar_pipeline/rendering.py index 9aababa..b751cfe 100644 --- a/lidar_pipeline/rendering.py +++ b/lidar_pipeline/rendering.py @@ -178,16 +178,6 @@ RGB_LEGENDS = { 'legend': 'Carte IGN\nPlan topographique', 'description': 'Carte topographique IGN (Plan IGN)', }, - 'rrim': { - 'title': 'RRIM — Red Relief Image Map (composite RGB)', - 'legend': 'Rouge = Openness positive (crêtes, levées)\nVert = Pente inversée (plat = clair)\nBleu = Openness négative (fossés, dépressions)', - 'description': 'Composite RGB synthétique pour prospection archéologique', - }, - 'multi_hillshade': { - 'title': 'Hillshade Composite RGB (3 azimuts)', - 'legend': 'Rouge = Éclairage NW (315°)\nVert = Éclairage SE (135°)\nBleu = Éclairage NE (45°)', - 'description': 'Composite couleur révélant les structures selon leur orientation', - }, } @@ -300,7 +290,7 @@ def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False, source_info=None): try: with rasterio.open(tif_file) as src: - is_rgb = src.count >= 3 and any(k in str(tif_file) for k in ('ortho', 'topo', 'rrim', 'multi_hillshade')) + is_rgb = src.count >= 3 and any(k in str(tif_file) for k in ('ortho', 'topo')) if is_rgb: data = src.read([1, 2, 3]) diff --git a/lidar_pipeline/tests/test_visualizations.py b/lidar_pipeline/tests/test_visualizations.py index b412bd0..444352d 100644 --- a/lidar_pipeline/tests/test_visualizations.py +++ b/lidar_pipeline/tests/test_visualizations.py @@ -201,71 +201,6 @@ class TestFlow: assert np.nanmin(valid) >= 0 -class TestRRIM: - def test_generates_tif(self, synthetic_dem, tmp_output_dir): - from lidar_pipeline.visualizations import generate_rrim - result = generate_rrim(synthetic_dem, "test", tmp_output_dir, 5.0) - assert result is not None - assert result.exists() - assert result.suffix == ".tif" - - def test_rrim_is_rgb_3band(self, synthetic_dem, tmp_output_dir): - import rasterio - from lidar_pipeline.visualizations import generate_rrim - result = generate_rrim(synthetic_dem, "test", tmp_output_dir, 5.0) - with rasterio.open(result) as src: - assert src.count == 3, f"Expected 3 bands, got {src.count}" - assert src.dtypes[0] == 'uint8' - - def test_rrim_values_0_255(self, synthetic_dem, tmp_output_dir): - import rasterio - from lidar_pipeline.visualizations import generate_rrim - result = generate_rrim(synthetic_dem, "test", tmp_output_dir, 5.0) - with rasterio.open(result) as src: - for band in range(1, 4): - data = src.read(band) - assert data.min() >= 0 - assert data.max() <= 255 - - def test_rrim_no_nan(self, synthetic_dem, tmp_output_dir): - """RRIM is uint8 RGB — NaN zones are set to 0 (black).""" - import rasterio - from lidar_pipeline.visualizations import generate_rrim - result = generate_rrim(synthetic_dem, "test", tmp_output_dir, 5.0) - with rasterio.open(result) as src: - # uint8 bands should not have NaN - for band in range(1, 4): - data = src.read(band) - assert not np.isnan(data).any(), f"Band {band} has NaN values" - - -class TestMultiHillshade: - def test_generates_tif(self, synthetic_dem, tmp_output_dir): - from lidar_pipeline.visualizations import generate_multi_hillshade - result = generate_multi_hillshade(synthetic_dem, "test", tmp_output_dir, 5.0) - assert result is not None - assert result.exists() - assert result.suffix == ".tif" - - def test_multi_hillshade_is_rgb_3band(self, synthetic_dem, tmp_output_dir): - import rasterio - from lidar_pipeline.visualizations import generate_multi_hillshade - result = generate_multi_hillshade(synthetic_dem, "test", tmp_output_dir, 5.0) - with rasterio.open(result) as src: - assert src.count == 3, f"Expected 3 bands, got {src.count}" - assert src.dtypes[0] == 'uint8' - - def test_multi_hillshade_values_0_255(self, synthetic_dem, tmp_output_dir): - import rasterio - from lidar_pipeline.visualizations import generate_multi_hillshade - result = generate_multi_hillshade(synthetic_dem, "test", tmp_output_dir, 5.0) - with rasterio.open(result) as src: - for band in range(1, 4): - data = src.read(band) - assert data.min() >= 0 - assert data.max() <= 255 - - class TestLocalDominance: def test_generates_tif(self, synthetic_dem, tmp_output_dir): from lidar_pipeline.visualizations import generate_local_dominance diff --git a/lidar_pipeline/visualizations.py b/lidar_pipeline/visualizations.py index 788201a..5943a16 100644 --- a/lidar_pipeline/visualizations.py +++ b/lidar_pipeline/visualizations.py @@ -201,11 +201,11 @@ def _filter_nanaware(arr, filter_func, *args, use_gpu=True, **kwargs): # ============================================================ def generate_hillshade(dem_file, basename, vis_dir, resolution, shared=None): - """Generate multi-directional hillshade with slope shading — GPU if available. + """Generate multi-directional hillshade with contrast enhancement — GPU if available. - Combines 4-direction hillshade (NW, NE, SW, SE) with slope shading - for improved micro-relief visibility on flat terrain. - Result = 0.7 * hillshade + 0.3 * cos(slope). + Combines 4-direction hillshade (NW, NE, SW, SE) with slope shading. + Applies percentile normalization and gamma correction to restore + contrast lost by averaging multiple azimuths. """ gpu_tag = " [GPU]" if HAS_GPU else "" logger.info(f" → Hillshade multidirectionnel{gpu_tag}...") @@ -249,8 +249,20 @@ def generate_hillshade(dem_file, basename, vis_dir, resolution, shared=None): slope_shaded = cos_slope combined = 0.7 * combined_hillshade + 0.3 * slope_shaded - nan_mask = shared.nan_mask if shared else np.isnan(to_cpu(dem_np)) - _save_tif(output, to_cpu(combined), transform, crs, nan_mask=nan_mask) + # Contrast enhancement: percentile stretch + gamma + # Averaging 4 azimuths flattens contrast — this restores it + combined_np = to_cpu(combined) + nan_mask = shared.nan_mask if shared else np.isnan(to_cpu(dem_np) if HAS_GPU else dem_np) + valid = combined_np[~nan_mask] + if len(valid) > 0: + p2, p98 = np.percentile(valid, 2), np.percentile(valid, 98) + if p98 - p2 > 0.01: + combined_np = np.clip((combined_np - p2) / (p98 - p2), 0, 1) + # Gamma correction to enhance shadows + gamma = 0.8 + combined_np = np.power(combined_np, gamma) + + _save_tif(output, combined_np.astype(np.float32), transform, crs, nan_mask=nan_mask) logger.info(f" ✓ Hillshade terminé ({time.time()-t0:.1f}s){gpu_tag}") return output except Exception as e: @@ -528,194 +540,6 @@ def generate_openness(dem_file, basename, vis_dir, resolution, positive=True, sh return None -def _compute_openness_both(dem, resolution, nan_mask, n_dirs=8, radius=50): - """Compute positive and negative openness in one ray-tracing pass. - - Traces rays in n_dirs directions up to radius pixels, measuring: - - positive openness: max angle above horizontal to visible terrain - - negative openness: max angle below horizontal to visible terrain - - Returns (pos_open, neg_open) as float32 arrays in degrees. - NaN mask is applied after computation. - """ - rows, cols = dem.shape - res = resolution - max_dist = int(radius / res) - - angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False) - dx_dir = np.cos(angles) - dy_dir = np.sin(angles) - - padded = np.pad(dem, max_dist, mode='constant', constant_values=np.nanmax(dem[~nan_mask]) + 10000 if np.any(~nan_mask) else 0) - - pos_sum = np.zeros_like(dem) - neg_sum = np.zeros_like(dem) - - for d_idx in range(n_dirs): - ddx, ddy = dx_dir[d_idx], dy_dir[d_idx] - max_pos_angle = np.zeros_like(dem) - max_neg_angle = np.zeros_like(dem) - - for step in range(1, max_dist + 1): - px = int(round(ddx * step)) - py = int(round(ddy * step)) - dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2) - if dist_m < res * 0.5: - continue - - elev_diff = padded[max_dist + py:max_dist + py + rows, - max_dist + px:max_dist + px + cols] - dem - - pos_angle = np.arctan2(np.maximum(elev_diff, 0), dist_m) - neg_angle = np.arctan2(np.maximum(-elev_diff, 0), dist_m) - - valid = ~np.isnan(elev_diff) - max_pos_angle[valid] = np.maximum(max_pos_angle[valid], pos_angle[valid]) - max_neg_angle[valid] = np.maximum(max_neg_angle[valid], neg_angle[valid]) - - pos_sum += max_pos_angle - neg_sum += max_neg_angle - - pos_open = np.degrees(pos_sum / n_dirs).astype(np.float32) - neg_open = np.degrees(neg_sum / n_dirs).astype(np.float32) - pos_open[nan_mask] = np.nan - neg_open[nan_mask] = np.nan - return pos_open, neg_open - - -def generate_rrim(dem_file, basename, vis_dir, resolution, shared=None, - n_dirs=8, radius=50, pmin=2, pmax=98, contrast=1.5): - """Red Relief Image Map — RGB composite for archaeological prospection. - - Combines slope, positive openness, and negative openness into a single - false-color image where: - Red = positive openness (ridges, elevated features) - Green = inverted slope (flat = bright, steep = dark) - Blue = negative openness (depressions, ditches) - - Each channel is normalized via percentiles and enhanced with a gamma curve. - """ - gpu_tag = " [GPU]" if HAS_GPU else "" - logger.info(f" → RRIM (Red Relief Image){gpu_tag}...") - t0 = time.time() - output = vis_dir / f"{basename}_rrim.tif" - - try: - if shared: - transform = shared.transform - crs = shared.crs - dem_np = shared.dem_np - nan_mask = shared.nan_mask - slope_rad = shared.slope_rad - dem_for_ray = to_gpu(shared.filled) if HAS_GPU else shared.filled - else: - dem_np, transform, crs = _read_dem(dem_file) - nan_mask = np.isnan(dem_np) - filled, _ = _fill_nans(dem_np) - dem_for_ray = to_gpu(filled) if HAS_GPU else filled - dy, dx = np.gradient(filled, resolution) - slope_rad = np.arctan(np.sqrt(dx**2 + dy**2)) - - # Compute both openness values (ray-tracing on filled DEM) - pos_open, neg_open = _compute_openness_both( - to_cpu(dem_for_ray) if HAS_GPU else dem_for_ray, - resolution, nan_mask, n_dirs=n_dirs, radius=radius - ) - - # Normalize each component to [0, 1] using percentiles - slope_deg = np.degrees(slope_rad) - slope_deg[nan_mask] = np.nan - - def _normalize(arr, lo, hi): - valid = arr[~np.isnan(arr)] - if len(valid) == 0: - return np.zeros_like(arr, dtype=np.float32) - vlo = np.percentile(valid, lo) - vhi = np.percentile(valid, hi) - if vhi - vlo < 1e-6: - return np.full_like(arr, 0.5, dtype=np.float32) - norm = np.clip((arr - vlo) / (vhi - vlo), 0, 1) - # Apply gamma for contrast - norm = np.power(norm, 1.0 / contrast) - return norm.astype(np.float32) - - r = _normalize(pos_open, pmin, pmax) # Red: positive openness (ridges) - g = _normalize(90 - slope_deg, pmin, pmax) # Green: inverted slope (flat=bright) - g[nan_mask] = np.nan - b = _normalize(neg_open, pmin, pmax) # Blue: negative openness (ditches) - - # Assemble RGB (uint8) - rgb = np.stack([r, g, b], axis=0) # (3, H, W) - rgb = np.nan_to_num(rgb, nan=0.0) - rgb_uint8 = (np.clip(rgb, 0, 1) * 255).astype(np.uint8) - - _save_tif(output, rgb_uint8, transform, crs, dtype='uint8', count=3) - logger.info(f" ✓ RRIM terminé ({time.time()-t0:.1f}s){gpu_tag}") - return output - except Exception as e: - logger.error(f" ✗ Erreur RRIM: {e}", exc_info=True) - return None - - -def generate_multi_hillshade(dem_file, basename, vis_dir, resolution, shared=None, - azimuths=(315, 135, 45), altitude=30, blend_slope=0.3): - """Multi-directional hillshade RGB composite — 3 azimuths mapped to R/G/B. - - Each azimuth produces a hillshade mapped to a color channel: - Red = azimuth 315° (NW illumination) - Green = azimuth 135° (SE illumination) - Blue = azimuth 45° (NE illumination) - - Shadow direction reveals structure orientation through color. - """ - gpu_tag = " [GPU]" if HAS_GPU else "" - logger.info(f" → Hillshade Composite RGB{gpu_tag}...") - t0 = time.time() - output = vis_dir / f"{basename}_multi_hillshade.tif" - - try: - if shared: - transform = shared.transform - crs = shared.crs - nan_mask = shared.nan_mask - slope_rad = to_gpu(shared.slope_rad) if HAS_GPU else shared.slope_rad - aspect = to_gpu(shared.aspect) if HAS_GPU else shared.aspect - else: - dem_np, transform, crs = _read_dem(dem_file) - nan_mask = np.isnan(dem_np) - filled, _ = _fill_nans(dem_np) - dem = to_gpu(filled) if HAS_GPU else filled - dy, dx = xp.gradient(dem, resolution) - slope_rad = xp.arctan(xp.sqrt(dx**2 + dy**2)) - aspect = xp.arctan2(dy, dx) - - alt_rad = xp.radians(xp.array(altitude)) - sin_alt = xp.sin(alt_rad) - cos_alt = xp.cos(alt_rad) - cos_slope = xp.cos(slope_rad) - - channels = [] - for az in azimuths: - az_rad = xp.radians(xp.array(az)) - hs = sin_alt * xp.sin(slope_rad) + cos_alt * cos_slope * xp.cos(az_rad - aspect) - blended = (1 - blend_slope) * xp.clip(hs, 0, 1) + blend_slope * cos_slope - channels.append(to_cpu(blended).astype(np.float32)) - - gpu_cleanup() - - # Assemble RGB (uint8) - rgb = np.stack(channels, axis=0) # (3, H, W) - rgb[:, nan_mask] = 0.0 - rgb_uint8 = (np.clip(rgb, 0, 1) * 255).astype(np.uint8) - - _save_tif(output, rgb_uint8, transform, crs, dtype='uint8', count=3) - logger.info(f" ✓ Hillshade Composite RGB terminé ({time.time()-t0:.1f}s){gpu_tag}") - return output - except Exception as e: - logger.error(f" ✗ Erreur multi_hillshade: {e}", exc_info=True) - return None - - def generate_local_dominance(dem_file, basename, vis_dir, resolution, shared=None, radius=15, pmin=2, pmax=98): """Local Dominance — proportion of neighborhood below center point. diff --git a/run.sh b/run.sh index 68d0bbe..fa44b63 100755 --- a/run.sh +++ b/run.sh @@ -134,7 +134,7 @@ if [[ " $* " == *" --test "* ]]; then echo "============================================" echo " Tests unitaires LiDAR Pipeline" echo "============================================" - docker run --rm $GPU_FLAG \ + docker run --rm --init $GPU_FLAG \ "$IMAGE_NAME" \ python3 -m pytest -v --pyargs lidar_pipeline.tests exit $? @@ -174,7 +174,7 @@ if [ -n "$FILE_ARGS" ]; then CMD_ARGS="$CMD_ARGS --file $FILE_ARGS" fi -docker run --rm $GPU_FLAG \ +docker run --rm --init $GPU_FLAG \ --user 1000:1000 \ -v "${INPUT_DIR}:/data/input:ro" \ -v "${OUTPUT_DIR}:/data/output" \