Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose func arg and skip_na to compute_MVBS users #1269

Merged
14 changes: 12 additions & 2 deletions echopype/commongrid/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ def compute_MVBS(
ds_Sv: xr.Dataset,
range_var: Literal["echo_range", "depth"] = "echo_range",
range_bin: str = "20m",
ping_time_bin: str = "20S",
ping_time_bin: str = "20s",
method="map-reduce",
skipna=True,
closed: Literal["left", "right"] = "left",
**flox_kwargs,
):
Expand All @@ -56,12 +57,15 @@ def compute_MVBS(
``depth`` as a data variable.
range_bin : str, default '20m'
bin size along ``echo_range`` or ``depth`` in meters.
ping_time_bin : str, default '20S'
ping_time_bin : str, default '20s'
bin size along ``ping_time``
method: str, default 'map-reduce'
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
skipna: bool, default True
If true, the mean operation skips NaN values.
Else, the mean operation includes NaN values.
closed: {'left', 'right'}, default 'left'
Which side of bin interval is closed.
**flox_kwargs
Expand Down Expand Up @@ -105,6 +109,7 @@ def compute_MVBS(
ping_interval,
range_var=range_var,
method=method,
skipna=skipna,
**flox_kwargs,
)

Expand Down Expand Up @@ -268,6 +273,7 @@ def compute_NASC(
range_bin: str = "10m",
dist_bin: str = "0.5nmi",
method: str = "map-reduce",
skipna=True,
closed: Literal["left", "right"] = "left",
**flox_kwargs,
) -> xr.Dataset:
Expand All @@ -287,6 +293,9 @@ def compute_NASC(
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
skipna: bool, default True
If true, the mean operation skips NaN values.
Else, the mean operation includes NaN values.
closed: {'left', 'right'}, default 'left'
Which side of bin interval is closed.
**flox_kwargs
Expand Down Expand Up @@ -357,6 +366,7 @@ def compute_NASC(
range_interval,
dist_interval,
method=method,
skipna=skipna,
**flox_kwargs,
)

Expand Down
49 changes: 46 additions & 3 deletions echopype/commongrid/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def compute_raw_MVBS(
ping_interval: Union[pd.IntervalIndex, np.ndarray],
range_var: Literal["echo_range", "depth"] = "echo_range",
method="map-reduce",
skipna=True,
**flox_kwargs,
):
"""
Expand All @@ -45,6 +46,9 @@ def compute_raw_MVBS(
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
skipna: bool, default True
If true, the mean operation skips NaN values.
Else, the mean operation includes NaN values.
**flox_kwargs
Additional keyword arguments to be passed
to flox reduction function.
Expand All @@ -65,6 +69,8 @@ def compute_raw_MVBS(
x_var=x_var,
range_var=range_var,
method=method,
func="nanmean" if skipna else "mean",
skipna=skipna,
**flox_kwargs,
)

Expand All @@ -80,6 +86,7 @@ def compute_raw_NASC(
range_interval: Union[pd.IntervalIndex, np.ndarray],
dist_interval: Union[pd.IntervalIndex, np.ndarray],
method="map-reduce",
skipna=True,
**flox_kwargs,
):
"""
Expand All @@ -100,6 +107,9 @@ def compute_raw_NASC(
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
skipna: bool, default True
If true, the mean operation skips NaN values.
Else, the mean operation includes NaN values.
**flox_kwargs
Additional keyword arguments to be passed
to flox reduction function.
Expand All @@ -126,6 +136,8 @@ def compute_raw_NASC(
x_var=x_var,
range_var=range_var,
method=method,
func="nanmean" if skipna else "mean",
skipna=skipna,
**flox_kwargs,
)

Expand All @@ -136,6 +148,7 @@ def compute_raw_NASC(
ds_Sv["ping_time"],
ds_Sv[x_var],
func="nanmean",
skipna=True,
expected_groups=(dist_interval),
isbin=True,
method=method,
Expand All @@ -154,7 +167,8 @@ def compute_raw_NASC(
h_mean_denom = xarray_reduce(
da_denom,
ds_Sv[x_var],
func="sum",
func="nansum",
skipna=True,
expected_groups=(dist_interval),
isbin=[True],
method=method,
Expand All @@ -165,7 +179,8 @@ def compute_raw_NASC(
ds_Sv["channel"],
ds_Sv[x_var],
ds_Sv[range_var].isel(**{range_dim: slice(0, -1)}),
func="sum",
func="nansum",
skipna=True,
expected_groups=(None, dist_interval, range_interval),
isbin=[False, True, True],
method=method,
Expand Down Expand Up @@ -480,6 +495,8 @@ def _groupby_x_along_channels(
x_var: Literal["ping_time", "distance_nmi"] = "ping_time",
range_var: Literal["echo_range", "depth"] = "echo_range",
method: str = "map-reduce",
func: str = "nanmean",
skipna: bool = True,
**flox_kwargs,
) -> xr.Dataset:
"""
Expand Down Expand Up @@ -517,6 +534,15 @@ def _groupby_x_along_channels(
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
func: str, default 'nanmean'
The aggregation function used for reducing the data array.
By default, 'nanmean' is used. Other options can be found in the flox `documentation
<https://flox.readthedocs.io/en/latest/generated/flox.xarray.xarray_reduce.html>`_.
skipna: bool, default True
If true, aggregation function skips NaN values.
Else, aggregation function includes NaN values.
Note that if ``func`` is set to 'mean' and ``skipna`` is set to True, then aggregation
will have the same behavior as if func is set to 'nanmean'.
**flox_kwargs
Additional keyword arguments to be passed
to flox reduction function.
Expand All @@ -540,6 +566,22 @@ def _groupby_x_along_channels(
# average should be done in linear domain
sv = ds_Sv["Sv"].pipe(_log2lin)

# Check for any NaNs in the coordinate arrays:
named_arrays = {
x_var: ds_Sv[x_var].data,
range_var: ds_Sv[range_var].data,
}
aggregation_msg = (
"Aggregation may be negatively impacted since Flox will not aggregate any "
"```Sv``` values that have corresponding NaN coordinate values. Consider handling "
"these values before calling your intended commongrid function."
)
for array_name, array in named_arrays.items():
if np.isnan(array).any():
logging.warning(
f"The ```{array_name}``` coordinate array contain NaNs. {aggregation_msg}"
)

# reduce along ping_time or distance_nmi
# and echo_range or depth
# by binning and averaging
Expand All @@ -548,10 +590,11 @@ def _groupby_x_along_channels(
ds_Sv["channel"],
ds_Sv[x_var],
ds_Sv[range_var],
func="nanmean",
expected_groups=(None, x_interval, range_interval),
isbin=[False, True, True],
method=method,
func=func,
skipna=skipna,
**flox_kwargs,
)
return sv_mean
2 changes: 1 addition & 1 deletion echopype/echodata/echodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def update_platform(
}
)
time_dims_max = max([int(dim[-1]) for dim in platform.dims if dim.startswith("time")])
new_time_dims = [f"time{time_dims_max+i+1}" for i in range(len(ext_time_dims))]
new_time_dims = [f"time{time_dims_max + i + 1}" for i in range(len(ext_time_dims))]
# Map each new time dim name to the external time dim name:
new_time_dims_mappings = {new: ext for new, ext in zip(new_time_dims, ext_time_dims)}

Expand Down
15 changes: 8 additions & 7 deletions echopype/tests/commongrid/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def mock_Sv_dataset_irregular(
# Sprinkle nans around echo_range
for pos in mock_nan_ilocs:
ds_Sv["echo_range"][pos] = np.nan
ds_Sv["Sv"][pos] = np.nan
return ds_Sv


Expand Down Expand Up @@ -201,7 +202,7 @@ def mock_nasc_array_regular(mock_Sv_dataset_regular, mock_parameters):
dist_bin = 0.5
range_bin = 2
channel_len = mock_parameters["channel_len"]
expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len)
expected_nasc_val = _get_expected_nasc_val_nanmean(ds_Sv, dist_bin, range_bin, channel_len)

return expected_nasc_val

Expand Down Expand Up @@ -237,7 +238,7 @@ def mock_nasc_array_irregular(mock_Sv_dataset_irregular, mock_parameters):
dist_bin = 0.5
range_bin = 2
channel_len = mock_parameters["channel_len"]
expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len)
expected_nasc_val = _get_expected_nasc_val_nanmean(ds_Sv, dist_bin, range_bin, channel_len)

return expected_nasc_val

Expand Down Expand Up @@ -463,12 +464,12 @@ def mock_Sv_dataset_NASC(mock_parameters, random_number_generator):


# Helper functions to generate mock Sv and MVBS dataset
def _get_expected_nasc_val(
def _get_expected_nasc_val_nanmean(
ds_Sv: xr.Dataset, dist_bin: str, range_bin: float, channel_len: int = 2
) -> np.ndarray:
"""
Helper functions to generate expected NASC outputs from mock Sv dataset
by brute-force looping and compute the mean
by brute-force looping and compute the nanmean

Parameters
----------
Expand Down Expand Up @@ -500,7 +501,7 @@ def _get_expected_nasc_val(
sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin)

# Compute sv mean
sv_mean = _brute_mean_reduce_3d(
sv_mean = _brute_nanmean_reduce_3d(
sv, ds_Sv, "depth", "distance_nmi", channel_len, dist_interval, range_interval
)

Expand Down Expand Up @@ -544,7 +545,7 @@ def _get_expected_nasc_val(
return sv_mean * h_mean * 4 * np.pi * 1852**2


def _brute_mean_reduce_3d(
def _brute_nanmean_reduce_3d(
sv: xr.DataArray,
ds_Sv: xr.Dataset,
range_var: Literal["echo_range", "depth"],
Expand Down Expand Up @@ -610,7 +611,7 @@ def _brute_mean_reduce_3d(
if 0 in sv_tmp.shape:
mean_vals[ch_idx, x_idx, r_idx] = np.nan
else:
mean_vals[ch_idx, x_idx, r_idx] = np.mean(sv_tmp)
mean_vals[ch_idx, x_idx, r_idx] = np.nanmean(sv_tmp)
return mean_vals


Expand Down
90 changes: 89 additions & 1 deletion echopype/tests/commongrid/test_commongrid_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
from flox.xarray import xarray_reduce
import xarray as xr
import echopype as ep
from echopype.consolidate import add_location, add_depth
from echopype.commongrid.utils import (
Expand Down Expand Up @@ -438,11 +439,98 @@ def test_compute_NASC_values(request, er_type):
ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular")
expected_nasc = request.getfixturevalue("mock_nasc_array_irregular")

ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin)
ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin, skipna=True)

assert ds_NASC.NASC.shape == expected_nasc.shape
# Floating digits need to check with all close not equal
# Compare the values of the MVBS array with the expected values
assert np.allclose(
ds_NASC.NASC.values, expected_nasc.values, atol=1e-10, rtol=1e-10, equal_nan=True
)

@pytest.mark.integration
@pytest.mark.parametrize(
    ("operation","skipna", "range_var"),
    [
        ("MVBS", True, "depth"),
        ("MVBS", False, "depth"),
        ("MVBS", True, "echo_range"),
        ("MVBS", False, "echo_range"),
        # NASC `range_var` always defaults to `depth`, so we set this to `None`.
        ("NASC", True, None),
        ("NASC", False, None),
    ],
)
def test_compute_MVBS_NASC_skipna_nan_and_non_nan_values(
    request,
    operation,
    skipna,
    range_var,
    caplog,
):
    """
    Check that ``skipna`` controls NaN propagation in ``compute_MVBS`` /
    ``compute_NASC`` output, and that a warning is logged when the binning
    coordinate (``echo_range``) itself contains NaNs.

    Parameters
    ----------
    request : pytest.FixtureRequest
        Used to pull the ``mock_Sv_dataset_irregular`` fixture by name.
    operation : {"MVBS", "NASC"}
        Which commongrid function to exercise.
    skipna : bool
        Forwarded to the commongrid function; True means NaN Sv samples are
        ignored in the bin mean, False means any NaN poisons its bin.
    range_var : {"depth", "echo_range", None}
        Vertical coordinate for MVBS; None for NASC (which has no such arg here).
    caplog : pytest.LogCaptureFixture
        Captures log records so the NaN-coordinate warning can be asserted.
    """
    # Create subset dataset with 2 channels, 2 ping times, and 20 range samples:

    # Get fixture for irregular Sv (it has NaNs sprinkled into both
    # `echo_range` and `Sv`).
    ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular")
    # Already has 2 channels and 20 range samples, so subset for only ping time
    subset_ds_Sv = ds_Sv.isel(ping_time=slice(0,2))

    # Compute MVBS / Compute NASC
    if operation == "MVBS":
        if range_var == "echo_range":
            # Turn on logger verbosity so the NaN-coordinate warning is
            # actually emitted and captured by caplog.
            ep.utils.log.verbose(override=False)

        da = ep.commongrid.compute_MVBS(
            subset_ds_Sv,
            range_var=range_var,
            range_bin="2m",
            skipna=skipna
        )["Sv"]

        if range_var == "echo_range":
            # Check for the warning emitted when the binning coordinate has
            # NaNs. NOTE: this text must match the runtime message exactly
            # (including "contain NaNs"), so it is reproduced verbatim.
            aggregation_msg = (
                "Aggregation may be negatively impacted since Flox will not aggregate any "
                "```Sv``` values that have corresponding NaN coordinate values. Consider handling "
                "these values before calling your intended commongrid function."
            )
            expected_warning = f"The ```echo_range``` coordinate array contain NaNs. {aggregation_msg}"
            assert any(expected_warning in record.message for record in caplog.records)

            # Turn off logger verbosity (restore the global default so other
            # tests are unaffected).
            ep.utils.log.verbose(override=True)
    else:
        da = ep.commongrid.compute_NASC(subset_ds_Sv, range_bin="2m", skipna=skipna)["NASC"]

    # Create NaN Mask: True if NaN, False if Not
    da_nan_mask = np.isnan(da.data)

    # Check for appropriate NaN/non-NaN values:

    if range_var == "echo_range":
        # Note that ALL `Sv` values that are `NaN` have corresponding `echo_range`
        # values that are also ALL `NaN`. Flox does not bin any values that have `NaN`
        # coordinates, and so none of the values that are aggregated into the 5 bins
        # have any `NaNs` that are aggregated into them. Hence `skipna` makes no
        # difference here: no bin is NaN either way.
        expected_values = [
            [[False, False, False, False, False]],
            [[False, False, False, False, False]]
        ]
        assert np.array_equal(da_nan_mask, np.array(expected_values))
    else:
        # Note that the first value along the depth dimension is always NaN due to the minimum
        # depth value in the Sv dataset being 2.5 (2.5m from the added offset), since both MVBS and
        # NASC outputs start at depth=0 and so the first depth bin (0m-2m) doesn't contain anything.
        if skipna:
            # With skipna=True only the empty 0m-2m bin is NaN.
            expected_values = [
                [[True, False, False, False, False, False]],
                [[True, False, False, False, False, False]]
            ]
            assert np.array_equal(da_nan_mask, np.array(expected_values))
        else:
            # With skipna=False every bin that received at least one NaN Sv
            # sample is NaN as well.
            expected_values = [
                [[True, True, True, False, False, True]],
                [[True, False, False, True, True, True]]
            ]
            assert np.array_equal(da_nan_mask, np.array(expected_values))
Loading