Fix issue in L0C processing
ghiggi committed Dec 16, 2024
1 parent 7874379 commit 68ca587
Showing 6 changed files with 154 additions and 52 deletions.
5 changes: 4 additions & 1 deletion disdrodb/api/checks.py
@@ -309,7 +309,10 @@ def check_issue_file(data_source, campaign_name, station_name, base_dir=None):
# Check existence
if not os.path.exists(issue_filepath):
create_station_issue(
base_dir=base_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name
base_dir=base_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
)

# Check validity
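Note: the hunk above follows a check-then-create pattern: if the station's issue file is missing, a default one is written before validation runs. A minimal, self-contained sketch of that pattern (the function name and the default YAML content are illustrative assumptions, not the disdrodb API):

import os

def ensure_issue_file(issue_filepath):
    """Write a default issue YAML if the file is missing (illustrative sketch)."""
    if not os.path.exists(issue_filepath):
        os.makedirs(os.path.dirname(issue_filepath) or ".", exist_ok=True)
        # Assumed default content: empty 'timesteps' and 'time_periods' sections
        with open(issue_filepath, "w") as f:
            f.write("# Default issue file\ntimesteps:\ntime_periods:\n")
    return issue_filepath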
41 changes: 21 additions & 20 deletions disdrodb/l0/l0_processing.py
@@ -63,7 +63,6 @@
write_l0b,
)
from disdrodb.l0.l0c_processing import (
copy_l0b_to_l0c_directory,
create_daily_file,
get_files_per_days,
)
@@ -356,31 +355,33 @@ def _generate_l0c(
### Core computation
try:
# If already single file per day, copy L0B to L0C
if len(filepaths) == 1:
copy_l0b_to_l0c_directory(filepaths[0])
# Otherwise combine products !
else:
# Produce L0C dataset
ds = create_daily_file(day=day, filepaths=filepaths, verbose=verbose)
# --> File start_time and end_time should also be within the day !
# if len(filepaths) == 1:
# files_start_time, files_end_time = get_start_end_time_from_filepaths(filepaths)
# files_start_time.astype("M8[D]"), files_end_time.astype("M8[D]")
# copy_l0b_to_l0c_directory(filepaths[0])

# Write L0C netCDF4 dataset
if ds["time"].size > 1:
# Produce L0C dataset
ds = create_daily_file(day=day, filepaths=filepaths, verbose=verbose)

# Get sensor name from dataset
sensor_name = ds.attrs.get("sensor_name")
# Write L0C netCDF4 dataset
if ds["time"].size > 1:

# Set encodings
ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name)
# Get sensor name from dataset
sensor_name = ds.attrs.get("sensor_name")

# Define filepath
filename = define_l0c_filename(ds, campaign_name=campaign_name, station_name=station_name)
filepath = os.path.join(data_dir, filename)
# Set encodings
ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name)

# Write to disk
write_product(ds, product=product, filepath=filepath, force=force)
# Define filepath
filename = define_l0c_filename(ds, campaign_name=campaign_name, station_name=station_name)
filepath = os.path.join(data_dir, filename)

# Clean environment
del ds
# Write to disk
write_product(ds, product=product, filepath=filepath, force=force)

# Clean environment
del ds

# Log end processing
msg = f"{product} processing for {day} has ended."
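Note: the rework above drops the old copy-single-file shortcut. The daily L0C dataset is now always built with create_daily_file, and writing is skipped when the dataset holds at most one timestep. A sketch of that guard, assuming an xarray.Dataset with a "time" dimension (to_netcdf stands in for disdrodb's write_product):

import xarray as xr

def write_if_multiple_timesteps(ds: xr.Dataset, filepath: str) -> bool:
    """Write the daily dataset only when it holds more than one timestep."""
    if ds["time"].size > 1:
        ds.to_netcdf(filepath)  # stand-in for write_product(ds, ...)
        return True
    return False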
5 changes: 5 additions & 0 deletions disdrodb/l0/readers/EPFL/UNIL_2022.py
@@ -92,10 +92,15 @@ def df_sanitizer_fun(df):
df["time"] = pd.to_datetime(df["time"], format="%d-%m-%Y %H:%M:%S", errors="coerce")

# - Split the 'TO_BE_SPLITTED' column

df_splitted = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=1)
df_splitted.columns = ["datalogger_voltage", "rainfall_rate_32bit"]
df["rainfall_rate_32bit"] = df_splitted["rainfall_rate_32bit"]

# Remove rows with datalogger read errors
# - On error, 'rainfall_rate_32bit' contains 'Error in data reading! 0'
df = df[df["rainfall_rate_32bit"] != "Error in data reading! 0"]

# - Drop columns not agreeing with DISDRODB L0 standards
columns_to_drop = [
"id",
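Note: the hunk above splits a packed column once on the first comma and then drops rows where the datalogger reported a read error. A runnable sketch of the same steps, with invented sample values:

import pandas as pd

df = pd.DataFrame({"TO_BE_SPLITTED": ["12.1,0.25", "12.0,Error in data reading! 0", "11.9,0.00"]})

# Split only on the first comma (n=1); commas in the remainder stay intact
df_splitted = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=1)
df_splitted.columns = ["datalogger_voltage", "rainfall_rate_32bit"]
df["rainfall_rate_32bit"] = df_splitted["rainfall_rate_32bit"]

# Drop rows where the datalogger reported a read error
df = df[df["rainfall_rate_32bit"] != "Error in data reading! 0"]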
118 changes: 91 additions & 27 deletions disdrodb/l0/readers/GPM/MC3E.py
@@ -65,40 +65,104 @@ def reader(
#### - Define dataframe sanitizer function for L0 processing
def df_sanitizer_fun(df):
# - Import pandas
import numpy as np
import pandas as pd

# - Define 'time' datetime
df_time = pd.to_datetime(df["time"], format="%Y%m%d%H%M%S", errors="coerce")
# - Convert 'time' column to datetime
df["time"] = pd.to_datetime(df["time"], format="%Y%m%d%H%M%S", errors="coerce")

# - Split the 'TO_BE_SPLITTED' column
df = df["TO_BE_SPLITTED"].str.split(",", n=9, expand=True)
# Count number of delimiters in the column to be parsed
# --> The first rows of some files are corrupted, so use the most frequent count
possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
n_delimiters = possible_delimiters[np.argmax(counts)]

# - Assign column names
column_names = [
"station_name",
"sensor_status",
"sensor_temperature",
"number_particles",
"rainfall_rate_32bit",
"reflectivity_16bit",
"mor_visibility",
"weather_code_synop_4680",
"weather_code_synop_4677",
"raw_drop_number",
]
df.columns = column_names

# - Add the time column
df["time"] = df_time
if n_delimiters == 1031: # first files
# - Select valid rows
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1031]
# - Get time column
df_time = df["time"]
# - Split the 'TO_BE_SPLITTED' column
df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=7)
# - Assign column names
column_names = [
"station_name",
"sensor_status",
"sensor_temperature",
"reflectivity_32bit",
"mor_visibility",
"weather_code_synop_4680",
"weather_code_synop_4677",
"raw_drop_number",
]
df.columns = column_names
# - Add time column
df["time"] = df_time
# - Remove columns not in other files
df = df.drop(columns="reflectivity_32bit")
# - Add missing columns and set NaN value
missing_columns = [
"number_particles",
"rainfall_rate_32bit",
"reflectivity_16bit",
]
for column in missing_columns:
df[column] = "NaN"
elif n_delimiters == 1033: # (most of the files)
# - Select valid rows
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1033]
# - Get time column
df_time = df["time"]
# - Split the column to be parsed
df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=9)
# - Assign column names
column_names = [
"station_name",
"sensor_status",
"sensor_temperature",
"number_particles",
"rainfall_rate_32bit",
"reflectivity_16bit",
"mor_visibility",
"weather_code_synop_4680",
"weather_code_synop_4677",
"raw_drop_number",
]
df.columns = column_names
# - Add time column
df["time"] = df_time
elif n_delimiters == 1035: # APU 17 first files
# - Select valid rows
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1035]
# - Get time column
df_time = df["time"]
# - Split the column to be parsed
df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=11)
# - Assign column names
column_names = [
"station_name",
"sensor_date",
"sensor_time",
"sensor_status",
"sensor_temperature",
"number_particles",
"rainfall_rate_32bit",
"reflectivity_16bit",
"mor_visibility",
"weather_code_synop_4680",
"weather_code_synop_4677",
"raw_drop_number",
]
df.columns = column_names
# - Add time column
df["time"] = df_time
# - Drop columns not needed
df = df.drop(columns=["sensor_time", "sensor_date"])
else:
# Wrong number of delimiters ... likely a corrupted file
raise ValueError("Unexpected number of comma delimiters !")

# - Drop columns not agreeing with DISDRODB L0 standards
df = df.drop(columns=["station_name"])

# - Drop rows with invalid values
# --> Ensure that weather_code_synop_4677 has length 2
# --> If a previous column is missing it will have 000
df = df[df["weather_code_synop_4677"].str.len() == 2]

return df

##------------------------------------------------------------------------.
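Note: the reader above detects the record layout by counting commas per row and taking the most frequent count, so a few corrupted leading rows cannot skew the choice. A compact, runnable sketch of that detection step, with invented rows:

import numpy as np
import pandas as pd

df = pd.DataFrame({"TO_BE_SPLITTED": ["a,b,c", "a,b,c", "a,b,c,d", "x"]})

# The most frequent comma count identifies the dominant layout
possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
n_delimiters = possible_delimiters[np.argmax(counts)]

# Keep only rows matching the dominant layout, then split them
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == n_delimiters]
parsed = df["TO_BE_SPLITTED"].str.split(",", expand=True)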
33 changes: 32 additions & 1 deletion disdrodb/l0/readers/GPM/NSSTC.py
@@ -82,7 +82,7 @@ def df_sanitizer_fun(df):
possible_delimiters, counts = np.unique(df["TO_BE_SPLITTED"].str.count(","), return_counts=True)
n_delimiters = possible_delimiters[np.argmax(counts)]

if n_delimiters == 1027:
if n_delimiters == 1027: # APU 2010
# - Select valid rows
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1027]
# - Get time column
@@ -110,6 +110,37 @@ def df_sanitizer_fun(df):
]
for column in missing_columns:
df[column] = "NaN"
elif n_delimiters == 1031: # APU08 (2011)
# - Select valid rows
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1031]
# - Get time column
df_time = df["time"]
# - Split the 'TO_BE_SPLITTED' column
df = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=7)
# - Assign column names
column_names = [
"station_name",
"sensor_status",
"sensor_temperature",
"reflectivity_32bit",
"mor_visibility",
"weather_code_synop_4680",
"weather_code_synop_4677",
"raw_drop_number",
]
df.columns = column_names
# - Add time column
df["time"] = df_time
# - Remove columns not in other files
df = df.drop(columns="reflectivity_32bit")
# - Add missing columns and set NaN value
missing_columns = [
"number_particles",
"rainfall_rate_32bit",
"reflectivity_16bit",
]
for column in missing_columns:
df[column] = "NaN"
elif n_delimiters == 1033:
# - Select valid rows
df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == 1033]
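Note: the MC3E and NSSTC readers share this branch-per-layout structure. One possible table-driven alternative (a sketch, not part of this commit) maps each delimiter count to its column schema; the two schemas below copy the 1031- and 1033-delimiter layouts from the diff:

import pandas as pd

SCHEMAS = {
    1031: [
        "station_name", "sensor_status", "sensor_temperature",
        "reflectivity_32bit", "mor_visibility", "weather_code_synop_4680",
        "weather_code_synop_4677", "raw_drop_number",
    ],
    1033: [
        "station_name", "sensor_status", "sensor_temperature",
        "number_particles", "rainfall_rate_32bit", "reflectivity_16bit",
        "mor_visibility", "weather_code_synop_4680",
        "weather_code_synop_4677", "raw_drop_number",
    ],
}

def split_with_schema(df: pd.DataFrame, n_delimiters: int) -> pd.DataFrame:
    """Select rows matching the dominant layout and split them into named columns."""
    if n_delimiters not in SCHEMAS:
        raise ValueError("Unexpected number of comma delimiters!")
    columns = SCHEMAS[n_delimiters]
    df = df.loc[df["TO_BE_SPLITTED"].str.count(",") == n_delimiters]
    parsed = df["TO_BE_SPLITTED"].str.split(",", expand=True, n=len(columns) - 1)
    parsed.columns = columns
    return parsed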
4 changes: 1 addition & 3 deletions disdrodb/utils/dask.py
@@ -17,8 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities for Dask Distributed computations."""
import os
import logging
import os


def initialize_dask_cluster():
@@ -60,5 +60,3 @@ def close_dask_cluster(cluster, client):
finally:
# Restore the original log level
logger.setLevel(original_level)
return None

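Note: a usage sketch for the pair of helpers touched above, under the assumption that initialize_dask_cluster() returns the (cluster, client) pair that close_dask_cluster(cluster, client) expects:

from disdrodb.utils.dask import close_dask_cluster, initialize_dask_cluster

cluster, client = initialize_dask_cluster()  # assumed to return (cluster, client)
try:
    pass  # run the parallel station processing here
finally:
    close_dask_cluster(cluster, client)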