remove empty windows during predict #1036

Closed · wants to merge 2 commits into from
14 changes: 11 additions & 3 deletions nbs/common.base_windows.ipynb
@@ -557,10 +557,16 @@
"\n",
" # TODO: Hack to compute number of windows\n",
" windows = self._create_windows(batch, step='predict')\n",
" n_windows = len(windows['temporal'])\n",
" y_idx = batch['y_idx']\n",
"\n",
" # remove empty windows to not waste compute\n",
" mask_idx = batch['temporal_cols'].get_loc('available_mask')\n",
" insample_mask = windows['temporal'][:, :self.input_size, mask_idx]\n",
" empty_windows = insample_mask.max(axis=1).values == 0\n",
" windows['temporal'] = windows['temporal'][~empty_windows]\n",
"\n",
" # Number of windows in batch\n",
" n_windows = len(windows['temporal'])\n",
" y_idx = batch['y_idx']\n",
" windows_batch_size = self.inference_windows_batch_size\n",
" if windows_batch_size < 0:\n",
" windows_batch_size = n_windows\n",
@@ -571,12 +577,14 @@
" # Create and normalize windows [Ws, L+H, C]\n",
" w_idxs = np.arange(i*windows_batch_size, \n",
" min((i+1)*windows_batch_size, n_windows))\n",
" windows = self._create_windows(batch, step='predict', w_idxs=w_idxs)\n",
" windows[\"temporal\"] = windows[\"temporal\"][w_idxs]\n",
" windows = self._normalization(windows=windows, y_idx=y_idx)\n",
"\n",
" # Parse windows\n",
" insample_y, insample_mask, _, _, \\\n",
" hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n",
" if stat_exog is not None:\n",
" stat_exog = stat_exog[w_idxs]\n",
"\n",
" # Implicit Quantile Loss\n",
" # if isinstance(self.loss, losses.IQLoss):\n",
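
Note: the new block above drops prediction windows whose in-sample available_mask is all zeros before batching; such windows carry no observed history, so scoring them only wastes compute. A minimal, self-contained sketch of the filtering idea with made-up shapes (illustrative, not the library code):

import torch

# windows["temporal"]: [n_windows, L+H, C]; channel mask_idx carries the
# available_mask, where 1 marks an observed point and 0 a missing one.
input_size, horizon, mask_idx = 3, 2, 1
windows = torch.rand(4, input_size + horizon, 2)
windows[..., mask_idx] = 1.0
windows[0, :input_size, mask_idx] = 0.0  # window 0 has no observed in-sample points

insample_mask = windows[:, :input_size, mask_idx]      # [n_windows, L]
empty_windows = insample_mask.max(axis=1).values == 0  # True where the mask is all zeros
windows = windows[~empty_windows]                      # keep only non-empty windows
assert windows.shape[0] == 3
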
40 changes: 11 additions & 29 deletions nbs/core.ipynb
@@ -923,43 +923,25 @@
" # the cv_times is sorted by window and then id\n",
" fcsts_df = ufp.sort(fcsts_df, [id_col, 'cutoff', time_col])\n",
"\n",
" # allocate output array\n",
" max_windows_per_serie = (np.diff(self.dataset.indptr) - 1) // self.h\n",
" windows_per_serie = np.minimum(max_windows_per_serie, n_windows)\n",
" samples_per_serie = windows_per_serie * self.h\n",
" fcsts = np.empty((samples_per_serie.sum(), len(cols)), dtype=np.float32)\n",
"\n",
" col_idx = 0\n",
" fcsts = np.full((self.dataset.n_groups * self.h * n_windows, len(cols)),\n",
" np.nan, dtype=np.float32)\n",
" \n",
" for model in self.models:\n",
" model.fit(dataset=self.dataset,\n",
" val_size=val_size, \n",
" test_size=test_size)\n",
" model.fit(dataset=self.dataset, val_size=val_size, test_size=test_size)\n",
" model_fcsts = model.predict(self.dataset, step_size=step_size, **data_kwargs)\n",
"\n",
" # Append predictions in memory placeholder\n",
" output_length = len(model.loss.output_names)\n",
" fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts\n",
" col_idx += output_length\n",
" # we may have allocated more space than needed\n",
" # each serie can produce at most (serie.size - 1) // self.h CV windows\n",
" effective_sizes = ufp.counts_by_id(fcsts_df, id_col)['counts'].to_numpy()\n",
" needs_trim = effective_sizes.sum() != fcsts.shape[0]\n",
" if self.scalers_ or needs_trim:\n",
" indptr = np.arange(\n",
" 0,\n",
" n_windows * self.h * (self.dataset.n_groups + 1),\n",
" n_windows * self.h,\n",
" dtype=np.int32,\n",
" )\n",
" if self.scalers_:\n",
" fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n",
" if needs_trim:\n",
" # we keep only the effective samples of each serie from the cv results\n",
" trimmed = np.empty_like(\n",
" fcsts, shape=(effective_sizes.sum(), fcsts.shape[1])\n",
" )\n",
" cv_indptr = np.append(0, effective_sizes).cumsum(dtype=np.int32)\n",
" for i in range(fcsts.shape[1]):\n",
" ga = GroupedArray(fcsts[:, i], indptr)\n",
" trimmed[:, i] = ga._tails(cv_indptr)\n",
" fcsts = trimmed\n",
"\n",
" if self.scalers_:\n",
" indptr = np.append(0, samples_per_serie.cumsum()).astype(np.int32)\n",
" fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n",
"\n",
" self._fitted = True\n",
"\n",
14 changes: 11 additions & 3 deletions neuralforecast/common/_base_windows.py
@@ -575,10 +575,16 @@ def predict_step(self, batch, batch_idx):

         # TODO: Hack to compute number of windows
         windows = self._create_windows(batch, step="predict")
-        n_windows = len(windows["temporal"])
-        y_idx = batch["y_idx"]
 
+        # remove empty windows to not waste compute
+        mask_idx = batch["temporal_cols"].get_loc("available_mask")
+        insample_mask = windows["temporal"][:, : self.input_size, mask_idx]
+        empty_windows = insample_mask.max(axis=1).values == 0
+        windows["temporal"] = windows["temporal"][~empty_windows]
+
+        # Number of windows in batch
+        n_windows = len(windows["temporal"])
+        y_idx = batch["y_idx"]
         windows_batch_size = self.inference_windows_batch_size
         if windows_batch_size < 0:
             windows_batch_size = n_windows
@@ -590,13 +596,15 @@
             w_idxs = np.arange(
                 i * windows_batch_size, min((i + 1) * windows_batch_size, n_windows)
             )
-            windows = self._create_windows(batch, step="predict", w_idxs=w_idxs)
+            windows["temporal"] = windows["temporal"][w_idxs]
             windows = self._normalization(windows=windows, y_idx=y_idx)
 
             # Parse windows
             insample_y, insample_mask, _, _, hist_exog, futr_exog, stat_exog = (
                 self._parse_windows(batch, windows)
             )
+            if stat_exog is not None:
+                stat_exog = stat_exog[w_idxs]
 
             # Implicit Quantile Loss
             # if isinstance(self.loss, losses.IQLoss):
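
Note: because the windows are now built once and each inference batch is taken as a plain slice (windows["temporal"][w_idxs]) rather than rebuilt through _create_windows(..., w_idxs=w_idxs), the stat_exog returned by _parse_windows still covers every window, and the added lines subset it with the same indices. A rough sketch of the alignment, with hypothetical sizes:

import numpy as np

# hypothetical: 10 windows, inference batches of 4, one static row per window
n_windows, windows_batch_size = 10, 4
stat_exog = np.arange(n_windows * 2).reshape(n_windows, 2)

n_batches = int(np.ceil(n_windows / windows_batch_size))
for i in range(n_batches):
    w_idxs = np.arange(i * windows_batch_size,
                       min((i + 1) * windows_batch_size, n_windows))
    stat_batch = stat_exog[w_idxs]  # stays aligned with windows["temporal"][w_idxs]
    assert stat_batch.shape[0] == len(w_idxs)
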
39 changes: 10 additions & 29 deletions neuralforecast/core.py
@@ -866,13 +866,13 @@ def _no_refit_cross_validation(
         # the cv_times is sorted by window and then id
         fcsts_df = ufp.sort(fcsts_df, [id_col, "cutoff", time_col])
 
-        col_idx = 0
-        fcsts = np.full(
-            (self.dataset.n_groups * self.h * n_windows, len(cols)),
-            np.nan,
-            dtype=np.float32,
-        )
+        # allocate output array
+        max_windows_per_serie = (np.diff(self.dataset.indptr) - 1) // self.h
+        windows_per_serie = np.minimum(max_windows_per_serie, n_windows)
+        samples_per_serie = windows_per_serie * self.h
+        fcsts = np.empty((samples_per_serie.sum(), len(cols)), dtype=np.float32)
 
+        col_idx = 0
         for model in self.models:
             model.fit(dataset=self.dataset, val_size=val_size, test_size=test_size)
             model_fcsts = model.predict(
@@ -883,29 +883,10 @@
             output_length = len(model.loss.output_names)
             fcsts[:, col_idx : (col_idx + output_length)] = model_fcsts
             col_idx += output_length
-        # we may have allocated more space than needed
-        # each serie can produce at most (serie.size - 1) // self.h CV windows
-        effective_sizes = ufp.counts_by_id(fcsts_df, id_col)["counts"].to_numpy()
-        needs_trim = effective_sizes.sum() != fcsts.shape[0]
-        if self.scalers_ or needs_trim:
-            indptr = np.arange(
-                0,
-                n_windows * self.h * (self.dataset.n_groups + 1),
-                n_windows * self.h,
-                dtype=np.int32,
-            )
-            if self.scalers_:
-                fcsts = self._scalers_target_inverse_transform(fcsts, indptr)
-            if needs_trim:
-                # we keep only the effective samples of each serie from the cv results
-                trimmed = np.empty_like(
-                    fcsts, shape=(effective_sizes.sum(), fcsts.shape[1])
-                )
-                cv_indptr = np.append(0, effective_sizes).cumsum(dtype=np.int32)
-                for i in range(fcsts.shape[1]):
-                    ga = GroupedArray(fcsts[:, i], indptr)
-                    trimmed[:, i] = ga._tails(cv_indptr)
-                fcsts = trimmed
 
+        if self.scalers_:
+            indptr = np.append(0, samples_per_serie.cumsum()).astype(np.int32)
+            fcsts = self._scalers_target_inverse_transform(fcsts, indptr)
 
         self._fitted = True
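Note: with the exact allocation from the first hunk, every row of fcsts is filled by the model predictions, so np.empty is safe and the previous NaN fill plus the GroupedArray._tails trimming pass can be dropped; the inverse transform for target scalers now derives its per-serie offsets directly from samples_per_serie.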