diff --git a/Dockerfile b/Dockerfile index 3c18100..024b195 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,8 +30,7 @@ RUN pip3 install --no-cache-dir \ # Copy scripts COPY process_lidar.py /usr/local/bin/ -COPY semantic_classifier.py /usr/local/bin/ -RUN chmod +x /usr/local/bin/process_lidar.py /usr/local/bin/semantic_classifier.py +RUN chmod +x /usr/local/bin/process_lidar.py # Create directories with correct permissions RUN mkdir -p /data/output /data/input && \ diff --git a/process_lidar.py b/process_lidar.py index 08b048e..16ea815 100755 --- a/process_lidar.py +++ b/process_lidar.py @@ -48,9 +48,8 @@ class LidarArchaeoPipeline: self.dtm_dir = self.output_dir / "DTM" self.vis_dir = self.output_dir / "visualisations" - self.report_dir = self.output_dir / "rapports" - for d in [self.dtm_dir, self.vis_dir, self.report_dir]: + for d in [self.dtm_dir, self.vis_dir]: d.mkdir(exist_ok=True) print(f"✓ Pipeline initialisé (Python pur)") @@ -110,7 +109,53 @@ class LidarArchaeoPipeline: print(f" ✓ Classification déjà effectuée") return output_las - pipeline_json = self.create_pipeline_json(laz_file, output_las) + # First, try to fix ReturnNumber values if needed + fixed_las = self.temp_dir / f"{laz_file.stem}_fixed.las" + + # Create preprocessing pipeline to fix ReturnNumber=0 issue using expression + fix_pipeline = [ + { + "type": "readers.las", + "filename": str(laz_file) + }, + { + "type": "filters.expression", + "expression": "ReturnNumber = MAX(ReturnNumber, 1)" + }, + { + "type": "filters.expression", + "expression": "NumberOfReturns = MAX(NumberOfReturns, 1)" + }, + { + "type": "writers.las", + "filename": str(fixed_las), + "extra_dims": "all" + } + ] + + try: + # Try with fixed data first + fix_json = json.dumps(fix_pipeline) + fix_pipe_file = self.temp_dir / "fix_pipeline.json" + with open(fix_pipe_file, 'w') as f: + f.write(fix_json) + + import subprocess + result = subprocess.run(['pdal', 'pipeline', str(fix_pipe_file)], + capture_output=True, text=True) + if result.returncode == 0: + print(f" → Correction ReturnNumber effectuée") + input_for_smrf = fixed_las + else: + print(f" → Erreur correction, utilisation fichier original") + input_for_smrf = laz_file + if fixed_las.exists(): + fixed_las.unlink() + except Exception as e: + print(f" → Erreur prétraitement: {e}") + input_for_smrf = laz_file + + pipeline_json = self.create_pipeline_json(input_for_smrf, output_las) pipeline_file = self.temp_dir / "pipeline.json" with open(pipeline_file, 'w') as f: @@ -670,8 +715,8 @@ class LidarArchaeoPipeline: # Create figure with white background for JPEG fig, ax = plt.subplots(figsize=(18, 12), facecolor='white') - # Display data - im = ax.imshow(data, cmap=cmap, aspect='equal') + # Display data with north at the top + im = ax.imshow(data, cmap=cmap, aspect='equal', origin='upper') # Enhanced colorbar with explicit values cbar = plt.colorbar(im, ax=ax, pad=0.03, shrink=0.75, aspect=30) @@ -691,11 +736,42 @@ class LidarArchaeoPipeline: verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8)) ax.axis('off') + + # Add vector north arrow above the colorbar on the right side (black lines) + from matplotlib.patches import FancyArrow, Polygon + from mpl_toolkits.axes_grid1.inset_locator import inset_axes + + # Create inset axes positioned above the colorbar on the right + # The colorbar is typically at the right, so we place north arrow above it + north_ax = inset_axes(ax, width="5%", height="8%", loc='upper right', + bbox_to_anchor=(-0.02, 0.15, 1, 1), bbox_transform=ax.transAxes) + + north_ax.set_xlim(0, 1) + north_ax.set_ylim(0, 1) + north_ax.axis('off') + + # Draw vector arrow pointing north (black lines) + # Main arrow shaft + north_ax.plot([0.5, 0.5], [0.15, 0.65], color='black', linewidth=2.5, zorder=10) + # Arrow head (triangle outline) + arrow_head_outline = [[0.5, 0.25], [0.35, 0.45], [0.5, 0.65], [0.65, 0.45]] + for i in range(len(arrow_head_outline) - 1): + north_ax.plot([arrow_head_outline[i][0], arrow_head_outline[i+1][0]], + [arrow_head_outline[i][1], arrow_head_outline[i+1][1]], + color='black', linewidth=2.5, zorder=10) + # Fill the arrow head + north_ax.add_patch(Polygon([[0.5, 0.25], [0.35, 0.45], [0.5, 0.65], [0.65, 0.45]], + closed=True, facecolor='black', edgecolor='black', zorder=9)) + + # Add "N" label above the arrow + north_ax.text(0.5, 0.92, 'N', ha='center', va='top', + fontsize=14, fontweight='bold', color='black', zorder=11) + fig.patch.set_facecolor('white') plt.tight_layout() # Save as JPEG - plt.savefig(jpg_file, dpi=150, bbox_inches='tight', facecolor='white', format='jpg') + plt.savefig(jpg_file, dpi=150, bbox_inches='tight', pad_inches=0.1, facecolor='white', format='jpg') plt.close() # Delete the source TIFF file to save space @@ -735,94 +811,8 @@ class LidarArchaeoPipeline: return vis_results - def create_overview(self, basename): - """Create overview image with all visualizations (JPEG)""" - patterns = { - 'Hillshade': '*_hillshade_multi.jpg', - 'Pente': '*_slope.jpg', - 'Aspect': '*_aspect.jpg', - 'Courbure': '*_curvature.jpg', - 'Éclairage': '*_solar.jpg', - 'Sky-View Factor': '*_svf.jpg', - 'Local Relief': '*_lrm.jpg', - 'Pos. Openness': '*_positive_openness.jpg', - 'Neg. Openness': '*_negative_openness.jpg' - } - - images = {} - for name, pattern in patterns.items(): - files = list(self.vis_dir.glob(pattern)) - if files: - images[name] = str(files[0]) - - if len(images) < 2: - return None - - # 3x3 grid for 9 visualizations - fig, axes = plt.subplots(3, 3, figsize=(24, 18), facecolor='white') - axes = axes.flatten() - - for idx, (name, img_path) in enumerate(images.items()): - if idx >= 9: - break - img = plt.imread(img_path) - axes[idx].imshow(img) - axes[idx].set_title(name, fontsize=11, fontweight='bold') - axes[idx].axis('off') - - # Hide unused subplots - for idx in range(len(images), 9): - axes[idx].axis('off') - - # Title - fig.suptitle( - f"Analyse Archéologique LiDAR - {basename}", - fontsize=18, - fontweight='bold', - y=0.98 - ) - - plt.tight_layout() - output = self.report_dir / f"{basename}_overview.jpg" - plt.savefig(output, dpi=120, bbox_inches='tight', facecolor='white', format='jpg') - plt.close() - - return output - # ============ Complete Pipeline ============ - def run_semantic_classification(self, dtm_file, basename): - """Run semantic classification on DTM""" - print(f"\n[4/4] Classification Sémantique Automatique...") - - try: - # Import semantic classifier - from semantic_classifier import ArchaeoSemanticClassifier - - # Create output subdirectory for semantic results - semantic_dir = self.output_dir / "semantic" - semantic_dir.mkdir(exist_ok=True) - - # Run classification - classifier = ArchaeoSemanticClassifier(dtm_file, semantic_dir) - results = classifier.process(basename) - - print(f" ✓ Classification sémantique terminée") - print(f" → Carte: {results['tif'].name}") - print(f" → Visualisation: {results['jpg'].name}") - stats_file = Path(results['tif']).parent / f"{Path(results['tif']).stem.replace('_semantic', '')}_statistics.json" - print(f" → Statistiques: {stats_file.name}") - - return results - except ImportError as e: - print(f" ✗ Module non disponible: {e}") - return None - except Exception as e: - print(f" ✗ Erreur classification: {e}") - import traceback - traceback.print_exc() - return None - def process_file(self, laz_file): """Process a single LAZ file""" basename = laz_file.stem @@ -831,42 +821,23 @@ class LidarArchaeoPipeline: print(f"{'='*60}") # Step 1: Ground classification - print(f"\n[1/4] Classification du sol...") + print(f"\n[1/3] Classification du sol...") las_file = self.classify_ground(laz_file) if not las_file: print(f" ✗ Échec classification") return False # Step 2: Generate DTM - print(f"\n[2/4] Génération DTM...") + print(f"\n[2/3] Génération DTM...") dtm_file = self.create_dtm_fast(las_file, basename) if not dtm_file: print(f" ✗ Échec DTM") return False # Step 3: Visualizations - print(f"\n[3/4] Visualisations archéologiques...") + print(f"\n[3/3] Visualisations archéologiques...") vis = self.generate_all_visualizations(dtm_file, basename) - # Overview - overview = self.create_overview(basename) - if overview: - print(f"\n ✓ Vue synthétique: {overview}") - - # Step 4: Semantic Classification (NEW!) - semantic_results = self.run_semantic_classification(dtm_file, basename) - - print(f"\n✓ {basename} traité avec succès !") - return True - - # Overview - overview = self.create_overview(basename) - if overview: - print(f"\n ✓ Vue synthétique: {overview}") - - # Step 4: Semantic Classification (NEW!) - semantic_results = self.run_semantic_classification(dtm_file, basename) - print(f"\n✓ {basename} traité avec succès !") return True @@ -908,7 +879,6 @@ class LidarArchaeoPipeline: print(f"\nRésultats dans: {self.output_dir}") print(f" • DTM: {self.dtm_dir}") print(f" • Visualisations JPEG: {self.vis_dir}") - print(f" • Rapports: {self.report_dir}") # Clean up temporary files to save space print(f"\nNettoyage des fichiers temporaires...") diff --git a/semantic_classifier.py b/semantic_classifier.py deleted file mode 100644 index ab8ee94..0000000 --- a/semantic_classifier.py +++ /dev/null @@ -1,293 +0,0 @@ -#!/usr/bin/env python3 -""" -Module de Classification Sémantique Simplifié pour LiDAR Archéologique -Approche robuste avec K-Means pour classification automatique -""" - -import numpy as np -import rasterio -from rasterio.transform import from_bounds -from sklearn.cluster import KMeans -from scipy import ndimage -import json -from pathlib import Path -import matplotlib.pyplot as plt -import matplotlib.colors as mcolors - - -class ArchaeoSemanticClassifier: - """Classification sémantique automatique robuste""" - - def __init__(self, dtm_file, output_dir): - self.dtm_file = Path(dtm_file) - self.output_dir = Path(output_dir) - self.output_dir.mkdir(parents=True, exist_ok=True) - - # Load DTM - with rasterio.open(self.dtm_file) as src: - self.dem = src.read(1) - self.transform = src.transform - self.crs = src.crs - self.height, self.width = self.dem.shape - - print(f"✓ Classifieur initialisé (DTM: {self.width}x{self.height})") - - def extract_features_simple(self): - """Extraction simplifiée des caractéristiques""" - print(" → Extraction caractéristiques...") - - # Calculate gradients - dy, dx = np.gradient(self.dem) - - # Slope - slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi - - # Aspect - aspect = np.arctan2(-dy, dx) * 180 / np.pi - aspect = np.mod(aspect, 360) - - # Curvature - dz_dx = np.gradient(dx, axis=1) - dz_dy = np.gradient(dy, axis=0) - curvature = (dz_dx + dz_dy) / 2 - - # Local Relief - from scipy.ndimage import uniform_filter - local_mean = uniform_filter(self.dem, size=int(15/0.5)) - local_relief = self.dem - local_mean - - return { - 'elevation': self.dem, - 'slope': slope, - 'aspect': aspect, - 'curvature': curvature, - 'local_relief': local_relief - } - - def classify_kmeans(self, n_clusters=6): - """Classification K-Means robuste""" - print(" → Classification K-Means...") - - features = self.extract_features_simple() - - # Normalize each feature to 0-1 - normalized_features = {} - for name, data in features.items(): - min_val = np.percentile(data, 2) - max_val = np.percentile(data, 98) - normalized = np.clip((data - min_val) / (max_val - min_val + 1e-6), 0, 1) - normalized_features[name] = normalized - - # Stack features for clustering - feature_stack = np.stack([ - normalized_features['elevation'].flatten(), - normalized_features['slope'].flatten(), - normalized_features['curvature'].flatten(), - normalized_features['local_relief'].flatten() - ], axis=1) - - # Remove NaN values - valid_mask = ~np.isnan(feature_stack).any(axis=1) - feature_stack = feature_stack[valid_mask] - - # K-Means clustering with random_state for reproducibility - kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10, max_iter=300) - labels_flat = kmeans.fit_predict(feature_stack) - - # Create full resolution labels - full_labels = np.zeros(self.height * self.width, dtype=int) - full_indices = np.where(valid_mask)[0] - full_labels[full_indices] = labels_flat + 1 # +1 to shift from 0-based - full_labels = full_labels.reshape(self.height, self.width) - - # Interpret clusters - self._interpret_clusters(kmeans.cluster_centers_, features) - - return full_labels - - def _interpret_clusters(self, centers, features): - """Interprète les clusters selon les centroïdes""" - interpretations = {} - - # Centers are in normalized feature space - # Features order: elevation, slope, curvature, local_relief - for i, center in enumerate(centers): - elev = center[0] - slope = center[1] - curve = center[2] - relief = center[3] - - # Interpret based on feature values - if relief > 0.7: - category = "ÉLEVÉE (Tumulus possible)" - elif relief < -0.3: - category = "ENFONCÉ (Fossé, cavité)" - elif abs(curve) > 0.5: - if curve > 0: - category = "CONVEX (Bosse, monticule)" - else: - category = "CONCAVE (Creux, dépression)" - elif slope > 0.6: - category = "PENTE FORTE (Talus, mur)" - elif slope < 0.2 and elev < 0.3: - category = "PLAT (Zone plane)" - else: - category = "TOPOGRAPHIE MIXTE" - - interpretations[i + 1] = category # +1 because labels are 1-indexed - - print(f" Interprétation des {len(centers)} clusters :") - for i, interp in interpretations.items(): - print(f" Classe {i}: {interp}") - - def generate_semantic_map(self, labels): - """Génère une carte sémantique colorée""" - print(" → Génération carte sémantique...") - - # Create color map for semantic classes - # 0: Background, 1-6: Different semantic classes - colors = { - 0: [200, 200, 200], # Gray: Background/Unknown - 1: [139, 69, 19], # Brown: Linear/Walls - 2: [128, 0, 128], # Purple: Circular/Mounds - 3: [255, 140, 0], # Orange: Elevated - 4: [0, 200, 255], # Cyan: Depressed - 5: [220, 220, 0], # Yellow: Slope - 6: [0, 128, 0] # Green: Vegetation/Natural - } - - # Create RGB image - rgb = np.zeros((self.height, self.width, 3), dtype=np.uint8) - - for class_id, color in colors.items(): - mask = labels == class_id - for c in range(3): - rgb[:, :, c][mask] = color[c] - - return rgb - - def process(self, basename): - """Pipeline complet de classification sémantique""" - print(f"\n{'='*60}") - print(f" CLASSIFICATION SÉMANTIQUE - {basename}") - print(f"{'='*60}") - - # Run K-Means classification - labels = self.classify_kmeans(n_clusters=6) - - # Generate semantic map - rgb = self.generate_semantic_map(labels) - - # Save semantic classification - output_tif = self.output_dir / f"{basename}_semantic.tif" - with rasterio.open( - output_tif, - 'w', - driver='GTiff', - height=self.height, - width=self.width, - count=1, - dtype='uint8', - crs=self.crs, - transform=self.transform, - compress='lzw' - ) as dst: - dst.write(labels, 1) - - # Save visualization - output_jpg = self.output_dir / f"{basename}_semantic.jpg" - plt.figure(figsize=(16, 12), facecolor='white') - plt.imshow(rgb) - - # Create legend - legend_elements = [ - plt.Rectangle((0, 0), 1, 1, facecolor=np.array(c)/255, edgecolor='black', label=label) - for label, c in [ - ("Inconnu/Fond", [200, 200, 200]), - ("Linéaire (murs)", [139, 69, 19]), - ("Circulaire (tumulus)", [128, 0, 128]), - ("Élevé (monticules)", [255, 140, 0]), - ("Enfoncé (fossés)", [0, 200, 255]), - ("Pente forte (talus)", [220, 220, 0]), - ("Naturel", [0, 128, 0]) - ] - ] - - plt.legend(handles=legend_elements, loc='upper right', fontsize=11) - plt.title(f"Classification Sémantique LiDAR - {basename}\n", - fontsize=14, fontweight='bold', pad=15) - plt.axis('off') - plt.tight_layout() - plt.savefig(output_jpg, dpi=150, bbox_inches='tight', format='jpg') - plt.close() - - # Generate statistics - stats = self._generate_stats(labels, basename) - - print(f"\n✓ Classification terminée !") - print(f" • Carte sémantique: {output_tif.name}") - print(f" • Visualisation: {output_jpg.name}") - print(f" • Statistiques: {self.output_dir / f'{basename}_statistics.json'}") - - return { - 'labels': labels, - 'tif': output_tif, - 'jpg': output_jpg, - 'stats': stats - } - - def _generate_stats(self, labels, basename): - """Génère les statistiques de classification""" - print(" → Génération statistiques...") - - total_pixels = labels.size - stats = {} - class_names = { - 0: "Inconnu/Fond", - 1: "Linéaire", - 2: "Circulaire", - 3: "Élevée", - 4: "Enfoncée", - 5: "Pente forte", - 6: "Naturel" - } - - for class_id in range(7): - count = np.sum(labels == class_id) - percentage = (count / total_pixels) * 100 - if count > 0: - stats[class_id] = { - 'name': class_names[class_id], - 'count': int(count), - 'percentage': float(percentage) - } - - # Save as JSON - stats_file = self.output_dir / f"{basename}_statistics.json" - with open(stats_file, 'w') as f: - json.dump(stats, f, indent=2, default=lambda x: float(x) if isinstance(x, (np.floating, np.integer)) else x) - - # Print summary - print(f"\n 📊 Statistiques :") - for class_id, info in stats.items(): - print(f" {info['name']}: {info['count']:.0f} px ({info['percentage']:.1f}%)") - - return stats - - -def main(): - import sys - - if len(sys.argv) < 2: - print("Usage: python semantic_classifier.py [output_dir]") - sys.exit(1) - - dtm_file = sys.argv[1] - output_dir = sys.argv[2] if len(sys.argv) > 2 else "semantic_output" - - classifier = ArchaeoSemanticClassifier(dtm_file, output_dir) - classifier.process(Path(dtm_file).stem) - - -if __name__ == "__main__": - main()