Skip to content

Commit

Permalink
fix(commongrid): improve 'compute_MVBS' using flox [all tests ci] (#1124
Browse files Browse the repository at this point in the history
)

* chore(deps): add flox dependency >=0.7.2

* fix(commongrid): fixes 'compute_MVBS' so it can work better and scale

Under the hood, binning along ping time and echo range now uses flox.
This allows for scalability and more community-maintained.

* docs: add small code comment

* refactor: change how ping_time index is retrieved

* refactor: remove for loop for channel

* test(mvbs): add mock Sv datasets and tests for dims (#2)

Note that @leewujung also changed mean to nanmean for skipping NaNs in each bin.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix: change dask to numpy

Changed the use of dask for log10 to numpy instead since numpy can also handle dask array inputs properly.

* feat: Add method argument

Added 'method' argument to 'get_MVBS_along_channels'
and also expose additional keyword arguments control for
flox.

* fix(commongrid): Fixed to include lat lon

Fixed 'compute_MVBS' function to now
include latitude and longitude if the variables
exists in the Sv dataset.

Additionally, the flox method and keyword arguments
are now exposed within the 'compute_MVBS' function.

Ref: Issue #1002

* refactor: Set defaults to recommended

After some investigation, @lsetiawan
concluded that at this time the
method 'map-reduce', engine 'numpy',
and reindex True works the best,
so this is now set as default.

Also, getting echo range maximum is
through direct data slicing rather than
computation.

* feat(commongrid): Add 'range_var' argument to 'compute_MVBS'

Added a new argument 'range_var' so that user can set
the range variable to perform binning with.
There are 2 options of 'echo_range' and 'depth':

- 'echo_range': When this is set, variable 'water_level'
                is now included in the resulting MVBS dataset
- 'depth': A check is in place to ensure that this variable exists
           before moving forward and use this to perform range binning.

Ref: Issue #1002

* fix: Add missing attributes for lat lon

* test: Update test to use random generator

* fix: Add case for no 'water_level'

Added a case for dataset that doesn't have water level variable.

* test(nasc): Remove 'compute_NASC' import to avoid failure

* fix: Removed assumption on echo range max

Reverted back the echo range max computation
to computing on the fly since there may be some
NaN values.

* test: Extract api test and add markings

Extracted fixtures to conftest.py for commongrid.
Additionally, clean up unused functions and mark
tests b/w unit and integration. Added a new test module
called 'test_api.py' for 'commongrid.api'.

* test: Add latlon test for 'compute_MVBS'

Added a test for Sv dataset that contains latitude
and longitude going through 'compute_MVBS'
to ensure that those variables gets propagated through.

Ref: #1002

* test: Add small get_MVBS_along_channels test

Added test for 'get_MVBS_along_channels' with either 'depth'
as the 'range_var' or checking for 'has_positions' is True
or False.

* refactor: Integrate suggested changes

Integrated suggested changes from review such as
additional comments in code, fixing some variable names,
and extracting out the lin2log and log2lin functions.

Additionally, now echopype imports pint library
to start having unit checks in the input for compute_MVBS.

* test: Added check for position values

* test: Update range_meter_bin to strings

* test: Added 'compute_MVBS' values test

* Update echopype/tests/utils/test_processinglevels_integration.py

compute_MVBS now should preserve the processing level attributes. So, test for presence rather than absence

* test: Add 'nan' sprinkles

Sprinkled 'nan' values all over 'echo_range' to ensure that
computed values from 'compute_MVBS' doesn't take into account
the 'nan'. Added check for the expected distribution of 'nan'
in the resulting array.

Ref: #1124 (comment)

* revert: Revert the use of 'pint'

Removed dependency to 'pint' and use simple regex
to ensure that 'range_bin' input is unit 'm'.
Renamed 'range_meter_bin' argument to 'range_bin'.

Ref: #1124 (comment)

* feat: Allow 'range_bin' to have space

* fix: Apply suggestions from code review

Applied fix for regex not capturing decimal values by @emiliom

Ref: https://github.com/OSOceanAcoustics/echopype/pull/1124/files#r1320422121

Co-authored-by: Emilio Mayorga <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test: Fix test text for wrong unit

* test: Remove the 'e.g.' part on pytest

Removed the part with '(e.g., '10m')' since
it's messing up pytests regex matching.

* test: Remove remnant for test_ek.py

* refactor: Extract range_bin parsing and add close arg

Extracts out the 'range_bin' string to float into a private function.
Additionally now there's a fine tune argument for bin close edges
so user can specify either close is 'left' or 'right'.
Bins are converted to pandas interval index before passing
into 'get_MVBS_along_channels'.

* refactor: Update arg types to include interval index

Added argument type options for 'range_interval' and 'ping_interval' to
also be interval index.

* test: Update tests to have brute force creation

Changed mock mvbs to be created by doing brute force
rather than hard coding.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test: Fix brute force mvbs gen

Fixes the generation of expected mvbs with brute force
as well as tweaks to mvbs_along_channel test.

* chore: Clean up old code for doing compute MVBS

Removes old code that perfoms compute_MVBS since
now we've switched over to flox

* chore(pytest): Added custom markers 'unit' and 'integration'

* docs: Update docstring for `compute_MVBS`

Added options for literal arguments

* refactor: Change 'parse_range_bin' to 'parse_x_bin'

Make bin parsing to be more general by making it
to 'parse_x_bin'.

* chore: Update suggested changes

Update some texts from suggested review as discussed.

Ref: #1124 (review)

---------

Co-authored-by: Wu-Jung Lee <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Emilio Mayorga <[email protected]>
  • Loading branch information
4 people authored Sep 21, 2023
1 parent 2e564e9 commit b887be8
Show file tree
Hide file tree
Showing 10 changed files with 1,174 additions and 1,294 deletions.
216 changes: 192 additions & 24 deletions echopype/commongrid/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""
Functions for enhancing the spatial and temporal coherence of data.
"""
import re
from typing import Literal

import numpy as np
import pandas as pd
import xarray as xr

from ..consolidate.api import POSITION_VARIABLES
from ..utils.prov import add_processing_level, echopype_prov_attrs, insert_input_processing_level
from .mvbs import get_MVBS_along_channels

Expand Down Expand Up @@ -62,11 +66,117 @@ def _set_MVBS_attrs(ds):
)


def _convert_bins_to_interval_index(
bins: list, closed: Literal["left", "right"] = "left"
) -> pd.IntervalIndex:
"""
Convert bins to sorted pandas IntervalIndex
with specified closed end
Parameters
----------
bins : list
The bin edges
closed : {'left', 'right'}, default 'left'
Which side of bin interval is closed
Returns
-------
pd.IntervalIndex
The resulting IntervalIndex
"""
return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values()


def _parse_x_bin(x_bin: str, x_label="range_bin") -> float:
"""
Parses x bin string, check unit,
and returns x bin in the specified unit.
Currently only available for:
range_bin: meters (m)
dist_bin: nautical miles (nmi)
Parameters
----------
x_bin : str
X bin string, e.g., "0.5nmi" or "10m"
x_label : {"range_bin", "dist_bin"}, default "range_bin"
The label of the x bin.
Returns
-------
float
The resulting x bin value in x unit,
based on label.
Raises
------
ValueError
If the x bin string doesn't include unit value.
TypeError
If the x bin is not a type string.
KeyError
If the x label is not one of the available labels.
"""
x_bin_map = {
"range_bin": {
"name": "Range bin",
"unit": "m",
"ex": "10m",
"unit_label": "meters",
"pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)",
},
"dist_bin": {
"name": "Distance bin",
"unit": "nmi",
"ex": "0.5nmi",
"unit_label": "nautical miles",
"pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)",
},
}
x_bin_info = x_bin_map.get(x_label, None)

if x_bin_info is None:
raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}")

# First check for bin types
if not isinstance(x_bin, str):
raise TypeError("'x_bin' must be a string")
# normalize to lower case
# for x_bin
x_bin = x_bin.strip().lower()
# Only matches meters
match_obj = re.match(x_bin_info["pattern"], x_bin)

# Do some checks on x_bin inputs
if match_obj is None:
# This shouldn't be other units
raise ValueError(
f"{x_bin_info['name']} must be in "
f"{x_bin_info['unit_label']} "
f"(e.g., '{x_bin_info['ex']}')."
)

# Convert back to float
x_bin = float(match_obj.group(1))
return x_bin


@add_processing_level("L3*")
def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):
def compute_MVBS(
ds_Sv: xr.Dataset,
range_var: Literal["echo_range", "depth"] = "echo_range",
range_bin: str = "20m",
ping_time_bin: str = "20S",
method="map-reduce",
closed: Literal["left", "right"] = "left",
**flox_kwargs,
):
"""
Compute Mean Volume Backscattering Strength (MVBS)
based on intervals of range (``echo_range``) and ``ping_time`` specified in physical units.
based on intervals of range (``echo_range``) or depth (``depth``)
and ``ping_time`` specified in physical units.
Output of this function differs from that of ``compute_MVBS_index_binning``, which computes
bin-averaged Sv according to intervals of ``echo_range`` and ``ping_time`` specified as
Expand All @@ -76,41 +186,99 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):
----------
ds_Sv : xr.Dataset
dataset containing Sv and ``echo_range`` [m]
range_meter_bin : Union[int, float]
bin size along ``echo_range`` in meters, default to ``20``
ping_time_bin : str
bin size along ``ping_time``, default to ``20S``
range_var: {'echo_range', 'depth'}, default 'echo_range'
The variable to use for range binning.
Must be one of ``echo_range`` or ``depth``.
Note that ``depth`` is only available if the input dataset contains
``depth`` as a data variable.
range_bin : str, default '20m'
bin size along ``echo_range`` or ``depth`` in meters.
ping_time_bin : str, default '20S'
bin size along ``ping_time``
method: str, default 'map-reduce'
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
closed: {'left', 'right'}, default 'left'
Which side of bin interval is closed.
**kwargs
Additional keyword arguments to be passed
to flox reduction function.
Returns
-------
A dataset containing bin-averaged Sv
"""

if not isinstance(ping_time_bin, str):
raise TypeError("ping_time_bin must be a string")

range_bin = _parse_x_bin(range_bin, "range_bin")

# Clean up filenames dimension if it exists
# not needed here
if "filenames" in ds_Sv.dims:
ds_Sv = ds_Sv.drop_dims("filenames")

# Check if range_var is valid
if range_var not in ["echo_range", "depth"]:
raise ValueError("range_var must be one of 'echo_range' or 'depth'.")

# Check if range_var exists in ds_Sv
if range_var not in ds_Sv.data_vars:
raise ValueError(f"range_var '{range_var}' does not exist in the input dataset.")

# Check for closed values
if closed not in ["right", "left"]:
raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.")

# create bin information for echo_range
range_interval = np.arange(0, ds_Sv["echo_range"].max() + range_meter_bin, range_meter_bin)
# this computes the echo range max since there might NaNs in the data
echo_range_max = ds_Sv[range_var].max()
range_interval = np.arange(0, echo_range_max + range_bin, range_bin)

# create bin information needed for ping_time
ping_interval = (
ds_Sv.ping_time.resample(ping_time=ping_time_bin, skipna=True).asfreq().ping_time.values
d_index = (
ds_Sv["ping_time"]
.resample(ping_time=ping_time_bin, skipna=True)
.first() # Not actually being used, but needed to get the bin groups
.indexes["ping_time"]
)
ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]).values

# Set interval index for groups
ping_interval = _convert_bins_to_interval_index(ping_interval, closed=closed)
range_interval = _convert_bins_to_interval_index(range_interval, closed=closed)
raw_MVBS = get_MVBS_along_channels(
ds_Sv,
range_interval,
ping_interval,
range_var=range_var,
method=method,
**flox_kwargs,
)

# calculate the MVBS along each channel
MVBS_values = get_MVBS_along_channels(ds_Sv, range_interval, ping_interval)

# create MVBS dataset
# by transforming the binned dimensions to regular coords
ds_MVBS = xr.Dataset(
data_vars={"Sv": (["channel", "ping_time", "echo_range"], MVBS_values)},
data_vars={"Sv": (["channel", "ping_time", range_var], raw_MVBS["Sv"].data)},
coords={
"ping_time": ping_interval,
"channel": ds_Sv.channel,
"echo_range": range_interval[:-1],
"ping_time": np.array([v.left for v in raw_MVBS.ping_time_bins.values]),
"channel": raw_MVBS.channel.values,
range_var: np.array([v.left for v in raw_MVBS[f"{range_var}_bins"].values]),
},
)

# TODO: look into why 'filenames' exist here as a variable
# Added this check to support the test in test_process.py::test_compute_MVBS
if "filenames" in ds_MVBS.variables:
ds_MVBS = ds_MVBS.drop_vars("filenames")
# "has_positions" attribute is inserted in get_MVBS_along_channels
# when the dataset has position information
# propagate this to the final MVBS dataset
if raw_MVBS.attrs.get("has_positions", False):
for var in POSITION_VARIABLES:
ds_MVBS[var] = (["ping_time"], raw_MVBS[var].data, ds_Sv[var].attrs)

# Add water level if uses echo_range and it exists in Sv dataset
if range_var == "echo_range" and "water_level" in ds_Sv.data_vars:
ds_MVBS["water_level"] = ds_Sv["water_level"]

# ping_time_bin parsing and conversions
# Need to convert between pd.Timedelta and np.timedelta64 offsets/frequency strings
Expand Down Expand Up @@ -143,17 +311,17 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):

# Attach attributes
_set_MVBS_attrs(ds_MVBS)
ds_MVBS["echo_range"].attrs = {"long_name": "Range distance", "units": "m"}
ds_MVBS[range_var].attrs = {"long_name": "Range distance", "units": "m"}
ds_MVBS["Sv"] = ds_MVBS["Sv"].assign_attrs(
{
"cell_methods": (
f"ping_time: mean (interval: {ping_time_bin_resvalue} {ping_time_bin_resunit_label} " # noqa
"comment: ping_time is the interval start) "
f"echo_range: mean (interval: {range_meter_bin} meter "
"comment: echo_range is the interval start)"
f"{range_var}: mean (interval: {range_bin} meter "
f"comment: {range_var} is the interval start)"
),
"binning_mode": "physical units",
"range_meter_interval": str(range_meter_bin) + "m",
"range_meter_interval": str(range_bin) + "m",
"ping_time_interval": ping_time_bin,
"actual_range": [
round(float(ds_MVBS["Sv"].min().values), 2),
Expand Down
Loading

0 comments on commit b887be8

Please sign in to comment.