Skip to content

Commit

Permalink
Merge pull request #1297 from OSOceanAcoustics/dev
Browse files Browse the repository at this point in the history
* Bump actions/cache from 3.0.11 to 3.2.0 (#18)

Bumps [actions/cache](https://github.com/actions/cache) from 3.0.11 to 3.2.0.
- [Release notes](https://github.com/actions/cache/releases)
- [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md)
- [Commits](actions/cache@v3.0.11...v3.2.0)

---
updated-dependencies:
- dependency-name: actions/cache
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Update compress_pulse to Parallelize Convolution (#1208)

Update compress_pulse to Parallelize Convolution. 
This addresses #1164.

* Optimize `harmonize_env_param_time` (#1235)

* Remove extra elses

* Ensure harmonize_env_param_time works with dask arrays

* Remove unused import

* Optimize Frequency Differencing with Dask (#1198)

* Update frequency_differencing method: add dask optimizations if Sv data is dask array

* Update test_frequency_differencing function and incorporate test cases for dask

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix ci error

* Update echopype/mask/api.py

Co-authored-by: Don Setiawan <[email protected]>

* Update echopype/mask/api.py: assign attrs out of if-else

* Update echopype/mask/api.py: remove unused code

* refactor: Update dask chunks iter to use xr.map_blocks

* refactor: Cleanup code and make more explicit

* Add better comments

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix pre-commit ci errors

* refactor: Update algorithm to use map_blocks

Update the looping algorithm to use dask array
map_block instead. This would remove direct slicing
ambiguity and prepare for future xarray
map_block usage.

* refactor: Modify to have no hardcoded channel slicing

* refactor: Assign Sv data array

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Don Setiawan <[email protected]>

* Optimize get vend cal params power (#1285)

* Optimize get vend cal params power (#2)

* Refactor function to avoid expand_dims and sortby if required

* test: increase co-ordinates to 1000 for ping_time dimension

* style: upper-casing variables

* style: upper-casing variables

* style: revert

---------

Co-authored-by: Anant Mittal <[email protected]>

* test: update test_get_interp_da() to test on 200 time_coordinates (#3)

* test: refactor test data (#4)

* test: refactor test data (#5)

* test: refactor test data

* test: refactor test data

---------

Co-authored-by: Anant Mittal <[email protected]>

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Anant Mittal <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Don Setiawan <[email protected]>
Co-authored-by: anujsinha3 <[email protected]>
  • Loading branch information
6 people authored Apr 8, 2024
2 parents 646759e + 84a87d9 commit 57ec40d
Show file tree
Hide file tree
Showing 8 changed files with 1,372 additions and 478 deletions.
14 changes: 7 additions & 7 deletions echopype/calibrate/cal_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def get_vend_cal_params_power(beam: xr.Dataset, vend: xr.Dataset, param: str) ->
raise ValueError(f"{param} does not exist in the Vendor_specific group!")

# Find idx to select the corresponding param value
# by matching beam["transmit_duration_nominal"] with ds_vend["pulse_length"]
# by matching beam["transmit_duration_nominal"] with vend["pulse_length"]
transmit_isnull = beam["transmit_duration_nominal"].isnull()
idxmin = np.abs(beam["transmit_duration_nominal"] - vend["pulse_length"]).idxmin(
dim="pulse_length_bin"
Expand All @@ -297,13 +297,13 @@ def get_vend_cal_params_power(beam: xr.Dataset, vend: xr.Dataset, param: str) ->
idxmin = idxmin.where(~transmit_isnull, 0).astype(int)

# Get param dataarray into correct shape
da_param = (
vend[param]
.expand_dims(dim={"ping_time": idxmin["ping_time"]}) # expand dims for direct indexing
.sortby(idxmin.channel) # sortby in case channel sequence differs in vend and beam
)
da_param = vend[param].transpose("pulse_length_bin", "channel")

if not np.array_equal(da_param.channel.data, idxmin.channel.data):
da_param = da_param.sortby(
da_param.channel, ascending=False
) # sortby because channel sequence differs in vend and beam

# Select corresponding index and clean up the original nan elements
da_param = da_param.sel(pulse_length_bin=idxmin, drop=True)

# Set the nan elements back to nan.
Expand Down
25 changes: 9 additions & 16 deletions echopype/calibrate/ek80_complex.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,34 +273,27 @@ def compress_pulse(backscatter: xr.DataArray, chirp: Dict) -> xr.DataArray:
A data array containing pulse compression output.
"""
pc_all = []

for chan in backscatter["channel"]:
# Select channel `chan` and drop the specific beam dimension if all of the values are nan.
backscatter_chan = backscatter.sel(channel=chan).dropna(dim="beam", how="all")

tx = chirp[str(chan.values)]
replica = np.flipud(np.conj(tx))

pc = xr.apply_ufunc(
lambda m: np.apply_along_axis(
lambda m: (signal.convolve(m, replica, mode="full")[tx.size - 1 :]),
axis=2,
arr=m,
),
lambda m: (signal.convolve(m, replica, mode="full")[replica.size - 1 :]),
backscatter_chan,
input_core_dims=[["range_sample"]],
output_core_dims=[["range_sample"]],
# exclude_dims={"range_sample"},
)
dask="parallelized",
vectorize=True,
output_dtypes=[np.complex64],
).compute()

pc_all.append(pc)

pc_all = xr.DataArray(
pc_all,
coords={
"channel": backscatter["channel"],
"ping_time": backscatter["ping_time"],
"beam": backscatter["beam"],
"range_sample": backscatter["range_sample"],
},
)
pc_all = xr.concat(pc_all, dim="channel")

return pc_all

Expand Down
29 changes: 15 additions & 14 deletions echopype/calibrate/env_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,24 @@ def harmonize_env_param_time(
if isinstance(p, xr.DataArray):
if "time1" not in p.coords:
return p
else:
# If there's only 1 time1 value,
# or if after dropping NaN there's only 1 time1 value
if p["time1"].size == 1 or p.dropna(dim="time1").size == 1:
return p.dropna(dim="time1").squeeze(dim="time1").drop("time1")

# Direct assignment if all timestamps are identical (EK60 data)
elif np.all(p["time1"].values == ping_time.values):
return p.rename({"time1": "ping_time"})
# If there's only 1 time1 value,
# or if after dropping NaN there's only 1 time1 value
if p["time1"].size == 1 or p.dropna(dim="time1").size == 1:
return p.dropna(dim="time1").squeeze(dim="time1").drop("time1")

elif ping_time is None:
raise ValueError(f"ping_time needs to be provided for interpolating {p.name}")
if ping_time is None:
raise ValueError(
f"ping_time needs to be provided for comparison or interpolating {p.name}"
)

else:
return p.dropna(dim="time1").interp(time1=ping_time)
else:
return p
# Direct assignment if all timestamps are identical (EK60 data)
if np.array_equal(p["time1"].data, ping_time.data):
return p.rename({"time1": "ping_time"})

# Interpolate `p` to `ping_time`
return p.dropna(dim="time1").interp(time1=ping_time)
return p


def sanitize_user_env_dict(
Expand Down
87 changes: 73 additions & 14 deletions echopype/mask/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import pathlib
from typing import List, Optional, Union

import dask
import dask.array
import numpy as np
import xarray as xr

Expand Down Expand Up @@ -481,8 +483,8 @@ def frequency_differencing(
>>> Sv_ds = xr.Dataset(data_vars={"Sv": Sv_da, "frequency_nominal": freq_nom})
...
>>> # compute frequency-differencing mask using channel names
>>> echopype.mask.frequency_differencing(source_Sv=mock_Sv_ds, storage_options={},
... freqABEq=None, chanABEq = '"chan1" - "chan2">=10.0')
>>> echopype.mask.frequency_differencing(source_Sv=Sv_ds, storage_options={},
... freqABEq=None, chanABEq = '"chan1" - "chan2">=10.0dB')
<xarray.DataArray 'mask' (ping_time: 5, range_sample: 5)>
array([[False, False, False, False, False],
[False, False, False, False, False],
Expand Down Expand Up @@ -523,23 +525,80 @@ def frequency_differencing(
chanA = chanAB[0]
chanB = chanAB[1]

# get the left-hand side of condition
lhs = source_Sv["Sv"].sel(channel=chanA) - source_Sv["Sv"].sel(channel=chanB)
def _get_lhs(
Sv_block: np.ndarray, chanA_idx: int, chanB_idx: int, chan_dim_idx: int = 0
) -> np.ndarray:
"""Get left-hand side of condition"""

# create mask using operator lookup table
da = xr.where(str2ops[operator](lhs, diff), True, False)
def _sel_channel(chan_idx):
return tuple(
[chan_idx if i == chan_dim_idx else slice(None) for i in range(Sv_block.ndim)]
)

# get the left-hand side of condition (lhs)
return Sv_block[_sel_channel(chanA_idx)] - Sv_block[_sel_channel(chanB_idx)]

def _create_mask(lhs: np.ndarray, diff: float) -> np.ndarray:
"""Create mask using operator lookup table"""
return xr.where(str2ops[operator](lhs, diff), True, False)

# Get the Sv data array
Sv_data_array = source_Sv["Sv"]

# Determine channel index based on names
channels = list(source_Sv["channel"].to_numpy())
chanA_idx = channels.index(chanA)
chanB_idx = channels.index(chanB)
# Get the channel dimension index for filtering
chan_dim_idx = Sv_data_array.dims.index("channel")

# If Sv data is not dask array
if not isinstance(Sv_data_array.variable._data, dask.array.Array):
# get the left-hand side of condition
lhs = _get_lhs(Sv_data_array, chanA_idx, chanB_idx, chan_dim_idx=chan_dim_idx)

# create mask using operator lookup table
da = _create_mask(lhs, diff)
# If Sv data is dask array
else:
# Get the final data array template
template = Sv_data_array.isel(channel=0).drop_vars("channel")

dask_array_data = Sv_data_array.data
# Perform block wise computation
dask_array_result = (
dask_array_data
# Compute the left-hand side of condition
# drop the first axis (channel) as it is dropped in the result
.map_blocks(
_get_lhs,
chanA_idx,
chanB_idx,
chan_dim_idx=chan_dim_idx,
dtype=dask_array_data.dtype,
drop_axis=0,
)
# create mask using operator lookup table
.map_blocks(_create_mask, diff)
)

# Create DataArray of the result
da = xr.DataArray(
data=dask_array_result,
coords=template.coords,
)

xr_dataarray_attrs = {
"mask_type": "frequency differencing",
"history": f"{datetime.datetime.utcnow()} +00:00. "
"Mask created by mask.frequency_differencing. "
f"Operation: Sv['{chanA}'] - Sv['{chanB}'] {operator} {diff}",
}

# assign a name to DataArray
da.name = "mask"

# assign provenance attributes
mask_attrs = {"mask_type": "frequency differencing"}
history_attr = (
f"{datetime.datetime.utcnow()} +00:00. "
"Mask created by mask.frequency_differencing. "
f"Operation: Sv['{chanA}'] - Sv['{chanB}'] {operator} {diff}"
)

da = da.assign_attrs({**mask_attrs, **{"history": history_attr}})
da = da.assign_attrs(xr_dataarray_attrs)

return da
Loading

0 comments on commit 57ec40d

Please sign in to comment.