diff --git a/echopype/commongrid/api.py b/echopype/commongrid/api.py index 4bac09813..0b1a1eb58 100644 --- a/echopype/commongrid/api.py +++ b/echopype/commongrid/api.py @@ -31,8 +31,9 @@ def compute_MVBS( ds_Sv: xr.Dataset, range_var: Literal["echo_range", "depth"] = "echo_range", range_bin: str = "20m", - ping_time_bin: str = "20S", + ping_time_bin: str = "20s", method="map-reduce", + skipna=True, closed: Literal["left", "right"] = "left", **flox_kwargs, ): @@ -56,12 +57,15 @@ def compute_MVBS( ``depth`` as a data variable. range_bin : str, default '20m' bin size along ``echo_range`` or ``depth`` in meters. - ping_time_bin : str, default '20S' + ping_time_bin : str, default '20s' bin size along ``ping_time`` method: str, default 'map-reduce' The flox strategy for reduction of dask arrays only. See flox `documentation `_ for more details. + skipna: bool, default True + If true, the mean operation skips NaN values. + Else, the mean operation includes NaN values. closed: {'left', 'right'}, default 'left' Which side of bin interval is closed. **flox_kwargs @@ -105,6 +109,7 @@ def compute_MVBS( ping_interval, range_var=range_var, method=method, + skipna=skipna, **flox_kwargs, ) @@ -268,6 +273,7 @@ def compute_NASC( range_bin: str = "10m", dist_bin: str = "0.5nmi", method: str = "map-reduce", + skipna=True, closed: Literal["left", "right"] = "left", **flox_kwargs, ) -> xr.Dataset: @@ -287,6 +293,9 @@ def compute_NASC( The flox strategy for reduction of dask arrays only. See flox `documentation `_ for more details. + skipna: bool, default True + If true, the mean operation skips NaN values. + Else, the mean operation includes NaN values. closed: {'left', 'right'}, default 'left' Which side of bin interval is closed. **flox_kwargs @@ -357,6 +366,7 @@ def compute_NASC( range_interval, dist_interval, method=method, + skipna=skipna, **flox_kwargs, ) diff --git a/echopype/commongrid/utils.py b/echopype/commongrid/utils.py index 39d2cd62a..f80c5a257 100644 --- a/echopype/commongrid/utils.py +++ b/echopype/commongrid/utils.py @@ -20,6 +20,7 @@ def compute_raw_MVBS( ping_interval: Union[pd.IntervalIndex, np.ndarray], range_var: Literal["echo_range", "depth"] = "echo_range", method="map-reduce", + skipna=True, **flox_kwargs, ): """ @@ -45,6 +46,9 @@ def compute_raw_MVBS( The flox strategy for reduction of dask arrays only. See flox `documentation `_ for more details. + skipna: bool, default True + If true, the mean operation skips NaN values. + Else, the mean operation includes NaN values. **flox_kwargs Additional keyword arguments to be passed to flox reduction function. @@ -65,6 +69,8 @@ def compute_raw_MVBS( x_var=x_var, range_var=range_var, method=method, + func="nanmean" if skipna else "mean", + skipna=skipna, **flox_kwargs, ) @@ -80,6 +86,7 @@ def compute_raw_NASC( range_interval: Union[pd.IntervalIndex, np.ndarray], dist_interval: Union[pd.IntervalIndex, np.ndarray], method="map-reduce", + skipna=True, **flox_kwargs, ): """ @@ -100,6 +107,9 @@ def compute_raw_NASC( The flox strategy for reduction of dask arrays only. See flox `documentation `_ for more details. + skipna: bool, default True + If true, the mean operation skips NaN values. + Else, the mean operation includes NaN values. **flox_kwargs Additional keyword arguments to be passed to flox reduction function. @@ -126,6 +136,8 @@ def compute_raw_NASC( x_var=x_var, range_var=range_var, method=method, + func="nanmean" if skipna else "mean", + skipna=skipna, **flox_kwargs, ) @@ -136,6 +148,7 @@ def compute_raw_NASC( ds_Sv["ping_time"], ds_Sv[x_var], func="nanmean", + skipna=True, expected_groups=(dist_interval), isbin=True, method=method, @@ -154,7 +167,8 @@ def compute_raw_NASC( h_mean_denom = xarray_reduce( da_denom, ds_Sv[x_var], - func="sum", + func="nansum", + skipna=True, expected_groups=(dist_interval), isbin=[True], method=method, @@ -165,7 +179,8 @@ def compute_raw_NASC( ds_Sv["channel"], ds_Sv[x_var], ds_Sv[range_var].isel(**{range_dim: slice(0, -1)}), - func="sum", + func="nansum", + skipna=True, expected_groups=(None, dist_interval, range_interval), isbin=[False, True, True], method=method, @@ -480,6 +495,8 @@ def _groupby_x_along_channels( x_var: Literal["ping_time", "distance_nmi"] = "ping_time", range_var: Literal["echo_range", "depth"] = "echo_range", method: str = "map-reduce", + func: str = "nanmean", + skipna: bool = True, **flox_kwargs, ) -> xr.Dataset: """ @@ -517,6 +534,15 @@ def _groupby_x_along_channels( The flox strategy for reduction of dask arrays only. See flox `documentation `_ for more details. + func: str, default 'nanmean' + The aggregation function used for reducing the data array. + By default, 'nanmean' is used. Other options can be found in the flox `documentation + `_. + skipna: bool, default True + If true, aggregation function skips NaN values. + Else, aggregation function includes NaN values. + Note that if ``func`` is set to 'mean' and ``skipna`` is set to True, then aggregation + will have the same behavior as if func is set to 'nanmean'. **flox_kwargs Additional keyword arguments to be passed to flox reduction function. @@ -540,6 +566,22 @@ def _groupby_x_along_channels( # average should be done in linear domain sv = ds_Sv["Sv"].pipe(_log2lin) + # Check for any NaNs in the coordinate arrays: + named_arrays = { + x_var: ds_Sv[x_var].data, + range_var: ds_Sv[range_var].data, + } + aggregation_msg = ( + "Aggregation may be negatively impacted since Flox will not aggregate any " + "```Sv``` values that have corresponding NaN coordinate values. Consider handling " + "these values before calling your intended commongrid function." + ) + for array_name, array in named_arrays.items(): + if np.isnan(array).any(): + logging.warning( + f"The ```{array_name}``` coordinate array contain NaNs. {aggregation_msg}" + ) + # reduce along ping_time or distance_nmi # and echo_range or depth # by binning and averaging @@ -548,10 +590,11 @@ def _groupby_x_along_channels( ds_Sv["channel"], ds_Sv[x_var], ds_Sv[range_var], - func="nanmean", expected_groups=(None, x_interval, range_interval), isbin=[False, True, True], method=method, + func=func, + skipna=skipna, **flox_kwargs, ) return sv_mean diff --git a/echopype/echodata/echodata.py b/echopype/echodata/echodata.py index 1f04a8abc..4c0663759 100644 --- a/echopype/echodata/echodata.py +++ b/echopype/echodata/echodata.py @@ -378,7 +378,7 @@ def update_platform( } ) time_dims_max = max([int(dim[-1]) for dim in platform.dims if dim.startswith("time")]) - new_time_dims = [f"time{time_dims_max+i+1}" for i in range(len(ext_time_dims))] + new_time_dims = [f"time{time_dims_max + i + 1}" for i in range(len(ext_time_dims))] # Map each new time dim name to the external time dim name: new_time_dims_mappings = {new: ext for new, ext in zip(new_time_dims, ext_time_dims)} diff --git a/echopype/tests/commongrid/conftest.py b/echopype/tests/commongrid/conftest.py index 6e67f9f6d..f6b73d083 100644 --- a/echopype/tests/commongrid/conftest.py +++ b/echopype/tests/commongrid/conftest.py @@ -162,6 +162,7 @@ def mock_Sv_dataset_irregular( # Sprinkle nans around echo_range for pos in mock_nan_ilocs: ds_Sv["echo_range"][pos] = np.nan + ds_Sv["Sv"][pos] = np.nan return ds_Sv @@ -201,7 +202,7 @@ def mock_nasc_array_regular(mock_Sv_dataset_regular, mock_parameters): dist_bin = 0.5 range_bin = 2 channel_len = mock_parameters["channel_len"] - expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len) + expected_nasc_val = _get_expected_nasc_val_nanmean(ds_Sv, dist_bin, range_bin, channel_len) return expected_nasc_val @@ -237,7 +238,7 @@ def mock_nasc_array_irregular(mock_Sv_dataset_irregular, mock_parameters): dist_bin = 0.5 range_bin = 2 channel_len = mock_parameters["channel_len"] - expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len) + expected_nasc_val = _get_expected_nasc_val_nanmean(ds_Sv, dist_bin, range_bin, channel_len) return expected_nasc_val @@ -463,12 +464,12 @@ def mock_Sv_dataset_NASC(mock_parameters, random_number_generator): # Helper functions to generate mock Sv and MVBS dataset -def _get_expected_nasc_val( +def _get_expected_nasc_val_nanmean( ds_Sv: xr.Dataset, dist_bin: str, range_bin: float, channel_len: int = 2 ) -> np.ndarray: """ Helper functions to generate expected NASC outputs from mock Sv dataset - by brute-force looping and compute the mean + by brute-force looping and compute the nanmean Parameters ---------- @@ -500,7 +501,7 @@ def _get_expected_nasc_val( sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin) # Compute sv mean - sv_mean = _brute_mean_reduce_3d( + sv_mean = _brute_nanmean_reduce_3d( sv, ds_Sv, "depth", "distance_nmi", channel_len, dist_interval, range_interval ) @@ -544,7 +545,7 @@ def _get_expected_nasc_val( return sv_mean * h_mean * 4 * np.pi * 1852**2 -def _brute_mean_reduce_3d( +def _brute_nanmean_reduce_3d( sv: xr.DataArray, ds_Sv: xr.Dataset, range_var: Literal["echo_range", "depth"], @@ -610,7 +611,7 @@ def _brute_mean_reduce_3d( if 0 in sv_tmp.shape: mean_vals[ch_idx, x_idx, r_idx] = np.nan else: - mean_vals[ch_idx, x_idx, r_idx] = np.mean(sv_tmp) + mean_vals[ch_idx, x_idx, r_idx] = np.nanmean(sv_tmp) return mean_vals diff --git a/echopype/tests/commongrid/test_commongrid_api.py b/echopype/tests/commongrid/test_commongrid_api.py index 6e3a84385..75d3a4516 100644 --- a/echopype/tests/commongrid/test_commongrid_api.py +++ b/echopype/tests/commongrid/test_commongrid_api.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd from flox.xarray import xarray_reduce +import xarray as xr import echopype as ep from echopype.consolidate import add_location, add_depth from echopype.commongrid.utils import ( @@ -438,7 +439,7 @@ def test_compute_NASC_values(request, er_type): ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular") expected_nasc = request.getfixturevalue("mock_nasc_array_irregular") - ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin) + ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin, skipna=True) assert ds_NASC.NASC.shape == expected_nasc.shape # Floating digits need to check with all close not equal @@ -446,3 +447,90 @@ def test_compute_NASC_values(request, er_type): assert np.allclose( ds_NASC.NASC.values, expected_nasc.values, atol=1e-10, rtol=1e-10, equal_nan=True ) + +@pytest.mark.integration +@pytest.mark.parametrize( + ("operation","skipna", "range_var"), + [ + ("MVBS", True, "depth"), + ("MVBS", False, "depth"), + ("MVBS", True, "echo_range"), + ("MVBS", False, "echo_range"), + # NASC `range_var` always defaults to `depth` so we set this as `None``. + ("NASC", True, None), + ("NASC", False, None), + ], +) +def test_compute_MVBS_NASC_skipna_nan_and_non_nan_values( + request, + operation, + skipna, + range_var, + caplog, +): + # Create subset dataset with 2 channels, 2 ping times, and 20 range samples: + + # Get fixture for irregular Sv + ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular") + # Already has 2 channels and 20 range samples, so subset for only ping time + subset_ds_Sv = ds_Sv.isel(ping_time=slice(0,2)) + + # Compute MVBS / Compute NASC + if operation == "MVBS": + if range_var == "echo_range": + # Turn on logger verbosity + ep.utils.log.verbose(override=False) + + da = ep.commongrid.compute_MVBS( + subset_ds_Sv, + range_var=range_var, + range_bin="2m", + skipna=skipna + )["Sv"] + + if range_var == "echo_range": + # Check for appropriate warning + aggregation_msg = ( + "Aggregation may be negatively impacted since Flox will not aggregate any " + "```Sv``` values that have corresponding NaN coordinate values. Consider handling " + "these values before calling your intended commongrid function." + ) + expected_warning = f"The ```echo_range``` coordinate array contain NaNs. {aggregation_msg}" + assert any(expected_warning in record.message for record in caplog.records) + + # Turn off logger verbosity + ep.utils.log.verbose(override=True) + else: + da = ep.commongrid.compute_NASC(subset_ds_Sv, range_bin="2m", skipna=skipna)["NASC"] + + # Create NaN Mask: True if NaN, False if Not + da_nan_mask = np.isnan(da.data) + + # Check for appropriate NaN/non-NaN values: + + if range_var == "echo_range": + # Note that ALL `Sv` values that are `NaN` have corresponding `echo_range` + # values that are also ALL `NaN``. Flox does not bin any values that have `NaN` + # coordinates, and so none of the values that are aggregated into the 5 bins + # have any `NaNs` that are aggregated into them. + expected_values = [ + [[False, False, False, False, False]], + [[False, False, False, False, False]] + ] + assert np.array_equal(da_nan_mask, np.array(expected_values)) + else: + # Note that the first value along the depth dimension is always NaN due to the minimum + # depth value in the Sv dataset being 2.5 (2.5m from the added offset), since both MVBS and + # NASC outputs start at depth=0 and so the first depth bin (0m-2m) doesn't contain anything. + if skipna: + expected_values = [ + [[True, False, False, False, False, False]], + [[True, False, False, False, False, False]] + ] + assert np.array_equal(da_nan_mask, np.array(expected_values)) + else: + expected_values = [ + [[True, True, True, False, False, True]], + [[True, False, False, True, True, True]] + ] + assert np.array_equal(da_nan_mask, np.array(expected_values))