diff --git a/echopype/commongrid/api.py b/echopype/commongrid/api.py index ee71c12ec..8c9132b99 100644 --- a/echopype/commongrid/api.py +++ b/echopype/commongrid/api.py @@ -1,10 +1,14 @@ """ Functions for enhancing the spatial and temporal coherence of data. """ +import re +from typing import Literal + import numpy as np import pandas as pd import xarray as xr +from ..consolidate.api import POSITION_VARIABLES from ..utils.prov import add_processing_level, echopype_prov_attrs, insert_input_processing_level from .mvbs import get_MVBS_along_channels @@ -62,11 +66,117 @@ def _set_MVBS_attrs(ds): ) +def _convert_bins_to_interval_index( + bins: list, closed: Literal["left", "right"] = "left" +) -> pd.IntervalIndex: + """ + Convert bins to sorted pandas IntervalIndex + with specified closed end + + Parameters + ---------- + bins : list + The bin edges + closed : {'left', 'right'}, default 'left' + Which side of bin interval is closed + + Returns + ------- + pd.IntervalIndex + The resulting IntervalIndex + """ + return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values() + + +def _parse_x_bin(x_bin: str, x_label="range_bin") -> float: + """ + Parses x bin string, check unit, + and returns x bin in the specified unit. + + Currently only available for: + range_bin: meters (m) + dist_bin: nautical miles (nmi) + + Parameters + ---------- + x_bin : str + X bin string, e.g., "0.5nmi" or "10m" + x_label : {"range_bin", "dist_bin"}, default "range_bin" + The label of the x bin. + + Returns + ------- + float + The resulting x bin value in x unit, + based on label. + + Raises + ------ + ValueError + If the x bin string doesn't include unit value. + TypeError + If the x bin is not a type string. + KeyError + If the x label is not one of the available labels. + """ + x_bin_map = { + "range_bin": { + "name": "Range bin", + "unit": "m", + "ex": "10m", + "unit_label": "meters", + "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)", + }, + "dist_bin": { + "name": "Distance bin", + "unit": "nmi", + "ex": "0.5nmi", + "unit_label": "nautical miles", + "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)", + }, + } + x_bin_info = x_bin_map.get(x_label, None) + + if x_bin_info is None: + raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}") + + # First check for bin types + if not isinstance(x_bin, str): + raise TypeError("'x_bin' must be a string") + # normalize to lower case + # for x_bin + x_bin = x_bin.strip().lower() + # Only matches meters + match_obj = re.match(x_bin_info["pattern"], x_bin) + + # Do some checks on x_bin inputs + if match_obj is None: + # This shouldn't be other units + raise ValueError( + f"{x_bin_info['name']} must be in " + f"{x_bin_info['unit_label']} " + f"(e.g., '{x_bin_info['ex']}')." + ) + + # Convert back to float + x_bin = float(match_obj.group(1)) + return x_bin + + @add_processing_level("L3*") -def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"): +def compute_MVBS( + ds_Sv: xr.Dataset, + range_var: Literal["echo_range", "depth"] = "echo_range", + range_bin: str = "20m", + ping_time_bin: str = "20S", + method="map-reduce", + closed: Literal["left", "right"] = "left", + **flox_kwargs, +): """ Compute Mean Volume Backscattering Strength (MVBS) - based on intervals of range (``echo_range``) and ``ping_time`` specified in physical units. + based on intervals of range (``echo_range``) or depth (``depth``) + and ``ping_time`` specified in physical units. Output of this function differs from that of ``compute_MVBS_index_binning``, which computes bin-averaged Sv according to intervals of ``echo_range`` and ``ping_time`` specified as @@ -76,41 +186,99 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"): ---------- ds_Sv : xr.Dataset dataset containing Sv and ``echo_range`` [m] - range_meter_bin : Union[int, float] - bin size along ``echo_range`` in meters, default to ``20`` - ping_time_bin : str - bin size along ``ping_time``, default to ``20S`` + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Must be one of ``echo_range`` or ``depth``. + Note that ``depth`` is only available if the input dataset contains + ``depth`` as a data variable. + range_bin : str, default '20m' + bin size along ``echo_range`` or ``depth`` in meters. + ping_time_bin : str, default '20S' + bin size along ``ping_time`` + method: str, default 'map-reduce' + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + closed: {'left', 'right'}, default 'left' + Which side of bin interval is closed. + **kwargs + Additional keyword arguments to be passed + to flox reduction function. Returns ------- A dataset containing bin-averaged Sv """ + if not isinstance(ping_time_bin, str): + raise TypeError("ping_time_bin must be a string") + + range_bin = _parse_x_bin(range_bin, "range_bin") + + # Clean up filenames dimension if it exists + # not needed here + if "filenames" in ds_Sv.dims: + ds_Sv = ds_Sv.drop_dims("filenames") + + # Check if range_var is valid + if range_var not in ["echo_range", "depth"]: + raise ValueError("range_var must be one of 'echo_range' or 'depth'.") + + # Check if range_var exists in ds_Sv + if range_var not in ds_Sv.data_vars: + raise ValueError(f"range_var '{range_var}' does not exist in the input dataset.") + + # Check for closed values + if closed not in ["right", "left"]: + raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.") + # create bin information for echo_range - range_interval = np.arange(0, ds_Sv["echo_range"].max() + range_meter_bin, range_meter_bin) + # this computes the echo range max since there might NaNs in the data + echo_range_max = ds_Sv[range_var].max() + range_interval = np.arange(0, echo_range_max + range_bin, range_bin) # create bin information needed for ping_time - ping_interval = ( - ds_Sv.ping_time.resample(ping_time=ping_time_bin, skipna=True).asfreq().ping_time.values + d_index = ( + ds_Sv["ping_time"] + .resample(ping_time=ping_time_bin, skipna=True) + .first() # Not actually being used, but needed to get the bin groups + .indexes["ping_time"] + ) + ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]).values + + # Set interval index for groups + ping_interval = _convert_bins_to_interval_index(ping_interval, closed=closed) + range_interval = _convert_bins_to_interval_index(range_interval, closed=closed) + raw_MVBS = get_MVBS_along_channels( + ds_Sv, + range_interval, + ping_interval, + range_var=range_var, + method=method, + **flox_kwargs, ) - - # calculate the MVBS along each channel - MVBS_values = get_MVBS_along_channels(ds_Sv, range_interval, ping_interval) # create MVBS dataset + # by transforming the binned dimensions to regular coords ds_MVBS = xr.Dataset( - data_vars={"Sv": (["channel", "ping_time", "echo_range"], MVBS_values)}, + data_vars={"Sv": (["channel", "ping_time", range_var], raw_MVBS["Sv"].data)}, coords={ - "ping_time": ping_interval, - "channel": ds_Sv.channel, - "echo_range": range_interval[:-1], + "ping_time": np.array([v.left for v in raw_MVBS.ping_time_bins.values]), + "channel": raw_MVBS.channel.values, + range_var: np.array([v.left for v in raw_MVBS[f"{range_var}_bins"].values]), }, ) - # TODO: look into why 'filenames' exist here as a variable - # Added this check to support the test in test_process.py::test_compute_MVBS - if "filenames" in ds_MVBS.variables: - ds_MVBS = ds_MVBS.drop_vars("filenames") + # "has_positions" attribute is inserted in get_MVBS_along_channels + # when the dataset has position information + # propagate this to the final MVBS dataset + if raw_MVBS.attrs.get("has_positions", False): + for var in POSITION_VARIABLES: + ds_MVBS[var] = (["ping_time"], raw_MVBS[var].data, ds_Sv[var].attrs) + + # Add water level if uses echo_range and it exists in Sv dataset + if range_var == "echo_range" and "water_level" in ds_Sv.data_vars: + ds_MVBS["water_level"] = ds_Sv["water_level"] # ping_time_bin parsing and conversions # Need to convert between pd.Timedelta and np.timedelta64 offsets/frequency strings @@ -143,17 +311,17 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"): # Attach attributes _set_MVBS_attrs(ds_MVBS) - ds_MVBS["echo_range"].attrs = {"long_name": "Range distance", "units": "m"} + ds_MVBS[range_var].attrs = {"long_name": "Range distance", "units": "m"} ds_MVBS["Sv"] = ds_MVBS["Sv"].assign_attrs( { "cell_methods": ( f"ping_time: mean (interval: {ping_time_bin_resvalue} {ping_time_bin_resunit_label} " # noqa "comment: ping_time is the interval start) " - f"echo_range: mean (interval: {range_meter_bin} meter " - "comment: echo_range is the interval start)" + f"{range_var}: mean (interval: {range_bin} meter " + f"comment: {range_var} is the interval start)" ), "binning_mode": "physical units", - "range_meter_interval": str(range_meter_bin) + "m", + "range_meter_interval": str(range_bin) + "m", "ping_time_interval": ping_time_bin, "actual_range": [ round(float(ds_MVBS["Sv"].min().values), 2), diff --git a/echopype/commongrid/mvbs.py b/echopype/commongrid/mvbs.py index 2c0006f35..2fbc1b38f 100644 --- a/echopype/commongrid/mvbs.py +++ b/echopype/commongrid/mvbs.py @@ -2,411 +2,25 @@ Contains core functions needed to compute the MVBS of an input dataset. """ -import warnings -from typing import Tuple, Union +from typing import Literal, Union -import dask.array import numpy as np +import pandas as pd import xarray as xr +from flox.xarray import xarray_reduce - -def get_bin_indices( - echo_range: np.ndarray, bins_er: np.ndarray, times: np.ndarray, bins_time: np.ndarray -) -> Tuple[np.ndarray, np.ndarray]: - """ - Obtains the bin index of ``echo_range`` and ``times`` based - on the binning ``bins_er`` and ``bins_time``, respectively. - - Parameters - ---------- - echo_range: np.ndarray - 2D array of echo range values - bins_er: np.ndarray - 1D array (used by np.digitize) representing the binning required for ``echo_range`` - times: np.ndarray - 1D array corresponding to the time values that should be binned - bins_time: np.ndarray - 1D array (used by np.digitize) representing the binning required for ``times`` - - Returns - ------- - digitized_echo_range: np.ndarray - 2D array of bin indices for ``echo_range`` - bin_time_ind: np.ndarray - 1D array of bin indices for ``times`` - """ - - # get bin index for each echo range value - digitized_echo_range = np.digitize(echo_range, bins_er, right=False) - - # turn datetime into integers, so we can use np.digitize - if isinstance(times, dask.array.Array): - times_i8 = times.compute().data.view("i8") - else: - times_i8 = times.view("i8") - - # turn datetime into integers, so we can use np.digitize - bins_time_i8 = bins_time.view("i8") - - # get bin index for each time - bin_time_ind = np.digitize(times_i8, bins_time_i8, right=False) - - return digitized_echo_range, bin_time_ind - - -def bin_and_mean_echo_range( - arr: Union[np.ndarray, dask.array.Array], digitized_echo_range: np.ndarray, n_bin_er: int -) -> Union[np.ndarray, dask.array.Array]: - """ - Bins and means ``arr`` with respect to the ``echo_range`` bins. - - Parameters - ---------- - arr: np.ndarray or dask.array.Array - 2D array (dimension: [``echo_range`` x ``ping_time``]) to bin along ``echo_range`` - and compute mean of each bin - digitized_echo_range: np.ndarray - 2D array of bin indices for ``echo_range`` - n_bin_er: int - The number of echo range bins - - Returns - ------- - er_means: np.ndarray or dask.array.Array - 2D array representing the bin and mean of ``arr`` along ``echo_range`` - """ - - binned_means = [] - for bin_er in range(1, n_bin_er): - # Catch a known warning that can occur, which does not impact the results - with warnings.catch_warnings(): - # ignore warnings caused by taking a mean of an array filled with NaNs - warnings.filterwarnings(action="ignore", message="Mean of empty slice") - - # bin and mean echo_range dimension - er_selected_data = np.nanmean(arr[:, digitized_echo_range == bin_er], axis=1) - - # collect all echo_range bins - binned_means.append(er_selected_data) - - # create full echo_range binned array - er_means = np.vstack(binned_means) - - return er_means - - -def get_unequal_rows(mat: np.ndarray, row: np.ndarray) -> np.ndarray: - """ - Obtains those row indices of ``mat`` that are not equal - to ``row``. - - Parameters - ---------- - mat: np.ndarray - 2D array with the same column dimension as the number - of elements in ``row`` - row: np.ndarray - 1D array with the same number of element elements as - the column dimension of ``mat`` - - Returns - ------- - row_ind_not_equal: np.ndarray - The row indices of ``mat`` that are not equal to ``row`` - - Notes - ----- - Elements with NaNs are considered equal if they are in the same position. - """ - - # compare row against all rows in mat (allowing for NaNs to be equal) - element_nan_equal = (mat == row) | (np.isnan(mat) & np.isnan(row)) - - # determine if mat row is equal to row - row_not_equal = np.logical_not(np.all(element_nan_equal, axis=1)) - - if isinstance(row_not_equal, dask.array.Array): - row_not_equal = row_not_equal.compute() - - # get those row indices that are not equal to row - row_ind_not_equal = np.argwhere(row_not_equal).flatten() - - return row_ind_not_equal - - -def if_all_er_steps_identical(er_chan: Union[xr.DataArray, np.ndarray]) -> bool: - """ - A comprehensive check that determines if all ``echo_range`` values - along ``ping_time`` have the same step size. If they do not have - the same step sizes, then grouping of the ``echo_range`` values - will be necessary. - - Parameters - ---------- - er_chan: xr.DataArray or np.ndarray - 2D array containing the ``echo_range`` values for each ``ping_time`` - - Returns - ------- - bool - True, if grouping of ``echo_range`` along ``ping_time`` is necessary, otherwise False - - Notes - ----- - ``er_chan`` should have rows corresponding to ``ping_time`` and columns - corresponding to ``range_sample`` - """ - - # grab the in-memory numpy echo_range values, if necessary - if isinstance(er_chan, xr.DataArray): - er_chan = er_chan.values - - # grab the first ping_time that is not filled with NaNs - ping_index = 0 - while np.all(np.isnan(er_chan[ping_index, :])): - ping_index += 1 - - # determine those rows of er_chan that are not equal to the row ping_index - unequal_ping_ind = get_unequal_rows(er_chan, er_chan[ping_index, :]) - - if len(unequal_ping_ind) > 0: - # see if all unequal_ping_ind are filled with NaNs - all_nans = np.all(np.all(np.isnan(er_chan[unequal_ping_ind, :]), axis=1)) - - if all_nans: - # All echo_range values have the same step size - return False - else: - # Some echo_range values have different step sizes - return True - else: - # All echo_range values have the same step size - return False - - -def if_last_er_steps_identical(er_chan: Union[xr.DataArray, np.ndarray]) -> bool: - """ - An alternative (less comprehensive) check that determines if all - ``echo_range`` values along ``ping_time`` have the same step size. - If they do not have the same step sizes, then grouping of the - ``echo_range`` values will be necessary. - - Parameters - ---------- - er_chan: xr.DataArray or np.ndarray - 2D array containing the ``echo_range`` values for each ``ping_time`` - - Returns - ------- - bool - True, if grouping of ``echo_range`` along ``ping_time`` is necessary, otherwise False - - Notes - ----- - It is possible that this method will incorrectly determine if grouping - is necessary. - - ``er_chan`` should have rows corresponding to ``ping_time`` and columns - corresponding to ``range_sample`` - """ - - # determine the number of NaNs in each ping and find the unique number of NaNs - unique_num_nans = np.unique(np.isnan(er_chan.data).sum(axis=1)) - - # compute the results, if necessary, to allow for downstream checks - if isinstance(unique_num_nans, dask.array.Array): - unique_num_nans = unique_num_nans.compute() - - # determine if any value is not 0 or er_chan.shape[1] - unexpected_num_nans = False in np.logical_or( - unique_num_nans == 0, unique_num_nans == er_chan.shape[1] - ) - - if unexpected_num_nans: - # echo_range varies with ping_time - return True - else: - # make sure that the final echo_range value for each ping_time is the same (account for NaN) - num_non_nans = np.logical_not(np.isnan(np.unique(er_chan.data[:, -1]))).sum() - - # compute the results, if necessary, to allow for downstream checks - if isinstance(num_non_nans, dask.array.Array): - num_non_nans = num_non_nans.compute() - - if num_non_nans > 1: - # echo_range varies with ping_time - return True - else: - # echo_range does not vary with ping_time - return False - - -def is_er_grouping_needed( - echo_range: Union[xr.DataArray, np.ndarray], comprehensive_er_check: bool -) -> bool: - """ - Determines if ``echo_range`` values along ``ping_time`` can change and - thus need to be grouped. - - Parameters - ---------- - echo_range: xr.DataArray or np.ndarray - 2D array containing the ``echo_range`` values for each ``ping_time`` - comprehensive_er_check: bool - If True, a more comprehensive check will be completed to determine if ``echo_range`` - grouping along ``ping_time`` is needed, otherwise a less comprehensive check will be done - - Returns - ------- - bool - If True grouping of ``echo_range`` will be required, else it will not - be necessary - """ - - if comprehensive_er_check: - return if_all_er_steps_identical(echo_range) - else: - return if_last_er_steps_identical(echo_range) - - -def group_dig_er_bin_mean_echo_range( - arr: Union[np.ndarray, dask.array.Array], - digitized_echo_range: Union[np.ndarray, dask.array.Array], - n_bin_er: int, -) -> Union[np.ndarray, dask.array.Array]: - """ - Groups the rows of ``arr`` such that they have the same corresponding - row values in ``digitized_echo_range``, then applies ``bin_and_mean_echo_range`` - on each group, and lastly assembles the correctly ordered ``er_means`` array - representing the bin and mean of ``arr`` with respect to ``echo_range``. - - Parameters - ---------- - arr: dask.array.Array or np.ndarray - The 2D array whose values should be binned - digitized_echo_range: dask.array.Array or np.ndarray - 2D array of bin indices for ``echo_range`` - n_bin_er: int - The number of echo range bins - - Returns - ------- - er_means: dask.array.Array or np.ndarray - The bin and mean of ``arr`` with respect to ``echo_range`` - """ - - # compute bin indices to allow for downstream processes (mainly axis argument in unique) - if isinstance(digitized_echo_range, dask.array.Array): - digitized_echo_range = digitized_echo_range.compute() - - # determine the unique rows of digitized_echo_range and the inverse - unique_er_bin_ind, unique_inverse = np.unique(digitized_echo_range, axis=0, return_inverse=True) - - # create groups of row indices using the unique inverse - grps_same_ind = [ - np.argwhere(unique_inverse == grp).flatten() for grp in np.unique(unique_inverse) - ] - - # for each group bin and mean arr along echo_range - # note: the values appended may not be in the correct final order - binned_er = [] - for count, grp in enumerate(grps_same_ind): - binned_er.append( - bin_and_mean_echo_range(arr[grp, :], unique_er_bin_ind[count, :], n_bin_er) - ) - - # construct er_means and put the columns in the correct order - binned_er_array = np.hstack(binned_er) - correct_column_ind = np.argsort(np.concatenate(grps_same_ind)) - er_means = binned_er_array[:, correct_column_ind] - - return er_means - - -def bin_and_mean_2d( - arr: Union[dask.array.Array, np.ndarray], - bins_time: np.ndarray, - bins_er: np.ndarray, - times: np.ndarray, - echo_range: np.ndarray, - comprehensive_er_check: bool = True, -) -> np.ndarray: - """ - Bins and means ``arr`` based on ``times`` and ``echo_range``, - and their corresponding bins. If ``arr`` is ``Sv`` then this - will compute the MVBS. - - Parameters - ---------- - arr: dask.array.Array or np.ndarray - The 2D array whose values should be binned - bins_time: np.ndarray - 1D array (used by np.digitize) representing the binning required for ``times`` - bins_er: np.ndarray - 1D array (used by np.digitize) representing the binning required for ``echo_range`` - times: np.ndarray - 1D array corresponding to the time values that should be binned - echo_range: np.ndarray - 2D array of echo range values - comprehensive_er_check: bool - If True, a more comprehensive check will be completed to determine if ``echo_range`` - grouping along ``ping_time`` is needed, otherwise a less comprehensive check will be done - - Returns - ------- - final_reduced: np.ndarray - The final binned and mean ``arr``, if ``arr`` is ``Sv`` then this is the MVBS - - Notes - ----- - This function assumes that ``arr`` has rows corresponding to - ``ping_time`` and columns corresponding to ``echo_range``. - - This function should not be run if the number of ``echo_range`` values - vary amongst ``ping_times``. This should not occur for our current use - of echopype-generated Sv data. - """ - - # get the number of echo range and time bins - n_bin_er = len(bins_er) - n_bin_time = len(bins_time) - - # obtain the bin indices for echo_range and times - digitized_echo_range, bin_time_ind = get_bin_indices(echo_range, bins_er, times, bins_time) - - # determine if grouping of echo_range values with the same step size is necessary - er_grouping_needed = is_er_grouping_needed(echo_range, comprehensive_er_check) - - if er_grouping_needed: - # groups, bins, and means arr with respect to echo_range - er_means = group_dig_er_bin_mean_echo_range(arr, digitized_echo_range, n_bin_er) - else: - # bin and mean arr with respect to echo_range - er_means = bin_and_mean_echo_range(arr, digitized_echo_range[0, :], n_bin_er) - - # if er_means is a dask array we compute it so the graph does not get too large - if isinstance(er_means, dask.array.Array): - er_means = er_means.compute() - - # create final reduced array i.e. MVBS - final = np.empty((n_bin_time, n_bin_er - 1)) - for bin_time in range(1, n_bin_time + 1): - # obtain er_mean indices corresponding to the time bin - indices = np.argwhere(bin_time_ind == bin_time).flatten() - - if len(indices) == 0: - # fill values with NaN, if there are no values in the bin - final[bin_time - 1, :] = np.nan - else: - # bin and mean the er_mean time bin - final[bin_time - 1, :] = np.nanmean(er_means[:, indices], axis=1) - - return final +from ..consolidate.api import POSITION_VARIABLES +from ..utils.compute import _lin2log, _log2lin def get_MVBS_along_channels( - ds_Sv: xr.Dataset, echo_range_interval: np.ndarray, ping_interval: np.ndarray -) -> np.ndarray: + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + ping_interval: Union[pd.IntervalIndex, np.ndarray], + range_var: Literal["echo_range", "depth"] = "echo_range", + method: str = "map-reduce", + **kwargs +) -> xr.Dataset: """ Computes the MVBS of ``ds_Sv`` along each channel for the given intervals. @@ -416,46 +30,60 @@ def get_MVBS_along_channels( ds_Sv: xr.Dataset A Dataset containing ``Sv`` and ``echo_range`` data with coordinates ``channel``, ``ping_time``, and ``range_sample`` - echo_range_interval: np.ndarray - 1D array (used by np.digitize) representing the binning required for ``echo_range`` - ping_interval: np.ndarray - 1D array (used by np.digitize) representing the binning required for ``ping_time`` + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var`` + ping_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``ping_time`` + range_var: str + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **kwargs + Additional keyword arguments to be passed + to flox reduction function Returns ------- - np.ndarray - The MVBS value of the input ``ds_Sv`` for all channels - - Notes - ----- - If the values in ``ds_Sv`` are delayed then the binning and mean of ``Sv`` with - respect to ``echo_range`` will take place, then the delayed result will be computed, - and lastly the binning and mean with respect to ``ping_time`` will be completed. It - is necessary to apply a compute midway through this method because Dask graph layers - get too large and this makes downstream operations very inefficient. - """ - - all_MVBS = [] - for chan in ds_Sv.channel: - # squeeze to remove "channel" dim if present - # TODO: not sure why not already removed for the AZFP case. Investigate. - ds = ds_Sv.sel(channel=chan).squeeze() - - # average should be done in linear domain - sv = 10 ** (ds["Sv"] / 10) - - # get MVBS for channel in linear domain - chan_MVBS = bin_and_mean_2d( - sv.data, - bins_time=ping_interval, - bins_er=echo_range_interval, - times=sv.ping_time.data, - echo_range=ds["echo_range"], - comprehensive_er_check=True, + xr.Dataset + The MVBS dataset of the input ``ds_Sv`` for all channels + """ + + # average should be done in linear domain + sv = ds_Sv["Sv"].pipe(_log2lin) + + # Get positions if exists + # otherwise just use an empty dataset + ds_Pos = xr.Dataset(attrs={"has_positions": False}) + if all(v in ds_Sv for v in POSITION_VARIABLES): + ds_Pos = xarray_reduce( + ds_Sv[POSITION_VARIABLES], + ds_Sv["ping_time"], + func="nanmean", + expected_groups=(ping_interval), + isbin=True, + method=method, ) + ds_Pos.attrs["has_positions"] = True + + # reduce along ping_time and echo_range or depth + # by binning and averaging + mvbs = xarray_reduce( + sv, + sv["channel"], + ds_Sv["ping_time"], + ds_Sv[range_var], + func="nanmean", + expected_groups=(None, ping_interval, range_interval), + isbin=[False, True, True], + method=method, + **kwargs + ) - # apply inverse mapping to get back to the original domain and store values - all_MVBS.append(10 * np.log10(chan_MVBS)) - - # collect the MVBS values for each channel - return np.stack(all_MVBS, axis=0) + # apply inverse mapping to get back to the original domain and store values + da_MVBS = mvbs.pipe(_lin2log) + return xr.merge([ds_Pos, da_MVBS]) diff --git a/echopype/consolidate/api.py b/echopype/consolidate/api.py index a08144a5c..364c76be3 100644 --- a/echopype/consolidate/api.py +++ b/echopype/consolidate/api.py @@ -12,6 +12,8 @@ from ..utils.prov import add_processing_level from .split_beam_angle import add_angle_to_ds, get_angle_complex_samples, get_angle_power_samples +POSITION_VARIABLES = ["latitude", "longitude"] + def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset: """ @@ -185,7 +187,7 @@ def sel_interp(var, time_dim_name): f"{datetime.datetime.utcnow()} +00:00. " "Interpolated or propagated from Platform latitude/longitude." # noqa ) - for da_name in ["latitude", "longitude"]: + for da_name in POSITION_VARIABLES: interp_ds[da_name] = interp_ds[da_name].assign_attrs({"history": history_attr}) if time_dim_name in interp_ds: diff --git a/echopype/tests/commongrid/conftest.py b/echopype/tests/commongrid/conftest.py new file mode 100644 index 000000000..27b166a03 --- /dev/null +++ b/echopype/tests/commongrid/conftest.py @@ -0,0 +1,519 @@ +import pytest + +import xarray as xr +import numpy as np +import pandas as pd + +from echopype.consolidate import add_depth +import echopype as ep + + +@pytest.fixture +def random_number_generator(): + """Random number generator for tests""" + return np.random.default_rng() + + +@pytest.fixture +def mock_nan_ilocs(): + """NaN i locations for irregular Sv dataset + + It's a list of tuples, each tuple contains + (channel, ping_time, range_sample) + + Notes + ----- + This was created with the following code: + + ``` + import numpy as np + + random_positions = [] + for i in range(20): + random_positions.append(( + np.random.randint(0, 2), + np.random.randint(0, 5), + np.random.randint(0, 20)) + ) + ``` + """ + return [ + (1, 1, 10), + (1, 0, 16), + (0, 3, 6), + (0, 2, 11), + (0, 2, 6), + (1, 1, 14), + (0, 1, 17), + (1, 4, 19), + (0, 3, 3), + (0, 0, 19), + (0, 1, 5), + (1, 2, 9), + (1, 4, 18), + (0, 1, 5), + (0, 4, 4), + (0, 1, 6), + (1, 2, 2), + (0, 1, 2), + (0, 4, 8), + (0, 1, 1), + ] + + +@pytest.fixture +def mock_parameters(): + """Small mock parameters""" + return { + "channel_len": 2, + "ping_time_len": 10, + "depth_len": 20, + "ping_time_interval": "0.3S", + } + + +@pytest.fixture +def mock_Sv_sample(mock_parameters): + """ + Mock Sv sample + + Dimension: (2, 10, 20) + """ + channel_len = mock_parameters["channel_len"] + ping_time_len = mock_parameters["ping_time_len"] + depth_len = mock_parameters["depth_len"] + + depth_data = np.linspace(0, 1, num=depth_len) + return np.tile(depth_data, (channel_len, ping_time_len, 1)) + + +@pytest.fixture +def mock_Sv_dataset_regular(mock_parameters, mock_Sv_sample): + ds_Sv = _gen_Sv_echo_range_regular(**mock_parameters, ping_time_jitter_max_ms=0) + ds_Sv["Sv"].data = mock_Sv_sample + return ds_Sv + + +@pytest.fixture +def mock_Sv_dataset_irregular(mock_parameters, mock_Sv_sample, mock_nan_ilocs): + depth_interval = [0.5, 0.32, 0.2] + depth_ping_time_len = [2, 3, 5] + ds_Sv = _gen_Sv_echo_range_irregular( + **mock_parameters, + depth_interval=depth_interval, + depth_ping_time_len=depth_ping_time_len, + ping_time_jitter_max_ms=30, # Added jitter to ping_time + ) + ds_Sv["Sv"].data = mock_Sv_sample + # Sprinkle nans around echo_range + for pos in mock_nan_ilocs: + ds_Sv["echo_range"][pos] = np.nan + return ds_Sv + + +@pytest.fixture +def mock_mvbs_inputs(): + return dict(range_meter_bin=2, ping_time_bin="1s") + + +@pytest.fixture +def mock_mvbs_array_regular(mock_Sv_dataset_regular, mock_mvbs_inputs, mock_parameters): + """ + Mock Sv sample result from compute_MVBS + + Dimension: (2, 3, 5) + Ping time bin: 1s + Range bin: 2m + """ + ds_Sv = mock_Sv_dataset_regular + ping_time_bin = mock_mvbs_inputs["ping_time_bin"] + range_bin = mock_mvbs_inputs["range_meter_bin"] + channel_len = mock_parameters["channel_len"] + expected_mvbs_val = _get_expected_mvbs_val( + ds_Sv, ping_time_bin, range_bin, channel_len + ) + + return expected_mvbs_val + + +@pytest.fixture +def mock_mvbs_array_irregular(mock_Sv_dataset_irregular, mock_mvbs_inputs, mock_parameters): + """ + Mock Sv sample irregular result from compute_MVBS + + Dimension: (2, 3, 5) + Ping time bin: 1s + Range bin: 2m + """ + ds_Sv = mock_Sv_dataset_irregular + ping_time_bin = mock_mvbs_inputs["ping_time_bin"] + range_bin = mock_mvbs_inputs["range_meter_bin"] + channel_len = mock_parameters["channel_len"] + expected_mvbs_val = _get_expected_mvbs_val( + ds_Sv, ping_time_bin, range_bin, channel_len + ) + + return expected_mvbs_val + + +@pytest.fixture( + params=[ + ( + ("EK60", "ncei-wcsd", "Summer2017-D20170719-T211347.raw"), + "EK60", + None, + {}, + ), + ( + ("EK80_NEW", "echopype-test-D20211004-T235930.raw"), + "EK80", + None, + {"waveform_mode": "BB", "encode_mode": "complex"}, + ), + ( + ("EK80_NEW", "D20211004-T233354.raw"), + "EK80", + None, + {"waveform_mode": "CW", "encode_mode": "power"}, + ), + ( + ("EK80_NEW", "D20211004-T233115.raw"), + "EK80", + None, + {"waveform_mode": "CW", "encode_mode": "complex"}, + ), + (("ES70", "D20151202-T020259.raw"), "ES70", None, {}), + (("AZFP", "17082117.01A"), "AZFP", ("AZFP", "17041823.XML"), {}), + ( + ("AD2CP", "raw", "090", "rawtest.090.00001.ad2cp"), + "AD2CP", + None, + {}, + ), + ], + ids=[ + "ek60_cw_power", + "ek80_bb_complex", + "ek80_cw_power", + "ek80_cw_complex", + "es70", + "azfp", + "ad2cp", + ], +) +def test_data_samples(request, test_path): + ( + filepath, + sonar_model, + azfp_xml_path, + range_kwargs, + ) = request.param + if sonar_model.lower() in ["es70", "ad2cp"]: + pytest.xfail( + reason="Not supported at the moment", + ) + path_model, *paths = filepath + filepath = test_path[path_model].joinpath(*paths) + + if azfp_xml_path is not None: + path_model, *paths = azfp_xml_path + azfp_xml_path = test_path[path_model].joinpath(*paths) + return ( + filepath, + sonar_model, + azfp_xml_path, + range_kwargs, + ) + + +@pytest.fixture +def regular_data_params(): + return { + "channel_len": 4, + "depth_len": 4000, + "ping_time_len": 100, + "ping_time_jitter_max_ms": 0, + } + + +@pytest.fixture +def ds_Sv_echo_range_regular(regular_data_params, random_number_generator): + return _gen_Sv_echo_range_regular( + **regular_data_params, + random_number_generator=random_number_generator, + ) + + +@pytest.fixture +def latlon_history_attr(): + return ( + "2023-08-31 12:00:00.000000 +00:00. " + "Interpolated or propagated from Platform latitude/longitude." # noqa + ) + + +@pytest.fixture +def lat_attrs(latlon_history_attr): + """Latitude attributes""" + return { + "long_name": "Platform latitude", + "standard_name": "latitude", + "units": "degrees_north", + "valid_range": "(-90.0, 90.0)", + "history": latlon_history_attr, + } + + +@pytest.fixture +def lon_attrs(latlon_history_attr): + """Longitude attributes""" + return { + "long_name": "Platform longitude", + "standard_name": "longitude", + "units": "degrees_east", + "valid_range": "(-180.0, 180.0)", + "history": latlon_history_attr, + } + + +@pytest.fixture +def depth_offset(): + """Depth offset for calculating depth""" + return 2.5 + + +@pytest.fixture +def ds_Sv_echo_range_regular_w_latlon(ds_Sv_echo_range_regular, lat_attrs, lon_attrs): + """Sv dataset with latitude and longitude""" + n_pings = ds_Sv_echo_range_regular.ping_time.shape[0] + latitude = np.linspace(42, 43, num=n_pings) + longitude = np.linspace(-124, -125, num=n_pings) + + ds_Sv_echo_range_regular["latitude"] = (["ping_time"], latitude, lat_attrs) + ds_Sv_echo_range_regular["longitude"] = (["ping_time"], longitude, lon_attrs) + + # Need processing level code for compute MVBS to work! + ds_Sv_echo_range_regular.attrs["processing_level"] = "Level 2A" + return ds_Sv_echo_range_regular + + +@pytest.fixture +def ds_Sv_echo_range_regular_w_depth(ds_Sv_echo_range_regular, depth_offset): + """Sv dataset with depth""" + return ds_Sv_echo_range_regular.pipe(add_depth, depth_offset=depth_offset) + + +@pytest.fixture +def ds_Sv_echo_range_irregular(random_number_generator): + depth_interval = [0.5, 0.32, 0.13] + depth_ping_time_len = [100, 300, 200] + ping_time_len = 600 + ping_time_interval = "0.3S" + return _gen_Sv_echo_range_irregular( + depth_interval=depth_interval, + depth_ping_time_len=depth_ping_time_len, + ping_time_len=ping_time_len, + ping_time_interval=ping_time_interval, + ping_time_jitter_max_ms=0, + random_number_generator=random_number_generator, + ) + + +# Helper functions to generate mock Sv and MVBS dataset +def _get_expected_mvbs_val( + ds_Sv: xr.Dataset, ping_time_bin: str, range_bin: float, channel_len: int = 2 +) -> np.ndarray: + """ + Helper functions to generate expected MVBS outputs from mock Sv dataset + by brute-force looping and compute the mean + + Parameters + ---------- + ds_Sv : xr.Dataset + Mock Sv dataset + ping_time_bin : str + Ping time bin + range_bin : float + Range bin + channel_len : int, default 2 + Number of channels + """ + # create bin information needed for ping_time + d_index = ( + ds_Sv["ping_time"] + .resample(ping_time=ping_time_bin, skipna=True) + .first() # Not actually being used, but needed to get the bin groups + .indexes["ping_time"] + ) + ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]).values + + # create bin information for echo_range + # this computes the echo range max since there might NaNs in the data + echo_range_max = ds_Sv["echo_range"].max() + range_interval = np.arange(0, echo_range_max + 2, range_bin) + + sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin) + + expected_mvbs_val = np.ones((2, len(ping_interval) - 1, len(range_interval) - 1)) * np.nan + + for ch_idx in range(channel_len): + for p_idx in range(len(ping_interval) - 1): + for r_idx in range(len(range_interval) - 1): + echo_range = ( + ds_Sv['echo_range'] + .isel(channel=ch_idx) + .sel(ping_time=slice(ping_interval[p_idx], ping_interval[p_idx+1])) + ) + r_idx_active = np.logical_and( + echo_range.data >= range_interval[r_idx], + echo_range.data < range_interval[r_idx+1] + ) + sv_tmp = sv.isel(channel=ch_idx).sel( + ping_time=slice(ping_interval[p_idx], ping_interval[p_idx+1])).data[r_idx_active] + if 0 in sv_tmp.shape: + expected_mvbs_val[ch_idx, p_idx, r_idx] = np.nan + else: + expected_mvbs_val[ch_idx, p_idx, r_idx] = np.mean(sv_tmp) + return ep.utils.compute._lin2log(expected_mvbs_val) + + +def _gen_ping_time(ping_time_len, ping_time_interval, ping_time_jitter_max_ms=0): + ping_time = pd.date_range("2018-07-01", periods=ping_time_len, freq=ping_time_interval) + if ping_time_jitter_max_ms != 0: # if to add jitter + jitter = ( + np.random.randint(ping_time_jitter_max_ms, size=ping_time_len) / 1000 + ) # convert to seconds + ping_time = pd.to_datetime(ping_time.astype(int) / 1e9 + jitter, unit="s") + return ping_time + + +def _gen_Sv_echo_range_regular( + channel_len=2, + depth_len=100, + depth_interval=0.5, + ping_time_len=600, + ping_time_interval="0.3S", + ping_time_jitter_max_ms=0, + random_number_generator=None, +): + """ + Generate a Sv dataset with uniform echo_range across all ping_time. + + ping_time_jitter_max_ms controlled jitter in milliseconds in ping_time. + + Parameters + ------------ + channel_len + number of channels + depth_len + number of total depth bins + depth_interval + depth intervals, may have multiple values + ping_time_len + total number of ping_time + ping_time_interval + interval between pings + ping_time_jitter_max_ms + jitter of ping_time in milliseconds + """ + + if random_number_generator is None: + random_number_generator = np.random.default_rng() + + # regular echo_range + echo_range = np.array([[np.arange(depth_len)] * ping_time_len] * channel_len) * depth_interval + + # generate dataset + ds_Sv = xr.Dataset( + data_vars={ + "Sv": ( + ["channel", "ping_time", "range_sample"], + random_number_generator.random(size=(channel_len, ping_time_len, depth_len)), + ), + "echo_range": (["channel", "ping_time", "range_sample"], echo_range), + "frequency_nominal": (["channel"], np.arange(channel_len)), + }, + coords={ + "channel": [f"ch_{ch}" for ch in range(channel_len)], + "ping_time": _gen_ping_time(ping_time_len, ping_time_interval, ping_time_jitter_max_ms), + "range_sample": np.arange(depth_len), + }, + ) + + return ds_Sv + + +def _gen_Sv_echo_range_irregular( + channel_len=2, + depth_len=100, + depth_interval=[0.5, 0.32, 0.13], + depth_ping_time_len=[100, 300, 200], + ping_time_len=600, + ping_time_interval="0.3S", + ping_time_jitter_max_ms=0, + random_number_generator=None, +): + """ + Generate a Sv dataset with uniform echo_range across all ping_time. + + ping_time_jitter_max_ms controlled jitter in milliseconds in ping_time. + + Parameters + ------------ + channel_len + number of channels + depth_len + number of total depth bins + depth_interval + depth intervals, may have multiple values + depth_ping_time_len + the number of pings to use each of the depth_interval + for example, with depth_interval=[0.5, 0.32, 0.13] + and depth_ping_time_len=[100, 300, 200], + the first 100 pings have echo_range with depth intervals of 0.5 m, + the next 300 pings have echo_range with depth intervals of 0.32 m, + and the last 200 pings have echo_range with depth intervals of 0.13 m. + ping_time_len + total number of ping_time + ping_time_interval + interval between pings + ping_time_jitter_max_ms + jitter of ping_time in milliseconds + """ + if random_number_generator is None: + random_number_generator = np.random.default_rng() + + # check input + if len(depth_interval) != len(depth_ping_time_len): + raise ValueError("The number of depth_interval and depth_ping_time_len must be equal!") + + if ping_time_len != np.array(depth_ping_time_len).sum(): + raise ValueError("The number of total pings does not match!") + + # irregular echo_range + echo_range_list = [] + for d, dp in zip(depth_interval, depth_ping_time_len): + echo_range_list.append(np.array([[np.arange(depth_len)] * dp] * channel_len) * d) + echo_range = np.hstack(echo_range_list) + + # generate dataset + ds_Sv = xr.Dataset( + data_vars={ + "Sv": ( + ["channel", "ping_time", "range_sample"], + random_number_generator.random(size=(channel_len, ping_time_len, depth_len)), + ), + "echo_range": (["channel", "ping_time", "range_sample"], echo_range), + "frequency_nominal": (["channel"], np.arange(channel_len)), + }, + coords={ + "channel": [f"ch_{ch}" for ch in range(channel_len)], + "ping_time": _gen_ping_time(ping_time_len, ping_time_interval, ping_time_jitter_max_ms), + "range_sample": np.arange(depth_len), + }, + ) + + return ds_Sv + + +# End helper functions diff --git a/echopype/tests/commongrid/test_api.py b/echopype/tests/commongrid/test_api.py new file mode 100644 index 000000000..d618d6443 --- /dev/null +++ b/echopype/tests/commongrid/test_api.py @@ -0,0 +1,289 @@ +import pytest +import echopype as ep +import numpy as np + + +# Utilities Tests +@pytest.mark.parametrize( + ["x_bin", "x_label", "expected_result"], + [ + # Success + ("10m", "range_bin", 10.0), + ("0.2m", "range_bin", 0.2), + ("0.5nmi", "dist_bin", 0.5), + # Errored + (10, "range_bin", TypeError), + ("10km", "range_bin", ValueError), + ("10", "range_bin", ValueError), + ("10m", "invalid_label", KeyError), + ], +) +def test__parse_x_bin(x_bin, x_label, expected_result): + if x_label == "invalid_label": + expected_error_msg = r"x_label must be one of" + elif isinstance(x_bin, int): + expected_error_msg = r"must be a string" + elif x_bin in ["10km", "10"]: + expected_error_msg = r"must be in" + + if not isinstance(expected_result, float): + with pytest.raises(expected_result, match=expected_error_msg): + ep.commongrid.api._parse_x_bin(x_bin, x_label) + else: + assert ep.commongrid.api._parse_x_bin(x_bin, x_label) == expected_result + +# NASC Tests +@pytest.mark.integration +@pytest.mark.skip(reason="NASC is not implemented yet") +def test_compute_NASC(test_data_samples): + pass + + +# MVBS Tests +@pytest.mark.integration +def test_compute_MVBS_index_binning(ds_Sv_echo_range_regular, regular_data_params): + """Test compute_MVBS_index_binning on mock data""" + + ping_num = 3 # number of pings to average over + range_sample_num = 7 # number of range_samples to average over + nchan = regular_data_params["channel_len"] + npings = regular_data_params["ping_time_len"] + nrange_samples = regular_data_params["depth_len"] + + # Binned MVBS test + ds_MVBS = ep.commongrid.compute_MVBS_index_binning( + ds_Sv_echo_range_regular, range_sample_num=range_sample_num, ping_num=ping_num + ) + + # Shape test + data_binned_shape = np.ceil( + (nchan, npings / ping_num, nrange_samples / range_sample_num) + ).astype(int) + assert np.all(ds_MVBS.Sv.shape == data_binned_shape) + + # Expected values compute + # average should be done in linear domain + da_sv = 10 ** (ds_Sv_echo_range_regular["Sv"] / 10) + expected = 10 * np.log10( + da_sv.coarsen(ping_time=ping_num, range_sample=range_sample_num, boundary="pad").mean( + skipna=True + ) + ) + + # Test all values in MVBS + assert np.array_equal(ds_MVBS.Sv.data, expected.data) + + +@pytest.mark.unit +@pytest.mark.parametrize( + ["range_bin", "ping_time_bin"], [(5, "10S"), ("10m", 10), ("10km", "10S"), ("10", "10S")] +) +def test_compute_MVBS_bin_inputs_fail(ds_Sv_echo_range_regular, range_bin, ping_time_bin): + expected_error = ValueError + if isinstance(range_bin, int) or isinstance(ping_time_bin, int): + expected_error = TypeError + match = r"must be a string" + else: + match = r"Range bin must be in meters" + + with pytest.raises(expected_error, match=match): + ep.commongrid.compute_MVBS( + ds_Sv_echo_range_regular, range_bin=range_bin, ping_time_bin=ping_time_bin + ) + + +@pytest.mark.unit +def test_compute_MVBS_w_latlon(ds_Sv_echo_range_regular_w_latlon, lat_attrs, lon_attrs): + """Testing for compute_MVBS with latitude and longitude""" + from echopype.consolidate.api import POSITION_VARIABLES + + ds_MVBS = ep.commongrid.compute_MVBS( + ds_Sv_echo_range_regular_w_latlon, range_bin="5m", ping_time_bin="10S" + ) + for var in POSITION_VARIABLES: + # Check to ensure variable is in dataset + assert var in ds_MVBS.data_vars + # Check for correct shape, which is just ping time + assert ds_MVBS[var].shape == ds_MVBS.ping_time.shape + + # Check if attributes match + if var == "latitude": + assert ds_MVBS[var].attrs == lat_attrs + elif var == "longitude": + assert ds_MVBS[var].attrs == lon_attrs + + +@pytest.mark.unit +@pytest.mark.parametrize("range_var", ["my_range", "echo_range", "depth"]) +def test_compute_MVBS_invalid_range_var(ds_Sv_echo_range_regular, range_var): + """Test compute MVBS range_var on mock data""" + + if range_var == "my_range": + with pytest.raises(ValueError, match="range_var must be one of 'echo_range' or 'depth'."): + ep.commongrid.compute_MVBS(ds_Sv_echo_range_regular, range_var=range_var) + elif range_var == "depth": + with pytest.raises( + ValueError, match=f"range_var '{range_var}' does not exist in the input dataset." + ): + ep.commongrid.compute_MVBS(ds_Sv_echo_range_regular, range_var=range_var) + else: + pass + + +@pytest.mark.integration +def test_compute_MVBS(test_data_samples): + """ + Test running through from open_raw to compute_MVBS. + """ + ( + filepath, + sonar_model, + azfp_xml_path, + range_kwargs, + ) = test_data_samples + ed = ep.open_raw(filepath, sonar_model, azfp_xml_path) + if ed.sonar_model.lower() == "azfp": + avg_temperature = ed["Environment"]["temperature"].values.mean() + env_params = { + "temperature": avg_temperature, + "salinity": 27.9, + "pressure": 59, + } + range_kwargs["env_params"] = env_params + if "azfp_cal_type" in range_kwargs: + range_kwargs.pop("azfp_cal_type") + Sv = ep.calibrate.compute_Sv(ed, **range_kwargs) + ping_time_bin = "20S" + ds_MVBS = ep.commongrid.compute_MVBS(Sv, ping_time_bin=ping_time_bin) + assert ds_MVBS is not None + + # Test to see if ping_time was resampled correctly + expected_ping_time = ( + Sv["ping_time"].resample(ping_time=ping_time_bin, skipna=True).asfreq().indexes["ping_time"] + ) + assert np.array_equal(ds_MVBS.ping_time.data, expected_ping_time.values) + + +@pytest.mark.integration +@pytest.mark.parametrize( + ("er_type"), + [ + ("regular"), + ("irregular"), + ], +) +def test_compute_MVBS_range_output(request, er_type): + """ + Tests the shape of compute_MVBS output on regular and irregular data. + The irregularity in the input echo_range would cause some rows or columns + of the output Sv to contain NaN. + Here we test for the expected shape after dropping the NaNs + for specific ping_time bins. + """ + # set jitter=0 to get predictable number of ping within each echo_range groups + if er_type == "regular": + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular") + else: + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_irregular") + + ds_MVBS = ep.commongrid.compute_MVBS(ds_Sv, range_bin="5m", ping_time_bin="10S") + + if er_type == "regular": + expected_len = ( + ds_Sv["channel"].size, # channel + np.ceil(np.diff(ds_Sv["ping_time"][[0, -1]].astype(int)) / 1e9 / 10), # ping_time + np.ceil(ds_Sv["echo_range"].max() / 5), # depth + ) + assert ds_MVBS["Sv"].shape == expected_len + else: + assert (ds_MVBS["Sv"].isel(ping_time=slice(None, 3)).dropna(dim="echo_range").shape) == ( + 2, + 3, + 10, + ) # full array, no NaN + assert (ds_MVBS["Sv"].isel(ping_time=slice(3, 12)).dropna(dim="echo_range").shape) == ( + 2, + 9, + 7, + ) # bottom bins contain NaN + assert (ds_MVBS["Sv"].isel(ping_time=slice(12, None)).dropna(dim="echo_range").shape) == ( + 2, + 6, + 3, + ) # bottom bins contain NaN + + +@pytest.mark.integration +@pytest.mark.parametrize( + ("er_type"), + [ + ("regular"), + ("irregular"), + ], +) +def test_compute_MVBS_values(request, er_type): + """Tests for the values of compute_MVBS on regular and irregular data.""" + + def _parse_nans(mvbs, ds_Sv) -> np.ndarray: + """Go through and figure out nan values in result""" + echo_range_step = np.unique(np.diff(mvbs.Sv.echo_range.values))[0] + expected_outs = [] + # Loop over channels + for chan in mvbs.Sv.channel.values: + # Get ping times for this channel + ping_times = mvbs.Sv.ping_time.values + # Compute the total number of pings + ping_len = len(ping_times) + # Variable to store the expected output for this channel + chan_expected = [] + for idx in range(ping_len): + # Loop over pings and create slices + if idx < ping_len - 1: + ping_slice = slice(ping_times[idx], ping_times[idx + 1]) + else: + ping_slice = slice(ping_times[idx], None) + + # Get the original echo_range data for this channel and ping slice + da = ds_Sv.echo_range.sel(channel=chan, ping_time=ping_slice) + # Drop the nan values since this shouldn't be included in actual + # computation for compute_MVBS, a.k.a. 'nanmean' + mean_values = da.dropna(dim="ping_time", how="all").values + # Compute the histogram of the mean values to get distribution + hist, _ = np.histogram( + mean_values[~np.isnan(mean_values)], + bins=np.append( + mvbs.Sv.echo_range.values, + # Add one more bin to account for the last value + mvbs.Sv.echo_range.values.max() + echo_range_step, + ), + ) + # Convert any non-zero values to False, and zero values to True + # to imitate having nan values since there's no value for that bin + chan_expected.append([False if v > 0 else True for v in hist]) + expected_outs.append(chan_expected) + return np.array(expected_outs) + + range_bin = "2m" + ping_time_bin = "1s" + + if er_type == "regular": + ds_Sv = request.getfixturevalue("mock_Sv_dataset_regular") + expected_mvbs = request.getfixturevalue("mock_mvbs_array_regular") + else: + # Mock irregular dataset contains jitter + # and NaN values in the bottom echo_range + ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular") + expected_mvbs = request.getfixturevalue("mock_mvbs_array_irregular") + + ds_MVBS = ep.commongrid.compute_MVBS(ds_Sv, range_bin=range_bin, ping_time_bin=ping_time_bin) + + expected_outputs = _parse_nans(ds_MVBS, ds_Sv) + + assert ds_MVBS.Sv.shape == expected_mvbs.shape + # Floating digits need to check with all close not equal + # Compare the values of the MVBS array with the expected values + assert np.allclose(ds_MVBS.Sv.values, expected_mvbs, atol=1e-10, rtol=1e-10, equal_nan=True) + + # Ensures that the computation of MVBS takes doesn't take into account NaN values + # that are sporadically placed in the echo_range values + assert np.array_equal(np.isnan(ds_MVBS.Sv.values), expected_outputs) diff --git a/echopype/tests/commongrid/test_mvbs.py b/echopype/tests/commongrid/test_mvbs.py index 449fe0b9c..78a77be0a 100644 --- a/echopype/tests/commongrid/test_mvbs.py +++ b/echopype/tests/commongrid/test_mvbs.py @@ -1,835 +1,67 @@ -import dask.array import numpy as np -from numpy.random import default_rng import pandas as pd import pytest -from typing import Tuple, Iterable, Union -import xarray as xr - -import echopype as ep -from echopype.commongrid.mvbs import bin_and_mean_2d - - -@pytest.fixture( - params=[ - ( - ("EK60", "ncei-wcsd", "Summer2017-D20170719-T211347.raw"), - "EK60", - None, - {}, - ), - ( - ("EK80_NEW", "echopype-test-D20211004-T235930.raw"), - "EK80", - None, - {'waveform_mode': 'BB', 'encode_mode': 'complex'}, - ), - ( - ("EK80_NEW", "D20211004-T233354.raw"), - "EK80", - None, - {'waveform_mode': 'CW', 'encode_mode': 'power'}, - ), - ( - ("EK80_NEW", "D20211004-T233115.raw"), - "EK80", - None, - {'waveform_mode': 'CW', 'encode_mode': 'complex'}, - ), - (("ES70", "D20151202-T020259.raw"), "ES70", None, {}), - (("AZFP", "17082117.01A"), "AZFP", ("AZFP", "17041823.XML"), {}), - ( - ("AD2CP", "raw", "090", "rawtest.090.00001.ad2cp"), - "AD2CP", - None, - {}, - ), - ], - ids=[ - "ek60_cw_power", - "ek80_bb_complex", - "ek80_cw_power", - "ek80_cw_complex", - "es70", - "azfp", - "ad2cp", - ], -) -def test_data_samples(request, test_path): - ( - filepath, - sonar_model, - azfp_xml_path, - range_kwargs, - ) = request.param - if sonar_model.lower() in ['es70', 'ad2cp']: - pytest.xfail( - reason="Not supported at the moment", - ) - path_model, *paths = filepath - filepath = test_path[path_model].joinpath(*paths) - - if azfp_xml_path is not None: - path_model, *paths = azfp_xml_path - azfp_xml_path = test_path[path_model].joinpath(*paths) - return ( - filepath, - sonar_model, - azfp_xml_path, - range_kwargs, - ) - - -def _construct_MVBS_toy_data( - nchan, npings, nrange_samples, ping_size, range_sample_size -): - """Construct data with values that increase every ping_num and ``range_sample_num`` - so that the result of computing MVBS is a smaller array - that increases regularly for each resampled ``ping_time`` and ``range_sample`` - - Parameters - ---------- - nchan : int - number of channels - npings : int - number of pings - nrange_samples : int - number of range samples - ping_size : int - number of pings with the same value - range_sample_size : int - number of range samples with the same value - - Returns - ------- - np.ndarray - Array with blocks of ``ping_time`` and ``range_sample`` with the same value, - so that computing the MVBS will result in regularly increasing values - every row and column - """ - data = np.ones((nchan, npings, nrange_samples)) - for p_i, ping in enumerate(range(0, npings, ping_size)): - for r_i, rb in enumerate(range(0, nrange_samples, range_sample_size)): - data[0, ping : ping + ping_size, rb : rb + range_sample_size] += ( - r_i + p_i - ) - # First channel increases by 1 each row and column, second increases by 2, third by 3, etc. - for f in range(nchan): - data[f] = data[0] * (f + 1) - - return data - - -def _construct_MVBS_test_data(nchan, npings, nrange_samples): - """Construct data for testing the toy data from - `_construct_MVBS_toy_data` after it has gone through the - MVBS calculation. - - Parameters - ---------- - nchan : int - number of channels - npings : int - number of pings - nrange_samples : int - number of range samples - - Returns - ------- - np.ndarray - Array with values that increases regularly - every ping and range sample - """ - - # Construct test array - test_array = np.add(*np.indices((npings, nrange_samples))) - return np.array([(test_array + 1) * (f + 1) for f in range(nchan)]) - - -def test_compute_MVBS_index_binning(): - """Test compute_MVBS_index_binning on toy data""" - - # Parameters for toy data - nchan, npings, nrange_samples = 4, 40, 400 - ping_num = 3 # number of pings to average over - range_sample_num = 7 # number of range_samples to average over - - # Construct toy data that increases regularly every ping_num and range_sample_num - data = _construct_MVBS_toy_data( - nchan=nchan, - npings=npings, - nrange_samples=nrange_samples, - ping_size=ping_num, - range_sample_size=range_sample_num, - ) - - data_log = 10 * np.log10(data) # Convert to log domain - chan_index = np.arange(nchan).astype(str) - ping_index = np.arange(npings) - range_sample = np.arange(nrange_samples) - Sv = xr.DataArray( - data_log, - coords=[ - ('channel', chan_index), - ('ping_time', ping_index), - ('range_sample', range_sample), - ], - ) - Sv.name = "Sv" - ds_Sv = Sv.to_dataset() - ds_Sv["frequency_nominal"] = chan_index # just so there's value in freq_nominal - ds_Sv = ds_Sv.assign( - echo_range=xr.DataArray( - np.array([[np.linspace(0, 10, nrange_samples)] * npings] * nchan), - coords=Sv.coords, - ) - ) - - # Binned MVBS test - ds_MVBS = ep.commongrid.compute_MVBS_index_binning( - ds_Sv, range_sample_num=range_sample_num, ping_num=ping_num - ) - data_test = 10 ** (ds_MVBS.Sv / 10) # Convert to linear domain - - # Shape test - data_binned_shape = np.ceil( - (nchan, npings / ping_num, nrange_samples / range_sample_num) - ).astype(int) - assert np.all(data_test.shape == data_binned_shape) - - # Construct test array that increases by 1 for each range_sample and ping_time - test_array = _construct_MVBS_test_data( - nchan, data_binned_shape[1], data_binned_shape[2] - ) - - # Test all values in MVBS - assert np.allclose(data_test, test_array, rtol=0, atol=1e-12) - - -def _coll_test_comp_MVBS(ds_Sv, nchan, ping_num, - range_sample_num, ping_time_bin, - total_range, range_meter_bin): - """A collection of tests for test_compute_MVBS""" - - ds_MVBS = ep.commongrid.compute_MVBS( - ds_Sv, - range_meter_bin=range_meter_bin, - ping_time_bin=f'{ping_time_bin}S', - ) - - data_test = 10 ** (ds_MVBS.Sv / 10) # Convert to linear domain - - # Shape test - data_binned_shape = np.ceil((nchan, ping_num, range_sample_num)).astype(int) - assert np.all(data_test.shape == data_binned_shape) - - # Construct test array that increases by 1 for each range_sample and ping_time - test_array = _construct_MVBS_test_data( - nchan, data_binned_shape[1], data_binned_shape[2] - ) - - # Test all values in MVBS - assert np.allclose(data_test, test_array, rtol=0, atol=1e-12) - - # Test to see if ping_time was resampled correctly - test_ping_time = pd.date_range( - '1/1/2020', periods=np.ceil(ping_num), freq=f'{ping_time_bin}S' - ) - assert np.array_equal(data_test.ping_time, test_ping_time) - - # Test to see if range was resampled correctly - test_range = np.arange(0, total_range, range_meter_bin) - assert np.array_equal(data_test.echo_range, test_range) - - -def _fill_w_nans(narr, nan_ping_time, nan_range_sample): - """ - A routine that fills a numpy array with nans. - - Parameters - ---------- - narr : numpy array - Array of dimensions (ping_time, range_sample) - nan_ping_time : list - ping times to fill with nans - nan_range_sample: list - range samples to fill with nans - """ - if len(nan_ping_time) != len(nan_range_sample): - raise ValueError('These lists must be the same size!') - - # fill in nans according to the provided lists - for i, j in zip(nan_ping_time, nan_range_sample): - narr[i, j] = np.nan - - return narr - - -def _nan_cases_comp_MVBS(ds_Sv, chan): - """ - For a single channel, obtains numpy array - filled with nans for various cases - """ - - # get echo_range values for a single channel - one_chan_er = ds_Sv.echo_range.sel(channel=chan).copy().values - - # ping times to fill with NaNs - nan_ping_time_1 = [slice(None), slice(None)] - # range samples to fill with NaNs - nan_range_sample_1 = [3, 4] - # pad all ping_times with nans for a certain range_sample - case_1 = _fill_w_nans(one_chan_er, nan_ping_time_1, nan_range_sample_1) - - # get echo_range values for a single channel - one_chan_er = ds_Sv.echo_range.sel(channel=chan).copy().values - # ping times to fill with NaNs - nan_ping_time_2 = [1, 3, 5, 9] - # range samples to fill with NaNs - nan_range_sample_2 = [slice(None), slice(None), slice(None), slice(None)] - # pad all range_samples of certain ping_times - case_2 = _fill_w_nans(one_chan_er, nan_ping_time_2, nan_range_sample_2) - - # get echo_range values for a single channel - one_chan_er = ds_Sv.echo_range.sel(channel=chan).copy().values - # ping times to fill with NaNs - nan_ping_time_3 = [0, 2, 5, 7] - # range samples to fill with NaNs - nan_range_sample_3 = [slice(0, 2), slice(None), slice(None), slice(0, 3)] - # pad all range_samples of certain ping_times and - # pad some ping_times with nans for a certain range_sample - case_3 = _fill_w_nans(one_chan_er, nan_ping_time_3, nan_range_sample_3) - - return case_1, case_2, case_3 - - -def test_compute_MVBS(): - """Test compute_MVBS on toy data""" - - # Parameters for fake data - nchan, npings, nrange_samples = 4, 100, 4000 - range_meter_bin = 7 # range in meters to average over - ping_time_bin = 3 # number of seconds to average over - ping_rate = 2 # Number of pings per second - range_sample_per_meter = 30 # Number of range_samples per meter - - # Useful conversions - ping_num = ( - npings / ping_rate / ping_time_bin - ) # number of pings to average over - range_sample_num = ( - nrange_samples / range_sample_per_meter / range_meter_bin - ) # number of range_samples to average over - total_range = nrange_samples / range_sample_per_meter # total range in meters - - # Construct data with values that increase with range and time - # so that when compute_MVBS is performed, the result is a smaller array - # that increases by a constant for each meter_bin and time_bin - data = _construct_MVBS_toy_data( - nchan=nchan, - npings=npings, - nrange_samples=nrange_samples, - ping_size=ping_rate * ping_time_bin, - range_sample_size=range_sample_per_meter * range_meter_bin, - ) - - data_log = 10 * np.log10(data) # Convert to log domain - chan_index = np.arange(nchan).astype(str) - freq_nom = np.arange(nchan) - # Generate a date range with `npings` number of pings with the frequency of the ping_rate - ping_time = pd.date_range( - '1/1/2020', periods=npings, freq=f'{1/ping_rate}S' +from echopype.commongrid.mvbs import get_MVBS_along_channels +from echopype.consolidate.api import POSITION_VARIABLES +from flox.xarray import xarray_reduce + +@pytest.mark.unit +@pytest.mark.parametrize(["range_var", "lat_lon"], [("depth", False), ("echo_range", True), ("echo_range", False)]) +def test_get_MVBS_along_channels(request, range_var, lat_lon): + """Testing the underlying function of compute_MVBS""" + range_bin = 20 + ping_time_bin = "20S" + method = "map-reduce" + + flox_kwargs = { + "reindex": True + } + + # Retrieve the correct dataset + if range_var == "depth": + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_depth") + elif range_var == "echo_range" and lat_lon is True: + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_latlon") + else: + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular") + + # compute range interval + echo_range_max = ds_Sv[range_var].max() + range_interval = np.arange(0, echo_range_max + range_bin, range_bin) + + # create bin information needed for ping_time + d_index = ( + ds_Sv["ping_time"] + .resample(ping_time=ping_time_bin, skipna=True) + .asfreq() + .indexes["ping_time"] ) - range_sample = np.arange(nrange_samples) - Sv = xr.DataArray( - data_log, - coords=[ - ('channel', chan_index), - ('ping_time', ping_time), - ('range_sample', range_sample), - ], + ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]) + + raw_MVBS = get_MVBS_along_channels( + ds_Sv, range_interval, ping_interval, + range_var=range_var, method=method, **flox_kwargs ) - Sv.name = "Sv" - ds_Sv = Sv.to_dataset() - ds_Sv = ds_Sv.assign( - frequency_nominal=xr.DataArray(freq_nom, coords={'channel': chan_index}), - echo_range=xr.DataArray( - np.array( - [[np.linspace(0, total_range, nrange_samples)] * npings] * nchan - ), - coords=Sv.coords, + + # Check that the range_var is in the dimension + assert f"{range_var}_bins" in raw_MVBS.dims + + # When it's echo_range and lat_lon, the dataset should have positions + if range_var == "echo_range" and lat_lon is True: + assert raw_MVBS.attrs["has_positions"] is True + assert all(v in raw_MVBS for v in POSITION_VARIABLES) + + # Compute xarray reduce manually for this + expected_Pos = xarray_reduce( + ds_Sv[POSITION_VARIABLES], + ds_Sv["ping_time"], + func="nanmean", + expected_groups=(ping_interval), + isbin=True, + method=method, ) - ) - - # initial test of compute_MVBS - _coll_test_comp_MVBS(ds_Sv, nchan, ping_num, - range_sample_num, ping_time_bin, - total_range, range_meter_bin) - - # TODO: use @pytest.fixture params/ids - # for multiple similar tests using the same set of parameters - # different nan cases for a single channel - case_1, case_2, case_3 = _nan_cases_comp_MVBS(ds_Sv, chan='0') - - # pad all ping_times with nans for a certain range_sample - ds_Sv['echo_range'].loc[{'channel': '0'}] = case_1 - - _coll_test_comp_MVBS(ds_Sv, nchan, ping_num, - range_sample_num, ping_time_bin, - total_range, range_meter_bin) - - # pad all range_samples of certain ping_times - ds_Sv['echo_range'].loc[{'channel': '0'}] = case_2 - - _coll_test_comp_MVBS(ds_Sv, nchan, ping_num, - range_sample_num, ping_time_bin, - total_range, range_meter_bin) - - # pad all range_samples of certain ping_times and - # pad some ping_times with nans for a certain range_sample - ds_Sv['echo_range'].loc[{'channel': '0'}] = case_3 - - _coll_test_comp_MVBS(ds_Sv, nchan, ping_num, - range_sample_num, ping_time_bin, - total_range, range_meter_bin) - - -def test_commongrid_mvbs(test_data_samples): - """ - Test running through from open_raw to compute_MVBS. - """ - ( - filepath, - sonar_model, - azfp_xml_path, - range_kwargs, - ) = test_data_samples - ed = ep.open_raw(filepath, sonar_model, azfp_xml_path) - if ed.sonar_model.lower() == 'azfp': - avg_temperature = ed["Environment"]['temperature'].values.mean() - env_params = { - 'temperature': avg_temperature, - 'salinity': 27.9, - 'pressure': 59, - } - range_kwargs['env_params'] = env_params - if 'azfp_cal_type' in range_kwargs: - range_kwargs.pop('azfp_cal_type') - Sv = ep.calibrate.compute_Sv(ed, **range_kwargs) - assert ep.commongrid.compute_MVBS(Sv) is not None - - -def create_bins(csum_array: np.ndarray) -> Iterable: - """ - Constructs bin ranges based off of a cumulative - sum array. - - Parameters - ---------- - csum_array: np.ndarray - 1D array representing a cumulative sum - - Returns - ------- - bins: list - A list whose elements are the lower and upper bin ranges - """ - - bins = [] - - # construct bins - for count, csum in enumerate(csum_array): - - if count == 0: - - bins.append([0, csum]) - - else: - - # add 0.01 so that left bins don't overlap - bins.append([csum_array[count-1] + 0.01, csum]) - - return bins - - -def create_echo_range_related_data(ping_bins: Iterable, - num_pings_in_bin: np.ndarray, - er_range: list, er_bins: Iterable, - final_num_er_bins: int, - create_dask: bool, - rng: np.random.Generator, - ping_bin_nan_ind: int) -> Tuple[list, list, list]: - """ - Creates ``echo_range`` values and associated bin information. - - Parameters - ---------- - ping_bins: list - A list whose elements are the lower and upper ping time bin ranges - num_pings_in_bin: np.ndarray - Specifies the number of pings in each ping time bin - er_range: list - A list whose first element is the lowest and second element is - the highest possible number of echo range values in a given bin - er_bins: list - A list whose elements are the lower and upper echo range bin ranges - final_num_er_bins: int - The total number of echo range bins - create_dask: bool - If True ``final_arrays`` values will be - dask arrays, else they will be numpy arrays - rng: np.random.Generator - The generator for random values - ping_bin_nan_ind: int - The ping bin index to fill with NaNs - - Returns - ------- - all_er_bin_nums: list of np.ndarray - A list whose elements are the number of values in each echo_range - bin, for each ping bin - ping_times_in_bin: list of np.ndarray - A list whose elements are the ping_time values for each corresponding bin - final_arrays: list of np.ndarray or dask.array.Array - A list whose elements are the echo_range values for a given ping and - echo range bin block - """ - - final_arrays = [] - all_er_bin_nums = [] - ping_times_in_bin = [] - - # build echo_range array - for ping_ind, ping_bin in enumerate(ping_bins): - - # create the ping times associated with each ping bin - ping_times_in_bin.append(rng.uniform(ping_bin[0], ping_bin[1], (num_pings_in_bin[ping_ind],))) - - # randomly determine the number of values in each echo_range bin - num_er_in_bin = rng.integers(low=er_range[0], high=er_range[1], size=final_num_er_bins) - - # store the number of values in each echo_range bin - all_er_bin_nums.append(num_er_in_bin) - - er_row_block = [] - for count, bin_val in enumerate(er_bins): - - # create a block of echo_range values - if create_dask: - a = dask.array.random.uniform(bin_val[0], bin_val[1], (num_pings_in_bin[ping_ind], - num_er_in_bin[count])) - else: - a = rng.uniform(bin_val[0], bin_val[1], (num_pings_in_bin[ping_ind], - num_er_in_bin[count])) - - # store the block of echo_range values - er_row_block.append(a) - - # set all echo_range values at ping index to NaN - if ping_ind == ping_bin_nan_ind: - a[:, :] = np.nan - - # collect and construct echo_range row block - final_arrays.append(np.concatenate(er_row_block, axis=1)) - - return all_er_bin_nums, ping_times_in_bin, final_arrays - - -def construct_2d_echo_range_array(final_arrays: Iterable[np.ndarray], - ping_csum: np.ndarray, - create_dask: bool) -> Tuple[Union[np.ndarray, dask.array.Array], int]: - """ - Creates the final 2D ``echo_range`` array with appropriate padding. - - Parameters - ---------- - final_arrays: list of np.ndarray - A list whose elements are the echo_range values for a given ping and - echo range bin block - ping_csum: np.ndarray - 1D array representing the cumulative sum for the number of ping times - in each ping bin - create_dask: bool - If True ``final_er`` will be a dask array, else it will be a numpy array - - Returns - ------- - final_er: np.ndarray or dask.array.Array - The final 2D ``echo_range`` array - max_num_er_elem: int - The maximum number of ``echo_range`` elements amongst all times - """ - - # get maximum number of echo_range elements amongst all times - max_num_er_elem = max([arr.shape[1] for arr in final_arrays]) - - # total number of ping times - tot_num_times = ping_csum[-1] - - # pad echo_range dimension with nans and create final echo_range - if create_dask: - final_er = dask.array.ones(shape=(tot_num_times, max_num_er_elem)) * np.nan + + for v in POSITION_VARIABLES: + assert np.array_equal(raw_MVBS[v].data, expected_Pos[v].data) else: - final_er = np.empty((tot_num_times, max_num_er_elem)) - final_er[:] = np.nan - - for count, arr in enumerate(final_arrays): - - if count == 0: - final_er[0:ping_csum[count], 0:arr.shape[1]] = arr - else: - final_er[ping_csum[count - 1]:ping_csum[count], 0:arr.shape[1]] = arr - - return final_er, max_num_er_elem - - -def construct_2d_sv_array(max_num_er_elem: int, ping_csum: np.ndarray, - all_er_bin_nums: Iterable[np.ndarray], - num_pings_in_bin: np.ndarray, - create_dask: bool, - ping_bin_nan_ind: int) -> Tuple[Union[np.ndarray, dask.array.Array], - np.ndarray]: - """ - Creates the final 2D Sv array with appropriate padding. - - Parameters - ---------- - max_num_er_elem: int - The maximum number of ``echo_range`` elements amongst all times - ping_csum: np.ndarray - 1D array representing the cumulative sum for the number of ping times - in each ping bin - all_er_bin_nums: list of np.ndarray - A list whose elements are the number of values in each echo_range - bin, for each ping bin - num_pings_in_bin: np.ndarray - Specifies the number of pings in each ping time bin - create_dask: bool - If True ``final_sv`` will be a dask array, else it will be a numpy array - ping_bin_nan_ind: int - The ping bin index to fill with NaNs - - Returns - ------- - final_sv: np.ndarray or dask.array.Array - The final 2D Sv array - final_MVBS: np.ndarray - The final 2D known MVBS array - """ - - # total number of ping times - tot_num_times = ping_csum[-1] - - # pad echo_range dimension with nans and create final sv - if create_dask: - final_sv = dask.array.ones(shape=(tot_num_times, max_num_er_elem)) * np.nan - else: - final_sv = np.empty((tot_num_times, max_num_er_elem)) - final_sv[:] = np.nan - - final_means = [] - for count, arr in enumerate(all_er_bin_nums): - - # create sv row values using natural numbers - sv_row_list = [np.arange(1, num_elem + 1, 1, dtype=np.float64) for num_elem in arr] - - # create final sv row - sv_row = np.concatenate(sv_row_list) - - # get final mean which is n+1/2 (since we are using natural numbers) - ping_mean = [(len(elem) + 1) / 2.0 for elem in sv_row_list] - - # create sv row block - sv_row_block = np.tile(sv_row, (num_pings_in_bin[count], 1)) - - if count == ping_bin_nan_ind: - - # fill values with NaNs - ping_mean = [np.nan]*len(sv_row_list) - sv_row_block[:, :] = np.nan - - # store means for ping - final_means.append(ping_mean) - - if count == 0: - final_sv[0:ping_csum[count], 0:sv_row_block.shape[1]] = sv_row_block - else: - final_sv[ping_csum[count - 1]:ping_csum[count], 0:sv_row_block.shape[1]] = sv_row_block - - # create final sv MVBS - final_MVBS = np.vstack(final_means) - - return final_sv, final_MVBS - - -def create_known_mean_data(final_num_ping_bins: int, - final_num_er_bins: int, - ping_range: list, - er_range: list, create_dask: bool, - rng: np.random.Generator) -> Tuple[np.ndarray, np.ndarray, Iterable, - Iterable, np.ndarray, np.ndarray]: - """ - Orchestrates the creation of ``echo_range``, ``ping_time``, and ``Sv`` arrays - where the MVBS is known. - - Parameters - ---------- - final_num_ping_bins: int - The total number of ping time bins - final_num_er_bins: int - The total number of echo range bins - ping_range: list - A list whose first element is the lowest and second element is - the highest possible number of ping time values in a given bin - er_range: list - A list whose first element is the lowest and second element is - the highest possible number of echo range values in a given bin - create_dask: bool - If True the ``Sv`` and ``echo_range`` values produced will be - dask arrays, else they will be numpy arrays. - rng: np.random.Generator - generator for random integers - - Returns - ------- - final_MVBS: np.ndarray - The final 2D known MVBS array - final_sv: np.ndarray - The final 2D Sv array - ping_bins: Iterable - A list whose elements are the lower and upper ping time bin ranges - er_bins: Iterable - A list whose elements are the lower and upper echo range bin ranges - final_er: np.ndarray - The final 2D ``echo_range`` array - final_ping_time: np.ndarray - The final 1D ``ping_time`` array - """ - - # randomly generate the number of pings in each ping bin - num_pings_in_bin = rng.integers(low=ping_range[0], high=ping_range[1], size=final_num_ping_bins) - - # create bins for ping_time dimension - ping_csum = np.cumsum(num_pings_in_bin) - ping_bins = create_bins(ping_csum) - - # create bins for echo_range dimension - num_er_in_bin = rng.integers(low=er_range[0], high=er_range[1], size=final_num_er_bins) - er_csum = np.cumsum(num_er_in_bin) - er_bins = create_bins(er_csum) - - # randomly select one ping bin to fill with NaNs - ping_bin_nan_ind = rng.choice(len(ping_bins)) - - # create the echo_range data and associated bin information - all_er_bin_nums, ping_times_in_bin, final_er_arrays = create_echo_range_related_data(ping_bins, num_pings_in_bin, - er_range, er_bins, - final_num_er_bins, - create_dask, - rng, - ping_bin_nan_ind) - - # create the final echo_range array using created data and padding - final_er, max_num_er_elem = construct_2d_echo_range_array(final_er_arrays, ping_csum, create_dask) - - # get final ping_time dimension - final_ping_time = np.concatenate(ping_times_in_bin).astype('datetime64[ns]') - - # create the final sv array - final_sv, final_MVBS = construct_2d_sv_array(max_num_er_elem, ping_csum, - all_er_bin_nums, num_pings_in_bin, - create_dask, ping_bin_nan_ind) - - return final_MVBS, final_sv, ping_bins, er_bins, final_er, final_ping_time - - -@pytest.fixture( - params=[ - { - "create_dask": True, - "final_num_ping_bins": 10, - "final_num_er_bins": 10, - "ping_range": [10, 1000], - "er_range": [10, 1000] - }, - { - "create_dask": False, - "final_num_ping_bins": 10, - "final_num_er_bins": 10, - "ping_range": [10, 1000], - "er_range": [10, 1000] - }, - ], - ids=[ - "delayed_data", - "in_memory_data" - ], -) -def bin_and_mean_2d_params(request): - """ - Obtains all necessary parameters for ``test_bin_and_mean_2d``. - """ - - return list(request.param.values()) - - -def test_bin_and_mean_2d(bin_and_mean_2d_params) -> None: - """ - Tests the function ``bin_and_mean_2d``, which is the core - method for ``compute_MVBS``. This is done by creating mock - data (which can have varying number of ``echo_range`` values - for each ``ping_time``) with known means. - - Parameters - ---------- - create_dask: bool - If True the ``Sv`` and ``echo_range`` values produced will be - dask arrays, else they will be numpy arrays. - final_num_ping_bins: int - The total number of ping time bins - final_num_er_bins: int - The total number of echo range bins - ping_range: list - A list whose first element is the lowest and second element is - the highest possible number of ping time values in a given bin - er_range: list - A list whose first element is the lowest and second element is - the highest possible number of echo range values in a given bin - """ - - # get all parameters needed to create the mock data - create_dask, final_num_ping_bins, final_num_er_bins, ping_range, er_range = bin_and_mean_2d_params - - # randomly generate a seed - seed = np.random.randint(low=10, high=100000, size=1)[0] - - print(f"seed used to generate mock data: {seed}") - - # establish generator for random integers - rng = default_rng(seed=seed) - - # seed dask random generator - if create_dask: - dask.array.random.seed(seed=seed) - - # create echo_range, ping_time, and Sv arrays where the MVBS is known - known_MVBS, final_sv, ping_bins, er_bins, final_er, final_ping_time = create_known_mean_data(final_num_ping_bins, - final_num_er_bins, - ping_range, er_range, - create_dask, - rng) - - # put the created ping bins into a form that works with bin_and_mean_2d - digitize_ping_bin = np.array([*ping_bins[0]] + [bin_val[1] for bin_val in ping_bins[1:-1]]) - digitize_ping_bin = digitize_ping_bin.astype('datetime64[ns]') - - # put the created echo range bins into a form that works with bin_and_mean_2d - digitize_er_bin = np.array([*er_bins[0]] + [bin_val[1] for bin_val in er_bins[1:]]) - - # calculate MVBS for mock data set - calc_MVBS = bin_and_mean_2d(arr=final_sv, bins_time=digitize_ping_bin, - bins_er=digitize_er_bin, times=final_ping_time, - echo_range=final_er, comprehensive_er_check=True) - - # compare known MVBS solution against its calculated counterpart - assert np.allclose(calc_MVBS, known_MVBS, atol=1e-10, rtol=1e-10, equal_nan=True) + assert raw_MVBS.attrs["has_positions"] is False diff --git a/echopype/tests/utils/test_processinglevels_integration.py b/echopype/tests/utils/test_processinglevels_integration.py index 0dadbfc87..10c81c9eb 100644 --- a/echopype/tests/utils/test_processinglevels_integration.py +++ b/echopype/tests/utils/test_processinglevels_integration.py @@ -127,8 +127,6 @@ def _freqdiff_applymask(test_ds): # ---- Compute MVBS # compute_MVBS expects a variable named "Sv" - # No product level is assigned because at present compute_MVBS drops the lat/lon data - # associated with the input Sv dataset # ds = ds.rename_vars(name_dict={"Sv": "Sv_unmasked", "Sv_ch0": "Sv"}) - mvbs_ds = ep.commongrid.compute_MVBS(ds, range_meter_bin=30, ping_time_bin='1min') - _absence_test(mvbs_ds) + mvbs_ds = ep.commongrid.compute_MVBS(ds, range_bin="30m", ping_time_bin='1min') + _presence_test(mvbs_ds, "Level 3B") diff --git a/echopype/utils/compute.py b/echopype/utils/compute.py new file mode 100644 index 000000000..936a1187d --- /dev/null +++ b/echopype/utils/compute.py @@ -0,0 +1,41 @@ +"""compute.py + +Module containing various helper functions +for performing computations within echopype. +""" +from typing import Union + +import dask.array +import numpy as np + + +def _log2lin(data: Union[dask.array.Array, np.ndarray]) -> Union[dask.array.Array, np.ndarray]: + """Perform log to linear transform on data + + Parameters + ---------- + data : dask.array.Array or np.ndarray + The data to be transformed + + Returns + ------- + dask.array.Array or np.ndarray + The transformed data + """ + return 10 ** (data / 10) + + +def _lin2log(data: Union[dask.array.Array, np.ndarray]) -> Union[dask.array.Array, np.ndarray]: + """Perform linear to log transform on data + + Parameters + ---------- + data : dask.array.Array or np.ndarray + The data to be transformed + + Returns + ------- + dask.array.Array or np.ndarray + The transformed data + """ + return 10 * np.log10(data) diff --git a/pytest.ini b/pytest.ini index 7a3a8bfd2..9ad97f3a7 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,5 +1,7 @@ # test directory [pytest] testpaths = echopype/tests - cache_dir = .cache +markers = + unit: marks tests as unit tests + integration: marks tests as integration tests diff --git a/requirements.txt b/requirements.txt index 2cb7c15cd..09ba1a7c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ aiohttp xarray-datatree==0.0.6 psutil>=5.9.1 geopy +flox>=0.7.2,<1.0.0