Skip to content

Rewrite TimeSeriesImputerTransform to work without per-segment wrapper #1293

Merged
merged 7 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update requirement for `ruptures`, add requirement for `sqlalchemy` ([#1276](https://github.com/tinkoff-ai/etna/pull/1276))
- Optimize `make_samples` of `RNNNet` and `MLPNet` ([#1281](https://github.com/tinkoff-ai/etna/pull/1281))
- Remove `to_be_fixed` from inference tests on `SpecialDaysTransform` ([#1283](https://github.com/tinkoff-ai/etna/pull/1283))
-
- Rewrite `TimeSeriesImputerTransform` to work without per-segment wrapper ([#1293](https://github.com/tinkoff-ai/etna/pull/1293))
-
-
- Add default `params_to_tune` for catboost models ([#1185](https://github.com/tinkoff-ai/etna/pull/1185))
Expand Down
288 changes: 122 additions & 166 deletions etna/transforms/missing_values/imputation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import cast

import numpy as np
import pandas as pd

from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.distributions import IntDistribution
from etna.transforms.base import OneSegmentTransform
from etna.transforms.base import ReversiblePerSegmentWrapper
from etna.transforms.base import ReversibleTransform
from etna.transforms.utils import check_new_segments


class ImputerMode(str, Enum):
Expand All @@ -22,29 +23,39 @@ class ImputerMode(str, Enum):
seasonal = "seasonal"
constant = "constant"

@classmethod
def _missing_(cls, value):
raise NotImplementedError(
f"{value} is not a valid {cls.__name__}. Supported strategies: {', '.join([repr(m.value) for m in cls])}"
)


class _OneSegmentTimeSeriesImputerTransform(OneSegmentTransform):
"""One segment version of transform to fill NaNs in series of a given dataframe.
class TimeSeriesImputerTransform(ReversibleTransform):
"""Transform to fill NaNs in series of a given dataframe.

- It is assumed that given series begins with first non NaN value.

- This transform can't fill NaNs in the future, only on train data.

- This transform can't fill NaNs if all values are NaNs. In this case exception is raised.

Warning
-------
This transform can suffer from look-ahead bias in 'mean' mode. For transforming data at some timestamp
it uses information from the whole train part.
"""

def __init__(
self,
in_column: str,
strategy: str,
window: int,
seasonality: int,
default_value: Optional[float],
in_column: str = "target",
strategy: str = ImputerMode.constant,
window: int = -1,
seasonality: int = 1,
default_value: Optional[float] = None,
constant_value: float = 0,
):
"""
Create instance of _OneSegmentTimeSeriesImputerTransform.
Create instance of TimeSeriesImputerTransform.

Parameters
----------
Expand Down Expand Up @@ -82,204 +93,149 @@ def __init__(
ValueError:
if incorrect strategy given
"""
super().__init__(required_features=[in_column])
self.in_column = in_column
self.strategy = ImputerMode(strategy)
self.strategy = strategy
brsnw250 marked this conversation as resolved.
Show resolved Hide resolved
self.window = window
self.seasonality = seasonality
self.default_value = default_value
self.constant_value = constant_value
self.fill_value: Optional[float] = None
self.nan_timestamps: Optional[List[pd.Timestamp]] = None
self._strategy = ImputerMode(strategy)
self._fill_value: Optional[Dict[str, float]] = None
self._nan_timestamps: Optional[Dict[str, List[pd.Timestamp]]] = None

def fit(self, df: pd.DataFrame) -> "_OneSegmentTimeSeriesImputerTransform":
"""
Fit preprocess params.
def get_regressors_info(self) -> List[str]:
"""Return the list with regressors created by the transform."""
return []

def _fit(self, df: pd.DataFrame):
"""Fit the transform.

Parameters
----------
df: pd.DataFrame
dataframe with series to fit preprocess params with

Returns
-------
self: _OneSegmentTimeSeriesImputerTransform
fitted preprocess
df:
Dataframe in etna wide format.
"""
raw_series = df[self.in_column]
if np.all(raw_series.isna()):
segments = sorted(set(df.columns.get_level_values("segment")))
features = df.loc[:, pd.IndexSlice[segments, self.in_column]]
if features.isna().all().any():
raise ValueError("Series hasn't non NaN values which means it is empty and can't be filled.")
series = raw_series[raw_series.first_valid_index() :]
self.nan_timestamps = series[series.isna()].index
if self.strategy == ImputerMode.constant:
self.fill_value = self.constant_value
elif self.strategy == ImputerMode.mean:
self.fill_value = series.mean()
return self

def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Transform given series.

nan_timestamps = {}
for segment in segments:
series = features.loc[:, pd.IndexSlice[segment, self.in_column]]
series = series[series.first_valid_index() :]
nan_timestamps[segment] = series[series.isna()].index

fill_value = {}
if self._strategy is ImputerMode.mean:
mean_values = features.mean().to_dict()
# take only segment from multiindex key
mean_values = {key[0]: value for key, value in mean_values.items()}
fill_value = mean_values

self._nan_timestamps = nan_timestamps
self._fill_value = fill_value

def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Transform dataframe.

Parameters
----------
df: pd.Dataframe
transform ``in_column`` series of given dataframe
df:
Dataframe in etna wide format.

Returns
-------
result: pd.DataFrame
dataframe with in_column series with filled gaps
:
Transformed Dataframe in etna wide format.
"""
result_df = df
cur_nans = result_df[result_df[self.in_column].isna()].index
if self._fill_value is None or self._nan_timestamps is None:
raise ValueError("Transform is not fitted!")

result_df[self.in_column] = self._fill(result_df[self.in_column])
segments = sorted(set(df.columns.get_level_values("segment")))
check_new_segments(transform_segments=segments, fit_segments=self._nan_timestamps.keys())

# restore nans not in self.nan_timestamps
restore_nans = cur_nans.difference(self.nan_timestamps)
result_df.loc[restore_nans, self.in_column] = np.nan
cur_nans = {}
for segment in segments:
series = df.loc[:, pd.IndexSlice[segment, self.in_column]]
cur_nans[segment] = series[series.isna()].index

return result_df

def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Inverse transform dataframe.
result_df = self._fill(df)

Parameters
----------
df: pd.Dataframe
inverse transform ``in_column`` series of given dataframe
# restore nans not in self.nan_timestamps
for segment in segments:
restore_nans = cur_nans[segment].difference(self._nan_timestamps[segment])
result_df.loc[restore_nans, pd.IndexSlice[segment, self.in_column]] = np.nan

Returns
-------
result: pd.DataFrame
dataframe with in_column series with initial values
"""
result_df = df
index = result_df.index.intersection(self.nan_timestamps)
result_df.loc[index, self.in_column] = np.nan
return result_df

def _fill(self, df: pd.Series) -> pd.Series:
"""
Create new Series taking all previous dates and adding missing dates.
def _fill(self, df: pd.DataFrame) -> pd.DataFrame:
"""Fill the NaNs in a given Dataframe.

Fills missed values for new dates according to ``self.strategy``

Parameters
----------
df: pd.Series
series to fill
df:
dataframe to fill

Returns
-------
result: pd.Series
:
Filled Dataframe.
"""
if self.nan_timestamps is None:
raise ValueError("Trying to apply the unfitted transform! First fit the transform.")

if self.strategy == ImputerMode.mean or self.strategy == ImputerMode.constant:
df = df.fillna(value=self.fill_value)
elif self.strategy == ImputerMode.forward_fill:
df = df.fillna(method="ffill")
elif self.strategy == ImputerMode.running_mean or self.strategy == ImputerMode.seasonal:
history = self.seasonality * self.window if self.window != -1 else len(df)
timestamps = list(df.index)
for timestamp in self.nan_timestamps:
i = timestamps.index(timestamp)
indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality)
indexes = indexes[indexes >= 0]
df.iloc[i] = np.nanmean(df.iloc[indexes])

if self.default_value:
df = df.fillna(value=self.default_value)
self._fill_value = cast(Dict[str, float], self._fill_value)
self._nan_timestamps = cast(Dict[str, List[pd.Timestamp]], self._nan_timestamps)
segments = sorted(set(df.columns.get_level_values("segment")))

if self._strategy is ImputerMode.constant:
new_values = df.loc[:, pd.IndexSlice[:, self.in_column]].fillna(value=self.constant_value)
df.loc[:, pd.IndexSlice[:, self.in_column]] = new_values
elif self._strategy is ImputerMode.forward_fill:
new_values = df.loc[:, pd.IndexSlice[:, self.in_column]].fillna(method="ffill")
df.loc[:, pd.IndexSlice[:, self.in_column]] = new_values
elif self._strategy is ImputerMode.mean:
for segment in segments:
df.loc[:, pd.IndexSlice[segment, self.in_column]].fillna(value=self._fill_value[segment], inplace=True)
elif self._strategy is ImputerMode.running_mean or self._strategy is ImputerMode.seasonal:
timestamp_to_index = {timestamp: i for i, timestamp in enumerate(df.index)}
for segment in segments:
history = self.seasonality * self.window if self.window != -1 else len(df)
for timestamp in self._nan_timestamps[segment]:
i = timestamp_to_index[timestamp]
indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality)
indexes = indexes[indexes >= 0]
values = df.loc[df.index[indexes], pd.IndexSlice[segment, self.in_column]]
df.loc[timestamp, pd.IndexSlice[segment, self.in_column]] = np.nanmean(values)

if self.default_value is not None:
df.fillna(value=self.default_value, inplace=True)
return df


class TimeSeriesImputerTransform(ReversiblePerSegmentWrapper):
"""Transform to fill NaNs in series of a given dataframe.

- It is assumed that given series begins with first non NaN value.

- This transform can't fill NaNs in the future, only on train data.

- This transform can't fill NaNs if all values are NaNs. In this case exception is raised.

Warning
-------
This transform can suffer from look-ahead bias in 'mean' mode. For transforming data at some timestamp
it uses information from the whole train part.
"""

def __init__(
self,
in_column: str = "target",
strategy: str = ImputerMode.constant,
window: int = -1,
seasonality: int = 1,
default_value: Optional[float] = None,
constant_value: float = 0,
):
"""
Create instance of TimeSeriesImputerTransform.
def _inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Inverse transform dataframe.

Parameters
----------
in_column:
name of processed column
strategy:
filling value in missing timestamps:

- If "mean", then replace missing dates using the mean in fit stage.

- If "running_mean" then replace missing dates using mean of subset of data

- If "forward_fill" then replace missing dates using last existing value

- If "seasonal" then replace missing dates using seasonal moving average

- If "constant" then replace missing dates using constant value.

window:
In case of moving average and seasonality.

* If ``window=-1`` all previous dates are taken in account
df:
Dataframe to be inverse transformed.

* Otherwise only window previous dates

seasonality:
the length of the seasonality
default_value:
value which will be used to impute the NaNs left after applying the imputer with the chosen strategy
constant_value:
value to fill gaps in "constant" strategy

Raises
------
ValueError:
if incorrect strategy given
Returns
-------
:
Dataframe after applying inverse transformation.
"""
self.in_column = in_column
self.strategy = strategy
self.window = window
self.seasonality = seasonality
self.default_value = default_value
self.constant_value = constant_value
super().__init__(
transform=_OneSegmentTimeSeriesImputerTransform(
in_column=self.in_column,
strategy=self.strategy,
window=self.window,
seasonality=self.seasonality,
default_value=self.default_value,
constant_value=self.constant_value,
),
required_features=[self.in_column],
)
if self._fill_value is None or self._nan_timestamps is None:
raise ValueError("Transform is not fitted!")

def get_regressors_info(self) -> List[str]:
"""Return the list with regressors created by the transform."""
return []
segments = sorted(set(df.columns.get_level_values("segment")))
check_new_segments(transform_segments=segments, fit_segments=self._nan_timestamps.keys())

for segment in segments:
index = df.index.intersection(self._nan_timestamps[segment])
df.loc[index, pd.IndexSlice[segment, self.in_column]] = np.NaN
return df

def params_to_tune(self) -> Dict[str, BaseDistribution]:
"""Get default grid for tuning hyperparameters.
Expand Down
4 changes: 2 additions & 2 deletions etna/transforms/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import reprlib
from typing import List
from typing import Iterable
from typing import Optional

from etna.datasets.utils import inverse_transform_target_components # noqa: F401
from etna.datasets.utils import match_target_quantiles # noqa: F401


def check_new_segments(transform_segments: List[str], fit_segments: Optional[List[str]]):
def check_new_segments(transform_segments: Iterable[str], fit_segments: Optional[Iterable[str]]):
"""Check if there are any new segments that weren't present during training."""
if fit_segments is None:
raise ValueError("Transform is not fitted!")
Expand Down
Loading