Skip to content

Make TimeSeriesImputerTransform vectorized #760

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 103 additions & 144 deletions etna/transforms/missing_values/imputation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from enum import Enum
from typing import Dict
from typing import List
from typing import Optional

import numpy as np
import pandas as pd

from etna.transforms.base import PerSegmentWrapper
from etna.transforms.base import Transform


Expand All @@ -19,20 +19,31 @@ class ImputerMode(str, Enum):
seasonal = "seasonal"


class _OneSegmentTimeSeriesImputerTransform(Transform):
"""One segment version of transform to fill NaNs in series of a given dataframe.
class TimeSeriesImputerTransform(Transform):
"""Transform to fill NaNs in series of a given dataframe.

- It is assumed that the given series begins with its first non-NaN value.

- This transform can't fill NaNs in the future, only on train data.

- This transform can't fill NaNs if all values are NaNs. In this case an exception is raised.

Warning
-------
This transform can suffer from look-ahead bias in 'mean' mode. For transforming data at some timestamp
it uses information from the whole train part.
"""

def __init__(self, in_column: str, strategy: str, window: int, seasonality: int, default_value: Optional[float]):
def __init__(
self,
in_column: str = "target",
strategy: str = ImputerMode.zero.value,
window: int = -1,
seasonality: int = 1,
default_value: Optional[float] = None,
):
"""
Create instance of _OneSegmentTimeSeriesImputerTransform.
Create instance of TimeSeriesImputerTransform.

Parameters
----------
Expand Down Expand Up @@ -69,192 +80,140 @@ def __init__(self, in_column: str, strategy: str, window: int, seasonality: int,
if incorrect strategy given
"""
self.in_column = in_column
self.strategy = ImputerMode(strategy)
self.strategy = strategy
self.window = window
self.seasonality = seasonality
self.default_value = default_value
self.fill_value: Optional[int] = None
self.nan_timestamps: Optional[List[pd.Timestamp]] = None
self._strategy = ImputerMode(strategy)
self._fill_value: Dict[str, int] = {}
self._nan_timestamps: Dict[str, List[pd.Timestamp]] = {}

def fit(self, df: pd.DataFrame) -> "_OneSegmentTimeSeriesImputerTransform":
"""
Fit preprocess params.
def fit(self, df: pd.DataFrame) -> "TimeSeriesImputerTransform":
"""Fit params.

Parameters
----------
df: pd.DataFrame
dataframe with series to fit preprocess params with
df:
dataframe with data.

Returns
-------
self: _OneSegmentTimeSeriesImputerTransform
fitted preprocess
result: TimeSeriesImputerTransform
"""
raw_series = df[self.in_column]
if np.all(raw_series.isna()):
segments = sorted(set(df.columns.get_level_values("segment")))
features = df.loc[:, pd.IndexSlice[segments, self.in_column]]
if features.isna().all().any():
raise ValueError("Series hasn't non NaN values which means it is empty and can't be filled.")
series = raw_series[raw_series.first_valid_index() :]
self.nan_timestamps = series[series.isna()].index
if self.strategy == ImputerMode.zero:
self.fill_value = 0
elif self.strategy == ImputerMode.mean:
self.fill_value = series.mean()

for segment in segments:
series = features.loc[:, pd.IndexSlice[segment, self.in_column]]
series = series[series.first_valid_index() :]
self._nan_timestamps[segment] = series[series.isna()].index

if self._strategy == ImputerMode.mean:
mean_values = features.mean().to_dict()
# take only segment from multiindex key
mean_values = {key[0]: value for key, value in mean_values.items()}
self._fill_value = mean_values

return self

def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Transform given series.
"""Fill nans in the dataset.

Parameters
----------
df: pd.DataFrame
transform ``in_column`` series of given dataframe
df:
dataframe with data to transform.

Returns
-------
result: pd.DataFrame
dataframe with in_column series with filled gaps
result: pd.DataFrame
transformed dataframe
"""
result_df = df.copy()
cur_nans = result_df[result_df[self.in_column].isna()].index

result_df[self.in_column] = self._fill(result_df[self.in_column])
segments = sorted(set(df.columns.get_level_values("segment")))

# restore nans not in self.nan_timestamps
restore_nans = cur_nans.difference(self.nan_timestamps)
result_df.loc[restore_nans, self.in_column] = np.nan
cur_nans = {}
for segment in segments:
series = df.loc[:, pd.IndexSlice[segment, self.in_column]]
cur_nans[segment] = series[series.isna()].index

return result_df
result_df = self._fill(df)

def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Inverse transform dataframe.

Parameters
----------
df: pd.DataFrame
inverse transform ``in_column`` series of given dataframe
# restore NaNs at timestamps that are not in self._nan_timestamps for the segment
for segment in segments:
restore_nans = cur_nans[segment].difference(self._nan_timestamps[segment])
result_df.loc[restore_nans, pd.IndexSlice[segment, self.in_column]] = np.nan

Returns
-------
result: pd.DataFrame
dataframe with in_column series with initial values
"""
result_df = df.copy()
index = result_df.index.intersection(self.nan_timestamps)
result_df.loc[index, self.in_column] = np.nan
return result_df

def _fill(self, df: pd.Series) -> pd.Series:
"""
Create new Series taking all previous dates and adding missing dates.
def _fill(self, df: pd.DataFrame) -> pd.DataFrame:
"""Create new Series taking all previous dates and adding missing dates.

Fills missing values for new dates according to ``self.strategy``

Parameters
----------
df: pd.Series
series to fill
df: pd.DataFrame
dataframe to fill

Returns
-------
result: pd.Series
result: pd.DataFrame
"""
if self.nan_timestamps is None:
if len(self._nan_timestamps) == 0:
raise ValueError("Trying to apply the unfitted transform! First fit the transform.")

if self.strategy == ImputerMode.zero or self.strategy == ImputerMode.mean:
df = df.fillna(value=self.fill_value)
elif self.strategy == ImputerMode.forward_fill:
df = df.fillna(method="ffill")
elif self.strategy == ImputerMode.running_mean or self.strategy == ImputerMode.seasonal:
history = self.seasonality * self.window if self.window != -1 else len(df)
timestamps = list(df.index)
for timestamp in self.nan_timestamps:
i = timestamps.index(timestamp)
indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality)
indexes = indexes[indexes >= 0]
df.iloc[i] = np.nanmean(df.iloc[indexes])
segments = sorted(set(df.columns.get_level_values("segment")))
result_df = df.copy(deep=True)

if self._strategy == ImputerMode.zero:
# we can't just do `result_df.fillna(value=0)`, it leads to errors if category dtype is present
result_df.loc[:, pd.IndexSlice[segments, self.in_column]] = result_df.loc[
:, pd.IndexSlice[segments, self.in_column]
].fillna(value=0)
elif self._strategy == ImputerMode.forward_fill:
result_df.fillna(method="ffill", inplace=True)
elif self._strategy == ImputerMode.mean:
for segment in segments:
result_df.loc[:, pd.IndexSlice[segment, self.in_column]].fillna(
value=self._fill_value[segment], inplace=True
)
elif self._strategy == ImputerMode.running_mean or self._strategy == ImputerMode.seasonal:
for segment in segments:
history = self.seasonality * self.window if self.window != -1 else len(df)
timestamps = list(df.index)
for timestamp in self._nan_timestamps[segment]:
i = timestamps.index(timestamp)
indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality)
indexes = indexes[indexes >= 0]
values = result_df.loc[result_df.index[indexes], pd.IndexSlice[segment, self.in_column]]
result_df.loc[timestamp, pd.IndexSlice[segment, self.in_column]] = np.nanmean(values)

if self.default_value:
df = df.fillna(value=self.default_value)
return df


class TimeSeriesImputerTransform(PerSegmentWrapper):
"""Transform to fill NaNs in series of a given dataframe.

- It is assumed that the given series begins with its first non-NaN value.

- This transform can't fill NaNs in the future, only on train data.

- This transform can't fill NaNs if all values are NaNs. In this case an exception is raised.

Warning
-------
This transform can suffer from look-ahead bias in 'mean' mode. For transforming data at some timestamp
it uses information from the whole train part.
"""
result_df = result_df.fillna(value=self.default_value)
return result_df

def __init__(
self,
in_column: str = "target",
strategy: str = ImputerMode.zero,
window: int = -1,
seasonality: int = 1,
default_value: Optional[float] = None,
):
"""
Create instance of TimeSeriesImputerTransform.
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply inverse transformation to the dataset.

Parameters
----------
in_column:
name of processed column
strategy:
filling value in missing timestamps:

- If "zero", then replace missing dates with zeros
df:
dataframe with data to transform.

- If "mean", then replace missing dates using the mean in fit stage.

- If "running_mean" then replace missing dates using mean of subset of data

- If "forward_fill" then replace missing dates using last existing value

- If "seasonal" then replace missing dates using seasonal moving average

window:
In case of moving average and seasonality.

* If ``window=-1`` all previous dates are taken in account

* Otherwise only window previous dates

seasonality:
the length of the seasonality
default_value:
value which will be used to impute the NaNs left after applying the imputer with the chosen strategy

Raises
------
ValueError:
if incorrect strategy given
Returns
-------
result: pd.DataFrame
transformed series
"""
self.in_column = in_column
self.strategy = strategy
self.window = window
self.seasonality = seasonality
self.default_value = default_value
super().__init__(
transform=_OneSegmentTimeSeriesImputerTransform(
in_column=self.in_column,
strategy=self.strategy,
window=self.window,
seasonality=self.seasonality,
default_value=self.default_value,
)
)
segments = sorted(set(df.columns.get_level_values("segment")))
result_df = df.copy()

for segment in segments:
index = result_df.index.intersection(self._nan_timestamps[segment])
result_df.loc[index, pd.IndexSlice[segment, self.in_column]] = np.nan
return result_df


__all__ = ["TimeSeriesImputerTransform"]
21 changes: 11 additions & 10 deletions tests/test_transforms/test_missing_values/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ def all_date_present_df(date_range: pd.Series) -> pd.DataFrame:
"""Create pd.DataFrame that contains some target on given range of dates without gaps."""
df = pd.DataFrame({"timestamp": date_range})
df["target"] = list(range(len(df)))
df.set_index("timestamp", inplace=True)
df["segment"] = "segment_1"
df = TSDataset.to_dataset(df)
return df


@pytest.fixture
def all_date_present_df_two_segments(all_date_present_df: pd.Series) -> pd.DataFrame:
def all_date_present_df_two_segments(all_date_present_df: pd.DataFrame) -> pd.DataFrame:
"""Create pd.DataFrame that contains two segments with some targets on given range of dates without gaps."""
df_1 = all_date_present_df.reset_index()
df_2 = all_date_present_df.copy().reset_index()
df_1 = TSDataset.to_flatten(all_date_present_df)
df_2 = df_1.copy()

df_1["segment"] = "segment_1"
df_2["segment"] = "segment_2"
Expand All @@ -50,8 +51,8 @@ def df_with_missing_value_x_index(random_seed, all_date_present_df: pd.DataFrame
# because Imputer should know starting and ending dates
timestamps = sorted(all_date_present_df.index)[1:-1]
idx = np.random.choice(timestamps)
df = all_date_present_df
df.loc[idx, "target"] = np.NaN
df = all_date_present_df.loc[:, pd.IndexSlice["segment_1", :]]
df.loc[idx, pd.IndexSlice[:, "target"]] = np.NaN
return df, idx


Expand All @@ -60,8 +61,8 @@ def df_with_missing_range_x_index(all_date_present_df: pd.DataFrame) -> Tuple[pd
"""Create pd.DataFrame that contains some target on given range of dates with range of gaps."""
timestamps = sorted(all_date_present_df.index)
rng = timestamps[2:7]
df = all_date_present_df
df.loc[rng, "target"] = np.NaN
df = all_date_present_df.loc[:, pd.IndexSlice["segment_1", :]]
df.loc[rng, pd.IndexSlice[:, "target"]] = np.NaN
return df, rng


Expand All @@ -71,8 +72,8 @@ def df_with_missing_range_x_index_two_segments(
) -> Tuple[pd.DataFrame, list]:
"""Create pd.DataFrame that contains some target on given range of dates with range of gaps."""
df_one_segment, rng = df_with_missing_range_x_index
df_1 = df_one_segment.reset_index()
df_2 = df_one_segment.copy().reset_index()
df_1 = TSDataset.to_flatten(df_one_segment)
df_2 = df_1.copy()
df_1["segment"] = "segment_1"
df_2["segment"] = "segment_2"
classic_df = pd.concat([df_1, df_2], ignore_index=True)
Expand Down
Loading