From 1f9af2ba20454f59e8ccb2ad8316954d83be731a Mon Sep 17 00:00:00 2001 From: Qiusheng Wu Date: Sat, 4 Nov 2023 09:03:47 -0400 Subject: [PATCH] Add GDAL and OpenFileGDB functions (#585) --- leafmap/common.py | 158 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 150 insertions(+), 8 deletions(-) diff --git a/leafmap/common.py b/leafmap/common.py index 242a491b9b..aef64fb902 100644 --- a/leafmap/common.py +++ b/leafmap/common.py @@ -3696,9 +3696,7 @@ def numpy_to_image( if np_array.dtype == np.float64 or np_array.dtype == np.float32: # Convert the array to uint8 # np_array = (np_array * 255).astype(np.uint8) - np.interp(np_array, (np_array.min(), np_array.max()), (0, 255)).astype( - np.uint8 - ) + np.interp(np_array, (np_array.min(), np_array.max()), (0, 255)).astype(np.uint8) else: # The array is already uint8 np_array = np_array @@ -11032,7 +11030,7 @@ def mbtiles_to_pmtiles( def vector_to_pmtiles( input_file: str, - output_file: str, + output_file: Optional[str] = None, layer_name: Optional[str] = None, projection: Optional[str] = "EPSG:4326", overwrite: bool = False, @@ -11069,6 +11067,9 @@ def vector_to_pmtiles( print("conda install -c conda-forge tippecanoe") return None + if output_file is None: + output_file = os.path.splitext(input_file)[0] + ".pmtiles" + if not output_file.endswith(".pmtiles"): raise ValueError("Error: output file must be a .pmtiles file.") @@ -11249,7 +11250,9 @@ def pmtiles_metadata(input_file: str) -> Dict[str, Union[str, int, List[str]]]: return metadata -def raster_to_vector(source, output, simplify_tolerance=None, dst_crs=None, open_args={}, **kwargs): +def raster_to_vector( + source, output, simplify_tolerance=None, dst_crs=None, open_args={}, **kwargs +): """Vectorize a raster dataset. Args: @@ -11431,8 +11434,8 @@ def blend_images( plt.axis(axis) plt.show() else: - return blend_img - + return blend_img + def regularize(source, output=None, crs="EPSG:4326", **kwargs): """Regularize a polygon GeoDataFrame. @@ -11462,4 +11465,143 @@ def regularize(source, output=None, crs="EPSG:4326", **kwargs): if output is not None: result.to_file(output, **kwargs) else: - return result \ No newline at end of file + return result + + +def get_gdal_drivers() -> List[str]: + """Get a list of available driver names in the GDAL library. + + Returns: + List[str]: A list of available driver names. + """ + from osgeo import ogr + + driver_list = [] + + # Iterate over all registered drivers + for i in range(ogr.GetDriverCount()): + driver = ogr.GetDriver(i) + driver_name = driver.GetName() + driver_list.append(driver_name) + + return driver_list + + +def get_gdal_file_extension(driver_name: str) -> Optional[str]: + """Get the file extension corresponding to a driver name in the GDAL library. + + Args: + driver_name (str): The name of the driver. + + Returns: + Optional[str]: The file extension corresponding to the driver name, or None if the driver is not found or does not have a specific file extension. + """ + from osgeo import ogr + + driver = ogr.GetDriverByName(driver_name) + if driver is None: + drivers = get_gdal_drivers() + raise ValueError( + f"Driver {driver_name} not found. Available drivers: {drivers}" + ) + + metadata = driver.GetMetadata() + if "DMD_EXTENSION" in metadata: + file_extension = driver.GetMetadataItem("DMD_EXTENSION") + else: + file_extensions = driver.GetMetadataItem("DMD_EXTENSIONS") + if file_extensions == "json geojson": + file_extension = "geojson" + else: + file_extension = file_extensions.split()[0].lower() + + return file_extension + + +def gdb_to_vector( + gdb_path: str, + out_dir: str, + layers: Optional[List[str]] = None, + gdal_driver: str = "GPKG", + file_extension: Optional[str] = None, +): + """Converts layers from a File Geodatabase (GDB) to a vector format. + + Args: + gdb_path (str): The path to the File Geodatabase (GDB). + out_dir (str): The output directory to save the converted files. + layers (Optional[List[str]]): A list of layer names to convert. If None, all layers will be converted. Default is None. + gdal_driver (str): The GDAL driver name for the output vector format. Default is "GPKG". + file_extension (Optional[str]): The file extension for the output files. If None, it will be determined automatically based on the gdal_driver. Default is None. + + Returns: + None + """ + from osgeo import ogr + + # Open the GDB + gdb_driver = ogr.GetDriverByName("OpenFileGDB") + gdb_dataset = gdb_driver.Open(gdb_path, 0) + + # Get the number of layers in the GDB + layer_count = gdb_dataset.GetLayerCount() + + if isinstance(layers, str): + layers = [layers] + + # Iterate over the layers + for i in range(layer_count): + layer = gdb_dataset.GetLayerByIndex(i) + feature_class_name = layer.GetName() + + if layers is not None: + if feature_class_name not in layers: + continue + + if file_extension is None: + file_extension = get_gdal_file_extension(gdal_driver) + + # Create the output file path + output_file = os.path.join(out_dir, feature_class_name + "." + file_extension) + + # Create the output driver + output_driver = ogr.GetDriverByName(gdal_driver) + output_dataset = output_driver.CreateDataSource(output_file) + + # Copy the input layer to the output format + output_dataset.CopyLayer(layer, feature_class_name) + + output_dataset = None + + # Close the GDB dataset + gdb_dataset = None + + +def gdb_layer_names(gdb_path: str) -> List[str]: + """Get a list of layer names in a File Geodatabase (GDB). + + Args: + gdb_path (str): The path to the File Geodatabase (GDB). + + Returns: + List[str]: A list of layer names in the GDB. + """ + + from osgeo import ogr + + # Open the GDB + gdb_driver = ogr.GetDriverByName("OpenFileGDB") + gdb_dataset = gdb_driver.Open(gdb_path, 0) + + # Get the number of layers in the GDB + layer_count = gdb_dataset.GetLayerCount() + # Iterate over the layers + layer_names = [] + for i in range(layer_count): + layer = gdb_dataset.GetLayerByIndex(i) + feature_class_name = layer.GetName() + layer_names.append(feature_class_name) + + # Close the GDB dataset + gdb_dataset = None + return layer_names