From 51c8df43cfea3d866968cf0430880a9f27a9138a Mon Sep 17 00:00:00 2001 From: ghiggi Date: Fri, 20 Dec 2024 17:51:13 +0100 Subject: [PATCH] Robustify L0C --- .../CAMPAIGN_NAME/metadata/station_name_1.yml | 2 +- .../CAMPAIGN_NAME/metadata/station_name_2.yml | 2 +- disdrodb/api/info.py | 154 ++++- disdrodb/api/io.py | 5 +- disdrodb/api/path.py | 81 ++- disdrodb/l0/l0_processing.py | 143 +++-- disdrodb/l0/l0a_processing.py | 2 +- disdrodb/l0/l0b_nc_processing.py | 3 +- disdrodb/l0/l0c_processing.py | 572 +++++++++++++++--- disdrodb/l0/readers/BRAZIL/CHUVA_LPM.py | 2 + disdrodb/l0/readers/NETHERLANDS/DELFT.py | 2 - disdrodb/l1/beard_model.py | 15 +- disdrodb/l1/fall_velocity.py | 2 +- disdrodb/l1/processing.py | 21 +- disdrodb/l1/resampling.py | 1 + disdrodb/l1/routines.py | 38 +- disdrodb/l1_env/routines.py | 2 +- disdrodb/l2/empirical_dsd.py | 147 ++--- disdrodb/l2/event.py | 353 ++++++++--- disdrodb/l2/processing.py | 48 +- disdrodb/l2/processing_options.py | 60 +- disdrodb/l2/routines.py | 236 ++++++-- disdrodb/psd/fitting.py | 198 +++--- disdrodb/psd/models.py | 57 +- disdrodb/routines.py | 2 +- .../Raw/EPFL/PARSIVEL_2007/metadata/10.yml | 2 +- ...GORM.s20170210000000.e20170210000400.V0.nc | Bin 87773 -> 88476 bytes .../Raw/UK/DIVEN/metadata/CAIRNGORM.yml | 2 +- .../CAMPAIGN_NAME/metadata/STATION_NAME.yml | 2 +- .../CAMPAIGN_NAME/metadata/STATION_NAME.yml | 2 +- .../CAMPAIGN_NAME/metadata/STATIONID.yml | 2 +- .../CAMPAIGN_NAME/metadata/STATION_NAME.yml | 2 +- .../metadata/123.yml | 2 +- disdrodb/tests/test_api/test_api_path.py | 14 +- disdrodb/tests/test_l0/test_l0a_processing.py | 3 +- disdrodb/utils/logger.py | 15 +- disdrodb/utils/time.py | 217 ++++++- docs/source/metadata_csv/Sensor_Info.csv | 2 +- 38 files changed, 1740 insertions(+), 673 deletions(-) diff --git a/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_1.yml b/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_1.yml index 58d5240e..18bf3020 100644 --- a/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_1.yml +++ b/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_1.yml @@ -36,7 +36,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_2.yml b/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_2.yml index 12e2198d..424ab818 100644 --- a/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_2.yml +++ b/data/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/station_name_2.yml @@ -36,7 +36,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/disdrodb/api/info.py b/disdrodb/api/info.py index 17ae25e0..62763538 100644 --- a/disdrodb/api/info.py +++ b/disdrodb/api/info.py @@ -19,11 +19,14 @@ """Retrieve file information from DISDRODB products file names and filepaths.""" import os +from collections import defaultdict from pathlib import Path import numpy as np from trollsift import Parser +from disdrodb.utils.time import acronym_to_seconds + ####--------------------------------------------------------------------------- ######################## #### FNAME PATTERNS #### @@ -32,7 
+35,7 @@ "{product:s}.{campaign_name:s}.{station_name:s}.s{start_time:%Y%m%d%H%M%S}.e{end_time:%Y%m%d%H%M%S}" ".{version:s}.{data_format:s}" ) -DISDRODB_FNAME_L2E_PATTERN = ( +DISDRODB_FNAME_L2E_PATTERN = ( # also L0C and L1 --> accumulation_acronym = sample_interval "{product:s}.{accumulation_acronym}.{campaign_name:s}.{station_name:s}.s{start_time:%Y%m%d%H%M%S}.e{end_time:%Y%m%d%H%M%S}" ".{version:s}.{data_format:s}" ) @@ -50,10 +53,10 @@ def _parse_filename(filename): """Parse the filename with trollsift.""" - if filename.startswith("L0") or filename.startswith("L1"): + if filename.startswith("L0A") or filename.startswith("L0B"): p = Parser(DISDRODB_FNAME_L0_PATTERN) info_dict = p.parse(filename) - elif filename.startswith("L2E"): + elif filename.startswith("L2E") or filename.startswith("L1") or filename.startswith("L0C"): p = Parser(DISDRODB_FNAME_L2E_PATTERN) info_dict = p.parse(filename) elif filename.startswith("L2M"): @@ -71,6 +74,11 @@ def _get_info_from_filename(filename): info_dict = _parse_filename(filename) except ValueError: raise ValueError(f"{filename} can not be parsed. Report the issue.") + + # Add additional information to info dictionary + if "accumulation_acronym" in info_dict: + info_dict["sample_interval"] = acronym_to_seconds(info_dict["accumulation_acronym"]) + # Return info dictionary return info_dict @@ -152,6 +160,13 @@ def get_start_end_time_from_filepaths(filepaths): return np.array(list_start_time).astype("M8[s]"), np.array(list_end_time).astype("M8[s]") +def get_sample_interval_from_filepaths(filepaths): + """Return the sample interval of the specified files.""" + list_accumulation_acronym = get_key_from_filepaths(filepaths, key="accumulation_acronym") + list_sample_interval = [acronym_to_seconds(s) for s in list_accumulation_acronym] + return list_sample_interval + + ####--------------------------------------------------------------------------. ################################### #### DISDRODB Tree Components #### @@ -316,3 +331,136 @@ def infer_data_source_from_path(path: str) -> str: ####--------------------------------------------------------------------------. +####################### +#### Group utility #### +####################### + + +FILE_KEYS = [ + "product", + "subproduct", + "campaign_name", + "station_name", + "start_time", + "end_time", + "data_format", + "accumulation_acronym", + "sample_interval", +] + + +TIME_KEYS = [ + "year", + "month", + "month_name", + "quarter", + "season", + "day", + "doy", + "dow", + "hour", + "minute", + "second", +] + + +def check_groups(groups): + """Check groups validity.""" + if not isinstance(groups, (str, list)): + raise TypeError("'groups' must be a list (or a string if a single group is specified.") + if isinstance(groups, str): + groups = [groups] + groups = np.array(groups) + valid_keys = FILE_KEYS + TIME_KEYS + invalid_keys = groups[np.isin(groups, valid_keys, invert=True)] + if len(invalid_keys) > 0: + raise ValueError(f"The following group keys are invalid: {invalid_keys}. 
Valid values are {valid_keys}.") + return groups.tolist() + + +def get_season(time): + """Get season from `datetime.datetime` or `datetime.date` object.""" + month = time.month + if month in [12, 1, 2]: + return "DJF" # Winter (December, January, February) + if month in [3, 4, 5]: + return "MAM" # Spring (March, April, May) + if month in [6, 7, 8]: + return "JJA" # Summer (June, July, August) + return "SON" # Autumn (September, October, November) + + +def get_time_component(time, component): + """Get time component from `datetime.datetime` object.""" + func_dict = { + "year": lambda time: time.year, + "month": lambda time: time.month, + "day": lambda time: time.day, + "doy": lambda time: time.timetuple().tm_yday, # Day of year + "dow": lambda time: time.weekday(), # Day of week (0=Monday, 6=Sunday) + "hour": lambda time: time.hour, + "minute": lambda time: time.minute, + "second": lambda time: time.second, + # Additional + "month_name": lambda time: time.strftime("%B"), # Full month name + "quarter": lambda time: (time.month - 1) // 3 + 1, # Quarter (1-4) + "season": lambda time: get_season(time), # Season (DJF, MAM, JJA, SON) + } + return str(func_dict[component](time)) + + +def _get_groups_value(groups, filepath): + """Return the value associated to the groups keys. + + If multiple keys are specified, the value returned is a string of format: ``//...`` + + If a single key is specified and is ``start_time`` or ``end_time``, the function + returns a :py:class:`datetime.datetime` object. + """ + single_key = len(groups) == 1 + info_dict = get_info_from_filepath(filepath) + start_time = info_dict["start_time"] + list_key_values = [] + for key in groups: + if key in TIME_KEYS: + list_key_values.append(get_time_component(start_time, component=key)) + else: + value = info_dict.get(key, f"{key}=None") + list_key_values.append(value if single_key else str(value)) + if single_key: + return list_key_values[0] + return "/".join(list_key_values) + + +def group_filepaths(filepaths, groups=None): + """ + Group filepaths in a dictionary if groups are specified. + + Parameters + ---------- + filepaths : list + List of filepaths. + groups: list or str + The group keys by which to group the filepaths. + Valid group keys are ``product``, ``subproduct``, ``campaign_name``, ``station_name``, + ``start_time``, ``end_time``,``accumulation_acronym``,``sample_interval``, + ``data_format``, + ``year``, ``month``, ``day``, ``doy``, ``dow``, ``hour``, ``minute``, ``second``, + ``month_name``, ``quarter``, ``season``. + The time components are extracted from ``start_time`` ! + If groups is ``None`` returns the input filepaths list. + The default is ``None``. + + Returns + ------- + dict or list + Either a dictionary of format ``{: }``. + or the original input filepaths (if ``groups=None``) + + """ + if groups is None: + return filepaths + groups = check_groups(groups) + filepaths_dict = defaultdict(list) + _ = [filepaths_dict[_get_groups_value(groups, filepath)].append(filepath) for filepath in filepaths] + return dict(filepaths_dict) diff --git a/disdrodb/api/io.py b/disdrodb/api/io.py index d6d225b8..f19d791a 100644 --- a/disdrodb/api/io.py +++ b/disdrodb/api/io.py @@ -157,6 +157,7 @@ def _get_list_stations_with_data(product, campaign_dir): # Get stations directory list_stations_dir = _get_list_stations_dirs(product=product, campaign_dir=campaign_dir) # Count number of files within directory + # - TODO: here just check for one file ! 
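    # A minimal usage sketch for the `group_filepaths` helper added above in
    # disdrodb/api/info.py (the file names below are hypothetical and serve only
    # to illustrate the grouping keys):
    #
    #     filepaths = [
    #         "L0C.30S.MYCAMPAIGN.STATION_A.s20240105000000.e20240105235930.V0.nc",
    #         "L0C.30S.MYCAMPAIGN.STATION_A.s20240210000000.e20240210235930.V0.nc",
    #     ]
    #     group_filepaths(filepaths, groups=["station_name", "month"])
    #     # -> {"STATION_A/1": [<January file>], "STATION_A/2": [<February file>]}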
list_nfiles_per_station = [count_files(station_dir, "*", recursive=True) for station_dir in list_stations_dir] # Keep only stations with at least one file stations_names = [os.path.basename(path) for n, path in zip(list_nfiles_per_station, list_stations_dir) if n >= 1] @@ -181,7 +182,6 @@ def _get_campaign_stations(base_dir, product, data_source, campaign_name): data_source=data_source, campaign_name=campaign_name, ) - # Get list of stations with data and metadata list_stations_data = _get_list_stations_with_data(product=product, campaign_dir=campaign_dir) list_stations_metadata = _get_list_stations_with_metadata(campaign_dir) @@ -387,7 +387,7 @@ def available_stations( raise_error_if_empty=False, base_dir=None, ): - """Return stations for which data are available on disk. + """Return stations for which data and metadata are available on disk. Raise an error if no stations are available. """ @@ -410,6 +410,7 @@ def available_stations( # If data_source is None, retrieve all stations if data_sources is None: list_info = _get_stations(base_dir=base_dir, product=product) + ###-----------------------------------------------. ### Filter by data_sources else: diff --git a/disdrodb/api/path.py b/disdrodb/api/path.py index cb61cddf..c4cb6ff1 100644 --- a/disdrodb/api/path.py +++ b/disdrodb/api/path.py @@ -17,7 +17,6 @@ # along with this program. If not, see . # -----------------------------------------------------------------------------. """Define paths within the DISDRODB infrastructure.""" - import os from typing import Optional @@ -25,6 +24,7 @@ from disdrodb.configs import get_base_dir from disdrodb.utils.directories import check_directory_exists +from disdrodb.utils.time import ensure_sample_interval_in_seconds, seconds_to_acronym ####--------------------------------------------------------------------------. #### Paths from BASE_DIR @@ -201,11 +201,11 @@ def define_issue_dir( def define_metadata_filepath( - product, data_source, campaign_name, station_name, base_dir=None, + product="RAW", check_exists=False, ): """Return the station metadata filepath in the DISDRODB infrastructure. 
@@ -357,13 +357,13 @@ def define_product_dir_tree( if product == "L2E": check_rolling(rolling) check_sample_interval(sample_interval) - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval, rolling=rolling) + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, rolling=rolling) return os.path.join(product, sample_interval_acronym) if product == "L2M": check_rolling(rolling) check_sample_interval(sample_interval) check_distribution(distribution) - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval, rolling=rolling) + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, rolling=rolling) distribution_acronym = get_distribution_acronym(distribution) return os.path.join(product, distribution_acronym, sample_interval_acronym) raise ValueError(f"The product {product} is not defined.") @@ -587,13 +587,13 @@ def define_data_dir( elif product == "L2E": check_rolling(rolling) check_sample_interval(sample_interval) - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval, rolling=rolling) + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, rolling=rolling) data_dir = os.path.join(station_dir, sample_interval_acronym) elif product == "L2M": check_rolling(rolling) check_sample_interval(sample_interval) check_distribution(distribution) - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval, rolling=rolling) + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, rolling=rolling) distribution_acronym = get_distribution_acronym(distribution) data_dir = os.path.join(station_dir, distribution_acronym, sample_interval_acronym) else: @@ -678,35 +678,19 @@ def get_distribution_acronym(distribution): return acronym_dict[distribution] -def get_sample_interval_acronym(seconds, rolling=False): - """ - Convert a duration in seconds to a readable string format (e.g., "1H30", "1D2H"). - - Parameters - ---------- - - seconds (int): The time duration in seconds. +def define_accumulation_acronym(seconds, rolling): + """Define the accumulation acronnym. - Returns - ------- - - str: The duration as a string in a format like "30S", "1MIN30S", "1H30MIN", or "1D2H". + Prefix the accumulation interval acronym with ROLL if rolling=True. """ - timedelta = pd.Timedelta(seconds=seconds) - components = timedelta.components - - parts = [] - if components.days > 0: - parts.append(f"{components.days}D") - if components.hours > 0: - parts.append(f"{components.hours}H") - if components.minutes > 0: - parts.append(f"{components.minutes}MIN") - if components.seconds > 0: - parts.append(f"{components.seconds}S") - sample_interval_acronym = "".join(parts) - # Prefix with ROLL if rolling=True + accumulation_acronym = seconds_to_acronym(seconds) if rolling: - sample_interval_acronym = f"ROLL{sample_interval_acronym}" - return sample_interval_acronym + accumulation_acronym = f"ROLL{accumulation_acronym}" + return accumulation_acronym + + +####--------------------------------------------------------------------------. +#### Filenames for DISDRODB products def define_filename( @@ -757,13 +741,16 @@ def define_filename( from disdrodb.utils.pandas import get_dataframe_start_end_time from disdrodb.utils.xarray import get_dataset_start_end_time + # -----------------------------------------. 
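    # Expected behaviour of the accumulation acronym helper defined above
    # (assuming `seconds_to_acronym` keeps the previous pandas.Timedelta-based
    # formatting, e.g. "30S", "1MIN30S", "1H30MIN", "1D2H"):
    #
    #     define_accumulation_acronym(30, rolling=False)    # -> "30S"
    #     define_accumulation_acronym(90, rolling=False)    # -> "1MIN30S"
    #     define_accumulation_acronym(3600, rolling=True)   # -> "ROLL1H"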
+ # TODO: Define sample_interval_acronym + # - ADD sample_interval_acronym also to L0A and L0B + # - Add sample_interval_acronym also to L0C and L1 + # -----------------------------------------. # Define product acronym product_acronym = f"{product}" if product in ["L2E", "L2M"]: - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval) - if rolling: - sample_interval_acronym = f"ROLL{sample_interval_acronym}" + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, rolling=rolling) product_acronym = f"L2E.{sample_interval_acronym}" if product in ["L2M"]: distribution_acronym = get_distribution_acronym(distribution) @@ -882,11 +869,16 @@ def define_l0c_filename(ds, campaign_name: str, station_name: str) -> str: from disdrodb import PRODUCT_VERSION from disdrodb.utils.xarray import get_dataset_start_end_time + # TODO: add sample_interval as argument + sample_interval = int(ensure_sample_interval_in_seconds(ds["sample_interval"]).data.item()) + sample_interval_acronym = define_accumulation_acronym(sample_interval, rolling=False) starting_time, ending_time = get_dataset_start_end_time(ds) starting_time = pd.to_datetime(starting_time).strftime("%Y%m%d%H%M%S") ending_time = pd.to_datetime(ending_time).strftime("%Y%m%d%H%M%S") version = PRODUCT_VERSION - filename = f"L0C.{campaign_name}.{station_name}.s{starting_time}.e{ending_time}.{version}.nc" + filename = ( + f"L0C.{sample_interval_acronym}.{campaign_name}.{station_name}.s{starting_time}.e{ending_time}.{version}.nc" + ) return filename @@ -905,16 +897,21 @@ def define_l1_filename(ds, campaign_name, station_name: str) -> str: Returns ------- str - L0B file name. + L1 file name. """ from disdrodb import PRODUCT_VERSION from disdrodb.utils.xarray import get_dataset_start_end_time + # TODO: add sample_interval as argument + sample_interval = int(ensure_sample_interval_in_seconds(ds["sample_interval"]).data.item()) + sample_interval_acronym = define_accumulation_acronym(sample_interval, rolling=False) starting_time, ending_time = get_dataset_start_end_time(ds) starting_time = pd.to_datetime(starting_time).strftime("%Y%m%d%H%M%S") ending_time = pd.to_datetime(ending_time).strftime("%Y%m%d%H%M%S") version = PRODUCT_VERSION - filename = f"L1.{campaign_name}.{station_name}.s{starting_time}.e{ending_time}.{version}.nc" + filename = ( + f"L1.{sample_interval_acronym}.{campaign_name}.{station_name}.s{starting_time}.e{ending_time}.{version}.nc" + ) return filename @@ -938,9 +935,7 @@ def define_l2e_filename(ds, campaign_name: str, station_name: str, sample_interv from disdrodb import PRODUCT_VERSION from disdrodb.utils.xarray import get_dataset_start_end_time - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval) - if rolling: - sample_interval_acronym = f"ROLL{sample_interval_acronym}" + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, rolling=rolling) starting_time, ending_time = get_dataset_start_end_time(ds) starting_time = pd.to_datetime(starting_time).strftime("%Y%m%d%H%M%S") ending_time = pd.to_datetime(ending_time).strftime("%Y%m%d%H%M%S") @@ -979,9 +974,7 @@ def define_l2m_filename( from disdrodb.utils.xarray import get_dataset_start_end_time distribution_acronym = get_distribution_acronym(distribution) - sample_interval_acronym = get_sample_interval_acronym(seconds=sample_interval) - if rolling: - sample_interval_acronym = f"ROLL{sample_interval_acronym}" + sample_interval_acronym = define_accumulation_acronym(seconds=sample_interval, 
rolling=rolling) starting_time, ending_time = get_dataset_start_end_time(ds) starting_time = pd.to_datetime(starting_time).strftime("%Y%m%d%H%M%S") ending_time = pd.to_datetime(ending_time).strftime("%Y%m%d%H%M%S") diff --git a/disdrodb/l0/l0_processing.py b/disdrodb/l0/l0_processing.py index 55f533a6..408d3e15 100644 --- a/disdrodb/l0/l0_processing.py +++ b/disdrodb/l0/l0_processing.py @@ -25,7 +25,6 @@ from typing import Optional import dask -import xarray as xr from disdrodb.api.checks import check_sensor_name @@ -42,6 +41,7 @@ define_l0a_filename, define_l0b_filename, define_l0c_filename, + define_metadata_filepath, ) # get_disdrodb_path, @@ -65,6 +65,7 @@ from disdrodb.l0.l0c_processing import ( create_daily_file, get_files_per_days, + retrieve_possible_measurement_intervals, ) from disdrodb.metadata import read_station_metadata from disdrodb.utils.decorator import delayed_if_parallel, single_threaded_if_parallel @@ -80,6 +81,7 @@ # log_warning, from disdrodb.utils.writer import write_product +from disdrodb.utils.yaml import read_yaml logger = logging.getLogger(__name__) @@ -255,6 +257,8 @@ def _generate_l0b_from_nc( verbose, parallel, ): + import xarray as xr # Load in each process + # -----------------------------------------------------------------. # Define product name product = "L0B" @@ -327,6 +331,7 @@ def _generate_l0c( filepaths, data_dir, logs_dir, + metadata_filepath, campaign_name, station_name, # Processing options @@ -354,31 +359,39 @@ def _generate_l0c( ##------------------------------------------------------------------------. ### Core computation try: - # If already single file per day, copy L0B to L0C - # --> File start_time and end_time should also be within the day ! - # if len(filepaths) == 1: - # files_start_time, files_end_time = get_start_end_time_from_filepaths(filepaths) - # files_start_time.astype("M8[D]"), files_end_time.astype("M8[D]") - # copy_l0b_to_l0c_directory(filepaths[0]) - - # Produce L0C dataset - ds = create_daily_file(day=day, filepaths=filepaths, verbose=verbose) + # Retrieve measurement_intervals + # - TODO: in future available from dataset + metadata = read_yaml(metadata_filepath) + measurement_intervals = retrieve_possible_measurement_intervals(metadata) - # Write L0C netCDF4 dataset - if ds["time"].size > 1: + # Produce L0C datasets + dict_ds = create_daily_file( + day=day, + filepaths=filepaths, + measurement_intervals=measurement_intervals, + ensure_variables_equality=True, + logger=logger, + verbose=verbose, + ) - # Get sensor name from dataset - sensor_name = ds.attrs.get("sensor_name") + # Write a dataset for each sample interval + for ds in dict_ds.values(): # (sample_interval, ds) + # Write L0C netCDF4 dataset + if ds["time"].size > 1: + # Get sensor name from dataset + sensor_name = ds.attrs.get("sensor_name") + campaign_name = ds.attrs.get("campaign_name") + station_name = ds.attrs.get("station_name") - # Set encodings - ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name) + # Set encodings + ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name) - # Define filepath - filename = define_l0c_filename(ds, campaign_name=campaign_name, station_name=station_name) - filepath = os.path.join(data_dir, filename) + # Define filepath + filename = define_l0c_filename(ds, campaign_name=campaign_name, station_name=station_name) + filepath = os.path.join(data_dir, filename) - # Write to disk - write_product(ds, product=product, filepath=filepath, force=force) + # Write to disk + write_product(ds, product=product, filepath=filepath, force=force) 
# Clean environment del ds @@ -584,7 +597,7 @@ def run_l0a( # ---------------------------------------------------------------------. # End L0A processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"L0A processing of station {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) @@ -804,7 +817,7 @@ def run_l0b_from_nc( # ---------------------------------------------------------------------. # End L0B processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"L0B processing of station {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) @@ -998,14 +1011,29 @@ def run_l0b_station( ##----------------------------------------------------------------. # Get L0A files for the station required_product = get_required_product(product) - filepaths = get_filepaths( - base_dir=base_dir, - data_source=data_source, - campaign_name=campaign_name, - station_name=station_name, - product=required_product, - debugging_mode=debugging_mode, - ) + flag_not_available_data = False + try: + filepaths = get_filepaths( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + product=required_product, + debugging_mode=debugging_mode, + ) + except Exception as e: + print(str(e)) # Case where no file paths available + flag_not_available_data = True + + # -------------------------------------------------------------------------. + # If no data available, print error message and return None + if flag_not_available_data: + msg = ( + f"{product} processing of {data_source} {campaign_name} {station_name}" + + f"has not been launched because of missing {required_product} data." + ) + print(msg) + return ##----------------------------------------------------------------. # Generate L0B files @@ -1078,7 +1106,7 @@ def run_l0b_station( # -----------------------------------------------------------------. # End L0B processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"{product} processing of station_name {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) @@ -1113,7 +1141,15 @@ def run_l0c_station( """ Run the L0C processing of a specific DISDRODB station when invoked from the terminal. - This routine merge files together to generate daily files. + The DISDRODB L0A and L0B routines just convert source raw data into netCDF format. + The DISDRODB L0C routine ingests L0B files and performs data homogenization. + The DISDRODB L0C routine takes care of: + + - removing duplicated timesteps across files, + - merging/splitting files into daily files, + - regularizing timesteps for potentially trailing seconds, + - ensuring L0C files with unique sample intervals. + Duplicated timesteps are automatically dropped if their variable values coincides, otherwise an error is raised. @@ -1184,19 +1220,43 @@ def run_l0c_station( force=force, ) - # -------------------------------------------------------------------------. - # List files to process - required_product = get_required_product(product) - filepaths = get_filepaths( + # ------------------------------------------------------------------------. 
+ # Define metadata filepath + metadata_filepath = define_metadata_filepath( base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, - product=required_product, - # Processing options - debugging_mode=debugging_mode, ) + # -------------------------------------------------------------------------. + # List files to process + required_product = get_required_product(product) + flag_not_available_data = False + try: + filepaths = get_filepaths( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + product=required_product, + # Processing options + debugging_mode=debugging_mode, + ) + except Exception as e: + print(str(e)) # Case where no file paths available + flag_not_available_data = True + + # -------------------------------------------------------------------------. + # If no data available, print error message and return None + if flag_not_available_data: + msg = ( + f"{product} processing of {data_source} {campaign_name} {station_name}" + + f"has not been launched because of missing {required_product} data." + ) + print(msg) + return + # -------------------------------------------------------------------------. # Retrieve dictionary with the required files for each day. dict_days_files = get_files_per_days(filepaths) @@ -1211,6 +1271,7 @@ def run_l0c_station( filepaths=filepaths, data_dir=data_dir, logs_dir=logs_dir, + metadata_filepath=metadata_filepath, campaign_name=campaign_name, station_name=station_name, # Processing options @@ -1237,7 +1298,7 @@ def run_l0c_station( # ---------------------------------------------------------------------. # End processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"{product} processing of station {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) diff --git a/disdrodb/l0/l0a_processing.py b/disdrodb/l0/l0a_processing.py index 27439a9b..89c35b83 100644 --- a/disdrodb/l0/l0a_processing.py +++ b/disdrodb/l0/l0a_processing.py @@ -448,7 +448,7 @@ def remove_corrupted_rows(df): raise ValueError("No remaining rows after data corruption checks.") # If only one row available, raise also error if len(df) == 1: - raise ValueError("Only 1 row remains after data corruption checks. Check the file.") + raise ValueError("Only 1 row remains after data corruption checks. Check the raw file and maybe delete it.") # Return the dataframe return df diff --git a/disdrodb/l0/l0b_nc_processing.py b/disdrodb/l0/l0b_nc_processing.py index 97a4b650..734d4d0c 100644 --- a/disdrodb/l0/l0b_nc_processing.py +++ b/disdrodb/l0/l0b_nc_processing.py @@ -21,7 +21,6 @@ import logging import numpy as np -import xarray as xr from disdrodb.l0.l0b_processing import finalize_dataset from disdrodb.l0.standards import ( @@ -114,6 +113,8 @@ def subset_dataset(ds, dict_names, sensor_name): def add_dataset_missing_variables(ds, missing_vars, sensor_name): """Add missing xr.Dataset variables as ``np.nan`` xr.DataArrays.""" + import xarray as xr + from disdrodb.l0.standards import get_variables_dimension # Get dimension of each variables diff --git a/disdrodb/l0/l0c_processing.py b/disdrodb/l0/l0c_processing.py index 5bd8689a..c84253bf 100644 --- a/disdrodb/l0/l0c_processing.py +++ b/disdrodb/l0/l0c_processing.py @@ -17,21 +17,25 @@ # along with this program. If not, see . 
# -----------------------------------------------------------------------------. """Functions to process DISDRODB L0B files into DISDRODB L0C netCDF files.""" -import itertools import logging -import shutil import numpy as np import pandas as pd -import xarray as xr from disdrodb.api.info import get_start_end_time_from_filepaths from disdrodb.l1.resampling import add_sample_interval -from disdrodb.utils.time import infer_sample_interval, regularize_timesteps +from disdrodb.utils.logger import log_warning # , log_info +from disdrodb.utils.time import ( + ensure_sorted_by_time, + regularize_timesteps, +) logger = logging.getLogger(__name__) +TOLERANCE_SECONDS = 120 + + def get_files_per_days(filepaths): """ Organize files by the days they cover based on their start and end times. @@ -55,9 +59,9 @@ def get_files_per_days(filepaths): files_start_time, files_end_time = get_start_end_time_from_filepaths(filepaths) # Add tolerance to account for imprecise time logging by the sensors - # - Example: timestep 23:58 that should be 00.00 goes into the next day ... - files_start_time = files_start_time - np.array(60, dtype="m8[s]") - files_end_time = files_end_time + np.array(60, dtype="m8[s]") + # - Example: timestep 23:59:30 might be 00.00 and goes into the next day file ... + files_start_time = files_start_time - np.array(TOLERANCE_SECONDS, dtype="m8[s]") + files_end_time = files_end_time + np.array(TOLERANCE_SECONDS, dtype="m8[s]") # Retrieve file start day and end day start_day = files_start_time.min().astype("M8[D]") @@ -89,74 +93,331 @@ def get_files_per_days(filepaths): return dict_days -def find_isel_common_time(da1, da2): +def retrieve_possible_measurement_intervals(metadata): + """Retrieve list of possible measurements intervals.""" + measurement_interval = metadata.get("measurement_interval", []) + if isinstance(measurement_interval, (int, float, str)): + measurement_interval = [measurement_interval] + measurement_intervals = [int(v) for v in measurement_interval] + return measurement_intervals + + +def drop_timesteps_with_invalid_sample_interval(ds, measurement_intervals, verbose=True, logger=None): + """Drop timesteps with unexpected sample intervals.""" + # TODO + # - correct logged sample_interval for trailing seconds. Example (58,59,61,62) converted to 60 s ? + # - Need to know more how Parsivel software computes sample_interval variable ... + + # Retrieve logged sample_interval + sample_interval = ds["sample_interval"].compute().data + timesteps = ds["time"].compute().data + is_valid_sample_interval = np.isin(sample_interval.data, measurement_intervals) + indices_invalid_sample_interval = np.where(~is_valid_sample_interval)[0] + if len(indices_invalid_sample_interval) > 0: + # Log information for each invalid timestep + invalid_timesteps = pd.to_datetime(timesteps[indices_invalid_sample_interval]).strftime("%Y-%m-%d %H:%M:%S") + invalid_sample_intervals = sample_interval[indices_invalid_sample_interval] + for tt, ss in zip(invalid_timesteps, invalid_sample_intervals): + msg = f"Unexpected sampling interval ({ss} s) at {tt}. The measurement has been dropped." + log_warning(logger=logger, msg=msg, verbose=verbose) + # Remove timesteps with invalid sample intervals + indices_valid_sample_interval = np.where(is_valid_sample_interval)[0] + ds = ds.isel(time=indices_valid_sample_interval) + return ds + + +def split_dataset_by_sampling_intervals(ds, measurement_intervals, min_sample_interval=10, min_block_size=5): """ - Find the indices of common time steps between two data arrays. 
+ Split a dataset into subsets where each subset has a consistent sampling interval. Parameters ---------- - da1 : xarray.DataArray - The first data array with a time coordinate. - da2 : xarray.DataArray - The second data array with a time coordinate. + ds : xarray.Dataset + The input dataset with a 'time' dimension. + measurement_intervals : list or array-like + A list of possible primary sampling intervals (in seconds) that the dataset might have. + min_sample_interval : int, optional + The minimum expected sampling interval in seconds. Defaults to 10s. + min_block_size : float, optional + The minimum number of timesteps with a given sampling interval to be considered. + Otherwise such portion of data is discarded ! + Defaults to 5 timesteps. Returns ------- - da1_isel : numpy.ndarray - Indices of the common time steps in the first data array. - da2_isel : numpy.ndarray - Indices of the common time steps in the second data array. - - Notes - ----- - This function assumes that both input data arrays have a "time" coordinate. - The function finds the intersection of the time steps in both data arrays - and returns the indices of these common time steps for each data array. + dict + A dictionary where keys are the identified sampling intervals (in seconds), + and values are xarray.Datasets containing only data from those intervals. """ - intersecting_timesteps = np.intersect1d(da1["time"], da2["time"]) - da1_isel = np.where(np.isin(da1["time"], intersecting_timesteps))[0] - da2_isel = np.where(np.isin(da2["time"], intersecting_timesteps))[0] - return da1_isel, da2_isel + # Define array of possible measurement intervals + measurement_intervals = np.array(measurement_intervals) + + # If a single measurement interval expected, return dictionary with input dataset + if len(measurement_intervals) == 1: + dict_ds = {measurement_intervals[0]: ds} + return dict_ds + + # Check sorted by time and sort if necessary + ds = ensure_sorted_by_time(ds) + + # Calculate time differences in seconds + deltadt = np.diff(ds["time"].data).astype("timedelta64[s]").astype(int) + + # Round each delta to the nearest multiple of 5 (because the smallest possible sample interval is 10 s) + # - This account for possible trailing seconds of the logger + # Example: for sample_interval = 10, deltat values like 8, 9, 11, 12 become 10 ... + # Example: for sample_interval = 10, deltat values like 6, 7 or 13, 14 become respectively 5 and 15 ... + # Example: for sample_interval = 30, deltat values like 28,29,30,31,32 deltat become 30 ... + # Example: for sample_interval = 30, deltat values like 26, 27 or 33, 34 become respectively 25 and 35 ... 
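    # Worked example of the rounding performed below (assuming min_sample_interval=10,
    # i.e. a half-interval of 5 s):
    #   deltadt = [9, 11, 28, 33, 61]  ->  np.round(deltadt / 5) * 5  ->  [10, 10, 30, 35, 60]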
+ min_half_sample_interval = min_sample_interval / 2 + deltadt = np.round(deltadt / min_half_sample_interval) * min_half_sample_interval + + # Map each delta to one of the possible_measurement_intervals if exact match, otherwise np.nan + mapped_intervals = np.where(np.isin(deltadt, measurement_intervals), deltadt, np.nan) + if np.all(np.isnan(mapped_intervals)): + raise ValueError("Impossible to identify timesteps with expected sampling intervals.") + + # Infill np.nan values by using neighbor intervals + # Forward fill + for i in range(1, len(mapped_intervals)): + if np.isnan(mapped_intervals[i]): + mapped_intervals[i] = mapped_intervals[i - 1] + + # Backward fill (in case the first entries were np.nan) + for i in range(len(mapped_intervals) - 2, -1, -1): + if np.isnan(mapped_intervals[i]): + mapped_intervals[i] = mapped_intervals[i + 1] + + # Now all intervals are assigned to one of the possible measurement_intervals. + # Identify boundaries where interval changes + change_points = np.where(mapped_intervals[:-1] != mapped_intervals[1:])[0] + 1 + + # Split ds into segments according to change_points + segments = np.split(np.arange(ds.sizes["time"]), change_points) + + # Remove segments with less than 10 points + segments = [seg for seg in segments if len(seg) >= min_block_size] + if len(segments) == 0: + raise ValueError( + f"No blocks of {min_block_size} consecutive timesteps with constant sampling interval are available.", + ) + + # Define dataset indices for each sampling interva + dict_sampling_interval_indices = {} + for seg in segments: + # Define the assumed sampling interval of such segment + start_idx = seg[0] + segment_sampling_interval = int(mapped_intervals[start_idx]) + if segment_sampling_interval not in dict_sampling_interval_indices: + dict_sampling_interval_indices[segment_sampling_interval] = [seg] + else: + dict_sampling_interval_indices[segment_sampling_interval].append(seg) + dict_sampling_interval_indices = { + k: np.concatenate(list_indices) for k, list_indices in dict_sampling_interval_indices.items() + } + + # Define dictionary of datasets + dict_ds = {k: ds.isel(time=indices) for k, indices in dict_sampling_interval_indices.items()} + return dict_ds + + +def has_same_value_over_time(da): + """ + Check if a DataArray has the same value over all timesteps, considering NaNs as equal. + Parameters + ---------- + da : xarray.DataArray + The DataArray to check. Must have a 'time' dimension. -def check_same_raw_drop_number_values(list_ds, filepaths): + Returns + ------- + bool + True if the values are the same (or NaN in the same positions) across all timesteps, + False otherwise. """ - Check if the 'raw_drop_number' values are the same across multiple datasets. 
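    # Note on the NaN-aware comparison used below: timesteps with values [1, NaN, 3]
    # and [1, NaN, 3] compare as equal, while [1, NaN, 3] vs [1, 2, 3] do not;
    # a plain `==` would treat NaN == NaN as False and flag identical spectra as different.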
+ # Select the first timestep + da_first = da.isel(time=0) + + # Create a boolean array that identifies where values are equal or both NaN + equal_or_nan = (da == da_first) | (da.isnull() & da_first.isnull()) # noqa: PD003 + + # Check if all values match this condition across all dimensions + return bool(equal_or_nan.all().item()) + + +def remove_duplicated_timesteps(ds, ensure_variables_equality=True, logger=None, verbose=True): + """Removes duplicated timesteps from a xarray dataset.""" + # Check for duplicated timesteps + timesteps, counts = np.unique(ds["time"].data, return_counts=True) + duplicated_timesteps = timesteps[counts > 1] + + # If no duplicated timesteps, returns dataset as is + if len(duplicated_timesteps) == 0: + return ds + + # If there are duplicated timesteps + # - First check for variables equality + # - Keep first occurrence of duplicated timesteps if values are equals + # - Drop duplicated timesteps where values are different + different_duplicated_timesteps = [] + equal_duplicated_timesteps = [] + for t in duplicated_timesteps: + # Select dataset at given duplicated timestep + ds_duplicated = ds.sel(time=t) + n_t = len(ds_duplicated["time"]) + + # Check raw_drop_number equality + if not has_same_value_over_time(ds_duplicated["raw_drop_number"]): + different_duplicated_timesteps.append(t) + msg = ( + f"Presence of {n_t} duplicated timesteps at {t}." + "They have different 'raw_drop_number' values. These timesteps are dropped." + ) + log_warning(logger=logger, msg=msg, verbose=verbose) + + # Check other variables equality + other_variables_to_check = [v for v in ds.data_vars if v != "raw_drop_number"] + variables_with_different_values = [ + var for var in other_variables_to_check if not has_same_value_over_time(ds_duplicated[var]) + ] + if len(variables_with_different_values) > 0: + msg = ( + f"Presence of {n_t} duplicated timesteps at {t}." + f"The duplicated timesteps have different values in variables {variables_with_different_values}. " + ) + if ensure_variables_equality: + different_duplicated_timesteps.append(t) + msg = msg + "These timesteps are dropped." + else: + equal_duplicated_timesteps.append(t) + msg = msg + ( + "These timesteps are not dropped because 'raw_drop_number' values are equals." + "'ensure_variables_equality' is False." + ) + log_warning(logger=logger, msg=msg, verbose=verbose) + else: + equal_duplicated_timesteps.append(t) + + # Ensure single occurrence of duplicated timesteps + equal_duplicated_timesteps = np.unique(equal_duplicated_timesteps) + different_duplicated_timesteps = np.unique(different_duplicated_timesteps) + + # - Keep first occurrence of equal_duplicated_timesteps + if len(equal_duplicated_timesteps) > 0: + indices_to_drop = [np.where(ds["time"] == t)[0][1:] for t in equal_duplicated_timesteps] + indices_to_drop = np.concatenate(indices_to_drop) + # Keep only indices not in indices_to_drop + mask = ~np.isin(np.arange(ds["time"].size), indices_to_drop) + ds = ds.isel(time=np.where(mask)[0]) + + # - Drop different_duplicated_timesteps + if len(different_duplicated_timesteps) > 0: + mask = np.isin(ds["time"], different_duplicated_timesteps, invert=True) + ds = ds.isel(time=np.where(mask)[0]) - This function compares the 'raw_drop_number' values of multiple datasets to ensure they are identical - at common timesteps. + return ds - If any discrepancies are found, a ValueError is raised indicating which files - have differing values. 
- Parameters - ---------- - list_ds : list of xarray.Dataset - A list of xarray Datasets to be compared. - filepaths : list of str - A list of file paths corresponding to the datasets in `list_ds`. +def check_timesteps_regularity(ds, sample_interval, verbose=False, logger=None): + """Check for the regularity of timesteps.""" + # Check sorted by time and sort if necessary + ds = ensure_sorted_by_time(ds) + + # Calculate number of timesteps + n = len(ds["time"].data) + + # Calculate time differences in seconds + deltadt = np.diff(ds["time"].data).astype("timedelta64[s]").astype(int) + + # Identify unique time intervals and their occurrences + unique_deltadt, counts = np.unique(deltadt, return_counts=True) + + # Determine the most frequent time interval (mode) + most_frequent_deltadt_idx = np.argmax(counts) + most_frequent_deltadt = unique_deltadt[most_frequent_deltadt_idx] + + # Count fraction occurrence of deltadt + fractions = np.round(counts / len(deltadt) * 100, 2) + + # Compute stats about expected deltadt + sample_interval_counts = counts[unique_deltadt == sample_interval].item() + sample_interval_fraction = fractions[unique_deltadt == sample_interval].item() + + # Compute stats about most frequent deltadt + most_frequent_deltadt_counts = counts[unique_deltadt == most_frequent_deltadt].item() + most_frequent_deltadt_fraction = fractions[unique_deltadt == most_frequent_deltadt].item() + + # Compute stats about unexpected deltadt + unexpected_intervals = unique_deltadt[unique_deltadt != sample_interval] + unexpected_intervals_counts = counts[unique_deltadt != sample_interval] + unexpected_intervals_fractions = fractions[unique_deltadt != sample_interval] + frequent_unexpected_intervals = unexpected_intervals[unexpected_intervals_fractions > 5] + + # Report warning if the samplin_interval deltadt occurs less often than 60 % of times + # -> TODO: maybe only report in stations where the disdro does not log only data when rainy + if sample_interval_fraction < 60: + msg = ( + f"The expected (sampling) interval between observations occurs only " + f"{sample_interval_counts}/{n} times ({sample_interval_fraction} %)." + ) + + # Report warning if a deltadt occurs more often then the sampling interval + if most_frequent_deltadt != sample_interval: + msg = ( + f"The most frequent time interval between observations is {most_frequent_deltadt} s " + f"(occurs {most_frequent_deltadt_counts}/{n} times) ({most_frequent_deltadt_fraction}%) " + f"although the expected (sampling) interval is {sample_interval} s " + f"and occurs {sample_interval_counts}/{n} times ({sample_interval_fraction}%)." 
+ ) + log_warning(logger=logger, msg=msg, verbose=verbose) + + # Report with a warning all unexpected deltadt with frequency larger than 5 % + if len(frequent_unexpected_intervals) > 0: + msg_parts = ["The following unexpected intervals occur frequently:"] + for interval in frequent_unexpected_intervals: + c = unexpected_intervals_counts[unexpected_intervals == interval].item() + f = unexpected_intervals_fractions[unexpected_intervals == interval].item() + msg_parts.append(f" {interval} ({f}%) ({c}/{n}) | ") + msg = " ".join(msg_parts) + + msg = "The following time intervals between observations occurs often: " + for interval in frequent_unexpected_intervals: + c = unexpected_intervals_counts[unexpected_intervals == interval].item() + f = unexpected_intervals_fractions[unexpected_intervals == interval].item() + msg = msg + f"{interval} s ({f}%) ({c}/{n})" + log_warning(logger=logger, msg=msg, verbose=verbose) + return ds - Raises - ------ - ValueError - If 'raw_drop_number' values differ at any common timestep between any two datasets. + +def finalize_l0c_dataset(ds, sample_interval, start_day, end_day, verbose=True, logger=None): + """Finalize a L0C dataset with unique sampling interval. + + It adds the sampling_interval coordinate and it regularizes + the timesteps for trailing seconds. """ - # Retrieve variable to compare - list_drop_number = [ds["raw_drop_number"].compute() for ds in list_ds] - # Compare values - combos = list(itertools.combinations(range(len(list_drop_number)), 2)) - for i, j in combos: - da1 = list_drop_number[i] - da2 = list_drop_number[j] - da1_isel, da2_isel = find_isel_common_time(da1=da1, da2=da2) - if not np.all(da1.isel(time=da1_isel).data == da2.isel(time=da2_isel).data): - file1 = filepaths[i] - file2 = filepaths[i] - msg = f"Duplicated timesteps have different values between file {file1} and {file2}" - raise ValueError(msg) - - -def create_daily_file(day, filepaths, verbose=True): + # Add sample interval as coordinate + ds = add_sample_interval(ds, sample_interval=sample_interval) + + # Regularize timesteps (for trailing seconds) + ds = regularize_timesteps( + ds, + sample_interval=sample_interval, + robust=False, # if True, raise error if an error occur during regularization + add_quality_flag=True, + verbose=verbose, + logger=logger, + ) + + # Performs checks about timesteps regularity + ds = check_timesteps_regularity(ds=ds, sample_interval=sample_interval, verbose=verbose, logger=logger) + + # Slice for requested day + ds = ds.sel({"time": slice(start_day, end_day)}) + return ds + + +def create_daily_file(day, filepaths, measurement_intervals, ensure_variables_equality=True, logger=None, verbose=True): """ Create a daily file by merging and processing data from multiple filepaths. @@ -188,44 +449,35 @@ def create_daily_file(day, filepaths, verbose=True): - The data is loaded into memory and connections to source files are closed before returning the dataset. """ + import xarray as xr # Load in each process when function is called ! + + # ---------------------------------------------------------------------------------------. # Define start day and end of day start_day = np.array(day).astype("M8[D]") end_day = start_day + np.array(1, dtype="m8[D]") - np.array(1, dtype="m8[s]") # avoid 00:00 of next day ! # Add tolerance for searching timesteps before and after 00:00 to account for imprecise logging time - # - Example: timestep 23:58 that should be 00.00 goes into the next day ... 
- start_day_tol = start_day - np.array(60, dtype="m8[s]") - end_day_tol = end_day + np.array(60, dtype="m8[s]") + # - Example: timestep 23:59:30 that should be 00.00 goes into the next day ... + start_day_tol = start_day - np.array(TOLERANCE_SECONDS, dtype="m8[s]") + end_day_tol = end_day + np.array(TOLERANCE_SECONDS, dtype="m8[s]") - # Open files with data within the provided day + # ---------------------------------------------------------------------------------------. + # Open files with data within the provided day and concatenate them # list_ds = [xr.open_dataset(filepath, chunks={}).sel({"time": slice(start_day_tol, end_day_tol)}) # for filepath in filepaths] list_ds = [xr.open_dataset(filepath, chunks={}, cache=False).sortby("time") for filepath in filepaths] list_ds = [ds.sel({"time": slice(start_day_tol, end_day_tol)}) for ds in list_ds] - if len(list_ds) > 1: - # Check duplicated timesteps have same raw_drop_number - check_same_raw_drop_number_values(list_ds=list_ds, filepaths=filepaths) - # Merge dataset - ds = xr.merge(list_ds, join="outer", compat="no_conflicts", combine_attrs="override") + # Concatenate dataset + # - If some variable are missing in one file, it is filled with NaN. This should not occur anyway. + # - The resulting dataset can have duplicated timesteps ! + ds = xr.concat(list_ds, dim="time", join="outer", compat="no_conflicts", combine_attrs="override").sortby( + "time", + ) else: ds = list_ds[0] - # Check at least 5 timesteps are available - if len(ds["time"]) < 5: - raise ValueError(f"Less than 5 timesteps available for day {day}.") - - # Identify time integration - sample_interval = infer_sample_interval(ds, verbose=verbose, robust=False) - ds = add_sample_interval(ds, sample_interval=sample_interval) - - # Regularize timesteps (for trailing seconds) - ds = regularize_timesteps(ds, sample_interval=sample_interval, robust=False, add_quality_flag=True) - - # Slice for requested day - ds = ds.sel({"time": slice(start_day, end_day)}) - - # Load data into memory + # Compute data ds = ds.compute() # Close connection to source files @@ -233,18 +485,142 @@ def create_daily_file(day, filepaths, verbose=True): ds.close() del list_ds - return ds - - -def copy_l0b_to_l0c_directory(filepath): - """Copy L0B file to L0C directory.""" - import netCDF4 - - # Copy file - l0c_filepath = filepath.replace("L0B", "L0C") - _ = shutil.copy(filepath, l0c_filepath) - - # Edit DISDRODB product attribute - with netCDF4.Dataset(l0c_filepath, mode="a") as nc_file: - # Modify the global attribute - nc_file.setncattr("disdrodb_product", "L0C") + # ---------------------------------------------------------------------------------------. + # If sample interval is a dataset variable, drop timesteps with unexpected measurement intervals ! + if "sample_interval" in ds: + ds = drop_timesteps_with_invalid_sample_interval( + ds=ds, + measurement_intervals=measurement_intervals, + verbose=verbose, + logger=logger, + ) + + # ---------------------------------------------------------------------------------------. + # Remove duplicated timesteps + ds = remove_duplicated_timesteps( + ds, + ensure_variables_equality=ensure_variables_equality, + logger=logger, + verbose=verbose, + ) + + # Raise error if less than 3 timesteps left + n_timesteps = len(ds["time"]) + if n_timesteps < 3: + raise ValueError(f"{n_timesteps} timesteps left after removing duplicated timesteps.") + + # ---------------------------------------------------------------------------------------. 
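    # Illustrative example of the expected outcome of the split below: with
    # measurement_intervals=[30, 60], a day recorded partly at 30 s and partly at
    # 60 s yields dict_ds like {30: <xarray.Dataset>, 60: <xarray.Dataset>}, and
    # _generate_l0c then writes one L0C file per sampling interval.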
+ # Split dataset by sampling intervals + dict_ds = split_dataset_by_sampling_intervals( + ds=ds, + measurement_intervals=measurement_intervals, + min_sample_interval=10, + min_block_size=5, + ) + + # Log a warning if two sampling intervals are present within a given day + if len(dict_ds) > 1: + occuring_sampling_intervals = list(dict_ds) + msg = f"The dataset contains both sampling intervals {occuring_sampling_intervals}." + log_warning(logger=logger, msg=msg, verbose=verbose) + + # ---------------------------------------------------------------------------------------. + # Finalize L0C datasets + # - Add sample_interval coordinate + # - Regularize timesteps for trailing seconds + dict_ds = { + sample_interval: finalize_l0c_dataset( + ds=ds, + sample_interval=sample_interval, + start_day=start_day, + end_day=end_day, + verbose=verbose, + logger=logger, + ) + for sample_interval, ds in dict_ds.items() + } + return dict_ds + + +# ---------------------------------------------------------------------------------------. +#### DEPRECATED CODE + + +# def copy_l0b_to_l0c_directory(filepath): +# """Copy L0B file to L0C directory.""" +# import netCDF4 + +# # Copy file +# l0c_filepath = filepath.replace("L0B", "L0C") +# _ = shutil.copy(filepath, l0c_filepath) + +# # Edit DISDRODB product attribute +# with netCDF4.Dataset(l0c_filepath, mode="a") as nc_file: +# # Modify the global attribute +# nc_file.setncattr("disdrodb_product", "L0C") + +# def find_isel_common_time(da1, da2): +# """ +# Find the indices of common time steps between two data arrays. + +# Parameters +# ---------- +# da1 : xarray.DataArray +# The first data array with a time coordinate. +# da2 : xarray.DataArray +# The second data array with a time coordinate. + +# Returns +# ------- +# da1_isel : numpy.ndarray +# Indices of the common time steps in the first data array. +# da2_isel : numpy.ndarray +# Indices of the common time steps in the second data array. + +# Notes +# ----- +# This function assumes that both input data arrays have a "time" coordinate. +# The function finds the intersection of the time steps in both data arrays +# and returns the indices of these common time steps for each data array. +# """ +# intersecting_timesteps = np.intersect1d(da1["time"], da2["time"]) +# da1_isel = np.where(np.isin(da1["time"], intersecting_timesteps))[0] +# da2_isel = np.where(np.isin(da2["time"], intersecting_timesteps))[0] +# return da1_isel, da2_isel + + +# def check_same_raw_drop_number_values(list_ds, filepaths): +# """ +# Check if the 'raw_drop_number' values are the same across multiple datasets. + +# This function compares the 'raw_drop_number' values of multiple datasets to ensure they are identical +# at common timesteps. + +# If any discrepancies are found, a ValueError is raised indicating which files +# have differing values. + +# Parameters +# ---------- +# list_ds : list of xarray.Dataset +# A list of xarray Datasets to be compared. +# filepaths : list of str +# A list of file paths corresponding to the datasets in `list_ds`. + +# Raises +# ------ +# ValueError +# If 'raw_drop_number' values differ at any common timestep between any two datasets. 
+# """ +# # Retrieve variable to compare +# list_drop_number = [ds["raw_drop_number"].compute() for ds in list_ds] +# # Compare values +# combos = list(itertools.combinations(range(len(list_drop_number)), 2)) +# for i, j in combos: +# da1 = list_drop_number[i] +# da2 = list_drop_number[j] +# da1_isel, da2_isel = find_isel_common_time(da1=da1, da2=da2) +# if not np.all(da1.isel(time=da1_isel).data == da2.isel(time=da2_isel).data): +# file1 = filepaths[i] +# file2 = filepaths[i] +# msg = f"Duplicated timesteps have different values between file {file1} and {file2}" +# raise ValueError(msg) diff --git a/disdrodb/l0/readers/BRAZIL/CHUVA_LPM.py b/disdrodb/l0/readers/BRAZIL/CHUVA_LPM.py index 25f473c1..706e870e 100644 --- a/disdrodb/l0/readers/BRAZIL/CHUVA_LPM.py +++ b/disdrodb/l0/readers/BRAZIL/CHUVA_LPM.py @@ -161,6 +161,8 @@ def df_sanitizer_fun(df): df["time"] = df["sensor_date"] + "-" + df["sensor_time"] df["time"] = pd.to_datetime(df["time"], format="%d.%m.%y-%H:%M:%S", errors="coerce") + # TODO: correct time is unavailable yet ! + # Drop row if start_identifier different than 00 df = df[df["start_identifier"].astype(str) == "00"] diff --git a/disdrodb/l0/readers/NETHERLANDS/DELFT.py b/disdrodb/l0/readers/NETHERLANDS/DELFT.py index 5fa5632a..fcd829cd 100644 --- a/disdrodb/l0/readers/NETHERLANDS/DELFT.py +++ b/disdrodb/l0/readers/NETHERLANDS/DELFT.py @@ -156,9 +156,7 @@ def df_sanitizer_fun(df): "station_name", "station_number", "sensor_serial_number", - "sample_interval", "sensor_serial_number", - # "epoch_time", # "number_particles_all_detected", ] df = df.drop(columns=columns_to_drop) diff --git a/disdrodb/l1/beard_model.py b/disdrodb/l1/beard_model.py index 27d144af..1e25ff38 100644 --- a/disdrodb/l1/beard_model.py +++ b/disdrodb/l1/beard_model.py @@ -122,7 +122,7 @@ def get_vapor_actual_pressure_at_height( sea_level_temperature : float Standard temperature at sea level in Kelvin. sea_level_relative_humidity : float - Relative humidity at sea level. + Relative humidity at sea level. A value between 0 and 1. sea_level_air_pressure : float, optional Standard atmospheric pressure at sea level in Pascals. The default is 101_325 Pascals. lapse_rate : float, optional @@ -140,9 +140,6 @@ def get_vapor_actual_pressure_at_height( ) esat = get_vapor_saturation_pressure(sea_level_temperature) actual_vapor = sea_level_relative_humidity / (1 / esat - (1 - sea_level_relative_humidity) / sea_level_air_pressure) - # actual_vapor = get_vapor_actual_pressure(relative_humidity=sea_level_relative_humidity, - # temperature=sea_level_temperature, - # air_pressure=sea_level_air_pressure) return actual_vapor * np.exp(-(5.8e3 * lapse_rate / (temperature_at_altitude**2) + 5.5e-5) * altitude) @@ -184,18 +181,16 @@ def get_vapor_saturation_pressure(temperature): return np.exp(esat / (temperature**2)) -def get_vapor_actual_pressure(relative_humidity, temperature, air_pressure): +def get_vapor_actual_pressure(relative_humidity, temperature): """ Computes the actual vapor pressure over water. Parameters ---------- relative_humidity : float - Relative humidity. + Relative humidity. A value between 0 and 1. temperature : float Temperature in Kelvin. - air_pressure : float - Air pressure in Pascals. Returns ------- @@ -203,7 +198,6 @@ def get_vapor_actual_pressure(relative_humidity, temperature, air_pressure): Actual vapor pressure in Pascal. 
""" esat = get_vapor_saturation_pressure(temperature) - # old_e = relative_humidity / (1 / esat - (1 - relative_humidity) / air_pressure) return relative_humidity * esat @@ -548,7 +542,7 @@ def retrieve_fall_velocity( temperature : float Temperature in Kelvin. relative_humidity : float - Relative humidity. + Relative humidity. A value between 0 and 1. latitude : float Latitude in degrees. air_pressure : float @@ -583,7 +577,6 @@ def retrieve_fall_velocity( vapor_pressure = get_vapor_actual_pressure( relative_humidity=relative_humidity, temperature=temperature, - air_pressure=air_pressure, ) # Retrieve air density and water density diff --git a/disdrodb/l1/fall_velocity.py b/disdrodb/l1/fall_velocity.py index 6b24cbb2..6e7d8dc4 100644 --- a/disdrodb/l1/fall_velocity.py +++ b/disdrodb/l1/fall_velocity.py @@ -221,7 +221,7 @@ def get_raindrop_fall_velocity(diameter, method, ds_env=None): - 'altitude' : Altitude in meters (m). - 'latitude' : Latitude in degrees. - 'temperature' : Temperature in degrees Celsius (°C). - - 'relative_humidity' : Relative humidity in percentage (%). + - 'relative_humidity' : Relative humidity. A value between 0 and 1. - 'sea_level_air_pressure' : Sea level air pressure in Pascals (Pa). - 'lapse_rate' : Lapse rate in degrees Celsius per meter (°C/m). It is required for for the 'Beard1976' method. diff --git a/disdrodb/l1/processing.py b/disdrodb/l1/processing.py index 0750b39b..7783ad99 100644 --- a/disdrodb/l1/processing.py +++ b/disdrodb/l1/processing.py @@ -27,7 +27,7 @@ from disdrodb.l2.empirical_dsd import get_drop_average_velocity, get_min_max_diameter # TODO: maybe move out of L2 from disdrodb.utils.attrs import set_attrs from disdrodb.utils.encoding import set_encodings -from disdrodb.utils.time import infer_sample_interval, regularize_timesteps +from disdrodb.utils.time import ensure_sample_interval_in_seconds, infer_sample_interval def generate_l1( @@ -95,12 +95,16 @@ def generate_l1( # Initialize L2 dataset ds_l1 = xr.Dataset() - # Identify time integration - sample_interval = infer_sample_interval(ds, verbose=False) - ds = add_sample_interval(ds, sample_interval=sample_interval) + # Retrieve sample interval + # --> sample_interval is a coordinate of L0C products + if "sample_interval" in ds: + sample_interval = ensure_sample_interval_in_seconds(ds["sample_interval"].data) + else: + # This line is not called in the DISDRODB processing chain ! + sample_interval = infer_sample_interval(ds, verbose=False) - # Regularize timesteps (for trailing seconds) - ds = regularize_timesteps(ds, sample_interval=sample_interval) + # Re-add sample interval as coordinate (in seconds) + ds = add_sample_interval(ds, sample_interval=sample_interval) # --------------------------------------------------------------------------- # Retrieve ENV dataset or take defaults @@ -170,6 +174,11 @@ def generate_l1( ds_l1["n_drops_selected"] = drop_counts.sum(dim=["diameter_bin_center"]) ds_l1["n_drops_discarded"] = drop_counts.sum(dim=["diameter_bin_center"]) + # ------------------------------------------------------------------------------------------- + #### Add L0C coordinates that might got lost + if "time_qc" in ds: + ds_l1 = ds_l1.assign_coords({"time_qc": ds["time_qc"]}) + #### ----------------------------------------------------------------------------. 
#### Add encodings and attributes # Add variables attributes diff --git a/disdrodb/l1/resampling.py b/disdrodb/l1/resampling.py index 3dc50326..3cfcabbf 100644 --- a/disdrodb/l1/resampling.py +++ b/disdrodb/l1/resampling.py @@ -52,6 +52,7 @@ def add_sample_interval(ds, sample_interval): ds["sample_interval"].attrs["units"] = "seconds" ds = ds.set_coords("sample_interval") # Update measurement_interval attribute + ds.attrs = ds.attrs.copy() ds.attrs["measurement_interval"] = int(sample_interval) return ds diff --git a/disdrodb/l1/routines.py b/disdrodb/l1/routines.py index 91495e23..96477aa5 100644 --- a/disdrodb/l1/routines.py +++ b/disdrodb/l1/routines.py @@ -206,6 +206,9 @@ def run_l1_station( """ Run the L1 processing of a specific DISDRODB station when invoked from the terminal. + The L1 routines just filter the raw drop spectrum and compute basic statistics. + The L1 routine expects as input L0C files where each file has a unique sample interval. + This function is intended to be called through the ``disdrodb_run_l1_station`` command-line interface. @@ -274,15 +277,30 @@ def run_l1_station( # -------------------------------------------------------------------------. # List files to process required_product = get_required_product(product) - filepaths = get_filepaths( - base_dir=base_dir, - data_source=data_source, - campaign_name=campaign_name, - station_name=station_name, - product=required_product, - # Processing options - debugging_mode=debugging_mode, - ) + flag_not_available_data = False + try: + filepaths = get_filepaths( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + product=required_product, + # Processing options + debugging_mode=debugging_mode, + ) + except Exception as e: + print(str(e)) # Case where no file paths available + flag_not_available_data = True + + # -------------------------------------------------------------------------. + # If no data available, print error message and return None + if flag_not_available_data: + msg = ( + f"{product} processing of {data_source} {campaign_name} {station_name}" + + f"has not been launched because of missing {required_product} data." + ) + print(msg) + return # -----------------------------------------------------------------. # Generate L1 files @@ -319,7 +337,7 @@ def run_l1_station( # ---------------------------------------------------------------------. # End L1 processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"{product} processing of station {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) diff --git a/disdrodb/l1_env/routines.py b/disdrodb/l1_env/routines.py index cf60db8a..5acc40f1 100644 --- a/disdrodb/l1_env/routines.py +++ b/disdrodb/l1_env/routines.py @@ -25,7 +25,7 @@ def get_default_environment_dataset(): ds_env["sea_level_air_pressure"] = 101_325 ds_env["gas_constant_dry_air"] = 287.04 ds_env["lapse_rate"] = 0.0065 - ds_env["relative_humidity"] = 0.95 + ds_env["relative_humidity"] = 0.95 # Value between 0 and 1 ! 
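    # Note: relative humidity is stored as a fraction (0-1), consistent with the Beard (1976)
    # routines; a sensor reporting 95 % corresponds to 95 / 100 = 0.95 here.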
ds_env["temperature"] = 20 + 273.15 return ds_env diff --git a/disdrodb/l2/empirical_dsd.py b/disdrodb/l2/empirical_dsd.py index b6df0514..49d9ef90 100644 --- a/disdrodb/l2/empirical_dsd.py +++ b/disdrodb/l2/empirical_dsd.py @@ -506,7 +506,7 @@ def get_equivalent_reflectivity_factor(drop_number_concentration, diameter, diam def get_mass_spectrum(drop_number_concentration, diameter, water_density=1000): """ - Calculate the rain drop mass spectrum m(D) in g/m3 mm. + Calculate the rain drop mass spectrum m(D) in g/m3 mm-1. It represents the mass of liquid water as a function of raindrop diameter. @@ -521,7 +521,7 @@ def get_mass_spectrum(drop_number_concentration, diameter, water_density=1000): Returns ------- array-like - The calculated rain drop mass spectrum in grams per cubic meter per diameter (g/m3 mm). + The calculated rain drop mass spectrum in grams per cubic meter per diameter (g/m3 mm-1). """ # Convert water density from kg/m3 to g/m3 @@ -814,6 +814,72 @@ def get_mean_volume_drop_diameter(moment_3, moment_4): return D_m +def get_std_volume_drop_diameter(drop_number_concentration, diameter_bin_width, diameter, mean_volume_diameter): + r""" + Calculate the standard deviation of the mass-weighted drop diameter (σₘ). + + This parameter is often also referred as the mass spectrum standard deviation. + It quantifies the spread or variability of DSD. + + Parameters + ---------- + drop_number_concentration : xarray.DataArray + The drop number concentration \\( N(D) \\) for each diameter bin, typically in units of + number per cubic meter per millimeter (m⁻³·mm⁻¹). + diameter : xarray.DataArray + The equivalent volume diameters \\( D \\) of the drops in each bin, in meters (m). + diameter_bin_width : xarray.DataArray + The width \\( \\Delta D \\) of each diameter bin, in millimeters (mm). + mean_volume_diameter : xarray.DataArray + The mean volume diameter \\( D_m \\), in millimeters (mm). This is typically computed using the + third and fourth moments or directly from the DSD. + + Returns + ------- + sigma_m : xarray.DataArray or float + The standard deviation of the mass-weighted drop diameter, \\( \\sigma_m \\), + in millimeters (mm). + + Notes + ----- + The standard deviation of the mass-weighted drop diameter is calculated using the formula: + + .. math:: + + \\sigma_m = \\sqrt{\frac{\\sum [N(D) \\cdot (D - D_m)^2 \\cdot D^3 + \\cdot \\Delta D]}{\\sum [N(D) \\cdot D^3 \\cdot \\Delta D]}} + + where: + + - \\( N(D) \\) is the drop number concentration for diameter \\( D \\) [m⁻³·mm⁻¹]. + - \\( D \\) is the drop diameter [mm]. + - \\( D_m \\) is the mean volume diameter [mm]. + - \\( \\Delta D \\) is the diameter bin width [mm]. + - The numerator computes the weighted variance of diameters. + - The weighting factor \\( D^3 \\) accounts for mass (since mass ∝ \\( D^3 \\)). + + **Physical Interpretation:** + + - A smaller \\( \\sigma_m \\) indicates that the mass is concentrated around the + mean mass-weighted diameter, implying less variability in drop sizes. + - A larger \\( \\sigma_m \\) suggests a wider spread of drop sizes contributing + to the mass, indicating greater variability. + + References + ---------- + - Smith, P. L., Johnson, R. W., & Kliche, D. V. (2019). On Use of the Standard + Deviation of the Mass Distribution as a Parameter in Raindrop Size Distribution + Functions. *Journal of Applied Meteorology and Climatology*, 58(4), 787-796. + https://doi.org/10.1175/JAMC-D-18-0086.1 + - Williams, C. 
R., and Coauthors, 2014: Describing the Shape of Raindrop Size Distributions Using Uncorrelated + Raindrop Mass Spectrum Parameters. J. Appl. Meteor. Climatol., 53, 1282-1296, https://doi.org/10.1175/JAMC-D-13-076.1. + """ + const = drop_number_concentration * diameter_bin_width * diameter**3 + numerator = ((diameter * 1000 - mean_volume_diameter) ** 2 * const).sum(dim="diameter_bin_center") + sigma_m = np.sqrt(numerator / const.sum(dim="diameter_bin_center")) + return sigma_m + + def get_median_volume_drop_diameter(drop_number_concentration, diameter, diameter_bin_width, water_density=1000): r""" Compute the median volume drop diameter (D50). @@ -960,72 +1026,6 @@ def get_quantile_volume_drop_diameter( return d -def get_std_volume_drop_diameter(drop_number_concentration, diameter_bin_width, diameter, mean_volume_diameter): - r""" - Calculate the standard deviation of the mass-weighted drop diameter (σₘ). - - This parameter is often also referred as the mass spectrum standard deviation. - It quantifies the spread or variability of DSD. - - Parameters - ---------- - drop_number_concentration : xarray.DataArray - The drop number concentration \\( N(D) \\) for each diameter bin, typically in units of - number per cubic meter per millimeter (m⁻³·mm⁻¹). - diameter : xarray.DataArray - The equivalent volume diameters \\( D \\) of the drops in each bin, in meters (m). - diameter_bin_width : xarray.DataArray - The width \\( \\Delta D \\) of each diameter bin, in millimeters (mm). - mean_volume_diameter : xarray.DataArray - The mean volume diameter \\( D_m \\), in millimeters (mm). This is typically computed using the - third and fourth moments or directly from the DSD. - - Returns - ------- - sigma_m : xarray.DataArray or float - The standard deviation of the mass-weighted drop diameter, \\( \\sigma_m \\), - in millimeters (mm). - - Notes - ----- - The standard deviation of the mass-weighted drop diameter is calculated using the formula: - - .. math:: - - \\sigma_m = \\sqrt{\frac{\\sum [N(D) \\cdot (D - D_m)^2 \\cdot D^3 - \\cdot \\Delta D]}{\\sum [N(D) \\cdot D^3 \\cdot \\Delta D]}} - - where: - - - \\( N(D) \\) is the drop number concentration for diameter \\( D \\) [m⁻³·mm⁻¹]. - - \\( D \\) is the drop diameter [mm]. - - \\( D_m \\) is the mean volume diameter [mm]. - - \\( \\Delta D \\) is the diameter bin width [mm]. - - The numerator computes the weighted variance of diameters. - - The weighting factor \\( D^3 \\) accounts for mass (since mass ∝ \\( D^3 \\)). - - **Physical Interpretation:** - - - A smaller \\( \\sigma_m \\) indicates that the mass is concentrated around the - mean mass-weighted diameter, implying less variability in drop sizes. - - A larger \\( \\sigma_m \\) suggests a wider spread of drop sizes contributing - to the mass, indicating greater variability. - - References - ---------- - - Smith, P. L., Johnson, R. W., & Kliche, D. V. (2019). On Use of the Standard - Deviation of the Mass Distribution as a Parameter in Raindrop Size Distribution - Functions. *Journal of Applied Meteorology and Climatology*, 58(4), 787-796. - https://doi.org/10.1175/JAMC-D-18-0086.1 - - Williams, C. R., and Coauthors, 2014: Describing the Shape of Raindrop Size Distributions Using Uncorrelated - Raindrop Mass Spectrum Parameters. J. Appl. Meteor. Climatol., 53, 1282-1296, https://doi.org/10.1175/JAMC-D-13-076.1. 
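# A small synthetic check (sketch only) of the definition implemented above: sigma_m is the
# D**3-weighted standard deviation of the diameters around D_m. Diameters are kept in mm here
# for readability; the function above expects meters and converts with the factor 1000.
import numpy as np
import xarray as xr

d = xr.DataArray([0.5, 1.0, 2.0], dims="diameter_bin_center")        # mm
dd = xr.DataArray([0.25, 0.25, 0.5], dims="diameter_bin_center")     # mm
nd = xr.DataArray([100.0, 50.0, 10.0], dims="diameter_bin_center")   # m-3 mm-1

w = nd * dd * d**3                        # mass-proportional weights
dm = (w * d).sum() / w.sum()              # mean volume (mass-weighted) diameter
sigma_m = np.sqrt((w * (d - dm) ** 2).sum() / w.sum())
print(float(dm), float(sigma_m))          # ~1.69 mm, ~0.51 mm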
- """ - const = drop_number_concentration * diameter_bin_width * diameter**3 - numerator = ((diameter * 1000 - mean_volume_diameter) ** 2 * const).sum(dim="diameter_bin_center") - sigma_m = np.sqrt(numerator / const.sum(dim="diameter_bin_center")) - return sigma_m - - ####----------------------------------------------------------------------------------------------------- #### Normalized Gamma Parameters @@ -1072,7 +1072,8 @@ def get_normalized_intercept_parameter(liquid_water_content, mean_volume_diamete # Compute Nw # --> 1e9 is used to convert from mm-4 to m-3 mm-1 - # 256 = 4**4 + # - 256 = 4**4 + # - lwc = (np.pi * water_density / 6) * moment_3 Nw = (256.0 / (np.pi * water_density)) * liquid_water_content / mean_volume_diameter**4 * 1e9 return Nw @@ -1094,6 +1095,14 @@ def get_mom_normalized_intercept_parameter(moment_3, moment_4): Nw : xarray.DataArray or float Normalized intercept parameter \\( N_w \\) in units of m⁻3·mm⁻¹. + References + ---------- + Testud, J., S. Oury, R. A. Black, P. Amayenc, and X. Dou, 2001: + The Concept of “Normalized” Distribution to Describe Raindrop Spectra: + A Tool for Cloud Physics and Cloud Remote Sensing. + J. Appl. Meteor. Climatol., 40, 1118-1140, + https://doi.org/10.1175/1520-0450(2001)040<1118:TCONDT>2.0.CO;2 + """ Nw = 256 / 6 * moment_3**5 / moment_4**4 return Nw diff --git a/disdrodb/l2/event.py b/disdrodb/l2/event.py index 594a6d45..41472008 100644 --- a/disdrodb/l2/event.py +++ b/disdrodb/l2/event.py @@ -15,16 +15,13 @@ # along with this program. If not, see . # -----------------------------------------------------------------------------. """Functions for event definition.""" - -import datetime - import dask import numpy as np import pandas as pd import xarray as xr from disdrodb.api.info import get_start_end_time_from_filepaths -from disdrodb.utils.time import ensure_sorted_by_time +from disdrodb.utils.time import acronym_to_seconds, ensure_sorted_by_time @dask.delayed @@ -34,105 +31,157 @@ def _delayed_open_dataset(filepath): return ds -def identify_events(filepaths, parallel): - """Identify rainy events.""" +def identify_events( + filepaths, + parallel=False, + min_n_drops=5, + neighbor_min_size=2, + neighbor_time_interval="5MIN", + intra_event_max_time_gap="6H", + event_min_duration="5MIN", + event_min_size=3, +): + """Return a list of rainy events. + + Rainy timesteps are defined when n_drops_selected > min_n_drops. + Any rainy isolated timesteps (based on neighborhood criteria) is removed. + Then, consecutive rainy timesteps are grouped into the same event if the time gap between them does not + exceed `intra_event_max_time_gap`. Finally, events that do not meet minimum size or duration + requirements are filtered out. + + Parameters + ---------- + filepaths: list + List of L1C file paths. + parallel: bool + Whether to load the files in parallel. + Set parallel=True only in a multiprocessing environment. + The default is False. + neighbor_time_interval : str + The time interval around a given a timestep defining the neighborhood. + Only timesteps that fall within this time interval before or after a timestep are considered neighbors. + neighbor_min_size : int, optional + The minimum number of neighboring timesteps required within `neighbor_time_interval` for a + timestep to be considered non-isolated. Isolated timesteps are removed ! + - If `neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs. + - If `neighbor_min_size=1`, the timestep must have at least one neighbor within `neighbor_time_interval`. 
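# A quick numerical check (illustrative only) that the moment form Nw = 256/6 * M3**5 / M4**4
# is algebraically equivalent to the LWC/Dm form used above once LWC = (pi * rho_w / 6) * M3
# and Dm = M4 / M3 are substituted; unit conversion factors (the 1e9 in the code) are omitted.
import numpy as np

M3, M4 = 2.7, 3.1      # arbitrary positive 3rd and 4th moments
rho_w = 1000.0

Dm = M4 / M3
lwc = (np.pi * rho_w / 6) * M3
nw_from_lwc = 256.0 / (np.pi * rho_w) * lwc / Dm**4
nw_from_moments = 256.0 / 6.0 * M3**5 / M4**4
assert np.isclose(nw_from_lwc, nw_from_moments)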
+ - If `neighbor_min_size=2`, the timestep must have at least two timesteps within `neighbor_time_interval`. + Defaults to 1. + intra_event_max_time_gap: str + The maximum time interval between two timesteps to be considered part of the same event. + This parameters is used to group timesteps into events ! + event_min_duration : str + The minimum duration an event must span. Events shorter than this duration are discarded. + event_min_size : int, optional + The minimum number of valid timesteps required for an event. Defaults to 1. + + Returns + ------- + list of dict + A list of events, where each event is represented as a dictionary with keys: + - "start_time": np.datetime64, start time of the event + - "end_time": np.datetime64, end time of the event + - "duration": np.timedelta64, duration of the event + - "n_timesteps": int, number of valid timesteps in the event + """ # Open datasets in parallel if parallel: list_ds = dask.compute([_delayed_open_dataset(filepath) for filepath in filepaths])[0] else: list_ds = [xr.open_dataset(filepath, chunks={}, cache=False) for filepath in filepaths] - - # List sample interval - sample_intervals = np.array([ds["sample_interval"].data.item() for ds in list_ds]) + # Filter dataset for requested variables + variables = ["time", "n_drops_selected"] + list_ds = [ds[variables] for ds in list_ds] # Concat datasets - ds = xr.concat(list_ds, dim="time") - # Read in memory what is needed - ds = ds[["time", "n_drops_selected"]].compute() + ds = xr.concat(list_ds, dim="time", compat="no_conflicts", combine_attrs="override") + # Read in memory the variable needed + ds = ds.compute() # Close file on disk _ = [ds.close() for ds in list_ds] del list_ds - # Check for sample intervals - if len(set(sample_intervals)) > 1: - raise ValueError("Sample intervals are not constant across files.") # Sort dataset by time ds = ensure_sorted_by_time(ds) - # Select events - # TODO: - minimum_n_drops = 5 - min_time_contiguity = pd.Timedelta(datetime.timedelta(minutes=10)) - max_dry_time_interval = pd.Timedelta(datetime.timedelta(hours=2)) - minimum_duration = pd.Timedelta(datetime.timedelta(minutes=5)) - event_list = select_events( - ds=ds, - minimum_n_drops=minimum_n_drops, - minimum_duration=minimum_duration, - min_time_contiguity=min_time_contiguity, - max_dry_time_interval=max_dry_time_interval, + # Define candidate timesteps to group into events + idx_valid = ds["n_drops_selected"].data > min_n_drops + timesteps = ds["time"].data[idx_valid] + # Define event list + event_list = group_timesteps_into_event( + timesteps=timesteps, + neighbor_min_size=neighbor_min_size, + neighbor_time_interval=neighbor_time_interval, + intra_event_max_time_gap=intra_event_max_time_gap, + event_min_duration=event_min_duration, + event_min_size=event_min_size, ) return event_list -def remove_isolated_timesteps(timesteps, min_time_contiguity): - """Remove isolated timesteps.""" - # Sort timesteps just in case - timesteps.sort() - cleaned_timesteps = [] - for i, t in enumerate(timesteps): - prev_t = timesteps[i - 1] if i > 0 else None - next_t = timesteps[i + 1] if i < len(timesteps) - 1 else None - - is_isolated = True - if prev_t and t - prev_t <= min_time_contiguity: - is_isolated = False - if next_t and next_t - t <= min_time_contiguity: - is_isolated = False - - if not is_isolated: - cleaned_timesteps.append(t) - - return cleaned_timesteps - - -def group_timesteps_into_events(timesteps, max_dry_time_interval): - """Group timesteps into events.""" - timesteps.sort() - events = [] - 
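# A minimal usage sketch of the event grouping introduced above, assuming the function is
# importable as disdrodb.l2.event.group_timesteps_into_event and that duration acronyms such
# as "5MIN" and "1H" are accepted (as in the defaults shown in this patch).
import numpy as np

from disdrodb.l2.event import group_timesteps_into_event

timesteps = np.array(
    [
        "2024-01-01T00:00", "2024-01-01T00:01", "2024-01-01T00:02",  # first cluster
        "2024-01-01T05:00",                                          # isolated timestep
        "2024-01-01T09:00", "2024-01-01T09:01",                      # second cluster
    ],
    dtype="datetime64[s]",
)
events = group_timesteps_into_event(
    timesteps,
    intra_event_max_time_gap="1H",
    neighbor_min_size=1,
    neighbor_time_interval="5MIN",
    event_min_size=2,
    event_min_duration="1MIN",
)
print(len(events))  # 2: the isolated 05:00 timestep is dropped, the two clusters remain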
current_event = [timesteps[0]] - - for i in range(1, len(timesteps)): - current_t = timesteps[i] - previous_t = timesteps[i - 1] - - if current_t - previous_t <= max_dry_time_interval: - current_event.append(current_t) - else: - events.append(current_event) - current_event = [current_t] +def group_timesteps_into_event( + timesteps, + intra_event_max_time_gap, + event_min_size=0, + event_min_duration="0S", + neighbor_min_size=0, + neighbor_time_interval="0S", +): + """ + Group candidate timesteps into events based on temporal criteria. - events.append(current_event) - return events + This function groups valid candidate timesteps into events by considering how they cluster + in time. Any isolated timesteps (based on neighborhood criteria) are first removed. Then, + consecutive timesteps are grouped into the same event if the time gap between them does not + exceed `intra_event_max_time_gap`. Finally, events that do not meet minimum size or duration + requirements are filtered out. + Please note that neighbor_min_size and neighbor_time_interval are very sensitive to the + actual sample interval of the data ! -def select_events( - ds, - minimum_n_drops, - minimum_duration, # TODO UNUSED - min_time_contiguity, - max_dry_time_interval, -): - """Select events.""" - timesteps = ds["time"].data - n_drops_selected = ds["n_drops_selected"].data - - # Define candidate timesteps to group into events - idx_valid = n_drops_selected > minimum_n_drops - timesteps = timesteps[idx_valid] + Parameters + ---------- + timesteps: np.ndarray + Candidate timesteps to be grouped into events. + neighbor_time_interval : str + The time interval around a given a timestep defining the neighborhood. + Only timesteps that fall within this time interval before or after a timestep are considered neighbors. + neighbor_min_size : int, optional + The minimum number of neighboring timesteps required within `neighbor_time_interval` for a + timestep to be considered non-isolated. Isolated timesteps are removed ! + - If `neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs. + - If `neighbor_min_size=1`, the timestep must have at least one neighbor within `neighbor_time_interval`. + - If `neighbor_min_size=2`, the timestep must have at least two timesteps within `neighbor_time_interval`. + Defaults to 1. + intra_event_max_time_gap: str + The maximum time interval between two timesteps to be considered part of the same event. + This parameters is used to group timesteps into events ! + event_min_duration : str + The minimum duration an event must span. Events shorter than this duration are discarded. + event_min_size : int, optional + The minimum number of valid timesteps required for an event. Defaults to 1. 
- # Remove noisy timesteps - timesteps = remove_isolated_timesteps(timesteps, min_time_contiguity) + Returns + ------- + list of dict + A list of events, where each event is represented as a dictionary with keys: + - "start_time": np.datetime64, start time of the event + - "end_time": np.datetime64, end time of the event + - "duration": np.timedelta64, duration of the event + - "n_timesteps": int, number of valid timesteps in the event + """ + # Retrieve datetime arguments + neighbor_time_interval = pd.Timedelta(acronym_to_seconds(neighbor_time_interval), unit="seconds") + intra_event_max_time_gap = pd.Timedelta(acronym_to_seconds(intra_event_max_time_gap), unit="seconds") + event_min_duration = pd.Timedelta(acronym_to_seconds(event_min_duration), unit="seconds") + + # Remove isolated timesteps + timesteps = remove_isolated_timesteps( + timesteps, + neighbor_min_size=neighbor_min_size, + neighbor_time_interval=neighbor_time_interval, + ) # Group timesteps into events - events = group_timesteps_into_events(timesteps, max_dry_time_interval) + # - If two timesteps are separated by less than intra_event_max_time_gap, are considered the same event + events = group_timesteps_into_events(timesteps, intra_event_max_time_gap) # Define list of event event_list = [ @@ -140,12 +189,141 @@ def select_events( "start_time": event[0], "end_time": event[-1], "duration": (event[-1] - event[0]).astype("m8[m]"), + "n_timesteps": len(event), } for event in events ] + + # Filter event list by duration + event_list = [event for event in event_list if event["duration"] >= event_min_duration] + + # Filter event list by duration + event_list = [event for event in event_list if event["n_timesteps"] >= event_min_size] + return event_list +def remove_isolated_timesteps(timesteps, neighbor_min_size, neighbor_time_interval): + """ + Remove isolated timesteps that do not have enough neighboring timesteps within a specified time gap. + + A timestep is considered isolated (and thus removed) if it does not have at least `neighbor_min_size` other + timesteps within the `neighbor_time_interval` before or after it. + In other words, for each timestep, we look for how many other timesteps fall into the + time interval [t - neighbor_time_interval, t + neighbor_time_interval], excluding it itself. + If the count of such neighbors is less than `neighbor_min_size`, that timestep is removed. + + Parameters + ---------- + timesteps : array-like of np.datetime64 + Sorted or unsorted array of valid timesteps. + neighbor_time_interval : np.timedelta64 + The time interval around a given a timestep defining the neighborhood. + Only timesteps that fall within this time interval before or after a timestep are considered neighbors. + neighbor_min_size : int, optional + The minimum number of neighboring timesteps required within `neighbor_time_interval` for a + timestep to be considered non-isolated. + - If `neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs. + - If `neighbor_min_size=1`, the timestep must have at least one neighbor within `neighbor_time_interval`. + - If `neighbor_min_size=2`, the timestep must have at least two timesteps within `neighbor_time_interval`. + Defaults to 1. + + Returns + ------- + np.ndarray + Array of timesteps with isolated entries removed. 
+ """ + # Sort timesteps + timesteps = np.array(timesteps) + timesteps.sort() + + # Do nothing if neighbor_min_size is 0 + if neighbor_min_size == 0: + return timesteps + + # Compute the start and end of the interval for each timestep + t_starts = timesteps - neighbor_time_interval + t_ends = timesteps + neighbor_time_interval + + # Use searchsorted to find the positions where these intervals would be inserted + # to keep the array sorted. This effectively gives us the bounds of timesteps + # within the neighbor interval. + left_indices = np.searchsorted(timesteps, t_starts, side="left") + right_indices = np.searchsorted(timesteps, t_ends, side="right") + + # The number of neighbors is the difference in indices minus one (to exclude the timestep itself) + n_neighbors = right_indices - left_indices - 1 + valid_mask = n_neighbors >= neighbor_min_size + + non_isolated_timesteps = timesteps[valid_mask] + + # NON VECTORIZED CODE + # non_isolated_timesteps = [] + # n_neighbours_arr = [] + # for i, t in enumerate(timesteps): + # n_neighbours = np.sum(np.logical_and(timesteps >= (t - neighbor_time_interval), + # timesteps <= (t + neighbor_time_interval))) - 1 + # n_neighbours_arr.append(n_neighbours) + # if n_neighbours > neighbor_min_size: + # non_isolated_timesteps.append(t) + # non_isolated_timesteps = np.array(non_isolated_timesteps) + return non_isolated_timesteps + + +def group_timesteps_into_events(timesteps, intra_event_max_time_gap): + """ + Group valid timesteps into events based on a maximum allowed dry interval. + + Parameters + ---------- + timesteps : array-like of np.datetime64 + Sorted array of valid timesteps. + intra_event_max_time_gap : np.timedelta64 + Maximum time interval allowed between consecutive valid timesteps for them + to be considered part of the same event. + + Returns + ------- + list of np.ndarray + A list of events, where each event is an array of timesteps. + """ + # Deal with case with no timesteps + if len(timesteps) == 0: + return [] + + # Ensure timesteps are sorted + timesteps.sort() + + # Compute differences between consecutive timesteps + diffs = np.diff(timesteps) + + # Identify the indices where the gap is larger than intra_event_max_time_gap + # These indices represent boundaries between events + break_indices = np.where(diffs > intra_event_max_time_gap)[0] + 1 + + # Split the timesteps at the identified break points + events = np.split(timesteps, break_indices) + + # NON VECTORIZED CODE + # events = [] + # current_event = [timesteps[0]] + # for i in range(1, len(timesteps)): + # current_t = timesteps[i] + # previous_t = timesteps[i - 1] + + # if current_t - previous_t <= intra_event_max_time_gap: + # current_event.append(current_t) + # else: + # events.append(current_event) + # current_event = [current_t] + + # events.append(current_event) + return events + + +####-----------------------------------------------------------------------------------. + + def get_events_info(list_events, filepaths, accumulation_interval, rolling): """ Provide information about the required files for each event. 
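# A small numeric illustration (not part of the library) of the two vectorized building blocks
# above: searchsorted counts, for every timestep at once, how many timesteps fall within its
# neighborhood, and np.split cuts the sorted timesteps wherever the gap exceeds the maximum
# intra-event time gap.
import numpy as np

ts = np.array(
    ["2024-01-01T00:00", "2024-01-01T00:01", "2024-01-01T00:02", "2024-01-01T01:00"],
    dtype="datetime64[s]",
)
dt = np.timedelta64(5, "m")
left = np.searchsorted(ts, ts - dt, side="left")
right = np.searchsorted(ts, ts + dt, side="right")
n_neighbors = right - left - 1
print(n_neighbors)  # [2 2 2 0] -> the 01:00 timestep is isolated for neighbor_min_size >= 1

gaps = np.diff(ts)
events = np.split(ts, np.where(gaps > np.timedelta64(30, "m"))[0] + 1)
print([len(e) for e in events])  # [3, 1] with a 30-minute maximum intra-event gap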
@@ -208,14 +386,3 @@ def get_events_info(list_events, filepaths, accumulation_interval, rolling): ) return event_info - - -# list_events[0] -# accumulation_interval = 30 - - -# For event -# - Get start_time, end_time -# - Buffer start_time and end_time by accumulation_interval - -# - Get filepaths for start_time, end_time (assign_filepaths_to_event) diff --git a/disdrodb/l2/processing.py b/disdrodb/l2/processing.py index 71baf634..6cf130c0 100644 --- a/disdrodb/l2/processing.py +++ b/disdrodb/l2/processing.py @@ -49,6 +49,7 @@ from disdrodb.scattering import get_radar_parameters from disdrodb.utils.attrs import set_attrs from disdrodb.utils.encoding import set_encodings +from disdrodb.utils.time import ensure_sample_interval_in_seconds def define_diameter_array(diameter_min=0, diameter_max=10, diameter_spacing=0.05): @@ -89,49 +90,6 @@ def define_diameter_array(diameter_min=0, diameter_max=10, diameter_spacing=0.05 return da -def ensure_sample_interval_in_seconds(sample_interval): - """ - Ensure the sample interval is in seconds. - - Parameters - ---------- - sample_interval : int, numpy.ndarray, xarray.DataArray, or numpy.timedelta64 - The sample interval to be converted to seconds. - It can be: - - An integer representing the interval in seconds. - - A numpy array or xarray DataArray of integers representing intervals in seconds. - - A numpy.timedelta64 object representing the interval. - - A numpy array or xarray DataArray of numpy.timedelta64 objects representing intervals. - - Returns - ------- - int, numpy.ndarray, or xarray.DataArray - The sample interval converted to seconds. The return type matches the input type: - - If the input is an integer, the output is an integer. - - If the input is a numpy array, the output is a numpy array of integers. - - If the input is an xarray DataArray, the output is an xarray DataArray of integers. - - """ - if ( - isinstance(sample_interval, int) - or isinstance(sample_interval, (np.ndarray, xr.DataArray)) - and np.issubdtype(sample_interval.dtype, int) - ): - return sample_interval - if isinstance(sample_interval, np.timedelta64): - return sample_interval / np.timedelta64(1, "s") - if isinstance(sample_interval, np.ndarray) and np.issubdtype(sample_interval.dtype, np.timedelta64): - return sample_interval.astype("timedelta64[s]").astype(int) - if isinstance(sample_interval, xr.DataArray) and np.issubdtype(sample_interval.dtype, np.timedelta64): - sample_interval = sample_interval.copy() - sample_interval_int = sample_interval.data.astype("timedelta64[s]").astype(int) - sample_interval.data = sample_interval_int - return sample_interval - raise TypeError( - "sample_interval must be an int, numpy.timedelta64, or numpy array of timedelta64.", - ) - - def define_velocity_array(ds): """ Create the fall velocity DataArray using various methods. 
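# A minimal usage sketch of the helper that replaces the block removed here, assuming the
# version moved to disdrodb.utils.time keeps the documented behavior (integers pass through,
# timedelta64 values are converted to seconds); the import follows the one added at the top
# of this module.
import numpy as np

from disdrodb.utils.time import ensure_sample_interval_in_seconds

print(ensure_sample_interval_in_seconds(30))                                    # 30
print(ensure_sample_interval_in_seconds(np.timedelta64(30, "s")))               # 30.0
print(ensure_sample_interval_in_seconds(np.array([np.timedelta64(60, "s")])))   # [60]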
@@ -229,8 +187,8 @@ def compute_integral_parameters( # Compute rain accumulation (P) [mm] rain_accumulation = get_rain_accumulation(rain_rate=rain_rate, sample_interval=sample_interval) - # Compute moments (m1 to m6) - for moment in range(1, 7): + # Compute moments (m0 to m6) + for moment in range(0, 7): ds[f"M{moment}"] = get_moment( drop_number_concentration=drop_number_concentration, diameter=diameter, diff --git a/disdrodb/l2/processing_options.py b/disdrodb/l2/processing_options.py index 788294b8..492c78ba 100644 --- a/disdrodb/l2/processing_options.py +++ b/disdrodb/l2/processing_options.py @@ -1,10 +1,14 @@ -import re - # TODO: Write to YAML +# TODO: radar_simulation_enabled: differentiate between L2E and L2M: config = { "global_settings": { - "time_integration": ["10S", "30S", "1MIN", "5MIN", "10MIN", "15MIN", "30MIN", "1H", "ROLL5MIN", "ROLL10MIN"], + "time_integration": [ + "1MIN", + "10MIN", + "ROLL1MIN", + "ROLL10MIN", + ], # ["10S", "30S", "1MIN", "5MIN", "10MIN", "15MIN", "30MIN", "1H", "ROLL5MIN", "ROLL10MIN"], # L2M options "l2m_options": { "fall_velocity_method": "Beard1976", @@ -26,13 +30,6 @@ "order": 2, "add_gof_metrics": True, }, - # 'fixed': { - # 'psd_model': 'NormalizedGammaPSD', - # 'parameters': { - # 'mu': [1.5, 2, 2.5, 3], - # }, - # "add_gof_metrics": True - # }, }, # Radar options "radar_simulation_enabled": True, @@ -76,45 +73,8 @@ def get_l2_processing_options(): for tt in config["global_settings"]["time_integration"]: l2_options_dict[tt] = config["global_settings"].copy() _ = l2_options_dict[tt].pop("time_integration", None) + # Add specific settings for tt, product_options in config["specific_settings"].items(): - l2_options_dict[tt].update(product_options) + if tt in l2_options_dict: + l2_options_dict[tt].update(product_options) return l2_options_dict - - -def get_resampling_information(sample_interval_acronym): - """ - Extract resampling information from the sample interval acronym. - - Parameters - ---------- - sample_interval_acronym: str - A string representing the sample interval: e.g., "1H30MIN", "ROLL1H30MIN". - - Returns - ------- - sample_interval_seconds, rolling: tuple - Sample_interval in seconds and whether rolling is enabled. 
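# The loop above now also stores the zeroth moment M0 (the total drop concentration). As a
# reminder, the k-th DSD moment is M_k = sum over bins of N(D) * D**k * dD; the helper below is
# only a sketch standing in for the library's get_moment, with N(D) in m-3 mm-1 and D, dD in mm.
import numpy as np


def moment_sketch(drop_number_concentration, diameter, diameter_bin_width, moment):
    return np.sum(drop_number_concentration * diameter**moment * diameter_bin_width)


nd = np.array([100.0, 50.0, 10.0])   # m-3 mm-1
d = np.array([0.5, 1.0, 2.0])        # mm
dd = np.array([0.25, 0.25, 0.5])     # mm
print(moment_sketch(nd, d, dd, 0))   # 42.5 (drops per m3)
print(moment_sketch(nd, d, dd, 3))   # 55.625 (proportional to liquid water content)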
- """ - rolling = sample_interval_acronym.startswith("ROLL") - if rolling: - sample_interval_acronym = sample_interval_acronym[4:] # Remove "ROLL" - - # Regular expression to match duration components - pattern = r"(\d+)([DHMIN]+)" - matches = re.findall(pattern, sample_interval_acronym) - - # Conversion factors for each unit - unit_to_seconds = { - "D": 86400, # Seconds in a day - "H": 3600, # Seconds in an hour - "MIN": 60, # Seconds in a minute - "S": 1, # Seconds in a second - } - - # Parse matches and calculate total seconds - sample_interval = 0 - for value, unit in matches: - value = int(value) - if unit in unit_to_seconds: - sample_interval += value * unit_to_seconds[unit] - return sample_interval, rolling diff --git a/disdrodb/l2/routines.py b/disdrodb/l2/routines.py index 5baa45ae..3d8fb1ea 100644 --- a/disdrodb/l2/routines.py +++ b/disdrodb/l2/routines.py @@ -32,11 +32,12 @@ create_logs_directory, create_product_directory, ) +from disdrodb.api.info import group_filepaths from disdrodb.api.io import get_filepaths, get_required_product from disdrodb.api.path import ( + define_accumulation_acronym, define_l2e_filename, define_l2m_filename, - get_sample_interval_acronym, ) from disdrodb.configs import get_base_dir from disdrodb.l1.resampling import ( @@ -45,12 +46,12 @@ ) from disdrodb.l2.event import get_events_info, identify_events from disdrodb.l2.processing import ( - ensure_sample_interval_in_seconds, generate_l2_empirical, generate_l2_model, generate_l2_radar, ) -from disdrodb.l2.processing_options import get_l2_processing_options, get_resampling_information +from disdrodb.l2.processing_options import get_l2_processing_options +from disdrodb.metadata import read_station_metadata from disdrodb.utils.decorator import delayed_if_parallel, single_threaded_if_parallel # Logger @@ -61,6 +62,7 @@ log_error, log_info, ) +from disdrodb.utils.time import ensure_sample_interval_in_seconds, get_resampling_information from disdrodb.utils.writer import write_product logger = logging.getLogger(__name__) @@ -97,7 +99,7 @@ def _generate_l2e( # -----------------------------------------------------------------. # Create file logger - sample_interval_acronym = get_sample_interval_acronym(seconds=accumulation_interval) + sample_interval_acronym = define_accumulation_acronym(seconds=accumulation_interval, rolling=rolling) starting_time = pd.to_datetime(start_time).strftime("%Y%m%d%H%M%S") ending_time = pd.to_datetime(end_time).strftime("%Y%m%d%H%M%S") filename = f"L2E.{sample_interval_acronym}.{campaign_name}.{station_name}.s{starting_time}.e{ending_time}" @@ -119,7 +121,8 @@ def _generate_l2e( # - Open the netCDFs list_ds = [xr.open_dataset(filepath, chunks={}, cache=False, autoclose=True) for filepath in filepaths] # - Concatenate datasets - ds = xr.concat(list_ds, dim="time").sel(time=slice(start_time, end_time)).compute() + ds = xr.concat(list_ds, dim="time", compat="no_conflicts", combine_attrs="override") + ds = ds.sel(time=slice(start_time, end_time)).compute() # - Close file on disk _ = [ds.close() for ds in list_ds] @@ -130,7 +133,7 @@ def _generate_l2e( # - When we regularize, we infill with NaN # - When we aggregate with sum, we don't skip NaN # --> Aggregation with original missing timesteps currently results in NaN ! 
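# A minimal sketch of the acronym helpers used for the file naming above, assuming the helper
# moved to disdrodb.utils.time keeps the behavior documented in the block removed from
# processing_options.py; the exact string returned by define_accumulation_acronym is not shown
# in this patch, so it is only printed here without assuming its format.
from disdrodb.api.path import define_accumulation_acronym
from disdrodb.utils.time import get_resampling_information

seconds, rolling = get_resampling_information("ROLL1H30MIN")
print(seconds, rolling)  # 5400 True

acronym = define_accumulation_acronym(seconds=600, rolling=False)
print(acronym)  # the 10-minute accumulation acronym used in L2E filenames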
- # TODO: tolerance on fraction of missing timesteps for large accumulation_intervals + # TODO: Add tolerance on fraction of missing timesteps for large accumulation_intervals ds["drop_number"] = xr.where(np.isnan(ds["drop_number"]), 0, ds["drop_number"]) # - Regularize dataset @@ -146,6 +149,15 @@ def _generate_l2e( rolling=rolling, ) + ##------------------------------------------------------------------------. + # Remove timesteps with no drops or NaN (from L2E computations) + # timestep_zero_drops = ds["time"].data[ds["n_drops_selected"].data == 0] + # timestep_nan = ds["time"].data[np.isnan(ds["n_drops_selected"].data)] + indices_valid_timesteps = np.where( + ~np.logical_or(ds["n_drops_selected"].data == 0, np.isnan(ds["n_drops_selected"].data)), + )[0] + ds = ds.isel(time=indices_valid_timesteps) + ##------------------------------------------------------------------------. #### Generate L2E product ds = generate_l2_empirical(ds=ds) @@ -156,6 +168,12 @@ def _generate_l2e( ds.update(ds_radar) ds.attrs = ds_radar.attrs.copy() + ##------------------------------------------------------------------------. + #### Regularize back dataset + # TODO: infill timestep_zero_drops and timestep_nan differently ? + # --> R, P, LWC = 0, + # --> Z, D, with np.nan? + ##------------------------------------------------------------------------. # Write netCDF4 dataset if ds["time"].size > 1: @@ -191,6 +209,28 @@ def _generate_l2e( return logger_filepath +def is_possible_product(accumulation_interval, sample_interval, rolling): + """Assess if production is possible given the requested accumulation interval and source sample_interval.""" + # Avoid rolling product generation at source sample interval + if rolling and accumulation_interval == sample_interval: + return False + # Avoid product generation if the accumulation_interval is less than the sample interval + if accumulation_interval < sample_interval: + return False + # Avoid producti generation if accumulation_interval is not multiple of sample_interval + return accumulation_interval % sample_interval == 0 + + +def flatten_list(nested_list): + """Flatten a nested list into a single-level list.""" + if isinstance(nested_list, list) and len(nested_list) == 0: + return nested_list + # If list is already flat, return as is to avoid flattening to chars + if isinstance(nested_list, list) and not isinstance(nested_list[0], list): + return nested_list + return [item for sublist in nested_list for item in sublist] if isinstance(nested_list, list) else [nested_list] + + def run_l2e_station( # Station arguments data_source, @@ -209,6 +249,17 @@ def run_l2e_station( This function is intended to be called through the ``disdrodb_run_l2e_station`` command-line interface. + The DISDRODB L2E routine generate a L2E file for each event. + Events are defined based on the DISDRODB event settings options. + The DISDRODB event settings allows to produce L2E files either + per custom block of time (i.e day/month/year) or for blocks of rainy events. + + For stations with varying measurement intervals, DISDRODB defines a separate list of 'events' + for each measurement interval option. In other words, DISDRODB does not + mix files with data acquired at different sample intervals when resampling the data. + + L0C product generation ensure creation of files with unique sample intervals. + Parameters ---------- data_source : str @@ -254,33 +305,70 @@ def run_l2e_station( # -------------------------------------------------------------------------. 
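# A small illustration of the compatibility rule implemented by is_possible_product above,
# for a source sample interval of 60 s.
from disdrodb.l2.routines import is_possible_product

print(is_possible_product(accumulation_interval=60, sample_interval=60, rolling=True))    # False: rolling at source interval
print(is_possible_product(accumulation_interval=30, sample_interval=60, rolling=False))   # False: finer than source
print(is_possible_product(accumulation_interval=90, sample_interval=60, rolling=False))   # False: not a multiple
print(is_possible_product(accumulation_interval=300, sample_interval=60, rolling=False))  # True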
# List L1 files to process required_product = get_required_product(product) - filepaths = get_filepaths( - base_dir=base_dir, - data_source=data_source, - campaign_name=campaign_name, - station_name=station_name, - product=required_product, - # Processing options - debugging_mode=False, - ) + flag_not_available_data = False + try: + filepaths = get_filepaths( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + product=required_product, + # Processing options + debugging_mode=False, + ) + except Exception as e: + print(str(e)) # Case where no file paths available + flag_not_available_data = True + + # -------------------------------------------------------------------------. + # If no data available, print error message and return None + if flag_not_available_data: + msg = ( + f"{product} processing of {data_source} {campaign_name} {station_name}" + + f"has not been launched because of missing {required_product} data." + ) + print(msg) + return # -------------------------------------------------------------------------. # Retrieve L2 processing options # - Each dictionary item contains the processing options for a given rolling/accumulation_interval combo l2_processing_options = get_l2_processing_options() + # ---------------------------------------------------------------------. + # Group filepaths by sample intervals + # - Typically the sample interval is fixed + # - Some stations might change the sample interval along the years + # - For each sample interval, separated processing take place here after ! + dict_filepaths = group_filepaths(filepaths, groups="sample_interval") + # -------------------------------------------------------------------------. - # Define list event - # TODO: pass event option list ! - list_events = identify_events(filepaths, parallel=parallel) - if debugging_mode: - list_events = list_events[0 : min(len(list_events), 3)] + # Define list of event + # - [(start_time, end_time)] + # TODO: Here pass event option list ! + # TODO: Implement more general define_events function + # - Either rainy events + # - Either time blocks (day/month/year) + # TODO: Define events identification settings based on accumulation + # - This is currently done at the source sample interval ! + # - Should we allow event definition for each accumulation interval and + # move this code inside the loop below + + # sample_interval = list(dict_filepaths)[0] + # filepaths = dict_filepaths[sample_interval] + + dict_list_events = { + sample_interval: identify_events(filepaths, parallel=parallel) + for sample_interval, filepaths in dict_filepaths.items() + } # ---------------------------------------------------------------------. - # Retrieve source sample interval - # TODO: Read from metadata ? - with xr.open_dataset(filepaths[0], chunks={}, autoclose=True, cache=False) as ds: - sample_interval = ensure_sample_interval_in_seconds(ds["sample_interval"].data).item() + # Subset for debugging mode + if debugging_mode: + dict_list_events = { + sample_interval: list_events[0 : min(len(list_events), 3)] + for sample_interval, list_events in dict_list_events.items() + } # ---------------------------------------------------------------------. # Loop @@ -298,12 +386,33 @@ def run_l2e_station( radar_simulation_options = l2_options["radar_simulation_options"] # ------------------------------------------------------------------. - # Avoid generation of rolling products for source sample interval ! 
- if rolling and accumulation_interval == sample_interval: - continue + # Group filepaths by events + # - This is done separately for each possible source sample interval + # - It groups filepaths by start_time and end_time provided by list_events + # - Here 'events' can also simply be period of times ('day', 'months', ...) + # - When aggregating/resampling/accumulating data, we need to load also + # some data before/after the actual event start_time/end_time + # - get_events_info adjust the event times to accounts for the required "border" data. + events_info = [ + get_events_info( + list_events=list_events, + filepaths=dict_filepaths[sample_interval], + accumulation_interval=accumulation_interval, + rolling=rolling, + ) + for sample_interval, list_events in dict_list_events.items() + if is_possible_product( + accumulation_interval=accumulation_interval, + sample_interval=sample_interval, + rolling=rolling, + ) + ] + events_info = flatten_list(events_info) - # Avoid product generation if the accumulation_interval is less than the sample interval - if accumulation_interval < sample_interval: + # ------------------------------------------------------------------. + # Skip processing if no files available + # - When not compatible accumulation_interval with source sample_interval + if len(events_info) == 0: continue # ------------------------------------------------------------------. @@ -331,15 +440,10 @@ def run_l2e_station( sample_interval=accumulation_interval, rolling=rolling, ) - # Retrieve files events information - events_info = get_events_info( - list_events=list_events, - filepaths=filepaths, - accumulation_interval=accumulation_interval, - rolling=rolling, - ) + + # ------------------------------------------------------------------. # Generate files - # - Loop over netCDF files and generate new product files. + # - L2E product generation is optionally parallelized over events # - If parallel=True, it does that in parallel using dask.delayed list_tasks = [ _generate_l2e( @@ -374,7 +478,7 @@ def run_l2e_station( station_name=station_name, base_dir=base_dir, # Product options - sample_interval=sample_interval, + sample_interval=accumulation_interval, rolling=rolling, # Logs list list_logs=list_logs, @@ -383,7 +487,7 @@ def run_l2e_station( # ---------------------------------------------------------------------. # End product processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"{product} processing of station {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) @@ -552,19 +656,17 @@ def run_l2m_station( l2_processing_options = get_l2_processing_options() # ---------------------------------------------------------------------. - # Retrieve sampling interval - # TODO: read from metadata ! - filepath = get_filepaths( + # Retrieve source sampling interval + # - If a station has varying measurement interval over time, choose the smallest one ! 
+ metadata = read_station_metadata( base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, - product="L1", - # Processing options - debugging_mode=debugging_mode, - )[0] - with xr.open_dataset(filepath, chunks={}, cache=False, autoclose=True) as ds: - sample_interval = ensure_sample_interval_in_seconds(ds["sample_interval"].data).item() + ) + sample_interval = metadata["measurement_interval"] + if isinstance(sample_interval, list): + sample_interval = min(sample_interval) # ---------------------------------------------------------------------. # Loop @@ -596,17 +698,31 @@ def run_l2m_station( # -----------------------------------------------------------------. # List files to process required_product = get_required_product(product) - filepaths = get_filepaths( - base_dir=base_dir, - data_source=data_source, - campaign_name=campaign_name, - station_name=station_name, - product=required_product, - sample_interval=accumulation_interval, - rolling=rolling, - # Processing options - debugging_mode=debugging_mode, - ) + flag_not_available_data = False + try: + filepaths = get_filepaths( + base_dir=base_dir, + data_source=data_source, + campaign_name=campaign_name, + station_name=station_name, + product=required_product, + sample_interval=accumulation_interval, + rolling=rolling, + # Processing options + debugging_mode=debugging_mode, + ) + except Exception as e: + print(str(e)) # Case where no file paths available + flag_not_available_data = True + + # If no data available, try with other L2E accumulation intervals + if flag_not_available_data: + msg = ( + f"{product} processing of {data_source} {campaign_name} {station_name}" + + f"has not been launched because of missing {required_product} {sample_interval_acronym} data ." + ) + print(msg) + continue # -----------------------------------------------------------------. # Loop over distributions to fit @@ -694,6 +810,6 @@ def run_l2m_station( # ---------------------------------------------------------------------. # End L2M processing if verbose: - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) msg = f"{product} processing of station {station_name} completed in {timedelta_str}" log_info(logger=logger, msg=msg, verbose=verbose) diff --git a/disdrodb/psd/fitting.py b/disdrodb/psd/fitting.py index 7bb85593..91d5d185 100644 --- a/disdrodb/psd/fitting.py +++ b/disdrodb/psd/fitting.py @@ -1015,7 +1015,8 @@ def get_gamma_parameters_johnson2014(ds, method="Nelder-Mead"): ####-----------------------------------------------------------------------------------------. -#### Optimize Normalized Gamma +#### Grid Search +# Optimize Normalized Gamma def _estimate_normalized_gamma_w_d50(Nd, D, D50, Nw, order=2): @@ -1106,55 +1107,6 @@ def get_normalized_gamma_parameters(ds, order=2): return ds -####-----------------------------------------------------------------. -#### Moments - - -def get_exponential_moment(N0, Lambda, moment): - """Compute exponential distribution moments.""" - return N0 * gamma(moment + 1) / Lambda ** (moment + 1) - - -def get_gamma_moment_v1(N0, mu, Lambda, moment): - """Compute gamma distribution moments. - - References - ---------- - Kozu, T., and K. Nakamura, 1991: - Rainfall Parameter Estimation from Dual-Radar Measurements - Combining Reflectivity Profile and Path-integrated Attenuation. - J. Atmos. 
Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 - """ - # Zhang et al 2001: N0 * gamma(mu + moment + 1) * Lambda ** (-(mu + moment + 1)) - return N0 * gamma(mu + moment + 1) / Lambda ** (mu + moment + 1) - - -def get_gamma_moment_v2(Nt, mu, Lambda, moment): - """Compute gamma distribution moments. - - References - ---------- - Kozu, T., and K. Nakamura, 1991: - Rainfall Parameter Estimation from Dual-Radar Measurements - Combining Reflectivity Profile and Path-integrated Attenuation. - J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 - """ - return Nt * gamma(mu + moment + 1) / gamma(mu + 1) / Lambda**moment - - -def get_lognormal_moment(Nt, sigma, mu, moment): - """Compute lognormal distribution moments. - - References - ---------- - Kozu, T., and K. Nakamura, 1991: - Rainfall Parameter Estimation from Dual-Radar Measurements - Combining Reflectivity Profile and Path-integrated Attenuation. - J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 - """ - return Nt * np.exp(moment * mu + 1 / 2 * moment * sigma**2) - - ####-----------------------------------------------------------------. #### Methods of Moments @@ -1189,7 +1141,7 @@ def get_exponential_parameters_Zhang2008(moment_l, moment_m, l, m): # noqa: E74 return Lambda, N0 -def get_exponential_parameters_Testud2001(moment_3, moment_4): +def get_exponential_parameters_M34(moment_3, moment_4): """Compute exponential distribution parameters following Testud 2001. References @@ -1206,24 +1158,103 @@ def get_exponential_parameters_Testud2001(moment_3, moment_4): return Lambda, N0 -def get_gamma_parameters_Testud2001(moment_3, moment_4): - """Compute exponential distribution parameters following Testud 2001. +# M246 DEFAULT FOR GAMMA ? +# ----------------------------- + +# LMOM (Johnson et al., 2014) + + +def get_gamma_parameters_M012(M0, M1, M2): + """Compute gamma distribution parameters following Cao et al., 2009. References ---------- - Testud, J., S. Oury, R. A. Black, P. Amayenc, and X. Dou, 2001: - The Concept of “Normalized” Distribution to Describe Raindrop Spectra: - A Tool for Cloud Physics and Cloud Remote Sensing. - J. Appl. Meteor. Climatol., 40, 1118-1140, - https://doi.org/10.1175/1520-0450(2001)040<1118:TCONDT>2.0.CO;2 + Cao, Q., and G. Zhang, 2009: + Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra. + J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1. + """ + G = M1**3 / M0 / M2 + mu = 1 / (1 - G) - 2 + Lambda = M0 / M1 * (mu + 1) + N0 = Lambda ** (mu + 1) * M0 / gamma(mu + 1) + return mu, Lambda, N0 + + +def get_gamma_parameters_M234(M2, M3, M4): + """Compute gamma distribution parameters following Cao et al., 2009. + + References + ---------- + Cao, Q., and G. Zhang, 2009: + Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra. + J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1. + """ + G = M3**2 / M2 / M4 + mu = 1 / (1 - G) - 4 + Lambda = M2 / M3 * (mu + 3) + N0 = Lambda ** (mu + 3) * M2 / gamma(mu + 3) + return mu, Lambda, N0 + + +def get_gamma_parameters_M246(M2, M4, M6): + """Compute gamma distribution parameters following Ulbrich 1998. + + References + ---------- + Ulbrich, C. W., and D. Atlas, 1998: + Rainfall Microphysics and Radar Properties: Analysis Methods for Drop Size Spectra. 
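# A round-trip check (sketch only) for the moment estimators in this section. For the gamma
# PSD N(D) = N0 * D**mu * exp(-Lambda * D), the k-th moment is M_k = N0 * Gamma(mu+k+1) / Lambda**(mu+k+1).
# Note that the identity mu = 1/(1-G) - 2 holds for the dimensionless ratio G = M1**2 / (M0 * M2),
# so the exponent used in get_gamma_parameters_M012 above is worth double-checking against Cao and Zhang (2009).
import numpy as np
from scipy.special import gamma as gamma_f

N0, mu, Lambda = 8000.0, 3.0, 2.5


def gamma_moment(k):
    return N0 * gamma_f(mu + k + 1) / Lambda ** (mu + k + 1)


M0, M1, M2 = gamma_moment(0), gamma_moment(1), gamma_moment(2)
G = M1**2 / (M0 * M2)
mu_hat = 1.0 / (1.0 - G) - 2.0
Lambda_hat = M0 / M1 * (mu_hat + 1.0)
N0_hat = Lambda_hat ** (mu_hat + 1.0) * M0 / gamma_f(mu_hat + 1.0)
print(np.allclose([mu_hat, Lambda_hat, N0_hat], [mu, Lambda, N0]))  # True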
+ J. Appl. Meteor. Climatol., 37, 912-923, + https://doi.org/10.1175/1520-0450(1998)037<0912:RMARPA>2.0.CO;2 + + Cao, Q., and G. Zhang, 2009: + Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra. + J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1. + + Thurai, M., Williams, C.R., Bringi, V.N., 2014: + Examining the correlations between drop size distribution parameters using data + from two side-by-side 2D-video disdrometers. + Atmospheric Research, 144, 95-110, https://doi.org/10.1016/j.atmosres.2014.01.002. + """ + G = M4**2 / M2 / M6 + + # TODO: Different formulas ! + # Thurai et al., 2014 (A4), Ulbrich et al., 1998 (2) + # mu = ((7.0 - 11.0 * G) - + # np.sqrt((7.0 - 11.0 * G) ** 2.0 - 4.0 * (G - 1.0) * (30.0 * G - 12.0)) / (2.0 * (G - 1.0))) + mu = (7.0 - 11.0 * G) - np.sqrt(G**2 + 89 * G + 1) / (2.0 * (G - 1.0)) + + # Cao et al., 2009 (B3) + # --> Wrong ??? + mu = (7.0 - 11.0 * G) - np.sqrt(G**2 + 14 * G + 1) / (2.0 * (G - 1.0)) + + Lambda = np.sqrt((4 + mu) * (3 + mu) * M2 / M4) + # Cao et al., 2009 + N0 = M2 * Lambda ** (3 + mu) / gamma(3 + mu) + # # Thurai et al., 2014 + # N0 = M3 * Lambda ** (4 + mu) / gamma(4 + mu) + # # Ulbrich et al., 1998 + # N0 = M6 * Lambda ** (7.0 + mu) / gamma(7 + mu) + return mu, Lambda, N0 + + +def get_gamma_parameters_M456(M4, M5, M6): + """Compute gamma distribution parameters following Cao et al., 2009. + + References + ---------- + Cao, Q., and G. Zhang, 2009: + Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra. + J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1. """ - # gamma(4) = 6 - N0 = 256 / gamma(4) * moment_3**5 / moment_4**4 # 256/6*moment_3**5/moment_4**4 - return N0 + G = M5**2 / M4 / M6 + mu = 1 / (1 - G) - 6 + Lambda = M4 / M5 * (mu + 5) + N0 = Lambda ** (mu + 5) * M4 / gamma(mu + 5) + return mu, Lambda, N0 -def get_gamma_parameters_Kozu1991(moment_3, moment_4, moment_6): - """Compute exponential distribution parameters following Kozu 1991. +def get_gamma_parameters_M346(M3, M4, M6): + """Compute gamma distribution parameters following Kozu 1991. References ---------- @@ -1237,36 +1268,25 @@ def get_gamma_parameters_Kozu1991(moment_3, moment_4, moment_6): Stratiform versus Convective Clouds. J. Appl. Meteor. Climatol., 35, 355-371, https://doi.org/10.1175/1520-0450(1996)035<0355:EFTRSO>2.0.CO;2 + + Cao, Q., and G. Zhang, 2009: + Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra. + J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1. """ - G = moment_4**3 / moment_3**2 / moment_6 - mu = (11 * G - 8 + np.sqrt(G * (G + 8))) / (2 * (1 - G)) - # mu = (5.5*G - 4 + np.sqrt(G*(G * 0.25 + 2)))/(1-G) - Lambda = (mu + 4) * moment_3 / moment_4 - N0 = Lambda ** (mu + 4) * moment_3 / gamma(mu + 4) - # Nt = Lambda**3*moment_3*gamma(mu+1)/gamma(mu+4) - # Dm = moment_4/moment_3 # 4/lambda - return mu, Lambda, N0 + G = M4**3 / M3**2 / M6 + # Kozu + mu = (5.5 * G - 4 + np.sqrt(G * (G * 0.25 + 2))) / (1 - G) -def get_gamma_parameters_Ulbrich1998(moment_2, moment_4, moment_6): - """Compute exponential distribution parameters following Ulbrich 1998. + # Cao et al., 2009 (equivalent) + # mu = (11 * G - 8 + np.sqrt(G * (G + 8))) / (2 * (1 - G)) - References - ---------- - Ulbrich, C. W., and D. Atlas, 1998: - Rainfall Microphysics and Radar Properties: Analysis Methods for Drop Size Spectra. 
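# The mu expressions above differ in the discriminant and in the grouping of terms. Solving
# G = M4**2 / (M2 * M6) = (mu+3)*(mu+4) / ((mu+5)*(mu+6)) for mu gives the quadratic
# (G-1)*mu**2 + (11*G-7)*mu + (30*G-12) = 0, whose discriminant simplifies to G**2 + 14*G + 1.
# A correctly parenthesized root consistent with that algebra is sketched below; it is worth
# cross-checking against the cited papers before relying on the formulas above.
import numpy as np


def mu_from_m246_ratio(G):
    return ((7.0 - 11.0 * G) - np.sqrt(G**2 + 14.0 * G + 1.0)) / (2.0 * (G - 1.0))


# Round trip: for mu = 3, G = (3+3)*(3+4) / ((3+5)*(3+6)) = 42/72
print(np.isclose(mu_from_m246_ratio(42.0 / 72.0), 3.0))  # True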
- J. Appl. Meteor. Climatol., 37, 912-923, - https://doi.org/10.1175/1520-0450(1998)037<0912:RMARPA>2.0.CO;2 - """ - G = moment_4**2 / moment_2 / moment_6 - mu = (7.0 - 11.0 * G) - np.sqrt((7.0 - 11.0 * G) ** 2.0 - 4.0 * (G - 1.0) * (30.0 * G - 12.0)) / (2.0 * (G - 1.0)) - Lambda = np.sqrt((4 + mu) * (3 + mu) * moment_2 / moment_4) - N0 = moment_6 * Lambda ** (7.0 + mu) / gamma(7 + mu) - # D50 = (3.67 + mu) / Lambda + Lambda = (mu + 4) * M3 / M4 + N0 = Lambda ** (mu + 4) * M3 / gamma(mu + 4) return mu, Lambda, N0 -def get_lognormal_parameters_Kozu1991(moment_3, moment_4, moment_6): +def get_lognormal_parameters_M346(M3, M4, M6): """Compute lognormal distribution parameters following Kozu1991. References @@ -1276,9 +1296,9 @@ def get_lognormal_parameters_Kozu1991(moment_3, moment_4, moment_6): Combining Reflectivity Profile and Path-integrated Attenuation. J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 """ - L3 = np.log(moment_3) - L4 = np.log(moment_4) - L6 = np.log(moment_6) + L3 = np.log(M3) + L4 = np.log(M4) + L6 = np.log(M6) Nt = np.exp((24 * L3 - 27 * L4 - 6 * L6) / 3) mu = (-10 * L3 + 13.5 * L4 - 3.5 * L6) / 3 sigma = (2 * L3 - 3 * L4 + L6) / 3 diff --git a/disdrodb/psd/models.py b/disdrodb/psd/models.py index 1fe10d04..92b2b472 100644 --- a/disdrodb/psd/models.py +++ b/disdrodb/psd/models.py @@ -343,9 +343,9 @@ class GammaPSD(ExponentialPSD): Attributes ---------- - N0: the intercept parameter. - Lambda: the inverse scale parameter - mu: the shape parameter + N0: the intercept parameter [mm**(-1-mu) m**-3] (scale parameter) + Lambda: the inverse scale parameter [mm-1] (slope parameter) + mu: the shape parameter [-] Dmax: the maximum diameter to consider (defaults to 11/Lambda, i.e. approx. 3*D50, if None) @@ -440,6 +440,8 @@ class NormalizedGammaPSD(XarrayPSD): N(D) = Nw * f1(mu) * (D/Dm)**mu * exp(-(mu+4)*D/Dm) # Nw * f(D; Dm, mu) f1(mu) = 6/(4**4) * (mu+4)**(mu+4)/Gamma(mu+4) + Note: gamma(4) = 6 + An alternative formulation as function of Dm: # Tokay et al., 2010 # Illingworth et al., 2002 (see eq10 to derive full formulation!) @@ -641,3 +643,52 @@ def __eq__(self, other): and (self.bin_edges == other.bin_edges).all() and (self.bin_psd == other.bin_psd).all() ) + + +####-----------------------------------------------------------------. +#### Moments Computation + + +def get_exponential_moment(N0, Lambda, moment): + """Compute exponential distribution moments.""" + return N0 * gamma(moment + 1) / Lambda ** (moment + 1) + + +def get_gamma_moment_v1(N0, mu, Lambda, moment): + """Compute gamma distribution moments. + + References + ---------- + Kozu, T., and K. Nakamura, 1991: + Rainfall Parameter Estimation from Dual-Radar Measurements + Combining Reflectivity Profile and Path-integrated Attenuation. + J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 + """ + # Zhang et al 2001: N0 * gamma(mu + moment + 1) * Lambda ** (-(mu + moment + 1)) + return N0 * gamma(mu + moment + 1) / Lambda ** (mu + moment + 1) + + +def get_gamma_moment_v2(Nt, mu, Lambda, moment): + """Compute gamma distribution moments. + + References + ---------- + Kozu, T., and K. Nakamura, 1991: + Rainfall Parameter Estimation from Dual-Radar Measurements + Combining Reflectivity Profile and Path-integrated Attenuation. + J. Atmos. 
Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 + """ + return Nt * gamma(mu + moment + 1) / gamma(mu + 1) / Lambda**moment + + +def get_lognormal_moment(Nt, sigma, mu, moment): + """Compute lognormal distribution moments. + + References + ---------- + Kozu, T., and K. Nakamura, 1991: + Rainfall Parameter Estimation from Dual-Radar Measurements + Combining Reflectivity Profile and Path-integrated Attenuation. + J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2 + """ + return Nt * np.exp(moment * mu + 1 / 2 * moment * sigma**2) diff --git a/disdrodb/routines.py b/disdrodb/routines.py index 73f9ea20..1d70c8ed 100644 --- a/disdrodb/routines.py +++ b/disdrodb/routines.py @@ -152,7 +152,7 @@ def run_disdrodb_l0_station( # -------------------------------------------------------------------------. # End of L0 processing for all stations - timedelta_str = str(datetime.timedelta(seconds=time.time() - t_i)) + timedelta_str = str(datetime.timedelta(seconds=round(time.time() - t_i))) print(f"L0 processing of stations {station_name} completed in {timedelta_str}") diff --git a/disdrodb/tests/data/check_readers/DISDRODB/Raw/EPFL/PARSIVEL_2007/metadata/10.yml b/disdrodb/tests/data/check_readers/DISDRODB/Raw/EPFL/PARSIVEL_2007/metadata/10.yml index 8e63baff..3e334611 100644 --- a/disdrodb/tests/data/check_readers/DISDRODB/Raw/EPFL/PARSIVEL_2007/metadata/10.yml +++ b/disdrodb/tests/data/check_readers/DISDRODB/Raw/EPFL/PARSIVEL_2007/metadata/10.yml @@ -37,7 +37,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 10 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/disdrodb/tests/data/check_readers/DISDRODB/Raw/UK/DIVEN/ground_truth/CAIRNGORM/L0B.DIVEN.CAIRNGORM.s20170210000000.e20170210000400.V0.nc b/disdrodb/tests/data/check_readers/DISDRODB/Raw/UK/DIVEN/ground_truth/CAIRNGORM/L0B.DIVEN.CAIRNGORM.s20170210000000.e20170210000400.V0.nc index fe41f9b0e3585933a71eb8715ffd83d1835f4413..1d407e1899bed4d2cbb21b14d25d8591d61e0fc2 100644 GIT binary patch delta 5512 zcmbstYgkQLd#!zTIj4&*DygJHagdyjiuy1rnj)n%#wC(!bV#}~22Y>HWiTdrXFTIF zio}qRtx4m{6f>A1*KwJM5MxXjx6c@ZZ(Yu=)A7u&Z#~c1YrnU(-uGSaW$n{gZnjY} zTkYnsaRNqkXxS5?)M>kGE3|JgJ_K4}wzzrl{;5zR#0rNLaJrWx>=~p4pCCK%QaD>Z zwEF9LNL*wXQlempQ0O|(>_8Ujj!L&=mjps^pC z3{Mnl{2UxbHO_*8qT0SlNodhrl^iFRFw_7u2yqpBD*B4=CvK00I*WX$yPQ)A1fB(xqTcIYGOFD9;+>Kv*uBn&WvFqNnMBFY$yP#~GMzZf?g&4mT3 zIQzpCP0S7=Q#ZGVAEEJZQx%6#!a$2qTnh6oLc}!dy*s5ki+Uk1451!`56z_(`5@#; zfl>?)QbYa6zGGHh62VuLaPiELPV&F0z}Qe6mX)xAWL~u zV6ZZeJk<#GqtIkZBKs395L!e5cZB*=z}SK40i{GDyfd0k+D}K#=JZ|9lUwJ53j^4wAw<|4Xpk zbFW}qbolUC!CHbC^$fwynK9eDXa&m)(@T>%ErZb9K<^eR433a?yV(ks4oj;oIBh{I zEiDFiyE#iC-9$lr6MT0Yj8ta|3p7%Vn(Q1r^7IP8p3?=ZU}=V@ry$NP&9s9xJu|wj zrA}@gN>}=-5X7sN;MyxseD-8U5p3$UxMOlQ=fS8YIdKeVHTiDA1{4c@LY3!N=@&kd zPH9So^b5qdUZ^9NM95K~pp6U$G0;{96BuYGgNqqxFM~T6*hvPn$;v&EgAA^tAl&FX zMAU!g{!A7dnZW3W*bAa&UKnM!{7%>p`{*t zbK@s2dnB(m_)Yzqkht*SNQKC*)&!lb8G43ORRqaL$oBD*7?+ZAQN5EjTQ6{$C8F_6 zM|y>~J**3usEc~d5q>#*Mz{H_WF{)`j=dZ%HjvS_!or<>;;*=bhK(IN0wu7lBhmT{ zBTKR$Lh-fplXwjD#Ml8rT|sBac%rkU^~RQY^)!q*mIFPWP9TSRw7F*s64}}wR9~bZ zKbIk!^~?Yb=Qzv)n7o?e-8lIzd^Z!kJ*%7ud9=fm z5-V?@Zbv|0(wK&Mr(H+D`%u8L$2c}jP|i)&5hO+M~8-wlCwRrf(zS%b&g*i%f%~J!E*HlWB?3z2U^R64QU(rNyjOB6K;vej3x@A={PMOrq;gdUZh95uxAdMhM3u zw42o@9IZeVR*2Gv`;2HN&IkZ|p(|Xyr4$nUX;q9{_Kb?qG4W-wuc-T-H0yNlBZX 
zq|cd`FioGHkd%_1laeK84;G#I`bd$KmEFuP9>g?}R%<%!ll6(SQiuSPO`AP8CF_O+ zMMW-3db#L%XYPlPp4SDeOO)P;X*v3w+~gFGw?*2cax*d@v&0D!OFDt3#2O}*bTXT( zPtHjt`4+8!vLbu2^qtcqpsq**E4Fu&el2=Wu=VTbtLvxJdiVwe`2_~~2SVhQ_xsTB zq$e~^>*MQ_oHEB(tMl(WJvn8XHgKTMCt0V}`T6R6bqZt$(OaCLd5b+9$qm7zv&BML ziM==^xMqx0SJF-BVnv(atdzv;+^m#Y#6t;s>ZLh}X`udCEz;|hq}aBEgea0qxBH5g zP8=DWUwHLQ?G-gT3-z24$T3cjT}+(e6$&MqAf1zSK{K(J~{aQ7_|&@}%>untf!UmQIWeXiy6&9zA6@NA?k!ns+{y#lv}5LyYOcV z+^v;`&xSHBk%zdh1BNpvx+r!lF|?d;7Q8K>u0E`=xR@R3zc!wIU1?Vnu@d&7? ze+WCzd&0PjpNk1~ljg&N=3K0T|6J(;3vb(sUj^M%N{=2^VXp{;%;`!^G?Drmt_Pn} zfPAh|M_k$9K8t+9F5;iCPnt9S-(~)O5Fo6}hQjpDiXML)hp30LVvP@N#rXxlb|Z>? zhr9l4tk5S$3Qd{%whLTkfAsD$NK_@(Qz7z(iw-P zSj^e_s)}ZzMI7CRjMxkCs3zWT*?kTobYV{EJ<}-LdWlopI-|?JuVTQpGC&Bp&F(dp zH~QWwnN=xT(Ar}~{3#;XoENU7cgf_U$}4QomxevBHt^4a-{d_@wuc@tjai>7oZ1%I zbsL5aFxAYb(Uj|Eveh2wHYh-d;hN=d%vZso>p}Q4Sae0Fs+QeL0;IR-@8t0lIfI=T zEN@yvaig`f92c#8Wc^8p=T73sKrem;gOq|*6Sv^DY{AJzt5=y?Fr~>YP;O*_yAAmlh6eS!-zLm(?V7Jkn0&ok9jhxz%nMZv|%o8rmU;O%Q# ztReI;3A0^n<-Fz^zUfs%U%(_9%oyo`I(Y`u$EwMoPX__L!0r^t{3B!^>jrX-WE|&m znifOdydhN?82e&4Ki^BFszJr;Na=fr4uTUCkvkC+S^32x4Hq%PLySa7kYZQjlh>FF z$w|b7Zu@`On+i#_)xCx8HzbB^eLSsfePu72gB69-49mzhixEzHZrRM1$#}L9E6CRg zEPASeGe<Z65$N7d01Ji1 z2kF;}1-UM>(e#?8yQ5BFfKkD>zds2ZOTNfa6xas!7xrhfd~Prkp2MulENSLlI=H+0}G&o~(2ySoz{Utq=HK^dqB@LOaz~-_VE;m>Sq@LVZ(+R#?s_Zb!Du~{A zVCcavpuVCOR}AczTGdsB$6HnX5QUwURSU!LRPr2(waTi|0Tff(6^ARURyk74eJ`xF F{U4JH@@@bC delta 5106 zcmb_fX;f547Or~lA>Az6?5#9QD+Gr)*9sBi}VJ=r{V-#M=R1DjnGI!(gIt2(cAD; z)Z-}!B-RvMaYYSrxXMtG>`%>8kV;Z@}{6?j8L|bLzoB^Q$Wllst|Idu^>zNP|#w& zntb&L`BL;YWfA?v3WR1;;EK=?3VH?*ouQmKg#0O|ubO;_SO5hf6!c^EAvB2sZ)Q0{ zODP~S#3zI*DImrK6Ox85Q4~!k6M}k%9`y__>KRnjOe0J-bU2;kZb?3ai1`%6Qc#4@ z2nu?phVRy0%L@u(>E4Md`=o66Gb4uf^mEZrt_0;R_jU-HDF=Le4vf|FM z4X3_gGCpqn=%4;z(2lBmt>_F$8olte7Yx+++xHC8%YvJo$78-S6ck*9k>`5B7@I|c zOEA>wweo43a6#HumVX5zY_f!v(QL5Do(!H$JR-sk$N9o$7dgghxM2CE>Oa3th>4mk zSj#X;zfd?3&xCFJ83fB^w^liDUMAt`AXPU?sPmK$>NJAodxy3j?*;hUYUKrqY>_EDredi z1<6GX&I4CVGeRP&p=97ky-((F8I1m3IXnpr-ub%pIzGhmo^%zu_T1r5^luB+wIS6zCvOXrsU|25l8Mi$OaDe#D@?0{2k}w+4@pj#d3NsGwlK z+~jjdwBD}plXG4*$_<7RiSCu)hrw_>25tmfXh&3Ykspk--AKW|T5E>(lGG?DsF0TK z+w(ho5}XwDz0u|A^?ce|BULdwMO?d|kMHCJDJx8DUNPilWI{|7DTTyT>SVHa^i?jU zM)c^=_IcjiJxoeXrL4&ak!oQqBBMw-zN{Yr*RBK$vAK{n!Abo6+#;|MTp{j;N=Ul_ zCC6q2yI`8qwB7J9k2F_{rNWRw@wEnv8F!RfDv?oV+1V$`5O%>#+Pp zX!RQtbS4uN1up+%oD@6O6a|XBn{iwr?n=i<^^6j2X2yZ3W2Oun+C#{_cg-X#+-NCh z7LOCa?}jTZjh_#PL)Fs$p})4k*mdlW%BU^!qp_GrF`=(&tUwtd?7oBafhy06)8WQM!4bA)Y7xg!n?3?X1=hQb8Ygg z0%Ye|gqt~2EtTA?{1Y6QmoI$AxswtfEtAJ36YkD=YRNY4)O~1KkT0CiWh=is7X$?UQaWDX4~|UJP0%cdSWdsJ~~vY zfBc_Cu(Wk@znIuaR5Fl@(r@M8c>mEqVReq9`}@Jq6DN#{8K<16X$Ls_FeeCJRQpdW z|9X2z^pq*1ksH0+NyD8OnUN!dkUCxyFqiS3Fid!azM_>MVF!dBimXTWua#ag2=5Wm zbo%r_10I~sgoqqRsLpkV#?~Nq8gy4wg4-~9riQwhDQ7CU_R>Z&^9n{@Ovl*k7RRDv zq9T!vQ6-`a9?`u&~rX?Eu41WHhe#t3$!9j!PhkP(JBq%V*FFC*v5a>@nBC>(Z zU5+4?+rr5rH}HO94gSS;y+2G%&RyW1oH{2xB{wA--YM2XRI!~%tbpFdj;c9nxv9BJ zl2Zx_bWmAtD_tHPYaut3y9onSNJ1z*WlqkL?3DDBjNDnN8DxRHIcbPiavD!TcHO%U z!`)K7ysyGnvRp#ONPF5G_A%V5^pjs78jEH1{+~t9)tzCPO>5%9^Cq}DE*x)t!kX0>E(>79Ko6GznZ?=NW;r9TE{lVW5JI(@OHg3%xrariSdIh5{x_Qe3b04E8M&= za!z_JpG6)DbDwEVD`)yq&b34=c?d{~D5)?l+37W=Qk$l2j)s{&pxDFdgXTZt>X3z>H%I8r^%j68S+ z1ZB%(Z1~MHHfl~ee@W%RkmHjC^+AaL!qJ6XjmAto&mOc0br>mXBTs@Tx#PGkh97H+ zbeFtN=CSSTDrzLE<%7Rof+Y=v*Y>ki_6kcj1Us0s!}RDhG}gL8X|0*1;uMzb;1H2l zoWhbl!AAM)3-EJoU$N~H0;@0kx-7iR9mf)_wzPE}!h8QJD7eEy;CFm7gg2-l;R}tO z8f6(vr5MU5M9Y=OpI~X=yG_ercKv%ksfYOZPEL@6j-Fj4s!{#F`Ow5xPV-vJ(;5V+ zb#2l>HdAe^%{w`}$IP~Sd}z&mPWRqtGehlD)R1}dJy3^eJiGO{wsEP$MtaFe(LEON 
z+6Fre3Eys$B)4Z@R0Nn~F<>&r)J9g@L@QcPG~_zxH+ncS>rF?*w!o4gevcuH+wmW={ZmBA(#(0<&6fB(P);s(>!JS$UUZ;j*bC`Q-kYtuc-s^PfSg-M!CyPcOm{4 zQ>j^zdo$tDnP5R^XE~J6c>WM^UZU=`?uF-HDLdfcvc6~JFZ76f-qxZF9(nRp zWi}OKd-4w1kQsDmz# zayl(GGFGuM%P`4JOLl~Zgll^-)@fr&>1I8(au`Y)b{d>Q}% diff --git a/disdrodb/tests/data/check_readers/DISDRODB/Raw/UK/DIVEN/metadata/CAIRNGORM.yml b/disdrodb/tests/data/check_readers/DISDRODB/Raw/UK/DIVEN/metadata/CAIRNGORM.yml index 54be96d8..492c0c4a 100755 --- a/disdrodb/tests/data/check_readers/DISDRODB/Raw/UK/DIVEN/metadata/CAIRNGORM.yml +++ b/disdrodb/tests/data/check_readers/DISDRODB/Raw/UK/DIVEN/metadata/CAIRNGORM.yml @@ -38,7 +38,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 60 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/disdrodb/tests/data/test_dir_structure/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml b/disdrodb/tests/data/test_dir_structure/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml index 4d04c7be..0d55359d 100644 --- a/disdrodb/tests/data/test_dir_structure/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml +++ b/disdrodb/tests/data/test_dir_structure/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml @@ -28,7 +28,7 @@ firmware_dsp: "" firmware_version: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 contributors: "" authors: "" institution: "" diff --git a/disdrodb/tests/data/test_dir_structure/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml b/disdrodb/tests/data/test_dir_structure/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml index 838e17f9..f9741785 100644 --- a/disdrodb/tests/data/test_dir_structure/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml +++ b/disdrodb/tests/data/test_dir_structure/DISDRODB/Raw/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml @@ -37,7 +37,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATIONID.yml b/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATIONID.yml index 19bd7cf0..5ce6e661 100644 --- a/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATIONID.yml +++ b/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATIONID.yml @@ -28,7 +28,7 @@ firmware_dsp: "" firmware_version: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 contributors: "" authors: "" institution: "" diff --git a/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml b/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml index 5dfd8800..953b97c7 100644 --- a/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml +++ 
b/disdrodb/tests/pytest_files/test_folders_files_creation/DISDRODB/Processed/DATA_SOURCE/CAMPAIGN_NAME/metadata/STATION_NAME.yml @@ -36,7 +36,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/disdrodb/tests/pytest_files/test_folders_files_creation/metadata/123.yml b/disdrodb/tests/pytest_files/test_folders_files_creation/metadata/123.yml index a5e873db..cfd7a2cf 100644 --- a/disdrodb/tests/pytest_files/test_folders_files_creation/metadata/123.yml +++ b/disdrodb/tests/pytest_files/test_folders_files_creation/metadata/123.yml @@ -36,7 +36,7 @@ firmware_version: "" sensor_beam_length: "" sensor_beam_width: "" sensor_nominal_width: "" -measurement_interval: "" +measurement_interval: 30 calibration_sensitivity: "" calibration_certification_date: "" calibration_certification_url: "" diff --git a/disdrodb/tests/test_api/test_api_path.py b/disdrodb/tests/test_api/test_api_path.py index 6968b080..7590c2f9 100644 --- a/disdrodb/tests/test_api/test_api_path.py +++ b/disdrodb/tests/test_api/test_api_path.py @@ -74,6 +74,8 @@ def test_define_l0b_filename(product): # Set variables campaign_name = "CAMPAIGN_NAME" station_name = "STATION_NAME" + sample_interval = 10 + sample_interval_str = "10S" start_date = datetime.datetime(2019, 3, 26, 0, 0, 0) end_date = datetime.datetime(2021, 2, 8, 0, 0, 0) @@ -83,12 +85,16 @@ def test_define_l0b_filename(product): ds = xr.DataArray( data=data, dims=["time"], - coords={"time": pd.date_range(start=start_date, end=end_date)}, + coords={"time": pd.date_range(start=start_date, end=end_date), "sample_interval": sample_interval}, ) - + # Define expected results - expected_name = f"{product}.CAMPAIGN_NAME.STATION_NAME.s20190326000000.e20210208000000.{PRODUCT_VERSION}.nc" - + # TODO: MODIFY ! + if product == "L0B": + expected_name = f"{product}.CAMPAIGN_NAME.STATION_NAME.s20190326000000.e20210208000000.{PRODUCT_VERSION}.nc" + else: + expected_name = f"{product}.{sample_interval_str}.CAMPAIGN_NAME.STATION_NAME.s20190326000000.e20210208000000.{PRODUCT_VERSION}.nc" + # Test the function define_filename_func = define_l0b_filename if product == "L0B" else define_l0c_filename res = define_filename_func(ds, campaign_name, station_name) diff --git a/disdrodb/tests/test_l0/test_l0a_processing.py b/disdrodb/tests/test_l0/test_l0a_processing.py index 0bf05549..c87059ad 100644 --- a/disdrodb/tests/test_l0/test_l0a_processing.py +++ b/disdrodb/tests/test_l0/test_l0a_processing.py @@ -151,7 +151,8 @@ def test_remove_corrupted_rows(): remove_corrupted_rows(pd.DataFrame()) # Test case 3: Check if the function raises ValueError when only one row remains - with pytest.raises(ValueError, match=r"Only 1 row remains after data corruption checks. Check the file."): + msg = r"Only 1 row remains after data corruption checks. Check the raw file and maybe delete it." 
+ with pytest.raises(ValueError, match=msg): remove_corrupted_rows(pd.DataFrame({"raw_drop_number": ["1"]})) diff --git a/disdrodb/utils/logger.py b/disdrodb/utils/logger.py index f03b423e..8338dd9b 100644 --- a/disdrodb/utils/logger.py +++ b/disdrodb/utils/logger.py @@ -23,8 +23,6 @@ import re from asyncio.log import logger -from disdrodb.api.path import define_campaign_dir, define_filename, define_logs_dir - def create_logger_file(logs_dir, filename, parallel): """Create logger file.""" @@ -85,7 +83,8 @@ def log_debug(logger: logger, msg: str, verbose: bool = False) -> None: """ if verbose: print(" - " + msg) - logger.debug(msg) + if logger is not None: + logger.debug(msg) def log_info(logger: logger, msg: str, verbose: bool = False) -> None: @@ -103,7 +102,8 @@ def log_info(logger: logger, msg: str, verbose: bool = False) -> None: """ if verbose: print(" - " + msg) - logger.info(msg) + if logger is not None: + logger.info(msg) def log_warning(logger: logger, msg: str, verbose: bool = False) -> None: @@ -121,7 +121,8 @@ def log_warning(logger: logger, msg: str, verbose: bool = False) -> None: """ if verbose: print(" - " + msg) - logger.warning(msg) + if logger is not None: + logger.warning(msg) def log_error(logger: logger, msg: str, verbose: bool = False) -> None: @@ -139,7 +140,8 @@ def log_error(logger: logger, msg: str, verbose: bool = False) -> None: """ if verbose: print(" - " + msg) - logger.error(msg) + if logger is not None: + logger.error(msg) ####---------------------------------------------------------------------------. @@ -253,6 +255,7 @@ def create_product_logs( None """ + from disdrodb.api.path import define_campaign_dir, define_filename, define_logs_dir from disdrodb.utils.directories import list_files # --------------------------------------------------------. diff --git a/disdrodb/utils/time.py b/disdrodb/utils/time.py index d35bf900..2da1aa1b 100644 --- a/disdrodb/utils/time.py +++ b/disdrodb/utils/time.py @@ -16,6 +16,7 @@ # -----------------------------------------------------------------------------. """This module contains utilities related to the processing of temporal dataset.""" import logging +import re from typing import Optional import numpy as np @@ -23,10 +24,114 @@ import xarray as xr from xarray.core import dtypes -from disdrodb.utils.logger import log_info +from disdrodb.utils.logger import log_info, log_warning logger = logging.getLogger(__name__) +####------------------------------------------------------------------------------------. +#### Sampling Interval Acronyms + + +def seconds_to_acronym(seconds): + """ + Convert a duration in seconds to a readable string format (e.g., "1H30", "1D2H"). + + Parameters + ---------- + - seconds (int): The time duration in seconds. + + Returns + ------- + - str: The duration as a string in a format like "30S", "1MIN30S", "1H30MIN", or "1D2H". + """ + timedelta = pd.Timedelta(seconds=seconds) + components = timedelta.components + + parts = [] + if components.days > 0: + parts.append(f"{components.days}D") + if components.hours > 0: + parts.append(f"{components.hours}H") + if components.minutes > 0: + parts.append(f"{components.minutes}MIN") + if components.seconds > 0: + parts.append(f"{components.seconds}S") + acronym = "".join(parts) + return acronym + + +def get_resampling_information(sample_interval_acronym): + """ + Extract resampling information from the sample interval acronym. 
+ + Parameters + ---------- + sample_interval_acronym: str + A string representing the sample interval: e.g., "1H30MIN", "ROLL1H30MIN". + + Returns + ------- + sample_interval_seconds, rolling: tuple + Sample_interval in seconds and whether rolling is enabled. + """ + rolling = sample_interval_acronym.startswith("ROLL") + if rolling: + sample_interval_acronym = sample_interval_acronym[4:] # Remove "ROLL" + + # Allowed pattern: one or more occurrences of "" + # where unit is exactly one of D, H, MIN, or S. + # Examples: 1H, 30MIN, 2D, 45S, and any concatenation like 1H30MIN. + pattern = r"^(\d+(?:D|H|MIN|S))+$" + + # Check if the entire string matches the pattern + if not re.match(pattern, sample_interval_acronym): + raise ValueError( + f"Invalid sample interval acronym '{sample_interval_acronym}'. " + "Must be composed of one or more groups, where unit is D, H, MIN, or S.", + ) + + # Regular expression to match duration components and extract all (value, unit) pairs + pattern = r"(\d+)(D|H|MIN|S)" + matches = re.findall(pattern, sample_interval_acronym) + + # Conversion factors for each unit + unit_to_seconds = { + "D": 86400, # Seconds in a day + "H": 3600, # Seconds in an hour + "MIN": 60, # Seconds in a minute + "S": 1, # Seconds in a second + } + + # Parse matches and calculate total seconds + sample_interval = 0 + for value, unit in matches: + value = int(value) + if unit in unit_to_seconds: + sample_interval += value * unit_to_seconds[unit] + return sample_interval, rolling + + +def acronym_to_seconds(acronym): + """ + Extract the interval in seconds from the duration acronym. + + Parameters + ---------- + acronym: str + A string representing a duration: e.g., "1H30MIN", "ROLL1H30MIN". + + Returns + ------- + seconds + Duration in seconds. + """ + seconds, _ = get_resampling_information(acronym) + return seconds + + +####------------------------------------------------------------------------------------. +#### Xarray utilities + def get_dataset_start_end_time(ds: xr.Dataset, time_dim="time"): """Retrieves dataset starting and ending time. @@ -138,8 +243,58 @@ def ensure_sorted_by_time(ds): return ds -def infer_sample_interval(ds, verbose=False, robust=False): - """Infer the sample interval of a dataset.""" +####------------------------------------------ +#### Sampling interval utilities + + +def ensure_sample_interval_in_seconds(sample_interval): + """ + Ensure the sample interval is in seconds. + + Parameters + ---------- + sample_interval : int, numpy.ndarray, xarray.DataArray, or numpy.timedelta64 + The sample interval to be converted to seconds. + It can be: + - An integer representing the interval in seconds. + - A numpy array or xarray DataArray of integers representing intervals in seconds. + - A numpy.timedelta64 object representing the interval. + - A numpy array or xarray DataArray of numpy.timedelta64 objects representing intervals. + + Returns + ------- + int, numpy.ndarray, or xarray.DataArray + The sample interval converted to seconds. The return type matches the input type: + - If the input is an integer, the output is an integer. + - If the input is a numpy array, the output is a numpy array of integers. + - If the input is an xarray DataArray, the output is an xarray DataArray of integers. 
+ + """ + if ( + isinstance(sample_interval, int) + or isinstance(sample_interval, (np.ndarray, xr.DataArray)) + and np.issubdtype(sample_interval.dtype, int) + ): + return sample_interval + if isinstance(sample_interval, np.timedelta64): + return sample_interval / np.timedelta64(1, "s") + if isinstance(sample_interval, np.ndarray) and np.issubdtype(sample_interval.dtype, np.timedelta64): + return sample_interval.astype("timedelta64[s]").astype(int) + if isinstance(sample_interval, xr.DataArray) and np.issubdtype(sample_interval.dtype, np.timedelta64): + sample_interval = sample_interval.copy() + sample_interval_int = sample_interval.data.astype("timedelta64[s]").astype(int) + sample_interval.data = sample_interval_int + return sample_interval + raise TypeError( + "sample_interval must be an int, numpy.timedelta64, or numpy array of timedelta64.", + ) + + +def infer_sample_interval(ds, robust=False, verbose=False, logger=None): + """Infer the sample interval of a dataset. + + NOTE: This function is not used in the DISDRODB processing chain. + """ # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) @@ -204,21 +359,19 @@ def infer_sample_interval(ds, verbose=False, robust=False): if robust and np.any(deltadt == 0): raise ValueError("Likely presence of duplicated timesteps.") - # - Raise error if estimated sample interval has frequency less than 80 % - # TODO: this currently allow to catch up sensors that logs data only when rainy ! - # --> HPICONET often below 50 % - sample_interval_fraction_threshold = 50 + ####-------------------------------------------------------------------------. + #### Informative messages + # - Log a warning if estimated sample interval has frequency less than 60 % + sample_interval_fraction_threshold = 60 msg = ( f"The most frequent sampling interval ({sample_interval} s) " + f"has a frequency lower than {sample_interval_fraction_threshold}%: {sample_interval_fraction} %. " + f"Total number of timesteps: {n_timesteps}." ) if sample_interval_fraction < sample_interval_fraction_threshold: - raise ValueError(msg) - if verbose and sample_interval_fraction < sample_interval_fraction_threshold: - print(msg) + log_warning(logger=logger, msg=msg, verbose=verbose) - # - Raise error if an unexpected interval has frequency larger than 20 percent + # - Log a warning if an unexpected interval has frequency larger than 20 percent frequent_unexpected_intervals = unexpected_intervals[unexpected_intervals_fractions > 20] if len(frequent_unexpected_intervals) != 0: frequent_unexpected_intervals_str = ", ".join( @@ -229,9 +382,9 @@ def infer_sample_interval(ds, verbose=False, robust=False): + f"greater than 20%: {frequent_unexpected_intervals_str} %. " + f"Total number of timesteps: {n_timesteps}." ) - raise ValueError(msg) + log_warning(logger=logger, msg=msg, verbose=verbose) - # - Raise error if the most frequent interval is not the smallest + # - Raise error if the most frequent interval is not the expected one ! 
if sample_interval != min_delta: raise ValueError( f"The most frequent sampling interval ({sample_interval} seconds) " @@ -242,6 +395,10 @@ def infer_sample_interval(ds, verbose=False, robust=False): return int(sample_interval) +####--------------------------------------------------------------------------------- +#### Timesteps regularization + + def get_problematic_timestep_indices(timesteps, sample_interval): """Identify timesteps with missing previous or following timesteps.""" previous_time = timesteps - pd.Timedelta(seconds=sample_interval) @@ -254,7 +411,7 @@ def get_problematic_timestep_indices(timesteps, sample_interval): return idx_previous_missing, idx_next_missing, idx_isolated_missing -def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=True): +def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=True, logger=None, verbose=True): """Ensure timesteps match with the sample_interval.""" # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) @@ -320,7 +477,7 @@ def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=Tru if n_duplicates == 2: if prev_time not in adjusted_times: adjusted_times[dup_indices[0]] = prev_time - qc_flag[dup_indices[0]] = 1 + qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep count_solved += 1 elif next_time not in adjusted_times: adjusted_times[dup_indices[-1]] = next_time @@ -335,15 +492,18 @@ def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=Tru count_solved += 1 if next_time not in adjusted_times: adjusted_times[dup_indices[-1]] = next_time - qc_flag[dup_indices[-1]] = 1 + qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep count_solved += 1 if count_solved != n_duplicates - 1: idx_to_drop = np.append(idx_to_drop, dup_indices[0:-1]) - qc_flag[dup_indices[-1]] = 2 - msg = f"Cannot resolve {n_duplicates} duplicated timesteps around {dup_time}." + qc_flag[dup_indices[-1]] = flag_dropped_duplicated_timestep + msg = ( + f"Cannot resolve {n_duplicates} duplicated timesteps" + f"(after trailing seconds correction) around {dup_time}." 
+ ) + log_warning(logger=logger, msg=msg, verbose=verbose) if robust: raise ValueError(msg) - print(msg) # Update the time coordinate (Convert to ns for xarray compatibility) ds = ds.assign_coords({"time": adjusted_times.astype("datetime64[ns]")}) @@ -357,8 +517,23 @@ def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=Tru qc_flag[idx_previous_missing] = np.maximum(qc_flag[idx_previous_missing], flag_previous_missing) qc_flag[idx_next_missing] = np.maximum(qc_flag[idx_next_missing], flag_next_missing) qc_flag[idx_isolated_missing] = np.maximum(qc_flag[idx_isolated_missing], flag_isolated_timestep) - # Assign time quality flag - ds["qc_time"] = xr.DataArray(qc_flag, dims="time") + + # If the first timestep is at 00:00 and currently flagged as previous missing (1), reset to 0 + # first_time = pd.to_datetime(adjusted_times[0]).time() + # first_expected_time = pd.Timestamp("00:00:00").time() + # if first_time == first_expected_time and qc_flag[0] == flag_previous_missing: + # qc_flag[0] = 0 + + # # If the last timestep is flagged and currently flagged as next missing (2), reset it to 0 + # last_time = pd.to_datetime(adjusted_times[-1]).time() + # last_time_expected = (pd.Timestamp("00:00:00") - pd.Timedelta(30, unit="seconds")).time() + # # Check if adding one interval would go beyond the end_time + # if last_time == last_time_expected and qc_flag[-1] == flag_next_missing: + # qc_flag[-1] = 0 + + # Assign time quality flag coordinate + ds["time_qc"] = xr.DataArray(qc_flag, dims="time") + ds = ds.set_coords("time_qc") # Drop duplicated timesteps if len(idx_to_drop) > 0: diff --git a/docs/source/metadata_csv/Sensor_Info.csv b/docs/source/metadata_csv/Sensor_Info.csv index 5a08ba5f..bf59ae17 100644 --- a/docs/source/metadata_csv/Sensor_Info.csv +++ b/docs/source/metadata_csv/Sensor_Info.csv @@ -9,7 +9,7 @@ firmware_version,Firmware version sensor_beam_length,Length of the laser beam's measurement area in mm sensor_beam_width,Width of the laser beam's measurement area in mm sensor_nominal_width,Expected width of the sensor beam under typical operating conditions -measurement_interval,Number of seconds over which measurements are taken +measurement_interval,Number of seconds over which measurements are taken. calibration_sensitivity,Sensor sensitivity calibration_certification_date,Sensor calibration date(s) calibration_certification_url,Sensor calibration certification url
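Usage notes (not part of the patch). The method-of-moments estimators renamed and added in disdrodb/psd/fitting.py pair with the analytical moment formulas moved into disdrodb/psd/models.py. The snippet below is a minimal round-trip sketch, assuming both functions are importable from the module paths touched above; the gamma DSD parameters are hypothetical. Analytical moments M3, M4 and M6 are generated first, and get_gamma_parameters_M346 should recover the input parameters.

import numpy as np

from disdrodb.psd.fitting import get_gamma_parameters_M346
from disdrodb.psd.models import get_gamma_moment_v1

# Hypothetical gamma DSD parameters: N(D) = N0 * D**mu * exp(-Lambda * D)
N0, mu, Lambda = 8000.0, 3.0, 2.5

# Analytical moments M3, M4, M6 of the gamma distribution
M3, M4, M6 = (get_gamma_moment_v1(N0, mu, Lambda, n) for n in (3, 4, 6))

# The M346 method of moments should recover the input parameters
mu_est, lambda_est, n0_est = get_gamma_parameters_M346(M3, M4, M6)
print(np.allclose([mu_est, lambda_est, n0_est], [mu, Lambda, N0]))  # True (up to float precision)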
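The new acronym helpers in disdrodb/utils/time.py convert between durations in seconds and the compact strings embedded in product filenames (e.g. the "10S" expected by the updated filename test). A short usage sketch with arbitrary values, assuming the import path shown in the diff:

from disdrodb.utils.time import acronym_to_seconds, get_resampling_information, seconds_to_acronym

print(seconds_to_acronym(5400))                   # "1H30MIN"
print(get_resampling_information("ROLL1H30MIN"))  # (5400, True): 1 h 30 min, rolling window enabled
print(acronym_to_seconds("30S"))                  # 30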
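ensure_sample_interval_in_seconds normalizes plain integers, numpy timedelta64 scalars, and timedelta64 arrays or DataArrays to seconds. A small sketch of the expected behaviour, with return types following the branches of the function body above:

import numpy as np

from disdrodb.utils.time import ensure_sample_interval_in_seconds

print(ensure_sample_interval_in_seconds(30))                       # 30 (integers pass through unchanged)
print(ensure_sample_interval_in_seconds(np.timedelta64(30, "s")))  # 30.0 (scalar timedelta64 converted to seconds)
print(ensure_sample_interval_in_seconds(np.array([30, 60], dtype="timedelta64[s]")))  # [30 60]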
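regularize_timesteps snaps a jittered time axis onto the nominal sampling grid and attaches the per-timestep "time_qc" coordinate introduced by this patch. The sketch below uses a synthetic 30 s record (variable name and timestamps invented for illustration); exact flag values depend on the rounding and duplicate-handling rules defined earlier in the function, so the printed output is indicative only.

import numpy as np
import pandas as pd
import xarray as xr

from disdrodb.utils.time import regularize_timesteps

# Synthetic 30 s record with small clock jitter and one missing timestep (00:02:00)
times = pd.to_datetime(
    [
        "2024-01-01 00:00:00",
        "2024-01-01 00:00:29",
        "2024-01-01 00:01:00",
        "2024-01-01 00:01:31",
        "2024-01-01 00:02:30",
    ],
)
ds = xr.Dataset({"n_drops": ("time", np.arange(len(times)))}, coords={"time": times})

# Snap timesteps to the 30 s grid and add the time quality flag coordinate
ds_reg = regularize_timesteps(ds, sample_interval=30, verbose=True)
print(ds_reg["time"].values)     # timestamps aligned to :00 / :30
print(ds_reg["time_qc"].values)  # per-timestep quality flags (0 expected for unaffected timesteps)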