diff --git a/disdrodb/api/checks.py b/disdrodb/api/checks.py
index 640d6ee9..60268b0a 100644
--- a/disdrodb/api/checks.py
+++ b/disdrodb/api/checks.py
@@ -309,7 +309,10 @@ def check_issue_file(data_source, campaign_name, station_name, base_dir=None):
     # Check existence
     if not os.path.exists(issue_filepath):
         create_station_issue(
-            base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name
+            base_dir=base_dir,
+            data_source=data_source,
+            campaign_name=campaign_name,
+            station_name=station_name,
         )

     # Check validity
diff --git a/disdrodb/l0/l0_processing.py b/disdrodb/l0/l0_processing.py
index 21b243cb..55f533a6 100644
--- a/disdrodb/l0/l0_processing.py
+++ b/disdrodb/l0/l0_processing.py
@@ -63,7 +63,6 @@
     write_l0b,
 )
 from disdrodb.l0.l0c_processing import (
-    copy_l0b_to_l0c_directory,
     create_daily_file,
     get_files_per_days,
 )
@@ -356,31 +355,33 @@ def _generate_l0c(
     ### Core computation
     try:
         # If already single file per day, copy L0B to L0C
-        if len(filepaths) == 1:
-            copy_l0b_to_l0c_directory(filepaths[0])
-        # Otherwise combine products !
-        else:
-            # Produce L0C dataset
-            ds = create_daily_file(day=day, filepaths=filepaths, verbose=verbose)
+        # --> File start_time and end_time should also be within the day !
+        # if len(filepaths) == 1:
+        #     files_start_time, files_end_time = get_start_end_time_from_filepaths(filepaths)
+        #     files_start_time.astype("M8[D]"), files_end_time.astype("M8[D]")
+        #     copy_l0b_to_l0c_directory(filepaths[0])

-            # Write L0C netCDF4 dataset
-            if ds["time"].size > 1:
+        # Produce L0C dataset
+        ds = create_daily_file(day=day, filepaths=filepaths, verbose=verbose)

-                # Get sensor name from dataset
-                sensor_name = ds.attrs.get("sensor_name")
+        # Write L0C netCDF4 dataset
+        if ds["time"].size > 1:

-                # Set encodings
-                ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name)
+            # Get sensor name from dataset
+            sensor_name = ds.attrs.get("sensor_name")

-                # Define filepath
-                filename = define_l0c_filename(ds, campaign_name=campaign_name, station_name=station_name)
-                filepath = os.path.join(data_dir, filename)
+            # Set encodings
+            ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name)

-                # Write to disk
-                write_product(ds, product=product, filepath=filepath, force=force)
+            # Define filepath
+            filename = define_l0c_filename(ds, campaign_name=campaign_name, station_name=station_name)
+            filepath = os.path.join(data_dir, filename)

-                # Clean environment
-                del ds
+            # Write to disk
+            write_product(ds, product=product, filepath=filepath, force=force)
+
+            # Clean environment
+            del ds

         # Log end processing
         msg = f"{product} processing for {day} has ended."
diff --git a/disdrodb/l0/readers/EPFL/UNIL_2022.py b/disdrodb/l0/readers/EPFL/UNIL_2022.py
index e380015a..cc2cec95 100644
--- a/disdrodb/l0/readers/EPFL/UNIL_2022.py
+++ b/disdrodb/l0/readers/EPFL/UNIL_2022.py
@@ -92,10 +92,15 @@ def df_sanitizer_fun(df):
         df["time"] = pd.to_datetime(df["time"], format="%d-%m-%Y %H:%M:%S", errors="coerce")

         # - Split TO_BE_SPLITTED columns
+
         df_splitted = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=1)
         df_splitted.columns = ["datalogger_voltage", "rainfall_rate_32bit"]
         df["rainfall_rate_32bit"] = df_splitted["rainfall_rate_32bit"]

+        # Remove rows with error in data reading
+        # - When datalogger error: rainfall_rate_32bit: Error in data reading!
+        df = df[df["rainfall_rate_32bit"] != "Error in data reading! \n0"]
+
         # - Drop columns not agreeing with DISDRODB L0 standards
         columns_to_drop = [
             "id",
diff --git a/disdrodb/l0/readers/GPM/MC3E.py b/disdrodb/l0/readers/GPM/MC3E.py
index 775d14f4..30156005 100644
--- a/disdrodb/l0/readers/GPM/MC3E.py
+++ b/disdrodb/l0/readers/GPM/MC3E.py
@@ -65,40 +65,104 @@ def reader(
     #### - Define dataframe sanitizer function for L0 processing
     def df_sanitizer_fun(df):
         # - Import pandas
+        import numpy as np
         import pandas as pd

-        # - Define 'time' datetime
-        df_time = pd.to_datetime(df["time"], format="%Y%m%d%H%M%S", errors="coerce")
+        # - Convert 'time' column to datetime
+        df["time"] = pd.to_datetime(df["time"], format="%Y%m%d%H%M%S", errors="coerce")

-        # - Split the 'TO_BE_SPLITTED' column
-        df = df["TO_BE_SPLITTED"].str.split(",", n=9, expand=True)
+        # Count number of delimiters in the column to be parsed
+        # --> Some first rows are corrupted, so count the most frequent occurrence
+        possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
+        n_delimiters = possible_delimiters[np.argmax(counts)]

-        # - Assign column names
-        column_names = [
-            "station_name",
-            "sensor_status",
-            "sensor_temperature",
-            "number_particles",
-            "rainfall_rate_32bit",
-            "reflectivity_16bit",
-            "mor_visibility",
-            "weather_code_synop_4680",
-            "weather_code_synop_4677",
-            "raw_drop_number",
-        ]
-        df.columns = column_names
-
-        # - Add the time column
-        df["time"] = df_time
+        if n_delimiters == 1031:  # first files
+            # - Select valid rows
+            df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1031]
+            # - Get time column
+            df_time = df["time"]
+            # - Split the 'TO_BE_SPLITTED' column
+            df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=7)
+            # - Assign column names
+            column_names = [
+                "station_name",
+                "sensor_status",
+                "sensor_temperature",
+                "reflectivity_32bit",
+                "mor_visibility",
+                "weather_code_synop_4680",
+                "weather_code_synop_4677",
+                "raw_drop_number",
+            ]
+            df.columns = column_names
+            # - Add time column
+            df["time"] = df_time
+            # - Remove columns not in other files
+            df = df.drop(columns="reflectivity_32bit")
+            # - Add missing columns and set NaN value
+            missing_columns = [
+                "number_particles",
+                "rainfall_rate_32bit",
+                "reflectivity_16bit",
+            ]
+            for column in missing_columns:
+                df[column] = "NaN"
+        elif n_delimiters == 1033:  # (most of the files)
+            # - Select valid rows
+            df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1033]
+            # - Get time column
+            df_time = df["time"]
+            # - Split the column be parsed
+            df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=9)
+            # - Assign column names
+            column_names = [
+                "station_name",
+                "sensor_status",
+                "sensor_temperature",
+                "number_particles",
+                "rainfall_rate_32bit",
+                "reflectivity_16bit",
+                "mor_visibility",
+                "weather_code_synop_4680",
+                "weather_code_synop_4677",
+                "raw_drop_number",
+            ]
+            df.columns = column_names
+            # - Add time column
+            df["time"] = df_time
+        elif n_delimiters == 1035:  # APU 17 first files
+            # - Select valid rows
+            df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1035]
+            # - Get time column
+            df_time = df["time"]
+            # - Split the column be parsed
+            df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=11)
+            # - Assign column names
+            column_names = [
+                "station_name",
+                "sensor_date",
+                "sensor_time",
+                "sensor_status",
+                "sensor_temperature",
+                "number_particles",
+                "rainfall_rate_32bit",
+                "reflectivity_16bit",
+                "mor_visibility",
+                "weather_code_synop_4680",
+                "weather_code_synop_4677",
+                "raw_drop_number",
+            ]
+            df.columns = column_names
+            # - Add time column
+            df["time"] = df_time
+            # - Drop columns not needed
+            df = df.drop(columns=["sensor_time", "sensor_date"])
+        else:
+            # Wrong number of delimiters ... likely a corrupted file
+            raise ValueError("Unexpected number of comma delimiters !")

         # - Drop columns not agreeing with DISDRODB L0 standards
         df = df.drop(columns=["station_name"])
-
-        # - Drop rows with invalid values
-        # --> Ensure that weather_code_synop_4677 has length 2
-        # --> If a previous column is missing it will have 000
-        df = df[df["weather_code_synop_4677"].str.len() == 2]
-
         return df

     ##------------------------------------------------------------------------.
diff --git a/disdrodb/l0/readers/GPM/NSSTC.py b/disdrodb/l0/readers/GPM/NSSTC.py
index 7595ada7..908b1349 100644
--- a/disdrodb/l0/readers/GPM/NSSTC.py
+++ b/disdrodb/l0/readers/GPM/NSSTC.py
@@ -82,7 +82,7 @@ def df_sanitizer_fun(df):
         possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
         n_delimiters = possible_delimiters[np.argmax(counts)]

-        if n_delimiters == 1027:
+        if n_delimiters == 1027:  # APU 2010
             # - Select valid rows
             df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1027]
             # - Get time column
@@ -110,6 +110,37 @@ def df_sanitizer_fun(df):
             ]
             for column in missing_columns:
                 df[column] = "NaN"
+        elif n_delimiters == 1031:  # APU08 (2011)
+            # - Select valid rows
+            df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1031]
+            # - Get time column
+            df_time = df["time"]
+            # - Split the 'TO_BE_SPLITTED' column
+            df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=7)
+            # - Assign column names
+            column_names = [
+                "station_name",
+                "sensor_status",
+                "sensor_temperature",
+                "reflectivity_32bit",
+                "mor_visibility",
+                "weather_code_synop_4680",
+                "weather_code_synop_4677",
+                "raw_drop_number",
+            ]
+            df.columns = column_names
+            # - Add time column
+            df["time"] = df_time
+            # - Remove columns not in other files
+            df = df.drop(columns="reflectivity_32bit")
+            # - Add missing columns and set NaN value
+            missing_columns = [
+                "number_particles",
+                "rainfall_rate_32bit",
+                "reflectivity_16bit",
+            ]
+            for column in missing_columns:
+                df[column] = "NaN"
         elif n_delimiters == 1033:
             # - Select valid rows
             df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1033]
diff --git a/disdrodb/utils/dask.py b/disdrodb/utils/dask.py
index b5d52bc5..ee3c5aae 100644
--- a/disdrodb/utils/dask.py
+++ b/disdrodb/utils/dask.py
@@ -17,8 +17,8 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 # -----------------------------------------------------------------------------.
 """Utilities for Dask Distributed computations."""
-import os
 import logging
+import os


 def initialize_dask_cluster():
@@ -60,5 +60,3 @@ def close_dask_cluster(cluster, client):
     finally:
         # Restore the original log level
         logger.setLevel(original_level)
-        return None
-