
Added support for consolidate subpackage functions to accept both in-memory and stored datasets #1216

Merged
merged 12 commits
Dec 18, 2023
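
As a quick orientation for reviewers: after this change, the consolidate functions below accept either an in-memory object or a path to a stored dataset. A minimal usage sketch, with hypothetical file names and parameter values (not taken from the PR):

    import echopype as ep
    import xarray as xr

    # In-memory inputs keep working as before
    ds_Sv = xr.open_dataset("Sv.zarr", engine="zarr", chunks={})   # hypothetical stored Sv dataset
    ds_swapped = ep.consolidate.swap_dims_channel_frequency(ds_Sv)

    # Stored inputs: a path (netCDF or zarr) is opened internally via open_source()
    ds_swapped = ep.consolidate.swap_dims_channel_frequency("Sv.zarr")
    ds_depth = ep.consolidate.add_depth("Sv.zarr", depth_offset=5.0, tilt=0.0, downward=True)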
104 changes: 63 additions & 41 deletions echopype/consolidate/api.py
@@ -1,5 +1,6 @@
import datetime
import pathlib
from pathlib import Path
from typing import Optional, Union

import numpy as np
@@ -8,14 +9,14 @@
from ..calibrate.ek80_complex import get_filter_coeff
from ..echodata import EchoData
from ..echodata.simrad import retrieve_correct_beam_group
from ..utils.io import validate_source_ds_da
from ..utils.io import get_file_format, open_source
from ..utils.prov import add_processing_level
from .split_beam_angle import add_angle_to_ds, get_angle_complex_samples, get_angle_power_samples
from .split_beam_angle import get_angle_complex_samples, get_angle_power_samples

POSITION_VARIABLES = ["latitude", "longitude"]


def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
def swap_dims_channel_frequency(ds: Union[xr.Dataset, str, pathlib.Path]) -> xr.Dataset:
"""
Use frequency_nominal in place of channel as the dataset dimension and coordinate.

@@ -24,8 +25,9 @@ def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:

Parameters
----------
ds : xr.Dataset
Dataset for which the dimension will be swapped
ds : xr.Dataset or str or pathlib.Path
Dataset or path to a file containing the Dataset
for which the dimension will be swapped

Returns
-------
Expand All @@ -35,6 +37,7 @@ def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
-----
This operation is only possible when there are no duplicated frequencies present in the file.
"""
ds = open_source(ds, "dataset", {})
# Only possible if no duplicated frequencies
if np.unique(ds["frequency_nominal"]).size == ds["frequency_nominal"].size:
return (
@@ -50,7 +53,7 @@ def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:


def add_depth(
ds: xr.Dataset,
ds: Union[xr.Dataset, str, pathlib.Path],
depth_offset: float = 0,
tilt: float = 0,
downward: bool = True,
@@ -64,8 +67,9 @@ def add_depth(

Parameters
----------
ds : xr.Dataset
Source Sv dataset to which a depth variable will be added.
ds : xr.Dataset or str or pathlib.Path
Source Sv dataset or path to a file containing the Source Sv dataset
to which a depth variable will be added.
Must contain `echo_range`.
depth_offset : float
Offset along the vertical (depth) dimension to account for actual transducer
@@ -114,6 +118,7 @@ def add_depth(
# else:
# tilt = 0

ds = open_source(ds, "dataset", {})
# Multiplication factor depending on if transducers are pointing downward
mult = 1 if downward else -1

@@ -132,7 +137,11 @@


@add_processing_level("L2A")
def add_location(ds: xr.Dataset, echodata: EchoData = None, nmea_sentence: Optional[str] = None):
def add_location(
ds: Union[xr.Dataset, str, pathlib.Path],
echodata: Optional[Union[EchoData, str, pathlib.Path]],
nmea_sentence: Optional[str] = None,
):
"""
Add geographical location (latitude/longitude) to the Sv dataset.

@@ -142,10 +151,12 @@ def add_location(ds: xr.Dataset, echodata: EchoData = None, nmea_sentence: Optio

Parameters
----------
ds : xr.Dataset
An Sv or MVBS dataset for which the geographical locations will be added to
echodata
An `EchoData` object holding the raw data
ds : xr.Dataset or str or pathlib.Path
An Sv or MVBS dataset or path to a file containing the Sv or MVBS
dataset to which the geographical locations will be added
echodata : EchoData or str or pathlib.Path
An ``EchoData`` object or path to a file containing the ``EchoData``
object holding the raw data
nmea_sentence
NMEA sentence to select a subset of location data (optional)
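
A hedged usage sketch of the updated add_location signature, with placeholder paths (the NMEA sentence value is only an example):

    import echopype as ep

    # Both ds and echodata may now be in-memory objects or paths to stored datasets
    ds_Sv_with_loc = ep.consolidate.add_location(
        ds="Sv.zarr",                   # hypothetical stored Sv/MVBS dataset
        echodata="converted_raw.zarr",  # hypothetical stored EchoData object
        nmea_sentence="GGA",            # optional filter on NMEA sentence type
    )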

@@ -174,6 +185,9 @@ def sel_interp(var, time_dim_name):
# Values may be nan if there are ping_time values outside the time_dim_name range
return position_var.interp(**{time_dim_name: ds["ping_time"]})

ds = open_source(ds, "dataset", {})
echodata = open_source(echodata, "echodata", {})

if "longitude" not in echodata["Platform"] or echodata["Platform"]["longitude"].isnull().all():
raise ValueError("Coordinate variables not present or all nan")

@@ -198,12 +212,12 @@

def add_splitbeam_angle(
source_Sv: Union[xr.Dataset, str, pathlib.Path],
echodata: EchoData,
echodata: Union[EchoData, str, pathlib.Path],
waveform_mode: str,
encode_mode: str,
pulse_compression: bool = False,
storage_options: dict = {},
return_dataset: bool = True,
to_disk: bool = True,
) -> xr.Dataset:
"""
Add split-beam (alongship/athwartship) angles into the Sv dataset.
@@ -218,8 +232,9 @@ def add_splitbeam_angle(
source_Sv: xr.Dataset or str or pathlib.Path
The Sv Dataset or path to a file containing the Sv Dataset,
to which the split-beam angles will be added
echodata: EchoData
An ``EchoData`` object holding the raw data
echodata: EchoData or str or pathlib.Path
An ``EchoData`` object or path to a file containing the ``EchoData``
object holding the raw data
waveform_mode : {"CW", "BB"}
Type of transmit waveform

@@ -240,19 +255,20 @@
storage_options: dict, default={}
Any additional parameters for the storage backend, corresponding to the
path provided for ``source_Sv``
return_dataset: bool, default=True
If ``True``, ``source_Sv`` with split-beam angles added will be returned.
``return_dataset=False`` is useful when ``source_Sv`` is a path and
to_disk: bool, default=True
If ``False``, the ``source_Sv`` dataset with split-beam angles added is returned in memory.
``to_disk=True`` is useful when ``source_Sv`` is a path and
users only want to write the split-beam angle data to this path.

Returns
-------
xr.Dataset or None
If ``return_dataset=False``, nothing will be returned.
If ``return_dataset=True``, either the input dataset ``source_Sv``
If ``to_disk=False``, the input dataset ``source_Sv`` with split-beam angles
added will be returned.
If ``to_disk=True``, the angles are written to the ``source_Sv`` path and a
lazy-loaded Dataset (re-opened from that path) with the angles added will be returned.


Raises
------
ValueError
@@ -279,6 +295,19 @@ def add_splitbeam_angle(
``echodata`` will be identical. If this is not the case, only angle data corresponding
to channels existing in ``source_Sv`` will be added.
"""
# ensure that source_Sv is a path when to_disk=True (an in-memory Dataset cannot be written back)
if not isinstance(source_Sv, (str, Path)) and to_disk:
raise ValueError(
"The input source_Sv must be a path when to_disk=True, "
"so that the split-beam angles can be written to disk!"
)

# obtain the file format of source_Sv if it is a path
if isinstance(source_Sv, (str, Path)):
source_Sv_type = get_file_format(source_Sv)

source_Sv = open_source(source_Sv, "dataset", storage_options)
echodata = open_source(echodata, "echodata", storage_options)

# ensure that echodata was produced by EK60 or EK80-like sensors
if echodata.sonar_model not in ["EK60", "ES70", "EK80", "ES80", "EA640"]:
Expand All @@ -287,22 +316,6 @@ def add_splitbeam_angle(
"transducers, split-beam angles cannot be added to source_Sv!"
)

# validate the source_Sv type or path (if it is provided)
source_Sv, file_type = validate_source_ds_da(source_Sv, storage_options)

# initialize source_Sv_path
source_Sv_path = None

if isinstance(source_Sv, str):
# store source_Sv path so we can use it to write to later
source_Sv_path = source_Sv

# TODO: In the future we can improve this by obtaining the variable names, channels,
# and dimension lengths directly from source_Sv using zarr or netcdf4. This would
# prevent the unnecessary loading in of the coordinates, which the below statement does.
# open up Dataset using source_Sv path
source_Sv = xr.open_dataset(source_Sv, engine=file_type, chunks={}, **storage_options)

# raise not implemented error if source_Sv corresponds to MVBS
if source_Sv.attrs["processing_function"] == "commongrid.compute_MVBS":
raise NotImplementedError("Adding split-beam data to MVBS has not been implemented!")
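
A hedged sketch of the two to_disk modes introduced above, with placeholder paths (the waveform/encode values are only examples):

    import echopype as ep
    import xarray as xr

    # to_disk=True: source_Sv must be a path; the angles are written back to that
    # store and a lazy-loaded Dataset re-opened from the path is returned
    ds_Sv = ep.consolidate.add_splitbeam_angle(
        source_Sv="Sv.zarr",            # hypothetical stored Sv dataset
        echodata="converted_raw.zarr",  # hypothetical stored EchoData object
        waveform_mode="CW",
        encode_mode="power",
        to_disk=True,
    )

    # to_disk=False: an in-memory Dataset is accepted and returned with
    # angle_alongship / angle_athwartship added; nothing is written to disk
    ds_in_mem = xr.open_dataset("Sv.nc")  # hypothetical netCDF file
    ed = ep.open_converted("converted_raw.zarr")
    ds_Sv = ep.consolidate.add_splitbeam_angle(
        source_Sv=ds_in_mem,
        echodata=ed,
        waveform_mode="CW",
        encode_mode="power",
        to_disk=False,
    )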
@@ -364,9 +377,18 @@ def add_splitbeam_angle(
theta, phi = get_angle_complex_samples(ds_beam, angle_params)

# add theta and phi to source_Sv input
source_Sv = add_angle_to_ds(
theta, phi, source_Sv, return_dataset, source_Sv_path, file_type, storage_options
)
theta.attrs["long_name"] = "split-beam alongship angle"
phi.attrs["long_name"] = "split-beam athwartship angle"

# add the split-beam angles to the provided Dataset
source_Sv["angle_alongship"] = theta
source_Sv["angle_athwartship"] = phi
if to_disk:
if source_Sv_type == "netcdf4":
source_Sv.to_netcdf(mode="a", **storage_options)
else:
source_Sv.to_zarr(mode="a", **storage_options)
source_Sv = open_source(source_Sv, "dataset", storage_options)

# Add history attribute
history_attr = (
78 changes: 1 addition & 77 deletions echopype/consolidate/split_beam_angle.py
@@ -2,7 +2,7 @@
Contains functions necessary to compute the split-beam (alongship/athwartship)
angles and add them to a Dataset.
"""
from typing import List, Optional, Tuple
from typing import List, Tuple

import numpy as np
import xarray as xr
@@ -245,79 +245,3 @@ def get_angle_complex_samples(
)

return theta, phi


def add_angle_to_ds(
theta: xr.Dataset,
phi: xr.Dataset,
ds: xr.Dataset,
return_dataset: bool,
source_ds_path: Optional[str] = None,
file_type: Optional[str] = None,
storage_options: dict = {},
) -> Optional[xr.Dataset]:
"""
Adds the split-beam angle data to the provided input ``ds``.

Parameters
----------
theta: xr.Dataset
The calculated split-beam alongship angle
phi: xr.Dataset
The calculated split-beam athwartship angle
ds: xr.Dataset
The Dataset that ``theta`` and ``phi`` will be added to
return_dataset: bool
Whether a dataset will be returned or not
source_ds_path: str, optional
The path to the file corresponding to ``ds``, if it exists
file_type: {"netcdf4", "zarr"}, optional
The file type corresponding to ``source_ds_path``
storage_options: dict, default={}
Any additional parameters for the storage backend, corresponding to the
path ``source_ds_path``

Returns
-------
xr.Dataset or None
If ``return_dataset=False``, nothing will be returned. If ``return_dataset=True``
either the input dataset ``ds`` or a lazy-loaded Dataset (obtained from
the path provided by ``source_ds_path``) with the split-beam angle data added
will be returned.
"""

# TODO: do we want to add anymore attributes to these variables?
# add appropriate attributes to theta and phi
theta.attrs["long_name"] = "split-beam alongship angle"
phi.attrs["long_name"] = "split-beam athwartship angle"

if source_ds_path is not None:
# put the variables into a Dataset, so they can be written at the same time
# add ds attributes to splitb_ds since they will be overwritten by to_netcdf/zarr
splitb_ds = xr.Dataset(
data_vars={"angle_alongship": theta, "angle_athwartship": phi},
coords=theta.coords,
attrs=ds.attrs,
)

# release any resources linked to ds (necessary for to_netcdf)
ds.close()

# write the split-beam angle data to the provided path
if file_type == "netcdf4":
splitb_ds.to_netcdf(path=source_ds_path, mode="a", **storage_options)
else:
splitb_ds.to_zarr(store=source_ds_path, mode="a", **storage_options)

if return_dataset:
# open up and return Dataset in source_ds_path
return xr.open_dataset(source_ds_path, engine=file_type, chunks={}, **storage_options)

else:
# add the split-beam angles to the provided Dataset
ds["angle_alongship"] = theta
ds["angle_athwartship"] = phi

if return_dataset:
# return input dataset with split-beam angle data
return ds
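
The add_angle_to_ds helper removed above handled writing the angles and re-opening the result; that logic now lives inline in add_splitbeam_angle. For reference, a minimal self-contained sketch of the same append-then-reopen pattern (store path, shapes, and values are placeholders):

    import numpy as np
    import xarray as xr

    # Stand-in for an Sv dataset that has already been written to disk
    base = xr.Dataset({"Sv": (("channel", "ping_time"), np.zeros((2, 3)))})
    base.to_zarr("Sv.zarr", mode="w")

    # Append the computed angles as new variables to the existing store ...
    angles = xr.Dataset(
        {
            "angle_alongship": (("channel", "ping_time"), np.zeros((2, 3))),
            "angle_athwartship": (("channel", "ping_time"), np.zeros((2, 3))),
        }
    )
    angles.to_zarr("Sv.zarr", mode="a")

    # ... then lazily re-open the store so the returned Dataset includes the angles
    ds = xr.open_dataset("Sv.zarr", engine="zarr", chunks={})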
7 changes: 6 additions & 1 deletion echopype/echodata/echodata.py
@@ -16,7 +16,6 @@

from ..echodata.utils_platform import _clip_by_time_dim, get_mappings_expanded
from ..utils.coding import sanitize_dtypes, set_time_encodings
from ..utils.io import check_file_existence, delete_zarr_store, sanitize_file_path
from ..utils.log import _init_logger
from ..utils.prov import add_processing_level
from .convention import sonarnetcdf_1
@@ -93,6 +92,8 @@ def cleanup_swap_files(self):
v.store for k, v in dask_graph.items() if "original-from-zarr" in k
]
fs = zarr_stores[0].fs
from ..utils.io import delete_zarr_store

for store in zarr_stores:
delete_zarr_store(store, fs)

@@ -486,11 +487,15 @@ def _load_file(self, raw_path: "PathHint"):

def _check_path(self, filepath: "PathHint"):
"""Check if converted_raw_path exists"""
from ..utils.io import check_file_existence

file_exists = check_file_existence(filepath, self.storage_options)
if not file_exists:
raise FileNotFoundError(f"There is no file named {filepath}")

def _sanitize_path(self, filepath: "PathHint") -> "PathHint":
from ..utils.io import sanitize_file_path

filepath = sanitize_file_path(filepath, self.storage_options)
return filepath
