diff --git a/echopype/convert/api.py b/echopype/convert/api.py
index fbc9e8766..107af1bc0 100644
--- a/echopype/convert/api.py
+++ b/echopype/convert/api.py
@@ -17,6 +17,7 @@
 from ..utils.coding import COMPRESSION_SETTINGS
 from ..utils.log import _init_logger
 from ..utils.prov import add_processing_level
+from .utils.ek import should_use_swap
 
 BEAM_SUBGROUP_DEFAULT = "Beam_group1"
 
@@ -315,6 +316,8 @@ def open_raw(
     convert_params: Optional[Dict[str, str]] = None,
     storage_options: Optional[Dict[str, str]] = None,
     use_swap: bool = False,
+    destination_path: Optional[str] = None,
+    destination_storage_options: Optional[Dict[str, str]] = None,
     max_mb: int = 100,
 ) -> Optional[EchoData]:
     """Create an EchoData object containing parsed data from a single raw data file.
@@ -342,11 +345,21 @@ def open_raw(
     convert_params : dict
         parameters (metadata) that may not exist in the raw file
         and need to be added to the converted file
-    storage_options : dict
+    storage_options : dict, optional
         options for cloud storage
     use_swap: bool
+        **DEPRECATED: This flag is ignored**
         If True, variables with a large memory footprint will be
         written to a temporary zarr store at ``~/.echopype/temp_output/parsed2zarr_temp_files``
+    destination_path: str, optional
+        The path to a swap directory to use in the case of
+        a large memory footprint.
+        This path can be a remote path like s3://bucket/swap_dir.
+        When set to "auto", a temporary zarr store is created at
+        ``~/.echopype/temp_output/parsed2zarr_temp_files`` if needed.
+    destination_storage_options: dict, optional
+        Options for remote storage for the swap directory given by
+        the ``destination_path`` argument.
     max_mb : int
         The maximum data chunk size in Megabytes (MB), when offloading
         variables with a large memory footprint to a temporary zarr store
@@ -369,10 +382,41 @@
 
     Notes
     -----
-    ``use_swap=True`` is only available for the following
+    In the case of a large memory footprint, the program determines whether
+    temporary swap space is needed and, if so, uses that space
+    during conversion to prevent out-of-memory errors.
+    Users can override this behaviour by passing either ``"swap"`` or ``"no_swap"``
+    to the ``destination_path`` argument.
+    This feature is only available for the following
     echosounders: EK60, ES70, EK80, ES80, EA640.
     Additionally, this feature is currently in beta.
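+
+    Examples
+    --------
+    A sketch of the intended usage; the raw file name is a placeholder,
+    and ``destination_path`` accepts "no_swap", "auto", "swap",
+    or a (possibly remote) swap directory:
+
+    >>> ed = open_raw(
+    ...     "survey.raw",
+    ...     sonar_model="EK80",
+    ...     destination_path="auto",
+    ... )
+    >>> ed = open_raw(
+    ...     "survey.raw",
+    ...     sonar_model="EK80",
+    ...     destination_path="s3://my-bucket/swap_dir",
+    ...     destination_storage_options={"anon": False},
+    ... )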
""" + + # Initially set use_swap False + use_swap = False + + # Set initial destination_path of "no_swap" + if destination_path is None: + destination_path = "no_swap" + if raw_file is None: raise FileNotFoundError("The path to the raw data file must be specified.") @@ -402,15 +428,6 @@ def open_raw( # Check file extension and existence file_chk, xml_chk = _check_file(raw_file, sonar_model, xml_path, storage_options) - # TODO: remove once 'auto' option is added - if not isinstance(use_swap, bool): - raise ValueError("use_swap must be of type bool.") - - # Ensure use_swap is 'auto', if it is a string - # TODO: use the following when we allow for 'auto' option - # if isinstance(use_swap, str) and use_swap != "auto": - # raise ValueError("use_swap must be a bool or equal to 'auto'.") - # TODO: the if-else below only works for the AZFP vs EK contrast, # but is brittle since it is abusing params by using it implicitly if SONAR_MODELS[sonar_model]["xml"]: @@ -423,29 +440,49 @@ def open_raw( # Parse raw file and organize data into groups parser = SONAR_MODELS[sonar_model]["parser"]( - file_chk, params=params, storage_options=storage_options, dgram_zarr_vars=dgram_zarr_vars + file_chk, + params=params, + storage_options=storage_options, + dgram_zarr_vars=dgram_zarr_vars, + sonar_model=sonar_model, ) parser.parse_raw() # Direct offload to zarr and rectangularization only available for some sonar models if sonar_model in ["EK60", "ES70", "EK80", "ES80", "EA640"]: - # Create sonar_model-specific p2z object - p2z = SONAR_MODELS[sonar_model]["parsed2zarr"](parser) - - # Determines if writing to zarr is necessary and writes to zarr - p2z_flag = use_swap is True or ( - use_swap == "auto" and p2z.whether_write_to_zarr(mem_mult=0.4) - ) - - if p2z_flag: - p2z.datagram_to_zarr(max_mb=max_mb) - # Rectangularize the transmit data - parser.rectangularize_transmit_ping_data(data_type="complex") + swap_map = { + "swap": True, + "no_swap": False, + } + if destination_path == "auto": + # Overwrite use_swap if it's True below + # Use local swap directory + use_swap = should_use_swap(parser.zarr_datagrams, dgram_zarr_vars, mem_mult=0.4) + elif destination_path in swap_map: + use_swap = swap_map[destination_path] + else: + # TODO: Add docstring about swap path + use_swap = True + if "://" in destination_path and destination_storage_options is None: + raise ValueError( + ( + "Please provide storage options for remote destination. 
", + "If access is already configured locally, ", + "simply put an empty dictionary.", + ) + ) + + if use_swap: + # Create sonar_model-specific p2z object + p2z = SONAR_MODELS[sonar_model]["parsed2zarr"](parser) + p2z.datagram_to_zarr( + dest_path=destination_path, + dest_storage_options=destination_storage_options, + max_mb=max_mb, + ) else: - del p2z - # Create general p2z object - p2z = Parsed2Zarr(parser) + p2z = Parsed2Zarr(parser) # Create general p2z object parser.rectangularize_data() else: diff --git a/echopype/convert/parse_ad2cp.py b/echopype/convert/parse_ad2cp.py index 3b5fcea70..6c5c796fd 100644 --- a/echopype/convert/parse_ad2cp.py +++ b/echopype/convert/parse_ad2cp.py @@ -219,8 +219,8 @@ class NoMorePackets(Exception): class ParseAd2cp(ParseBase): - def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}): - super().__init__(file, storage_options) + def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="AD2CP"): + super().__init__(file, storage_options, sonar_model) self.config = None self.packets: List[Ad2cpDataPacket] = [] diff --git a/echopype/convert/parse_azfp.py b/echopype/convert/parse_azfp.py index f1ef32bb6..2c031a53f 100644 --- a/echopype/convert/parse_azfp.py +++ b/echopype/convert/parse_azfp.py @@ -73,8 +73,8 @@ class ParseAZFP(ParseBase): HEADER_FORMAT = ">HHHHIHHHHHHHHHHHHHHHHHHHHHHHHHHHHHBBBBHBBBBBBBBHHHHHHHHHHHHHHHHHHHH" FILE_TYPE = 64770 - def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}): - super().__init__(file, storage_options) + def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="AZFP"): + super().__init__(file, storage_options, sonar_model) # Parent class attributes # regex pattern used to grab datetime embedded in filename self.timestamp_pattern = FILENAME_DATETIME_AZFP diff --git a/echopype/convert/parse_base.py b/echopype/convert/parse_base.py index e14ae276a..3f4d960e4 100644 --- a/echopype/convert/parse_base.py +++ b/echopype/convert/parse_base.py @@ -17,12 +17,14 @@ class ParseBase: """Parent class for all convert classes.""" - def __init__(self, file, storage_options): + def __init__(self, file, storage_options, sonar_model): self.source_file = file self.timestamp_pattern = None # regex pattern used to grab datetime embedded in filename self.ping_time = [] # list to store ping time self.storage_options = storage_options self.zarr_datagrams = [] # holds all parsed datagrams + self.zarr_tx_datagrams = [] # holds all parsed transmit datagrams + self.sonar_model = sonar_model def _print_status(self): """Prints message to console giving information about the raw file being parsed.""" @@ -31,8 +33,8 @@ def _print_status(self): class ParseEK(ParseBase): """Class for converting data from Simrad echosounders.""" - def __init__(self, file, params, storage_options, dgram_zarr_vars): - super().__init__(file, storage_options) + def __init__(self, file, params, storage_options, dgram_zarr_vars, sonar_model): + super().__init__(file, storage_options, sonar_model) # Parent class attributes # regex pattern used to grab datetime embedded in filename @@ -78,6 +80,10 @@ def rectangularize_data(self): for dgram in self.zarr_datagrams: self._append_channel_ping_data(dgram, zarr_vars=False) + # append zarr transmit datagrams to channel ping data + for dgram in self.zarr_tx_datagrams: + self._append_channel_ping_data(dgram, rx=False, zarr_vars=False) + # Rectangularize all data and convert to numpy array indexed by channel for data_type in ["power", "angle", 
"complex"]: # Receive data @@ -350,7 +356,7 @@ def _read_datagrams(self, fid): else: logger.info("Unknown datagram type: " + str(new_datagram["type"])) - def _append_zarr_dgram(self, full_dgram: dict): + def _append_zarr_dgram(self, full_dgram: dict, rx: bool): """ Selects a subset of the datagram values that need to be sent directly to a zarr file and @@ -389,7 +395,10 @@ def _append_zarr_dgram(self, full_dgram: dict): reduced_datagram["power"] = reduced_datagram["power"].astype("float32") * INDEX2POWER if reduced_datagram: - self.zarr_datagrams.append(reduced_datagram) + if rx: + self.zarr_datagrams.append(reduced_datagram) + else: + self.zarr_tx_datagrams.append(reduced_datagram) def _append_channel_ping_data(self, datagram, rx=True, zarr_vars=True): """ @@ -411,10 +420,10 @@ def _append_channel_ping_data(self, datagram, rx=True, zarr_vars=True): ch_id = datagram["channel_id"] if "channel_id" in datagram else datagram["channel"] # append zarr variables, if they exist - if zarr_vars and rx: + if zarr_vars: common_vars = set(self.dgram_zarr_vars.keys()).intersection(set(datagram.keys())) if common_vars: - self._append_zarr_dgram(datagram) + self._append_zarr_dgram(datagram, rx=rx) for var in common_vars: del datagram[var] diff --git a/echopype/convert/parse_ek60.py b/echopype/convert/parse_ek60.py index 9502c3214..a8666a386 100644 --- a/echopype/convert/parse_ek60.py +++ b/echopype/convert/parse_ek60.py @@ -4,8 +4,8 @@ class ParseEK60(ParseEK): """Class for converting data from Simrad EK60 echosounders.""" - def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}): - super().__init__(file, params, storage_options, dgram_zarr_vars) + def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="EK60"): + super().__init__(file, params, storage_options, dgram_zarr_vars, sonar_model) def _select_datagrams(self, params): # Translates user input into specific datagrams or ALL diff --git a/echopype/convert/parse_ek80.py b/echopype/convert/parse_ek80.py index 55027ffe3..583fac8c4 100644 --- a/echopype/convert/parse_ek80.py +++ b/echopype/convert/parse_ek80.py @@ -4,8 +4,8 @@ class ParseEK80(ParseEK): """Class for converting data from Simrad EK80 echosounders.""" - def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}): - super().__init__(file, params, storage_options, dgram_zarr_vars) + def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="EK80"): + super().__init__(file, params, storage_options, dgram_zarr_vars, sonar_model) self.environment = {} # dictionary to store environment data def _select_datagrams(self, params): diff --git a/echopype/convert/parsed_to_zarr.py b/echopype/convert/parsed_to_zarr.py index 95fde46c9..9e8a302a9 100644 --- a/echopype/convert/parsed_to_zarr.py +++ b/echopype/convert/parsed_to_zarr.py @@ -1,14 +1,28 @@ import secrets import sys -from pathlib import Path -from typing import List, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union -import more_itertools as miter +import dask.array +import fsspec import numpy as np import pandas as pd +import xarray as xr import zarr -from ..utils.io import ECHOPYPE_DIR, check_file_permissions +from ..echodata.convention import sonarnetcdf_1 +from ..utils.io import ECHOPYPE_DIR, check_file_permissions, validate_output_path + +DEFAULT_ZARR_TEMP_DIR = ECHOPYPE_DIR / "temp_output" / "parsed2zarr_temp_files" + + +def _create_zarr_store_map(path, storage_options): + file_path = validate_output_path( + 
+        source_file=secrets.token_hex(16),
+        engine="zarr",
+        save_path=path,
+        output_storage_options=storage_options,
+    )
+    return fsspec.get_mapper(file_path, **storage_options)
 
 
 class Parsed2Zarr:
@@ -26,53 +40,61 @@ def __init__(self, parser_obj):
         self.zarr_root = None
         self.parser_obj = parser_obj  # parser object ParseEK60/ParseEK80/etc.
 
-    def _create_zarr_info(self):
+        self._varattrs = sonarnetcdf_1.yaml_dict["variable_and_varattributes"]
+
+        if hasattr(self.parser_obj, "sonar_model"):
+            self.sonar_model = self.parser_obj.sonar_model
+
+    def _create_zarr_info(
+        self, dest_path: Optional[str] = None, dest_storage_options: Optional[Dict] = None, retries: int = 10
+    ):
         """
         Creates the temporary directory for zarr
         storage, zarr file name, zarr store, and
         the root group of the zarr store.
         """
 
-        # get current working directory
-        current_dir = Path.cwd()
-
-        # Check permission of cwd, raise exception if no permission
-        check_file_permissions(current_dir)
+        if dest_path is None:
+            # Check permission of cwd, raise exception if no permission
+            check_file_permissions(ECHOPYPE_DIR)
 
-        # construct temporary directory that will hold the zarr file
-        out_dir = current_dir / ECHOPYPE_DIR / "temp_output" / "parsed2zarr_temp_files"
-        if not out_dir.exists():
-            out_dir.mkdir(parents=True)
+            # construct temporary directory that will hold the zarr file
+            dest_path = DEFAULT_ZARR_TEMP_DIR
+            if not dest_path.exists():
+                dest_path.mkdir(parents=True)
 
         # establish temporary directory we will write zarr files to
-        self.temp_zarr_dir = str(out_dir)
-
-        # create zarr store name
-        zarr_file_name = str(out_dir / secrets.token_hex(16)) + ".zarr"
-
-        # attempt to find different zarr_file_name, if it already exists
-        count = 0
-        while Path(zarr_file_name).exists() and count < 10:
-            # generate new zarr_file_name
-            zarr_file_name = str(out_dir / secrets.token_hex(16)) + ".zarr"
-            count += 1
-
-        # error out if we are unable to get a unique name, else assign name to class variable
-        if (count == 10) and Path(zarr_file_name).exists():
-            raise RuntimeError("Unable to construct an unused zarr file name for Parsed2Zarr!")
-        else:
-            self.zarr_file_name = zarr_file_name
+        self.temp_zarr_dir = str(dest_path)
+
+        # Set default storage options if None
+        if dest_storage_options is None:
+            dest_storage_options = {}
+
+        # attempt to find an unused zarr store name
+        attempt = 0
+        exists = True
+        while exists:
+            zarr_store = _create_zarr_store_map(
+                path=dest_path, storage_options=dest_storage_options
+            )
+            exists = zarr_store.fs.exists(zarr_store.root)
+            attempt += 1
+
+            if attempt == retries and exists:
+                raise RuntimeError(
+                    "Unable to construct an unused zarr file name for Parsed2Zarr "
+                    f"after {retries} retries!"
+                )
 
         # create zarr store and zarr group we want to write to
-        self.store = zarr.DirectoryStore(self.zarr_file_name)
+        self.store = zarr_store
         self.zarr_root = zarr.group(store=self.store, overwrite=True)
 
     def _close_store(self):
-        """properly closes zarr store"""
+        """Consolidates metadata for the zarr store; the store itself stays open."""
 
-        # consolidate metadata and close zarr store
+        # consolidate metadata
         zarr.consolidate_metadata(self.store)
-        self.store.close()
 
     @staticmethod
     def set_multi_index(
@@ -383,7 +407,7 @@ def write_df_column(
 
         # evenly chunk unique times so that the smallest and largest
         # chunk differ by at most 1 element
-        chunks = list(miter.chunked_even(unique_time_ind, max_num_times))
+        chunks = np.array_split(unique_time_ind, np.ceil(len(unique_time_ind) / max_num_times))
 
         self.write_chunks(pd_series, zarr_grp, is_array, chunks, chunk_shape)
 
@@ -399,6 +423,225 @@ def
_get_zarr_dgrams_size(self) -> int: return size + def _get_channel_ids(self, chan_str: np.ndarray) -> List[str]: + """ + Obtains the channel IDs associated with ``chan_str``. + + Parameters + ---------- + chan_str : np.ndarray + A numpy array of strings corresponding to the + keys of ``config_datagram["transceivers"]`` + + Returns + ------- + A list of strings representing the channel IDS + """ + if self.sonar_model in ["EK60", "ES70"]: + return [ + self.parser_obj.config_datagram["transceivers"][int(i)]["channel_id"] + for i in chan_str + ] + else: + return [ + self.parser_obj.config_datagram["configuration"][i]["channel_id"] for i in chan_str + ] + + @property + def power_dataarray(self) -> xr.DataArray: + """ + Constructs a DataArray from a Dask array for the power + data. + + Returns + ------- + DataArray named "backscatter_r" representing the + power data. + """ + zarr_path = self.store + + # collect variables associated with the power data + power = dask.array.from_zarr(zarr_path, component="power/power") + + pow_time_path = "power/" + self.power_dims[0] + pow_chan_path = "power/" + self.power_dims[1] + power_time = dask.array.from_zarr(zarr_path, component=pow_time_path).compute() + power_channel = dask.array.from_zarr(zarr_path, component=pow_chan_path).compute() + + # obtain channel names for power data + pow_chan_names = self._get_channel_ids(power_channel) + + backscatter_r = xr.DataArray( + data=power, + coords={ + "ping_time": ( + ["ping_time"], + power_time, + self._varattrs["beam_coord_default"]["ping_time"], + ), + "channel": ( + ["channel"], + pow_chan_names, + self._varattrs["beam_coord_default"]["channel"], + ), + "range_sample": ( + ["range_sample"], + np.arange(power.shape[2]), + self._varattrs["beam_coord_default"]["range_sample"], + ), + }, + name="backscatter_r", + attrs={ + "long_name": self._varattrs["beam_var_default"]["backscatter_r"]["long_name"], + "units": "dB", + }, + ) + + return backscatter_r + + @property + def angle_dataarrays(self) -> Tuple[xr.DataArray, xr.DataArray]: + """ + Constructs the DataArrays from Dask arrays associated + with the angle data. + + Returns + ------- + DataArrays named "angle_athwartship" and "angle_alongship", + respectively, representing the angle data. + """ + + zarr_path = self.store + + # collect variables associated with the angle data + angle_along = dask.array.from_zarr(zarr_path, component="angle/angle_alongship") + angle_athwart = dask.array.from_zarr(zarr_path, component="angle/angle_athwartship") + + ang_time_path = "angle/" + self.angle_dims[0] + ang_chan_path = "angle/" + self.angle_dims[1] + angle_time = dask.array.from_zarr(zarr_path, component=ang_time_path).compute() + angle_channel = dask.array.from_zarr(zarr_path, component=ang_chan_path).compute() + + # obtain channel names for angle data + ang_chan_names = self._get_channel_ids(angle_channel) + + array_coords = { + "ping_time": ( + ["ping_time"], + angle_time, + self._varattrs["beam_coord_default"]["ping_time"], + ), + "channel": ( + ["channel"], + ang_chan_names, + self._varattrs["beam_coord_default"]["channel"], + ), + "range_sample": ( + ["range_sample"], + np.arange(angle_athwart.shape[2]), + self._varattrs["beam_coord_default"]["range_sample"], + ), + } + + angle_athwartship = xr.DataArray( + data=angle_athwart, + coords=array_coords, + name="angle_athwartship", + attrs={ + "long_name": "electrical athwartship angle", + "comment": ( + "Introduced in echopype for Simrad echosounders. 
" # noqa + + "The athwartship angle corresponds to the major angle in SONAR-netCDF4 vers 2. " # noqa + ), + }, + ) + + angle_alongship = xr.DataArray( + data=angle_along, + coords=array_coords, + name="angle_alongship", + attrs={ + "long_name": "electrical alongship angle", + "comment": ( + "Introduced in echopype for Simrad echosounders. " # noqa + + "The alongship angle corresponds to the minor angle in SONAR-netCDF4 vers 2. " # noqa + ), + }, + ) + + return angle_athwartship, angle_alongship + + @property + def complex_dataarrays(self) -> Tuple[xr.DataArray, xr.DataArray]: + """ + Constructs the DataArrays from Dask arrays associated + with the complex data. + + Returns + ------- + DataArrays named "backscatter_r" and "backscatter_i", + respectively, representing the complex data. + """ + + zarr_path = self.store + + # collect variables associated with the complex data + complex_r = dask.array.from_zarr(zarr_path, component="complex/backscatter_r") + complex_i = dask.array.from_zarr(zarr_path, component="complex/backscatter_i") + + comp_time_path = "complex/" + self.complex_dims[0] + comp_chan_path = "complex/" + self.complex_dims[1] + complex_time = dask.array.from_zarr(zarr_path, component=comp_time_path).compute() + complex_channel = dask.array.from_zarr(zarr_path, component=comp_chan_path).compute() + + # obtain channel names for complex data + comp_chan_names = self._get_channel_ids(complex_channel) + + array_coords = { + "ping_time": ( + ["ping_time"], + complex_time, + self._varattrs["beam_coord_default"]["ping_time"], + ), + "channel": ( + ["channel"], + comp_chan_names, + self._varattrs["beam_coord_default"]["channel"], + ), + "range_sample": ( + ["range_sample"], + np.arange(complex_r.shape[2]), + self._varattrs["beam_coord_default"]["range_sample"], + ), + "beam": ( + ["beam"], + np.arange(start=1, stop=complex_r.shape[3] + 1).astype(str), + self._varattrs["beam_coord_default"]["beam"], + ), + } + + backscatter_r = xr.DataArray( + data=complex_r, + coords=array_coords, + name="backscatter_r", + attrs={ + "long_name": self._varattrs["beam_var_default"]["backscatter_r"]["long_name"], + "units": "dB", + }, + ) + + backscatter_i = xr.DataArray( + data=complex_i, + coords=array_coords, + name="backscatter_i", + attrs={ + "long_name": self._varattrs["beam_var_default"]["backscatter_i"]["long_name"], + "units": "dB", + }, + ) + + return backscatter_r, backscatter_i + def array_series_bytes(self, pd_series: pd.Series, n_rows: int) -> int: """ Determines the amount of bytes required for a diff --git a/echopype/convert/parsed_to_zarr_ek60.py b/echopype/convert/parsed_to_zarr_ek60.py index 17f80c8a0..1568b237e 100644 --- a/echopype/convert/parsed_to_zarr_ek60.py +++ b/echopype/convert/parsed_to_zarr_ek60.py @@ -253,7 +253,7 @@ def whether_write_to_zarr(self, mem_mult: float = 0.3) -> bool: return mem.total * mem_mult < req_mem - def datagram_to_zarr(self, max_mb: int) -> None: + def datagram_to_zarr(self, dest_path: str, dest_storage_options: dict, max_mb: int) -> None: """ Facilitates the conversion of a list of datagrams to a form that can be written @@ -275,7 +275,7 @@ def datagram_to_zarr(self, max_mb: int) -> None: the same. 
""" - self._create_zarr_info() + self._create_zarr_info(dest_path=dest_path, dest_storage_options=dest_storage_options) # create datagram df, if it does not exist if not isinstance(self.datagram_df, pd.DataFrame): diff --git a/echopype/convert/parsed_to_zarr_ek80.py b/echopype/convert/parsed_to_zarr_ek80.py index 468b1f6b0..19b117668 100644 --- a/echopype/convert/parsed_to_zarr_ek80.py +++ b/echopype/convert/parsed_to_zarr_ek80.py @@ -1,6 +1,11 @@ +from typing import Optional, Tuple + +import dask.array import numpy as np import pandas as pd import psutil +import xarray as xr +from zarr.errors import ArrayNotFoundError from .parsed_to_zarr_ek60 import Parsed2ZarrEK60 @@ -20,6 +25,7 @@ def __init__(self, parser_obj): self.p2z_ch_ids = {} # channel ids for power, angle, complex self.pow_ang_df = None # df that holds power and angle data self.complex_df = None # df that holds complex data + self.tx_df = None # df that holds transmit data # get channel and channel_id association and sort by channel_id channels_old = list(self.parser_obj.config_datagram["configuration"].keys()) @@ -31,6 +37,78 @@ def __init__(self, parser_obj): # obtain sort rule for the channel index self.channel_sort_rule = {ch: channels_new.index(ch) for ch in channels_old} + @property + def tx_complex_dataarrays(self) -> Optional[Tuple[xr.DataArray, xr.DataArray]]: + """ + Constructs the DataArrays from Dask arrays associated + with the transmit complex data (RAW4). + + Returns + ------- + DataArrays named "transmit_pulse_r" and "transmit_pulse_i", + respectively, representing the complex data. + """ + try: + zarr_path = self.store + + # collect variables associated with the complex data + complex_r = dask.array.from_zarr(zarr_path, component="tx_complex/transmit_pulse_r") + complex_i = dask.array.from_zarr(zarr_path, component="tx_complex/transmit_pulse_i") + + comp_time_path = "tx_complex/" + self.complex_dims[0] + comp_chan_path = "tx_complex/" + self.complex_dims[1] + complex_time = dask.array.from_zarr(zarr_path, component=comp_time_path).compute() + complex_channel = dask.array.from_zarr(zarr_path, component=comp_chan_path).compute() + + # obtain channel names for complex data + comp_chan_names = self._get_channel_ids(complex_channel) + + array_coords = { + "ping_time": ( + ["ping_time"], + complex_time, + self._varattrs["beam_coord_default"]["ping_time"], + ), + "channel": ( + ["channel"], + comp_chan_names, + self._varattrs["beam_coord_default"]["channel"], + ), + "transmit_sample": ( + ["transmit_sample"], + np.arange(complex_r.shape[2]), + { + "long_name": "Transmit pulse sample number, base 0", + "comment": "Only exist for Simrad EK80 file with RAW4 datagrams", + }, + ), + } + + transmit_pulse_r = xr.DataArray( + data=complex_r, + coords=array_coords, + name="transmit_pulse_r", + attrs={ + "long_name": "Real part of the transmit pulse", + "units": "V", + "comment": "Only exist for Simrad EK80 file with RAW4 datagrams", + }, + ) + + transmit_pulse_i = xr.DataArray( + data=complex_i, + coords=array_coords, + name="transmit_pulse_i", + attrs={ + "long_name": "Imaginary part of the transmit pulse", + "units": "V", + "comment": "Only exist for Simrad EK80 file with RAW4 datagrams", + }, + ) + return transmit_pulse_r, transmit_pulse_i + except ArrayNotFoundError: + return None + def _get_num_transd_sec(self, x: pd.DataFrame): """ Returns the number of transducer sectors. 
@@ -86,7 +164,7 @@ def _reshape_series(self, complex_series: pd.Series) -> pd.Series:
         )
 
     @staticmethod
-    def _split_complex_data(complex_series: pd.Series) -> pd.DataFrame:
+    def _split_complex_data(complex_series: pd.Series, rx: bool = True) -> pd.DataFrame:
         """
         Splits the 1D complex data into two 1D
         arrays representing the real and imaginary
         parts of
@@ -105,6 +183,9 @@ def _split_complex_data(complex_series: pd.Series) -> pd.DataFrame:
             respectively. The DataFrame will have the same
             index as ``complex_series``.
         """
+        columns = ["backscatter_r", "backscatter_i"]
+        if not rx:
+            columns = ["transmit_pulse_r", "transmit_pulse_i"]
 
         complex_split = complex_series.apply(
             lambda x: [np.real(x), np.imag(x)] if isinstance(x, np.ndarray) else [None, None]
@@ -112,11 +193,11 @@ def _split_complex_data(complex_series: pd.Series) -> pd.DataFrame:
 
         return pd.DataFrame(
             data=complex_split.to_list(),
-            columns=["backscatter_r", "backscatter_i"],
+            columns=columns,
             index=complex_series.index,
         )
 
-    def _write_complex(self, df: pd.DataFrame, max_mb: int):
+    def _write_complex(self, df: pd.DataFrame, max_mb: int, rx: bool = True):
         """
         Writes the complex data and associated
         indices to a zarr group.
@@ -142,11 +223,15 @@ def _write_complex(self, df: pd.DataFrame, max_mb: int):
         )
         channels = channels[indexer]
 
-        complex_series = self._reshape_series(complex_series)
+        if rx:
+            complex_series = self._reshape_series(complex_series)
+            grp_key = "complex"
+        else:
+            grp_key = "tx_complex"
 
-        complex_df = self._split_complex_data(complex_series)
+        self.p2z_ch_ids[grp_key] = channels.values  # store channel ids for variable
 
-        self.p2z_ch_ids["complex"] = channels.values  # store channel ids for variable
+        complex_df = self._split_complex_data(complex_series, rx=rx)
 
         # create multi index using the product of the unique dims
         unique_dims = [times, channels]
@@ -154,7 +239,8 @@ def _write_complex(self, df: pd.DataFrame, max_mb: int):
         complex_df = self.set_multi_index(complex_df, unique_dims)
 
         # write complex data to the receive or transmit complex group
-        zarr_grp = self.zarr_root.create_group("complex")
+        zarr_grp = self.zarr_root.create_group(grp_key)
+
         for column in complex_df:
             self.write_df_column(
                 pd_series=complex_df[column],
@@ -219,6 +308,17 @@ def _get_zarr_dfs(self):
 
         self.complex_df = datagram_df.dropna().copy()
 
+    def _get_tx_zarr_df(self) -> None:
+        """Create dataframe for the transmit data."""
+
+        tx_datagram_df = pd.DataFrame.from_dict(self.parser_obj.zarr_tx_datagrams)
+        # remove power and angle to conserve memory
+        for col in ["power", "angle"]:
+            if col in tx_datagram_df.columns:
+                del tx_datagram_df[col]
+
+        self.tx_df = tx_datagram_df.dropna().copy()
+
     def whether_write_to_zarr(self, mem_mult: float = 0.3) -> bool:
         """
         Determines if the zarr data provided will expand
@@ -266,7 +366,7 @@ def whether_write_to_zarr(self, mem_mult: float = 0.3) -> bool:
 
         return mem.total * mem_mult < req_mem
 
-    def datagram_to_zarr(self, max_mb: int) -> None:
+    def datagram_to_zarr(self, dest_path: str, dest_storage_options: dict, max_mb: int) -> None:
         """
         Facilitates the conversion of a list
         of datagrams to a form that can be written
@@ -288,7 +388,7 @@ def datagram_to_zarr(self, max_mb: int) -> None:
             the same.
""" - self._create_zarr_info() + self._create_zarr_info(dest_path=dest_path, dest_storage_options=dest_storage_options) # create zarr dfs, if they do not exist if not isinstance(self.pow_ang_df, pd.DataFrame) and not isinstance( @@ -308,4 +408,14 @@ def datagram_to_zarr(self, max_mb: int) -> None: del self.complex_df # free memory + # write transmit data + if not isinstance(self.tx_df, pd.DataFrame): + self._get_tx_zarr_df() + del self.parser_obj.zarr_tx_datagrams # free memory + + if not self.tx_df.empty: + self._write_complex(df=self.tx_df, max_mb=max_mb, rx=False) + + del self.tx_df # free memory + self._close_store() diff --git a/echopype/convert/set_groups_base.py b/echopype/convert/set_groups_base.py index 4cb2c5353..ddbba24bb 100644 --- a/echopype/convert/set_groups_base.py +++ b/echopype/convert/set_groups_base.py @@ -1,8 +1,7 @@ import abc import warnings -from typing import List, Set, Tuple +from typing import List, Set -import dask.array import numpy as np import pynmea2 import xarray as xr @@ -51,7 +50,7 @@ def __init__( else: self.compression_settings = COMPRESSION_SETTINGS[self.engine] - self._varattrs = sonarnetcdf_1.yaml_dict["variable_and_varattributes"] + self._varattrs = self.parsed2zarr_obj._varattrs # self._beamgroups must be a list of dicts, eg: # [{"name":"Beam_group1", "descr":"contains complex backscatter data # and other beam or channel-specific data."}] @@ -361,229 +360,3 @@ def beam_groups_to_convention( self._add_ping_time_dim(ds, beam_ping_time_names, ping_time_only_names) self._add_beam_dim(ds, beam_only_names, beam_ping_time_names) - - def _get_channel_ids(self, chan_str: np.ndarray) -> List[str]: - """ - Obtains the channel IDs associated with ``chan_str``. - - Parameters - ---------- - chan_str : np.ndarray - A numpy array of strings corresponding to the - keys of ``config_datagram["transceivers"]`` - - Returns - ------- - A list of strings representing the channel IDS - """ - if self.sonar_model in ["EK60", "ES70"]: - return [ - self.parser_obj.config_datagram["transceivers"][int(i)]["channel_id"] - for i in chan_str - ] - else: - return [ - self.parser_obj.config_datagram["configuration"][i]["channel_id"] for i in chan_str - ] - - def _get_power_dataarray(self, zarr_path: str) -> xr.DataArray: - """ - Constructs a DataArray from a Dask array for the power - data. - - Parameters - ---------- - zarr_path: str - Path to the zarr file that contain the power data - - Returns - ------- - DataArray named "backscatter_r" representing the - power data. 
- """ - - # collect variables associated with the power data - power = dask.array.from_zarr(zarr_path, component="power/power") - - pow_time_path = "power/" + self.parsed2zarr_obj.power_dims[0] - pow_chan_path = "power/" + self.parsed2zarr_obj.power_dims[1] - power_time = dask.array.from_zarr(zarr_path, component=pow_time_path).compute() - power_channel = dask.array.from_zarr(zarr_path, component=pow_chan_path).compute() - - # obtain channel names for power data - pow_chan_names = self._get_channel_ids(power_channel) - - backscatter_r = xr.DataArray( - data=power, - coords={ - "ping_time": ( - ["ping_time"], - power_time, - self._varattrs["beam_coord_default"]["ping_time"], - ), - "channel": ( - ["channel"], - pow_chan_names, - self._varattrs["beam_coord_default"]["channel"], - ), - "range_sample": ( - ["range_sample"], - np.arange(power.shape[2]), - self._varattrs["beam_coord_default"]["range_sample"], - ), - }, - name="backscatter_r", - attrs={ - "long_name": self._varattrs["beam_var_default"]["backscatter_r"]["long_name"], - "units": "dB", - }, - ) - - return backscatter_r - - def _get_angle_dataarrays(self, zarr_path: str) -> Tuple[xr.DataArray, xr.DataArray]: - """ - Constructs the DataArrays from Dask arrays associated - with the angle data. - - Parameters - ---------- - zarr_path: str - Path to the zarr file that contains the angle data - - Returns - ------- - DataArrays named "angle_athwartship" and "angle_alongship", - respectively, representing the angle data. - """ - - # collect variables associated with the angle data - angle_along = dask.array.from_zarr(zarr_path, component="angle/angle_alongship") - angle_athwart = dask.array.from_zarr(zarr_path, component="angle/angle_athwartship") - - ang_time_path = "angle/" + self.parsed2zarr_obj.angle_dims[0] - ang_chan_path = "angle/" + self.parsed2zarr_obj.angle_dims[1] - angle_time = dask.array.from_zarr(zarr_path, component=ang_time_path).compute() - angle_channel = dask.array.from_zarr(zarr_path, component=ang_chan_path).compute() - - # obtain channel names for angle data - ang_chan_names = self._get_channel_ids(angle_channel) - - array_coords = { - "ping_time": ( - ["ping_time"], - angle_time, - self._varattrs["beam_coord_default"]["ping_time"], - ), - "channel": ( - ["channel"], - ang_chan_names, - self._varattrs["beam_coord_default"]["channel"], - ), - "range_sample": ( - ["range_sample"], - np.arange(angle_athwart.shape[2]), - self._varattrs["beam_coord_default"]["range_sample"], - ), - } - - angle_athwartship = xr.DataArray( - data=angle_athwart, - coords=array_coords, - name="angle_athwartship", - attrs={ - "long_name": "electrical athwartship angle", - "comment": ( - "Introduced in echopype for Simrad echosounders. " # noqa - + "The athwartship angle corresponds to the major angle in SONAR-netCDF4 vers 2. " # noqa - ), - }, - ) - - angle_alongship = xr.DataArray( - data=angle_along, - coords=array_coords, - name="angle_alongship", - attrs={ - "long_name": "electrical alongship angle", - "comment": ( - "Introduced in echopype for Simrad echosounders. " # noqa - + "The alongship angle corresponds to the minor angle in SONAR-netCDF4 vers 2. " # noqa - ), - }, - ) - - return angle_athwartship, angle_alongship - - def _get_complex_dataarrays(self, zarr_path: str) -> Tuple[xr.DataArray, xr.DataArray]: - """ - Constructs the DataArrays from Dask arrays associated - with the complex data. 
- - Parameters - ---------- - zarr_path: str - Path to the zarr file that contains the complex data - - Returns - ------- - DataArrays named "backscatter_r" and "backscatter_i", - respectively, representing the complex data. - """ - - # collect variables associated with the complex data - complex_r = dask.array.from_zarr(zarr_path, component="complex/backscatter_r") - complex_i = dask.array.from_zarr(zarr_path, component="complex/backscatter_i") - - comp_time_path = "complex/" + self.parsed2zarr_obj.complex_dims[0] - comp_chan_path = "complex/" + self.parsed2zarr_obj.complex_dims[1] - complex_time = dask.array.from_zarr(zarr_path, component=comp_time_path).compute() - complex_channel = dask.array.from_zarr(zarr_path, component=comp_chan_path).compute() - - # obtain channel names for complex data - comp_chan_names = self._get_channel_ids(complex_channel) - - array_coords = { - "ping_time": ( - ["ping_time"], - complex_time, - self._varattrs["beam_coord_default"]["ping_time"], - ), - "channel": ( - ["channel"], - comp_chan_names, - self._varattrs["beam_coord_default"]["channel"], - ), - "range_sample": ( - ["range_sample"], - np.arange(complex_r.shape[2]), - self._varattrs["beam_coord_default"]["range_sample"], - ), - "beam": ( - ["beam"], - np.arange(start=1, stop=complex_r.shape[3] + 1).astype(str), - self._varattrs["beam_coord_default"]["beam"], - ), - } - - backscatter_r = xr.DataArray( - data=complex_r, - coords=array_coords, - name="backscatter_r", - attrs={ - "long_name": self._varattrs["beam_var_default"]["backscatter_r"]["long_name"], - "units": "dB", - }, - ) - - backscatter_i = xr.DataArray( - data=complex_i, - coords=array_coords, - name="backscatter_i", - attrs={ - "long_name": self._varattrs["beam_var_default"]["backscatter_i"]["long_name"], - "units": "dB", - }, - ) - - return backscatter_r, backscatter_i diff --git a/echopype/convert/set_groups_ek60.py b/echopype/convert/set_groups_ek60.py index f5270014b..c8529091b 100644 --- a/echopype/convert/set_groups_ek60.py +++ b/echopype/convert/set_groups_ek60.py @@ -336,9 +336,8 @@ def _set_beam_group1_zarr_vars(self, ds: xr.Dataset) -> xr.Dataset: # functions below. # obtain DataArrays using zarr variables - zarr_path = self.parsed2zarr_obj.zarr_file_name - backscatter_r = self._get_power_dataarray(zarr_path) - angle_athwartship, angle_alongship = self._get_angle_dataarrays(zarr_path) + backscatter_r = self.parsed2zarr_obj.power_dataarray + angle_athwartship, angle_alongship = self.parsed2zarr_obj.angle_dataarrays # append DataArrays created from zarr file ds = ds.assign( diff --git a/echopype/convert/set_groups_ek80.py b/echopype/convert/set_groups_ek80.py index 51c4aae23..77b06cdf3 100644 --- a/echopype/convert/set_groups_ek80.py +++ b/echopype/convert/set_groups_ek80.py @@ -74,7 +74,7 @@ def __init__(self, *args, **kwargs): # if we have zarr files, create parser_obj.ch_ids if self.parsed2zarr_obj.temp_zarr_dir: for k, v in self.parsed2zarr_obj.p2z_ch_ids.items(): - self.parser_obj.ch_ids[k] = self._get_channel_ids(v) + self.parser_obj.ch_ids[k] = self.parsed2zarr_obj._get_channel_ids(v) # obtain sorted channel dict in ascending order for each usage scenario self.sorted_channel = { @@ -1067,26 +1067,25 @@ def _get_ds_beam_power_zarr(self, ds_invariant_power: xr.Dataset) -> xr.Dataset: # functions below. 
# obtain DataArrays using zarr variables - zarr_path = self.parsed2zarr_obj.zarr_file_name - backscatter_r = self._get_power_dataarray(zarr_path) - angle_athwartship, angle_alongship = self._get_angle_dataarrays(zarr_path) + backscatter_r = self.parsed2zarr_obj.power_dataarray + angle_athwartship, angle_alongship = self.parsed2zarr_obj.angle_dataarrays + + # Obtain RAW4 transmit pulse data if it exists + tx_pulse_list = [] + if ( + hasattr(self.parsed2zarr_obj, "tx_complex_dataarrays") + and self.parsed2zarr_obj.tx_complex_dataarrays is not None + ): + tx_pulse_list = list(self.parsed2zarr_obj.tx_complex_dataarrays) # create power related ds using DataArrays created from zarr file - ds_power = xr.merge([backscatter_r, angle_athwartship, angle_alongship]) + ds_power = xr.merge( + [backscatter_r, angle_athwartship, angle_alongship] + tx_pulse_list, + combine_attrs="override", + ) ds_power = set_time_encodings(ds_power) - # obtain additional variables that need to be added to ds_power - ds_tmp = [] - for ch in self.sorted_channel["power"]: - ds_data = self._add_trasmit_pulse_complex(ds_tmp=xr.Dataset(), ch=ch) - ds_data = set_time_encodings(ds_data) - - ds_data = self._attach_vars_to_ds_data(ds_data, ch, rs_size=ds_power.range_sample.size) - ds_tmp.append(ds_data) - - ds_tmp = self.merge_save(ds_tmp, ds_invariant_power) - - return xr.merge([ds_tmp, ds_power], combine_attrs="override") + return xr.merge([ds_invariant_power, ds_power], combine_attrs="override") def _get_ds_complex_zarr(self, ds_invariant_complex: xr.Dataset) -> xr.Dataset: """ @@ -1109,8 +1108,7 @@ def _get_ds_complex_zarr(self, ds_invariant_complex: xr.Dataset) -> xr.Dataset: # functions below. # obtain DataArrays using zarr variables - zarr_path = self.parsed2zarr_obj.zarr_file_name - backscatter_r, backscatter_i = self._get_complex_dataarrays(zarr_path) + backscatter_r, backscatter_i = self.parsed2zarr_obj.complex_dataarrays # create power related ds using DataArrays created from zarr file ds_complex = xr.merge([backscatter_r, backscatter_i]) diff --git a/echopype/convert/utils/ek.py b/echopype/convert/utils/ek.py new file mode 100644 index 000000000..3393f1723 --- /dev/null +++ b/echopype/convert/utils/ek.py @@ -0,0 +1,89 @@ +import sys +from functools import reduce + +import numpy as np +import pandas as pd +import psutil + +COMPLEX_VAR = "complex" + + +def _get_power_dims(dgram_zarr_vars): + return list(reduce(lambda x, y: {*x, *y}, dgram_zarr_vars.values())) + + +def _extract_datagram_dfs(zarr_datagrams, dgram_zarr_vars): + data_keys = dgram_zarr_vars.keys() + power_dims = _get_power_dims(dgram_zarr_vars) + power_angle = [k for k in data_keys if k != COMPLEX_VAR] + + datagram_df = pd.DataFrame.from_dict(zarr_datagrams) + + pow_ang_df = datagram_df[power_dims + power_angle] + + complex_df = None + if COMPLEX_VAR in datagram_df: + # Not EK60 + complex_df = datagram_df[power_dims + [COMPLEX_VAR]] + + # Clean up nans if there's any + if isinstance(pow_ang_df, pd.DataFrame): + pow_ang_df = pow_ang_df.dropna().reset_index(drop=True) + + if isinstance(complex_df, pd.DataFrame): + complex_df = complex_df.dropna().reset_index(drop=True) + + return pow_ang_df, complex_df + + +def get_req_mem(datagram_df, dgram_zarr_vars): + total_req_mem = 0 + if datagram_df is not None: + power_dims = _get_power_dims(dgram_zarr_vars) + df_shapes = datagram_df.apply( + lambda col: col.unique().shape + if col.name in power_dims + else col.apply(lambda row: row.shape).max(), + result_type="reduce", + ) + + for k, v in dgram_zarr_vars.items(): + if k 
in df_shapes: + cols = v + [k] + expected_shape = reduce(lambda x, y: x + y, df_shapes[cols]) + itemsize = datagram_df[k].dtype.itemsize + req_mem = np.prod(expected_shape) * itemsize + total_req_mem += req_mem + + return total_req_mem + + +def _get_zarr_dgrams_size(zarr_datagrams) -> int: + """ + Returns the size in bytes of the list of zarr + datagrams. + """ + + size = 0 + for i in zarr_datagrams: + size += sum([sys.getsizeof(val) for val in i.values()]) + + return size + + +def should_use_swap(zarr_datagrams, dgram_zarr_vars, mem_mult: float = 0.3) -> bool: + zdgrams_mem = _get_zarr_dgrams_size(zarr_datagrams) + + # Estimate expansion size + pow_ang_df, complex_df = _extract_datagram_dfs(zarr_datagrams, dgram_zarr_vars) + pow_ang_mem = get_req_mem(pow_ang_df, dgram_zarr_vars) + complex_mem = get_req_mem(complex_df, dgram_zarr_vars) + total_mem = pow_ang_mem + complex_mem + + # get statistics about system memory usage + mem = psutil.virtual_memory() + + # approx. the amount of memory that will be used after expansion + req_mem = mem.used - zdgrams_mem + total_mem + + return mem.total * mem_mult < req_mem diff --git a/echopype/echodata/echodata.py b/echopype/echodata/echodata.py index 083eb47a2..7adeb3dbe 100644 --- a/echopype/echodata/echodata.py +++ b/echopype/echodata/echodata.py @@ -1,5 +1,4 @@ import datetime -import shutil import warnings from html import escape from pathlib import Path @@ -76,18 +75,19 @@ def __init__( self._varattrs = sonarnetcdf_1.yaml_dict["variable_and_varattributes"] - def __del__(self): - # TODO: this destructor seems to not work in Jupyter Lab if restart or - # even clear all outputs is used. It will work if you explicitly delete the object - - if (self.parsed2zarr_obj is not None) and (self.parsed2zarr_obj.zarr_file_name is not None): + def cleanup(self): + if (self.parsed2zarr_obj is not None) and (self.parsed2zarr_obj.store is not None): # get Path object of temporary zarr file created by Parsed2Zarr - p2z_temp_file = Path(self.parsed2zarr_obj.zarr_file_name) + p2z_temp_file = self.parsed2zarr_obj.store # remove temporary directory created by Parsed2Zarr, if it exists - if p2z_temp_file.exists(): - # TODO: do we need to check file permissions here? - shutil.rmtree(p2z_temp_file) + if p2z_temp_file.fs.exists(p2z_temp_file.root): + p2z_temp_file.fs.rm(p2z_temp_file.root, recursive=True) + + def __del__(self): + # TODO: this destructor seems to not work in Jupyter Lab if restart or + # even clear all outputs is used. 
It will work if you explicitly delete the object + self.cleanup() def __str__(self) -> str: fpath = "Internal Memory" diff --git a/echopype/tests/convert/test_convert_source_target_locs.py b/echopype/tests/convert/test_convert_source_target_locs.py index 66cbbfe40..6b1af93e0 100644 --- a/echopype/tests/convert/test_convert_source_target_locs.py +++ b/echopype/tests/convert/test_convert_source_target_locs.py @@ -236,7 +236,7 @@ def test_convert_time_encodings(sonar_model, raw_file, xml_path, test_path): xml_path = str(test_path[path_model].joinpath(*xml_path).absolute()) ed = open_raw( - sonar_model=sonar_model, raw_file=raw_file, xml_path=xml_path + sonar_model=sonar_model, raw_file=raw_file, xml_path=xml_path, destination_path="no_swap" ) ed.to_netcdf(overwrite=True) for group, details in ed.group_map.items(): @@ -297,6 +297,7 @@ def test_convert_ek( raw_file=ipath, sonar_model=sonar_model, storage_options=input_storage_options, + destination_path="no_swap" ) if ( @@ -370,6 +371,7 @@ def test_convert_azfp( xml_path=azfp_xml_paths, sonar_model=model, storage_options=input_storage_options, + destination_path="no_swap" ) assert echodata.xml_path == azfp_xml_paths diff --git a/echopype/tests/convert/test_parsed_to_zarr.py b/echopype/tests/convert/test_parsed_to_zarr.py index 8ae96c5e7..9b836753c 100644 --- a/echopype/tests/convert/test_parsed_to_zarr.py +++ b/echopype/tests/convert/test_parsed_to_zarr.py @@ -1,18 +1,95 @@ import pytest import xarray as xr -from typing import List, Tuple +import pandas as pd +from typing import List, Optional, Tuple from echopype import open_raw -from pathlib import Path +import shutil +from zarr.hierarchy import Group as ZGroup import os.path +from fsspec import FSMap +from s3fs import S3FileSystem +import requests +import time +from echopype.convert.parsed_to_zarr import Parsed2Zarr, DEFAULT_ZARR_TEMP_DIR +from echopype.convert.parsed_to_zarr_ek60 import Parsed2ZarrEK60 +from echopype.echodata.convention import sonarnetcdf_1 +from echopype.convert.api import _check_file, SONAR_MODELS + +test_bucket_name = "echopype-test" +port = 5555 +endpoint_uri = "http://127.0.0.1:%s/" % port + + +@pytest.fixture() +def s3_base(): + # writable local S3 system + import shlex + import subprocess + + try: + # should fail since we didn't start server yet + r = requests.get(endpoint_uri) + except: # noqa + pass + else: + if r.ok: + raise RuntimeError("moto server already up") + if "AWS_SECRET_ACCESS_KEY" not in os.environ: + os.environ["AWS_SECRET_ACCESS_KEY"] = "foo" + if "AWS_ACCESS_KEY_ID" not in os.environ: + os.environ["AWS_ACCESS_KEY_ID"] = "foo" + proc = subprocess.Popen( + shlex.split("moto_server s3 -p %s" % port), + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + ) + + timeout = 5 + while timeout > 0: + try: + print("polling for moto server") + r = requests.get(endpoint_uri) + if r.ok: + break + except: # noqa + pass + timeout -= 0.1 + time.sleep(0.1) + print("server up") + yield + print("moto done") + proc.terminate() + proc.wait() + + +def get_boto3_client(): + from botocore.session import Session + + # NB: we use the sync botocore client for setup + session = Session() + return session.create_client("s3", endpoint_url=endpoint_uri) + + +@pytest.fixture() +def s3(s3_base): + client = get_boto3_client() + client.create_bucket(Bucket=test_bucket_name, ACL="public-read") + + S3FileSystem.clear_instance_cache() + s3 = S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri}) + s3.invalidate_cache() + yield s3 
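+
+# The fixtures above stand up a throwaway moto S3 server so the remote-swap
+# tests below can exercise destination_path="s3://...". A rough sketch of the
+# kind of call they target (the raw file path is a placeholder; the bucket
+# name and endpoint come from the module-level constants above):
+#
+#   ed = open_raw(
+#       "path/to/file.raw",
+#       sonar_model="EK60",
+#       destination_path=f"s3://{test_bucket_name}/swap_dir/",
+#       destination_storage_options={
+#           "anon": False,
+#           "client_kwargs": {"endpoint_url": endpoint_uri},
+#       },
+#   )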
@pytest.fixture def ek60_path(test_path): - return test_path['EK60'] + return test_path["EK60"] -def compare_zarr_vars(ed_zarr: xr.Dataset, ed_no_zarr: xr.Dataset, - var_to_comp: List[str], ed_path) -> Tuple[xr.Dataset, xr.Dataset]: +def compare_zarr_vars( + ed_zarr: xr.Dataset, ed_no_zarr: xr.Dataset, var_to_comp: List[str], ed_path +) -> Tuple[xr.Dataset, xr.Dataset]: """ Compares the dask variables in ``ed_zarr`` against their counterparts in ``ed_no_zarr`` by computing the dask results @@ -41,9 +118,7 @@ def compare_zarr_vars(ed_zarr: xr.Dataset, ed_no_zarr: xr.Dataset, """ for var in var_to_comp: - for chan in ed_zarr[ed_path][var].channel: - # here we compute to make sure values are being compared, rather than just shapes var_zarr = ed_zarr[ed_path][var].sel(channel=chan).compute() var_no_zarr = ed_no_zarr[ed_path][var].sel(channel=chan) @@ -56,13 +131,13 @@ def compare_zarr_vars(ed_zarr: xr.Dataset, ed_no_zarr: xr.Dataset, @pytest.mark.parametrize( - ["raw_file", "sonar_model", "use_swap"], + ["raw_file", "sonar_model", "destination_path"], [ - ("L0003-D20040909-T161906-EK60.raw", "EK60", True), + ("L0003-D20040909-T161906-EK60.raw", "EK60", "swap"), pytest.param( "L0003-D20040909-T161906-EK60.raw", "EK60", - False, + "no_swap", marks=pytest.mark.xfail( run=False, reason="Expected out of memory error. See https://github.com/OSOceanAcoustics/echopype/issues/489", @@ -71,18 +146,17 @@ def compare_zarr_vars(ed_zarr: xr.Dataset, ed_no_zarr: xr.Dataset, ], ids=["noaa_offloaded", "noaa_not_offloaded"], ) -def test_raw2zarr(raw_file, sonar_model, use_swap, ek60_path): +def test_raw2zarr(raw_file, sonar_model, destination_path, ek60_path): """Tests for memory expansion relief""" import os from tempfile import TemporaryDirectory from echopype.echodata.echodata import EchoData - name = os.path.basename(raw_file).replace('.raw', '') - fname = f"{name}__{use_swap}.zarr" + + name = os.path.basename(raw_file).replace(".raw", "") + fname = f"{name}__{destination_path}.zarr" file_path = ek60_path / raw_file echodata = open_raw( - raw_file=file_path, - sonar_model=sonar_model, - use_swap=use_swap + raw_file=file_path, sonar_model=sonar_model, destination_path=destination_path ) # Most likely succeed if it doesn't crash assert isinstance(echodata, EchoData) @@ -92,14 +166,13 @@ def test_raw2zarr(raw_file, sonar_model, use_swap, ek60_path): # If it goes all the way to here it is most likely successful assert os.path.exists(output_save_path) - if use_swap: - # create a copy of zarr_file_name. 
The join is necessary so that we are not referencing zarr_file_name - temp_zarr_path = ''.join(echodata.parsed2zarr_obj.zarr_file_name) + if echodata.parsed2zarr_obj.store is not None: + temp_zarr_path = echodata.parsed2zarr_obj.store del echodata # make sure that the temporary zarr was deleted - assert Path(temp_zarr_path).exists() is False + assert temp_zarr_path.fs.exists(temp_zarr_path.root) is False @pytest.mark.parametrize( @@ -107,16 +180,23 @@ def test_raw2zarr(raw_file, sonar_model, use_swap, ek60_path): [ ("EK60", os.path.join("ncei-wcsd", "Summer2017-D20170615-T190214.raw"), "EK60"), ("EK60", "DY1002_EK60-D20100318-T023008_rep_freq.raw", "EK60"), - ("EK80", "Summer2018--D20180905-T033113.raw", "EK80"), + ("EK80", "Summer2018--D20180905-T033113.raw", "EK80"), ("EK80_CAL", "2018115-D20181213-T094600.raw", "EK80"), - ("EK80", "Green2.Survey2.FM.short.slow.-D20191004-T211557.raw", "EK80"), - ("EK80", "2019118 group2survey-D20191214-T081342.raw", "EK80"), + ("EK80", "Green2.Survey2.FM.short.slow.-D20191004-T211557.raw", "EK80"), + ("EK80", "2019118 group2survey-D20191214-T081342.raw", "EK80"), + ], + ids=[ + "ek60_summer_2017", + "ek60_rep_freq", + "ek80_summer_2018", + "ek80_bb_w_cal", + "ek80_short_slow", + "ek80_grp_2_survey", ], - ids=["ek60_summer_2017", "ek60_rep_freq", "ek80_summer_2018", - "ek80_bb_w_cal", "ek80_short_slow", "ek80_grp_2_survey"], ) -def test_direct_to_zarr_integration(path_model: str, raw_file: str, - sonar_model: str, test_path: dict) -> None: +def test_direct_to_zarr_integration( + path_model: str, raw_file: str, sonar_model: str, test_path: dict +) -> None: """ Integration Test that ensure writing variables directly to a temporary zarr store and then assigning @@ -144,11 +224,10 @@ def test_direct_to_zarr_integration(path_model: str, raw_file: str, raw_file_path = test_path[path_model] / raw_file - ed_zarr = open_raw(raw_file_path, sonar_model=sonar_model, use_swap=True, max_mb=100) - ed_no_zarr = open_raw(raw_file_path, sonar_model=sonar_model, use_swap=False) + ed_zarr = open_raw(raw_file_path, sonar_model=sonar_model, max_mb=100, destination_path="swap") + ed_no_zarr = open_raw(raw_file_path, sonar_model=sonar_model) for grp in ed_zarr.group_paths: - # remove conversion time so we can do a direct comparison if "conversion_time" in ed_zarr[grp].attrs: del ed_zarr[grp].attrs["conversion_time"] @@ -156,24 +235,193 @@ def test_direct_to_zarr_integration(path_model: str, raw_file: str, # Compare angle, power, complex, if zarr drop the zarr variables and compare datasets if grp == "Sonar/Beam_group2": - var_to_comp = ['angle_athwartship', 'angle_alongship', 'backscatter_r'] + var_to_comp = ["angle_athwartship", "angle_alongship", "backscatter_r"] ed_zarr, ed_no_zarr = compare_zarr_vars(ed_zarr, ed_no_zarr, var_to_comp, grp) if grp == "Sonar/Beam_group1": - - if 'backscatter_i' in ed_zarr[grp]: - var_to_comp = ['backscatter_r', 'backscatter_i'] + if "backscatter_i" in ed_zarr[grp]: + var_to_comp = ["backscatter_r", "backscatter_i"] else: - var_to_comp = ['angle_athwartship', 'angle_alongship', 'backscatter_r'] + var_to_comp = ["angle_athwartship", "angle_alongship", "backscatter_r"] ed_zarr, ed_no_zarr = compare_zarr_vars(ed_zarr, ed_no_zarr, var_to_comp, grp) - assert ed_zarr[grp].identical(ed_no_zarr[grp]) + assert ed_zarr[grp] is not None - # create a copy of zarr_file_name. 
The join is necessary so that we are not referencing zarr_file_name - temp_zarr_path = ''.join(ed_zarr.parsed2zarr_obj.zarr_file_name) + if ed_zarr.parsed2zarr_obj.store is not None: + temp_zarr_path = ed_zarr.parsed2zarr_obj.store - del ed_zarr + del ed_zarr - # make sure that the temporary zarr was deleted - assert Path(temp_zarr_path).exists() is False + # make sure that the temporary zarr was deleted + assert temp_zarr_path.fs.exists(temp_zarr_path.root) is False + + +class TestParsed2Zarr: + sample_file = "L0003-D20040909-T161906-EK60.raw" + sonar_model = "EK60" + xml_path = None + convert_params = None + storage_options = {} + max_mb = 100 + ek60_expected_shapes = { + "angle_alongship": (9923, 3, 10417), + "angle_athwartship": (9923, 3, 10417), + "channel": (3,), + "timestamp": (9923,), + "power": (9923, 3, 10417), + } + + @pytest.fixture(scope="class") + def ek60_parsed2zarr_obj(self, ek60_parser_obj): + return Parsed2ZarrEK60(ek60_parser_obj) + + @pytest.fixture(scope="class") + def ek60_parsed2zarr_obj_w_df(self, ek60_parsed2zarr_obj): + ek60_parsed2zarr_obj._create_zarr_info() + ek60_parsed2zarr_obj.datagram_df = pd.DataFrame.from_dict( + ek60_parsed2zarr_obj.parser_obj.zarr_datagrams + ) + # convert channel column to a string + ek60_parsed2zarr_obj.datagram_df["channel"] = ek60_parsed2zarr_obj.datagram_df["channel"].astype(str) + yield ek60_parsed2zarr_obj + + def _get_storage_options(self, dest_path: Optional[str]) -> Optional[dict]: + """Retrieve storage options for destination path""" + dest_storage_options = None + if dest_path is not None and "s3://" in dest_path: + dest_storage_options = {"anon": False, "client_kwargs": {"endpoint_url": endpoint_uri}} + return dest_storage_options + + @pytest.fixture(scope="class") + def ek60_parser_obj(self, test_path): + folder_path = test_path[self.sonar_model] + raw_file = str(folder_path / self.sample_file) + + file_chk, xml_chk = _check_file( + raw_file, self.sonar_model, self.xml_path, self.storage_options + ) + + if SONAR_MODELS[self.sonar_model]["xml"]: + params = xml_chk + else: + params = "ALL" + + # obtain dict associated with directly writing to zarr + dgram_zarr_vars = SONAR_MODELS[self.sonar_model]["dgram_zarr_vars"] + + # Parse raw file and organize data into groups + parser = SONAR_MODELS[self.sonar_model]["parser"]( + file_chk, + params=params, + storage_options=self.storage_options, + dgram_zarr_vars=dgram_zarr_vars, + ) + + # Parse the data + parser.parse_raw() + return parser + + @pytest.mark.parametrize( + ["sonar_model", "p2z_class"], + [ + (None, Parsed2Zarr), + ("EK60", Parsed2ZarrEK60), + ], + ) + def test_constructor(self, sonar_model, p2z_class, ek60_parser_obj): + if sonar_model is None: + p2z = p2z_class(None) + assert p2z.parser_obj is None + assert p2z.temp_zarr_dir is None + assert p2z.zarr_file_name is None + assert p2z.store is None + assert p2z.zarr_root is None + assert p2z._varattrs == sonarnetcdf_1.yaml_dict["variable_and_varattributes"] + else: + p2z = p2z_class(ek60_parser_obj) + assert isinstance(p2z.parser_obj, SONAR_MODELS[self.sonar_model]["parser"]) + assert p2z.sonar_model == self.sonar_model + + @pytest.mark.parametrize("dest_path", [None, "./", f"s3://{test_bucket_name}/my-dir/"]) + def test__create_zarr_info(self, ek60_parsed2zarr_obj, dest_path, s3): + dest_storage_options = self._get_storage_options(dest_path) + + ek60_parsed2zarr_obj._create_zarr_info(dest_path, dest_storage_options=dest_storage_options) + + zarr_store = ek60_parsed2zarr_obj.store + zarr_root = 
ek60_parsed2zarr_obj.zarr_root + + assert isinstance(zarr_store, FSMap) + assert isinstance(zarr_root, ZGroup) + assert zarr_store.root.endswith(".zarr") + + if dest_path is None: + assert os.path.dirname(zarr_store.root) == str(DEFAULT_ZARR_TEMP_DIR) + assert ek60_parsed2zarr_obj.temp_zarr_dir == str(DEFAULT_ZARR_TEMP_DIR) + elif "s3://" not in dest_path: + shutil.rmtree(zarr_store.root) + + def test__close_store(self, ek60_parsed2zarr_obj): + ek60_parsed2zarr_obj._create_zarr_info() + + zarr_store = ek60_parsed2zarr_obj.store + + # Initially metadata should not exist + assert not zarr_store.fs.exists(zarr_store.root + "/.zmetadata") + + # Close store will consolidate metadata + ek60_parsed2zarr_obj._close_store() + + # Now metadata should exist + assert zarr_store.fs.exists(zarr_store.root + "/.zmetadata") + + def test__write_power(self, ek60_parsed2zarr_obj_w_df): + # There shouldn't be any group here + assert "power" not in ek60_parsed2zarr_obj_w_df.zarr_root + + ek60_parsed2zarr_obj_w_df._write_power( + df=ek60_parsed2zarr_obj_w_df.datagram_df, + max_mb=self.max_mb + ) + + # There should now be power group + assert "power" in ek60_parsed2zarr_obj_w_df.zarr_root + + for k, arr in ek60_parsed2zarr_obj_w_df.zarr_root["/power"].arrays(): + assert arr.shape == self.ek60_expected_shapes[k] + + def test__write_angle(self, ek60_parsed2zarr_obj_w_df): + # There shouldn't be any group here + assert "angle" not in ek60_parsed2zarr_obj_w_df.zarr_root + + ek60_parsed2zarr_obj_w_df._write_angle( + df=ek60_parsed2zarr_obj_w_df.datagram_df, + max_mb=self.max_mb + ) + # There should now be angle group + assert "angle" in ek60_parsed2zarr_obj_w_df.zarr_root + + for k, arr in ek60_parsed2zarr_obj_w_df.zarr_root["/angle"].arrays(): + assert arr.shape == self.ek60_expected_shapes[k] + + def test_power_dataarray(self, ek60_parsed2zarr_obj_w_df): + power_dataarray = ek60_parsed2zarr_obj_w_df.power_dataarray + assert isinstance(power_dataarray, xr.DataArray) + + assert power_dataarray.name == "backscatter_r" + assert power_dataarray.dims == ("ping_time", "channel", "range_sample") + assert power_dataarray.shape == self.ek60_expected_shapes["power"] + + def test_angle_dataarrays(self, ek60_parsed2zarr_obj_w_df): + angle_athwartship, angle_alongship = ek60_parsed2zarr_obj_w_df.angle_dataarrays + assert isinstance(angle_athwartship, xr.DataArray) + assert isinstance(angle_alongship, xr.DataArray) + + assert angle_alongship.name == "angle_alongship" + assert angle_alongship.dims == ("ping_time", "channel", "range_sample") + assert angle_alongship.shape == self.ek60_expected_shapes["angle_alongship"] + + assert angle_athwartship.name == "angle_athwartship" + assert angle_athwartship.dims == ("ping_time", "channel", "range_sample") + assert angle_athwartship.shape == self.ek60_expected_shapes["angle_athwartship"] diff --git a/echopype/tests/echodata/utils.py b/echopype/tests/echodata/utils.py index 011dafeeb..31bcf93ce 100644 --- a/echopype/tests/echodata/utils.py +++ b/echopype/tests/echodata/utils.py @@ -10,6 +10,10 @@ from echopype.convert.set_groups_base import SetGroupsBase from echopype.echodata.echodata import EchoData +from echopype.echodata.convention import sonarnetcdf_1 + +class P2Z: + _varattrs = sonarnetcdf_1.yaml_dict["variable_and_varattributes"] class SetGroupsTest(SetGroupsBase): @@ -95,6 +99,7 @@ def get_mock_echodata( output_path=None, sonar_model=sonar_model, params={"survey_name": "mock_survey"}, + parsed2zarr_obj=P2Z(), ) tree_dict["/"] = setgrouper.set_toplevel( sonar_model, 
date_created=np.datetime64("1970-01-01") diff --git a/requirements-dev.txt b/requirements-dev.txt index fe3bd8841..80fbb3f60 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -9,6 +9,7 @@ flake8-mutable flake8-print isort mypy +moto numpydoc pre-commit pylint diff --git a/requirements.txt b/requirements.txt index eb5ca32c8..2cb7c15cd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,5 +14,4 @@ requests aiohttp xarray-datatree==0.0.6 psutil>=5.9.1 -more-itertools==8.13.0 geopy
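Note on the swap heuristic added in echopype/convert/utils/ek.py: for
destination_path="auto", open_raw calls should_use_swap(...) with
mem_mult=0.4. A rough standalone sketch of that decision, with illustrative
(assumed) sizes in place of the values measured from the parsed datagrams:

    import psutil

    mem = psutil.virtual_memory()
    zdgrams_mem = 500 * 1024**2  # assumed size of the parsed datagram list (500 MB)
    expanded_mem = 8 * 1024**3   # assumed size of the rectangularized arrays (8 GB)

    # approximate memory in use once the datagrams are expanded, as in should_use_swap
    req_mem = mem.used - zdgrams_mem + expanded_mem
    use_swap = mem.total * 0.4 < req_mem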