Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing open-ended adjustments error #216

Merged
merged 7 commits into from
Dec 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 55 additions & 53 deletions src/pypromice/qc/github_data_issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def flagNAN(ds_in,
ds : xr.Dataset
Level 0 data with flagged data
'''
ds = ds_in.copy()
ds = ds_in.copy(deep=True)
df = None

df = _getDF(flag_url + ds.attrs["station_id"] + ".csv",
Expand Down Expand Up @@ -71,7 +71,7 @@ def flagNAN(ds_in,

for v in varlist:
if v in list(ds.keys()):
logger.info(f'---> flagging {t0} {t1} {v}')
logger.info(f'---> flagging {v} between {t0} and {t1}')
ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1))
else:
logger.info(f'---> could not flag {v} not in dataset')
Expand Down Expand Up @@ -99,7 +99,7 @@ def adjustTime(ds,
ds : xr.Dataset
Level 0 data with flagged data
'''
ds_out = ds.copy()
ds_out = ds.copy(deep=True)
adj_info=None

adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv",
Expand Down Expand Up @@ -165,7 +165,7 @@ def adjustData(ds,
ds : xr.Dataset
Level 0 data with flagged data
'''
ds_out = ds.copy()
ds_out = ds.copy(deep=True)
adj_info=None
adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv",
os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"),
Expand All @@ -176,13 +176,11 @@ def adjustData(ds,
# removing potential time shifts from the adjustment list
adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :]

# if t1 is left empty, then adjustment is applied until the end of the file
adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0]
adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1]
# making all timestamps timezone naive (compatibility with xarray)
adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None)
adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None)

# making sure that t0 and t1 columns are object dtype then replaceing nan with None
adj_info[['t0','t1']] = adj_info[['t0','t1']].astype(object)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why you cast the type to object. I suppose the dynamically inferred types are either

  • str if all the values in the columns are strings
  • float if all the values in the columns are inferred as nan
  • object if the values in the columns have different types

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that, along with your next comment, it is a matter of personal preference:

I prefer replacing all the missing dates by None in two single-line vectorized calls (l.181-182) rather than having ifs in a for loop. But before I do that, I need to make sure that the t0 and t1 columns can accommodate None. If a t0 or t1 column has a float type and I try to replace the values by None, pandas actually coerces those None into their float equivalent: np.nan. This causes problems later, as slice(np.nan, np.nan) fails. When t0 and t1 are of object type, then replacing some of their values by None will keep the None as NoneType.

adj_info.loc[adj_info.t1.isnull()|(adj_info.t1==''), "t1"] = None
adj_info.loc[adj_info.t0.isnull()|(adj_info.t0==''), "t0"] = None
Comment on lines +180 to +182
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t0 and t1 are already checked in lines 219-222, except for the empty string and np.nan.
Consider instead

                if isinstance(t0, str) and t0 != '':
                    t0 = pd.to_datetime(t0, utc=True).tz_localize(None)
                else:
                    t0 = None
                if isinstance(t1, str) and t1 != '':
                    t1 = pd.to_datetime(t1, utc=True).tz_localize(None)
                else:
                    t1 = None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big fan of if/else in a for loop. I'd rather prepare the adj_info table before looping through it.
Now I see that I have used part of your suggestion in l.219-222, so I admit it is currently a bit hybrid.


# if "*" is in the variable name then we interpret it as regex
selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*")
for ind in adj_info.loc[selec, :].index:
Expand Down Expand Up @@ -217,88 +215,92 @@ def adjustData(ds,
adj_info.loc[var].adjust_function,
adj_info.loc[var].adjust_value,
):
if (t0 > pd.to_datetime(ds_out.time.values[-1])) | (t1 < pd.to_datetime(ds_out.time.values[0])):
# making all timestamps timezone naive (compatibility with xarray)
if isinstance(t0, str):
t0 = pd.to_datetime(t0, utc=True).tz_localize(None)
if isinstance(t1, str):
t1 = pd.to_datetime(t1, utc=True).tz_localize(None)

index_slice = dict(time=slice(t0, t1))

if len(ds_out[var].loc[index_slice].time.time) == 0:
logger.info("Time range does not intersect with dataset")
continue
logger.info(f'---> {t0} {t1} {var} {func} {val}')

logger.info(f'---> adjusting {var} between {t0} and {t1} ({func} {val})')

if func == "add":
ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values + val
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values + val
# flagging adjusted values
# if var + "_adj_flag" not in ds_out.columns:
# ds_out[var + "_adj_flag"] = 0
# msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull()
# ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time
# msk = ds_out[var].loc[index_slice])].notnull()
# ind = ds_out[var].loc[index_slice])].loc[msk].time
# ds_out.loc[ind, var + "_adj_flag"] = 1

if func == "multiply":
ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values * val
if "DW" in var:
ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))] % 360
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice] % 360
# flagging adjusted values
# if var + "_adj_flag" not in ds_out.columns:
# ds_out[var + "_adj_flag"] = 0
# msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull()
# ind = ds_out[var].loc[dict(time=slice(t0, t1))].loc[msk].time
# msk = ds_out[var].loc[index_slice].notnull()
# ind = ds_out[var].loc[index_slice].loc[msk].time
# ds_out.loc[ind, var + "_adj_flag"] = 1

if func == "min_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values
tmp = ds_out[var].loc[index_slice].values
tmp[tmp < val] = np.nan

if func == "max_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values
tmp = ds_out[var].loc[index_slice].values
tmp[tmp > val] = np.nan
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp
ds_out[var].loc[index_slice] = tmp

if func == "upper_perc_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy()
df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100)
df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").var()
tmp = ds_out[var].loc[index_slice].copy()
df_w = ds_out[var].loc[index_slice].resample(time="14D").quantile(1 - val / 100)
df_w = ds_out[var].loc[index_slice].resample(time="14D").var()
for m_start, m_end in zip(df_w.time[:-2], df_w.time[1:]):
msk = (tmp.time >= m_start) & (tmp.time < m_end)
values_month = tmp.loc[msk].values
values_month[values_month < df_w.loc[m_start]] = np.nan
tmp.loc[msk] = values_month

ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values
ds_out[var].loc[index_slice] = tmp.values

if func == "biweekly_upper_range_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy()
df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max()
for m_start, m_end in zip(df_max.time[:-2], df_max.time[1:]):
msk = (tmp.time >= m_start) & (tmp.time < m_end)
lim = df_max.loc[m_start] - val
values_month = tmp.loc[msk].values
values_month[values_month < lim] = np.nan
tmp.loc[msk] = values_month
# remaining samples following outside of the last 2 weeks window
msk = tmp.time >= m_end
lim = df_max.loc[m_start] - val
values_month = tmp.loc[msk].values
values_month[values_month < lim] = np.nan
tmp.loc[msk] = values_month
# updating original pandas
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values
df_max = (
ds_out[var].loc[index_slice].copy(deep=True)
.resample(time="14D",offset='7D').max()
.sel(time=ds_out[var].loc[index_slice].time.values, method='ffill')
)
df_max['time'] = ds_out[var].loc[index_slice].time
# updating original pandas
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].where(ds_out[var].loc[index_slice] > df_max-val)


if func == "hampel_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))]
tmp = ds_out[var].loc[index_slice]
tmp = _hampel(tmp, k=7 * 24, t0=val)
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values
ds_out[var].loc[index_slice] = tmp.values

if func == "grad_filter":
tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy()
msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff()
tmp = ds_out[var].loc[index_slice].copy()
msk = ds_out[var].loc[index_slice].copy().diff()
tmp[np.roll(msk.abs() > val, -1)] = np.nan
ds_out[var].loc[dict(time=slice(t0, t1))] = tmp
ds_out[var].loc[index_slice] = tmp

if "swap_with_" in func:
var2 = func[10:]
val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy()
val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy()
ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var
ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2
val_var = ds_out[var].loc[index_slice].values.copy()
val_var2 = ds_out[var2].loc[index_slice].values.copy()
ds_out[var2].loc[index_slice] = val_var
ds_out[var].loc[index_slice] = val_var2

if func == "rotate":
ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360
ds_out[var].loc[index_slice] = (ds_out[var].loc[index_slice].values + val) % 360

return ds_out

Expand Down