Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing open-ended adjustements error #216

Merged
merged 7 commits into from
Dec 20, 2023
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 42 additions & 38 deletions src/pypromice/qc/github_data_issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def flagNAN(ds_in,
ds : xr.Dataset
Level 0 data with flagged data
'''
ds = ds_in.copy()
ds = ds_in.copy(deep=True)
df = None

df = _getDF(flag_url + ds.attrs["station_id"] + ".csv",
Expand Down Expand Up @@ -99,7 +99,7 @@ def adjustTime(ds,
ds : xr.Dataset
Level 0 data with flagged data
'''
ds_out = ds.copy()
ds_out = ds.copy(deep=True)
adj_info=None

adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv",
Expand Down Expand Up @@ -165,7 +165,7 @@ def adjustData(ds,
ds : xr.Dataset
Level 0 data with flagged data
'''
ds_out = ds.copy()
ds_out = ds.copy(deep=True)
adj_info=None
adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv",
os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"),
Expand All @@ -176,13 +176,6 @@ def adjustData(ds,
# removing potential time shifts from the adjustment list
adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :]

# if t1 is left empty, then adjustment is applied until the end of the file
adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0]
adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1]
# making all timestamps timezone naive (compatibility with xarray)
adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None)
adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None)

# if "*" is in the variable name then we interpret it as regex
selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*")
for ind in adj_info.loc[selec, :].index:
Expand Down Expand Up @@ -217,53 +210,64 @@ def adjustData(ds,
adj_info.loc[var].adjust_function,
adj_info.loc[var].adjust_value,
):
if (t0 > pd.to_datetime(ds_out.time.values[-1])) | (t1 < pd.to_datetime(ds_out.time.values[0])):
# making all timestamps timezone naive (compatibility with xarray)
if isinstance(t0, str):
t0 = pd.to_datetime(t0, utc=True).tz_localize(None)
if isinstance(t1, str):
t1 = pd.to_datetime(t1, utc=True).tz_localize(None)

index_slice = dict(time=slice(t0, t1))

if len(ds_out[var].loc[index_slice].time.time) == 0:
logger.info("Time range does not intersect with dataset")
continue

logger.info(f'---> {t0} {t1} {var} {func} {val}')

if func == "add":
ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values + val
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values + val
# flagging adjusted values
# if var + "_adj_flag" not in ds_out.columns:
# ds_out[var + "_adj_flag"] = 0
# msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull()
# ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time
# msk = ds_out[var].loc[index_slice])].notnull()
# ind = ds_out[var].loc[index_slice])].loc[msk].time
# ds_out.loc[ind, var + "_adj_flag"] = 1

if func == "multiply":
ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values * val
if "DW" in var:
ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))] % 360
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice] % 360
# flagging adjusted values
# if var + "_adj_flag" not in ds_out.columns:
# ds_out[var + "_adj_flag"] = 0
# msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull()
# ind = ds_out[var].loc[dict(time=slice(t0, t1))].loc[msk].time
# msk = ds_out[var].loc[index_slice].notnull()
# ind = ds_out[var].loc[index_slice].loc[msk].time
# ds_out.loc[ind, var + "_adj_flag"] = 1

if func == "min_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values
tmp = ds_out[var].loc[index_slice].values
tmp[tmp < val] = np.nan

if func == "max_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values
tmp = ds_out[var].loc[index_slice].values
tmp[tmp > val] = np.nan
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp
ds_out[var].loc[index_slice] = tmp

if func == "upper_perc_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy()
df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100)
df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").var()
tmp = ds_out[var].loc[index_slice].copy()
df_w = ds_out[var].loc[index_slice].resample("14D").quantile(1 - val / 100)
df_w = ds_out[var].loc[index_slice].resample("14D").var()
for m_start, m_end in zip(df_w.time[:-2], df_w.time[1:]):
msk = (tmp.time >= m_start) & (tmp.time < m_end)
values_month = tmp.loc[msk].values
values_month[values_month < df_w.loc[m_start]] = np.nan
tmp.loc[msk] = values_month

ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values
ds_out[var].loc[index_slice] = tmp.values

if func == "biweekly_upper_range_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy()
df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max()
tmp = ds_out[var].loc[index_slice].copy()
df_max = ds_out[var].loc[index_slice].resample("14D").max()
for m_start, m_end in zip(df_max.time[:-2], df_max.time[1:]):
msk = (tmp.time >= m_start) & (tmp.time < m_end)
lim = df_max.loc[m_start] - val
Expand All @@ -277,28 +281,28 @@ def adjustData(ds,
values_month[values_month < lim] = np.nan
tmp.loc[msk] = values_month
# updating original pandas
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values
ds_out[var].loc[index_slice] = tmp.values

if func == "hampel_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))]
tmp = ds_out[var].loc[index_slice]
tmp = _hampel(tmp, k=7 * 24, t0=val)
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values
ds_out[var].loc[index_slice] = tmp.values

if func == "grad_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy()
msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff()
tmp = ds_out[var].loc[index_slice].copy()
msk = ds_out[var].loc[index_slice].copy().diff()
tmp[np.roll(msk.abs() > val, -1)] = np.nan
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp
ds_out[var].loc[index_slice] = tmp

if "swap_with_" in func:
var2 = func[10:]
val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy()
val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy()
ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var
ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2
val_var = ds_out[var].loc[index_slice].values.copy()
val_var2 = ds_out[var2].loc[index_slice].values.copy()
ds_out[var2].loc[index_slice] = val_var
ds_out[var].loc[index_slice] = val_var2

if func == "rotate":
ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360
ds_out[var].loc[index_slice] = (ds_out[var].loc[index_slice].values + val) % 360

return ds_out

Expand Down