From 3c7bba53fc21069f0e03beb081543dbdca4101c9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 22 Dec 2022 11:09:25 -0800 Subject: [PATCH 1/5] Bump actions/cache from 3.0.11 to 3.2.0 (#18) Bumps [actions/cache](https://github.com/actions/cache) from 3.0.11 to 3.2.0. - [Release notes](https://github.com/actions/cache/releases) - [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md) - [Commits](https://github.com/actions/cache/compare/v3.0.11...v3.2.0) --- updated-dependencies: - dependency-name: actions/cache dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/windows.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/windows.yaml b/.github/workflows/windows.yaml index eb9257d8a..324189fa5 100644 --- a/.github/workflows/windows.yaml +++ b/.github/workflows/windows.yaml @@ -51,7 +51,7 @@ jobs: python-version: ${{ matrix.python-version }} architecture: x64 - name: Cache conda - uses: actions/cache@v3.0.11 + uses: actions/cache@v3.2.0 env: # Increase this value to reset cache if '.ci_helpers/py{0}.yaml' has not changed CACHE_NUMBER: 0 From 97c66a8350c75809ef2ac862cad186468e73d0d7 Mon Sep 17 00:00:00 2001 From: Anant Mittal Date: Sun, 25 Feb 2024 12:41:14 -0800 Subject: [PATCH 2/5] Update compress_pulse to Parallelize Convolution (#1208) Update compress_pulse to Parallelize Convolution. This addresses #1164. --- echopype/calibrate/ek80_complex.py | 25 +- .../tests/calibrate/test_calibrate_ek80.py | 233 +++++++++++++----- 2 files changed, 186 insertions(+), 72 deletions(-) diff --git a/echopype/calibrate/ek80_complex.py b/echopype/calibrate/ek80_complex.py index fc6372e9e..5c93118b1 100644 --- a/echopype/calibrate/ek80_complex.py +++ b/echopype/calibrate/ek80_complex.py @@ -273,34 +273,27 @@ def compress_pulse(backscatter: xr.DataArray, chirp: Dict) -> xr.DataArray: A data array containing pulse compression output. """ pc_all = [] + for chan in backscatter["channel"]: + # Select channel `chan` and drop the specific beam dimension if all of the values are nan. backscatter_chan = backscatter.sel(channel=chan).dropna(dim="beam", how="all") tx = chirp[str(chan.values)] replica = np.flipud(np.conj(tx)) + pc = xr.apply_ufunc( - lambda m: np.apply_along_axis( - lambda m: (signal.convolve(m, replica, mode="full")[tx.size - 1 :]), - axis=2, - arr=m, - ), + lambda m: (signal.convolve(m, replica, mode="full")[replica.size - 1 :]), backscatter_chan, input_core_dims=[["range_sample"]], output_core_dims=[["range_sample"]], - # exclude_dims={"range_sample"}, - ) + dask="parallelized", + vectorize=True, + output_dtypes=[np.complex64], + ).compute() pc_all.append(pc) - pc_all = xr.DataArray( - pc_all, - coords={ - "channel": backscatter["channel"], - "ping_time": backscatter["ping_time"], - "beam": backscatter["beam"], - "range_sample": backscatter["range_sample"], - }, - ) + pc_all = xr.concat(pc_all, dim="channel") return pc_all diff --git a/echopype/tests/calibrate/test_calibrate_ek80.py b/echopype/tests/calibrate/test_calibrate_ek80.py index 36fe4ca3a..53722a2b2 100644 --- a/echopype/tests/calibrate/test_calibrate_ek80.py +++ b/echopype/tests/calibrate/test_calibrate_ek80.py @@ -1,6 +1,7 @@ import pytest import numpy as np import pandas as pd +import pickle import xarray as xr import echopype as ep @@ -8,35 +9,42 @@ @pytest.fixture def ek80_path(test_path): - return test_path['EK80'] + return test_path["EK80"] @pytest.fixture def ek80_cal_path(test_path): - return test_path['EK80_CAL'] + return test_path["EK80_CAL"] @pytest.fixture def ek80_ext_path(test_path): - return test_path['EK80_EXT'] + return test_path["EK80_EXT"] def test_ek80_transmit_chirp(ek80_cal_path, ek80_ext_path): """ Test transmit chirp reconstruction against Andersen et al. 2021/pyEcholab implementation """ - ek80_raw_path = ek80_cal_path / "2018115-D20181213-T094600.raw" # rx impedance / rx fs / tcvr type + ek80_raw_path = ( + ek80_cal_path / "2018115-D20181213-T094600.raw" + ) # rx impedance / rx fs / tcvr type ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") # Calibration object detail waveform_mode = "BB" encode_mode = "complex" cal_obj = ep.calibrate.calibrate_ek.CalibrateEK80( - echodata=ed, waveform_mode=waveform_mode, encode_mode=encode_mode, - env_params=None, cal_params=None + echodata=ed, + waveform_mode=waveform_mode, + encode_mode=encode_mode, + env_params=None, + cal_params=None, ) fs = cal_obj.cal_params["receiver_sampling_frequency"] - filter_coeff = ep.calibrate.ek80_complex.get_filter_coeff(ed["Vendor_specific"].sel(channel=cal_obj.chan_sel)) + filter_coeff = ep.calibrate.ek80_complex.get_filter_coeff( + ed["Vendor_specific"].sel(channel=cal_obj.chan_sel) + ) tx, tx_time = ep.calibrate.ek80_complex.get_transmit_signal( ed["Sonar/Beam_group1"].sel(channel=cal_obj.chan_sel), filter_coeff, waveform_mode, fs ) @@ -49,8 +57,7 @@ def test_ek80_transmit_chirp(ek80_cal_path, ek80_ext_path): ) # Load pyEcholab object: channel WBT 714590-15 ES70-7C - import pickle - with open(ek80_ext_path / "pyecholab/pyel_BB_calibration.pickle", 'rb') as handle: + with open(ek80_ext_path / "pyecholab/pyel_BB_calibration.pickle", "rb") as handle: pyecholab_BB = pickle.load(handle) # Compare first ping since all params identical @@ -59,10 +66,14 @@ def test_ek80_transmit_chirp(ek80_cal_path, ek80_ext_path): assert pyecholab_BB["rx_sample_frequency"][0] == fs.sel(channel=ch_sel) # WBT filter assert np.all(pyecholab_BB["filters"][1]["coefficients"] == filter_coeff[ch_sel]["wbt_fil"]) - assert np.all(pyecholab_BB["filters"][1]["decimation_factor"] == filter_coeff[ch_sel]["wbt_decifac"]) + assert np.all( + pyecholab_BB["filters"][1]["decimation_factor"] == filter_coeff[ch_sel]["wbt_decifac"] + ) # PC filter assert np.all(pyecholab_BB["filters"][2]["coefficients"] == filter_coeff[ch_sel]["pc_fil"]) - assert np.all(pyecholab_BB["filters"][2]["decimation_factor"] == filter_coeff[ch_sel]["pc_decifac"]) + assert np.all( + pyecholab_BB["filters"][2]["decimation_factor"] == filter_coeff[ch_sel]["pc_decifac"] + ) # transmit signal assert np.allclose(pyecholab_BB["_tx_signal"][0], tx[ch_sel]) # tau effective @@ -75,7 +86,9 @@ def test_ek80_BB_params(ek80_cal_path, ek80_ext_path): """ Test power from pulse compressed BB data """ - ek80_raw_path = ek80_cal_path / "2018115-D20181213-T094600.raw" # rx impedance / rx fs / tcvr type + ek80_raw_path = ( + ek80_cal_path / "2018115-D20181213-T094600.raw" + ) # rx impedance / rx fs / tcvr type ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") # Calibration object detail @@ -83,8 +96,11 @@ def test_ek80_BB_params(ek80_cal_path, ek80_ext_path): encode_mode = "complex" cal_obj = ep.calibrate.calibrate_ek.CalibrateEK80( - echodata=ed, waveform_mode=waveform_mode, encode_mode=encode_mode, - env_params={"formula_absorption": "FG"}, cal_params=None + echodata=ed, + waveform_mode=waveform_mode, + encode_mode=encode_mode, + env_params={"formula_absorption": "FG"}, + cal_params=None, ) z_er = cal_obj.cal_params["impedance_transceiver"] @@ -100,10 +116,9 @@ def test_ek80_BB_params(ek80_cal_path, ek80_ext_path): } # Load pyEcholab object: channel WBT 714590-15 ES70-7C - import pickle - with open(ek80_ext_path / "pyecholab/pyel_BB_calibration.pickle", 'rb') as handle: + with open(ek80_ext_path / "pyecholab/pyel_BB_calibration.pickle", "rb") as handle: pyel_BB_cal = pickle.load(handle) - with open(ek80_ext_path / "pyecholab/pyel_BB_raw_data.pickle", 'rb') as handle: + with open(ek80_ext_path / "pyecholab/pyel_BB_raw_data.pickle", "rb") as handle: pyel_BB_raw = pickle.load(handle) ch_sel = "WBT 714590-15 ES70-7C" @@ -112,37 +127,49 @@ def test_ek80_BB_params(ek80_cal_path, ek80_ext_path): # TODO: need to check B_theta_phi_m values assert pyel_BB_cal["impedance"] == z_er.sel(channel=ch_sel) for p_ep, p_pyel in params_BB_map.items(): # all interpolated BB params - assert np.isclose(pyel_BB_cal[p_pyel][0], cal_obj.cal_params[p_ep].sel(channel=ch_sel).isel(ping_time=0)) - assert pyel_BB_cal["sa_correction"][0] == cal_obj.cal_params["sa_correction"].sel(channel=ch_sel).isel(ping_time=0) + assert np.isclose( + pyel_BB_cal[p_pyel][0], cal_obj.cal_params[p_ep].sel(channel=ch_sel).isel(ping_time=0) + ) + assert pyel_BB_cal["sa_correction"][0] == cal_obj.cal_params["sa_correction"].sel( + channel=ch_sel + ).isel(ping_time=0) assert pyel_BB_cal["sound_speed"] == cal_obj.env_params["sound_speed"] assert np.isclose( pyel_BB_cal["absorption_coefficient"][0], - cal_obj.env_params["sound_absorption"].sel(channel=ch_sel).isel(ping_time=0) + cal_obj.env_params["sound_absorption"].sel(channel=ch_sel).isel(ping_time=0), ) # pyecholab raw_data object assert pyel_BB_raw["ZTRANSDUCER"] == z_et.sel(channel=ch_sel).isel(ping_time=0) - assert pyel_BB_raw["transmit_power"][0] == ed["Sonar/Beam_group1"]["transmit_power"].sel(channel=ch_sel).isel(ping_time=0) - assert pyel_BB_raw["transceiver_type"] == ed["Vendor_specific"]["transceiver_type"].sel(channel=ch_sel) + assert pyel_BB_raw["transmit_power"][0] == ed["Sonar/Beam_group1"]["transmit_power"].sel( + channel=ch_sel + ).isel(ping_time=0) + assert pyel_BB_raw["transceiver_type"] == ed["Vendor_specific"]["transceiver_type"].sel( + channel=ch_sel + ) def test_ek80_BB_range(ek80_cal_path, ek80_ext_path): - ek80_raw_path = ek80_cal_path / "2018115-D20181213-T094600.raw" # rx impedance / rx fs / tcvr type + ek80_raw_path = ( + ek80_cal_path / "2018115-D20181213-T094600.raw" + ) # rx impedance / rx fs / tcvr type ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") # Calibration object waveform_mode = "BB" encode_mode = "complex" cal_obj = ep.calibrate.calibrate_ek.CalibrateEK80( - echodata=ed, waveform_mode=waveform_mode, encode_mode=encode_mode, - env_params={"formula_absorption": "FG"}, cal_params=None + echodata=ed, + waveform_mode=waveform_mode, + encode_mode=encode_mode, + env_params={"formula_absorption": "FG"}, + cal_params=None, ) ch_sel = "WBT 714590-15 ES70-7C" # Load pyecholab pickle - import pickle - with open(ek80_ext_path / "pyecholab/pyel_BB_p_data.pickle", 'rb') as handle: + with open(ek80_ext_path / "pyecholab/pyel_BB_p_data.pickle", "rb") as handle: pyel_BB_p_data = pickle.load(handle) # Assert @@ -151,16 +178,50 @@ def test_ek80_BB_range(ek80_cal_path, ek80_ext_path): assert np.allclose(pyel_vals, ep_vals) -def test_ek80_BB_power_Sv(ek80_cal_path, ek80_ext_path): - ek80_raw_path = ek80_cal_path / "2018115-D20181213-T094600.raw" # rx impedance / rx fs / tcvr type - ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") +@pytest.mark.parametrize( + ("raw_data_path,raw_file_name,pyecholab_data_path,pyecholab_file_path, dask_array"), + [ + ( + "ek80_cal_path", + "2018115-D20181213-T094600.raw", + "ek80_ext_path", + "pyecholab/pyel_BB_p_data.pickle", + False, + ), + ( + "ek80_cal_path", + "2018115-D20181213-T094600.raw", + "ek80_ext_path", + "pyecholab/pyel_BB_p_data.pickle", + True, + ), + ], +) +def test_ek80_BB_power_from_complex( + raw_data_path, + raw_file_name, + pyecholab_data_path, + pyecholab_file_path, + dask_array, + request, +): + raw_data_path = request.getfixturevalue(raw_data_path) + ek80_raw_path = raw_data_path / raw_file_name # rx impedance / rx fs / tcvr type + + if dask_array: + ed = ep.open_raw(ek80_raw_path, sonar_model="EK80", use_swap=True) + else: + ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") # Calibration object waveform_mode = "BB" encode_mode = "complex" cal_obj = ep.calibrate.calibrate_ek.CalibrateEK80( - echodata=ed, waveform_mode=waveform_mode, encode_mode=encode_mode, - env_params={"formula_absorption": "FG"}, cal_params=None + echodata=ed, + waveform_mode=waveform_mode, + encode_mode=encode_mode, + env_params={"formula_absorption": "FG"}, + cal_params=None, ) # Params needed @@ -168,8 +229,10 @@ def test_ek80_BB_power_Sv(ek80_cal_path, ek80_ext_path): z_er = cal_obj.cal_params["impedance_transceiver"] z_et = cal_obj.cal_params["impedance_transducer"] fs = cal_obj.cal_params["receiver_sampling_frequency"] - filter_coeff = ep.calibrate.ek80_complex.get_filter_coeff(ed["Vendor_specific"].sel(channel=cal_obj.chan_sel)) - tx, tx_time = ep.calibrate.ek80_complex.get_transmit_signal(beam, filter_coeff, waveform_mode, fs) + filter_coeff = ep.calibrate.ek80_complex.get_filter_coeff( + ed["Vendor_specific"].sel(channel=cal_obj.chan_sel) + ) + tx, _ = ep.calibrate.ek80_complex.get_transmit_signal(beam, filter_coeff, waveform_mode, fs) # Get power from complex samples prx = cal_obj._get_power_from_complex(beam=beam, chirp=tx, z_et=z_et, z_er=z_er) @@ -177,8 +240,8 @@ def test_ek80_BB_power_Sv(ek80_cal_path, ek80_ext_path): ch_sel = "WBT 714590-15 ES70-7C" # Load pyecholab pickle - import pickle - with open(ek80_ext_path / "pyecholab/pyel_BB_p_data.pickle", 'rb') as handle: + pyecholab_data_path = request.getfixturevalue(pyecholab_data_path) + with open(pyecholab_data_path / pyecholab_file_path, "rb") as handle: pyel_BB_p_data = pickle.load(handle) # Power: only compare non-Nan, non-Inf values @@ -190,13 +253,66 @@ def test_ek80_BB_power_Sv(ek80_cal_path, ek80_ext_path): ) assert np.allclose(pyel_vals[idx_to_cmp], ep_vals[idx_to_cmp]) + +@pytest.mark.parametrize( + ("raw_data_path,raw_file_name,pyecholab_data_path,pyecholab_file_path, dask_array"), + [ + ( + "ek80_cal_path", + "2018115-D20181213-T094600.raw", + "ek80_ext_path", + "pyecholab/pyel_BB_p_data.pickle", + False, + ), + ( + "ek80_cal_path", + "2018115-D20181213-T094600.raw", + "ek80_ext_path", + "pyecholab/pyel_BB_p_data.pickle", + True, + ), + ], +) +def test_ek80_BB_power_compute_Sv( + raw_data_path, + raw_file_name, + pyecholab_data_path, + pyecholab_file_path, + dask_array, + request, +): + raw_data_path = request.getfixturevalue(raw_data_path) + ek80_raw_path = raw_data_path / raw_file_name # rx impedance / rx fs / tcvr type + + if dask_array: + ed = ep.open_raw(ek80_raw_path, sonar_model="EK80", use_swap=True) + else: + ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") + + # Calibration object + waveform_mode = "BB" + encode_mode = "complex" + + ch_sel = "WBT 714590-15 ES70-7C" + + # Load pyecholab pickle + pyecholab_data_path = request.getfixturevalue(pyecholab_data_path) + with open(pyecholab_data_path / pyecholab_file_path, "rb") as handle: + pyel_BB_p_data = pickle.load(handle) + # Sv: only compare non-Nan, non-Inf values # comparing for only the last values now until fixing the range computation ds_Sv = ep.calibrate.compute_Sv( - ed, waveform_mode="BB", encode_mode="complex" + ed, + waveform_mode=waveform_mode, + encode_mode=encode_mode, ) pyel_vals = pyel_BB_p_data["sv_data"] - ep_vals = ds_Sv["Sv"].sel(channel=ch_sel).squeeze().data + if dask_array: + ep_vals = ds_Sv["Sv"].sel(channel=ch_sel).squeeze().data.compute() + else: + ep_vals = ds_Sv["Sv"].sel(channel=ch_sel).squeeze().data + assert pyel_vals.shape == ep_vals.shape idx_to_cmp = ~( np.isinf(pyel_vals) | np.isnan(pyel_vals) | np.isinf(ep_vals) | np.isnan(ep_vals) @@ -209,39 +325,44 @@ def test_ek80_BB_power_echoview(ek80_path): Unresolved: the difference is large and it is not clear why. """ - ek80_raw_path = str(ek80_path.joinpath('D20170912-T234910.raw')) + ek80_raw_path = str(ek80_path.joinpath("D20170912-T234910.raw")) ek80_bb_pc_test_path = str( - ek80_path.joinpath( - 'from_echoview', '70 kHz pulse-compressed power.complex.csv' - ) + ek80_path.joinpath("from_echoview", "70 kHz pulse-compressed power.complex.csv") ) - echodata = ep.open_raw(ek80_raw_path, sonar_model='EK80') + echodata = ep.open_raw(ek80_raw_path, sonar_model="EK80") # Create a CalibrateEK80 object to perform pulse compression cal_obj = ep.calibrate.calibrate_ek.CalibrateEK80( - echodata, env_params=None, cal_params=None, waveform_mode="BB", encode_mode="complex" + echodata, + env_params=None, + cal_params=None, + waveform_mode="BB", + encode_mode="complex", ) beam = echodata["Sonar/Beam_group1"].sel(channel=cal_obj.chan_sel) - coeff = ep.calibrate.ek80_complex.get_filter_coeff(echodata["Vendor_specific"].sel(channel=cal_obj.chan_sel)) - chirp, _ = ep.calibrate.ek80_complex.get_transmit_signal(beam, coeff, "BB", cal_obj.cal_params["receiver_sampling_frequency"]) + coeff = ep.calibrate.ek80_complex.get_filter_coeff( + echodata["Vendor_specific"].sel(channel=cal_obj.chan_sel) + ) + chirp, _ = ep.calibrate.ek80_complex.get_transmit_signal( + beam, + coeff, + "BB", + cal_obj.cal_params["receiver_sampling_frequency"], + ) pc = ep.calibrate.ek80_complex.compress_pulse( - backscatter=beam["backscatter_r"] + 1j * beam["backscatter_i"], chirp=chirp) + backscatter=beam["backscatter_r"] + 1j * beam["backscatter_i"], + chirp=chirp, + ) pc = pc / ep.calibrate.ek80_complex.get_norm_fac(chirp) # normalization for each channel pc_mean = pc.sel(channel="WBT 549762-15 ES70-7C").mean(dim="beam").dropna("range_sample") # Read EchoView pc raw power output df = pd.read_csv(ek80_bb_pc_test_path, header=None, skiprows=[0]) - df_header = pd.read_csv( - ek80_bb_pc_test_path, header=0, usecols=range(14), nrows=0 - ) - df = df.rename( - columns={ - cc: vv for cc, vv in zip(df.columns, df_header.columns.values) - } - ) + df_header = pd.read_csv(ek80_bb_pc_test_path, header=0, usecols=range(14), nrows=0) + df = df.rename(columns={cc: vv for cc, vv in zip(df.columns, df_header.columns.values)}) df.columns = df.columns.str.strip() df_real = df.loc[df["Component"] == " Real", :].iloc[:, 14:] # values start at column 15 @@ -253,7 +374,7 @@ def test_ek80_BB_power_echoview(ek80_path): assert np.allclose(ev_vals[:, 69:8284], ep_vals[:, 69:], atol=1e-4) assert np.allclose(ev_vals[:, 90:8284], ep_vals[:, 90:], atol=1e-5) - + def test_ek80_CW_complex_Sv_receiver_sampling_freq(ek80_path): ek80_raw_path = str(ek80_path.joinpath("D20230804-T083032.raw")) ed = ep.open_raw(ek80_raw_path, sonar_model="EK80") From 064f733f5fd1d4cf7d2ef56a783caa93fddb8c95 Mon Sep 17 00:00:00 2001 From: Anant Mittal Date: Sun, 25 Feb 2024 12:46:15 -0800 Subject: [PATCH 3/5] Optimize `harmonize_env_param_time` (#1235) * Remove extra elses * Ensure harmonize_env_param_time works with dask arrays * Remove unused import --- echopype/calibrate/env_params.py | 29 +-- echopype/tests/calibrate/test_env_params.py | 230 +++++++++++++++----- 2 files changed, 187 insertions(+), 72 deletions(-) diff --git a/echopype/calibrate/env_params.py b/echopype/calibrate/env_params.py index dc7c365d1..49be4d12c 100644 --- a/echopype/calibrate/env_params.py +++ b/echopype/calibrate/env_params.py @@ -49,23 +49,24 @@ def harmonize_env_param_time( if isinstance(p, xr.DataArray): if "time1" not in p.coords: return p - else: - # If there's only 1 time1 value, - # or if after dropping NaN there's only 1 time1 value - if p["time1"].size == 1 or p.dropna(dim="time1").size == 1: - return p.dropna(dim="time1").squeeze(dim="time1").drop("time1") - # Direct assignment if all timestamps are identical (EK60 data) - elif np.all(p["time1"].values == ping_time.values): - return p.rename({"time1": "ping_time"}) + # If there's only 1 time1 value, + # or if after dropping NaN there's only 1 time1 value + if p["time1"].size == 1 or p.dropna(dim="time1").size == 1: + return p.dropna(dim="time1").squeeze(dim="time1").drop("time1") - elif ping_time is None: - raise ValueError(f"ping_time needs to be provided for interpolating {p.name}") + if ping_time is None: + raise ValueError( + f"ping_time needs to be provided for comparison or interpolating {p.name}" + ) - else: - return p.dropna(dim="time1").interp(time1=ping_time) - else: - return p + # Direct assignment if all timestamps are identical (EK60 data) + if np.array_equal(p["time1"].data, ping_time.data): + return p.rename({"time1": "ping_time"}) + + # Interpolate `p` to `ping_time` + return p.dropna(dim="time1").interp(time1=ping_time) + return p def sanitize_user_env_dict( diff --git a/echopype/tests/calibrate/test_env_params.py b/echopype/tests/calibrate/test_env_params.py index 4f33129ef..205376032 100644 --- a/echopype/tests/calibrate/test_env_params.py +++ b/echopype/tests/calibrate/test_env_params.py @@ -1,5 +1,6 @@ import pytest +import dask.array import numpy as np import xarray as xr @@ -15,17 +16,17 @@ @pytest.fixture def azfp_path(test_path): - return test_path['AZFP'] + return test_path["AZFP"] @pytest.fixture def ek60_path(test_path): - return test_path['EK60'] + return test_path["EK60"] @pytest.fixture def ek80_cal_path(test_path): - return test_path['EK80_CAL'] + return test_path["EK80_CAL"] def test_harmonize_env_param_time(): @@ -35,39 +36,102 @@ def test_harmonize_env_param_time(): # time1 length=1, should return length=1 numpy array p = xr.DataArray( - data=[1], - coords={ - "time1": np.array(["2017-06-20T01:00:00"], dtype="datetime64[ns]") - }, - dims=["time1"] + data=[2], + coords={"time1": np.array(["2017-06-20T01:00:00"], dtype="datetime64[ns]")}, + dims=["time1"], ) - assert harmonize_env_param_time(p=p) == 1 + assert harmonize_env_param_time(p=p) == 2 - # time1 length>1, interpolate to tareget ping_time + # time1 length>1 p = xr.DataArray( - data=np.array([0, 1]), + data=np.array([0, 1, 2]), coords={ - "time1": np.arange("2017-06-20T01:00:00", "2017-06-20T01:00:31", np.timedelta64(30, "s"), dtype="datetime64[ns]") + "time1": np.arange( + "2017-06-20T01:00:00", + "2017-06-20T01:01:30", + np.timedelta64(30, "s"), + dtype="datetime64[ns]", + ) }, - dims=["time1"] + dims=["time1"], ) + # Ensure that ping_time cannot be None when time1 length > 1 + with pytest.raises(ValueError): + harmonize_env_param_time(p=p, ping_time=None) + # ping_time target is identical to time1 ping_time_target = p["time1"].rename({"time1": "ping_time"}) p_new = harmonize_env_param_time(p=p, ping_time=ping_time_target) assert (p_new["ping_time"] == ping_time_target).all() assert (p_new.data == p.data).all() + # ping_time target requires actual interpolation + ping_time_target = xr.DataArray( + data=[1, 2], + coords={ + "ping_time": np.array( + ["2017-06-20T01:00:15", "2017-06-20T01:00:30"], dtype="datetime64[ns]" + ) + }, + dims=["ping_time"], + ) + ping_time_target = xr.DataArray( data=[1], coords={ - "ping_time": np.array(["2017-06-20T01:00:15"], dtype="datetime64[ns]") + "ping_time": np.array( + [ + "2017-06-20T01:00:15", + ], + dtype="datetime64[ns]", + ) }, - dims=["ping_time"] + dims=["ping_time"], ) + p_new = harmonize_env_param_time(p=p, ping_time=ping_time_target["ping_time"]) - assert p_new["ping_time"] == ping_time_target["ping_time"] + assert (p_new["ping_time"] == ping_time_target["ping_time"]).all() assert p_new.data == 0.5 + # create a larger p with dask arrays under the hood + time1 = np.arange( + "2017-06-20T01:00:00", + "2017-06-22T01:00:31", + np.timedelta64(30, "s"), + dtype="datetime64[ns]", + ) + p = xr.DataArray( + data=np.arange(len(time1)), + coords={"time1": time1}, + dims=["time1"], + ).chunk({"time1": 1000}) + + # ping_time target is identical to time1 + ping_time_target = p["time1"].rename({"time1": "ping_time"}) + p_new = harmonize_env_param_time(p=p, ping_time=ping_time_target) + assert (p_new["ping_time"] == ping_time_target).all() + assert isinstance(p_new.data, dask.array.Array) + # np.array_equal() computes dask array under the hood + assert np.array_equal(p_new.data, p.data) + + # ping_time target requires actual interpolation + ping_time_target = xr.DataArray( + data=[1, 2], + coords={ + "ping_time": np.array( + ["2017-06-20T01:00:15", "2017-06-21T01:00:15"], dtype="datetime64[ns]" + ) + }, + dims=["ping_time"], + ) + + p_new = harmonize_env_param_time(p=p, ping_time=ping_time_target["ping_time"]) + assert np.array_equal(p_new["ping_time"], ping_time_target["ping_time"]) + + assert isinstance(p_new.data, dask.array.Array) + # .all computes dask array under the hood + assert (p_new.data == [0.5, 2880.5]).all() + @pytest.mark.parametrize( ("user_dict", "channel", "out_dict"), @@ -77,39 +141,59 @@ def test_harmonize_env_param_time(): ( {"temperature": 10, "salinity": 20}, ["chA", "chB"], - dict( - dict.fromkeys(ENV_PARAMS), **{"temperature": 10, "salinity": 20} - ) + dict(dict.fromkeys(ENV_PARAMS), **{"temperature": 10, "salinity": 20}), ), # dict has xr.DataArray, channel a list with matching values with those in dict ( - {"temperature": 10, "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]})}, + { + "temperature": 10, + "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]}), + }, ["chA", "chB"], dict( dict.fromkeys(ENV_PARAMS), - **{"temperature": 10, "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]})} - ) + **{ + "temperature": 10, + "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]}), + } + ), ), # dict has xr.DataArray, channel a list with non-matching values with those in dict: XFAIL pytest.param( - {"temperature": 10, "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]})}, - ["chA", "chC"], None, - marks=pytest.mark.xfail(strict=True, reason="channel coordinate in param xr.DataArray mismatches that in the channel list"), + { + "temperature": 10, + "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]}), + }, + ["chA", "chC"], + None, + marks=pytest.mark.xfail( + strict=True, + reason="channel coordinate in param xr.DataArray mismatches that in the channel list", + ), ), # dict has xr.DataArray, channel a xr.DataArray ( - {"temperature": 10, "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]})}, + { + "temperature": 10, + "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]}), + }, xr.DataArray(["chA", "chB"], coords={"channel": ["chA", "chB"]}), dict( dict.fromkeys(ENV_PARAMS), - **{"temperature": 10, "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]})} - ) + **{ + "temperature": 10, + "sound_absorption": xr.DataArray([10, 20], coords={"channel": ["chA", "chB"]}), + } + ), ), # dict has sound_absorption as a scalar: XFAIL pytest.param( {"temperature": 10, "sound_absorption": 0.02}, - ["chA", "chB"], None, - marks=pytest.mark.xfail(strict=True, reason="sound_absorption should be a list or an xr.DataArray"), + ["chA", "chB"], + None, + marks=pytest.mark.xfail( + strict=True, reason="sound_absorption should be a list or an xr.DataArray" + ), ), ], ids=[ @@ -117,8 +201,8 @@ def test_harmonize_env_param_time(): "in_da_channel_list_out_da", "in_da_channel_list_mismatch", "in_da_channel_da", - "in_absorption_scalae" - ] + "in_absorption_scalae", + ], ) def test_sanitize_user_env_dict(user_dict, channel, out_dict): """ @@ -139,25 +223,27 @@ def test_sanitize_user_env_dict(user_dict, channel, out_dict): # pH should not exist in the output Sv dataset, formula sources should both be AZFP ( {"temperature": 10, "salinity": 20, "pressure": 100, "pH": 8.1}, - dict( - dict.fromkeys(ENV_PARAMS), **{"temperature": 10, "salinity": 20, "pressure": 100} - ) + dict(dict.fromkeys(ENV_PARAMS), **{"temperature": 10, "salinity": 20, "pressure": 100}), ), # not including salinity or pressure: XFAIL pytest.param( - {"temperature": 10, "pressure": 100, "pH": 8.1}, None, - marks=pytest.mark.xfail(strict=True, reason="Fail since cal_channel_id in input param does not match channel of data"), + {"temperature": 10, "pressure": 100, "pH": 8.1}, + None, + marks=pytest.mark.xfail( + strict=True, + reason="Fail since cal_channel_id in input param does not match channel of data", + ), ), ], ids=[ "default", "no_salinity", - ] + ], ) def test_get_env_params_AZFP(azfp_path, env_ext, out_dict): - azfp_01a_path = str(azfp_path.joinpath('17082117.01A')) - azfp_xml_path = str(azfp_path.joinpath('17041823.XML')) - ed = ep.open_raw(azfp_01a_path, sonar_model='AZFP', xml_path=azfp_xml_path) + azfp_01a_path = str(azfp_path.joinpath("17082117.01A")) + azfp_xml_path = str(azfp_path.joinpath("17041823.XML")) + ed = ep.open_raw(azfp_01a_path, sonar_model="AZFP", xml_path=azfp_xml_path) env_dict = get_env_params_AZFP(echodata=ed, user_dict=env_ext) @@ -168,7 +254,7 @@ def test_get_env_params_AZFP(azfp_path, env_ext, out_dict): temperature=env_dict["temperature"], salinity=env_dict["salinity"], pressure=env_dict["pressure"], - formula_source="AZFP" + formula_source="AZFP", ), "sound_absorption": ep.utils.uwa.calc_absorption( frequency=ed["Sonar/Beam_group1"]["frequency_nominal"], @@ -199,21 +285,33 @@ def test_get_env_params_AZFP(azfp_path, env_ext, out_dict): # T, S, P, pH all exist so will trigger calculation, check default formula sources ( {"temperature": 10, "salinity": 30, "pressure": 100, "pH": 8.1}, - "Mackenzie", "FG", + "Mackenzie", + "FG", ), # T, S, P, pH all exist, will calculate; has absorption formula passed in, check using the correct formula ( - {"temperature": 10, "salinity": 30, "pressure": 100, "pH": 8.1, "formula_absorption": "AM"}, - "Mackenzie", "AM", + { + "temperature": 10, + "salinity": 30, + "pressure": 100, + "pH": 8.1, + "formula_absorption": "AM", + }, + "Mackenzie", + "AM", ), ], ids=[ "calc_no_formula", "calc_with_formula", - ] + ], ) -def test_get_env_params_EK60_calculate(ek60_path, env_ext, ref_formula_sound_speed, ref_formula_absorption): - ed = ep.open_raw(ek60_path / "ncei-wcsd" / "Summer2017-D20170620-T011027.raw", sonar_model="EK60") +def test_get_env_params_EK60_calculate( + ek60_path, env_ext, ref_formula_sound_speed, ref_formula_absorption +): + ed = ep.open_raw( + ek60_path / "ncei-wcsd" / "Summer2017-D20170620-T011027.raw", sonar_model="EK60" + ) env_dict = get_env_params_EK( sonar_type="EK60", @@ -257,7 +355,9 @@ def test_get_env_params_EK60_from_data(ek60_path): """ If one of T, S, P, pH does not exist, use values from data file """ - ed = ep.open_raw(ek60_path / "ncei-wcsd" / "Summer2017-D20170620-T011027.raw", sonar_model="EK60") + ed = ep.open_raw( + ek60_path / "ncei-wcsd" / "Summer2017-D20170620-T011027.raw", sonar_model="EK60" + ) env_dict = get_env_params_EK( sonar_type="EK60", @@ -288,20 +388,30 @@ def test_get_env_params_EK60_from_data(ek60_path): # T, S, P, pH all exist, check default formula sources ( {"temperature": 10, "salinity": 30, "pressure": 100, "pH": 8.1}, - "Mackenzie", "FG", + "Mackenzie", + "FG", ), # T, S, P, pH all exist; has absorption formula passed in, check using the correct formula ( - {"temperature": 10, "salinity": 30, "pressure": 100, "pH": 8.1, "formula_absorption": "AM"}, - "Mackenzie", "AM", + { + "temperature": 10, + "salinity": 30, + "pressure": 100, + "pH": 8.1, + "formula_absorption": "AM", + }, + "Mackenzie", + "AM", ), ], ids=[ "calc_no_formula", "calc_with_formula", - ] + ], ) -def test_get_env_params_EK80_calculate(ek80_cal_path, env_ext, ref_formula_sound_speed, ref_formula_absorption): +def test_get_env_params_EK80_calculate( + ek80_cal_path, env_ext, ref_formula_sound_speed, ref_formula_absorption +): ed = ep.open_raw(ek80_cal_path / "2018115-D20181213-T094600.raw", sonar_model="EK80") env_dict = get_env_params_EK( @@ -349,21 +459,25 @@ def test_get_env_params_EK80_calculate(ek80_cal_path, env_ext, ref_formula_sound # check default formula sources ( {"temperature": 10}, - "Mackenzie", "FG", + "Mackenzie", + "FG", ), # Only T exists, so use S, P, pH from data; # has absorption formula passed in, check using the correct formula ( {"temperature": 10, "formula_absorption": "AM"}, - "Mackenzie", "AM", + "Mackenzie", + "AM", ), ], ids=[ "calc_no_formula", "calc_with_formula", - ] + ], ) -def test_get_env_params_EK80_from_data(ek80_cal_path, env_ext, ref_formula_sound_speed, ref_formula_absorption): +def test_get_env_params_EK80_from_data( + ek80_cal_path, env_ext, ref_formula_sound_speed, ref_formula_absorption +): ed = ep.open_raw(ek80_cal_path / "2018115-D20181213-T094600.raw", sonar_model="EK80") env_dict = get_env_params_EK( From 6dc196a9267eb6a56b7ebbd5e7556adb977f1860 Mon Sep 17 00:00:00 2001 From: Anant Mittal Date: Sun, 25 Feb 2024 13:13:56 -0800 Subject: [PATCH 4/5] Optimize Frequency Differencing with Dask (#1198) * Update frequency_differencing method: add dask optimizations if Sv data is dask array * Update test_frequency_differencing function and incorporate test cases for dask * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix ci error * Update echopype/mask/api.py Co-authored-by: Don Setiawan * Update echopype/mask/api.py: assign attrs out of if-else * Update echopype/mask/api.py: remove unused code * refactor: Update dask chunks iter to use xr.map_blocks * refactor: Cleanup code and make more explicit * Add better comments * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix pre-commit ci errors * refactor: Update algorithm to use map_blocks Update the looping algorithm to use dask array map_block instead. This would remove direct slicing ambiguity and prepare for future xarray map_block usage. * refactor: Modify to have no hardcoded channel slicing * refactor: Assign Sv data array --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Don Setiawan --- echopype/mask/api.py | 87 +++- echopype/tests/mask/test_mask.py | 836 ++++++++++++++++++++++++------- 2 files changed, 719 insertions(+), 204 deletions(-) diff --git a/echopype/mask/api.py b/echopype/mask/api.py index 0618c8836..71b618a1a 100644 --- a/echopype/mask/api.py +++ b/echopype/mask/api.py @@ -3,6 +3,8 @@ import pathlib from typing import List, Optional, Union +import dask +import dask.array import numpy as np import xarray as xr @@ -449,8 +451,8 @@ def frequency_differencing( >>> Sv_ds = xr.Dataset(data_vars={"Sv": Sv_da, "frequency_nominal": freq_nom}) ... >>> # compute frequency-differencing mask using channel names - >>> echopype.mask.frequency_differencing(source_Sv=mock_Sv_ds, storage_options={}, - ... freqABEq=None, chanABEq = '"chan1" - "chan2">=10.0') + >>> echopype.mask.frequency_differencing(source_Sv=Sv_ds, storage_options={}, + ... freqABEq=None, chanABEq = '"chan1" - "chan2">=10.0dB') array([[False, False, False, False, False], [False, False, False, False, False], @@ -491,23 +493,80 @@ def frequency_differencing( chanA = chanAB[0] chanB = chanAB[1] - # get the left-hand side of condition - lhs = source_Sv["Sv"].sel(channel=chanA) - source_Sv["Sv"].sel(channel=chanB) + def _get_lhs( + Sv_block: np.ndarray, chanA_idx: int, chanB_idx: int, chan_dim_idx: int = 0 + ) -> np.ndarray: + """Get left-hand side of condition""" - # create mask using operator lookup table - da = xr.where(str2ops[operator](lhs, diff), True, False) + def _sel_channel(chan_idx): + return tuple( + [chan_idx if i == chan_dim_idx else slice(None) for i in range(Sv_block.ndim)] + ) + + # get the left-hand side of condition (lhs) + return Sv_block[_sel_channel(chanA_idx)] - Sv_block[_sel_channel(chanB_idx)] + + def _create_mask(lhs: np.ndarray, diff: float) -> np.ndarray: + """Create mask using operator lookup table""" + return xr.where(str2ops[operator](lhs, diff), True, False) + + # Get the Sv data array + Sv_data_array = source_Sv["Sv"] + + # Determine channel index based on names + channels = list(source_Sv["channel"].to_numpy()) + chanA_idx = channels.index(chanA) + chanB_idx = channels.index(chanB) + # Get the channel dimension index for filtering + chan_dim_idx = Sv_data_array.dims.index("channel") + + # If Sv data is not dask array + if not isinstance(Sv_data_array.variable._data, dask.array.Array): + # get the left-hand side of condition + lhs = _get_lhs(Sv_data_array, chanA_idx, chanB_idx, chan_dim_idx=chan_dim_idx) + + # create mask using operator lookup table + da = _create_mask(lhs, diff) + # If Sv data is dask array + else: + # Get the final data array template + template = Sv_data_array.isel(channel=0).drop_vars("channel") + + dask_array_data = Sv_data_array.data + # Perform block wise computation + dask_array_result = ( + dask_array_data + # Compute the left-hand side of condition + # drop the first axis (channel) as it is dropped in the result + .map_blocks( + _get_lhs, + chanA_idx, + chanB_idx, + chan_dim_idx=chan_dim_idx, + dtype=dask_array_data.dtype, + drop_axis=0, + ) + # create mask using operator lookup table + .map_blocks(_create_mask, diff) + ) + + # Create DataArray of the result + da = xr.DataArray( + data=dask_array_result, + coords=template.coords, + ) + + xr_dataarray_attrs = { + "mask_type": "frequency differencing", + "history": f"{datetime.datetime.utcnow()} +00:00. " + "Mask created by mask.frequency_differencing. " + f"Operation: Sv['{chanA}'] - Sv['{chanB}'] {operator} {diff}", + } # assign a name to DataArray da.name = "mask" # assign provenance attributes - mask_attrs = {"mask_type": "frequency differencing"} - history_attr = ( - f"{datetime.datetime.utcnow()} +00:00. " - "Mask created by mask.frequency_differencing. " - f"Operation: Sv['{chanA}'] - Sv['{chanB}'] {operator} {diff}" - ) - - da = da.assign_attrs({**mask_attrs, **{"history": history_attr}}) + da = da.assign_attrs(xr_dataarray_attrs) return da diff --git a/echopype/tests/mask/test_mask.py b/echopype/tests/mask/test_mask.py index 7030f16db..d056cb3b3 100644 --- a/echopype/tests/mask/test_mask.py +++ b/echopype/tests/mask/test_mask.py @@ -10,10 +10,7 @@ import echopype as ep import echopype.mask -from echopype.mask.api import ( - _validate_and_collect_mask_input, - _check_var_name_fill_value -) +from echopype.mask.api import _validate_and_collect_mask_input, _check_var_name_fill_value from echopype.mask.freq_diff import ( _parse_freq_diff_eq, _check_freq_diff_source_Sv, @@ -22,8 +19,14 @@ from typing import List, Union, Optional -def get_mock_freq_diff_data(n: int, n_chan_freq: int, add_chan: bool, - add_freq_nom: bool) -> xr.Dataset: +def get_mock_freq_diff_data( + n: int, + n_chan_freq: int, + add_chan: bool, + add_freq_nom: bool, + dask_array: bool = False, + chunks: dict = None, +) -> xr.Dataset: """ Creates an in-memory mock Sv Dataset. @@ -59,14 +62,18 @@ def get_mock_freq_diff_data(n: int, n_chan_freq: int, add_chan: bool, if n_chan_freq < 3: raise RuntimeError("The input n_chan_freq must be greater than or equal to 3!") + if dask_array: + if chunks is None: + raise RuntimeError("The input chunks must be provided if dask_array is True.") + # matrix representing freqB - mat_B = np.arange(n ** 2).reshape(n, n) - np.identity(n) + mat_B = np.arange(n**2).reshape(n, n) - np.identity(n) # matrix representing freqA - mat_A = np.arange(n ** 2).reshape(n, n) + mat_A = np.arange(n**2).reshape(n, n) # construct channel values - chan_vals = ['chan' + str(i) for i in range(1, n_chan_freq + 1)] + chan_vals = ["chan" + str(i) for i in range(1, n_chan_freq + 1)] # construct mock Sv data mock_Sv_data = [mat_A, np.identity(n), mat_B] + [np.identity(n) for i in range(3, n_chan_freq)] @@ -78,12 +85,20 @@ def get_mock_freq_diff_data(n: int, n_chan_freq: int, add_chan: bool, channel_coord_name = "channel" # create mock Sv DataArray - mock_Sv_da = xr.DataArray(data=np.stack(mock_Sv_data), - coords={channel_coord_name: chan_vals, "ping_time": np.arange(n), - "range_sample": np.arange(n)}) + mock_Sv_da = xr.DataArray( + data=np.stack(mock_Sv_data), + coords={ + channel_coord_name: chan_vals, + "ping_time": np.arange(n), + "range_sample": np.arange(n), + }, + ) # create data variables for the Dataset - data_vars = {"Sv": mock_Sv_da} + if dask_array: + data_vars = {"Sv": mock_Sv_da.chunk(chunks=chunks)} + else: + data_vars = {"Sv": mock_Sv_da} if add_freq_nom: # construct frequency_values @@ -124,7 +139,7 @@ def get_mock_source_ds_apply_mask(n: int, n_chan: int, is_delayed: bool) -> xr.D """ # construct channel values - chan_vals = ['chan' + str(i) for i in range(1, n_chan + 1)] + chan_vals = ["chan" + str(i) for i in range(1, n_chan + 1)] # construct mock variable data for each channel if is_delayed: @@ -133,15 +148,24 @@ def get_mock_source_ds_apply_mask(n: int, n_chan: int, is_delayed: bool) -> xr.D mock_var_data = [np.ones((n, n)) for i in range(n_chan)] # create mock var1 and var2 DataArrays - mock_var1_da = xr.DataArray(data=np.stack(mock_var_data), - coords={"channel": ("channel", chan_vals, {"long_name": "channel name"}), - "ping_time": np.arange(n), "range_sample": np.arange(n)}, - attrs={"long_name": "variable 1"}) - mock_var2_da = xr.DataArray(data=np.stack(mock_var_data), - coords={"channel": ("channel", chan_vals, {"long_name": "channel name"}), - "ping_time": np.arange(n), - "range_sample": np.arange(n)}, - attrs={"long_name": "variable 2"}) + mock_var1_da = xr.DataArray( + data=np.stack(mock_var_data), + coords={ + "channel": ("channel", chan_vals, {"long_name": "channel name"}), + "ping_time": np.arange(n), + "range_sample": np.arange(n), + }, + attrs={"long_name": "variable 1"}, + ) + mock_var2_da = xr.DataArray( + data=np.stack(mock_var_data), + coords={ + "channel": ("channel", chan_vals, {"long_name": "channel name"}), + "ping_time": np.arange(n), + "range_sample": np.arange(n), + }, + attrs={"long_name": "variable 2"}, + ) # create mock Dataset mock_ds = xr.Dataset(data_vars={"var1": mock_var1_da, "var2": mock_var2_da}) @@ -174,7 +198,6 @@ def create_input_mask( # make input numpy array masks into DataArrays if isinstance(mask, list): - # initialize final mask mask_out = [] @@ -184,16 +207,15 @@ def create_input_mask( temp_dir = tempfile.TemporaryDirectory() for mask_ind in range(len(mask)): - # form DataArray from given mask data - mask_da = xr.DataArray(data=[mask[mask_ind]], coords=mask_coords, name='mask_' + str(mask_ind)) + mask_da = xr.DataArray( + data=[mask[mask_ind]], coords=mask_coords, name="mask_" + str(mask_ind) + ) if mask_file[mask_ind] is None: - # set mask value to the DataArray given mask_out.append(mask_da) else: - # write DataArray to temporary directory zarr_path = os.path.join(temp_dir.name, mask_file[mask_ind]) mask_da.to_dataset().to_zarr(zarr_path) @@ -206,16 +228,13 @@ def create_input_mask( mask_out.append(zarr_path) elif isinstance(mask, np.ndarray): - # form DataArray from given mask data - mask_da = xr.DataArray(data=[mask], coords=mask_coords, name='mask_0') + mask_da = xr.DataArray(data=[mask], coords=mask_coords, name="mask_0") if mask_file is None: - # set mask to the DataArray formed mask_out = mask_da else: - # create temporary directory for mask_file temp_dir = tempfile.TemporaryDirectory() @@ -237,31 +256,77 @@ def create_input_mask( ("n", "n_chan_freq", "add_chan", "add_freq_nom", "freqAB", "chanAB"), [ (5, 4, True, True, [1000.0, 2.0], None), - (5, 4, True, True, None, ['chan1', 'chan3']), - pytest.param(5, 4, False, True, [1.0, 2000000.0], None, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because the Dataset " - "will not have the channel coordinate.")), - pytest.param(5, 4, True, False, [1.0, 2.0], None, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because the Dataset " - "will not have the frequency_nominal variable.")), - pytest.param(5, 4, True, True, [1.0, 4.0], None, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because not all selected frequencies" - "are in the frequency_nominal variable.")), - pytest.param(5, 4, True, True, None, ['chan1', 'chan9'], - marks=pytest.mark.xfail(strict=True, - reason="This should fail because not all selected channels" - "are in the channel coordinate.")) + (5, 4, True, True, None, ["chan1", "chan3"]), + pytest.param( + 5, + 4, + False, + True, + [1.0, 2000000.0], + None, + marks=pytest.mark.xfail( + strict=True, + reason="This should fail because the Dataset " + "will not have the channel coordinate.", + ), + ), + pytest.param( + 5, + 4, + True, + False, + [1.0, 2.0], + None, + marks=pytest.mark.xfail( + strict=True, + reason="This should fail because the Dataset " + "will not have the frequency_nominal variable.", + ), + ), + pytest.param( + 5, + 4, + True, + True, + [1.0, 4.0], + None, + marks=pytest.mark.xfail( + strict=True, + reason="This should fail because not all selected frequencies" + "are in the frequency_nominal variable.", + ), + ), + pytest.param( + 5, + 4, + True, + True, + None, + ["chan1", "chan9"], + marks=pytest.mark.xfail( + strict=True, + reason="This should fail because not all selected channels" + "are in the channel coordinate.", + ), + ), + ], + ids=[ + "dataset_input_freqAB_provided", + "dataset_input_chanAB_provided", + "dataset_no_channel", + "dataset_no_frequency_nominal", + "dataset_missing_freqAB_in_freq_nom", + "dataset_missing_chanAB_in_channel", ], - ids=["dataset_input_freqAB_provided", "dataset_input_chanAB_provided", "dataset_no_channel", - "dataset_no_frequency_nominal", "dataset_missing_freqAB_in_freq_nom", - "dataset_missing_chanAB_in_channel"] ) -def test_check_freq_diff_source_Sv(n: int, n_chan_freq: int, add_chan: bool, add_freq_nom: bool, - freqAB: List[float], - chanAB: List[str]): +def test_check_freq_diff_source_Sv( + n: int, + n_chan_freq: int, + add_chan: bool, + add_freq_nom: bool, + freqAB: List[float], + chanAB: List[str], +): """ Test the inputs ``source_Sv, freqAB, chanAB`` for ``_check_freq_diff_source_Sv``. @@ -301,35 +366,62 @@ def test_check_freq_diff_source_Sv(n: int, n_chan_freq: int, add_chan: bool, add (None, '"chan1"-"chan3"==1.0dB'), ("1.0 kHz - 2.0 MHz>=1.0 dB", None), (None, '"chan2-12 89" - "chan4 89-12" >= 1.0 dB'), - pytest.param("1.0kHz-2.0 kHz===1.0dB", None, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because " - "the operator is incorrect.")), - pytest.param(None, '"chan1"-"chan3"===1.0 dB', - marks=pytest.mark.xfail(strict=True, - reason="This should fail because " - "the operator is incorrect.")), - pytest.param("1.0 MHz-1.0MHz==1.0dB", None, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because the " - "frequencies are the same.")), - pytest.param(None, '"chan1"-"chan1"==1.0 dB', - marks=pytest.mark.xfail(strict=True, - reason="This should fail because the " - "channels are the same.")), - pytest.param("1.0 Hz-2.0==1.0dB", None, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because unit of one of " - "the frequency is missing.")), - pytest.param(None, '"chan1"-"chan3"==1.0', - marks=pytest.mark.xfail(strict=True, - reason="This should fail because unit of the " - "difference is missing.")), + pytest.param( + "1.0kHz-2.0 kHz===1.0dB", + None, + marks=pytest.mark.xfail( + strict=True, reason="This should fail because " "the operator is incorrect." + ), + ), + pytest.param( + None, + '"chan1"-"chan3"===1.0 dB', + marks=pytest.mark.xfail( + strict=True, reason="This should fail because " "the operator is incorrect." + ), + ), + pytest.param( + "1.0 MHz-1.0MHz==1.0dB", + None, + marks=pytest.mark.xfail( + strict=True, reason="This should fail because the " "frequencies are the same." + ), + ), + pytest.param( + None, + '"chan1"-"chan1"==1.0 dB', + marks=pytest.mark.xfail( + strict=True, reason="This should fail because the " "channels are the same." + ), + ), + pytest.param( + "1.0 Hz-2.0==1.0dB", + None, + marks=pytest.mark.xfail( + strict=True, + reason="This should fail because unit of one of " "the frequency is missing.", + ), + ), + pytest.param( + None, + '"chan1"-"chan3"==1.0', + marks=pytest.mark.xfail( + strict=True, reason="This should fail because unit of the " "difference is missing." + ), + ), + ], + ids=[ + "input_freqABEq_provided", + "input_chanABEq_provided", + "input_freqABEq_different_units", + "input_chanABEq_provided", + "input_freqABEq_wrong_operator", + "input_chanABEq_wrong_operator", + "input_freqABEq_duplicate_frequencies", + "input_chanABEq_duplicate_channels", + "input_freqABEq_missing_unit", + "input_chanABEq_missing_unit", ], - ids=["input_freqABEq_provided", "input_chanABEq_provided", "input_freqABEq_different_units", - "input_chanABEq_provided", "input_freqABEq_wrong_operator", "input_chanABEq_wrong_operator", - "input_freqABEq_duplicate_frequencies", "input_chanABEq_duplicate_channels", - "input_freqABEq_missing_unit", "input_chanABEq_missing_unit"] ) def test_parse_freq_diff_eq(freqABEq: str, chanABEq: str): """ @@ -344,7 +436,7 @@ def test_parse_freq_diff_eq(freqABEq: str, chanABEq: str): in the criteria are enclosed in double quotes. """ freq_vals = [1.0, 2.0, 1e3, 2e3, 1e6, 2e6] - chan_vals = ['chan1', 'chan3', "chan2-12 89", "chan4 89-12"] + chan_vals = ["chan1", "chan3", "chan2-12 89", "chan4 89-12"] operator_vals = [">=", "=="] diff_val = 1.0 freqAB, chanAB, operator, diff = _parse_freq_diff_eq(freqABEq=freqABEq, chanABEq=chanABEq) @@ -357,30 +449,213 @@ def test_parse_freq_diff_eq(freqABEq: str, chanABEq: str): assert operator in operator_vals assert diff == diff_val + @pytest.mark.parametrize( - ("n", "n_chan_freq", "freqABEq", "chanABEq", "mask_truth"), + ( + "n", + "n_chan_freq", + "freqABEq", + "chanABEq", + "mask_truth", + "dask_array", + "chunks", + ), [ - (5, 4, "1.0Hz-2.0Hz== 1.0dB", None, np.identity(5)), - (5, 4, None, '"chan1"-"chan3" == 1.0 dB', np.identity(5)), - (5, 4, "2.0 Hz - 1.0 Hz==1.0 dB", None, np.zeros((5, 5))), - (5, 4, None, '"chan3" - "chan1"==1.0 dB', np.zeros((5, 5))), - (5, 4, "1.0 Hz-2.0Hz>=1.0dB", None, np.identity(5)), - (5, 4, None, '"chan1" - "chan3" >= 1.0 dB', np.identity(5)), - (5, 4, "1.0 kHz - 2.0 kHz > 1.0dB", None, np.zeros((5, 5))), - (5, 4, None, '"chan1"-"chan3">1.0 dB', np.zeros((5, 5))), - (5, 4, "1.0kHz-2.0 kHz<=1.0dB", None, np.ones((5, 5))), - (5, 4, None, '"chan1" - "chan3" <= 1.0 dB', np.ones((5, 5))), - (5, 4, "1.0 Hz-2.0Hz<1.0dB", None, np.ones((5, 5)) - np.identity(5)), - (5, 4, None, '"chan1"-"chan3"< 1.0 dB', np.ones((5, 5)) - np.identity(5)) + (5, 4, "1.0Hz-2.0Hz== 1.0dB", None, np.identity(5), None, None), + (5, 4, None, '"chan1"-"chan3" == 1.0 dB', np.identity(5), None, None), + (5, 4, "2.0 Hz - 1.0 Hz==1.0 dB", None, np.zeros((5, 5)), None, None), + (5, 4, None, '"chan3" - "chan1"==1.0 dB', np.zeros((5, 5)), None, None), + (5, 4, "1.0 Hz-2.0Hz>=1.0dB", None, np.identity(5), None, None), + (5, 4, None, '"chan1" - "chan3" >= 1.0 dB', np.identity(5), None, None), + (5, 4, "1.0 kHz - 2.0 kHz > 1.0dB", None, np.zeros((5, 5)), None, None), + (5, 4, None, '"chan1"-"chan3">1.0 dB', np.zeros((5, 5)), None, None), + (5, 4, "1.0kHz-2.0 kHz<=1.0dB", None, np.ones((5, 5)), None, None), + (5, 4, None, '"chan1" - "chan3" <= 1.0 dB', np.ones((5, 5)), None, None), + (5, 4, "1.0 Hz-2.0Hz<1.0dB", None, np.ones((5, 5)) - np.identity(5), None, None), + (5, 4, None, '"chan1"-"chan3"< 1.0 dB', np.ones((5, 5)) - np.identity(5), None, None), + # Dask Arrays + ( + 500, + 4, + "1.0Hz-2.0Hz== 1.0dB", + None, + np.identity(500), + True, + { + "ping_time": 10, + "range_sample": 10, + }, + ), + ( + 500, + 4, + None, + '"chan1"-"chan3" == 1.0 dB', + np.identity(500), + True, + { + "ping_time": 25, + "range_sample": 25, + }, + ), + ( + 1000, + 4, + "2.0 Hz - 1.0 Hz==1.0 dB", + None, + np.zeros((1000, 1000)), + True, + { + "ping_time": 200, + "range_sample": 200, + }, + ), + ( + 750, + 4, + None, + '"chan3" - "chan1"==1.0 dB', + np.zeros((750, 750)), + True, + { + "ping_time": 60, + "range_sample": 40, + }, + ), + ( + 5, + 4, + "1.0 Hz-2.0Hz>=1.0dB", + None, + np.identity(5), + True, + { + "ping_time": -1, + "range_sample": -1, + }, + ), + ( + 600, + 4, + None, + '"chan1" - "chan3" >= 1.0 dB', + np.identity(600), + True, + { + "ping_time": 120, + "range_sample": 60, + }, + ), + ( + 700, + 4, + "1.0 kHz - 2.0 kHz > 1.0dB", + None, + np.zeros((700, 700)), + True, + { + "ping_time": 100, + "range_sample": 100, + }, + ), + ( + 800, + 4, + None, + '"chan1"-"chan3">1.0 dB', + np.zeros((800, 800)), + True, + { + "ping_time": 120, + "range_sample": 150, + }, + ), + ( + 5, + 4, + "1.0kHz-2.0 kHz<=1.0dB", + None, + np.ones((5, 5)), + True, + { + "ping_time": -1, + "range_sample": -1, + }, + ), + ( + 500, + 4, + None, + '"chan1" - "chan3" <= 1.0 dB', + np.ones((500, 500)), + True, + { + "ping_time": 50, + "range_sample": 50, + }, + ), + ( + 500, + 4, + "1.0 Hz-2.0Hz<1.0dB", + None, + np.ones((500, 500)) - np.identity(500), + True, + { + "ping_time": 100, + "range_sample": 100, + }, + ), + ( + 10000, + 4, + None, + '"chan1"-"chan3"< 1.0 dB', + np.ones((10000, 10000)) - np.identity(10000), + True, + { + "ping_time": 1000, + "range_sample": 1000, + }, + ), + ], + ids=[ + "freqAB_sel_op_equals", + "chanAB_sel_op_equals", + "reverse_freqAB_sel_op_equals", + "reverse_chanAB_sel_op_equals", + "freqAB_sel_op_ge", + "chanAB_sel_op_ge", + "freqAB_sel_op_greater", + "chanAB_sel_op_greater", + "freqAB_sel_op_le", + "chanAB_sel_op_le", + "freqAB_sel_op_less", + "chanAB_sel_op_less", + # Dask Arrays + "freqAB_sel_op_equals_dask", + "chanAB_sel_op_equals_dask", + "reverse_freqAB_sel_op_equals_dask", + "reverse_chanAB_sel_op_equals_dask", + "freqAB_sel_op_ge_dask", + "chanAB_sel_op_ge_dask", + "freqAB_sel_op_greater_dask", + "chanAB_sel_op_greater_dask", + "freqAB_sel_op_le_dask", + "chanAB_sel_op_le_dask", + "freqAB_sel_op_less_dask", + "chanAB_sel_op_less_dask", ], - ids=["freqAB_sel_op_equals", "chanAB_sel_op_equals", "reverse_freqAB_sel_op_equals", - "reverse_chanAB_sel_op_equals", "freqAB_sel_op_ge", "chanAB_sel_op_ge", - "freqAB_sel_op_greater", "chanAB_sel_op_greater", "freqAB_sel_op_le", - "chanAB_sel_op_le", "freqAB_sel_op_less", "chanAB_sel_op_less"] ) -def test_frequency_differencing(n: int, n_chan_freq: int, - freqABEq: str, chanABEq: str, - mask_truth: np.ndarray): +def test_frequency_differencing( + n: int, + n_chan_freq: int, + freqABEq: str, + chanABEq: str, + mask_truth: np.ndarray, + dask_array: bool, + chunks: dict, +): """ Tests that the output values of ``frequency_differencing`` are what we expect, the output is a DataArray, and that the name of the DataArray is correct. @@ -401,17 +676,36 @@ def test_frequency_differencing(n: int, n_chan_freq: int, in the criteria are enclosed in double quotes. mask_truth: np.ndarray The truth value for the output mask, provided the given inputs + dask_array: bool + The boolean value to decide if the mock Sv Dataset is dask or not + chunks: dict + The chunk sizes along ping_time and range_sample dimension """ # obtain mock Sv Dataset - mock_Sv_ds = get_mock_freq_diff_data(n, n_chan_freq, add_chan=True, add_freq_nom=True) + mock_Sv_ds = get_mock_freq_diff_data( + n, + n_chan_freq, + add_chan=True, + add_freq_nom=True, + dask_array=dask_array, + chunks=chunks, + ) + + if dask_array: + assert mock_Sv_ds["Sv"].chunks != None # obtain the frequency-difference mask for mock_Sv_ds - out = ep.mask.frequency_differencing(source_Sv=mock_Sv_ds, storage_options={}, freqABEq=freqABEq, - chanABEq=chanABEq) + out = ep.mask.frequency_differencing( + source_Sv=mock_Sv_ds, storage_options={}, freqABEq=freqABEq, chanABEq=chanABEq + ) - # ensure that the output values are correct - assert np.all(out == mask_truth) + if dask_array: + # ensure that the output values are correct + assert np.all(out.data.compute() == mask_truth) + else: + # ensure that the output values are correct + assert np.all(out == mask_truth) # ensure that the output is a DataArray assert isinstance(out, xr.DataArray) @@ -429,18 +723,31 @@ def test_frequency_differencing(n: int, n_chan_freq: int, (5, 1, np.identity(5), "path/to/mask.zarr", {}), (5, 1, [np.identity(5), np.identity(5)], ["path/to/mask0.zarr", "path/to/mask1.zarr"], {}), (5, 1, np.identity(5), pathlib.Path("path/to/mask.zarr"), {}), - (5, 1, [np.identity(5), np.identity(5), np.identity(5)], - [None, "path/to/mask0.zarr", pathlib.Path("path/to/mask1.zarr")], {}) + ( + 5, + 1, + [np.identity(5), np.identity(5), np.identity(5)], + [None, "path/to/mask0.zarr", pathlib.Path("path/to/mask1.zarr")], + {}, + ), + ], + ids=[ + "mask_da", + "mask_list_da_single_storage", + "mask_list_da_list_storage", + "mask_str_path", + "mask_list_str_path", + "mask_pathlib", + "mask_mixed_da_str_pathlib", ], - ids=["mask_da", "mask_list_da_single_storage", "mask_list_da_list_storage", "mask_str_path", - "mask_list_str_path", "mask_pathlib", "mask_mixed_da_str_pathlib"] ) def test_validate_and_collect_mask_input( - n: int, - n_chan: int, - mask_np: Union[np.ndarray, List[np.ndarray]], - mask_file: Optional[Union[str, pathlib.Path, List[Union[str, pathlib.Path]]]], - storage_options_mask: Union[dict, List[dict]]): + n: int, + n_chan: int, + mask_np: Union[np.ndarray, List[np.ndarray]], + mask_file: Optional[Union[str, pathlib.Path, List[Union[str, pathlib.Path]]]], + storage_options_mask: Union[dict, List[dict]], +): """ Tests the allowable types for the mask input and corresponding storage options. @@ -468,67 +775,116 @@ def test_validate_and_collect_mask_input( """ # construct channel values - chan_vals = ['chan' + str(i) for i in range(1, n_chan + 1)] + chan_vals = ["chan" + str(i) for i in range(1, n_chan + 1)] # create coordinates that will be used by all DataArrays created - coords = {"channel": ("channel", chan_vals, {"long_name": "channel name"}), - "ping_time": np.arange(n), "range_sample": np.arange(n)} + coords = { + "channel": ("channel", chan_vals, {"long_name": "channel name"}), + "ping_time": np.arange(n), + "range_sample": np.arange(n), + } # create input mask and obtain temporary directory, if it was created mask, _ = create_input_mask(mask_np, mask_file, coords) - mask_out = _validate_and_collect_mask_input(mask=mask, storage_options_mask=storage_options_mask) + mask_out = _validate_and_collect_mask_input( + mask=mask, storage_options_mask=storage_options_mask + ) if isinstance(mask_out, list): for ind, da in enumerate(mask_out): - # create known solution for mask - mask_da = xr.DataArray(data=[mask_np[ind] for i in range(n_chan)], - coords=coords, name='mask_' + str(ind)) + mask_da = xr.DataArray( + data=[mask_np[ind] for i in range(n_chan)], coords=coords, name="mask_" + str(ind) + ) assert da.identical(mask_da) else: - # create known solution for mask - mask_da = xr.DataArray(data=[mask_np for i in range(n_chan)], - coords=coords, name='mask_0') + mask_da = xr.DataArray(data=[mask_np for i in range(n_chan)], coords=coords, name="mask_0") assert mask_out.identical(mask_da) @pytest.mark.parametrize( ("n", "n_chan", "var_name", "fill_value"), [ - pytest.param(4, 2, 2.0, np.nan, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because the var_name is not a string.")), - pytest.param(4, 2, "var3", np.nan, - marks=pytest.mark.xfail(strict=True, - reason="This should fail because mock_ds will " - "not have var_name=var3 in it.")), - pytest.param(4, 2, "var1", "1.0", - marks=pytest.mark.xfail(strict=True, - reason="This should fail because fill_value is an incorrect type.")), + pytest.param( + 4, + 2, + 2.0, + np.nan, + marks=pytest.mark.xfail( + strict=True, reason="This should fail because the var_name is not a string." + ), + ), + pytest.param( + 4, + 2, + "var3", + np.nan, + marks=pytest.mark.xfail( + strict=True, + reason="This should fail because mock_ds will " "not have var_name=var3 in it.", + ), + ), + pytest.param( + 4, + 2, + "var1", + "1.0", + marks=pytest.mark.xfail( + strict=True, reason="This should fail because fill_value is an incorrect type." + ), + ), (4, 2, "var1", 1), (4, 2, "var1", 1.0), (2, 1, "var1", np.identity(2)[None, :]), - (2, 1, "var1", xr.DataArray(data=np.array([[[1.0, 0], [0, 1]]]), - coords={"channel": ["chan1"], "ping_time": [0, 1], "range_sample": [0, 1]}) - ), - pytest.param(4, 2, "var1", np.identity(2), - marks=pytest.mark.xfail(strict=True, - reason="This should fail because fill_value is not the right shape.")), - pytest.param(4, 2, "var1", - xr.DataArray(data=np.array([[1.0, 0], [0, 1]]), - coords={"ping_time": [0, 1], "range_sample": [0, 1]}), - marks=pytest.mark.xfail(strict=True, - reason="This should fail because fill_value is not the right shape.")), + ( + 2, + 1, + "var1", + xr.DataArray( + data=np.array([[[1.0, 0], [0, 1]]]), + coords={"channel": ["chan1"], "ping_time": [0, 1], "range_sample": [0, 1]}, + ), + ), + pytest.param( + 4, + 2, + "var1", + np.identity(2), + marks=pytest.mark.xfail( + strict=True, reason="This should fail because fill_value is not the right shape." + ), + ), + pytest.param( + 4, + 2, + "var1", + xr.DataArray( + data=np.array([[1.0, 0], [0, 1]]), + coords={"ping_time": [0, 1], "range_sample": [0, 1]}, + ), + marks=pytest.mark.xfail( + strict=True, reason="This should fail because fill_value is not the right shape." + ), + ), + ], + ids=[ + "wrong_var_name_type", + "no_var_name_ds", + "wrong_fill_value_type", + "fill_value_int", + "fill_value_float", + "fill_value_np_array", + "fill_value_DataArray", + "fill_value_np_array_wrong_shape", + "fill_value_DataArray_wrong_shape", ], - ids=["wrong_var_name_type", "no_var_name_ds", "wrong_fill_value_type", "fill_value_int", - "fill_value_float", "fill_value_np_array", "fill_value_DataArray", - "fill_value_np_array_wrong_shape", "fill_value_DataArray_wrong_shape"] ) -def test_check_var_name_fill_value(n: int, n_chan: int, var_name: str, - fill_value: Union[int, float, np.ndarray, xr.DataArray]): +def test_check_var_name_fill_value( + n: int, n_chan: int, var_name: str, fill_value: Union[int, float, np.ndarray, xr.DataArray] +): """ Ensures that the function ``_check_var_name_fill_value`` is behaving as expected. @@ -552,36 +908,121 @@ def test_check_var_name_fill_value(n: int, n_chan: int, var_name: str, @pytest.mark.parametrize( - ("n", "n_chan", "var_name", "mask", "mask_file", "fill_value", "is_delayed", "var_masked_truth", "no_channel"), + ( + "n", + "n_chan", + "var_name", + "mask", + "mask_file", + "fill_value", + "is_delayed", + "var_masked_truth", + "no_channel", + ), [ # single_mask_default_fill - (2, 1, "var1", np.identity(2), None, np.nan, False, np.array([[1, np.nan], [np.nan, 1]]), False), + ( + 2, + 1, + "var1", + np.identity(2), + None, + np.nan, + False, + np.array([[1, np.nan], [np.nan, 1]]), + False, + ), # single_mask_default_fill_no_channel - (2, 1, "var1", np.identity(2), None, np.nan, False, np.array([[1, np.nan], [np.nan, 1]]), True), + ( + 2, + 1, + "var1", + np.identity(2), + None, + np.nan, + False, + np.array([[1, np.nan], [np.nan, 1]]), + True, + ), # single_mask_float_fill (2, 1, "var1", np.identity(2), None, 2.0, False, np.array([[1, 2.0], [2.0, 1]]), False), # single_mask_np_array_fill - (2, 1, "var1", np.identity(2), None, np.array([[[np.nan, np.nan], [np.nan, np.nan]]]), - False, np.array([[1, np.nan], [np.nan, 1]]), False), + ( + 2, + 1, + "var1", + np.identity(2), + None, + np.array([[[np.nan, np.nan], [np.nan, np.nan]]]), + False, + np.array([[1, np.nan], [np.nan, 1]]), + False, + ), # single_mask_DataArray_fill - (2, 1, "var1", np.identity(2), None, xr.DataArray(data=np.array([[[np.nan, np.nan], [np.nan, np.nan]]]), - coords={"channel": ["chan1"], - "ping_time": [0, 1], - "range_sample": [0, 1]}), - False, np.array([[1, np.nan], [np.nan, 1]]), False), + ( + 2, + 1, + "var1", + np.identity(2), + None, + xr.DataArray( + data=np.array([[[np.nan, np.nan], [np.nan, np.nan]]]), + coords={"channel": ["chan1"], "ping_time": [0, 1], "range_sample": [0, 1]}, + ), + False, + np.array([[1, np.nan], [np.nan, 1]]), + False, + ), # list_mask_all_np - (2, 1, "var1", [np.identity(2), np.array([[0, 1], [0, 1]])], [None, None], 2.0, - False, np.array([[2.0, 2.0], [2.0, 1]]), False), + ( + 2, + 1, + "var1", + [np.identity(2), np.array([[0, 1], [0, 1]])], + [None, None], + 2.0, + False, + np.array([[2.0, 2.0], [2.0, 1]]), + False, + ), # single_mask_ds_delayed (2, 1, "var1", np.identity(2), None, 2.0, True, np.array([[1, 2.0], [2.0, 1]]), False), # single_mask_as_path - (2, 1, "var1", np.identity(2), "test.zarr", 2.0, True, np.array([[1, 2.0], [2.0, 1]]), False), + ( + 2, + 1, + "var1", + np.identity(2), + "test.zarr", + 2.0, + True, + np.array([[1, 2.0], [2.0, 1]]), + False, + ), # list_mask_all_path - (2, 1, "var1", [np.identity(2), np.array([[0, 1], [0, 1]])], ["test0.zarr", "test1.zarr"], 2.0, - False, np.array([[2.0, 2.0], [2.0, 1]]), False), + ( + 2, + 1, + "var1", + [np.identity(2), np.array([[0, 1], [0, 1]])], + ["test0.zarr", "test1.zarr"], + 2.0, + False, + np.array([[2.0, 2.0], [2.0, 1]]), + False, + ), # list_mask_some_path - (2, 1, "var1", [np.identity(2), np.array([[0, 1], [0, 1]])], ["test0.zarr", None], 2.0, - False, np.array([[2.0, 2.0], [2.0, 1]]), False), + ( + 2, + 1, + "var1", + [np.identity(2), np.array([[0, 1], [0, 1]])], + ["test0.zarr", None], + 2.0, + False, + np.array([[2.0, 2.0], [2.0, 1]]), + False, + ), ], ids=[ "single_mask_default_fill", @@ -593,16 +1034,20 @@ def test_check_var_name_fill_value(n: int, n_chan: int, var_name: str, "single_mask_ds_delayed", "single_mask_as_path", "list_mask_all_path", - "list_mask_some_path" - ] + "list_mask_some_path", + ], ) -def test_apply_mask(n: int, n_chan: int, var_name: str, - mask: Union[np.ndarray, List[np.ndarray]], - mask_file: Optional[Union[str, List[str]]], - fill_value: Union[int, float, np.ndarray, xr.DataArray], - is_delayed: bool, - var_masked_truth: np.ndarray, - no_channel: bool): +def test_apply_mask( + n: int, + n_chan: int, + var_name: str, + mask: Union[np.ndarray, List[np.ndarray]], + mask_file: Optional[Union[str, List[str]]], + fill_value: Union[int, float, np.ndarray, xr.DataArray], + is_delayed: bool, + var_masked_truth: np.ndarray, + no_channel: bool, +): """ Ensures that ``apply_mask`` functions correctly. @@ -636,8 +1081,11 @@ def test_apply_mask(n: int, n_chan: int, var_name: str, mask, temp_dir = create_input_mask(mask, mask_file, mock_ds.coords) # create DataArray form of the known truth value - var_masked_truth = xr.DataArray(data=np.stack([var_masked_truth for i in range(n_chan)]), - coords=mock_ds[var_name].coords, attrs=mock_ds[var_name].attrs) + var_masked_truth = xr.DataArray( + data=np.stack([var_masked_truth for i in range(n_chan)]), + coords=mock_ds[var_name].coords, + attrs=mock_ds[var_name].attrs, + ) var_masked_truth.name = mock_ds[var_name].name if no_channel: @@ -646,9 +1094,14 @@ def test_apply_mask(n: int, n_chan: int, var_name: str, var_masked_truth = var_masked_truth.isel(channel=0) # apply the mask to var_name - masked_ds = echopype.mask.apply_mask(source_ds=mock_ds, var_name=var_name, mask=mask, - fill_value=fill_value, storage_options_ds={}, - storage_options_mask={}) + masked_ds = echopype.mask.apply_mask( + source_ds=mock_ds, + var_name=var_name, + mask=mask, + fill_value=fill_value, + storage_options_ds={}, + storage_options_mask={}, + ) # check that masked_ds[var_name] == var_masked_truth assert masked_ds[var_name].equals(var_masked_truth) @@ -675,10 +1128,9 @@ def test_apply_mask(n: int, n_chan: int, var_name: str, "source_no_ch_mask_with_ch", "source_with_ch_mask_no_ch", "source_no_ch_mask_no_ch", - ] + ], ) def test_apply_mask_channel_variation(source_has_ch, mask_has_ch): - source_ds = get_mock_source_ds_apply_mask(2, 3, False) var_name = "var1" @@ -706,14 +1158,18 @@ def test_apply_mask_channel_variation(source_has_ch, mask_has_ch): if source_has_ch: truth_da = xr.DataArray( np.array([[[1, np.nan], [np.nan, 1]]] * 3), - coords={"channel": ["chan1", "chan2", "chan3"], "ping_time": np.arange(2), "range_sample": np.arange(2)}, - attrs=source_ds[var_name].attrs + coords={ + "channel": ["chan1", "chan2", "chan3"], + "ping_time": np.arange(2), + "range_sample": np.arange(2), + }, + attrs=source_ds[var_name].attrs, ) else: truth_da = xr.DataArray( [[1, np.nan], [np.nan, 1]], coords={"ping_time": np.arange(2), "range_sample": np.arange(2)}, - attrs=source_ds[var_name].attrs + attrs=source_ds[var_name].attrs, ) assert masked_ds[var_name].equals(truth_da) From 8600a591f11f356b670edf52f3b0423e33137b2e Mon Sep 17 00:00:00 2001 From: anujsinha3 Date: Tue, 26 Mar 2024 11:32:09 -0700 Subject: [PATCH 5/5] Optimize get vend cal params power (#1285) * Optimize get vend cal params power (#2) * Refactor function to avoid expand_dims and sortby if required * test: increase co-ordinates to 1000 for ping_time dimension * style: upper-casing variables * style: upper-casing variables * style: revert --------- Co-authored-by: Anant Mittal * test: update test_get_interp_da() to test on 200 time_coordinates (#3) * test: refactor test data (#4) * test: refactor test data (#5) * test: refactor test data * test: refactor test data --------- Co-authored-by: Anant Mittal --- echopype/calibrate/cal_params.py | 14 +- echopype/tests/calibrate/test_cal_params.py | 552 ++++++++++++++------ 2 files changed, 398 insertions(+), 168 deletions(-) diff --git a/echopype/calibrate/cal_params.py b/echopype/calibrate/cal_params.py index c4de27bff..ef3b04ba9 100644 --- a/echopype/calibrate/cal_params.py +++ b/echopype/calibrate/cal_params.py @@ -286,7 +286,7 @@ def get_vend_cal_params_power(beam: xr.Dataset, vend: xr.Dataset, param: str) -> raise ValueError(f"{param} does not exist in the Vendor_specific group!") # Find idx to select the corresponding param value - # by matching beam["transmit_duration_nominal"] with ds_vend["pulse_length"] + # by matching beam["transmit_duration_nominal"] with vend["pulse_length"] transmit_isnull = beam["transmit_duration_nominal"].isnull() idxmin = np.abs(beam["transmit_duration_nominal"] - vend["pulse_length"]).idxmin( dim="pulse_length_bin" @@ -297,13 +297,13 @@ def get_vend_cal_params_power(beam: xr.Dataset, vend: xr.Dataset, param: str) -> idxmin = idxmin.where(~transmit_isnull, 0).astype(int) # Get param dataarray into correct shape - da_param = ( - vend[param] - .expand_dims(dim={"ping_time": idxmin["ping_time"]}) # expand dims for direct indexing - .sortby(idxmin.channel) # sortby in case channel sequence differs in vend and beam - ) + da_param = vend[param].transpose("pulse_length_bin", "channel") + + if not np.array_equal(da_param.channel.data, idxmin.channel.data): + da_param = da_param.sortby( + da_param.channel, ascending=False + ) # sortby because channel sequence differs in vend and beam - # Select corresponding index and clean up the original nan elements da_param = da_param.sel(pulse_length_bin=idxmin, drop=True) # Set the nan elements back to nan. diff --git a/echopype/tests/calibrate/test_cal_params.py b/echopype/tests/calibrate/test_cal_params.py index b6332438a..1f48c64a4 100644 --- a/echopype/tests/calibrate/test_cal_params.py +++ b/echopype/tests/calibrate/test_cal_params.py @@ -2,19 +2,27 @@ import numpy as np import xarray as xr +from xarray.testing import assert_allclose from echopype.calibrate.cal_params import ( - CAL_PARAMS, param2da, sanitize_user_cal_dict, _get_interp_da, - get_cal_params_AZFP, get_cal_params_EK, get_vend_cal_params_power + CAL_PARAMS, + param2da, + sanitize_user_cal_dict, + _get_interp_da, + get_cal_params_AZFP, + get_cal_params_EK, + get_vend_cal_params_power, ) +DATA = np.random.rand(2, 200) +TIME_COORDINATES = np.ones(200) * 1000 @pytest.fixture def freq_center(): return xr.DataArray( - [[25, 55]], - dims=["ping_time", "channel"], - coords={"channel": ["chA", "chB"], "ping_time": [1]} + DATA, + dims=["channel", "ping_time"], + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ) @@ -39,9 +47,9 @@ def beam_AZFP(): """ beam = xr.Dataset() beam["equivalent_beam_angle"] = xr.DataArray( - [[10, 20]], - dims=["ping_time", "channel"], - coords={"channel": ["chA", "chB"], "ping_time": [1]}, + DATA, + dims=["channel", "ping_time"], + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ) return beam.transpose("channel", "ping_time") @@ -59,15 +67,11 @@ def vend_EK(): coords={"channel": ["chA", "chB"], "pulse_length_bin": [0, 1, 2, 3]}, ) vend["pulse_length"] = xr.DataArray( - np.array([[64, 128, 256, 512], [128, 256, 512, 1024]]), - coords={"channel": vend["channel"], "pulse_length_bin": vend["pulse_length_bin"]} - ) - vend["impedance_transceiver"] = xr.DataArray( - [1000, 2000], coords={"channel": vend["channel"]} - ) - vend["transceiver_type"] = xr.DataArray( - ["WBT", "WBT"], coords={"channel": vend["channel"]} + np.array([[64, 128, 256, 512], [128, 256, 512, 1024]]), + coords={"channel": vend["channel"], "pulse_length_bin": vend["pulse_length_bin"]}, ) + vend["impedance_transceiver"] = xr.DataArray([1000, 2000], coords={"channel": vend["channel"]}) + vend["transceiver_type"] = xr.DataArray(["WBT", "WBT"], coords={"channel": vend["channel"]}) return vend @@ -79,16 +83,21 @@ def beam_EK(): beam = xr.Dataset() for p_name in [ "equivalent_beam_angle", - "angle_offset_alongship", "angle_offset_athwartship", - "angle_sensitivity_alongship", "angle_sensitivity_athwartship", - "beamwidth_twoway_alongship", "beamwidth_twoway_athwartship" + "angle_offset_alongship", + "angle_offset_athwartship", + "angle_sensitivity_alongship", + "angle_sensitivity_athwartship", + "beamwidth_twoway_alongship", + "beamwidth_twoway_athwartship", ]: beam[p_name] = xr.DataArray( - np.array([[123], [456]]), + DATA, dims=["channel", "ping_time"], - coords={"channel": ["chA", "chB"], "ping_time": [1]}, + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ) - beam["frequency_nominal"] = xr.DataArray([25, 55], dims=["channel"], coords={"channel": ["chA", "chB"]}) + beam["frequency_nominal"] = xr.DataArray( + [25, 55], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) return beam.transpose("channel", "ping_time") @@ -96,24 +105,32 @@ def beam_EK(): ("p_val", "channel", "da_output"), [ # input p_val a scalar, input channel a list - (1, ["chA", "chB"], xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]})), + ( + 1, + ["chA", "chB"], + xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]}), + ), # input p_val a list, input channel an xr.DataArray ( [1, 2], xr.DataArray(["chA", "chB"], dims=["channel"], coords={"channel": ["chA", "chB"]}), - xr.DataArray([1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]}) + xr.DataArray([1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]}), ), # input p_val a list with the wrong length: this should fail pytest.param( - [1, 2, 3], ["chA", "chB"], None, - marks=pytest.mark.xfail(strict=True, reason="Fail since lengths of p_val and channel are not identical") + [1, 2, 3], + ["chA", "chB"], + None, + marks=pytest.mark.xfail( + strict=True, reason="Fail since lengths of p_val and channel are not identical" + ), ), ], ids=[ "in_p_val_scalar_channel_list", "in_p_val_list_channel_xrda", "in_p_val_list_wrong_length", - ] + ], ) def test_param2da(p_val, channel, da_output): da_assembled = param2da(p_val, channel) @@ -125,14 +142,24 @@ def test_param2da(p_val, channel, da_output): [ # sonar_type only allows EK or AZFP pytest.param( - "XYZ", None, None, None, - marks=pytest.mark.xfail(strict=True, reason="Fail since sonar_type is not 'EK' nor 'AZFP'") + "XYZ", + None, + None, + None, + marks=pytest.mark.xfail( + strict=True, reason="Fail since sonar_type is not 'EK' nor 'AZFP'" + ), ), # input channel # - is not a list nor an xr.DataArray: fail with value error pytest.param( - "EK80", 1, None, None, - marks=pytest.mark.xfail(strict=True, reason="Fail since channel has to be either a list or an xr.DataArray"), + "EK80", + 1, + None, + None, + marks=pytest.mark.xfail( + strict=True, reason="Fail since channel has to be either a list or an xr.DataArray" + ), ), # TODO: input channel has different order than those in the inarg channel # input param dict @@ -143,27 +170,51 @@ def test_param2da(p_val, channel, da_output): # - is xr.DataArray without channel coorindate: fail with value error pytest.param( "EK80", - {"sa_correction": xr.DataArray([1, 1], dims=["some_coords"], coords={"some_coords": ["A", "B"]})}, - ["chA", "chB"], None, - marks=pytest.mark.xfail(strict=True, reason="input sa_correction does not contain a 'channel' coordinate"), + { + "sa_correction": xr.DataArray( + [1, 1], dims=["some_coords"], coords={"some_coords": ["A", "B"]} + ) + }, + ["chA", "chB"], + None, + marks=pytest.mark.xfail( + strict=True, reason="input sa_correction does not contain a 'channel' coordinate" + ), ), # input individual param: # - with channel coordinate but not identical to argin channel: fail with value error pytest.param( "EK80", - {"sa_correction": xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "B"]})}, - ["chA", "chB"], None, - marks=pytest.mark.xfail(strict=True, - reason="input sa_correction contains a 'channel' coordinate but it is not identical with input channel"), + { + "sa_correction": xr.DataArray( + [1, 1], dims=["channel"], coords={"channel": ["chA", "B"]} + ) + }, + ["chA", "chB"], + None, + marks=pytest.mark.xfail( + strict=True, + reason="input sa_correction contains a 'channel' coordinate but it is not identical with input channel", + ), ), # input individual param: # - with channel coordinate identical to argin channel: should pass pytest.param( "EK80", - {"sa_correction": xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]})}, + { + "sa_correction": xr.DataArray( + [1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + }, ["chA", "chB"], - dict(dict.fromkeys(CAL_PARAMS["EK80"]), - **{"sa_correction": xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]})}), + dict( + dict.fromkeys(CAL_PARAMS["EK80"]), + **{ + "sa_correction": xr.DataArray( + [1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + }, + ), ), # input individual param: # - a scalar needing to be organized to xr.DataArray at output via param2da: should pass @@ -171,8 +222,14 @@ def test_param2da(p_val, channel, da_output): "EK80", {"sa_correction": 1}, ["chA", "chB"], - dict(dict.fromkeys(CAL_PARAMS["EK80"]), - **{"sa_correction": xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]})}), + dict( + dict.fromkeys(CAL_PARAMS["EK80"]), + **{ + "sa_correction": xr.DataArray( + [1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + }, + ), ), # input individual param: # - a list needing to be organized to xr.DataArray at output via param2da: should pass @@ -180,15 +237,26 @@ def test_param2da(p_val, channel, da_output): "EK80", {"sa_correction": [1, 2]}, ["chA", "chB"], - dict(dict.fromkeys(CAL_PARAMS["EK80"]), - **{"sa_correction": xr.DataArray([1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]})}), - ), + dict( + dict.fromkeys(CAL_PARAMS["EK80"]), + **{ + "sa_correction": xr.DataArray( + [1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + }, + ), + ), # input individual param: # - a list with wrong length (ie not identical to channel): fail with value error pytest.param( - "EK80", {"sa_correction": [1, 2, 3]}, ["chA", "chB"], None, - marks=pytest.mark.xfail(strict=True, - reason="input sa_correction contains a list of wrong length that does not match that of channel"), + "EK80", + {"sa_correction": [1, 2, 3]}, + ["chA", "chB"], + None, + marks=pytest.mark.xfail( + strict=True, + reason="input sa_correction contains a list of wrong length that does not match that of channel", + ), ), ], ids=[ @@ -221,27 +289,45 @@ def test_sanitize_user_cal_dict(sonar_type, user_dict, channel, out_dict): ( None, 1, - xr.DataArray([[1], [1]], dims=["channel", "ping_time"], coords={"channel": ["chA", "chB"], "ping_time": [1]}) + xr.DataArray( + np.ones((2, 200)), + dims=["channel", "ping_time"], + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, + ), ), # da_param: alternative is xr.DataArray: output selected with the right channel ( None, xr.DataArray([1, 1, 2], dims=["channel"], coords={"channel": ["chA", "chB", "chC"]}), - xr.DataArray([[1], [1]], dims=["channel", "ping_time"], coords={"channel": ["chA", "chB"], "ping_time": [1]}) + xr.DataArray( + np.ones((2, 200)), + dims=["channel", "ping_time"], + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, + ), ), # da_param: xr.DataArray with freq-dependent values/coordinates # - output should be interpolated with the right values ( xr.DataArray( - np.array([[1, 2, 3, np.nan, np.nan, np.nan], - [np.nan, np.nan, np.nan, 4, 5, 6], - [np.nan, 2, 3, 4, np.nan, np.nan]]), + np.array( + [ + [1, 2, 3, np.nan, np.nan, np.nan], + [np.nan, np.nan, np.nan, 4, 5, 6], + [np.nan, 2, 3, 4, np.nan, np.nan], + ] + ), dims=["cal_channel_id", "cal_frequency"], - coords={"cal_channel_id": ["chA", "chB", "chC"], - "cal_frequency": [10, 20, 30, 40, 50, 60]}, + coords={ + "cal_channel_id": ["chA", "chB", "chC"], + "cal_frequency": [10, 20, 30, 40, 50, 60], + }, ), None, - xr.DataArray([[2.5], [5.5]], dims=["channel", "ping_time"], coords={"ping_time": [1], "channel": ["chA", "chB"]}), + xr.DataArray( + np.full((2, 200), np.nan), + dims=["channel", "ping_time"], + coords={"ping_time": TIME_COORDINATES, "channel": ["chA", "chB"]}, + ), ), # da_param: xr.DataArray with only one channel having freq-dependent values/coordinates # - that single channel should be interpolated with the right value @@ -252,14 +338,13 @@ def test_sanitize_user_cal_dict(sonar_type, user_dict, channel, out_dict): xr.DataArray( np.array([[np.nan, np.nan, np.nan, 4, 5, 6]]), dims=["cal_channel_id", "cal_frequency"], - coords={"cal_channel_id": ["chB"], - "cal_frequency": [10, 20, 30, 40, 50, 60]}, + coords={"cal_channel_id": ["chB"], "cal_frequency": [10, 20, 30, 40, 50, 60]}, ), 75, xr.DataArray( - [[75], [5.5]], + np.vstack([np.full(200, 75), np.full(200, np.nan)]), dims=["channel", "ping_time"], - coords={"ping_time": [1], "channel": ["chA", "chB"]} + coords={"ping_time": TIME_COORDINATES, "channel": ["chA", "chB"]}, ), ), # - xr.DataArray with coordinates channel, ping_time @@ -267,26 +352,25 @@ def test_sanitize_user_cal_dict(sonar_type, user_dict, channel, out_dict): xr.DataArray( np.array([[np.nan, np.nan, np.nan, 4, 5, 6]]), dims=["cal_channel_id", "cal_frequency"], - coords={"cal_channel_id": ["chB"], - "cal_frequency": [10, 20, 30, 40, 50, 60]}, + coords={"cal_channel_id": ["chB"], "cal_frequency": [10, 20, 30, 40, 50, 60]}, ), xr.DataArray( - np.array([[100], [200]]), + DATA, dims=["channel", "ping_time"], - coords={"ping_time": [1], "channel": ["chA", "chB"]}, + coords={"ping_time": TIME_COORDINATES, "channel": ["chA", "chB"]}, ), xr.DataArray( - [[100], [5.5]], + np.vstack([DATA[0,:], np.full(200, np.nan)]), dims=["channel", "ping_time"], - coords={"ping_time": [1], "channel": ["chA", "chB"]} + coords={"ping_time": TIME_COORDINATES, "channel": ["chA", "chB"]}, ), - # TODO: cases where freq_center does not have the ping_time dimension - # this is the case for CW data since freq_center = beam["frequency_nominal"] - # this was caught by the file in test_compute_Sv_ek80_CW_complex() - # TODO: cases where freq_center contains only a single frequency - # in this case had to use freq_center.sel(channel=ch_id).size because - # len(freq_center.sel(channel=ch_id)) is an invalid statement - # this was caught by the file in test_compute_Sv_ek80_CW_power_BB_complex() + # TODO: cases where freq_center does not have the ping_time dimension + # this is the case for CW data since freq_center = beam["frequency_nominal"] + # this was caught by the file in test_compute_Sv_ek80_CW_complex() + # TODO: cases where freq_center contains only a single frequency + # in this case had to use freq_center.sel(channel=ch_id).size because + # len(freq_center.sel(channel=ch_id)) is an invalid statement + # this was caught by the file in test_compute_Sv_ek80_CW_power_BB_complex() ), ], ids=[ @@ -295,7 +379,7 @@ def test_sanitize_user_cal_dict(sonar_type, user_dict, channel, out_dict): "in_da_all_channel_out_interp", "in_da_some_channel_alt_scalar", "in_da_some_channel_alt_da2coords", # channel, ping_time - ] + ], ) def test_get_interp_da(freq_center, da_param, alternative, da_output): da_interp = _get_interp_da(da_param, freq_center, alternative) @@ -309,50 +393,90 @@ def test_get_interp_da(freq_center, da_param, alternative, da_output): ( {"EL": 1, "equivalent_beam_angle": 2}, dict( - {p_name: xr.DataArray([10, 20], dims=["channel"], coords={"channel": ["chA", "chB"]}) for p_name in CAL_PARAMS["AZFP"]}, + { + p_name: xr.DataArray( + [10, 20], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + for p_name in CAL_PARAMS["AZFP"] + }, **{ - "EL": xr.DataArray([1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]}), - "equivalent_beam_angle": xr.DataArray([2, 2], dims=["channel"], coords={"channel": ["chA", "chB"]}), - } + "EL": xr.DataArray( + [1, 1], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), + "equivalent_beam_angle": xr.DataArray( + [2, 2], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), + }, ), ), # input param is a list ( {"EL": [1, 2], "equivalent_beam_angle": [3, 4]}, dict( - {p_name: xr.DataArray([10, 20], dims=["channel"], coords={"channel": ["chA", "chB"]}) for p_name in CAL_PARAMS["AZFP"]}, + { + p_name: xr.DataArray( + [10, 20], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + for p_name in CAL_PARAMS["AZFP"] + }, **{ - "EL": xr.DataArray([1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]}), - "equivalent_beam_angle": xr.DataArray([3, 4], dims=["channel"], coords={"channel": ["chA", "chB"]}), - } + "EL": xr.DataArray( + [1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), + "equivalent_beam_angle": xr.DataArray( + [3, 4], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), + }, ), ), # input param is a list of wrong length: this should fail pytest.param( - {"EL": [1, 2, 3], "equivalent_beam_angle": [3, 4]}, None, - marks=pytest.mark.xfail(strict=True, reason="Fail since lengths of input list and channel are not identical"), + {"EL": [1, 2, 3], "equivalent_beam_angle": [3, 4]}, + None, + marks=pytest.mark.xfail( + strict=True, reason="Fail since lengths of input list and channel are not identical" + ), ), # input param is an xr.DataArray with coordinate 'channel' ( { "EL": xr.DataArray([1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]}), - "equivalent_beam_angle": xr.DataArray([3, 4], dims=["channel"], coords={"channel": ["chA", "chB"]}), + "equivalent_beam_angle": xr.DataArray( + [3, 4], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), }, dict( - {p_name: xr.DataArray([10, 20], dims=["channel"], coords={"channel": ["chA", "chB"]}) for p_name in CAL_PARAMS["AZFP"]}, + { + p_name: xr.DataArray( + [10, 20], dims=["channel"], coords={"channel": ["chA", "chB"]} + ) + for p_name in CAL_PARAMS["AZFP"] + }, **{ - "EL": xr.DataArray([1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]}), - "equivalent_beam_angle": xr.DataArray([3, 4], dims=["channel"], coords={"channel": ["chA", "chB"]}), - } + "EL": xr.DataArray( + [1, 2], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), + "equivalent_beam_angle": xr.DataArray( + [3, 4], dims=["channel"], coords={"channel": ["chA", "chB"]} + ), + }, ), ), # input param is an xr.DataArray with coordinate 'channel' but wrong length: this should fail pytest.param( { - "EL": xr.DataArray([1, 2, 3], dims=["channel"], coords={"channel": ["chA", "chB", "chC"]}), - "equivalent_beam_angle": xr.DataArray([3, 4, 5], dims=["channel"], coords={"channel": ["chA", "chB", "chC"]}), - }, None, - marks=pytest.mark.xfail(strict=True, reason="Fail since lengths of input data array channel and data channel are not identical"), + "EL": xr.DataArray( + [1, 2, 3], dims=["channel"], coords={"channel": ["chA", "chB", "chC"]} + ), + "equivalent_beam_angle": xr.DataArray( + [3, 4, 5], dims=["channel"], coords={"channel": ["chA", "chB", "chC"]} + ), + }, + None, + marks=pytest.mark.xfail( + strict=True, + reason="Fail since lengths of input data array channel and data channel are not identical", + ), ), ], ids=[ @@ -361,7 +485,7 @@ def test_get_interp_da(freq_center, da_param, alternative, da_output): "in_list_wrong_length", "in_da", "in_da_wrong_length", - ] + ], ) def test_get_cal_params_AZFP(beam_AZFP, vend_AZFP, user_dict, out_dict): cal_dict = get_cal_params_AZFP(beam=beam_AZFP, vend=vend_AZFP, user_dict=user_dict) @@ -386,46 +510,52 @@ def test_get_cal_params_AZFP(beam_AZFP, vend_AZFP, user_dict, out_dict): ( { "gain_correction": xr.DataArray( - np.array([[1, 2, 3, np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan, 4, 5, 6]]), + np.array( + [[1, 2, 3, np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan, 4, 5, 6]] + ), dims=["cal_channel_id", "cal_frequency"], - coords={"cal_channel_id": ["chA", "chB"], - "cal_frequency": [10, 20, 30, 40, 50, 60]}, + coords={ + "cal_channel_id": ["chA", "chB"], + "cal_frequency": [10, 20, 30, 40, 50, 60], + }, ), # add sa_correction here to bypass things going into get_vend_cal_params_power "sa_correction": xr.DataArray( - np.array([111, 222]), dims=["channel"], coords={"channel": ["chA", "chB"]}, + np.array([111, 222]), + dims=["channel"], + coords={"channel": ["chA", "chB"]}, ), }, dict( { p_name: xr.DataArray( - [[123], [456]], + DATA, dims=["channel", "ping_time"], - coords={"channel": ["chA", "chB"], "ping_time": [1]}, + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ) for p_name in CAL_PARAMS["EK80"] }, **{ "gain_correction": xr.DataArray( - [[2.5], [5.5]], + np.full((2, 200), np.nan), dims=["channel", "ping_time"], - coords={"ping_time": [1], "channel": ["chA", "chB"]}, + coords={"ping_time": TIME_COORDINATES, "channel": ["chA", "chB"]}, ), "sa_correction": xr.DataArray( - np.array([111, 222]), dims=["channel"], - coords={"channel": ["chA", "chB"]} + np.array([111, 222]), dims=["channel"], coords={"channel": ["chA", "chB"]} ), "impedance_transducer": xr.DataArray( - np.array([[75], [75]]), dims=["channel", "ping_time"], - coords={"channel": ["chA", "chB"], "ping_time": [1]} + np.ones((2, 200)) * 75, + dims=["channel", "ping_time"], + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ), "impedance_transceiver": xr.DataArray( - np.array([1000, 2000]), dims=["channel"], - coords={"channel": ["chA", "chB"]} + np.array([1000, 2000]), dims=["channel"], coords={"channel": ["chA", "chB"]} ), "receiver_sampling_frequency": xr.DataArray( - np.array([1500000, 1500000]), dims=["channel"], - coords={"channel": ["chA", "chB"]} + np.array([1500000, 1500000]), + dims=["channel"], + coords={"channel": ["chA", "chB"]}, ), }, ), @@ -436,74 +566,86 @@ def test_get_cal_params_AZFP(beam_AZFP, vend_AZFP, user_dict, out_dict): ( { "gain_correction": xr.DataArray( - np.array([[1, 2, 3, np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan, 4, 5, 6]]), + np.array( + [[1, 2, 3, np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan, 4, 5, 6]] + ), dims=["cal_channel_id", "cal_frequency"], - coords={"cal_channel_id": ["chA", "chB"], - "cal_frequency": [10, 20, 30, 40, 50, 60]}, + coords={ + "cal_channel_id": ["chA", "chB"], + "cal_frequency": [10, 20, 30, 40, 50, 60], + }, ), # add sa_correction here to bypass things going into get_vend_cal_params_power "sa_correction": xr.DataArray( - np.array([111, 222]), dims=["channel"], coords={"channel": ["chA", "chB"]}, + np.array([111, 222]), + dims=["channel"], + coords={"channel": ["chA", "chB"]}, ), }, dict( { p_name: xr.DataArray( - [[123], [456]], + DATA, dims=["channel", "ping_time"], - coords={"channel": ["chA", "chB"], "ping_time": [1]}, + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ) for p_name in CAL_PARAMS["EK80"] }, **{ "gain_correction": xr.DataArray( - np.array([[2.5], [5.5]]) * 0.79, # scaled by the factor as freq_center in function body + np.full((2, 200), np.nan) + * 0.79, # scaled by the factor as freq_center in function body dims=["channel", "ping_time"], - coords={"ping_time": [1], "channel": ["chA", "chB"]}, + coords={"ping_time": TIME_COORDINATES, "channel": ["chA", "chB"]}, ), "sa_correction": xr.DataArray( - np.array([111, 222]), dims=["channel"], - coords={"channel": ["chA", "chB"]} + np.array([111, 222]), dims=["channel"], coords={"channel": ["chA", "chB"]} ), "impedance_transducer": xr.DataArray( - np.array([[75], [75]]), dims=["channel", "ping_time"], - coords={"channel": ["chA", "chB"], "ping_time": [1]} + np.ones((2, 200)) * 75, + dims=["channel", "ping_time"], + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ), "impedance_transceiver": xr.DataArray( - np.array([1000, 2000]), dims=["channel"], - coords={"channel": ["chA", "chB"]} + np.array([1000, 2000]), dims=["channel"], coords={"channel": ["chA", "chB"]} ), "receiver_sampling_frequency": xr.DataArray( - np.array([1500000, 1500000]), dims=["channel"], - coords={"channel": ["chA", "chB"]} + np.array([1500000, 1500000]), + dims=["channel"], + coords={"channel": ["chA", "chB"]}, ), }, ), 0.79, # with scaling of freq_center - ), + ), pytest.param( { "gain_correction": xr.DataArray( np.array([[1, 2, 3, np.nan], [np.nan, 4, 5, 6], [np.nan, 2, 3, np.nan]]), dims=["cal_channel_id", "cal_frequency"], - coords={"cal_channel_id": ["chA", "chB", "chC"], - "cal_frequency": [10, 20, 30, 40]}, + coords={ + "cal_channel_id": ["chA", "chB", "chC"], + "cal_frequency": [10, 20, 30, 40], + }, ), }, None, 1, - marks=pytest.mark.xfail(strict=True, reason="Fail since cal_channel_id in input param does not match channel of data"), + marks=pytest.mark.xfail( + strict=True, + reason="Fail since cal_channel_id in input param does not match channel of data", + ), ), - ], ids=[ "in_da_freq_dep_no_scaling", "in_da_freq_dep_with_scaling", "in_da_freq_dep_channel_mismatch", - ] + ], ) -def test_get_cal_params_EK80_BB(beam_EK, vend_EK, freq_center, user_dict, out_dict, freq_center_scaling): - +def test_get_cal_params_EK80_BB( + beam_EK, vend_EK, freq_center, user_dict, out_dict, freq_center_scaling +): # If freq_center != beam_EK["frequency_nominal"], the following params will be scaled: # - angle_sensitivity_alongship/athwartship (by fc/fn) # - beamwidth_alongship/athwartship (by fn/fc) @@ -513,10 +655,10 @@ def test_get_cal_params_EK80_BB(beam_EK, vend_EK, freq_center, user_dict, out_di out_dict[p] = out_dict[p] * freq_center / beam_EK["frequency_nominal"] for p in ["beamwidth_alongship", "beamwidth_athwartship"]: out_dict[p] = out_dict[p] * beam_EK["frequency_nominal"] / freq_center - out_dict["equivalent_beam_angle"] = ( - out_dict["equivalent_beam_angle"] + 20 * np.log10(beam_EK["frequency_nominal"] / freq_center) + out_dict["equivalent_beam_angle"] = out_dict["equivalent_beam_angle"] + 20 * np.log10( + beam_EK["frequency_nominal"] / freq_center ) - + cal_dict = get_cal_params_EK( waveform_mode="BB", freq_center=freq_center, beam=beam_EK, vend=vend_EK, user_dict=user_dict ) @@ -526,7 +668,7 @@ def test_get_cal_params_EK80_BB(beam_EK, vend_EK, freq_center, user_dict, out_di p_val.name = None out_val = out_dict[p_name] out_val.name = None - assert p_val.identical(out_dict[p_name]) + assert_allclose(p_val, out_dict[p_name]) @pytest.mark.parametrize( @@ -538,30 +680,41 @@ def test_get_cal_params_EK80_BB(beam_EK, vend_EK, freq_center, user_dict, out_di { # add sa_correction here to bypass things going into get_vend_cal_params_power "gain_correction": xr.DataArray( - [555, 777], dims=["channel"], coords={"channel": ["chA", "chB"]}, + [555, 777], + dims=["channel"], + coords={"channel": ["chA", "chB"]}, ), # add sa_correction here to bypass things going into get_vend_cal_params_power "sa_correction": xr.DataArray( - [111, 222], dims=["channel"], coords={"channel": ["chA", "chB"]}, - ) + [111, 222], + dims=["channel"], + coords={"channel": ["chA", "chB"]}, + ), }, dict( { p_name: xr.DataArray( - [[123], [456]], + DATA, dims=["channel", "ping_time"], - coords={"channel": ["chA", "chB"], "ping_time": [1]}, + coords={"channel": ["chA", "chB"], "ping_time": TIME_COORDINATES}, ) for p_name in [ - "sa_correction", "gain_correction", "equivalent_beam_angle", - "angle_offset_alongship", "angle_offset_athwartship", - "angle_sensitivity_alongship", "angle_sensitivity_athwartship", - "beamwidth_alongship", "beamwidth_athwartship", + "sa_correction", + "gain_correction", + "equivalent_beam_angle", + "angle_offset_alongship", + "angle_offset_athwartship", + "angle_sensitivity_alongship", + "angle_sensitivity_athwartship", + "beamwidth_alongship", + "beamwidth_athwartship", ] }, **{ "gain_correction": xr.DataArray( - [555, 777], dims=["channel"], coords={"channel": ["chA", "chB"]}, + [555, 777], + dims=["channel"], + coords={"channel": ["chA", "chB"]}, ), "sa_correction": xr.DataArray( [111, 222], dims=["channel"], coords={"channel": ["chA", "chB"]} @@ -572,15 +725,18 @@ def test_get_cal_params_EK80_BB(beam_EK, vend_EK, freq_center, user_dict, out_di ], ids=[ "in_da", - ] + ], ) def test_get_cal_params_EK60(beam_EK, vend_EK, freq_center, user_dict, out_dict): # Remove some variables from Vendor group to mimic EK60 data vend_EK = vend_EK.drop("impedance_transceiver").drop("transceiver_type") cal_dict = get_cal_params_EK( - waveform_mode="CW", freq_center=freq_center, - beam=beam_EK, vend=vend_EK, - user_dict=user_dict, sonar_type="EK60" + waveform_mode="CW", + freq_center=freq_center, + beam=beam_EK, + vend=vend_EK, + user_dict=user_dict, + sonar_type="EK60", ) for p_name, p_val in cal_dict.items(): # remove name for all da @@ -597,40 +753,114 @@ def test_get_cal_params_EK60(beam_EK, vend_EK, freq_center, user_dict, out_dict) ( "sa_correction", xr.DataArray( - np.array([[64, 256, 128, 512], [512, 1024, 256, 128]]).T, + np.array( + [ + [64, 256, 128, 512], + [512, 1024, 256, 128], + ] + ).T, dims=["ping_time", "channel"], coords={"ping_time": [1, 2, 3, 4], "channel": ["chA", "chB"]}, name="transmit_duration_nominal", ).to_dataset(), xr.DataArray( - np.array([[10, 30, 20, 40], [130, 140, 120, 110]]).T, + np.array( + [ + [10, 30, 20, 40], + [130, 140, 120, 110], + ] + ).T, dims=["ping_time", "channel"], coords={"ping_time": [1, 2, 3, 4], "channel": ["chA", "chB"]}, name="sa_correction", ).astype(np.float64), ), + # no NaN entry in transmit_duration_nominal but channel order is different in vend and beam + ( + "sa_correction", + xr.DataArray( + np.array( + [ + [512, 1024, 256, 128], + [64, 256, 128, 512], + ] + ).T, + dims=["ping_time", "channel"], + coords={"ping_time": [1, 2, 3, 4], "channel": ["chB", "chA"]}, + name="transmit_duration_nominal", + ).to_dataset(), + xr.DataArray( + np.array( + [ + [130, 140, 120, 110], + [10, 30, 20, 40], + ] + ).T, + dims=["ping_time", "channel"], + coords={"ping_time": [1, 2, 3, 4], "channel": ["chB", "chA"]}, + name="sa_correction", + ).astype(np.float64), + ), # with NaN entry in transmit_duration_nominal ( "sa_correction", xr.DataArray( - np.array([[64, np.nan, 128, 512], [512, 1024, 256, np.nan]]).T, + np.array( + [ + [64, np.nan, 128, 512], + [512, 1024, 256, np.nan], + ] + ).T, dims=["ping_time", "channel"], coords={"ping_time": [1, 2, 3, 4], "channel": ["chA", "chB"]}, name="transmit_duration_nominal", ).to_dataset(), xr.DataArray( - np.array([[10, np.nan, 20, 40], [130, 140, 120, np.nan]]).T, + np.array( + [ + [10, np.nan, 20, 40], + [130, 140, 120, np.nan], + ] + ).T, dims=["ping_time", "channel"], coords={"ping_time": [1, 2, 3, 4], "channel": ["chA", "chB"]}, name="sa_correction", ), ), + # with NaN entry in transmit_duration_nominal but channel order is different in vend and beam + ( + "sa_correction", + xr.DataArray( + np.array( + [ + [512, 1024, 256, np.nan], + [64, np.nan, 128, 512], + ] + ).T, + dims=["ping_time", "channel"], + coords={"ping_time": [1, 2, 3, 4], "channel": ["chB", "chA"]}, + name="transmit_duration_nominal", + ).to_dataset(), + xr.DataArray( + np.array( + [ + [130, 140, 120, np.nan], + [10, np.nan, 20, 40], + ] + ).T, + dims=["ping_time", "channel"], + coords={"ping_time": [1, 2, 3, 4], "channel": ["chB", "chA"]}, + name="sa_correction", + ), + ), ], ids=[ - "in_no_nan", - "in_with_nan", - ] + "in_no_nan_channel_order_same", + "in_no_nan_channel_order_diff", + "in_with_nan_channel_order_same", + "in_with_nan_channel_order_diff", + ], ) def test_get_vend_cal_params_power(vend_EK, beam, param, da_output): da_param = get_vend_cal_params_power(beam, vend_EK, param) - assert da_param.identical(da_output) + assert_allclose(da_param, da_output)