From df1fce0bea9918b24bae6746422f16efddba5674 Mon Sep 17 00:00:00 2001
From: Mares2022
Date: Mon, 5 Jun 2023 18:28:30 +0200
Subject: [PATCH 1/3] Change hazard object into functions

Change hazard object into functions

---
 hydromt_fiat/fiat.py                 |   2 +-
 hydromt_fiat/hazard_functions.py     | 332 +++++++++++++++++++++++
 hydromt_fiat/validation_functions.py | 144 ++++++++++
 hydromt_fiat/workflows/hazard.py     | 376 ++++++++++-----------------
 tests/test_hazard.py                 |  28 +-
 5 files changed, 633 insertions(+), 249 deletions(-)
 create mode 100644 hydromt_fiat/hazard_functions.py
 create mode 100644 hydromt_fiat/validation_functions.py

diff --git a/hydromt_fiat/fiat.py b/hydromt_fiat/fiat.py
index 86287fa8..70a4b06c 100644
--- a/hydromt_fiat/fiat.py
+++ b/hydromt_fiat/fiat.py
@@ -263,7 +263,7 @@ def _configread(self, fn):
         return config.load_file(fn)
 
     def check_path_exists(self, fn):
-        """TODO: decide to use this or another function (check_file_exist in validation.py)"""
+        """TODO: decide to use this or another function (check_file_exist in validation_functions.py)"""
         path = Path(fn)
         self.logger.debug(f"Reading file {str(path.name)}")
         if not fn.is_file():
diff --git a/hydromt_fiat/hazard_functions.py b/hydromt_fiat/hazard_functions.py
new file mode 100644
index 00000000..9392b80a
--- /dev/null
+++ b/hydromt_fiat/hazard_functions.py
@@ -0,0 +1,332 @@
+from hydromt_fiat.validation_functions import *
+from pathlib import Path
+import geopandas as gpd
+from ast import literal_eval
+import os
+import xarray as xr
+
+def get_lists(
+    map_fn,
+    map_type,
+    chunks,
+    rp,
+    crs,
+    nodata,
+    var,
+
+):
+    dict_lists = dict()
+
+    def validate_param(dictionary, param, name, types):
+        param_lst = [param] if isinstance(param, types) else param
+        check_param_type(param_lst, name=name, types=types)
+        dictionary[name+'_lst'] = param_lst
+        return
+
+    validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path))
+    validate_param(dict_lists, map_type, name="map_type", types=str)
+    if chunks != "auto":
+        validate_param(dict_lists, chunks, name="chunks", types=(int, dict))
+    if rp is not None:
+        validate_param(dict_lists, rp, name="rp", types=(float, int))
+    if crs is not None:
+        validate_param(dict_lists, crs, name="crs", types=(int, str))
+    if nodata is not None:
+        validate_param(dict_lists, nodata, name="nodata", types=(float, int))
+    if var is not None:
+        validate_param(dict_lists, var, name="var", types=str)
+
+    return dict_lists
+
+def check_parameters(
+    dict_lists,
+    model,
+    chunks,
+    rp,
+    crs,
+    nodata,
+    var,
+):
+
+    def error_message(variable_list):
+        raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.")
+        # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.")
+
+    # Checks the hazard input parameter types.
+
+    # Checks map path list
+    map_fn_lst = dict_lists['map_fn_lst']
+
+    # Checks map path list
+    if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst):
+        error_message("map_type")
+
+    # Checks the chunk list. The list must be equal to the number of maps.
+    if chunks != "auto":
+        if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst):
+            error_message("chunks")
+
+    # Checks the return period list. The list must be equal to the number of maps.
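+    # Note: unlike the other list parameters, a single 'rp' value is not
+    # broadcast; exactly one return period is required per hazard map.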
+ if rp is not None: + if not len(dict_lists['rp_lst']) == len(map_fn_lst): + error_message("rp") + + # Checks the projection list + if crs is not None: + if not len(dict_lists['crs_lst']) == 1 and not len(dict_lists['crs_lst']) == len(map_fn_lst): + error_message("crs") + + # Checks the no data list + if nodata is not None: + if not len(dict_lists['nodata_lst']) == 1 and not len(dict_lists['nodata_lst']) == len(map_fn_lst): + error_message("nodata") + + # Checks the var list + if var is not None: + if not len(dict_lists['var_lst']) == 1 and not len(dict_lists['var_lst']) == len(map_fn_lst): + error_message('var') + + # Check if the hazard input files exist. + check_file_exist(model, param_lst=map_fn_lst, name="map_fn") + +def process_maps( + dict_lists, + model, + name_catalog, + hazard_type, + risk_output, + crs, + nodata, + var, + chunks, + region=gpd.GeoDataFrame(), + **kwargs, +): + map_fn_lst = dict_lists['map_fn_lst'] + map_type_lst = dict_lists['map_type_lst'] + + list_names = [] + list_rp = [] + + for idx, da_map_fn in enumerate(map_fn_lst): + + # Check if it is a path or a name from the catalog + if os.path.exists(da_map_fn): + da_map_fn = Path(da_map_fn) + da_name = da_map_fn.stem + da_suffix = da_map_fn.suffix + list_names.append(da_name) + else: + da_name = da_map_fn + list_names.append(da_name) + + da_type = get_param( + map_type_lst, + map_fn_lst, + "hazard", + da_name, + idx, + "map type" + ) + + # Get the local hazard map. + kwargs.update(chunks=chunks if chunks == "auto" else dict_lists['chunks_lst'][idx]) + + if "da_suffix" in locals() and da_suffix == ".nc": + if var is None: + raise ValueError( + "The 'var' parameter is required when reading NetCDF data." + ) + da_var = get_param( + dict_lists['var_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "NetCDF variable", + ) + kwargs.update(variables=da_var) + + # reading from path + if da_map_fn.stem: + if da_map_fn.stem == "sfincs_map": + ds_map = xr.open_dataset(da_map_fn) + da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") + da.raster.set_crs(ds_map.crs.epsg_code) + da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) + da.reset_coords(['spatial_ref'], drop=False) + da.encoding["_FillValue"] = None + + else: + if not region.empty: + da = model.data_catalog.get_rasterdataset( + da_map_fn, geom=region, **kwargs + ) + else: + da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs) + # reading from the datacatalog + else: + if not region.empty: + da = model.data_catalog.get_rasterdataset( + name_catalog, variables=da_name, geom=region + ) + else: + da = model.data_catalog.get_rasterdataset( + name_catalog, variables=da_name + ) + + # Set the coordinate reference system. + if crs is not None: + da_crs = get_param( + dict_lists['crs_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "coordinate reference system", + ) + da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" + da.raster.set_crs(da_crs_str) + elif crs is None and not da.raster.crs: + raise ValueError( + "The hazard map has no coordinate reference system assigned." + ) + + # Set nodata and mask the nodata value. + if nodata is not None: + da_nodata = get_param( + dict_lists['nodata_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "nodata" + ) + da.raster.set_nodata(nodata=da_nodata) + elif nodata is None and da.raster.nodata is None: + raise ValueError("The hazard map has no nodata value assigned.") + + + # Correct (if necessary) the grid orientation from the lower to the upper left corner. 
+        # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first
+        if da_name != "sfincs_map":
+            if da.raster.res[1] > 0:
+                da = da.reindex(
+                    {da.raster.y_dim: list(reversed(da.raster.ycoords))}
+                )
+
+        # Check if the obtained hazard map is identical.
+        if model.staticmaps and not model.staticmaps.raster.identical_grid(da):
+            raise ValueError("The hazard maps should have identical grids.")
+
+        # Get the return period input parameter.
+        if 'rp_lst' in dict_lists:
+            da_rp = get_param(
+                dict_lists['rp_lst'],
+                map_fn_lst,
+                "hazard",
+                da_name,
+                idx,
+                "return period",
+            )
+        else:
+            da_rp = None
+
+        if risk_output:
+            da = da.expand_dims({'rp': [da_rp]}, axis=0)
+
+        if risk_output and da_rp is None:
+            # Get (if possible) the return period from dataset names if the input parameter is None.
+            if "rp" in da_name.lower():
+
+                def fstrip(x):
+                    return x in "0123456789."
+
+                rp_str = "".join(
+                    filter(fstrip, da_name.lower().split("rp")[-1])
+                ).lstrip("0")
+
+                try:
+                    assert isinstance(
+                        literal_eval(rp_str) if rp_str else None, (int, float)
+                    )
+                    da_rp = literal_eval(rp_str)
+                    list_rp.append(da_rp)
+
+                except AssertionError:
+                    raise ValueError(
+                        f"Could not derive the return period for hazard map: {da_name}."
+                    )
+            else:
+                raise ValueError(
+                    "The hazard map must contain a return period in order to conduct a risk calculation."
+                )
+
+        # Add the hazard map to config and staticmaps.
+        check_uniqueness(
+            model,
+            "hazard",
+            da_type,
+            da_name,
+            {
+                "usage": True,
+                "map_fn": da_map_fn,
+                "map_type": da_type,
+                "rp": da_rp,
+                "crs": da.raster.crs,
+                "nodata": da.raster.nodata,
+                # "var": None if "var_lst" not in locals() else self.var_lst[idx],
+                "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx],
+                "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx],
+            },
+            file_type="hazard",
+            filename=da_name,
+        )
+
+        model.set_config(
+            "hazard",
+            da_type,
+            da_name,
+            {
+                "usage": "True",
+                "map_fn": da_map_fn,
+                "map_type": da_type,
+                "rp": da_rp,
+                "crs": da.raster.crs,
+                "nodata": da.raster.nodata,
+                # "var": None if "var_lst" not in locals() else self.var_lst[idx],
+                "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx],
+                "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx],
+            },
+        )
+
+        model.set_maps(da, da_name)
+        post = f"(rp {da_rp})" if risk_output else ""
+        model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}")
+
+    if risk_output:
+        maps = model.maps
+        list_keys = list(maps.keys())
+        maps_0 = maps[list_keys[0]].rename('risk')
+        list_keys.pop(0)
+
+        for idx, x in enumerate(list_keys):
+            key_name = list_keys[idx]
+            layer = maps[key_name]
+            maps_0 = xr.concat([maps_0, layer], dim='rp')
+
+        new_da = maps_0.to_dataset(name='RISK')
+        new_da.attrs = { "returnperiod": list(list_rp),
+                         "type": map_type_lst,
+                         'name':list_names,
+                         "Analysis": "Risk"}
+
+        model.hazard = new_da
+        model.set_maps(model.hazard, 'HydroMT_Fiat_hazard')
+
+    list_maps = list(model.maps.keys())
+
+    if risk_output:
+        for item in list_maps[:-1]:
+            model.maps.pop(item)
+
diff --git a/hydromt_fiat/validation_functions.py b/hydromt_fiat/validation_functions.py
new file mode 100644
index 00000000..86ca18f7
--- /dev/null
+++ b/hydromt_fiat/validation_functions.py
@@ -0,0 +1,144 @@
+from pathlib import Path
+
+def check_dir_exist(dir, name=None):
+    """ """
+
+    if not isinstance(dir, Path):
+        raise TypeError(
+            f"The directory indicated by the '{name}' parameter does not exist."
+ ) + + +def check_file_exist(model, param_lst, name=None, input_dir=None): + """ """ + root = Path(model.root) + param_lst = [Path(p) for p in param_lst] + for param_idx, param in enumerate(param_lst): + if isinstance(param, dict): + fn_lst = list(param.values()) + else: + fn_lst = [param] + for fn_idx, fn in enumerate(fn_lst): + if not Path(fn).is_file(): + if root.joinpath(fn).is_file(): + if isinstance(param, dict): + param_lst[param_idx][ + list(param.keys())[fn_idx] + ] = root.joinpath(fn) + else: + param_lst[param_idx] = root.joinpath(fn) + if input_dir is not None: + if model.get_config(input_dir).joinpath(fn).is_file(): + if isinstance(param, dict): + param_lst[param_idx][ + list(param.keys())[fn_idx] + ] = model.get_config(input_dir).joinpath(fn) + else: + param_lst[param_idx] = model.get_config( + input_dir + ).joinpath(fn) + else: + if isinstance(param, dict): + param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn) + else: + param_lst[param_idx] = Path(fn) + try: + if isinstance(param, dict): + assert isinstance( + param_lst[param_idx][list(param.keys())[fn_idx]], Path + ) + else: + assert isinstance(param_lst[param_idx], Path) + except AssertionError: + if input_dir is None: + raise TypeError( + f"The file indicated by the '{name}' parameter does not" + f" exist in the directory '{root}'." + ) + else: + raise TypeError( + f"The file indicated by the '{name}' parameter does not" + f" exist in either of the directories '{root}' or " + f"'{model.get_config(input_dir)}'." + ) + +# def check_uniqueness(model, *args, file_type=None, filename=None): +# """TODO: this function should be checked""" + +# args = list(args) +# if len(args) == 1 and "." in args[0]: +# args = args[0].split(".") + args[1:] +# branch = args.pop(-1) +# for key in args[::-1]: +# branch = {key: branch} + +# if args[0].get_config(args[0], args[1]): +# for key in model.staticmaps.data_vars: +# if filename == key: +# raise ValueError( +# f"The filenames of the {file_type} maps should be unique." +# ) +# if ( +# model.get_config(args[0], args[1], key) +# == list(branch[args[0]][args[1]].values())[0] +# ): +# raise ValueError("Each model input layers must be unique.") + + +def check_uniqueness(model, *args, file_type=None, filename=None): + """ """ + + args = list(args) + if len(args) == 1 and "." in args[0]: + args = args[0].split(".") + args[1:] + branch = args.pop(-1) + for key in args[::-1]: + branch = {key: branch} + + if model.get_config(args[0], args[1]): + for key in model.staticmaps.data_vars: + if filename == key: + raise ValueError( + f"The filenames of the {file_type} maps should be unique." + ) + if ( + model.get_config(args[0], args[1], key) + == list(branch[args[0]][args[1]].values())[0] + ): + raise ValueError(f"Each model input layers must be unique.") + + +def check_param_type(param, name=None, types=None): + """ """ + + if not isinstance(param, list): + raise TypeError( + f"The '{name}_lst' parameter should be a of {list}, received a " + f"{type(param)} instead." + ) + for i in param: + if not isinstance(i, types): + if isinstance(types, tuple): + types = " or ".join([str(j) for j in types]) + else: + types = types + raise TypeError( + f"The '{name}' parameter should be a of {types}, received a " + f"{type(i)} instead." 
+ ) + + + + +def get_param(param_lst, map_fn_lst, file_type, filename, i, param_name): + """ """ + + if len(param_lst) == 1: + return param_lst[0] + elif len(param_lst) != 1 and len(map_fn_lst) == len(param_lst): + return param_lst[i] + elif len(param_lst) != 1 and len(map_fn_lst) != len(param_lst): + raise IndexError( + f"Could not derive the {param_name} parameter for {file_type} " + f"map: {filename}." + ) \ No newline at end of file diff --git a/hydromt_fiat/workflows/hazard.py b/hydromt_fiat/workflows/hazard.py index 30bb79ff..56db2a60 100644 --- a/hydromt_fiat/workflows/hazard.py +++ b/hydromt_fiat/workflows/hazard.py @@ -1,4 +1,4 @@ -from hydromt_fiat.validation import Validation +from hydromt_fiat.validation_functions import * from pathlib import Path import geopandas as gpd from ast import literal_eval @@ -9,11 +9,11 @@ class Hazard: def __init__(self): self.crs = "" - self.check = Validation() + self.map_fn_lst = [] + self.map_type_lst = [] - def checkInputs( + def get_lists( self, - model_fiat, map_fn, map_type, chunks, @@ -21,91 +21,87 @@ def checkInputs( crs, nodata, var, - ): + + ): + dict_lists = dict() + def validate_param(dictionary, param, name, types): + param_lst = [param] if isinstance(param, types) else param + check_param_type(param_lst, name=name, types=types) + dictionary[name+'_lst'] = param_lst + return + + validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path)) + validate_param(dict_lists, map_type, name="map_type", types=str) + if chunks != "auto": + validate_param(dict_lists, chunks, name="chunks", types=(int, dict)) + if rp is not None: + validate_param(dict_lists, rp, name="rp", types=(float, int)) + if crs is not None: + validate_param(dict_lists, crs, name="crs", types=(int, str)) + if nodata is not None: + validate_param(dict_lists, nodata, name="nodata", types=(float, int)) + if var is not None: + validate_param(dict_lists, var, name="var", types=str) + + return dict_lists + + def check_parameters( + self, + dict_lists, + model, + chunks, + rp, + crs, + nodata, + var, + ): + def error_message(variable_list): + raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") + # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") + # Checks the hazard input parameter types. + # Checks map path list - self.map_fn_lst = [map_fn] if isinstance(map_fn, (str, Path)) else map_fn - self.check.check_param_type(self.map_fn_lst, name="map_fn", types=(str, Path)) + map_fn_lst = dict_lists['map_fn_lst'] + # Checks map path list - self.map_type_lst = ( - [map_type] if isinstance(map_type, (str, Path)) else map_type - ) - self.check.check_param_type(self.map_type_lst, name="map_type", types=str) - if not len(self.map_type_lst) == 1 and not len(self.map_type_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'map_type' parameters should match with the number of " - "'map_fn' parameters." - ) + if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst): + error_message("map_type") + # Checks the chunk list. The list must be equal to the number of maps. 
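+        # 'chunks' may be a single entry or one entry per hazard map.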
if chunks != "auto": - self.chunks_lst = [chunks] if isinstance(chunks, (int, dict)) else chunks - self.check.check_param_type( - self.chunks_lst, name="chunks", types=(int, dict) - ) - if not len(self.chunks_lst) == 1 and not len(self.chunks_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'chunks' parameters should match with the number of " - "'map_fn' parameters." - ) + if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst): + error_message("chunks") + # Checks the return period list. The list must be equal to the number of maps. if rp is not None: - self.rp_lst = [rp] if isinstance(rp, (int, float)) else rp - self.check.check_param_type(self.rp_lst, name="rp", types=(float, int)) - if not len(self.rp_lst) == len(self.map_fn_lst): - raise IndexError( - "The number of 'rp' parameters should match with the number of " - "'map_fn' parameters." - ) + if not len(dict_lists['rp_lst']) == len(map_fn_lst): + error_message("rp") + # Checks the projection list if crs is not None: - self.crs_lst = [str(crs)] if isinstance(crs, (int, str)) else crs - self.check.check_param_type(self.crs_lst, name="crs", types=(int, str)) - if not len(self.crs_lst) == 1 and not len(self.crs_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'crs' parameters should match with the number of " - "'map_fn' parameters." - ) + if not len(dict_lists['crs_lst']) == 1 and not len(dict_lists['crs_lst']) == len(map_fn_lst): + error_message("crs") + # Checks the no data list if nodata is not None: - self.nodata_lst = [nodata] if isinstance(nodata, (float, int)) else nodata - self.check.check_param_type( - self.nodata_lst, name="nodata", types=(float, int) - ) - if not len(self.nodata_lst) == 1 and not len(self.nodata_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'nodata' parameters should match with the number of " - "'map_fn' parameters." - ) + if not len(dict_lists['nodata_lst']) == 1 and not len(dict_lists['nodata_lst']) == len(map_fn_lst): + error_message("nodata") + # Checks the var list if var is not None: - self.var_lst = [var] if isinstance(var, str) else var - self.check.check_param_type(self.var_lst, name="var", types=str) - if not len(self.var_lst) == 1 and not len(self.var_lst) == len( - self.map_fn_lst - ): - raise IndexError( - "The number of 'var' parameters should match with the number of " - "'map_fn' parameters." - ) + if not len(dict_lists['var_lst']) == 1 and not len(dict_lists['var_lst']) == len(map_fn_lst): + error_message('var') # Check if the hazard input files exist. - self.check.check_file_exist( - model_fiat.root, param_lst=self.map_fn_lst, name="map_fn" - ) + check_file_exist(model, param_lst=map_fn_lst, name="map_fn") - def readMaps( + def process_maps( self, - model_fiat, + dict_lists, + model, name_catalog, hazard_type, risk_output, @@ -116,135 +112,135 @@ def readMaps( region=gpd.GeoDataFrame(), **kwargs, ): - # This list of names should be provided to have the maps in order - # catalog = model_fiat.data_catalog.get_rasterdataset(name_catalog) - # self.map_fn_lst = [i for i in list(catalog.variables) if maps_id in i] + map_fn_lst = dict_lists['map_fn_lst'] + map_type_lst = dict_lists['map_type_lst'] - # Read the hazard map(s) and add to config and staticmaps. 
- list_rp = [] - list_names = [] + list_names = [] + list_rp = [] - for idx, da_map_fn in enumerate(self.map_fn_lst): - # Check if it is a path or a name from the catalog + for idx, da_map_fn in enumerate(map_fn_lst): + # Check if it is a path or a name from the catalog if os.path.exists(da_map_fn): da_map_fn = Path(da_map_fn) - da_name = da_map_fn.stem + da_name = da_map_fn.stem da_suffix = da_map_fn.suffix list_names.append(da_name) - else: da_name = da_map_fn list_names.append(da_name) - da_type = self.check.get_param( - self.map_type_lst, self.map_fn_lst, "hazard", da_name, idx, "map type" + da_type = get_param( + map_type_lst, + map_fn_lst, + "hazard", + da_name, + idx, + "map type" ) # Get the local hazard map. - kwargs.update(chunks=chunks if chunks == "auto" else self.chunks_lst[idx]) + kwargs.update(chunks=chunks if chunks == "auto" else dict_lists['chunks_lst'][idx]) if "da_suffix" in locals() and da_suffix == ".nc": if var is None: raise ValueError( "The 'var' parameter is required when reading NetCDF data." ) - kwargs.update( - variables=self.check.get_param( - self.var_lst, - self.map_fn_lst, + da_var = get_param( + dict_lists['var_lst'], + map_fn_lst, "hazard", da_name, idx, "NetCDF variable", - ) ) + kwargs.update(variables=da_var) - if os.path.exists(da_map_fn): + # reading from path + if da_map_fn.stem: if da_map_fn.stem == "sfincs_map": - #da = model_fiat.data_catalog.get_rasterdataset(da_map_fn) The file cannot be directly read by HydroMT - ds_map = xr.open_dataset(da_map_fn) - da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") + da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") da.raster.set_crs(ds_map.crs.epsg_code) da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) da.reset_coords(['spatial_ref'], drop=False) da.encoding["_FillValue"] = None - #da_name = kwargs["variables"] + else: if not region.empty: - da = model_fiat.data_catalog.get_rasterdataset( + da = model.data_catalog.get_rasterdataset( da_map_fn, geom=region, **kwargs ) else: - da = model_fiat.data_catalog.get_rasterdataset(da_map_fn, **kwargs) + da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs) + # reading from the datacatalog else: if not region.empty: - da = model_fiat.data_catalog.get_rasterdataset( + da = model.data_catalog.get_rasterdataset( name_catalog, variables=da_name, geom=region ) else: - da = model_fiat.data_catalog.get_rasterdataset( + da = model.data_catalog.get_rasterdataset( name_catalog, variables=da_name ) - # Set (if necessary) the coordinate reference system. - # if crs is not None and not da.raster.crs.is_epsg_code: - # if crs is not None and not da.raster.crs: - # Change to be defined by the user + # Set the coordinate reference system. if crs is not None: - da_crs = self.check.get_param( - self.crs_lst, - self.map_fn_lst, - "hazard", - da_name, - idx, - "coordinate reference system", + da_crs = get_param( + dict_lists['crs_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "coordinate reference system", ) da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" da.raster.set_crs(da_crs_str) - # elif crs is None and not da.raster.crs.is_epsg_code: elif crs is None and not da.raster.crs: raise ValueError( "The hazard map has no coordinate reference system assigned." ) - # Set (if necessary) and mask the nodata value. + # Set nodata and mask the nodata value. 
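+            # A single 'nodata' entry is broadcast to every map via get_param.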
if nodata is not None: - da_nodata = self.check.get_param( - self.nodata_lst, self.map_fn_lst, "hazard", da_name, idx, "nodata" + da_nodata = get_param( + dict_lists['nodata_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "nodata" ) da.raster.set_nodata(nodata=da_nodata) elif nodata is None and da.raster.nodata is None: raise ValueError("The hazard map has no nodata value assigned.") + - # Correct (if necessary) the grid orientation from the lower to the upper left corner. - if da_name != "sfincs_map": #This check could not be implemented into the sfincs_map outputs + # Correct (if necessary) the grid orientation from the lower to the upper left corner. + # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first + if da_name != "sfincs_map": if da.raster.res[1] > 0: da = da.reindex( {da.raster.y_dim: list(reversed(da.raster.ycoords))} ) # Check if the obtained hazard map is identical. - if ( - model_fiat.staticmaps - and not model_fiat.staticmaps.raster.identical_grid(da) - ): + if model.staticmaps and not model.staticmaps.raster.identical_grid(da): raise ValueError("The hazard maps should have identical grids.") # Get the return period input parameter. - da_rp = ( - self.check.get_param( - self.rp_lst, - self.map_fn_lst, - "hazard", - da_name, - idx, - "return period", - ) - if hasattr(self, "rp_lst ") - else None - ) + if 'rp_lst' in dict_lists: + da_rp = get_param( + dict_lists['rp_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "return period", + ) + else: + da_rp =None if risk_output: da = da.expand_dims({'rp': [da_rp]}, axis=0) @@ -277,8 +273,8 @@ def fstrip(x): ) # Add the hazard map to config and staticmaps. - self.check.check_uniqueness( - model_fiat, + check_uniqueness( + model, "hazard", da_type, da_name, @@ -290,16 +286,16 @@ def fstrip(x): "crs": da.raster.crs, "nodata": da.raster.nodata, # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], - "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], + "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], + "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], }, file_type="hazard", filename=da_name, ) - model_fiat.set_config( + model.set_config( "hazard", - # da_type, + da_type, da_name, { "usage": "True", @@ -309,40 +305,17 @@ def fstrip(x): "crs": da.raster.crs, "nodata": da.raster.nodata, # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], - "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], + "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], + "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], }, ) - # extra functionalities - # da = da.rename(f"map_{da_rp}") - # da.attrs = { - # "usage": "True", - # "map_fn": da_map_fn, - # "map_type": da_type, - # "rp": da_rp, - # "crs": da.raster.crs, - # "nodata": da.raster.nodata, - # # "var": None if "var_lst" not in locals() else self.var_lst[idx], - # "var": None if not hasattr(self, "var_lst") else self.var_lst[idx], - # "chunks": "auto" if chunks == "auto" else self.chunks_lst[idx], - # } - - - - model_fiat.set_maps(da, da_name) - # post = f"(rp {da_rp})" if rp is not None and risk_output else "" + model.set_maps(da, da_name) post = f"(rp {da_rp})" if risk_output else "" - model_fiat.logger.info(f"Added {hazard_type} hazard map: {da_name} 
{post}") - - # new_da = xr.concat([new_da, da], dim='rp') - - # import numpy as np - # new_da = xr.DataArray(np.zeros_like(da), dims=da.dims, coords=da.coords).rename('new_dataframe') - + model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") if risk_output: - maps = model_fiat.maps + maps = model.maps list_keys = list(maps.keys()) maps_0 = maps[list_keys[0]].rename('risk') list_keys.pop(0) @@ -358,79 +331,12 @@ def fstrip(x): 'name':list_names, "Analysis": "Risk"} - model_fiat.hazard = new_da - model_fiat.set_maps(model_fiat.hazard, 'HydroMT_Fiat_hazard') + model.hazard = new_da + model.set_maps(model.hazard, 'HydroMT_Fiat_hazard') - list_maps = list(model_fiat.maps.keys()) + list_maps = list(model.maps.keys()) if risk_output: for item in list_maps[:-1]: - model_fiat.maps.pop(item) - - # else: - # model_fiat.hazard = new_da - - - # # model_fiat.maps.to_netcdf("test_hazard_1/test.nc") - - # #model_fiat.set_maps(catalog, "all") - # maps = model_fiat.maps - # list_keys = list(maps.keys()) - # maps_0 = maps[list_keys[0]].rename('risk') - # list_keys.pop(0) - - # for idx, x in enumerate(list_keys): - # key_name = list_keys[idx] - # layer = maps[key_name] - # maps_0 = xr.concat([maps_0, layer], dim='rp') - - # if not risk_output: - # # new_da = maps_0.drop_dims('rp') - # new_da = maps_0.to_dataset(name='EVENT') - # new_da.attrs = { "returnperiod": list(list_rp), - # "type":self.map_type_lst, - # 'name':list_names, - # "Analysis": "Event base"} - #TODO: A netcdf is not required when working with evetns - # else: - # new_da = maps_0.to_dataset(name='RISK') - # new_da.attrs = { "returnperiod": list(list_rp), - # "type":self.map_type_lst, - # 'name':list_names, - # "Analysis": "Risk"} - - # if risk_output: - # model_fiat.hazard = new_da - # model_fiat.set_maps(model_fiat.hazard, 'HydroMT_Fiat_hazard') - # else: - # model_fiat.hazard = new_da - - # list_maps = list(model_fiat.maps.keys()) - - # if risk_output: - # for item in list_maps[:-1]: - # model_fiat.maps.pop(item) - - - # ds = xr.Dataset(maps) - # ds.raster.set_crs(da.raster.crs) - - # # new_da= ds.to_array(dim='rp') - - # # new_da = new_da.to_dataset(name='Full_stack') - # # new_da.attrs = { "returnperiod": list_rp, - # # "type":self.map_type_lst, - # # 'name':list_names} - - # # for var_name, var_data in ds.variables.items(): - # # new_da = xr.concat([new_da, ds[var_name]], dim='rp') - - # for layer_name in ds: - # layer = ds[layer_name] - # new_da = xr.concat([new_da, layer], dim='rp') + model.maps.pop(item) - # # new_da = new_da.to_dataset() - # # config = model_fiat.config - # # new_da.to_netcdf("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/test_hazard_1/hazard/test_final_v2.nc") - # #C:\Users\fuentesm\CISNE\HydroMT_sprint_sessions - return model_fiat.maps \ No newline at end of file diff --git a/tests/test_hazard.py b/tests/test_hazard.py index d1dfead4..559f7b01 100644 --- a/tests/test_hazard.py +++ b/tests/test_hazard.py @@ -1,18 +1,12 @@ from hydromt_fiat.fiat import FiatModel -from hydromt_fiat.workflows.hazard import Hazard +from hydromt_fiat.hazard_functions import * from hydromt.config import configread from pathlib import Path import pytest -EXAMPLEDIR = Path().absolute() / "examples" DATASET = Path("P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database") _cases = { - # "fiat_flood": { - # "region_grid": Path("data").joinpath("flood_hand", "hand_050cm_rp02.tif"), - # "example" : "fiat_flood", - # "ini" : "fiat_flood.ini", - # }, "fiat_objects": { 
"folder" : "test_hazard_1", @@ -43,10 +37,7 @@ def test_Hazard(case): risk_output = configread(config_fn)['setup_hazard']['risk_output'] hazard_type = configread(config_fn)['setup_config']['hazard_type'] - hyfm_hazard = Hazard() - - hyfm_hazard.checkInputs( - hyfm, + param_lst = get_lists( map_fn, map_type, chunks, @@ -56,7 +47,18 @@ def test_Hazard(case): var, ) - ds = hyfm_hazard.readMaps( + check_parameters( + param_lst, + hyfm, + chunks, + rp, + crs, + nodata, + var, + ) + + process_maps( + param_lst, hyfm, name_catalog, hazard_type, @@ -67,6 +69,6 @@ def test_Hazard(case): chunks, ) - assert hyfm + assert param_lst From 7b38fb819f43d330b7446a6a762326ff78960d7f Mon Sep 17 00:00:00 2001 From: Mares2022 Date: Tue, 6 Jun 2023 13:22:53 +0200 Subject: [PATCH 2/3] Rename hazard and validation python codes Rename hazard and validation python codes --- hydromt_fiat/fiat.py | 2 +- hydromt_fiat/hazard_functions.py | 332 --------------- hydromt_fiat/validation.py | 232 +++++----- hydromt_fiat/validation_functions.py | 144 ------- hydromt_fiat/workflows/hazard.py | 610 +++++++++++++-------------- hydromt_fiat/workflows/hazard_old.py | 342 +++++++++++++++ tests/test_hazard.py | 2 +- 7 files changed, 772 insertions(+), 892 deletions(-) delete mode 100644 hydromt_fiat/hazard_functions.py delete mode 100644 hydromt_fiat/validation_functions.py create mode 100644 hydromt_fiat/workflows/hazard_old.py diff --git a/hydromt_fiat/fiat.py b/hydromt_fiat/fiat.py index 70a4b06c..a11fea60 100644 --- a/hydromt_fiat/fiat.py +++ b/hydromt_fiat/fiat.py @@ -1,7 +1,7 @@ """Implement fiat model class""" from hydromt_fiat.workflows.vulnerability import Vulnerability -from hydromt_fiat.workflows.hazard import Hazard +from hydromt_fiat.workflows.hazard_old import Hazard from hydromt.models.model_grid import GridModel from hydromt_fiat.workflows.exposure_vector import ExposureVector import logging diff --git a/hydromt_fiat/hazard_functions.py b/hydromt_fiat/hazard_functions.py deleted file mode 100644 index 9392b80a..00000000 --- a/hydromt_fiat/hazard_functions.py +++ /dev/null @@ -1,332 +0,0 @@ -from hydromt_fiat.validation_functions import * -from pathlib import Path -import geopandas as gpd -from ast import literal_eval -import os -import xarray as xr - -def get_lists( - map_fn, - map_type, - chunks, - rp, - crs, - nodata, - var, - -): - dict_lists = dict() - - def validate_param(dictionary, param, name, types): - param_lst = [param] if isinstance(param, types) else param - check_param_type(param_lst, name=name, types=types) - dictionary[name+'_lst'] = param_lst - return - - validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path)) - validate_param(dict_lists, map_type, name="map_type", types=str) - if chunks != "auto": - validate_param(dict_lists, chunks, name="chunks", types=(int, dict)) - if rp is not None: - validate_param(dict_lists, rp, name="rp", types=(float, int)) - if crs is not None: - validate_param(dict_lists, crs, name="crs", types=(int, str)) - if nodata is not None: - validate_param(dict_lists, nodata, name="nodata", types=(float, int)) - if var is not None: - validate_param(dict_lists, var, name="var", types=str) - - return dict_lists - -def check_parameters( - dict_lists, - model, - chunks, - rp, - crs, - nodata, - var, -): - - def error_message(variable_list): - raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") - # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' 
parameters.") - - # Checks the hazard input parameter types. - - # Checks map path list - map_fn_lst = dict_lists['map_fn_lst'] - - # Checks map path list - if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst): - error_message("map_type") - - # Checks the chunk list. The list must be equal to the number of maps. - if chunks != "auto": - if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst): - error_message("chunks") - - # Checks the return period list. The list must be equal to the number of maps. - if rp is not None: - if not len(dict_lists['rp_lst']) == len(map_fn_lst): - error_message("rp") - - # Checks the projection list - if crs is not None: - if not len(dict_lists['crs_lst']) == 1 and not len(dict_lists['crs_lst']) == len(map_fn_lst): - error_message("crs") - - # Checks the no data list - if nodata is not None: - if not len(dict_lists['nodata_lst']) == 1 and not len(dict_lists['nodata_lst']) == len(map_fn_lst): - error_message("nodata") - - # Checks the var list - if var is not None: - if not len(dict_lists['var_lst']) == 1 and not len(dict_lists['var_lst']) == len(map_fn_lst): - error_message('var') - - # Check if the hazard input files exist. - check_file_exist(model, param_lst=map_fn_lst, name="map_fn") - -def process_maps( - dict_lists, - model, - name_catalog, - hazard_type, - risk_output, - crs, - nodata, - var, - chunks, - region=gpd.GeoDataFrame(), - **kwargs, -): - map_fn_lst = dict_lists['map_fn_lst'] - map_type_lst = dict_lists['map_type_lst'] - - list_names = [] - list_rp = [] - - for idx, da_map_fn in enumerate(map_fn_lst): - - # Check if it is a path or a name from the catalog - if os.path.exists(da_map_fn): - da_map_fn = Path(da_map_fn) - da_name = da_map_fn.stem - da_suffix = da_map_fn.suffix - list_names.append(da_name) - else: - da_name = da_map_fn - list_names.append(da_name) - - da_type = get_param( - map_type_lst, - map_fn_lst, - "hazard", - da_name, - idx, - "map type" - ) - - # Get the local hazard map. - kwargs.update(chunks=chunks if chunks == "auto" else dict_lists['chunks_lst'][idx]) - - if "da_suffix" in locals() and da_suffix == ".nc": - if var is None: - raise ValueError( - "The 'var' parameter is required when reading NetCDF data." - ) - da_var = get_param( - dict_lists['var_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "NetCDF variable", - ) - kwargs.update(variables=da_var) - - # reading from path - if da_map_fn.stem: - if da_map_fn.stem == "sfincs_map": - ds_map = xr.open_dataset(da_map_fn) - da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") - da.raster.set_crs(ds_map.crs.epsg_code) - da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) - da.reset_coords(['spatial_ref'], drop=False) - da.encoding["_FillValue"] = None - - else: - if not region.empty: - da = model.data_catalog.get_rasterdataset( - da_map_fn, geom=region, **kwargs - ) - else: - da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs) - # reading from the datacatalog - else: - if not region.empty: - da = model.data_catalog.get_rasterdataset( - name_catalog, variables=da_name, geom=region - ) - else: - da = model.data_catalog.get_rasterdataset( - name_catalog, variables=da_name - ) - - # Set the coordinate reference system. 
- if crs is not None: - da_crs = get_param( - dict_lists['crs_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "coordinate reference system", - ) - da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" - da.raster.set_crs(da_crs_str) - elif crs is None and not da.raster.crs: - raise ValueError( - "The hazard map has no coordinate reference system assigned." - ) - - # Set nodata and mask the nodata value. - if nodata is not None: - da_nodata = get_param( - dict_lists['nodata_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "nodata" - ) - da.raster.set_nodata(nodata=da_nodata) - elif nodata is None and da.raster.nodata is None: - raise ValueError("The hazard map has no nodata value assigned.") - - - # Correct (if necessary) the grid orientation from the lower to the upper left corner. - # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first - if da_name != "sfincs_map": - if da.raster.res[1] > 0: - da = da.reindex( - {da.raster.y_dim: list(reversed(da.raster.ycoords))} - ) - - # Check if the obtained hazard map is identical. - if model.staticmaps and not model.staticmaps.raster.identical_grid(da): - raise ValueError("The hazard maps should have identical grids.") - - # Get the return period input parameter. - if 'rp_lst' in dict_lists: - da_rp = get_param( - dict_lists['rp_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "return period", - ) - else: - da_rp =None - - if risk_output: - da = da.expand_dims({'rp': [da_rp]}, axis=0) - - if risk_output and da_rp is None: - # Get (if possible) the return period from dataset names if the input parameter is None. - if "rp" in da_name.lower(): - - def fstrip(x): - return x in "0123456789." - - rp_str = "".join( - filter(fstrip, da_name.lower().split("rp")[-1]) - ).lstrip("0") - - try: - assert isinstance( - literal_eval(rp_str) if rp_str else None, (int, float) - ) - da_rp = literal_eval(rp_str) - list_rp.append(da_rp) - - except AssertionError: - raise ValueError( - f"Could not derive the return period for hazard map: {da_name}." - ) - else: - raise ValueError( - "The hazard map must contain a return period in order to conduct a risk calculation." - ) - - # Add the hazard map to config and staticmaps. 
- check_uniqueness( - model, - "hazard", - da_type, - da_name, - { - "usage": True, - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], - "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], - }, - file_type="hazard", - filename=da_name, - ) - - model.set_config( - "hazard", - da_type, - da_name, - { - "usage": "True", - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], - "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], - }, - ) - - model.set_maps(da, da_name) - post = f"(rp {da_rp})" if risk_output else "" - model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") - - if risk_output: - maps = model.maps - list_keys = list(maps.keys()) - maps_0 = maps[list_keys[0]].rename('risk') - list_keys.pop(0) - - for idx, x in enumerate(list_keys): - key_name = list_keys[idx] - layer = maps[key_name] - maps_0 = xr.concat([maps_0, layer], dim='rp') - - new_da = maps_0.to_dataset(name='RISK') - new_da.attrs = { "returnperiod": list(list_rp), - "type":self.map_type_lst, - 'name':list_names, - "Analysis": "Risk"} - - model.hazard = new_da - model.set_maps(model.hazard, 'HydroMT_Fiat_hazard') - - list_maps = list(model.maps.keys()) - - if risk_output: - for item in list_maps[:-1]: - model.maps.pop(item) - diff --git a/hydromt_fiat/validation.py b/hydromt_fiat/validation.py index 917ae249..86ca18f7 100644 --- a/hydromt_fiat/validation.py +++ b/hydromt_fiat/validation.py @@ -1,120 +1,144 @@ from pathlib import Path -### TO BE UPDATED ### -class Validation: - """CONTROL FUNCTIONS""" +def check_dir_exist(dir, name=None): + """ """ - def check_dir_exist(self, dir, name=None): - """ """ + if not isinstance(dir, Path): + raise TypeError( + f"The directory indicated by the '{name}' parameter does not exist." + ) - if not isinstance(dir, Path): - raise TypeError( - f"The directory indicated by the '{name}' parameter does not exist." 
- ) - def check_file_exist(self, root, param_lst, name=None, input_dir=None): - """ """ - root = Path(root) - param_lst = [Path(p) for p in param_lst] - for param_idx, param in enumerate(param_lst): - if isinstance(param, dict): - fn_lst = list(param.values()) - else: - fn_lst = [param] - for fn_idx, fn in enumerate(fn_lst): - if not Path(fn).is_file(): - if root.joinpath(fn).is_file(): +def check_file_exist(model, param_lst, name=None, input_dir=None): + """ """ + root = Path(model.root) + param_lst = [Path(p) for p in param_lst] + for param_idx, param in enumerate(param_lst): + if isinstance(param, dict): + fn_lst = list(param.values()) + else: + fn_lst = [param] + for fn_idx, fn in enumerate(fn_lst): + if not Path(fn).is_file(): + if root.joinpath(fn).is_file(): + if isinstance(param, dict): + param_lst[param_idx][ + list(param.keys())[fn_idx] + ] = root.joinpath(fn) + else: + param_lst[param_idx] = root.joinpath(fn) + if input_dir is not None: + if model.get_config(input_dir).joinpath(fn).is_file(): if isinstance(param, dict): param_lst[param_idx][ list(param.keys())[fn_idx] - ] = root.joinpath(fn) + ] = model.get_config(input_dir).joinpath(fn) else: - param_lst[param_idx] = root.joinpath(fn) - if input_dir is not None: - if self.get_config(input_dir).joinpath(fn).is_file(): - if isinstance(param, dict): - param_lst[param_idx][ - list(param.keys())[fn_idx] - ] = self.get_config(input_dir).joinpath(fn) - else: - param_lst[param_idx] = self.get_config( - input_dir - ).joinpath(fn) + param_lst[param_idx] = model.get_config( + input_dir + ).joinpath(fn) + else: + if isinstance(param, dict): + param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn) else: - if isinstance(param, dict): - param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn) - else: - param_lst[param_idx] = Path(fn) - try: - if isinstance(param, dict): - assert isinstance( - param_lst[param_idx][list(param.keys())[fn_idx]], Path - ) - else: - assert isinstance(param_lst[param_idx], Path) - except AssertionError: - if input_dir is None: - raise TypeError( - f"The file indicated by the '{name}' parameter does not" - f" exist in the directory '{root}'." - ) - else: - raise TypeError( - f"The file indicated by the '{name}' parameter does not" - f" exist in either of the directories '{root}' or " - f"'{self.get_config(input_dir)}'." - ) - - def check_uniqueness(self, *args, file_type=None, filename=None): - """TODO: this function should be checked""" - - args = list(args) - if len(args) == 1 and "." in args[0]: - args = args[0].split(".") + args[1:] - branch = args.pop(-1) - for key in args[::-1]: - branch = {key: branch} - - if args[0].get_config(args[0], args[1]): - for key in self.staticmaps.data_vars: - if filename == key: - raise ValueError( - f"The filenames of the {file_type} maps should be unique." + param_lst[param_idx] = Path(fn) + try: + if isinstance(param, dict): + assert isinstance( + param_lst[param_idx][list(param.keys())[fn_idx]], Path + ) + else: + assert isinstance(param_lst[param_idx], Path) + except AssertionError: + if input_dir is None: + raise TypeError( + f"The file indicated by the '{name}' parameter does not" + f" exist in the directory '{root}'." + ) + else: + raise TypeError( + f"The file indicated by the '{name}' parameter does not" + f" exist in either of the directories '{root}' or " + f"'{model.get_config(input_dir)}'." 
) - if ( - self.get_config(args[0], args[1], key) - == list(branch[args[0]][args[1]].values())[0] - ): - raise ValueError("Each model input layers must be unique.") - def check_param_type(self, param, name=None, types=None): - """ """ +# def check_uniqueness(model, *args, file_type=None, filename=None): +# """TODO: this function should be checked""" - if not isinstance(param, list): - raise TypeError( - f"The '{name}_lst' parameter should be a of {list}, received a " - f"{type(param)} instead." - ) - for i in param: - if not isinstance(i, types): - if isinstance(types, tuple): - types = " or ".join([str(j) for j in types]) - else: - types = types - raise TypeError( - f"The '{name}' parameter should be a of {types}, received a " - f"{type(i)} instead." +# args = list(args) +# if len(args) == 1 and "." in args[0]: +# args = args[0].split(".") + args[1:] +# branch = args.pop(-1) +# for key in args[::-1]: +# branch = {key: branch} + +# if args[0].get_config(args[0], args[1]): +# for key in model.staticmaps.data_vars: +# if filename == key: +# raise ValueError( +# f"The filenames of the {file_type} maps should be unique." +# ) +# if ( +# model.get_config(args[0], args[1], key) +# == list(branch[args[0]][args[1]].values())[0] +# ): +# raise ValueError("Each model input layers must be unique.") + + +def check_uniqueness(model, *args, file_type=None, filename=None): + """ """ + + args = list(args) + if len(args) == 1 and "." in args[0]: + args = args[0].split(".") + args[1:] + branch = args.pop(-1) + for key in args[::-1]: + branch = {key: branch} + + if model.get_config(args[0], args[1]): + for key in model.staticmaps.data_vars: + if filename == key: + raise ValueError( + f"The filenames of the {file_type} maps should be unique." ) + if ( + model.get_config(args[0], args[1], key) + == list(branch[args[0]][args[1]].values())[0] + ): + raise ValueError(f"Each model input layers must be unique.") + - def get_param(self, param_lst, map_fn_lst, file_type, filename, i, param_name): - """ """ - - if len(param_lst) == 1: - return param_lst[0] - elif len(param_lst) != 1 and len(map_fn_lst) == len(param_lst): - return param_lst[i] - elif len(param_lst) != 1 and len(map_fn_lst) != len(param_lst): - raise IndexError( - f"Could not derive the {param_name} parameter for {file_type} " - f"map: {filename}." +def check_param_type(param, name=None, types=None): + """ """ + + if not isinstance(param, list): + raise TypeError( + f"The '{name}_lst' parameter should be a of {list}, received a " + f"{type(param)} instead." + ) + for i in param: + if not isinstance(i, types): + if isinstance(types, tuple): + types = " or ".join([str(j) for j in types]) + else: + types = types + raise TypeError( + f"The '{name}' parameter should be a of {types}, received a " + f"{type(i)} instead." ) + + + + +def get_param(param_lst, map_fn_lst, file_type, filename, i, param_name): + """ """ + + if len(param_lst) == 1: + return param_lst[0] + elif len(param_lst) != 1 and len(map_fn_lst) == len(param_lst): + return param_lst[i] + elif len(param_lst) != 1 and len(map_fn_lst) != len(param_lst): + raise IndexError( + f"Could not derive the {param_name} parameter for {file_type} " + f"map: {filename}." 
+ ) \ No newline at end of file diff --git a/hydromt_fiat/validation_functions.py b/hydromt_fiat/validation_functions.py deleted file mode 100644 index 86ca18f7..00000000 --- a/hydromt_fiat/validation_functions.py +++ /dev/null @@ -1,144 +0,0 @@ -from pathlib import Path - -def check_dir_exist(dir, name=None): - """ """ - - if not isinstance(dir, Path): - raise TypeError( - f"The directory indicated by the '{name}' parameter does not exist." - ) - - -def check_file_exist(model, param_lst, name=None, input_dir=None): - """ """ - root = Path(model.root) - param_lst = [Path(p) for p in param_lst] - for param_idx, param in enumerate(param_lst): - if isinstance(param, dict): - fn_lst = list(param.values()) - else: - fn_lst = [param] - for fn_idx, fn in enumerate(fn_lst): - if not Path(fn).is_file(): - if root.joinpath(fn).is_file(): - if isinstance(param, dict): - param_lst[param_idx][ - list(param.keys())[fn_idx] - ] = root.joinpath(fn) - else: - param_lst[param_idx] = root.joinpath(fn) - if input_dir is not None: - if model.get_config(input_dir).joinpath(fn).is_file(): - if isinstance(param, dict): - param_lst[param_idx][ - list(param.keys())[fn_idx] - ] = model.get_config(input_dir).joinpath(fn) - else: - param_lst[param_idx] = model.get_config( - input_dir - ).joinpath(fn) - else: - if isinstance(param, dict): - param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn) - else: - param_lst[param_idx] = Path(fn) - try: - if isinstance(param, dict): - assert isinstance( - param_lst[param_idx][list(param.keys())[fn_idx]], Path - ) - else: - assert isinstance(param_lst[param_idx], Path) - except AssertionError: - if input_dir is None: - raise TypeError( - f"The file indicated by the '{name}' parameter does not" - f" exist in the directory '{root}'." - ) - else: - raise TypeError( - f"The file indicated by the '{name}' parameter does not" - f" exist in either of the directories '{root}' or " - f"'{model.get_config(input_dir)}'." - ) - -# def check_uniqueness(model, *args, file_type=None, filename=None): -# """TODO: this function should be checked""" - -# args = list(args) -# if len(args) == 1 and "." in args[0]: -# args = args[0].split(".") + args[1:] -# branch = args.pop(-1) -# for key in args[::-1]: -# branch = {key: branch} - -# if args[0].get_config(args[0], args[1]): -# for key in model.staticmaps.data_vars: -# if filename == key: -# raise ValueError( -# f"The filenames of the {file_type} maps should be unique." -# ) -# if ( -# model.get_config(args[0], args[1], key) -# == list(branch[args[0]][args[1]].values())[0] -# ): -# raise ValueError("Each model input layers must be unique.") - - -def check_uniqueness(model, *args, file_type=None, filename=None): - """ """ - - args = list(args) - if len(args) == 1 and "." in args[0]: - args = args[0].split(".") + args[1:] - branch = args.pop(-1) - for key in args[::-1]: - branch = {key: branch} - - if model.get_config(args[0], args[1]): - for key in model.staticmaps.data_vars: - if filename == key: - raise ValueError( - f"The filenames of the {file_type} maps should be unique." - ) - if ( - model.get_config(args[0], args[1], key) - == list(branch[args[0]][args[1]].values())[0] - ): - raise ValueError(f"Each model input layers must be unique.") - - -def check_param_type(param, name=None, types=None): - """ """ - - if not isinstance(param, list): - raise TypeError( - f"The '{name}_lst' parameter should be a of {list}, received a " - f"{type(param)} instead." 
-        )
-    for i in param:
-        if not isinstance(i, types):
-            if isinstance(types, tuple):
-                types = " or ".join([str(j) for j in types])
-            else:
-                types = types
-            raise TypeError(
-                f"The '{name}' parameter should be a of {types}, received a "
-                f"{type(i)} instead."
-            )
-
-
-
-
-def get_param(param_lst, map_fn_lst, file_type, filename, i, param_name):
-    """ """
-
-    if len(param_lst) == 1:
-        return param_lst[0]
-    elif len(param_lst) != 1 and len(map_fn_lst) == len(param_lst):
-        return param_lst[i]
-    elif len(param_lst) != 1 and len(map_fn_lst) != len(param_lst):
-        raise IndexError(
-            f"Could not derive the {param_name} parameter for {file_type} "
-            f"map: {filename}."
-        )
\ No newline at end of file
diff --git a/hydromt_fiat/workflows/hazard.py b/hydromt_fiat/workflows/hazard.py
index 56db2a60..d6895def 100644
--- a/hydromt_fiat/workflows/hazard.py
+++ b/hydromt_fiat/workflows/hazard.py
@@ -1,342 +1,332 @@
-from hydromt_fiat.validation_functions import *
+from hydromt_fiat.validation import *
 from pathlib import Path
 import geopandas as gpd
 from ast import literal_eval
 import os
 import xarray as xr
-
-class Hazard:
-    def __init__(self):
-        self.crs = ""
-        self.map_fn_lst = []
-        self.map_type_lst = []
-
-    def get_lists(
-        self,
-        map_fn,
-        map_type,
-        chunks,
-        rp,
-        crs,
-        nodata,
-        var,
-
-    ):
-        dict_lists = dict()
+def get_lists(
+    map_fn,
+    map_type,
+    chunks,
+    rp,
+    crs,
+    nodata,
+    var,
 
-        def validate_param(dictionary, param, name, types):
-            param_lst = [param] if isinstance(param, types) else param
-            check_param_type(param_lst, name=name, types=types)
-            dictionary[name+'_lst'] = param_lst
-            return
-
-        validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path))
-        validate_param(dict_lists, map_type, name="map_type", types=str)
-        if chunks != "auto":
-            validate_param(dict_lists, chunks, name="chunks", types=(int, dict))
-        if rp is not None:
-            validate_param(dict_lists, rp, name="rp", types=(float, int))
-        if crs is not None:
-            validate_param(dict_lists, crs, name="crs", types=(int, str))
-        if nodata is not None:
-            validate_param(dict_lists, nodata, name="nodata", types=(float, int))
-        if var is not None:
-            validate_param(dict_lists, var, name="var", types=str)
-
-        return dict_lists
+):
+    dict_lists = dict()
 
-    def check_parameters(
-        self,
-        dict_lists,
-        model,
-        chunks,
-        rp,
-        crs,
-        nodata,
-        var,
-    ):
-
-        def error_message(variable_list):
-            raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.")
-            # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.")
+    def validate_param(dictionary, param, name, types):
+        param_lst = [param] if isinstance(param, types) else param
+        check_param_type(param_lst, name=name, types=types)
+        dictionary[name+'_lst'] = param_lst
+        return
+
+    validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path))
+    validate_param(dict_lists, map_type, name="map_type", types=str)
+    if chunks != "auto":
+        validate_param(dict_lists, chunks, name="chunks", types=(int, dict))
+    if rp is not None:
+        validate_param(dict_lists, rp, name="rp", types=(float, int))
+    if crs is not None:
+        validate_param(dict_lists, crs, name="crs", types=(int, str))
+    if nodata is not None:
+        validate_param(dict_lists, nodata, name="nodata", types=(float, int))
+    if var is not None:
+        validate_param(dict_lists, var, name="var", types=str)
+
+    return dict_lists
+
+def check_parameters(
+    dict_lists,
+    model,
+    chunks,
+    rp,
+    crs,
+    nodata,
+    var,
+):
+
+    def error_message(variable_list):
+        raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.")
+        # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.")
 
-        # Checks the hazard input parameter types.
+    # Checks the hazard input parameter types.
 
-        # Checks map path list
-        map_fn_lst = dict_lists['map_fn_lst']
+    # Checks map path list
+    map_fn_lst = dict_lists['map_fn_lst']
 
-        # Checks map path list
-        if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst):
-            error_message("map_type")
+    # Checks map path list
+    if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst):
+        error_message("map_type")
+
+    # Checks the chunk list. The list must be equal to the number of maps.
+    if chunks != "auto":
+        if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst):
+            error_message("chunks")
 
-        # Checks the chunk list. The list must be equal to the number of maps.
-        if chunks != "auto":
-            if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst):
-                error_message("chunks")
-
-        # Checks the return period list. The list must be equal to the number of maps.
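+    # 'rp' requires one return period per hazard map; a single value is not
+    # reused for all maps.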
+ if rp is not None: + if not len(dict_lists['rp_lst']) == len(map_fn_lst): + error_message("rp") + + # Checks the projection list + if crs is not None: + if not len(dict_lists['crs_lst']) == 1 and not len(dict_lists['crs_lst']) == len(map_fn_lst): + error_message("crs") + + # Checks the no data list + if nodata is not None: + if not len(dict_lists['nodata_lst']) == 1 and not len(dict_lists['nodata_lst']) == len(map_fn_lst): + error_message("nodata") + + # Checks the var list + if var is not None: + if not len(dict_lists['var_lst']) == 1 and not len(dict_lists['var_lst']) == len(map_fn_lst): + error_message('var') + + # Check if the hazard input files exist. + check_file_exist(model, param_lst=map_fn_lst, name="map_fn") + +def process_maps( + dict_lists, + model, + name_catalog, + hazard_type, + risk_output, + crs, + nodata, + var, + chunks, + region=gpd.GeoDataFrame(), + **kwargs, +): + map_fn_lst = dict_lists['map_fn_lst'] + map_type_lst = dict_lists['map_type_lst'] + + list_names = [] + list_rp = [] + + for idx, da_map_fn in enumerate(map_fn_lst): + + # Check if it is a path or a name from the catalog + if os.path.exists(da_map_fn): + da_map_fn = Path(da_map_fn) + da_name = da_map_fn.stem + da_suffix = da_map_fn.suffix + list_names.append(da_name) + else: + da_name = da_map_fn + list_names.append(da_name) + + da_type = get_param( + map_type_lst, + map_fn_lst, + "hazard", + da_name, + idx, + "map type" + ) + + # Get the local hazard map. + kwargs.update(chunks=chunks if chunks == "auto" else dict_lists['chunks_lst'][idx]) + + if "da_suffix" in locals() and da_suffix == ".nc": + if var is None: + raise ValueError( + "The 'var' parameter is required when reading NetCDF data." ) - kwargs.update(variables=da_var) - - # reading from path - if da_map_fn.stem: - if da_map_fn.stem == "sfincs_map": - ds_map = xr.open_dataset(da_map_fn) - da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") - da.raster.set_crs(ds_map.crs.epsg_code) - da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) - da.reset_coords(['spatial_ref'], drop=False) - da.encoding["_FillValue"] = None + da_var = get_param( + dict_lists['var_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "NetCDF variable", + ) + kwargs.update(variables=da_var) + + # reading from path + if da_map_fn.stem: + if da_map_fn.stem == "sfincs_map": + ds_map = xr.open_dataset(da_map_fn) + da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") + da.raster.set_crs(ds_map.crs.epsg_code) + da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) + da.reset_coords(['spatial_ref'], drop=False) + da.encoding["_FillValue"] = None - else: - if not region.empty: - da = model.data_catalog.get_rasterdataset( - da_map_fn, geom=region, **kwargs - ) - else: - da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs) - # reading from the datacatalog else: if not region.empty: da = model.data_catalog.get_rasterdataset( - name_catalog, variables=da_name, geom=region + da_map_fn, geom=region, **kwargs ) else: - da = model.data_catalog.get_rasterdataset( - name_catalog, variables=da_name - ) - - # Set the coordinate reference system. 
- if crs is not None:
- da_crs = get_param(
- dict_lists['crs_lst'],
- map_fn_lst,
- "hazard",
- da_name,
- idx,
- "coordinate reference system",
+ da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs)
+ # reading from the datacatalog
+ else:
+ if not region.empty:
+ da = model.data_catalog.get_rasterdataset(
+ name_catalog, variables=da_name, geom=region
 )
- da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}"
- da.raster.set_crs(da_crs_str)
- elif crs is None and not da.raster.crs:
- raise ValueError(
- "The hazard map has no coordinate reference system assigned."
+ else:
+ da = model.data_catalog.get_rasterdataset(
+ name_catalog, variables=da_name
 )
- # Set nodata and mask the nodata value.
- if nodata is not None:
- da_nodata = get_param(
- dict_lists['nodata_lst'],
- map_fn_lst,
- "hazard",
- da_name,
- idx,
- "nodata"
- )
- da.raster.set_nodata(nodata=da_nodata)
- elif nodata is None and da.raster.nodata is None:
- raise ValueError("The hazard map has no nodata value assigned.")
+ # Set the coordinate reference system.
+ if crs is not None:
+ da_crs = get_param(
+ dict_lists['crs_lst'],
+ map_fn_lst,
+ "hazard",
+ da_name,
+ idx,
+ "coordinate reference system",
+ )
+ da_crs_str = da_crs if isinstance(da_crs, str) and "EPSG" in da_crs else f"EPSG:{da_crs}"
+ da.raster.set_crs(da_crs_str)
+ elif crs is None and not da.raster.crs:
+ raise ValueError(
+ "The hazard map has no coordinate reference system assigned."
+ )
+
+ # Set nodata and mask the nodata value.
+ if nodata is not None:
+ da_nodata = get_param(
+ dict_lists['nodata_lst'],
+ map_fn_lst,
+ "hazard",
+ da_name,
+ idx,
+ "nodata"
+ )
+ da.raster.set_nodata(nodata=da_nodata)
+ elif nodata is None and da.raster.nodata is None:
+ raise ValueError("The hazard map has no nodata value assigned.")
+
+
+ # Correct (if necessary) the grid orientation from the lower to the upper left corner.
+ # This check cannot be applied to the sfincs_map output; it must be converted to GeoTIFF first.
+ if da_name != "sfincs_map":
+ if da.raster.res[1] > 0:
+ da = da.reindex(
+ {da.raster.y_dim: list(reversed(da.raster.ycoords))}
+ )
+ # Check if the obtained hazard map is identical.
+ if model.staticmaps and not model.staticmaps.raster.identical_grid(da):
+ raise ValueError("The hazard maps should have identical grids.")
+
+ # Get the return period input parameter.
+ if 'rp_lst' in dict_lists:
+ da_rp = get_param(
+ dict_lists['rp_lst'],
+ map_fn_lst,
+ "hazard",
+ da_name,
+ idx,
+ "return period",
+ )
+ else:
+ da_rp = None
-
- # Correct (if necessary) the grid orientation from the lower to the upper left corner.
- # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first
- if da_name != "sfincs_map":
- if da.raster.res[1] > 0:
- da = da.reindex(
- {da.raster.y_dim: list(reversed(da.raster.ycoords))}
- )
+ if risk_output:
+ da = da.expand_dims({'rp': [da_rp]}, axis=0)
+
+ if risk_output and da_rp is None:
+ # Get (if possible) the return period from dataset names if the input parameter is None.
+ if "rp" in da_name.lower():
+
+ def fstrip(x):
+ return x in "0123456789."
+
+ rp_str = "".join(
+ filter(fstrip, da_name.lower().split("rp")[-1])
+ ).lstrip("0")
-
- # Check if the obtained hazard map is identical.
- if model.staticmaps and not model.staticmaps.raster.identical_grid(da):
- raise ValueError("The hazard maps should have identical grids.")
-
- # Get the return period input parameter.
- if 'rp_lst' in dict_lists: - da_rp = get_param( - dict_lists['rp_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "return period", + try: + assert isinstance( + literal_eval(rp_str) if rp_str else None, (int, float) ) - else: - da_rp =None - - if risk_output: - da = da.expand_dims({'rp': [da_rp]}, axis=0) - - if risk_output and da_rp is None: - # Get (if possible) the return period from dataset names if the input parameter is None. - if "rp" in da_name.lower(): - - def fstrip(x): - return x in "0123456789." - - rp_str = "".join( - filter(fstrip, da_name.lower().split("rp")[-1]) - ).lstrip("0") - - try: - assert isinstance( - literal_eval(rp_str) if rp_str else None, (int, float) - ) - da_rp = literal_eval(rp_str) - list_rp.append(da_rp) - - except AssertionError: - raise ValueError( - f"Could not derive the return period for hazard map: {da_name}." - ) - else: + da_rp = literal_eval(rp_str) + list_rp.append(da_rp) + + except AssertionError: raise ValueError( - "The hazard map must contain a return period in order to conduct a risk calculation." + f"Could not derive the return period for hazard map: {da_name}." ) + else: + raise ValueError( + "The hazard map must contain a return period in order to conduct a risk calculation." + ) - # Add the hazard map to config and staticmaps. - check_uniqueness( - model, - "hazard", - da_type, - da_name, - { - "usage": True, - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], - "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], - }, - file_type="hazard", - filename=da_name, - ) - - model.set_config( - "hazard", - da_type, - da_name, - { - "usage": "True", - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], - "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], - }, - ) - - model.set_maps(da, da_name) - post = f"(rp {da_rp})" if risk_output else "" - model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") + # Add the hazard map to config and staticmaps. 
+ check_uniqueness(
+ model,
+ "hazard",
+ da_type,
+ da_name,
+ {
+ "usage": True,
+ "map_fn": da_map_fn,
+ "map_type": da_type,
+ "rp": da_rp,
+ "crs": da.raster.crs,
+ "nodata": da.raster.nodata,
+ # "var": None if "var_lst" not in locals() else self.var_lst[idx],
+ "var": None if 'var_lst' not in dict_lists else dict_lists['var_lst'][idx],
+ "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx],
+ },
+ file_type="hazard",
+ filename=da_name,
+ )
+
+ model.set_config(
+ "hazard",
+ da_type,
+ da_name,
+ {
+ "usage": "True",
+ "map_fn": da_map_fn,
+ "map_type": da_type,
+ "rp": da_rp,
+ "crs": da.raster.crs,
+ "nodata": da.raster.nodata,
+ # "var": None if "var_lst" not in locals() else self.var_lst[idx],
+ "var": None if 'var_lst' not in dict_lists else dict_lists['var_lst'][idx],
+ "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx],
+ },
+ )
+
+ model.set_maps(da, da_name)
+ post = f"(rp {da_rp})" if risk_output else ""
+ model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}")
+
+ if risk_output:
+ maps = model.maps
+ list_keys = list(maps.keys())
+ maps_0 = maps[list_keys[0]].rename('risk')
+ list_keys.pop(0)
+
+ for key_name in list_keys:
+ layer = maps[key_name]
+ maps_0 = xr.concat([maps_0, layer], dim='rp')
+
+ new_da = maps_0.to_dataset(name='RISK')
+ new_da.attrs = { "returnperiod": list(list_rp),
+ "type": map_type_lst,
+ 'name': list_names,
+ "Analysis": "Risk"}
+
+ model.hazard = new_da
+ model.set_maps(model.hazard, 'HydroMT_Fiat_hazard')
+
+ list_maps = list(model.maps.keys())
 
 if risk_output:
- maps = model.maps
- list_keys = list(maps.keys())
- maps_0 = maps[list_keys[0]].rename('risk')
- list_keys.pop(0)
-
- for idx, x in enumerate(list_keys):
- key_name = list_keys[idx]
- layer = maps[key_name]
- maps_0 = xr.concat([maps_0, layer], dim='rp')
-
- new_da = maps_0.to_dataset(name='RISK')
- new_da.attrs = { "returnperiod": list(list_rp),
- "type":self.map_type_lst,
- 'name':list_names,
- "Analysis": "Risk"}
-
- model.hazard = new_da
- model.set_maps(model.hazard, 'HydroMT_Fiat_hazard')
-
- list_maps = list(model.maps.keys())
-
- if risk_output:
- for item in list_maps[:-1]:
- model.maps.pop(item)
+ for item in list_maps[:-1]:
+ model.maps.pop(item)
diff --git a/hydromt_fiat/workflows/hazard_old.py b/hydromt_fiat/workflows/hazard_old.py
new file mode 100644
index 00000000..16645b9b
--- /dev/null
+++ b/hydromt_fiat/workflows/hazard_old.py
@@ -0,0 +1,342 @@
+from hydromt_fiat.validation import *
+from pathlib import Path
+import geopandas as gpd
+from ast import literal_eval
+import os
+import xarray as xr
+
+
+class Hazard:
+ def __init__(self):
+ self.crs = ""
+ self.map_fn_lst = []
+ self.map_type_lst = []
+
+ def get_lists(
+ self,
+ map_fn,
+ map_type,
+ chunks,
+ rp,
+ crs,
+ nodata,
+ var,
+
+ ):
+ dict_lists = dict()
+
+ def validate_param(dictionary, param, name, types):
+ param_lst = [param] if isinstance(param, types) else param
+ check_param_type(param_lst, name=name, types=types)
+ dictionary[name+'_lst'] = param_lst
+ return
+
+ validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path))
+ validate_param(dict_lists, map_type, name="map_type", types=str)
+ if chunks != "auto":
+ validate_param(dict_lists, chunks, name="chunks", types=(int, dict))
+ if rp is not None:
+ validate_param(dict_lists, rp, name="rp", types=(float, int))
+ if crs is not None:
+ validate_param(dict_lists, crs, name="crs", types=(int, str))
+ if nodata is not None:
+
validate_param(dict_lists, nodata, name="nodata", types=(float, int)) + if var is not None: + validate_param(dict_lists, var, name="var", types=str) + + return dict_lists + + def check_parameters( + self, + dict_lists, + model, + chunks, + rp, + crs, + nodata, + var, + ): + + def error_message(variable_list): + raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") + # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") + + # Checks the hazard input parameter types. + + # Checks map path list + map_fn_lst = dict_lists['map_fn_lst'] + + # Checks map path list + if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst): + error_message("map_type") + + # Checks the chunk list. The list must be equal to the number of maps. + if chunks != "auto": + if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst): + error_message("chunks") + + # Checks the return period list. The list must be equal to the number of maps. + if rp is not None: + if not len(dict_lists['rp_lst']) == len(map_fn_lst): + error_message("rp") + + # Checks the projection list + if crs is not None: + if not len(dict_lists['crs_lst']) == 1 and not len(dict_lists['crs_lst']) == len(map_fn_lst): + error_message("crs") + + # Checks the no data list + if nodata is not None: + if not len(dict_lists['nodata_lst']) == 1 and not len(dict_lists['nodata_lst']) == len(map_fn_lst): + error_message("nodata") + + # Checks the var list + if var is not None: + if not len(dict_lists['var_lst']) == 1 and not len(dict_lists['var_lst']) == len(map_fn_lst): + error_message('var') + + # Check if the hazard input files exist. + check_file_exist(model, param_lst=map_fn_lst, name="map_fn") + + def process_maps( + self, + dict_lists, + model, + name_catalog, + hazard_type, + risk_output, + crs, + nodata, + var, + chunks, + region=gpd.GeoDataFrame(), + **kwargs, + ): + map_fn_lst = dict_lists['map_fn_lst'] + map_type_lst = dict_lists['map_type_lst'] + + list_names = [] + list_rp = [] + + for idx, da_map_fn in enumerate(map_fn_lst): + + # Check if it is a path or a name from the catalog + if os.path.exists(da_map_fn): + da_map_fn = Path(da_map_fn) + da_name = da_map_fn.stem + da_suffix = da_map_fn.suffix + list_names.append(da_name) + else: + da_name = da_map_fn + list_names.append(da_name) + + da_type = get_param( + map_type_lst, + map_fn_lst, + "hazard", + da_name, + idx, + "map type" + ) + + # Get the local hazard map. + kwargs.update(chunks=chunks if chunks == "auto" else dict_lists['chunks_lst'][idx]) + + if "da_suffix" in locals() and da_suffix == ".nc": + if var is None: + raise ValueError( + "The 'var' parameter is required when reading NetCDF data." 
+ ) + da_var = get_param( + dict_lists['var_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "NetCDF variable", + ) + kwargs.update(variables=da_var) + + # reading from path + if da_map_fn.stem: + if da_map_fn.stem == "sfincs_map": + ds_map = xr.open_dataset(da_map_fn) + da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") + da.raster.set_crs(ds_map.crs.epsg_code) + da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) + da.reset_coords(['spatial_ref'], drop=False) + da.encoding["_FillValue"] = None + + else: + if not region.empty: + da = model.data_catalog.get_rasterdataset( + da_map_fn, geom=region, **kwargs + ) + else: + da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs) + # reading from the datacatalog + else: + if not region.empty: + da = model.data_catalog.get_rasterdataset( + name_catalog, variables=da_name, geom=region + ) + else: + da = model.data_catalog.get_rasterdataset( + name_catalog, variables=da_name + ) + + # Set the coordinate reference system. + if crs is not None: + da_crs = get_param( + dict_lists['crs_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "coordinate reference system", + ) + da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" + da.raster.set_crs(da_crs_str) + elif crs is None and not da.raster.crs: + raise ValueError( + "The hazard map has no coordinate reference system assigned." + ) + + # Set nodata and mask the nodata value. + if nodata is not None: + da_nodata = get_param( + dict_lists['nodata_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "nodata" + ) + da.raster.set_nodata(nodata=da_nodata) + elif nodata is None and da.raster.nodata is None: + raise ValueError("The hazard map has no nodata value assigned.") + + + # Correct (if necessary) the grid orientation from the lower to the upper left corner. + # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first + if da_name != "sfincs_map": + if da.raster.res[1] > 0: + da = da.reindex( + {da.raster.y_dim: list(reversed(da.raster.ycoords))} + ) + + # Check if the obtained hazard map is identical. + if model.staticmaps and not model.staticmaps.raster.identical_grid(da): + raise ValueError("The hazard maps should have identical grids.") + + # Get the return period input parameter. + if 'rp_lst' in dict_lists: + da_rp = get_param( + dict_lists['rp_lst'], + map_fn_lst, + "hazard", + da_name, + idx, + "return period", + ) + else: + da_rp =None + + if risk_output: + da = da.expand_dims({'rp': [da_rp]}, axis=0) + + if risk_output and da_rp is None: + # Get (if possible) the return period from dataset names if the input parameter is None. + if "rp" in da_name.lower(): + + def fstrip(x): + return x in "0123456789." + + rp_str = "".join( + filter(fstrip, da_name.lower().split("rp")[-1]) + ).lstrip("0") + + try: + assert isinstance( + literal_eval(rp_str) if rp_str else None, (int, float) + ) + da_rp = literal_eval(rp_str) + list_rp.append(da_rp) + + except AssertionError: + raise ValueError( + f"Could not derive the return period for hazard map: {da_name}." + ) + else: + raise ValueError( + "The hazard map must contain a return period in order to conduct a risk calculation." + ) + + # Add the hazard map to config and staticmaps. 
+ check_uniqueness( + model, + "hazard", + da_type, + da_name, + { + "usage": True, + "map_fn": da_map_fn, + "map_type": da_type, + "rp": da_rp, + "crs": da.raster.crs, + "nodata": da.raster.nodata, + # "var": None if "var_lst" not in locals() else self.var_lst[idx], + "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], + "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], + }, + file_type="hazard", + filename=da_name, + ) + + model.set_config( + "hazard", + da_type, + da_name, + { + "usage": "True", + "map_fn": da_map_fn, + "map_type": da_type, + "rp": da_rp, + "crs": da.raster.crs, + "nodata": da.raster.nodata, + # "var": None if "var_lst" not in locals() else self.var_lst[idx], + "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], + "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], + }, + ) + + model.set_maps(da, da_name) + post = f"(rp {da_rp})" if risk_output else "" + model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") + + if risk_output: + maps = model.maps + list_keys = list(maps.keys()) + maps_0 = maps[list_keys[0]].rename('risk') + list_keys.pop(0) + + for idx, x in enumerate(list_keys): + key_name = list_keys[idx] + layer = maps[key_name] + maps_0 = xr.concat([maps_0, layer], dim='rp') + + new_da = maps_0.to_dataset(name='RISK') + new_da.attrs = { "returnperiod": list(list_rp), + "type":self.map_type_lst, + 'name':list_names, + "Analysis": "Risk"} + + model.hazard = new_da + model.set_maps(model.hazard, 'HydroMT_Fiat_hazard') + + list_maps = list(model.maps.keys()) + + if risk_output: + for item in list_maps[:-1]: + model.maps.pop(item) + diff --git a/tests/test_hazard.py b/tests/test_hazard.py index 559f7b01..577e8ac6 100644 --- a/tests/test_hazard.py +++ b/tests/test_hazard.py @@ -1,5 +1,5 @@ from hydromt_fiat.fiat import FiatModel -from hydromt_fiat.hazard_functions import * +from hydromt_fiat.workflows.hazard import * from hydromt.config import configread from pathlib import Path import pytest From 9c254601bec8fbc98261b09a458e67b2c0928843 Mon Sep 17 00:00:00 2001 From: Mares2022 Date: Tue, 6 Jun 2023 13:27:02 +0200 Subject: [PATCH 3/3] Keep hazard class it is needed in fiat.py Keep hazard class it is needed in fiat.py --- hydromt_fiat/workflows/hazard_old.py | 328 --------------------------- 1 file changed, 328 deletions(-) diff --git a/hydromt_fiat/workflows/hazard_old.py b/hydromt_fiat/workflows/hazard_old.py index 16645b9b..7714e94f 100644 --- a/hydromt_fiat/workflows/hazard_old.py +++ b/hydromt_fiat/workflows/hazard_old.py @@ -12,331 +12,3 @@ def __init__(self): self.map_fn_lst = [] self.map_type_lst = [] - def get_lists( - self, - map_fn, - map_type, - chunks, - rp, - crs, - nodata, - var, - - ): - dict_lists = dict() - - def validate_param(dictionary, param, name, types): - param_lst = [param] if isinstance(param, types) else param - check_param_type(param_lst, name=name, types=types) - dictionary[name+'_lst'] = param_lst - return - - validate_param(dict_lists, map_fn, name="map_fn", types=(str, Path)) - validate_param(dict_lists, map_type, name="map_type", types=str) - if chunks != "auto": - validate_param(dict_lists, chunks, name="chunks", types=(int, dict)) - if rp is not None: - validate_param(dict_lists, rp, name="rp", types=(float, int)) - if crs is not None: - validate_param(dict_lists, crs, name="crs", types=(int, str)) - if nodata is not None: - validate_param(dict_lists, nodata, name="nodata", types=(float, int)) - if var is not None: - 
validate_param(dict_lists, var, name="var", types=str) - - return dict_lists - - def check_parameters( - self, - dict_lists, - model, - chunks, - rp, - crs, - nodata, - var, - ): - - def error_message(variable_list): - raise IndexError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") - # raise TypeError(f"The number of '{variable_list}' parameters should match with the number of 'map_fn' parameters.") - - # Checks the hazard input parameter types. - - # Checks map path list - map_fn_lst = dict_lists['map_fn_lst'] - - # Checks map path list - if not len(dict_lists['map_type_lst']) == 1 and not len(dict_lists['map_type_lst']) == len(map_fn_lst): - error_message("map_type") - - # Checks the chunk list. The list must be equal to the number of maps. - if chunks != "auto": - if not len(dict_lists['chunks_lst']) == 1 and not len(dict_lists['chunks_lst']) == len(map_fn_lst): - error_message("chunks") - - # Checks the return period list. The list must be equal to the number of maps. - if rp is not None: - if not len(dict_lists['rp_lst']) == len(map_fn_lst): - error_message("rp") - - # Checks the projection list - if crs is not None: - if not len(dict_lists['crs_lst']) == 1 and not len(dict_lists['crs_lst']) == len(map_fn_lst): - error_message("crs") - - # Checks the no data list - if nodata is not None: - if not len(dict_lists['nodata_lst']) == 1 and not len(dict_lists['nodata_lst']) == len(map_fn_lst): - error_message("nodata") - - # Checks the var list - if var is not None: - if not len(dict_lists['var_lst']) == 1 and not len(dict_lists['var_lst']) == len(map_fn_lst): - error_message('var') - - # Check if the hazard input files exist. - check_file_exist(model, param_lst=map_fn_lst, name="map_fn") - - def process_maps( - self, - dict_lists, - model, - name_catalog, - hazard_type, - risk_output, - crs, - nodata, - var, - chunks, - region=gpd.GeoDataFrame(), - **kwargs, - ): - map_fn_lst = dict_lists['map_fn_lst'] - map_type_lst = dict_lists['map_type_lst'] - - list_names = [] - list_rp = [] - - for idx, da_map_fn in enumerate(map_fn_lst): - - # Check if it is a path or a name from the catalog - if os.path.exists(da_map_fn): - da_map_fn = Path(da_map_fn) - da_name = da_map_fn.stem - da_suffix = da_map_fn.suffix - list_names.append(da_name) - else: - da_name = da_map_fn - list_names.append(da_name) - - da_type = get_param( - map_type_lst, - map_fn_lst, - "hazard", - da_name, - idx, - "map type" - ) - - # Get the local hazard map. - kwargs.update(chunks=chunks if chunks == "auto" else dict_lists['chunks_lst'][idx]) - - if "da_suffix" in locals() and da_suffix == ".nc": - if var is None: - raise ValueError( - "The 'var' parameter is required when reading NetCDF data." 
- ) - da_var = get_param( - dict_lists['var_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "NetCDF variable", - ) - kwargs.update(variables=da_var) - - # reading from path - if da_map_fn.stem: - if da_map_fn.stem == "sfincs_map": - ds_map = xr.open_dataset(da_map_fn) - da = ds_map[kwargs["variables"]].squeeze(dim="timemax").drop_vars("timemax") - da.raster.set_crs(ds_map.crs.epsg_code) - da.raster.set_nodata(nodata=ds_map.encoding.get("_FillValue")) - da.reset_coords(['spatial_ref'], drop=False) - da.encoding["_FillValue"] = None - - else: - if not region.empty: - da = model.data_catalog.get_rasterdataset( - da_map_fn, geom=region, **kwargs - ) - else: - da = model.data_catalog.get_rasterdataset(da_map_fn, **kwargs) - # reading from the datacatalog - else: - if not region.empty: - da = model.data_catalog.get_rasterdataset( - name_catalog, variables=da_name, geom=region - ) - else: - da = model.data_catalog.get_rasterdataset( - name_catalog, variables=da_name - ) - - # Set the coordinate reference system. - if crs is not None: - da_crs = get_param( - dict_lists['crs_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "coordinate reference system", - ) - da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" - da.raster.set_crs(da_crs_str) - elif crs is None and not da.raster.crs: - raise ValueError( - "The hazard map has no coordinate reference system assigned." - ) - - # Set nodata and mask the nodata value. - if nodata is not None: - da_nodata = get_param( - dict_lists['nodata_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "nodata" - ) - da.raster.set_nodata(nodata=da_nodata) - elif nodata is None and da.raster.nodata is None: - raise ValueError("The hazard map has no nodata value assigned.") - - - # Correct (if necessary) the grid orientation from the lower to the upper left corner. - # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first - if da_name != "sfincs_map": - if da.raster.res[1] > 0: - da = da.reindex( - {da.raster.y_dim: list(reversed(da.raster.ycoords))} - ) - - # Check if the obtained hazard map is identical. - if model.staticmaps and not model.staticmaps.raster.identical_grid(da): - raise ValueError("The hazard maps should have identical grids.") - - # Get the return period input parameter. - if 'rp_lst' in dict_lists: - da_rp = get_param( - dict_lists['rp_lst'], - map_fn_lst, - "hazard", - da_name, - idx, - "return period", - ) - else: - da_rp =None - - if risk_output: - da = da.expand_dims({'rp': [da_rp]}, axis=0) - - if risk_output and da_rp is None: - # Get (if possible) the return period from dataset names if the input parameter is None. - if "rp" in da_name.lower(): - - def fstrip(x): - return x in "0123456789." - - rp_str = "".join( - filter(fstrip, da_name.lower().split("rp")[-1]) - ).lstrip("0") - - try: - assert isinstance( - literal_eval(rp_str) if rp_str else None, (int, float) - ) - da_rp = literal_eval(rp_str) - list_rp.append(da_rp) - - except AssertionError: - raise ValueError( - f"Could not derive the return period for hazard map: {da_name}." - ) - else: - raise ValueError( - "The hazard map must contain a return period in order to conduct a risk calculation." - ) - - # Add the hazard map to config and staticmaps. 
- check_uniqueness( - model, - "hazard", - da_type, - da_name, - { - "usage": True, - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], - "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], - }, - file_type="hazard", - filename=da_name, - ) - - model.set_config( - "hazard", - da_type, - da_name, - { - "usage": "True", - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if not 'var_lst' in dict_lists else dict_lists['var_lst'][idx], - "chunks": "auto" if chunks == "auto" else dict_lists['chunks_lst'][idx], - }, - ) - - model.set_maps(da, da_name) - post = f"(rp {da_rp})" if risk_output else "" - model.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}") - - if risk_output: - maps = model.maps - list_keys = list(maps.keys()) - maps_0 = maps[list_keys[0]].rename('risk') - list_keys.pop(0) - - for idx, x in enumerate(list_keys): - key_name = list_keys[idx] - layer = maps[key_name] - maps_0 = xr.concat([maps_0, layer], dim='rp') - - new_da = maps_0.to_dataset(name='RISK') - new_da.attrs = { "returnperiod": list(list_rp), - "type":self.map_type_lst, - 'name':list_names, - "Analysis": "Risk"} - - model.hazard = new_da - model.set_maps(model.hazard, 'HydroMT_Fiat_hazard') - - list_maps = list(model.maps.keys()) - - if risk_output: - for item in list_maps[:-1]: - model.maps.pop(item) -
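
---

Note on usage: after this series, the hazard workflow in hydromt_fiat.workflows.hazard is driven by the module-level functions get_lists, check_parameters, and process_maps rather than by a Hazard instance (the class itself is kept because it is still needed in fiat.py). Below is a minimal sketch of the intended call order, assuming an already initialized FiatModel `model`; the file name and parameter values are illustrative only, not part of this patch:

    from hydromt_fiat.workflows.hazard import get_lists, check_parameters, process_maps

    # Hypothetical single hazard map; any raster readable by the data catalog would do.
    dict_lists = get_lists(
        map_fn="flood_map_rp100.tif",  # illustrative path; must exist on disk for the path branch
        map_type="water_depth",
        chunks="auto",
        rp=100,                        # return period; list length must match the number of maps
        crs="EPSG:4326",
        nodata=-9999.0,
        var=None,                      # only required when the input is NetCDF
    )

    # Validates list lengths against 'map_fn' and checks that the input files exist.
    check_parameters(dict_lists, model, chunks="auto", rp=100, crs="EPSG:4326", nodata=-9999.0, var=None)

    # Reads the map, sets crs/nodata, fixes grid orientation, and registers it on the model.
    process_maps(
        dict_lists,
        model,
        name_catalog=None,             # only used when map_fn names a catalog entry instead of a path
        hazard_type="flooding",
        risk_output=False,             # True concatenates the maps along 'rp' into a RISK dataset
        crs="EPSG:4326",
        nodata=-9999.0,
        var=None,
        chunks="auto",
    )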