diff --git a/echopop/analysis.py b/echopop/analysis.py index c96aec4e..28951c4d 100644 --- a/echopop/analysis.py +++ b/echopop/analysis.py @@ -39,7 +39,13 @@ optimize_variogram, ) from .statistics import stratified_transect_statistic -from .utils.validate_dict import VariogramEmpirical +from .utils.validate_dict import ( + KrigingAnalysis, + KrigingParameters, + MeshCrop, + VariogramBase, + VariogramEmpirical, +) def process_transect_data( @@ -355,6 +361,26 @@ def krige(input_dict: dict, analysis_dict: dict, settings_dict: dict) -> tuple[p arguments and user-defined inputs. """ + # Validate cropping method parameters + validated_cropping_methods = MeshCrop.create(**settings_dict["cropping_parameters"]) + # ---- Update the dictionary + settings_dict["cropping_parameters"].update({**validated_cropping_methods}) + + # Validate the variogram parameters + valid_variogram_parameters = VariogramBase.create(**settings_dict["variogram_parameters"]) + # ---- Update the dictionary + settings_dict["variogram_parameters"].update({**valid_variogram_parameters}) + + # Validate kriging parameters + valid_kriging_parameters = KrigingParameters.create( + **{**settings_dict["kriging_parameters"], **settings_dict["variogram_parameters"]} + ) + # ---- Update the dictionary + settings_dict["kriging_parameters"].update({**valid_kriging_parameters}) + + # Validate the additional kriging arguments + _ = KrigingAnalysis(**settings_dict["variogram_parameters"]) + # Extract kriging mesh data mesh_data = input_dict["statistics"]["kriging"]["mesh_df"] @@ -364,18 +390,22 @@ def krige(input_dict: dict, analysis_dict: dict, settings_dict: dict) -> tuple[p # Define the and prepare the processed and georeferenced transect data transect_data = edit_transect_columns(analysis_dict["transect"], settings_dict) - # Crop the mesh grid if the kriged data will not be extrapolated - if not settings_dict["extrapolate"]: - # ---- Compute the cropped mesh - mesh_full = crop_mesh(transect_data, mesh_data, settings_dict) - if (settings_dict["verbose"]) & (settings_dict["crop_method"] == "convex_hull"): - # ---- Print alert - print( - "Kriging mesh cropped to prevent extrapolation beyond the defined " - f"""`mesh_buffer_distance` value ({settings_dict['mesh_buffer_distance']} nmi).""" - ) + # Add kriging parameters to the settings config + settings_dict.update( + { + "kriging_parameters": { + **input_dict["statistics"]["kriging"]["model_config"], + **valid_kriging_parameters, + }, + "variogram_parameters": { + **settings_dict["variogram_parameters"], + **valid_variogram_parameters, + }, + }, + ) - else: + # Crop the mesh grid if the kriged data will not be extrapolated + if settings_dict["extrapolate"]: # ---- Else, extract original mesh dataframe mesh_df = mesh_data.copy() # ---- Extract longitude column name @@ -386,6 +416,17 @@ def krige(input_dict: dict, analysis_dict: dict, settings_dict: dict) -> tuple[p mesh_full = mesh_df.copy().rename( columns={f"{mesh_longitude}": "longitude", f"{mesh_latitude}": "latitude"} ) + else: + # ---- Compute the cropped mesh + mesh_full = crop_mesh(transect_data, mesh_data, validated_cropping_methods) + if (settings_dict["verbose"]) and ( + validated_cropping_methods["crop_method"] == "convex_hull" + ): + # ---- Print alert + print( + f"Kriging mesh cropped to prevent extrapolation beyond the defined " + f"`mesh_buffer_distance` value ({settings_dict['mesh_buffer_distance']} nmi)." + ) # Standardize the x- and y-coordinates, if necessary if settings_dict["standardize_coordinates"]: @@ -396,8 +437,8 @@ def krige(input_dict: dict, analysis_dict: dict, settings_dict: dict) -> tuple[p if settings_dict["verbose"]: # ---- Print alert print( - """Longitude and latitude coordinates (WGS84) converted to standardized """ - """coordinates (x and y).""" + "Longitude and latitude coordinates (WGS84) converted to standardized " + "coordinates (x and y)." ) else: # ---- Else, duplicate the transect longitude and latitude coordinates as 'x' and 'y' diff --git a/echopop/spatial/mesh.py b/echopop/spatial/mesh.py index 152cb6dc..fdb3d434 100644 --- a/echopop/spatial/mesh.py +++ b/echopop/spatial/mesh.py @@ -10,7 +10,7 @@ from ..spatial.transect import transect_bearing, transect_extent -def crop_mesh(transect_data: pd.DataFrame, mesh_data: pd.DataFrame, settings_dict: dict): +def crop_mesh(transect_data: pd.DataFrame, mesh_data: pd.DataFrame, cropping_parameters: dict): """ Crop survey kriging mesh. @@ -20,7 +20,7 @@ def crop_mesh(transect_data: pd.DataFrame, mesh_data: pd.DataFrame, settings_dic Georeferenced transect data. mesh_data: pd.DataFrame Kriging mesh. - settings_dict: dict + cropping_parameters: dict Dictionary containing relevant algorithm variables and arguments. """ # Rename the mesh coordinate names, if necessary @@ -35,11 +35,11 @@ def crop_mesh(transect_data: pd.DataFrame, mesh_data: pd.DataFrame, settings_dic # Select and return cropped mesh depending on the cropping method # ---- Interpolation - if settings_dict["crop_method"] == "transect_ends": - return transect_ends_crop_method(transect_data.copy(), mesh, settings_dict) + if cropping_parameters["crop_method"] == "transect_ends": + return transect_ends_crop_method(transect_data.copy(), mesh, cropping_parameters) # ---- Convex hull - elif settings_dict["crop_method"] == "convex_hull": - return hull_crop_method(transect_data.copy(), mesh, settings_dict) + elif cropping_parameters["crop_method"] == "convex_hull": + return hull_crop_method(transect_data.copy(), mesh, cropping_parameters) def hull_crop_method(transect_data: pd.DataFrame, mesh_data: pd.DataFrame, settings_dict: dict): @@ -89,7 +89,7 @@ def hull_crop_method(transect_data: pd.DataFrame, mesh_data: pd.DataFrame, setti def transect_ends_crop_method( - transect_data: pd.DataFrame, mesh_data: pd.DataFrame, settings_dict: dict + transect_data: pd.DataFrame, mesh_data: pd.DataFrame, cropping_parameters: dict ): """ Crop the kriging mesh by interpolating the eastern and western boundaries of the survey @@ -101,15 +101,15 @@ def transect_ends_crop_method( Georeferenced transect data. mesh_data: pd.DataFrame Kriging mesh. - settings_dict: dict + cropping_parameters: dict Dictionary containing relevant algorithm variables and arguments. """ # Extract the analysis settings # ---- Number of nearest transects - latitude_resolution = settings_dict["latitude_resolution"] + latitude_resolution = cropping_parameters["latitude_resolution"] # ---- Grid buffer distance (nmi) - bearing_tolerance = settings_dict["bearing_tolerance"] + bearing_tolerance = cropping_parameters["bearing_tolerance"] # Convert latitude resolution to degrees latitude latitude_resolution_deg = latitude_resolution / 60.0 @@ -272,7 +272,7 @@ def transect_ends_crop_method( ) # -------- Append the indices region_2_index.append(idx[0]) - # ---- Region 2 + # ---- Region 3 region_3_index = [] # -------- Compute the change in longitude (degrees) delta_longitude = latitude_resolution_deg * np.cos(np.radians(region_3_latitude)) @@ -291,17 +291,24 @@ def transect_ends_crop_method( if np.isnan(region_3_extents[0][i]) | np.isnan(region_3_extents[1][i]): # -------- Compute the indices for the northern- and southernmost coordinates # -------- North - lat_w_max = np.argmax(transect_west["latitude"]) + # lat_w_max = np.argmax(transect_west["latitude"]) + lat_w_max = np.argmin(transect_west["latitude"]) # -------- South - lat_e_max = np.argmax(transect_east["latitude"]) + # lat_e_max = np.argmax(transect_east["latitude"]) + lat_e_max = np.argmin(transect_east["latitude"]) # -------- Slope - slope = ( + # slope = ( + # transect_west["longitude"].iloc[lat_w_max] + # - transect_east["longitude"].iloc[lat_e_max] + # ) / (transect_west["latitude"].min() - transect_east["latitude"].min()) + slope = (transect_west["latitude"].min() - transect_east["latitude"].min()) / ( transect_west["longitude"].iloc[lat_w_max] - transect_east["longitude"].iloc[lat_e_max] - ) / (transect_west["latitude"].max() - transect_east["latitude"].max()) + ) # -------- Set a new border threshold longitude_slope_i = ( - slope * (region_3_latitude[i] - transect_east["latitude"].max()) + # slope * (region_3_latitude[i] - transect_east["latitude"].max()) + slope * (region_3_latitude[i] - transect_east["latitude"].min()) + transect_east["longitude"].iloc[lat_e_max] ) if np.isnan(region_3_extents[0][i]): diff --git a/echopop/survey.py b/echopop/survey.py index 85752068..95a3a137 100644 --- a/echopop/survey.py +++ b/echopop/survey.py @@ -18,7 +18,13 @@ from .spatial.projection import transform_geometry from .spatial.transect import edit_transect_columns from .utils import load as el, load_nasc as eln, message as em -from .utils.validate_dict import VariogramBase, VariogramInitial, VariogramOptimize +from .utils.validate_dict import ( + KrigingParameters, + MeshCrop, + VariogramBase, + VariogramInitial, + VariogramOptimize, +) class Survey: @@ -398,7 +404,7 @@ def fit_variogram( "hole_effect_range", "decay_power", ], - variable: Literal["biomass", "abundance"] = "biomass", + variable: Literal["biomass"] = "biomass", verbose: bool = True, ): """ @@ -461,7 +467,7 @@ def fit_variogram( """ # Validate "variable" input - if variable not in ["biomass", "abundance"]: + if variable not in ["biomass"]: raise ValueError( f"The user input for `variable` ({variable}) is invalid. Only `variable='biomass'` " f"and `variable='abundance'` are valid inputs for the `fit_variogram()` method." @@ -537,20 +543,13 @@ def fit_variogram( # !!! TODO: develop different name for "crop_method = 'interpolation'" def kriging_analysis( self, - bearing_tolerance: float = 15.0, + cropping_parameters: MeshCrop = {}, + kriging_parameters: KrigingParameters = {}, coordinate_transform: bool = True, - crop_method: Literal["transect_ends", "convex_hull"] = "transect_ends", extrapolate: bool = False, - best_fit_variogram: bool = True, - kriging_parameters: Optional[dict] = None, - latitude_resolution: float = 1.25, - mesh_buffer_distance: float = 1.25, - num_nearest_transects: int = 4, - projection: Optional[str] = None, - stratum: str = "ks", - variable: str = "biomass_density", - variogram_model: Union[str, List[str]] = ["bessel", "exponential"], - variogram_parameters: Optional[dict] = None, + best_fit_variogram: bool = False, + variable: Literal["biomass"] = "biomass", + variogram_parameters: Optional[VariogramBase] = None, verbose: bool = True, ): """ @@ -563,90 +562,80 @@ def kriging_analysis( Biological variable that will be interpolated via kriging """ - # Parameterize analysis settings that will be applied to the kriging analysis + # Populate settings dictionary with input argument values/entries self.analysis["settings"].update( { "kriging": { - "exclude_age1": self.analysis["settings"]["transect"]["exclude_age1"], + "best_fit_variogram": best_fit_variogram, + "cropping_parameters": {**cropping_parameters}, "extrapolate": extrapolate, - "kriging_parameters": ( - self.input["statistics"]["kriging"]["model_config"] - if kriging_parameters is None - else kriging_parameters - ), - "projection": ( - self.config["geospatial"]["init"] if projection is None else projection - ), + "kriging_parameters": {**kriging_parameters}, "standardize_coordinates": coordinate_transform, - "stratum": stratum.lower(), - "stratum_name": "stratum_num" if stratum == "ks" else "inpfc", "variable": variable, - "variogram_parameters": ( - self.input["statistics"]["variogram"]["model_config"] - if variogram_parameters is None - else variogram_parameters - ), "verbose": verbose, - } - } + }, + }, ) - # Update variogram model - self.analysis["settings"]["kriging"]["variogram_parameters"]["model"] = variogram_model + # Inherited settings/configurations (contingent on previously executed methods) + self.analysis["settings"]["kriging"].update( + { + # ---- From `self.config` + "projection": self.config["geospatial"]["init"], + # ---- From `self.transect_analysis` settings + "exclude_age1": self.analysis["settings"]["transect"]["exclude_age1"], + "stratum": self.analysis["settings"]["transect"]["stratum"], + }, + ) - # Update variogram parameters to use fitted if the values are available - if best_fit_variogram: - if "variogram" in self.results: - if "model_fit" in self.results["variogram"]: - # ---- Parameters - self.analysis["settings"]["kriging"].update( - {"variogram_parameters": self.results["variogram"]["model_fit"]} - ) - # ---- Update model - self.analysis["settings"]["kriging"]["variogram_parameters"]["model"] = ( - self.results["variogram"]["model"] + # Calculate additional keys for the settings + self.analysis["settings"]["kriging"].update( + { + "stratum_name": ( + "stratum_num" + if self.analysis["settings"]["kriging"]["stratum"] == "ks" + else "inpfc" + ), + "variogram_parameters": ( + { + **self.input["statistics"]["variogram"]["model_config"], + **variogram_parameters, + } + if ( + variogram_parameters + and "model_config" in self.input["statistics"]["variogram"] ) - else: - raise ValueError( - "Argument `best_fit_variogram` is invalid. No fitted variogram parameters " - "have been estimated via the `fit_variogram()` Survey-class method. " - "Either run the `fit_variogram()` method first, or set the argument " - "`best_fit_variogram=False`." + else ( + self.input["statistics"]["variogram"]["model_config"] + if ( + not variogram_parameters + and "model_config" in self.input["statistics"]["variogram"] + ) + else ( + { + **self.results["variogram"]["model"], + **self.results["variogram"]["model_fit"], + } + if best_fit_variogram is True + else {} + ) ) - - # Prepare temporary message concerning coordinate transformation = False - if not coordinate_transform: - raise ValueError( - """Kriging without coordinate standardization is currently """ - """unavailable due to the kriging parameter `search_radius` being only defined """ - """for transformed x- and y-coordinates.""" + ), + } + ) + # ---- Further append variogram parameters if they were ran + if "variogram" in self.analysis["settings"]: + self.analysis["settings"]["kriging"]["variogram_parameters"].update( + **self.analysis["settings"]["variogram"] ) # Run kriging analysis # ----> Generates a georeferenced dataframe for the entire mesh grid, summary statistics, # ----> and adds intermediate data products to the analysis attribute - # ---- If kriging results are not extrapolated beyond the survey region: - if not extrapolate: - # ---- Update the analysis settings - self.analysis["settings"]["kriging"].update( - { - "bearing_tolerance": bearing_tolerance, - "crop_method": crop_method, - "latitude_resolution": latitude_resolution, - "mesh_buffer_distance": mesh_buffer_distance, - "num_nearest_transect": num_nearest_transects, - } - ) - # ---- Run kriging algorithm - kriged_results, self.analysis = krige( - self.input, self.analysis, self.analysis["settings"]["kriging"] - ) - # ---- If kriging results are extrapolated beyond the survey region: - else: - # ---- Run kriging algorithm - kriged_results, self.analysis = krige( - self.input, self.analysis, self.analysis["settings"]["kriging"] - ) + # ---- Run kriging algorithm + kriged_results, self.analysis = krige( + self.input, self.analysis, self.analysis["settings"]["kriging"] + ) # Save the results to the `results` attribute self.results.update({"kriging": kriged_results}) diff --git a/echopop/tests/test_pydantic_validation.py b/echopop/tests/test_pydantic_validation.py new file mode 100644 index 00000000..5dcd209a --- /dev/null +++ b/echopop/tests/test_pydantic_validation.py @@ -0,0 +1,258 @@ +import re + +import numpy as np +import pytest +from pydantic import ValidationError + +from echopop.utils.validate_dict import KrigingAnalysis, KrigingParameterInputs, MeshCrop + + +@pytest.mark.parametrize( + "input, expected, exception", + [ + (dict(), None, "Both 'correlation_range' and 'search_radius' arguments are missing"), + ( + dict(correlation_range=1.0), + dict(anisotropy=0.0, kmin=3, kmax=10, correlation_range=1.0, search_radius=3.0), + None, + ), + ( + dict(correlation_range=1.0, search_radius=5.0), + dict(anisotropy=0.0, kmin=3, kmax=10, correlation_range=1.0, search_radius=5.0), + None, + ), + ( + dict(anisotropy=1, correlation_range=2, search_radius=3), + dict(anisotropy=1.0, kmin=3, kmax=10, correlation_range=2.0, search_radius=3.0), + None, + ), + ( + dict(kmin=3.0, kmax=10.0, correlation_range=1.0), + None, + ["Value must be a non-negative integer", "Value must be a non-negative integer"], + ), + ( + dict(kmin=10, kmax=3, correlation_range=1.0), + None, + "Defined 'kmax' (3) must be greater than or equal to 'kmin' (10)", + ), + ( + dict(anisotropy=0.00, kmin=1, kmax=2, correlation_range=-1.0, search_radius=-1.0), + None, + [ + "Input should be greater than or equal to 3", + "Input should be greater than or equal to 3", + "Value must be a non-negative float", + "Value must be a non-negative float", + ], + ), + ( + dict( + anisotropy=np.nan, + kmin=np.nan, + kmax=np.nan, + correlation_range=np.nan, + search_radius=np.nan, + ), + None, + [ + "Input should be a finite number", + "Value must be a non-negative integer", + "Value must be a non-negative integer", + "Input should be greater than 0", + "Input should be greater than 0", + ], + ), + ( + dict( + anisotropy=np.inf, + kmin=np.inf, + kmax=np.inf, + correlation_range=np.inf, + search_radius=np.inf, + ), + None, + [ + "Value must be a non-negative real number", + "Value must be a non-negative integer", + "Value must be a non-negative integer", + "Value must be a non-negative real number", + "Value must be a non-negative real number", + ], + ), + ], + ids=[ + "Empty inputs [invalid]", + "Produce valid 'search_radius' based on valid 'correlation_range' input", + "Define both 'search_radius' and 'correlation_range'", + "Coerce integer inputs for 'anisotropy', 'correlation_range', and 'search_radius'", + "Invalid float datatyping for 'kmin' and 'kmax'", + "Enforcing 'kmax' > 'kmin'", + "Invalid values for numerics [lower limits]", + "NaN inputs", + "Inf inputs", + ], +) +def test_KrigingParameters_model(input, expected, exception): + + # ------------------------- + if exception is not None: + if isinstance(exception, list): + for e in exception: + with pytest.raises(ValidationError, match=re.escape(e)): + assert KrigingParameterInputs.create(**input) + else: + with pytest.raises(ValidationError, match=re.escape(exception)): + assert KrigingParameterInputs.create(**input) + else: + result = KrigingParameterInputs.create(**input) + assert result == expected + + +@pytest.mark.parametrize( + "input, expected, exception", + [ + ( + dict(), + dict( + best_fit_variogram=False, + coordinate_transform=True, + extrapolate=False, + variable="biomass", + verbose=True, + ), + None, + ), + ( + dict(best_fit_variogram=3, coordinate_transform=3, extrapolate=3, verbose=3), + None, + [ + "Input should be a valid boolean", + "Input should be a valid boolean", + "Input should be a valid boolean", + "Input should be a valid boolean", + ], + ), + (dict(variable="krakens"), None, "Input should be 'biomass'"), + ], + ids=[ + "Default values [no inputs, empty dictionary]", + "Invalid boolean inputs [integers, not bool/str]", + "Invalid Literal input for 'variable'", + ], +) +def test_KrigingAnalysis_model(input, expected, exception): + + # ------------------------- + if exception is not None: + if isinstance(exception, list): + for e in exception: + with pytest.raises(ValidationError, match=re.escape(e)): + assert KrigingAnalysis.create(**input) + else: + with pytest.raises(ValidationError, match=re.escape(exception)): + assert KrigingAnalysis.create(**input) + else: + result = KrigingAnalysis.create(**input) + assert result == expected + + +@pytest.mark.parametrize( + "input, expected, exception", + [ + ( + dict(), + dict( + crop_method="transect_ends", + num_nearest_transects=4, + mesh_buffer_distance=1.25, + latitude_resolution=1.25, + bearing_tolerance=15.0, + ), + None, + ), + (dict(crop_method="invalid"), None, "Input should be 'transect_ends' or 'convex_hull'"), + ( + dict(mesh_buffer_distance=1.0, latitude_resolution=1.0, bearing_tolerance=15), + dict( + crop_method="transect_ends", + num_nearest_transects=4, + mesh_buffer_distance=1.0, + latitude_resolution=1.0, + bearing_tolerance=15.0, + ), + None, + ), + ( + dict(num_nearest_transects=1.0), + None, + "Value error, Value must be a non-negative integer.", + ), + (dict(bearing_tolerance="a"), None, "Value must be a non-negative real angle"), + ( + dict(mesh_buffer_distance=-1.0, latitude_resolution=-1.0, bearing_tolerance=-1.0), + None, + [ + "Value must be a non-negative float", + "Value must be a non-negative float", + "Value must be a non-negative real angle", + ], + ), + ( + dict( + num_nearest_transects=np.nan, + mesh_buffer_distance=np.nan, + latitude_resolution=np.nan, + bearing_tolerance=np.nan, + ), + None, + [ + "Value must be a non-negative integer.", + "Input should be greater than 0", + "Input should be greater than 0", + "Input should be greater than 0", + ], + ), + ( + dict( + num_nearest_transects=np.inf, + mesh_buffer_distance=np.inf, + latitude_resolution=np.inf, + bearing_tolerance=np.inf, + ), + None, + [ + "Value must be a non-negative integer.", + "Value must be a non-negative real number", + "Value must be a non-negative real number", + "Value must be a non-negative real angle", + ], + ), + (dict(bearing_tolerance=181.0), None, "Input should be less than or equal to 180"), + ], + ids=[ + "Default values [no inputs, empty dictionary]", + "Invalid Literal for 'crop_method'", + "Valid int-to-float coercion", + "Invalid floats where value should be int", + "Invalid 'bearing_tolerance' [str]", + "Invalid values below limits", + "All NaN values for numeric inputs", + "All Inf values for numeric inputs", + "Invalid 'bearing_tolerance' input [upper limit]", + ], +) +def test_MeshCrop_model(input, expected, exception): + + # ------------------------- + if exception is not None: + if isinstance(exception, list): + for e in exception: + with pytest.raises(ValidationError, match=re.escape(e)): + assert MeshCrop.create(**input) + else: + with pytest.raises(ValidationError, match=re.escape(exception)): + assert MeshCrop.create(**input) + else: + result = MeshCrop.create(**input) + assert result == expected diff --git a/echopop/utils/message.py b/echopop/utils/message.py index 90bfedd1..fce6479d 100644 --- a/echopop/utils/message.py +++ b/echopop/utils/message.py @@ -244,7 +244,9 @@ def kriging_results_msg(kriging_results_dict: pd.DataFrame, settings_dict: dict) # Break down strings # ---- Mesh cropping mesh_crop = ( - settings_dict["crop_method"].capitalize() if not settings_dict["extrapolate"] else None + settings_dict["cropping_parameters"]["crop_method"].capitalize() + if not settings_dict["extrapolate"] + else None ) # ---- Replace '_' mesh_crop = mesh_crop.replace("_", " ") diff --git a/echopop/utils/validate.py b/echopop/utils/validate.py index 5bf38ec7..d4361601 100644 --- a/echopop/utils/validate.py +++ b/echopop/utils/validate.py @@ -11,6 +11,7 @@ class posint(int): """Positive-only integer (includes 0)""" __failstate__ = "must be a non-negative integer" + __origin__ = "posint" def __new__(cls, value): if not isinstance(value, int) or value < 0: @@ -22,6 +23,7 @@ class posfloat(float): """Positive-only float (includes 0.0)""" __failstate__ = "must be a non-negative float" + __origin__ = "posfloat" def __new__(cls, value): if not isinstance(value, (float, int)) or value < 0: @@ -33,6 +35,7 @@ class realposfloat(posfloat): """Real number positive-only float (includes 0.0)""" __failstate__ = "must be a non-negative real number" + __origin__ = "realposfloat" def __new__(cls, value): if not isinstance(value, (float, int)) or np.isinf(value): # Check if value is infinity @@ -44,6 +47,7 @@ class realcircle(realposfloat): """Real number in a unit circle""" __failstate__ = "must be a non-negative real angle (as a 'float') between 0.0 and 360.0 degrees" + __origin__ = "realcircle" def __new__(cls, value): if not isinstance(value, (float, int)) or (value < 0.0 or value > 360.0): diff --git a/echopop/utils/validate_df.py b/echopop/utils/validate_df.py index 26322301..df56a3d7 100644 --- a/echopop/utils/validate_df.py +++ b/echopop/utils/validate_df.py @@ -445,7 +445,6 @@ class VarioKrigingPara(BaseDataFrame): corr: Series[float] = Field(ge=0.0, nullable=False, alias="vario.corr") hole: Series[float] = Field(ge=0.0, nullable=False, alias="vario.hole") lscl: Series[float] = Field(ge=0.0, nullable=False, alias="vario.lscl") - model: Series[int] = Field(nullable=False, alias="vario.model") nugt: Series[float] = Field(ge=0.0, nullable=False, alias="vario.nugt") powr: Series[float] = Field(ge=0.0, nullable=False, alias="vario.powr") range: Series[float] = Field(ge=0.0, nullable=False, alias="vario.range") diff --git a/echopop/utils/validate_dict.py b/echopop/utils/validate_dict.py index 4c19e3d9..732d76e7 100644 --- a/echopop/utils/validate_dict.py +++ b/echopop/utils/validate_dict.py @@ -1016,3 +1016,157 @@ def validate(params: Dict[str, InitialValues]) -> None: # -------- # print("Validate passed.") # -------- + + +class MeshCrop( + BaseModel, + arbitrary_types_allowed=True, + title="kriging mesh cropping parameters ('cropping_parameters')", +): + crop_method: Literal["transect_ends", "convex_hull"] = Field(default="transect_ends") + num_nearest_transects: posint = Field(gt=0, default=4) + mesh_buffer_distance: realposfloat = Field(gt=0.0, default=1.25, allow_inf_nan=False) + latitude_resolution: realposfloat = Field(gt=0.0, default=1.25, allow_inf_nan=False) + bearing_tolerance: realcircle = Field(gt=0.0, default=15.0, le=180.0, allow_inf_nan=False) + + @field_validator("num_nearest_transects", mode="before") + def validate_posint(cls, v): + return posint(v) + + @field_validator("bearing_tolerance", mode="before") + def validate_realcircle(cls, v): + return realcircle(v) + + @field_validator("mesh_buffer_distance", "latitude_resolution", mode="before") + def validate_realposfloat(cls, v): + return realposfloat(v) + + def __init__( + self, + crop_method: Literal["transect_ends", "convex_hull"] = "transect_ends", + num_nearest_transects: posint = 4, + mesh_buffer_distance: realposfloat = 1.25, + latitude_resolution: realposfloat = 1.25, + bearing_tolerance: realcircle = 15.0, + **kwargs, + ): + """ + Mesh cropping method parameters + """ + + try: + super().__init__( + crop_method=crop_method, + num_nearest_transects=num_nearest_transects, + mesh_buffer_distance=mesh_buffer_distance, + latitude_resolution=latitude_resolution, + bearing_tolerance=bearing_tolerance, + ) + except ValidationError as e: + # Drop traceback + e.__traceback__ = None + raise e + + # Factory method + @classmethod + def create(cls, **kwargs): + """ + Factory creation method to create a `MeshCrop` instance + """ + return cls(**kwargs).model_dump(exclude_none=True) + + +class KrigingParameterInputs( + BaseModel, arbitrary_types_allowed=True, title="kriging model parameters ('kriging_parameters')" +): + anisotropy: realposfloat = Field(default=0.0, allow_inf_nan=False) + kmin: posint = Field(default=3, ge=3) + kmax: posint = Field(default=10, ge=3) + correlation_range: Optional[realposfloat] = Field(default=None, gt=0.0, allow_inf_nan=False) + search_radius: Optional[realposfloat] = Field(default=None, gt=0.0, allow_inf_nan=False) + + @field_validator("kmin", "kmax", mode="before") + def validate_posint(cls, v): + return posint(v) + + @field_validator("anisotropy", "correlation_range", "search_radius", mode="before") + def validate_realposfloat(cls, v): + if v is None: + return v + else: + return realposfloat(v) + + @model_validator(mode="before") + def validate_k_window(cls, values): + # ---- Get `kmin` + kmin = values.get("kmin", 3) + # ---- Get 'kmax' + kmax = values.get("kmax", 10) + # ---- Ensure that `kmax >= kmin` + if kmax < kmin: + # ---- Raise Error + raise ValueError( + f"Defined 'kmax' ({kmax}) must be greater than or equal to 'kmin' ({kmin})." + ) + # ---- Return values + return values + + @model_validator(mode="before") + def validate_spatial_correlation_params(cls, values): + # ---- Get `correlation_range` + correlation_range = values.get("correlation_range", None) + # ---- Get 'search_radius' + search_radius = values.get("search_radius", None) + # ---- Ensure that both parameters are not None + if not correlation_range and not search_radius: + # ---- Raise Error + raise ValueError( + "Both 'correlation_range' and 'search_radius' arguments are missing. At least one " + "must be defined." + ) + # ---- Return values + return values + + # Factory method + @classmethod + def create(cls, **kwargs): + """ + Factory creation method to create a `KrigingParameters` instance + """ + + # Collect errors, if any arise + try: + # ---- Test validate + _ = cls(**kwargs) + # ---- Edit values if needed + if kwargs.get("search_radius") is None and kwargs["correlation_range"] is not None: + kwargs["search_radius"] = kwargs["correlation_range"] * 3 + # ---- Produce the dictionary as an output + return cls(**kwargs).model_dump(exclude_none=True) + except ValidationError as e: + e.__traceback__ = None + raise e + + +class KrigingAnalysis(BaseModel, arbitrary_types_allowed=True): + best_fit_variogram: bool = Field(default=False) + coordinate_transform: bool = Field(default=True) + extrapolate: bool = Field(default=False) + variable: Literal["biomass"] = Field(default="biomass") + verbose: bool = Field(default=True) + + def __init__(self, **kwargs): + try: + super().__init__(**kwargs) + except ValidationError as e: + # Drop traceback + e.__traceback__ = None + raise e + + # Factory method + @classmethod + def create(cls, **kwargs): + """ + Factory creation method to create a `KrigingAnalysis` instance + """ + return cls(**kwargs).model_dump(exclude_none=True)