From e769ecfb218f6ef20b62b16b10ad2cffae012fbe Mon Sep 17 00:00:00 2001 From: Ruxandra Valcu Date: Tue, 7 Nov 2023 11:45:21 +0200 Subject: [PATCH] Merge/dev (#111) * added support for AZFP multiple phase attributes parsing (#1182) * added support for multiple phase attributes parsing * minor changes to code * Update echopype/convert/set_groups_azfp.py Co-authored-by: Emilio Mayorga * Update set_groups_azfp.py * Update test_convert_azfp.py --------- Co-authored-by: Emilio Mayorga * ci: Update CI to barebone python [all tests ci] (#1192) * ci: Add installed packages listing * ci: Add pip listing * ci: Remove conda from PR action * ci: Update worklows and add P2Z skip * ci: More cleanup to workflow * ci: Bump actions/setup-python from 4.7.0 to 4.7.1 * fix: Fix encoding in 'to_zarr' to remove 'preferred_chunks' before saving (#1128) * Encoding and Chunk matching only if DaskArray * Removing preffered chunks from encoding dict * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * `parse_azfp` now parses AZFP pressure data according to given matlab code (#1189) * added support for parsing azfp pressure data * fixed failing ci tests * Revert "fixed failing ci tests" This reverts commit c378dc0cc68b70daadb04cd5a06a5ca296581b51. * minor changes to tests * Update test_convert_azfp.py * Update echopype/tests/convert/test_convert_azfp.py --------- Co-authored-by: Emilio Mayorga * Enhanced `update_platform` to Auto-assign first `ping_time` as `Platform` timestamp for fixed-location update case without timestamp. [all tests ci] (#1196) * first is auto assigned as timestamp for lon and lat update without timestamp * fix small typo --------- Co-authored-by: Wu-Jung Lee * Added `depth_from_pressure` method required for the calculation of `vertical_offset` value (#1207) * added depth_from_pressure method * Function arguments can be scalars or sequences; and add more tests (#2) * Reverse order of equation terms to match UNESCO 1983 source * For depth_from_pressure, accept scalars, lists and arrays for all 3 arguments. Include consistency checks. * Add a greater variety of depth_from_pressure tests. * Update echopype/utils/misc.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: Emilio Mayorga Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * fix(commongrid): fix bugs and improve `compute_NASC` using flox (#1167) * chore(deps): add flox dependency >=0.7.2 * fix(commongrid): fixes 'compute_MVBS' so it can work better and scale Under the hood, binning along ping time and echo range now uses flox. This allows for scalability and more community-maintained. * docs: add small code comment * refactor: change how ping_time index is retrieved * refactor: remove for loop for channel * test(mvbs): add mock Sv datasets and tests for dims (#2) Note that @leewujung also changed mean to nanmean for skipping NaNs in each bin. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix: change dask to numpy Changed the use of dask for log10 to numpy instead since numpy can also handle dask array inputs properly. * feat: Add method argument Added 'method' argument to 'get_MVBS_along_channels' and also expose additional keyword arguments control for flox. * fix(commongrid): Fixed to include lat lon Fixed 'compute_MVBS' function to now include latitude and longitude if the variables exists in the Sv dataset. Additionally, the flox method and keyword arguments are now exposed within the 'compute_MVBS' function. Ref: Issue #1002 * refactor: Set defaults to recommended After some investigation, @lsetiawan concluded that at this time the method 'map-reduce', engine 'numpy', and reindex True works the best, so this is now set as default. Also, getting echo range maximum is through direct data slicing rather than computation. * feat(commongrid): Add 'range_var' argument to 'compute_MVBS' Added a new argument 'range_var' so that user can set the range variable to perform binning with. There are 2 options of 'echo_range' and 'depth': - 'echo_range': When this is set, variable 'water_level' is now included in the resulting MVBS dataset - 'depth': A check is in place to ensure that this variable exists before moving forward and use this to perform range binning. Ref: Issue #1002 * fix: Add missing attributes for lat lon * test: Update test to use random generator * fix: Add case for no 'water_level' Added a case for dataset that doesn't have water level variable. * test(nasc): Remove 'compute_NASC' import to avoid failure * fix: Removed assumption on echo range max Reverted back the echo range max computation to computing on the fly since there may be some NaN values. * test: Extract api test and add markings Extracted fixtures to conftest.py for commongrid. Additionally, clean up unused functions and mark tests b/w unit and integration. Added a new test module called 'test_api.py' for 'commongrid.api'. * test: Add latlon test for 'compute_MVBS' Added a test for Sv dataset that contains latitude and longitude going through 'compute_MVBS' to ensure that those variables gets propagated through. Ref: #1002 * test: Add small get_MVBS_along_channels test Added test for 'get_MVBS_along_channels' with either 'depth' as the 'range_var' or checking for 'has_positions' is True or False. * refactor: Integrate suggested changes Integrated suggested changes from review such as additional comments in code, fixing some variable names, and extracting out the lin2log and log2lin functions. Additionally, now echopype imports pint library to start having unit checks in the input for compute_MVBS. * test: Added check for position values * test: Update range_meter_bin to strings * test: Added 'compute_MVBS' values test * Update echopype/tests/utils/test_processinglevels_integration.py compute_MVBS now should preserve the processing level attributes. So, test for presence rather than absence * test: Add 'nan' sprinkles Sprinkled 'nan' values all over 'echo_range' to ensure that computed values from 'compute_MVBS' doesn't take into account the 'nan'. Added check for the expected distribution of 'nan' in the resulting array. Ref: https://github.com/OSOceanAcoustics/echopype/pull/1124#issuecomment-1703643268 * revert: Revert the use of 'pint' Removed dependency to 'pint' and use simple regex to ensure that 'range_bin' input is unit 'm'. Renamed 'range_meter_bin' argument to 'range_bin'. Ref: https://github.com/OSOceanAcoustics/echopype/pull/1124#issuecomment-1711629755 * feat: Allow 'range_bin' to have space * fix: Apply suggestions from code review Applied fix for regex not capturing decimal values by @emiliom Ref: https://github.com/OSOceanAcoustics/echopype/pull/1124/files#r1320422121 Co-authored-by: Emilio Mayorga * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * test: Fix test text for wrong unit * test: Remove the 'e.g.' part on pytest Removed the part with '(e.g., '10m')' since it's messing up pytests regex matching. * revive the function to make changes easier to see * add TODOs * add computation steps * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * remove unused _lin2log * test: Remove remnant for test_ek.py * refactor: Extract range_bin parsing and add close arg Extracts out the 'range_bin' string to float into a private function. Additionally now there's a fine tune argument for bin close edges so user can specify either close is 'left' or 'right'. Bins are converted to pandas interval index before passing into 'get_MVBS_along_channels'. * refactor: Update arg types to include interval index Added argument type options for 'range_interval' and 'ping_interval' to also be interval index. * test: Update tests to have brute force creation Changed mock mvbs to be created by doing brute force rather than hard coding. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * test: Fix brute force mvbs gen Fixes the generation of expected mvbs with brute force as well as tweaks to mvbs_along_channel test. * chore: Clean up old code for doing compute MVBS Removes old code that perfoms compute_MVBS since now we've switched over to flox * chore(pytest): Added custom markers 'unit' and 'integration' * docs: Update docstring for `compute_MVBS` Added options for literal arguments * refactor: Change 'parse_range_bin' to 'parse_x_bin' Make bin parsing to be more general by making it to 'parse_x_bin'. * refactor: Initial unification of MVBS and NASC Added setup and validate function for shared checks between compute MVBS and NASC so only unique checks are in its individual function. * fix typo when porting from notebook * correct attribute units from m to nmi * refactor: Add typehints and use method * feat: Add get_x_along_channels Added 'get_x_along_channels' function that generalizes the reduction routines from 'get_MVBS_along_channels'. This now removes the old function in mvbs.py module. Additionally, uses of 'get_MVBS_along_channels' has been removed from the test and code for 'compute_MVBS'. * feat: Implement new 'compute_NASC' Use 'get_x_along_channels' for 'compute_NASC' and turn on old 'test_nasc.py' for initial nasc testing * test: Renamed and moved get_x_along_channels test * fix: Use 'ffill' and 'bfill' Fixes the 'FutureWarning' coming from pandas since as of pandas version 2.1.0 the 'method' argument for 'fillna' is deprecated. Ref: https://github.com/OSOceanAcoustics/echopype/pull/1167#issuecomment-1732651809 * feat: Allow import 'compute_NASC' from 'commongrid' module * fix: Fix bug on setup and validate and test Fixes bug on 'setup_and_validate' during variable checks. Also added simple testing for values from flox vs echoview. * test: Update simple NASC integration test * test: Add brute force values test for NASC * chore: Apply suggestions from code review Co-authored-by: Wu-Jung Lee * refactor: Extract position reduction * refactor: Separate sv mean and raw computations * test: Remove empty test_nasc.py * docs: Update docs for functions * refactor: Move helper funcs to utils.py * add L4 processing level to compute_NASC --------- Co-authored-by: Landung 'Don' Setiawan Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Emilio Mayorga --------- Co-authored-by: Praneeth Ratna <63547155+praneethratna@users.noreply.github.com> Co-authored-by: Emilio Mayorga Co-authored-by: Don Setiawan Co-authored-by: Soham Butala <38330817+Sohambutala@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Wu-Jung Lee --- .github/workflows/build.yaml | 37 +- .github/workflows/pr.yaml | 39 +- echopype/commongrid/__init__.py | 3 +- echopype/commongrid/api.py | 467 ++++++--------- echopype/commongrid/mvbs.py | 89 --- echopype/commongrid/nasc.py | 86 --- echopype/commongrid/utils.py | 557 ++++++++++++++++++ echopype/convert/parse_azfp.py | 73 ++- echopype/convert/set_groups_azfp.py | 71 ++- echopype/echodata/echodata.py | 16 +- echopype/tests/commongrid/conftest.py | 354 +++++++++-- echopype/tests/commongrid/test_api.py | 169 +++++- echopype/tests/commongrid/test_mvbs.py | 67 --- echopype/tests/commongrid/test_nasc.py | 38 -- echopype/tests/convert/test_convert_azfp.py | 108 +++- echopype/tests/convert/test_parsed_to_zarr.py | 2 + echopype/tests/echodata/test_echodata.py | 37 +- echopype/tests/utils/test_utils_misc.py | 31 + echopype/utils/coding.py | 3 + echopype/utils/io.py | 4 +- echopype/utils/misc.py | 75 +++ 21 files changed, 1567 insertions(+), 759 deletions(-) delete mode 100644 echopype/commongrid/mvbs.py delete mode 100644 echopype/commongrid/nasc.py create mode 100644 echopype/commongrid/utils.py delete mode 100644 echopype/tests/commongrid/test_mvbs.py delete mode 100644 echopype/tests/commongrid/test_nasc.py create mode 100644 echopype/tests/utils/test_utils_misc.py diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 13c398aca..69b0659a6 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -9,25 +9,20 @@ on: workflow_dispatch: env: - CONDA_ENV: echopype NUM_WORKERS: 2 jobs: test: name: ${{ matrix.python-version }}-build - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest if: ${{ !contains(github.event.head_commit.message, '[skip ci]') }} continue-on-error: ${{ matrix.experimental }} strategy: fail-fast: false matrix: - python-version: ["3.9", "3.10"] # TODO: add back 3.11 once parsed2zarr is fixed + python-version: ["3.9", "3.10", "3.11"] runs-on: [ubuntu-latest] experimental: [false] - include: - - runs-on: ubuntu-latest - python-version: "3.11" - experimental: true services: # TODO: figure out how to update tag when there's a new one minio: @@ -46,30 +41,20 @@ jobs: - name: Set environment variables run: | echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV - - name: Setup micromamba - uses: mamba-org/setup-micromamba@v1 + - name: Set up Python + uses: actions/setup-python@v4.7.1 with: - environment-file: .ci_helpers/py${{ matrix.python-version }}.yaml - environment-name: ${{ env.CONDA_ENV }} - cache-environment: true - post-cleanup: 'all' - - name: Print conda environment - shell: bash -l {0} - run: | - micromamba info - micromamba list + python-version: ${{ matrix.python-version }} + - name: Upgrade pip + run: python -m pip install --upgrade pip - name: Remove docker-compose python - if: ${{ matrix.python-version == '3.10' || matrix.python-version == '3.11' }} - shell: bash -l {0} run: sed -i "/docker-compose/d" requirements-dev.txt - name: Install dev tools - shell: bash -l {0} - run: | - micromamba install -c conda-forge -n ${{ env.CONDA_ENV }} --yes --file requirements-dev.txt + run: python -m pip install -r requirements-dev.txt - name: Install echopype - shell: bash -l {0} - run: | - python -m pip install -e .[plot] + run: python -m pip install -e ".[plot]" + - name: Print installed packages + run: python -m pip list - name: Copying test data to services shell: bash -l {0} run: | diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 800a1e96f..b44acb8b7 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -5,7 +5,6 @@ on: paths-ignore: ["**/docker.yaml", "docs"] env: - CONDA_ENV: echopype NUM_WORKERS: 2 jobs: @@ -17,16 +16,9 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.9", "3.10"] # TODO: add back 3.11 once parsed2zarr is fixed + python-version: ["3.9", "3.10", "3.11"] runs-on: [ubuntu-latest] experimental: [false] - include: - - runs-on: ubuntu-latest - python-version: "3.11" - experimental: true - defaults: - run: - shell: bash -l {0} services: # TODO: figure out how to update tag when there's a new one minio: @@ -42,35 +34,28 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. + - name: Set up Python + uses: actions/setup-python@v4.7.1 + with: + python-version: ${{ matrix.python-version }} + - name: Upgrade pip + run: python -m pip install --upgrade pip - name: Set environment variables run: | echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV - - name: Setup micromamba - uses: mamba-org/setup-micromamba@v1 - with: - environment-file: .ci_helpers/py${{ matrix.python-version }}.yaml - environment-name: ${{ env.CONDA_ENV }} - cache-environment: true - post-cleanup: 'all' - - name: Print conda environment - run: | - micromamba info - micromamba list - name: Remove docker-compose python - if: ${{ matrix.python-version == '3.10' || matrix.python-version == '3.11' }} run: sed -i "/docker-compose/d" requirements-dev.txt - name: Install dev tools - run: | - micromamba install -c conda-forge -n ${{ env.CONDA_ENV }} --yes --file requirements-dev.txt + run: python -m pip install -r requirements-dev.txt # We only want to install this on one run, because otherwise we'll have # duplicate annotations. - name: Install error reporter if: ${{ matrix.python-version == '3.9' }} - run: | - python -m pip install pytest-github-actions-annotate-failures + run: python -m pip install pytest-github-actions-annotate-failures - name: Install echopype - run: | - python -m pip install -e .[plot] + run: python -m pip install -e ".[plot]" + - name: Print installed packages + run: python -m pip list - name: Copying test data to services run: | python .ci_helpers/docker/setup-services.py --deploy --data-only --http-server ${{ job.services.httpserver.id }} diff --git a/echopype/commongrid/__init__.py b/echopype/commongrid/__init__.py index 642434353..63172d2bd 100644 --- a/echopype/commongrid/__init__.py +++ b/echopype/commongrid/__init__.py @@ -1,6 +1,7 @@ -from .api import compute_MVBS, compute_MVBS_index_binning +from .api import compute_MVBS, compute_MVBS_index_binning, compute_NASC __all__ = [ "compute_MVBS", + "compute_NASC", "compute_MVBS_index_binning", ] diff --git a/echopype/commongrid/api.py b/echopype/commongrid/api.py index 8c9132b99..396a943be 100644 --- a/echopype/commongrid/api.py +++ b/echopype/commongrid/api.py @@ -1,7 +1,7 @@ """ Functions for enhancing the spatial and temporal coherence of data. """ -import re +import logging from typing import Literal import numpy as np @@ -10,157 +10,19 @@ from ..consolidate.api import POSITION_VARIABLES from ..utils.prov import add_processing_level, echopype_prov_attrs, insert_input_processing_level -from .mvbs import get_MVBS_along_channels +from .utils import ( + _convert_bins_to_interval_index, + _get_reduced_positions, + _parse_x_bin, + _set_MVBS_attrs, + _set_var_attrs, + _setup_and_validate, + compute_raw_MVBS, + compute_raw_NASC, + get_distance_from_latlon, +) - -def _set_var_attrs(da, long_name, units, round_digits, standard_name=None): - """ - Attach common attributes to DataArray variable. - - Parameters - ---------- - da : xr.DataArray - DataArray that will receive attributes - long_name : str - Variable long_name attribute - units : str - Variable units attribute - round_digits : int - Number of digits after decimal point for rounding off actual_range - standard_name : str - CF standard_name, if available (optional) - """ - - da.attrs = { - "long_name": long_name, - "units": units, - "actual_range": [ - round(float(da.min().values), round_digits), - round(float(da.max().values), round_digits), - ], - } - if standard_name: - da.attrs["standard_name"] = standard_name - - -def _set_MVBS_attrs(ds): - """ - Attach common attributes. - - Parameters - ---------- - ds : xr.Dataset - dataset containing MVBS - """ - ds["ping_time"].attrs = { - "long_name": "Ping time", - "standard_name": "time", - "axis": "T", - } - - _set_var_attrs( - ds["Sv"], - long_name="Mean volume backscattering strength (MVBS, mean Sv re 1 m-1)", - units="dB", - round_digits=2, - ) - - -def _convert_bins_to_interval_index( - bins: list, closed: Literal["left", "right"] = "left" -) -> pd.IntervalIndex: - """ - Convert bins to sorted pandas IntervalIndex - with specified closed end - - Parameters - ---------- - bins : list - The bin edges - closed : {'left', 'right'}, default 'left' - Which side of bin interval is closed - - Returns - ------- - pd.IntervalIndex - The resulting IntervalIndex - """ - return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values() - - -def _parse_x_bin(x_bin: str, x_label="range_bin") -> float: - """ - Parses x bin string, check unit, - and returns x bin in the specified unit. - - Currently only available for: - range_bin: meters (m) - dist_bin: nautical miles (nmi) - - Parameters - ---------- - x_bin : str - X bin string, e.g., "0.5nmi" or "10m" - x_label : {"range_bin", "dist_bin"}, default "range_bin" - The label of the x bin. - - Returns - ------- - float - The resulting x bin value in x unit, - based on label. - - Raises - ------ - ValueError - If the x bin string doesn't include unit value. - TypeError - If the x bin is not a type string. - KeyError - If the x label is not one of the available labels. - """ - x_bin_map = { - "range_bin": { - "name": "Range bin", - "unit": "m", - "ex": "10m", - "unit_label": "meters", - "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)", - }, - "dist_bin": { - "name": "Distance bin", - "unit": "nmi", - "ex": "0.5nmi", - "unit_label": "nautical miles", - "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)", - }, - } - x_bin_info = x_bin_map.get(x_label, None) - - if x_bin_info is None: - raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}") - - # First check for bin types - if not isinstance(x_bin, str): - raise TypeError("'x_bin' must be a string") - # normalize to lower case - # for x_bin - x_bin = x_bin.strip().lower() - # Only matches meters - match_obj = re.match(x_bin_info["pattern"], x_bin) - - # Do some checks on x_bin inputs - if match_obj is None: - # This shouldn't be other units - raise ValueError( - f"{x_bin_info['name']} must be in " - f"{x_bin_info['unit_label']} " - f"(e.g., '{x_bin_info['ex']}')." - ) - - # Convert back to float - x_bin = float(match_obj.group(1)) - return x_bin +logger = logging.getLogger(__name__) @add_processing_level("L3*") @@ -201,7 +63,7 @@ def compute_MVBS( for more details. closed: {'left', 'right'}, default 'left' Which side of bin interval is closed. - **kwargs + **flox_kwargs Additional keyword arguments to be passed to flox reduction function. @@ -210,28 +72,15 @@ def compute_MVBS( A dataset containing bin-averaged Sv """ + # Setup and validate + # * Sv dataset must contain specified range_var + # * Parse range_bin + # * Check closed value + ds_Sv, range_bin = _setup_and_validate(ds_Sv, range_var, range_bin, closed) + if not isinstance(ping_time_bin, str): raise TypeError("ping_time_bin must be a string") - range_bin = _parse_x_bin(range_bin, "range_bin") - - # Clean up filenames dimension if it exists - # not needed here - if "filenames" in ds_Sv.dims: - ds_Sv = ds_Sv.drop_dims("filenames") - - # Check if range_var is valid - if range_var not in ["echo_range", "depth"]: - raise ValueError("range_var must be one of 'echo_range' or 'depth'.") - - # Check if range_var exists in ds_Sv - if range_var not in ds_Sv.data_vars: - raise ValueError(f"range_var '{range_var}' does not exist in the input dataset.") - - # Check for closed values - if closed not in ["right", "left"]: - raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.") - # create bin information for echo_range # this computes the echo range max since there might NaNs in the data echo_range_max = ds_Sv[range_var].max() @@ -249,7 +98,7 @@ def compute_MVBS( # Set interval index for groups ping_interval = _convert_bins_to_interval_index(ping_interval, closed=closed) range_interval = _convert_bins_to_interval_index(range_interval, closed=closed) - raw_MVBS = get_MVBS_along_channels( + raw_MVBS = compute_raw_MVBS( ds_Sv, range_interval, ping_interval, @@ -269,12 +118,9 @@ def compute_MVBS( }, ) - # "has_positions" attribute is inserted in get_MVBS_along_channels - # when the dataset has position information + # If dataset has position information # propagate this to the final MVBS dataset - if raw_MVBS.attrs.get("has_positions", False): - for var in POSITION_VARIABLES: - ds_MVBS[var] = (["ping_time"], raw_MVBS[var].data, ds_Sv[var].attrs) + ds_MVBS = _get_reduced_positions(ds_Sv, ds_MVBS, "MVBS", ping_interval) # Add water level if uses echo_range and it exists in Sv dataset if range_var == "echo_range" and "water_level" in ds_Sv.data_vars: @@ -415,131 +261,148 @@ def compute_MVBS_index_binning(ds_Sv, range_sample_num=100, ping_num=100): return ds_MVBS -# def compute_NASC( -# ds_Sv: xr.Dataset, -# cell_dist: Union[int, float], # TODO: allow xr.DataArray -# cell_depth: Union[int, float], # TODO: allow xr.DataArray -# ) -> xr.Dataset: -# """ -# Compute Nautical Areal Scattering Coefficient (NASC) from an Sv dataset. - -# Parameters -# ---------- -# ds_Sv : xr.Dataset -# A dataset containing Sv data. -# The Sv dataset must contain ``latitude``, ``longitude``, and ``depth`` as data variables. -# cell_dist: int, float -# The horizontal size of each NASC cell, in nautical miles [nmi] -# cell_depth: int, float -# The vertical size of each NASC cell, in meters [m] - -# Returns -# ------- -# xr.Dataset -# A dataset containing NASC - -# Notes -# ----- -# The NASC computation implemented here corresponds to the Echoview algorithm PRC_NASC -# https://support.echoview.com/WebHelp/Reference/Algorithms/Analysis_Variables/PRC_ABC_and_PRC_NASC.htm#PRC_NASC # noqa -# The difference is that since in echopype masking of the Sv dataset is done explicitly using -# functions in the ``mask`` subpackage so the computation only involves computing the -# mean Sv and the mean height within each cell. - -# In addition, here the binning of pings into individual cells is based on the actual horizontal -# distance computed from the latitude and longitude coordinates of each ping in the Sv dataset. -# Therefore, both regular and irregular horizontal distance in the Sv dataset are allowed. -# This is different from Echoview's assumption of constant ping rate, vessel speed, and sample -# thickness when computing mean Sv. -# """ -# # Check Sv contains lat/lon -# if "latitude" not in ds_Sv or "longitude" not in ds_Sv: -# raise ValueError("Both 'latitude' and 'longitude' must exist in the input Sv dataset.") - -# # Check if depth vectors are identical within each channel -# if not ds_Sv["depth"].groupby("channel").map(check_identical_depth).all(): -# raise ValueError( -# "Only Sv data with identical depth vectors across all pings " -# "are allowed in the current compute_NASC implementation." -# ) - -# # Get distance from lat/lon in nautical miles -# dist_nmi = get_distance_from_latlon(ds_Sv) - -# # Find binning indices along distance -# bin_num_dist, dist_bin_idx = get_dist_bin_info(dist_nmi, cell_dist) # dist_bin_idx is 1-based - -# # Find binning indices along depth: channel-dependent -# bin_num_depth, depth_bin_idx = get_depth_bin_info(ds_Sv, cell_depth) # depth_bin_idx is 1-based # noqa - -# # Compute mean sv (volume backscattering coefficient, linear scale) -# # This is essentially to compute MVBS over a the cell defined here, -# # which are typically larger than those used for MVBS. -# # The implementation below is brute force looping, but can be optimized -# # by experimenting with different delayed schemes. -# # The optimized routines can then be used here and -# # in commongrid.compute_MVBS and clean.estimate_noise -# sv_mean = [] -# for ch_seq in np.arange(ds_Sv["channel"].size): -# # TODO: .compute each channel sequentially? -# # dask.delay within each channel? -# ds_Sv_ch = ds_Sv["Sv"].isel(channel=ch_seq).data # preserve the underlying type - -# sv_mean_dist_depth = [] -# for dist_idx in np.arange(bin_num_dist) + 1: # along ping_time -# sv_mean_depth = [] -# for depth_idx in np.arange(bin_num_depth) + 1: # along depth -# # Sv dim: ping_time x depth -# Sv_cut = ds_Sv_ch[dist_idx == dist_bin_idx, :][ -# :, depth_idx == depth_bin_idx[ch_seq] -# ] -# sv_mean_depth.append(np.nanmean(10 ** (Sv_cut / 10))) -# sv_mean_dist_depth.append(sv_mean_depth) - -# sv_mean.append(sv_mean_dist_depth) - -# # Compute mean height -# # For data with uniform depth step size, mean height = vertical size of cell -# height_mean = cell_depth -# # TODO: generalize to variable depth step size - -# ds_NASC = xr.DataArray( -# np.array(sv_mean) * height_mean, -# dims=["channel", "distance", "depth"], -# coords={ -# "channel": ds_Sv["channel"].values, -# "distance": np.arange(bin_num_dist) * cell_dist, -# "depth": np.arange(bin_num_depth) * cell_depth, -# }, -# name="NASC", -# ).to_dataset() - -# ds_NASC["frequency_nominal"] = ds_Sv["frequency_nominal"] # re-attach frequency_nominal - -# # Attach attributes -# _set_var_attrs( -# ds_NASC["NASC"], -# long_name="Nautical Areal Scattering Coefficient (NASC, m2 nmi-2)", -# units="m2 nmi-2", -# round_digits=3, -# ) -# _set_var_attrs(ds_NASC["distance"], "Cumulative distance", "m", 3) -# _set_var_attrs(ds_NASC["depth"], "Cell depth", "m", 3, standard_name="depth") - -# # Calculate and add ACDD bounding box global attributes -# ds_NASC.attrs["Conventions"] = "CF-1.7,ACDD-1.3" -# ds_NASC.attrs["time_coverage_start"] = np.datetime_as_string( -# ds_Sv["ping_time"].min().values, timezone="UTC" -# ) -# ds_NASC.attrs["time_coverage_end"] = np.datetime_as_string( -# ds_Sv["ping_time"].max().values, timezone="UTC" -# ) -# ds_NASC.attrs["geospatial_lat_min"] = round(float(ds_Sv["latitude"].min().values), 5) -# ds_NASC.attrs["geospatial_lat_max"] = round(float(ds_Sv["latitude"].max().values), 5) -# ds_NASC.attrs["geospatial_lon_min"] = round(float(ds_Sv["longitude"].min().values), 5) -# ds_NASC.attrs["geospatial_lon_max"] = round(float(ds_Sv["longitude"].max().values), 5) - -# return ds_NASC +@add_processing_level("L4") +def compute_NASC( + ds_Sv: xr.Dataset, + range_bin: str = "10m", + dist_bin: str = "0.5nmi", + method: str = "map-reduce", + closed: Literal["left", "right"] = "left", + **flox_kwargs, +) -> xr.Dataset: + """ + Compute Nautical Areal Scattering Coefficient (NASC) from an Sv dataset. + + Parameters + ---------- + ds_Sv : xr.Dataset + A dataset containing Sv data. + The Sv dataset must contain ``latitude``, ``longitude``, and ``depth`` as data variables. + range_bin : str, default '10m' + bin size along ``depth`` in meters (m). + dist_bin : str, default '0.5nmi' + bin size along ``distance`` in nautical miles (nmi). + method: str, default 'map-reduce' + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + closed: {'left', 'right'}, default 'left' + Which side of bin interval is closed. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + A dataset containing NASC + + Notes + ----- + The NASC computation implemented here generally corresponds to the Echoview algorithm PRC_NASC + https://support.echoview.com/WebHelp/Reference/Algorithms/Analysis_Variables/PRC_ABC_and_PRC_NASC.htm#PRC_NASC # noqa + The difference is that since in echopype masking of the Sv dataset is done explicitly using + functions in the ``mask`` subpackage, the computation only involves computing the + mean Sv and the mean height within each cell, where some Sv "pixels" may have been + masked as NaN. + + In addition, in echopype the binning of pings into individual cells is based on the actual horizontal + distance computed from the latitude and longitude coordinates of each ping in the Sv dataset. + Therefore, both regular and irregular horizontal distance in the Sv dataset are allowed. + This is different from Echoview's assumption of constant ping rate, vessel speed, and sample + thickness when computing mean Sv + (see https://support.echoview.com/WebHelp/Reference/Algorithms/Analysis_Variables/Sv_mean.htm#Conversions). # noqa + """ + # Set range_var to be 'depth' + range_var = "depth" + + # Setup and validate + # * Sv dataset must contain latitude, longitude, and depth + # * Parse range_bin + # * Check closed value + ds_Sv, range_bin = _setup_and_validate( + ds_Sv, range_var, range_bin, closed, required_data_vars=POSITION_VARIABLES + ) + + # Check if dist_bin is a string + if not isinstance(dist_bin, str): + raise TypeError("dist_bin must be a string") + + # Parse the dist_bin string and convert to float + dist_bin = _parse_x_bin(dist_bin, "dist_bin") + + # Get distance from lat/lon in nautical miles + dist_nmi = get_distance_from_latlon(ds_Sv) + ds_Sv = ds_Sv.assign_coords({"distance_nmi": ("ping_time", dist_nmi)}).swap_dims( + {"ping_time": "distance_nmi"} + ) + + # create bin information along range_var + # this computes the range_var max since there might NaNs in the data + range_var_max = ds_Sv[range_var].max() + range_interval = np.arange(0, range_var_max + range_bin, range_bin) + + # create bin information along distance_nmi + # this computes the distance max since there might NaNs in the data + dist_max = ds_Sv["distance_nmi"].max() + dist_interval = np.arange(0, dist_max + dist_bin, dist_bin) + + # Set interval index for groups + dist_interval = _convert_bins_to_interval_index(dist_interval, closed=closed) + range_interval = _convert_bins_to_interval_index(range_interval, closed=closed) + + raw_NASC = compute_raw_NASC( + ds_Sv, + range_interval, + dist_interval, + method=method, + **flox_kwargs, + ) + + # create MVBS dataset + # by transforming the binned dimensions to regular coords + ds_NASC = xr.Dataset( + data_vars={"NASC": (["channel", "distance", range_var], raw_NASC["sv"].data)}, + coords={ + "distance": np.array([v.left for v in raw_NASC["distance_nmi_bins"].values]), + "channel": raw_NASC["channel"].values, + range_var: np.array([v.left for v in raw_NASC[f"{range_var}_bins"].values]), + }, + ) + + # If dataset has position information + # propagate this to the final NASC dataset + ds_NASC = _get_reduced_positions(ds_Sv, ds_NASC, "NASC", dist_interval) + + # Set ping time binning information + ds_NASC["ping_time"] = (["distance"], raw_NASC["ping_time"].data, ds_Sv["ping_time"].attrs) + + ds_NASC["frequency_nominal"] = ds_Sv["frequency_nominal"] # re-attach frequency_nominal + + # Attach attributes + _set_var_attrs( + ds_NASC["NASC"], + long_name="Nautical Areal Scattering Coefficient (NASC, m2 nmi-2)", + units="m2 nmi-2", + round_digits=3, + ) + _set_var_attrs(ds_NASC["distance"], "Cumulative distance", "nmi", 3) + _set_var_attrs(ds_NASC["depth"], "Cell depth", "m", 3, standard_name="depth") + + # Calculate and add ACDD bounding box global attributes + ds_NASC.attrs["Conventions"] = "CF-1.7,ACDD-1.3" + ds_NASC.attrs["time_coverage_start"] = np.datetime_as_string( + ds_Sv["ping_time"].min().values, timezone="UTC" + ) + ds_NASC.attrs["time_coverage_end"] = np.datetime_as_string( + ds_Sv["ping_time"].max().values, timezone="UTC" + ) + ds_NASC.attrs["geospatial_lat_min"] = round(float(ds_Sv["latitude"].min().values), 5) + ds_NASC.attrs["geospatial_lat_max"] = round(float(ds_Sv["latitude"].max().values), 5) + ds_NASC.attrs["geospatial_lon_min"] = round(float(ds_Sv["longitude"].min().values), 5) + ds_NASC.attrs["geospatial_lon_max"] = round(float(ds_Sv["longitude"].max().values), 5) + + return ds_NASC def regrid(): diff --git a/echopype/commongrid/mvbs.py b/echopype/commongrid/mvbs.py deleted file mode 100644 index 2fbc1b38f..000000000 --- a/echopype/commongrid/mvbs.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -Contains core functions needed to compute the MVBS of an input dataset. -""" - -from typing import Literal, Union - -import numpy as np -import pandas as pd -import xarray as xr -from flox.xarray import xarray_reduce - -from ..consolidate.api import POSITION_VARIABLES -from ..utils.compute import _lin2log, _log2lin - - -def get_MVBS_along_channels( - ds_Sv: xr.Dataset, - range_interval: Union[pd.IntervalIndex, np.ndarray], - ping_interval: Union[pd.IntervalIndex, np.ndarray], - range_var: Literal["echo_range", "depth"] = "echo_range", - method: str = "map-reduce", - **kwargs -) -> xr.Dataset: - """ - Computes the MVBS of ``ds_Sv`` along each channel for the given - intervals. - - Parameters - ---------- - ds_Sv: xr.Dataset - A Dataset containing ``Sv`` and ``echo_range`` data with coordinates - ``channel``, ``ping_time``, and ``range_sample`` - range_interval: pd.IntervalIndex or np.ndarray - 1D array or interval index representing - the bins required for ``range_var`` - ping_interval: pd.IntervalIndex or np.ndarray - 1D array or interval index representing - the bins required for ``ping_time`` - range_var: str - The variable to use for range binning. - Either ``echo_range`` or ``depth``. - method: str - The flox strategy for reduction of dask arrays only. - See flox `documentation `_ - for more details. - **kwargs - Additional keyword arguments to be passed - to flox reduction function - - Returns - ------- - xr.Dataset - The MVBS dataset of the input ``ds_Sv`` for all channels - """ - - # average should be done in linear domain - sv = ds_Sv["Sv"].pipe(_log2lin) - - # Get positions if exists - # otherwise just use an empty dataset - ds_Pos = xr.Dataset(attrs={"has_positions": False}) - if all(v in ds_Sv for v in POSITION_VARIABLES): - ds_Pos = xarray_reduce( - ds_Sv[POSITION_VARIABLES], - ds_Sv["ping_time"], - func="nanmean", - expected_groups=(ping_interval), - isbin=True, - method=method, - ) - ds_Pos.attrs["has_positions"] = True - - # reduce along ping_time and echo_range or depth - # by binning and averaging - mvbs = xarray_reduce( - sv, - sv["channel"], - ds_Sv["ping_time"], - ds_Sv[range_var], - func="nanmean", - expected_groups=(None, ping_interval, range_interval), - isbin=[False, True, True], - method=method, - **kwargs - ) - - # apply inverse mapping to get back to the original domain and store values - da_MVBS = mvbs.pipe(_lin2log) - return xr.merge([ds_Pos, da_MVBS]) diff --git a/echopype/commongrid/nasc.py b/echopype/commongrid/nasc.py deleted file mode 100644 index 2656f2bc7..000000000 --- a/echopype/commongrid/nasc.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -An overhaul is required for the below compute_NASC implementation, to: -- increase the computational efficiency -- debug the current code of any discrepancy against Echoview implementation -- potentially provide an alternative based on ping-by-ping Sv - -This script contains functions used by commongrid.compute_NASC, -but a subset of these overlap with operations needed -for commongrid.compute_MVBS and clean.estimate_noise. -The compute_MVBS and remove_noise code needs to be refactored, -and the compute_NASC needs to be optimized. -The plan is to create a common util set of functions for use in -these functions in an upcoming release. -""" - -import numpy as np -import xarray as xr -from geopy import distance - - -def check_identical_depth(ds_ch): - """ - Check if all pings have the same depth vector. - """ - # Depth vector are identical for all pings, if: - # - the number of non-NaN range_sample is the same for all pings, AND - # - all pings have the same max range - num_nan = np.isnan(ds_ch.values).sum(axis=1) - nan_check = True if np.all(num_nan == 0) or np.unique(num_nan).size == 1 else False - - if not nan_check: - return xr.DataArray(False, coords={"channel": ds_ch["channel"]}) - else: - # max range of each ping should be identical - max_range_ping = ds_ch.values[np.arange(ds_ch.shape[0]), ds_ch.shape[1] - num_nan - 1] - if np.unique(max_range_ping).size == 1: - return xr.DataArray(True, coords={"channel": ds_ch["channel"]}) - else: - return xr.DataArray(False, coords={"channel": ds_ch["channel"]}) - - -def get_depth_bin_info(ds_Sv, cell_depth): - """ - Find binning indices along depth - """ - depth_ping1 = ds_Sv["depth"].isel(ping_time=0) - num_nan = np.isnan(depth_ping1.values).sum(axis=1) - # ping 1 max range of each channel - max_range_ch = depth_ping1.values[ - np.arange(depth_ping1.shape[0]), depth_ping1.shape[1] - num_nan - 1 - ] - bin_num_depth = np.ceil(max_range_ch.max() / cell_depth) # use max range of all channel - depth_bin_idx = [ - np.digitize(dp1, np.arange(bin_num_depth + 1) * cell_depth, right=False) - for dp1 in depth_ping1 - ] - return bin_num_depth, depth_bin_idx - - -def get_distance_from_latlon(ds_Sv): - # Get distance from lat/lon in nautical miles - df_pos = ds_Sv["latitude"].to_dataframe().join(ds_Sv["longitude"].to_dataframe()) - df_pos["latitude_prev"] = df_pos["latitude"].shift(-1) - df_pos["longitude_prev"] = df_pos["longitude"].shift(-1) - df_latlon_nonan = df_pos.dropna().copy() - df_latlon_nonan["dist"] = df_latlon_nonan.apply( - lambda x: distance.distance( - (x["latitude"], x["longitude"]), - (x["latitude_prev"], x["longitude_prev"]), - ).nm, - axis=1, - ) - df_pos = df_pos.join(df_latlon_nonan["dist"], how="left") - df_pos["dist"] = df_pos["dist"].cumsum() - df_pos["dist"] = df_pos["dist"].fillna(method="ffill").fillna(method="bfill") - - return df_pos["dist"].values - - -def get_dist_bin_info(dist_nmi, cell_dist): - bin_num_dist = np.ceil(dist_nmi.max() / cell_dist) - if np.mod(dist_nmi.max(), cell_dist) == 0: - # increment bin num if last element coincides with bin edge - bin_num_dist = bin_num_dist + 1 - dist_bin_idx = np.digitize(dist_nmi, np.arange(bin_num_dist + 1) * cell_dist, right=False) - return bin_num_dist, dist_bin_idx diff --git a/echopype/commongrid/utils.py b/echopype/commongrid/utils.py new file mode 100644 index 000000000..39d2cd62a --- /dev/null +++ b/echopype/commongrid/utils.py @@ -0,0 +1,557 @@ +import logging +import re +from typing import Literal, Optional, Tuple, Union + +import numpy as np +import pandas as pd +import xarray as xr +from flox.xarray import xarray_reduce +from geopy import distance + +from ..consolidate.api import POSITION_VARIABLES +from ..utils.compute import _lin2log, _log2lin + +logger = logging.getLogger(__name__) + + +def compute_raw_MVBS( + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + ping_interval: Union[pd.IntervalIndex, np.ndarray], + range_var: Literal["echo_range", "depth"] = "echo_range", + method="map-reduce", + **flox_kwargs, +): + """ + Compute the raw unformatted MVBS of ``ds_Sv``. + + Parameters + ---------- + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and ``echo_range`` data + with coordinates ``channel``, ``ping_time``, and ``range_sample`` + at bare minimum. + Or this can contain ``Sv`` and ``depth`` data with similar coordinates. + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var`` + ping_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``ping_time``. + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + The MVBS dataset of the input ``ds_Sv`` for all channels. + """ + # Set initial variables + ds = xr.Dataset() + x_var = "ping_time" + + sv_mean = _groupby_x_along_channels( + ds_Sv, + range_interval, + x_interval=ping_interval, + x_var=x_var, + range_var=range_var, + method=method, + **flox_kwargs, + ) + + # This is MVBS computation + # apply inverse mapping to get back to the original domain and store values + da_MVBS = sv_mean.pipe(_lin2log) + # return xr.merge([ds_Pos, da_MVBS]) + return xr.merge([ds, da_MVBS]) + + +def compute_raw_NASC( + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + dist_interval: Union[pd.IntervalIndex, np.ndarray], + method="map-reduce", + **flox_kwargs, +): + """ + Compute the raw unformatted NASC of ``ds_Sv``. + + Parameters + ---------- + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and ``depth`` data + with coordinates ``channel``, ``distance_nmi``, and ``range_sample``. + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var``. + dist_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``distance_nmi``. + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + The MVBS or NASC dataset of the input ``ds_Sv`` for all channels + """ + # Set initial variables + ds = xr.Dataset() + x_var = "distance_nmi" + range_var = "depth" + + # Determine range_dim for NASC computation + range_dim = "range_sample" + if range_dim not in ds_Sv.dims: + range_dim = "depth" + + sv_mean = _groupby_x_along_channels( + ds_Sv, + range_interval, + x_interval=dist_interval, + x_var=x_var, + range_var=range_var, + method=method, + **flox_kwargs, + ) + + # Get mean ping_time along distance_nmi + # this is only done for NASC computation, + # since for MVBS the ping_time is used for binning already. + ds_ping_time = xarray_reduce( + ds_Sv["ping_time"], + ds_Sv[x_var], + func="nanmean", + expected_groups=(dist_interval), + isbin=True, + method=method, + ) + + # Mean height: approach to use flox + # Numerator (h_mean_num): + # - create a dataarray filled with the first difference of sample height + # with 2D coordinate (distance, depth) + # - flox xarray_reduce along both distance and depth, summing over each 2D bin + # Denominator (h_mean_denom): + # - create a datararray filled with 1, with 1D coordinate (distance) + # - flox xarray_reduce along distance, summing over each 1D bin + # h_mean = N/D + da_denom = xr.ones_like(ds_Sv[x_var]) + h_mean_denom = xarray_reduce( + da_denom, + ds_Sv[x_var], + func="sum", + expected_groups=(dist_interval), + isbin=[True], + method=method, + ) + + h_mean_num = xarray_reduce( + ds_Sv[range_var].diff(dim=range_dim, label="lower"), # use lower end label after diff + ds_Sv["channel"], + ds_Sv[x_var], + ds_Sv[range_var].isel(**{range_dim: slice(0, -1)}), + func="sum", + expected_groups=(None, dist_interval, range_interval), + isbin=[False, True, True], + method=method, + ) + h_mean = h_mean_num / h_mean_denom + + # Combine to compute NASC and name it + raw_NASC = sv_mean * h_mean * 4 * np.pi * 1852**2 + raw_NASC.name = "sv" + + return xr.merge([ds, ds_ping_time, raw_NASC]) + + +def get_distance_from_latlon(ds_Sv): + # Get distance from lat/lon in nautical miles + df_pos = ds_Sv["latitude"].to_dataframe().join(ds_Sv["longitude"].to_dataframe()) + df_pos["latitude_prev"] = df_pos["latitude"].shift(-1) + df_pos["longitude_prev"] = df_pos["longitude"].shift(-1) + df_latlon_nonan = df_pos.dropna().copy() + df_latlon_nonan["dist"] = df_latlon_nonan.apply( + lambda x: distance.distance( + (x["latitude"], x["longitude"]), + (x["latitude_prev"], x["longitude_prev"]), + ).nm, + axis=1, + ) + df_pos = df_pos.join(df_latlon_nonan["dist"], how="left") + df_pos["dist"] = df_pos["dist"].cumsum() + df_pos["dist"] = df_pos["dist"].ffill().bfill() + + return df_pos["dist"].values + + +def _set_var_attrs(da, long_name, units, round_digits, standard_name=None): + """ + Attach common attributes to DataArray variable. + + Parameters + ---------- + da : xr.DataArray + DataArray that will receive attributes + long_name : str + Variable long_name attribute + units : str + Variable units attribute + round_digits : int + Number of digits after decimal point for rounding off actual_range + standard_name : str + CF standard_name, if available (optional) + """ + + da.attrs = { + "long_name": long_name, + "units": units, + "actual_range": [ + round(float(da.min().values), round_digits), + round(float(da.max().values), round_digits), + ], + } + if standard_name: + da.attrs["standard_name"] = standard_name + + +def _set_MVBS_attrs(ds): + """ + Attach common attributes. + + Parameters + ---------- + ds : xr.Dataset + dataset containing MVBS + """ + ds["ping_time"].attrs = { + "long_name": "Ping time", + "standard_name": "time", + "axis": "T", + } + + _set_var_attrs( + ds["Sv"], + long_name="Mean volume backscattering strength (MVBS, mean Sv re 1 m-1)", + units="dB", + round_digits=2, + ) + + +def _convert_bins_to_interval_index( + bins: list, closed: Literal["left", "right"] = "left" +) -> pd.IntervalIndex: + """ + Convert bins to sorted pandas IntervalIndex + with specified closed end + + Parameters + ---------- + bins : list + The bin edges + closed : {'left', 'right'}, default 'left' + Which side of bin interval is closed + + Returns + ------- + pd.IntervalIndex + The resulting IntervalIndex + """ + return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values() + + +def _parse_x_bin(x_bin: str, x_label="range_bin") -> float: + """ + Parses x bin string, check unit, + and returns x bin in the specified unit. + + Currently only available for: + range_bin: meters (m) + dist_bin: nautical miles (nmi) + + Parameters + ---------- + x_bin : str + X bin string, e.g., "0.5nmi" or "10m" + x_label : {"range_bin", "dist_bin"}, default "range_bin" + The label of the x bin. + + Returns + ------- + float + The resulting x bin value in x unit, + based on label. + + Raises + ------ + ValueError + If the x bin string doesn't include unit value. + TypeError + If the x bin is not a type string. + KeyError + If the x label is not one of the available labels. + """ + x_bin_map = { + "range_bin": { + "name": "Range bin", + "unit": "m", + "ex": "10m", + "unit_label": "meters", + "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)", + }, + "dist_bin": { + "name": "Distance bin", + "unit": "nmi", + "ex": "0.5nmi", + "unit_label": "nautical miles", + "pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)", + }, + } + x_bin_info = x_bin_map.get(x_label, None) + + if x_bin_info is None: + raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}") + + # First check for bin types + if not isinstance(x_bin, str): + raise TypeError("'x_bin' must be a string") + # normalize to lower case + # for x_bin + x_bin = x_bin.strip().lower() + # Only matches meters + match_obj = re.match(x_bin_info["pattern"], x_bin) + + # Do some checks on x_bin inputs + if match_obj is None: + # This shouldn't be other units + raise ValueError( + f"{x_bin_info['name']} must be in " + f"{x_bin_info['unit_label']} " + f"(e.g., '{x_bin_info['ex']}')." + ) + + # Convert back to float + x_bin = float(match_obj.group(1)) + return x_bin + + +def _setup_and_validate( + ds_Sv: xr.Dataset, + range_var: Literal["echo_range", "depth"] = "echo_range", + range_bin: Optional[str] = None, + closed: Literal["left", "right"] = "left", + required_data_vars: Optional[list] = None, +) -> Tuple[xr.Dataset, float]: + """ + Setup and validate shared arguments for + ``compute_X`` functions. + + For now this is only used by ``compute_MVBS`` and ``compute_NASC``. + + Parameters + ---------- + ds_Sv : xr.Dataset + Sv dataset + range_var : {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Must be one of ``echo_range`` or ``depth``. + Note that ``depth`` is only available if the input dataset contains + ``depth`` as a data variable. + range_bin : str, default None + bin size along ``echo_range`` or ``depth`` in meters. + closed: {'left', 'right'}, default 'left' + Which side of bin interval is closed. + required_data_vars : list, optional + List of required data variables in ds_Sv. + If None, defaults to empty list. + + Returns + ------- + ds_Sv : xr.Dataset + Modified Sv dataset + range_bin : float + The range bin value in meters + """ + + # Check if range_var is valid + if range_var not in ["echo_range", "depth"]: + raise ValueError("range_var must be one of 'echo_range' or 'depth'.") + + # Set to default empty list if None + if required_data_vars is None: + required_data_vars = [] + + # Check if required data variables exists in ds_Sv + # Use set to ensure no duplicates + required_data_vars = set(required_data_vars + [range_var]) + if not all([var in ds_Sv.variables for var in required_data_vars]): + raise ValueError( + "Input Sv dataset must contain all of " f"the following variables: {required_data_vars}" + ) + + # Check if range_bin is a string + if not isinstance(range_bin, str): + raise TypeError("range_bin must be a string") + + # Parse the range_bin string and convert to float + range_bin = _parse_x_bin(range_bin, "range_bin") + + # Check for closed values + if closed not in ["right", "left"]: + raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.") + + # Clean up filenames dimension if it exists + # not needed here + if "filenames" in ds_Sv.dims: + ds_Sv = ds_Sv.drop_dims("filenames") + + return ds_Sv, range_bin + + +def _get_reduced_positions( + ds_Sv: xr.Dataset, + ds_X: xr.Dataset, + X: Literal["MVBS", "NASC"], + x_interval: Union[pd.IntervalIndex, np.ndarray], +) -> xr.Dataset: + """Helper function to get reduced positions + + Parameters + ---------- + ds_Sv : xr.Dataset + The input Sv dataset + ds_X : xr.Dataset + The input X dataset, either ``ds_MVBS`` or ``ds_NASC`` + X : {'MVBS', 'NASC'} + The type of X dataset + x_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for X dataset. + + MVBS: ``ping_time`` + NASC: ``distance_nmi`` + + Returns + ------- + xr.Dataset + The X dataset with reduced position variables + such as latitude and longitude + """ + # Get positions if exists + # otherwise return the input ds_X + if all(v in ds_Sv for v in POSITION_VARIABLES): + x_var = x_dim = "ping_time" + if X == "NASC": + x_var = "distance_nmi" + x_dim = "distance" + + ds_Pos = xarray_reduce( + ds_Sv[POSITION_VARIABLES], + ds_Sv[x_var], + func="nanmean", + expected_groups=(x_interval), + isbin=True, + method="map-reduce", + ) + + for var in POSITION_VARIABLES: + ds_X[var] = ([x_dim], ds_Pos[var].data, ds_Sv[var].attrs) + return ds_X + + +def _groupby_x_along_channels( + ds_Sv: xr.Dataset, + range_interval: Union[pd.IntervalIndex, np.ndarray], + x_interval: Union[pd.IntervalIndex, np.ndarray], + x_var: Literal["ping_time", "distance_nmi"] = "ping_time", + range_var: Literal["echo_range", "depth"] = "echo_range", + method: str = "map-reduce", + **flox_kwargs, +) -> xr.Dataset: + """ + Perform groupby of ``ds_Sv`` along each channel for the given + intervals to get the 'sv' mean value. + + Parameters + ---------- + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and other variables, + depending on computation performed. + + For MVBS computation, this must contain ``Sv`` and ``echo_range`` data + with coordinates ``channel``, ``ping_time``, and ``range_sample`` + at bare minimum. + Or this can contain ``Sv`` and ``depth`` data with similar coordinates. + + For NASC computatioon this must contain ``Sv`` and ``depth`` data + with coordinates ``channel``, ``distance_nmi``, and ``range_sample``. + range_interval: pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``range_var`` + x_interval : pd.IntervalIndex or np.ndarray + 1D array or interval index representing + the bins required for ``ping_time`` or ``distance_nmi``. + x_var : {'ping_time', 'distance_nmi'}, default 'ping_time' + The variable to use for x binning. This will determine + if computation is for MVBS or NASC. + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + + **For NASC, this must be ``depth``.** + method: str + The flox strategy for reduction of dask arrays only. + See flox `documentation `_ + for more details. + **flox_kwargs + Additional keyword arguments to be passed + to flox reduction function. + + Returns + ------- + xr.Dataset + The MVBS or NASC dataset of the input ``ds_Sv`` for all channels + """ + # Check if x_var is valid, currently only support + # ping_time and distance_nmi, which indicates + # either a MVBS or NASC computation + if x_var not in ["ping_time", "distance_nmi"]: + raise ValueError("x_var must be 'ping_time' or 'distance_nmi'") + + # Set correct range_var just in case + if x_var == "distance_nmi" and range_var != "depth": + logger.warning("x_var is 'distance_nmi', setting range_var to 'depth'") + range_var = "depth" + + # average should be done in linear domain + sv = ds_Sv["Sv"].pipe(_log2lin) + + # reduce along ping_time or distance_nmi + # and echo_range or depth + # by binning and averaging + sv_mean = xarray_reduce( + sv, + ds_Sv["channel"], + ds_Sv[x_var], + ds_Sv[range_var], + func="nanmean", + expected_groups=(None, x_interval, range_interval), + isbin=[False, True, True], + method=method, + **flox_kwargs, + ) + return sv_mean diff --git a/echopype/convert/parse_azfp.py b/echopype/convert/parse_azfp.py index 2c031a53f..ae9f928e2 100644 --- a/echopype/convert/parse_azfp.py +++ b/echopype/convert/parse_azfp.py @@ -91,26 +91,35 @@ def load_AZFP_xml(self): """ xmlmap = fsspec.get_mapper(self.xml_path, **self.storage_options) - root = ET.parse(xmlmap.fs.open(xmlmap.root)).getroot() - - for child in root.iter(): - if len(child.tag) > 3 and not child.tag.startswith("VTX"): - camel_case_tag = camelcase2snakecase(child.tag) - else: - camel_case_tag = child.tag - if len(child.attrib) > 0: - for key, val in child.attrib.items(): - self.parameters[camel_case_tag + "_" + camelcase2snakecase(key)].append(val) - - if all(char == "\n" for char in child.text): - continue - else: - try: - val = int(child.text) - except ValueError: - val = float(child.text) - - self.parameters[camel_case_tag].append(val) + phase_number = None + for event, child in ET.iterparse(xmlmap.fs.open(xmlmap.root), events=("start", "end")): + if event == "end" and child.tag == "Phases": + phase_number = None + if event == "start": + if len(child.tag) > 3 and not child.tag.startswith("VTX"): + camel_case_tag = camelcase2snakecase(child.tag) + else: + camel_case_tag = child.tag + + if len(child.attrib) > 0: + for key, val in child.attrib.items(): + attrib_tag = camel_case_tag + "_" + camelcase2snakecase(key) + if phase_number is not None and camel_case_tag != "phase": + attrib_tag += f"_phase{phase_number}" + self.parameters[attrib_tag].append(val) + if child.tag == "Phase": + phase_number = val + + if all(char == "\n" for char in child.text): + continue + else: + try: + val = int(child.text) + except ValueError: + val = float(child.text) + if phase_number is not None and camel_case_tag != "phase": + camel_case_tag += f"_phase{phase_number}" + self.parameters[camel_case_tag].append(val) # Handling the case where there is only one value for each parameter for key, val in self.parameters.items(): @@ -188,6 +197,25 @@ def _compute_battery(self, ping_num, battery_type): return N * USL5_BAT_CONSTANT + def _compute_pressure(self, ping_num, is_valid): + """ + Compute pressure in decibar + + Parameters + ---------- + ping_num + ping number + is_valid + whether the associated parameters have valid values + """ + if not is_valid or self.parameters["sensors_flag_pressure_sensor_installed"] == "no": + return np.nan + + counts = self.unpacked_data["ancillary"][ping_num][3] + v_in = 2.5 * (counts / 65535) + P = v_in * self.parameters["a1"] + self.parameters["a0"] - 10.125 + return P + def parse_raw(self): """ Parse raw data file from AZFP echosounder. @@ -205,6 +233,7 @@ def _test_valid_params(params): return True temperature_is_valid = _test_valid_params(["ka", "kb", "kc"]) + pressure_is_valid = _test_valid_params(["a0", "a1"]) tilt_x_is_valid = _test_valid_params(["X_a", "X_b", "X_c"]) tilt_y_is_valid = _test_valid_params(["Y_a", "Y_b", "Y_c"]) @@ -226,6 +255,10 @@ def _test_valid_params(params): self.unpacked_data["temperature"].append( self._compute_temperature(ping_num, temperature_is_valid) ) + # Compute pressure from unpacked_data[ii]['ancillary'][3] + self.unpacked_data["pressure"].append( + self._compute_pressure(ping_num, pressure_is_valid) + ) # compute x tilt from unpacked_data[ii]['ancillary][0] self.unpacked_data["tilt_x"].append( self._compute_tilt(ping_num, "X", tilt_x_is_valid) diff --git a/echopype/convert/set_groups_azfp.py b/echopype/convert/set_groups_azfp.py index 9c37eec1e..7dc3fdf21 100644 --- a/echopype/convert/set_groups_azfp.py +++ b/echopype/convert/set_groups_azfp.py @@ -81,7 +81,7 @@ def _create_unique_channel_name(self): """ serial_number = self.parser_obj.unpacked_data["serial_number"] - frequency_number = self.parser_obj.parameters["frequency_number"] + frequency_number = self.parser_obj.parameters["frequency_number_phase1"] if serial_number.size == 1: freq_as_str = self.freq_sorted.astype(int).astype(str) @@ -114,7 +114,16 @@ def set_env(self) -> xr.Dataset: "standard_name": "sea_water_temperature", "units": "deg_C", }, - ) + ), + "pressure": ( + ["time1"], + self.parser_obj.unpacked_data["pressure"], + { + "long_name": "Sea water pressure", + "standard_name": "sea_water_pressure_due_to_sea_water", + "units": "dbar", + }, + ), }, coords={ "time1": ( @@ -500,7 +509,25 @@ def set_vendor(self) -> xr.Dataset: unpacked_data = self.parser_obj.unpacked_data parameters = self.parser_obj.parameters ping_time = self.parser_obj.ping_time - tdn = parameters["pulse_len"][self.freq_ind_sorted] / 1e6 + phase_params = ["burst_interval", "pings_per_burst", "average_burst_pings"] + phase_freq_params = [ + "dig_rate", + "range_samples", + "range_averaging_samples", + "lock_out_index", + "gain", + "storage_format", + ] + tdn = [] + for num in parameters["phase_number"]: + tdn.append(parameters[f"pulse_len_phase{num}"][self.freq_ind_sorted] / 1e6) + tdn = np.array(tdn) + for param in phase_freq_params: + for num in parameters["phase_number"]: + parameters[param].append(parameters[f"{param}_phase{num}"][self.freq_ind_sorted]) + for param in phase_params: + for num in parameters["phase_number"]: + parameters[param].append(parameters[f"{param}_phase{num}"]) anc = np.array(unpacked_data["ancillary"]) # convert to np array for easy slicing # Build variables in the output xarray Dataset @@ -639,13 +666,13 @@ def set_vendor(self) -> xr.Dataset: ), # parameters with channel dimension from XML file "XML_transmit_duration_nominal": ( - ["channel"], + ["phase_number", "channel"], tdn, {"long_name": "(From XML file) Nominal bandwidth of transmitted pulse"}, ), # tdn comes from parameters "XML_gain_correction": ( - ["channel"], - parameters["gain"][self.freq_ind_sorted], + ["phase_number", "channel"], + parameters["gain"], {"long_name": "(From XML file) Gain correction"}, ), "instrument_type": parameters["instrument_type"][0], @@ -660,8 +687,8 @@ def set_vendor(self) -> xr.Dataset: "parameter_version": parameters["parameter_version"], "configuration_version": parameters["configuration_version"], "XML_digitization_rate": ( - ["channel"], - parameters["dig_rate"][self.freq_ind_sorted], + ["phase_number", "channel"], + parameters["dig_rate"], { "long_name": "(From XML file) Number of samples per second in kHz that is " "processed by the A/D converter when digitizing the returned acoustic " @@ -669,8 +696,8 @@ def set_vendor(self) -> xr.Dataset: }, ), "XML_lockout_index": ( - ["channel"], - parameters["lock_out_index"][self.freq_ind_sorted], + ["phase_number", "channel"], + parameters["lock_out_index"], { "long_name": "(From XML file) The distance, rounded to the nearest " "Bin Size after the pulse is transmitted that over which AZFP will " @@ -713,17 +740,17 @@ def set_vendor(self) -> xr.Dataset: ), "Sv_offset": (["channel"], Sv_offset), "number_of_samples_digitized_per_pings": ( - ["channel"], - parameters["range_samples"][self.freq_ind_sorted], + ["phase_number", "channel"], + parameters["range_samples"], ), "number_of_digitized_samples_averaged_per_pings": ( - ["channel"], - parameters["range_averaging_samples"][self.freq_ind_sorted], + ["phase_number", "channel"], + parameters["range_averaging_samples"], ), # parameters with dim len=0 from XML file "XML_sensors_flag": parameters["sensors_flag"], "XML_burst_interval": ( - [], + ["phase_number"], parameters["burst_interval"], { "long_name": "Time in seconds between bursts or between pings if the burst " @@ -732,8 +759,14 @@ def set_vendor(self) -> xr.Dataset: ), "XML_sonar_serial_number": parameters["serial_number"], "number_of_frequency": parameters["num_freq"], - "number_of_pings_per_burst": parameters["pings_per_burst"], - "average_burst_pings_flag": parameters["average_burst_pings"], + "number_of_pings_per_burst": ( + ["phase_number"], + parameters["pings_per_burst"], + ), + "average_burst_pings_flag": ( + ["phase_number"], + parameters["average_burst_pings"], + ), # temperature coefficients from XML file **{ f"temperature_k{var}": ( @@ -789,6 +822,10 @@ def set_vendor(self) -> xr.Dataset: list(range(len(unpacked_data["ancillary"][0]))), ), "ad_len": (["ad_len"], list(range(len(unpacked_data["ad"][0])))), + "phase_number": ( + ["phase_number"], + sorted([int(num) for num in parameters["phase_number"]]), + ), }, ) return set_time_encodings(ds) diff --git a/echopype/echodata/echodata.py b/echopype/echodata/echodata.py index 7adeb3dbe..b1293d1b1 100644 --- a/echopype/echodata/echodata.py +++ b/echopype/echodata/echodata.py @@ -517,9 +517,21 @@ def _clip_by_time_dim(external_ds, ext_time_dim_name): if v["ext_time_dim_name"] == "scalar" ] for platform_var in scalar_vars: + # Set timestamp equal to the first ping time whenever either + # latitude or longitude is updated without a time dimension ext_var = mappings_expanded[platform_var]["external_var"] - # Replace the scalar value and add a history attribute - platform[platform_var].data = float(extra_platform_data[ext_var].data) + if platform_var == "latitude" or platform_var == "longitude": + platform[platform_var].data = np.array([extra_platform_data[ext_var].data]) + platform[platform_var] = platform[platform_var].assign_coords( + **{ + platform[platform_var].dims[0]: [ + self["Sonar/Beam_group1"]["ping_time"].data[0] + ] + } + ) + else: + # Replace the scalar value and add a history attribute + platform[platform_var].data = float(extra_platform_data[ext_var].data) platform[platform_var] = platform[platform_var].assign_attrs( **{"history": f"{history_attr}. From external {ext_var} variable."} ) diff --git a/echopype/tests/commongrid/conftest.py b/echopype/tests/commongrid/conftest.py index 27b166a03..9c30c504f 100644 --- a/echopype/tests/commongrid/conftest.py +++ b/echopype/tests/commongrid/conftest.py @@ -3,8 +3,12 @@ import xarray as xr import numpy as np import pandas as pd +from typing import Literal from echopype.consolidate import add_depth +from echopype.commongrid.utils import ( + get_distance_from_latlon, +) import echopype as ep @@ -87,15 +91,47 @@ def mock_Sv_sample(mock_parameters): return np.tile(depth_data, (channel_len, ping_time_len, 1)) +def _add_latlon_depth(ds_Sv, latlon=False, depth=False, lat_attrs={}, lon_attrs={}, depth_offset=0): + """Adds lat lon variables and/or depth to ds_Sv""" + if latlon: + # Add lat lon + n_pings = ds_Sv.ping_time.shape[0] + latitude = np.linspace(42.48916859, 42.49071833, num=n_pings) + longitude = np.linspace(-124.88296688, -124.81919229, num=n_pings) + + ds_Sv["latitude"] = (["ping_time"], latitude, lat_attrs) + ds_Sv["longitude"] = (["ping_time"], longitude, lon_attrs) + + # Need processing level code for compute MVBS to work! + ds_Sv.attrs["processing_level"] = "Level 2A" + + if depth: + # Add depth + ds_Sv = ds_Sv.pipe(add_depth, depth_offset=depth_offset) + return ds_Sv + + @pytest.fixture -def mock_Sv_dataset_regular(mock_parameters, mock_Sv_sample): +def mock_Sv_dataset_regular(mock_parameters, mock_Sv_sample, lat_attrs, lon_attrs, depth_offset): ds_Sv = _gen_Sv_echo_range_regular(**mock_parameters, ping_time_jitter_max_ms=0) ds_Sv["Sv"].data = mock_Sv_sample + + # Add latlon and depth + ds_Sv = _add_latlon_depth( + ds_Sv, + latlon=True, + depth=True, + lat_attrs=lat_attrs, + lon_attrs=lon_attrs, + depth_offset=depth_offset, + ) return ds_Sv @pytest.fixture -def mock_Sv_dataset_irregular(mock_parameters, mock_Sv_sample, mock_nan_ilocs): +def mock_Sv_dataset_irregular( + mock_parameters, mock_Sv_sample, mock_nan_ilocs, lat_attrs, lon_attrs, depth_offset +): depth_interval = [0.5, 0.32, 0.2] depth_ping_time_len = [2, 3, 5] ds_Sv = _gen_Sv_echo_range_irregular( @@ -105,6 +141,17 @@ def mock_Sv_dataset_irregular(mock_parameters, mock_Sv_sample, mock_nan_ilocs): ping_time_jitter_max_ms=30, # Added jitter to ping_time ) ds_Sv["Sv"].data = mock_Sv_sample + + # Add latlon and depth + ds_Sv = _add_latlon_depth( + ds_Sv, + latlon=True, + depth=True, + lat_attrs=lat_attrs, + lon_attrs=lon_attrs, + depth_offset=depth_offset, + ) + # Sprinkle nans around echo_range for pos in mock_nan_ilocs: ds_Sv["echo_range"][pos] = np.nan @@ -129,13 +176,29 @@ def mock_mvbs_array_regular(mock_Sv_dataset_regular, mock_mvbs_inputs, mock_para ping_time_bin = mock_mvbs_inputs["ping_time_bin"] range_bin = mock_mvbs_inputs["range_meter_bin"] channel_len = mock_parameters["channel_len"] - expected_mvbs_val = _get_expected_mvbs_val( - ds_Sv, ping_time_bin, range_bin, channel_len - ) + expected_mvbs_val = _get_expected_mvbs_val(ds_Sv, ping_time_bin, range_bin, channel_len) return expected_mvbs_val +@pytest.fixture +def mock_nasc_array_regular(mock_Sv_dataset_regular, mock_parameters): + """ + Mock Sv sample result from compute_MVBS + + Dimension: (2, 3, 5) + Ping time bin: 1s + Range bin: 2m + """ + ds_Sv = mock_Sv_dataset_regular + dist_bin = 0.5 + range_bin = 2 + channel_len = mock_parameters["channel_len"] + expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len) + + return expected_nasc_val + + @pytest.fixture def mock_mvbs_array_irregular(mock_Sv_dataset_irregular, mock_mvbs_inputs, mock_parameters): """ @@ -149,13 +212,29 @@ def mock_mvbs_array_irregular(mock_Sv_dataset_irregular, mock_mvbs_inputs, mock_ ping_time_bin = mock_mvbs_inputs["ping_time_bin"] range_bin = mock_mvbs_inputs["range_meter_bin"] channel_len = mock_parameters["channel_len"] - expected_mvbs_val = _get_expected_mvbs_val( - ds_Sv, ping_time_bin, range_bin, channel_len - ) + expected_mvbs_val = _get_expected_mvbs_val(ds_Sv, ping_time_bin, range_bin, channel_len) return expected_mvbs_val +@pytest.fixture +def mock_nasc_array_irregular(mock_Sv_dataset_irregular, mock_parameters): + """ + Mock Sv sample result from compute_MVBS + + Dimension: (2, 3, 5) + Ping time bin: 1s + Range bin: 2m + """ + ds_Sv = mock_Sv_dataset_irregular + dist_bin = 0.5 + range_bin = 2 + channel_len = mock_parameters["channel_len"] + expected_nasc_val = _get_expected_nasc_val(ds_Sv, dist_bin, range_bin, channel_len) + + return expected_nasc_val + + @pytest.fixture( params=[ ( @@ -285,15 +364,9 @@ def depth_offset(): @pytest.fixture def ds_Sv_echo_range_regular_w_latlon(ds_Sv_echo_range_regular, lat_attrs, lon_attrs): """Sv dataset with latitude and longitude""" - n_pings = ds_Sv_echo_range_regular.ping_time.shape[0] - latitude = np.linspace(42, 43, num=n_pings) - longitude = np.linspace(-124, -125, num=n_pings) - - ds_Sv_echo_range_regular["latitude"] = (["ping_time"], latitude, lat_attrs) - ds_Sv_echo_range_regular["longitude"] = (["ping_time"], longitude, lon_attrs) - - # Need processing level code for compute MVBS to work! - ds_Sv_echo_range_regular.attrs["processing_level"] = "Level 2A" + ds_Sv_echo_range_regular = _add_latlon_depth( + ds_Sv_echo_range_regular, latlon=True, lat_attrs=lat_attrs, lon_attrs=lon_attrs + ) return ds_Sv_echo_range_regular @@ -319,14 +392,228 @@ def ds_Sv_echo_range_irregular(random_number_generator): ) +# Helper functions for NASC testing +def _create_dataset(i, sv, dim, rng): + dims = { + "range_sample": np.arange(5), + "distance_nmi": np.arange(5), + } + # Add one for other channel + sv = sv + (rng.random() * 5) + Sv = ep.utils.compute._lin2log(sv) + ds_Sv = xr.Dataset( + { + "Sv": (list(dims.keys()), Sv), + "depth": (list(dims.keys()), np.array([dim] * 5).T), + "ping_time": ( + ["distance_nmi"], + pd.date_range("2020-01-01", periods=len(dim), freq="1min"), + ), + }, + coords=dict(channel=f"ch_{i}", **dims), + ) + return ds_Sv + + +def get_NASC_echoview(ds_Sv, ch_idx=0, r0=2, r1=20): + """ + Computes NASC using echoview's method, 1 channel only, + as described in https://gist.github.com/leewujung/3b058ab63c3b897b273b33b907b62f6d + """ + r = ds_Sv.depth.isel(channel=ch_idx, distance_nmi=0).values + # get r0 and r1 indexes + # these are used to slice the desired Sv samples + r0 = np.argmin(abs(r - r0)) + r1 = np.argmin(abs(r - r1)) + + sh = np.r_[np.diff(r), np.nan] + + sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin).isel(channel=ch_idx).values + sv_mean_echoview = np.nanmean(sv[r0:r1]) + h_mean_echoview = np.sum(sh[r0:r1]) * sv.shape[1] / sv.shape[1] + + NASC_echoview = sv_mean_echoview * h_mean_echoview * 4 * np.pi * 1852**2 + return NASC_echoview + + +@pytest.fixture +def mock_Sv_dataset_NASC(mock_parameters, random_number_generator): + channel_len = mock_parameters["channel_len"] + dim0 = np.array([0.5, 1.5, 2.5, 3.5, 9]) + sv0 = np.array( + [ + [1.0, 2.0, 3.0, 4.0, np.nan], + [6.0, 7.0, 8.0, 9.0, 10.0], + [11.0, 12.0, 13.0, 14.0, 15.0], + [16.0, 17.0, 18.0, 19.0, np.nan], + [21.0, 22.0, 23.0, 24.0, 25.0], + ] + ) + return xr.concat( + [_create_dataset(i, sv0, dim0, random_number_generator) for i in range(channel_len)], + dim="channel", + ) + + # Helper functions to generate mock Sv and MVBS dataset +def _get_expected_nasc_val( + ds_Sv: xr.Dataset, dist_bin: str, range_bin: float, channel_len: int = 2 +) -> np.ndarray: + """ + Helper functions to generate expected NASC outputs from mock Sv dataset + by brute-force looping and compute the mean + + Parameters + ---------- + ds_Sv : xr.Dataset + Mock Sv dataset + dist_bin : float + Distance bin + range_bin : float + Range bin + channel_len : int, default 2 + Number of channels + """ + # Get distance from lat/lon in nautical miles + dist_nmi = get_distance_from_latlon(ds_Sv) + ds_Sv = ds_Sv.assign_coords({"distance_nmi": ("ping_time", dist_nmi)}).swap_dims( + {"ping_time": "distance_nmi"} + ) + + # create bin information along distance_nmi + # this computes the distance max since there might NaNs in the data + dist_max = ds_Sv["distance_nmi"].max() + dist_interval = np.arange(0, dist_max + dist_bin, dist_bin) + + # create bin information for depth + # this computes the depth max since there might NaNs in the data + depth_max = ds_Sv["depth"].max() + range_interval = np.arange(0, depth_max + range_bin, range_bin) + + sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin) + + # Compute sv mean + sv_mean = _brute_mean_reduce_3d( + sv, ds_Sv, "depth", "distance_nmi", channel_len, dist_interval, range_interval + ) + + # Calculate denominator + h_mean_denom = np.ones(len(dist_interval) - 1) * np.nan + for x_idx in range(len(dist_interval) - 1): + x_range = ds_Sv["distance_nmi"].sel( + distance_nmi=slice(dist_interval[x_idx], dist_interval[x_idx + 1]) + ) + h_mean_denom[x_idx] = float(len(x_range.data)) + + # Calculate numerator + r_diff = ds_Sv["depth"].diff(dim="range_sample", label="lower") + depth = ds_Sv["depth"].isel(**{"range_sample": slice(0, -1)}) + h_mean_num = np.ones((channel_len, len(dist_interval) - 1, len(range_interval) - 1)) * np.nan + for ch_idx in range(channel_len): + for x_idx in range(len(dist_interval) - 1): + for r_idx in range(len(range_interval) - 1): + x_range = depth.isel(channel=ch_idx).sel( + **{"distance_nmi": slice(dist_interval[x_idx], dist_interval[x_idx + 1])} + ) + r_idx_active = np.logical_and( + x_range.data >= range_interval[r_idx], + x_range.data < range_interval[r_idx + 1], + ) + r_tmp = ( + r_diff.isel(channel=ch_idx) + .sel(**{"distance_nmi": slice(dist_interval[x_idx], dist_interval[x_idx + 1])}) + .data[r_idx_active] + ) + if 0 in r_tmp.shape: + h_mean_num[ch_idx, x_idx, r_idx] = np.nan + else: + h_mean_num[ch_idx, x_idx, r_idx] = np.sum(r_tmp) + + # Compute raw NASC + h_mean_num_da = xr.DataArray(h_mean_num, dims=["channel", "distance_nmi", "depth"]) + h_mean_denom_da = xr.DataArray(h_mean_denom, dims=["distance_nmi"]) + h_mean = h_mean_num_da / h_mean_denom_da + # Combine to compute NASC + return sv_mean * h_mean * 4 * np.pi * 1852**2 + + +def _brute_mean_reduce_3d( + sv: xr.DataArray, + ds_Sv: xr.Dataset, + range_var: Literal["echo_range", "depth"], + x_var: Literal["ping_time", "distance_nmi"], + channel_len: int, + x_interval: list, + range_interval: list, +) -> np.ndarray: + """ + Perform brute force reduction on sv data for 3 Dimensions + + Parameters + ---------- + sv : xr.DataArray + A DataArray containing ``sv`` data with coordinates + ds_Sv : xr.Dataset + A Dataset containing ``Sv`` and other variables, + depending on computation performed. + + For MVBS computation, this must contain ``Sv`` and ``echo_range`` data + with coordinates ``channel``, ``ping_time``, and ``range_sample`` + at bare minimum. + Or this can contain ``Sv`` and ``depth`` data with similar coordinates. + + For NASC computatioon this must contain ``Sv`` and ``depth`` data + with coordinates ``channel``, ``distance_nmi``, and ``range_sample``. + range_var: {'echo_range', 'depth'}, default 'echo_range' + The variable to use for range binning. + Either ``echo_range`` or ``depth``. + + **For NASC, this must be ``depth``.** + x_var : {'ping_time', 'distance_nmi'}, default 'ping_time' + The variable to use for x binning. This will determine + if computation is for MVBS or NASC. + channel_len : int + Number of channels + x_interval : list + 1D array or interval index representing + the bins required for ``ping_time`` or ``distance_nmi``. + range_interval : list + 1D array or interval index representing + the bins required for ``range_var`` + """ + mean_vals = np.ones((channel_len, len(x_interval) - 1, len(range_interval) - 1)) * np.nan + + for ch_idx in range(channel_len): + for x_idx in range(len(x_interval) - 1): + for r_idx in range(len(range_interval) - 1): + x_range = ( + ds_Sv[range_var] + .isel(channel=ch_idx) + .sel(**{x_var: slice(x_interval[x_idx], x_interval[x_idx + 1])}) + ) + r_idx_active = np.logical_and( + x_range.data >= range_interval[r_idx], + x_range.data < range_interval[r_idx + 1], + ) + sv_tmp = ( + sv.isel(channel=ch_idx) + .sel(**{x_var: slice(x_interval[x_idx], x_interval[x_idx + 1])}) + .data[r_idx_active] + ) + if 0 in sv_tmp.shape: + mean_vals[ch_idx, x_idx, r_idx] = np.nan + else: + mean_vals[ch_idx, x_idx, r_idx] = np.mean(sv_tmp) + return mean_vals + + def _get_expected_mvbs_val( ds_Sv: xr.Dataset, ping_time_bin: str, range_bin: float, channel_len: int = 2 ) -> np.ndarray: """ Helper functions to generate expected MVBS outputs from mock Sv dataset by brute-force looping and compute the mean - + Parameters ---------- ds_Sv : xr.Dataset @@ -350,30 +637,13 @@ def _get_expected_mvbs_val( # create bin information for echo_range # this computes the echo range max since there might NaNs in the data echo_range_max = ds_Sv["echo_range"].max() - range_interval = np.arange(0, echo_range_max + 2, range_bin) + range_interval = np.arange(0, echo_range_max + range_bin, range_bin) sv = ds_Sv["Sv"].pipe(ep.utils.compute._log2lin) - expected_mvbs_val = np.ones((2, len(ping_interval) - 1, len(range_interval) - 1)) * np.nan - - for ch_idx in range(channel_len): - for p_idx in range(len(ping_interval) - 1): - for r_idx in range(len(range_interval) - 1): - echo_range = ( - ds_Sv['echo_range'] - .isel(channel=ch_idx) - .sel(ping_time=slice(ping_interval[p_idx], ping_interval[p_idx+1])) - ) - r_idx_active = np.logical_and( - echo_range.data >= range_interval[r_idx], - echo_range.data < range_interval[r_idx+1] - ) - sv_tmp = sv.isel(channel=ch_idx).sel( - ping_time=slice(ping_interval[p_idx], ping_interval[p_idx+1])).data[r_idx_active] - if 0 in sv_tmp.shape: - expected_mvbs_val[ch_idx, p_idx, r_idx] = np.nan - else: - expected_mvbs_val[ch_idx, p_idx, r_idx] = np.mean(sv_tmp) + expected_mvbs_val = _brute_mean_reduce_3d( + sv, ds_Sv, "echo_range", "ping_time", channel_len, ping_interval, range_interval + ) return ep.utils.compute._lin2log(expected_mvbs_val) @@ -400,7 +670,7 @@ def _gen_Sv_echo_range_regular( Generate a Sv dataset with uniform echo_range across all ping_time. ping_time_jitter_max_ms controlled jitter in milliseconds in ping_time. - + Parameters ------------ channel_len @@ -457,7 +727,7 @@ def _gen_Sv_echo_range_irregular( Generate a Sv dataset with uniform echo_range across all ping_time. ping_time_jitter_max_ms controlled jitter in milliseconds in ping_time. - + Parameters ------------ channel_len @@ -468,7 +738,7 @@ def _gen_Sv_echo_range_irregular( depth intervals, may have multiple values depth_ping_time_len the number of pings to use each of the depth_interval - for example, with depth_interval=[0.5, 0.32, 0.13] + for example, with depth_interval=[0.5, 0.32, 0.13] and depth_ping_time_len=[100, 300, 200], the first 100 pings have echo_range with depth intervals of 0.5 m, the next 300 pings have echo_range with depth intervals of 0.32 m, diff --git a/echopype/tests/commongrid/test_api.py b/echopype/tests/commongrid/test_api.py index d618d6443..6e3a84385 100644 --- a/echopype/tests/commongrid/test_api.py +++ b/echopype/tests/commongrid/test_api.py @@ -1,6 +1,17 @@ import pytest -import echopype as ep + import numpy as np +import pandas as pd +from flox.xarray import xarray_reduce +import echopype as ep +from echopype.consolidate import add_location, add_depth +from echopype.commongrid.utils import ( + _parse_x_bin, + _groupby_x_along_channels, + get_distance_from_latlon, + compute_raw_NASC +) +from echopype.tests.commongrid.conftest import get_NASC_echoview # Utilities Tests @@ -32,11 +43,126 @@ def test__parse_x_bin(x_bin, x_label, expected_result): else: assert ep.commongrid.api._parse_x_bin(x_bin, x_label) == expected_result + +@pytest.mark.unit +@pytest.mark.parametrize( + ["range_var", "lat_lon"], [("depth", False), ("echo_range", False)] +) +def test__groupby_x_along_channels(request, range_var, lat_lon): + """Testing the underlying function of compute_MVBS and compute_NASC""" + range_bin = 20 + ping_time_bin = "20S" + method = "map-reduce" + + flox_kwargs = {"reindex": True} + + # Retrieve the correct dataset + if range_var == "depth": + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_depth") + else: + ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular") + + # compute range interval + echo_range_max = ds_Sv[range_var].max() + range_interval = np.arange(0, echo_range_max + range_bin, range_bin) + + # create bin information needed for ping_time + d_index = ( + ds_Sv["ping_time"] + .resample(ping_time=ping_time_bin, skipna=True) + .asfreq() + .indexes["ping_time"] + ) + ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]) + + sv_mean = _groupby_x_along_channels( + ds_Sv, + range_interval, + x_interval=ping_interval, + x_var="ping_time", + range_var=range_var, + method=method, + **flox_kwargs + ) + + # Check that the range_var is in the dimension + assert f"{range_var}_bins" in sv_mean.dims + + # NASC Tests @pytest.mark.integration -@pytest.mark.skip(reason="NASC is not implemented yet") -def test_compute_NASC(test_data_samples): - pass +@pytest.mark.parametrize("compute_mvbs", [True, False]) +def test_compute_NASC(request, test_data_samples, compute_mvbs): + if any(request.node.callspec.id.startswith(id) for id in ["ek80", "azfp"]): + pytest.skip("Skipping NASC test for ek80 and azfp, no data available") + + ( + filepath, + sonar_model, + azfp_xml_path, + range_kwargs, + ) = test_data_samples + ed = ep.open_raw(filepath, sonar_model, azfp_xml_path) + if ed.sonar_model.lower() == "azfp": + avg_temperature = ed["Environment"]["temperature"].values.mean() + env_params = { + "temperature": avg_temperature, + "salinity": 27.9, + "pressure": 59, + } + range_kwargs["env_params"] = env_params + if "azfp_cal_type" in range_kwargs: + range_kwargs.pop("azfp_cal_type") + ds_Sv = ep.calibrate.compute_Sv(ed, **range_kwargs) + + # Adds location and depth information + ds_Sv = ds_Sv.pipe(add_location, ed).pipe( + add_depth, depth_offset=ed["Platform"].water_level.values + ) + + if compute_mvbs: + range_bin = "2m" + ping_time_bin = "1s" + + ds_Sv = ds_Sv.pipe( + ep.commongrid.compute_MVBS, + range_var="depth", + range_bin=range_bin, + ping_time_bin=ping_time_bin, + ) + + dist_bin = "0.5nmi" + range_bin = "10m" + + ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin) + assert ds_NASC is not None + + dist_nmi = get_distance_from_latlon(ds_Sv) + + # Check dimensions + dist_bin = _parse_x_bin(dist_bin, "dist_bin") + range_bin = _parse_x_bin(range_bin) + da_NASC = ds_NASC["NASC"] + assert da_NASC.dims == ("channel", "distance", "depth") + assert np.all(ds_NASC["channel"].values == ds_Sv["channel"].values) + assert da_NASC["depth"].size == np.ceil(ds_Sv["depth"].max() / range_bin) + assert da_NASC["distance"].size == np.ceil(dist_nmi.max() / dist_bin) + + +@pytest.mark.unit +def test_simple_NASC_Echoview_values(mock_Sv_dataset_NASC): + dist_interval = np.array([-5, 10]) + range_interval = np.array([1, 5]) + raw_NASC = compute_raw_NASC( + mock_Sv_dataset_NASC, + range_interval, + dist_interval, + ) + for ch_idx, _ in enumerate(raw_NASC.channel): + NASC_echoview = get_NASC_echoview(mock_Sv_dataset_NASC, ch_idx) + assert np.allclose( + raw_NASC.sv.isel(channel=ch_idx)[0, 0], NASC_echoview, atol=1e-10, rtol=1e-10 + ) # MVBS Tests @@ -123,7 +249,7 @@ def test_compute_MVBS_invalid_range_var(ds_Sv_echo_range_regular, range_var): ep.commongrid.compute_MVBS(ds_Sv_echo_range_regular, range_var=range_var) elif range_var == "depth": with pytest.raises( - ValueError, match=f"range_var '{range_var}' does not exist in the input dataset." + ValueError, match=r"Input Sv dataset must contain all of the following variables" ): ep.commongrid.compute_MVBS(ds_Sv_echo_range_regular, range_var=range_var) else: @@ -287,3 +413,36 @@ def _parse_nans(mvbs, ds_Sv) -> np.ndarray: # Ensures that the computation of MVBS takes doesn't take into account NaN values # that are sporadically placed in the echo_range values assert np.array_equal(np.isnan(ds_MVBS.Sv.values), expected_outputs) + + +@pytest.mark.integration +@pytest.mark.parametrize( + ("er_type"), + [ + ("regular"), + ("irregular"), + ], +) +def test_compute_NASC_values(request, er_type): + """Tests for the values of compute_NASC on regular and irregular data.""" + + range_bin = "2m" + dist_bin = "0.5nmi" + + if er_type == "regular": + ds_Sv = request.getfixturevalue("mock_Sv_dataset_regular") + expected_nasc = request.getfixturevalue("mock_nasc_array_regular") + else: + # Mock irregular dataset contains jitter + # and NaN values in the bottom echo_range + ds_Sv = request.getfixturevalue("mock_Sv_dataset_irregular") + expected_nasc = request.getfixturevalue("mock_nasc_array_irregular") + + ds_NASC = ep.commongrid.compute_NASC(ds_Sv, range_bin=range_bin, dist_bin=dist_bin) + + assert ds_NASC.NASC.shape == expected_nasc.shape + # Floating digits need to check with all close not equal + # Compare the values of the MVBS array with the expected values + assert np.allclose( + ds_NASC.NASC.values, expected_nasc.values, atol=1e-10, rtol=1e-10, equal_nan=True + ) diff --git a/echopype/tests/commongrid/test_mvbs.py b/echopype/tests/commongrid/test_mvbs.py deleted file mode 100644 index 78a77be0a..000000000 --- a/echopype/tests/commongrid/test_mvbs.py +++ /dev/null @@ -1,67 +0,0 @@ -import numpy as np -import pandas as pd -import pytest -from echopype.commongrid.mvbs import get_MVBS_along_channels -from echopype.consolidate.api import POSITION_VARIABLES -from flox.xarray import xarray_reduce - -@pytest.mark.unit -@pytest.mark.parametrize(["range_var", "lat_lon"], [("depth", False), ("echo_range", True), ("echo_range", False)]) -def test_get_MVBS_along_channels(request, range_var, lat_lon): - """Testing the underlying function of compute_MVBS""" - range_bin = 20 - ping_time_bin = "20S" - method = "map-reduce" - - flox_kwargs = { - "reindex": True - } - - # Retrieve the correct dataset - if range_var == "depth": - ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_depth") - elif range_var == "echo_range" and lat_lon is True: - ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular_w_latlon") - else: - ds_Sv = request.getfixturevalue("ds_Sv_echo_range_regular") - - # compute range interval - echo_range_max = ds_Sv[range_var].max() - range_interval = np.arange(0, echo_range_max + range_bin, range_bin) - - # create bin information needed for ping_time - d_index = ( - ds_Sv["ping_time"] - .resample(ping_time=ping_time_bin, skipna=True) - .asfreq() - .indexes["ping_time"] - ) - ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]) - - raw_MVBS = get_MVBS_along_channels( - ds_Sv, range_interval, ping_interval, - range_var=range_var, method=method, **flox_kwargs - ) - - # Check that the range_var is in the dimension - assert f"{range_var}_bins" in raw_MVBS.dims - - # When it's echo_range and lat_lon, the dataset should have positions - if range_var == "echo_range" and lat_lon is True: - assert raw_MVBS.attrs["has_positions"] is True - assert all(v in raw_MVBS for v in POSITION_VARIABLES) - - # Compute xarray reduce manually for this - expected_Pos = xarray_reduce( - ds_Sv[POSITION_VARIABLES], - ds_Sv["ping_time"], - func="nanmean", - expected_groups=(ping_interval), - isbin=True, - method=method, - ) - - for v in POSITION_VARIABLES: - assert np.array_equal(raw_MVBS[v].data, expected_Pos[v].data) - else: - assert raw_MVBS.attrs["has_positions"] is False diff --git a/echopype/tests/commongrid/test_nasc.py b/echopype/tests/commongrid/test_nasc.py deleted file mode 100644 index 0430f99c1..000000000 --- a/echopype/tests/commongrid/test_nasc.py +++ /dev/null @@ -1,38 +0,0 @@ -import pytest - -import numpy as np - -from echopype import open_raw -from echopype.calibrate import compute_Sv -# from echopype.commongrid import compute_NASC -from echopype.commongrid.nasc import ( - get_distance_from_latlon, - get_depth_bin_info, - get_dist_bin_info, - get_distance_from_latlon, -) -from echopype.consolidate import add_location, add_depth - - -@pytest.fixture -def ek60_path(test_path): - return test_path['EK60'] - - -# def test_compute_NASC(ek60_path): -# raw_path = ek60_path / "ncei-wcsd/Summer2017-D20170620-T011027.raw" - -# ed = open_raw(raw_path, sonar_model="EK60") -# ds_Sv = add_depth(add_location(compute_Sv(ed), ed, nmea_sentence="GGA")) -# cell_dist = 0.1 -# cell_depth = 20 -# ds_NASC = compute_NASC(ds_Sv, cell_dist, cell_depth) - -# dist_nmi = get_distance_from_latlon(ds_Sv) - -# # Check dimensions -# da_NASC = ds_NASC["NASC"] -# assert da_NASC.dims == ("channel", "distance", "depth") -# assert np.all(ds_NASC["channel"].values == ds_Sv["channel"].values) -# assert da_NASC["depth"].size == np.ceil(ds_Sv["depth"].max() / cell_depth) -# assert da_NASC["distance"].size == np.ceil(dist_nmi.max() / cell_dist) diff --git a/echopype/tests/convert/test_convert_azfp.py b/echopype/tests/convert/test_convert_azfp.py index 4b8f411b6..487bec243 100644 --- a/echopype/tests/convert/test_convert_azfp.py +++ b/echopype/tests/convert/test_convert_azfp.py @@ -156,8 +156,8 @@ def test_convert_azfp_01a_different_ranges(azfp_path): check_platform_required_scalar_vars(echodata) -def test_convert_azfp_01a_notemperature_notilt(azfp_path): - """Test converting file with no valid temperature or tilt data.""" +def test_convert_azfp_01a_no_temperature_pressure_tilt(azfp_path): + """Test converting file with no valid temperature, pressure and tilt data.""" azfp_01a_path = azfp_path / 'rutgers_glider_notemperature/22052500.01A' azfp_xml_path = azfp_path / 'rutgers_glider_notemperature/22052501.XML' @@ -169,6 +169,10 @@ def test_convert_azfp_01a_notemperature_notilt(azfp_path): assert "temperature" in echodata["Environment"] assert echodata["Environment"]["temperature"].isnull().all() + # Pressure variable is present in the Environment group and its values are all nan + assert "pressure" in echodata["Environment"] + assert echodata["Environment"]["pressure"].isnull().all() + # Tilt variables are present in the Platform group and their values are all nan assert "tilt_x" in echodata["Platform"] assert "tilt_y" in echodata["Platform"] @@ -176,15 +180,33 @@ def test_convert_azfp_01a_notemperature_notilt(azfp_path): assert echodata["Platform"]["tilt_y"].isnull().all() +def test_convert_azfp_01a_pressure_temperature(azfp_path): + """Test converting file with valid pressure and temperature data.""" + azfp_01a_path = azfp_path / 'pressure' / '22042221.01A' + azfp_xml_path = azfp_path / 'pressure' / '22042220.XML' + + echodata = open_raw( + raw_file=azfp_01a_path, sonar_model='AZFP', xml_path=azfp_xml_path + ) + + # Pressure variable is present in the Environment group and its values are not all nan + assert "pressure" in echodata["Environment"] + assert not echodata["Environment"]["pressure"].isnull().all() + + # Temperature variable is present in the Environment group and its values are not all nan + assert "temperature" in echodata["Environment"] + assert not echodata["Environment"]["temperature"].isnull().all() + + def test_load_parse_azfp_xml(azfp_path): - azfp_01a_path = azfp_path / '17082117.01A' - azfp_xml_path = azfp_path / '17030815.XML' - parseAZFP = ParseAZFP(str(azfp_01a_path), str(azfp_xml_path)) + azfp_xml_path = azfp_path / '23081211.XML' + parseAZFP = ParseAZFP(None, str(azfp_xml_path)) parseAZFP.load_AZFP_xml() expected_params = ['instrument_type_string', 'instrument_type', 'major', 'minor', 'date', 'program_name', 'program', 'CPU', 'serial_number', 'board_version', - 'file_version', 'parameter_version', 'configuration_version', 'eclock', + 'file_version', 'parameter_version', 'configuration_version', 'backplane', + 'delay_transmission_string', 'delay_transmission', 'eclock', 'digital_board_version', 'sensors_flag_pressure_sensor_installed', 'sensors_flag_paros_installed', 'sensors_flag', 'U0', 'Y1', 'Y2', 'Y3', 'C1', 'C2', 'C3', 'D1', 'D2', 'T1', 'T2', 'T3', 'T4', 'T5', 'X_a', 'X_b', @@ -194,33 +216,55 @@ def test_load_parse_azfp_xml(azfp_path): 'VTX3', 'BP', 'EL', 'DS', 'min_pulse_len', 'sound_speed', 'start_date_svalue', 'start_date', 'num_frequencies', 'num_phases', 'data_output_svalue', 'data_output', 'frequency_units', 'frequency', - 'phase_number', 'phase_type_svalue', 'phase_type', 'duration_svalue', - 'duration', 'ping_period_units', 'ping_period', 'burst_interval_units', - 'burst_interval', 'pings_per_burst_units', 'pings_per_burst', - 'average_burst_pings_units', 'average_burst_pings', 'frequency_number', - 'acquire_frequency_units', 'acquire_frequency', 'pulse_len_units', - 'pulse_len', 'dig_rate_units', 'dig_rate', 'range_samples_units', - 'range_samples', 'range_averaging_samples_units', 'range_averaging_samples', - 'lock_out_index_units', 'lock_out_index', 'gain_units', 'gain', - 'storage_format_units', 'storage_format'] + 'phase_number', 'start_date_svalue_phase1', 'start_date_phase1', + 'phase_type_svalue_phase1', 'phase_type_phase1', 'duration_svalue_phase1', + 'duration_phase1', 'ping_period_units_phase1', 'ping_period_phase1', + 'burst_interval_units_phase1', 'burst_interval_phase1', + 'pings_per_burst_units_phase1', 'pings_per_burst_phase1', + 'average_burst_pings_units_phase1', 'average_burst_pings_phase1', + 'frequency_number_phase1', 'acquire_frequency_units_phase1', + 'acquire_frequency_phase1', 'pulse_len_units_phase1', 'pulse_len_phase1', + 'dig_rate_units_phase1', 'dig_rate_phase1', 'range_samples_units_phase1', + 'range_samples_phase1', 'range_averaging_samples_units_phase1', + 'range_averaging_samples_phase1', 'lock_out_index_units_phase1', + 'lock_out_index_phase1', 'gain_units_phase1', 'gain_phase1', + 'storage_format_units_phase1', 'storage_format_phase1', + 'start_date_svalue_phase2', 'start_date_phase2', 'phase_type_svalue_phase2', + 'phase_type_phase2', 'duration_svalue_phase2', 'duration_phase2', + 'ping_period_units_phase2', 'ping_period_phase2', + 'burst_interval_units_phase2', 'burst_interval_phase2', + 'pings_per_burst_units_phase2', 'pings_per_burst_phase2', + 'average_burst_pings_units_phase2', 'average_burst_pings_phase2', + 'frequency_number_phase2', 'acquire_frequency_units_phase2', + 'acquire_frequency_phase2', 'pulse_len_units_phase2', 'pulse_len_phase2', + 'dig_rate_units_phase2', 'dig_rate_phase2', 'range_samples_units_phase2', + 'range_samples_phase2', 'range_averaging_samples_units_phase2', + 'range_averaging_samples_phase2', 'lock_out_index_units_phase2', + 'lock_out_index_phase2', 'gain_units_phase2', 'gain_phase2', + 'storage_format_units_phase2', 'storage_format_phase2', 'rt_version', + 'rt_frequency', 'enabled', 'direction', 'water_depth_high_tide', + 'instrument_depth_high_tide'] assert set(parseAZFP.parameters.keys()) == set(expected_params) assert list(set(parseAZFP.parameters['instrument_type_string']))[0] == 'AZFP' assert isinstance(parseAZFP.parameters['num_freq'], int) - assert isinstance(parseAZFP.parameters['pulse_len'], list) assert parseAZFP.parameters['num_freq'] == 4 - assert len(parseAZFP.parameters['frequency_number']) == 4 - assert parseAZFP.parameters['frequency_number'] == ['1', '2', '3', '4'] - assert parseAZFP.parameters['kHz'] == [125, 200, 455, 769] - - expected_len_params = ['acquire_frequency', 'pulse_len', 'dig_rate', 'range_samples', - 'range_averaging_samples', 'lock_out_index', 'gain', 'storage_format'] - assert all(len(parseAZFP.parameters[x]) == 4 for x in expected_len_params) - assert parseAZFP.parameters['acquire_frequency'] == [1, 1, 1, 1] - assert parseAZFP.parameters['pulse_len'] == [300, 300, 300, 300] - assert parseAZFP.parameters['dig_rate'] == [20000, 20000, 20000, 20000] - assert parseAZFP.parameters['range_samples'] == [1752, 1752, 1764, 540] - assert parseAZFP.parameters['range_averaging_samples'] == [4, 4, 4, 4] - assert parseAZFP.parameters['lock_out_index'] == [0, 0, 0, 0] - assert parseAZFP.parameters['gain'] == [1, 1, 1, 1] - assert parseAZFP.parameters['storage_format'] == [1, 1, 1, 1] - + assert parseAZFP.parameters['kHz'] == [67, 120, 200, 455] + + expected_len_params = ['acquire_frequency', 'pulse_len', 'dig_rate', + 'range_samples', 'range_averaging_samples', + 'lock_out_index', 'gain', 'storage_format'] + for num in parseAZFP.parameters["phase_number"]: + assert isinstance(parseAZFP.parameters[f"pulse_len_phase{num}"], list) + assert len(parseAZFP.parameters[f"acquire_frequency_phase{num}"]) == 4 + assert all(len(parseAZFP.parameters[f"{x}_phase{num}"]) == 4 for x in expected_len_params) + assert parseAZFP.parameters[f"frequency_number_phase{num}"] == ['1', '2', '3', '4'] + assert parseAZFP.parameters[f"acquire_frequency_phase{num}"] == [1, 1, 1, 1] + assert parseAZFP.parameters[f"dig_rate_phase{num}"] == [20000, 20000, 20000, 20000] + assert parseAZFP.parameters[f"range_averaging_samples_phase{num}"] == [1, 1, 1, 1] + assert parseAZFP.parameters[f"lock_out_index_phase{num}"] == [0, 0, 0, 0] + assert parseAZFP.parameters[f"gain_phase{num}"] == [1, 1, 1, 1] + assert parseAZFP.parameters[f"storage_format_phase{num}"] == [0, 0, 0, 0] + assert parseAZFP.parameters['pulse_len_phase1'] == [1000, 1000, 1000, 1000] + assert parseAZFP.parameters['pulse_len_phase2'] == [0, 0, 0, 0] + assert parseAZFP.parameters['range_samples_phase1'] == [8273, 8273, 8273, 8273] + assert parseAZFP.parameters['range_samples_phase2'] == [2750, 2750, 2750, 2750] diff --git a/echopype/tests/convert/test_parsed_to_zarr.py b/echopype/tests/convert/test_parsed_to_zarr.py index 9b836753c..1fb30ebc4 100644 --- a/echopype/tests/convert/test_parsed_to_zarr.py +++ b/echopype/tests/convert/test_parsed_to_zarr.py @@ -15,6 +15,8 @@ from echopype.echodata.convention import sonarnetcdf_1 from echopype.convert.api import _check_file, SONAR_MODELS +pytestmark = pytest.mark.skip(reason="Removed Parsed2Zarr") + test_bucket_name = "echopype-test" port = 5555 endpoint_uri = "http://127.0.0.1:%s/" % port diff --git a/echopype/tests/echodata/test_echodata.py b/echopype/tests/echodata/test_echodata.py index e78c007eb..d58800ba9 100644 --- a/echopype/tests/echodata/test_echodata.py +++ b/echopype/tests/echodata/test_echodata.py @@ -1,6 +1,5 @@ from textwrap import dedent -import os import fsspec from pathlib import Path import shutil @@ -9,7 +8,6 @@ from zarr.errors import GroupNotFoundError import echopype -from echopype.calibrate.env_params_old import EnvParams from echopype.echodata import EchoData from echopype import open_converted from echopype.calibrate.calibrate_ek import CalibrateEK60, CalibrateEK80 @@ -325,8 +323,8 @@ def test_get_dataset(self, converted_zarr): def test_to_zarr_consolidated(self, mock_echodata, consolidated): """ Tests to_zarr consolidation. Currently, this test uses a mock EchoData object that only - has attributes. The consolidated flag provided will be used in every to_zarr call (which - is used to write each EchoData group to zarr_path). + has attributes. The consolidated flag provided will be used in every to_zarr call (which + is used to write each EchoData group to zarr_path). """ zarr_path = Path('test.zarr') mock_echodata.to_zarr(str(zarr_path), consolidated=consolidated, overwrite=True) @@ -693,3 +691,34 @@ def test_update_platform_no_update(test_path): variable_mappings = {"longitude": "longitude", "latitude": "latitude"} ed.update_platform(extra_platform_data, variable_mappings=variable_mappings) + +def test_update_platform_latlon_notimestamp(test_path): + raw_file = test_path["EK60"] / "ooi" / "CE02SHBP-MJ01C-07-ZPLSCB101_OOI-D20191201-T000000.raw" + ed = echopype.open_raw(raw_file, sonar_model="EK60") + + extra_platform_data = xr.Dataset( + { + "lon": ([], float(-100.0)), + "lat": ([], float(-50.0)), + } + ) + + platform_preexisting_dims = ed["Platform"].dims + + # variable names in mappings different from actual external dataset + variable_mappings = {"longitude": "lon", "latitude": "lat"} + + ed.update_platform(extra_platform_data, variable_mappings=variable_mappings) + + # Updated variables are not all nan + for variable in variable_mappings.keys(): + assert not np.isnan(ed["Platform"][variable].values).all() + + # Number of dimensions in Platform group should be as previous + assert len(ed["Platform"].dims) == len(platform_preexisting_dims) + + # Dimension assignment + assert ed["Platform"]["longitude"].dims[0] == ed["Platform"]["latitude"].dims[0] + assert ed["Platform"]["longitude"].dims[0] in platform_preexisting_dims + assert ed["Platform"]["latitude"].dims[0] in platform_preexisting_dims + assert ed['Platform']['longitude'].coords['time1'].values[0] == ed['Sonar/Beam_group1'].ping_time.data[0] diff --git a/echopype/tests/utils/test_utils_misc.py b/echopype/tests/utils/test_utils_misc.py new file mode 100644 index 000000000..5765acac9 --- /dev/null +++ b/echopype/tests/utils/test_utils_misc.py @@ -0,0 +1,31 @@ +import pytest + +import numpy as np +from echopype.utils.misc import depth_from_pressure + + +def test_depth_from_pressure(): + # A single pressure value and defaults for the other arguments + pressure = 100.0 + depth = depth_from_pressure(pressure) + assert np.isclose(depth, 99.2954) + + # Array of pressure and list of latitude values + pressure = np.array([100.0, 101.0, 101.0]) + latitude = [0.0, 30.0, 50.0] + depth = depth_from_pressure(pressure, latitude) + assert np.allclose(depth, [99.4265, 100.2881, 100.1096]) + + # Scalars specified for all 3 arguments + pressure = 1000.0 + latitude = 0.0 + atm_pres_surf = 10.1325 # standard atm pressure at sea level + depth = depth_from_pressure(pressure, latitude, atm_pres_surf) + assert np.isclose(depth, 982.0882) + + # ValueError triggered by argument arrays having different lengths + pressure = np.array([100.0, 101.0, 101.0]) + latitude = [0.0, 30.0] + with pytest.raises(ValueError) as excinfo: + depth = depth_from_pressure(pressure, latitude) + assert str(excinfo.value) == "Sequence shape or size does not match pressure" diff --git a/echopype/utils/coding.py b/echopype/utils/coding.py index 626aeb6e5..8679296e3 100644 --- a/echopype/utils/coding.py +++ b/echopype/utils/coding.py @@ -208,6 +208,9 @@ def set_zarr_encodings( chunks = _get_auto_chunk(val, chunk_size=chunk_size) encoding[name]["chunks"] = chunks + if PREFERRED_CHUNKS in encoding[name]: + # Remove 'preferred_chunks', use chunks only instead + encoding[name].pop(PREFERRED_CHUNKS) return encoding diff --git a/echopype/utils/io.py b/echopype/utils/io.py index 2350eef35..8f54480d8 100644 --- a/echopype/utils/io.py +++ b/echopype/utils/io.py @@ -11,6 +11,7 @@ import fsspec import xarray as xr +from dask.array import Array as DaskArray from fsspec import FSMap from fsspec.implementations.local import LocalFileSystem @@ -61,7 +62,8 @@ def save_file(ds, path, mode, engine, group=None, compression_settings=None, **k elif engine == "zarr": # Ensure that encoding and chunks match for var, enc in encoding.items(): - ds[var] = ds[var].chunk(enc.get("chunks", {})) + if isinstance(ds[var].data, DaskArray): + ds[var] = ds[var].chunk(enc.get("chunks", {})) ds.to_zarr(store=path, mode=mode, group=group, encoding=encoding, **kwargs) else: raise ValueError(f"{engine} is not a supported save format") diff --git a/echopype/utils/misc.py b/echopype/utils/misc.py index 838f3ece6..cb03d43b6 100644 --- a/echopype/utils/misc.py +++ b/echopype/utils/misc.py @@ -1,3 +1,11 @@ +from typing import List, Optional, Tuple, Union + +import numpy as np +from numpy.typing import NDArray + +FloatSequence = Union[List[float], Tuple[float], NDArray[float]] + + def camelcase2snakecase(camel_case_str): """ Convert string from CamelCase to snake_case @@ -13,6 +21,72 @@ def camelcase2snakecase(camel_case_str): return camel_case_str.lower() +def depth_from_pressure( + pressure: Union[float, FloatSequence], + latitude: Optional[Union[float, FloatSequence]] = 30.0, + atm_pres_surf: Optional[Union[float, FloatSequence]] = 0.0, +) -> NDArray[float]: + """ + Convert pressure to depth using UNESCO 1983 algorithm. + + UNESCO. 1983. Algorithms for computation of fundamental properties of seawater (Pressure to + Depth conversion, pages 25-27). Prepared by Fofonoff, N.P. and Millard, R.C. UNESCO technical + papers in marine science, 44. http://unesdoc.unesco.org/images/0005/000598/059832eb.pdf + + Parameters + ---------- + pressure : Union[float, FloatSequence] + Pressure in dbar + latitude : Union[float, FloatSequence], default=30.0 + Latitude in decimal degrees. + atm_pres_surf : Union[float, FloatSequence], default=0.0 + Atmospheric pressure at the surface in dbar. + Use the default 0.0 value if pressure is corrected to be 0 at the surface. + Otherwise, enter a correction for pressure due to air, sea ice and any other + medium that may be present + + Returns + ------- + depth : NDArray[float] + Depth in meters + """ + + def _as_nparray_check(v, check_vs_pressure=False): + """ + Convert to np.array if not already a np.array. + Ensure latitude and atm_pres_surf are of the same size and shape as + pressure if they are not scalar. + """ + v_array = np.array(v) if not isinstance(v, np.ndarray) else v + if check_vs_pressure: + if v_array.size != 1: + if v_array.size != pressure.size or v_array.shape != pressure.shape: + raise ValueError("Sequence shape or size does not match pressure") + return v_array + + pressure = _as_nparray_check(pressure) + latitude = _as_nparray_check(latitude, check_vs_pressure=True) + atm_pres_surf = _as_nparray_check(atm_pres_surf, check_vs_pressure=True) + + # Constants + g = 9.780318 + c1 = 9.72659 + c2 = -2.2512e-5 + c3 = 2.279e-10 + c4 = -1.82e-15 + k1 = 5.2788e-3 + k2 = 2.36e-5 + k3 = 1.092e-6 + + # Calculate depth + pressure = pressure - atm_pres_surf + depth_w_g = c1 * pressure + c2 * pressure**2 + c3 * pressure**3 + c4 * pressure**4 + x = np.sin(np.deg2rad(latitude)) + gravity = g * (1.0 + k1 * x**2 + k2 * x**4) + k3 * pressure + depth = depth_w_g / gravity + return depth + + def frequency_nominal_to_channel(source_Sv, frequency_nominal: int): """ Given a value for a nominal frequency, returns the channel associated with it @@ -23,3 +97,4 @@ def frequency_nominal_to_channel(source_Sv, frequency_nominal: int): assert len(chan) == 1, "Frequency not uniquely identified" channel = chan[0] return channel +