diff --git a/CHANGELOG.md b/CHANGELOG.md index f6d561fb4..dc54b2758 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - - - -- +- Add `ChangePointsSegmentationTransform`, `RupturesChangePointsModel` ([#821](https://github.com/tinkoff-ai/etna/issues/821)) - - ### Changed diff --git a/etna/analysis/change_points_trend/search.py b/etna/analysis/change_points_trend/search.py index 8c8177beb..3dc91ffa7 100644 --- a/etna/analysis/change_points_trend/search.py +++ b/etna/analysis/change_points_trend/search.py @@ -1,35 +1,12 @@ from typing import Dict from typing import List -import numpy as np import pandas as pd from ruptures.base import BaseEstimator -from ruptures.costs import CostLinear from etna.datasets import TSDataset -def _prepare_signal(series: pd.Series, model: BaseEstimator) -> np.ndarray: - """Prepare series for change point model.""" - signal = series.to_numpy() - if isinstance(model.cost, CostLinear): - signal = signal.reshape((-1, 1)) - return signal - - -def _find_change_points_segment( - series: pd.Series, change_point_model: BaseEstimator, **model_predict_params -) -> List[pd.Timestamp]: - """Find trend change points within one segment.""" - signal = _prepare_signal(series=series, model=change_point_model) - timestamp = series.index - change_point_model.fit(signal=signal) - # last point in change points is the first index after the series - change_points_indices = change_point_model.predict(**model_predict_params)[:-1] - change_points = [timestamp[idx] for idx in change_points_indices] - return change_points - - def find_change_points( ts: TSDataset, in_column: str, change_point_model: BaseEstimator, **model_predict_params ) -> Dict[str, List[pd.Timestamp]]: @@ -51,13 +28,12 @@ def find_change_points( Dict[str, List[pd.Timestamp]] dictionary with list of trend change points for each segment """ + from etna.transforms.decomposition.base_change_points import 
RupturesChangePointsModel + result: Dict[str, List[pd.Timestamp]] = {} df = ts.to_pandas() + ruptures = RupturesChangePointsModel(change_point_model, **model_predict_params) for segment in ts.segments: df_segment = df[segment] - raw_series = df_segment[in_column] - series = raw_series.loc[raw_series.first_valid_index() : raw_series.last_valid_index()] - result[segment] = _find_change_points_segment( - series=series, change_point_model=change_point_model, **model_predict_params - ) + result[segment] = ruptures.get_change_points(df=df_segment, in_column=in_column) return result diff --git a/etna/transforms/__init__.py b/etna/transforms/__init__.py index 73b78efb9..c9e261b4b 100644 --- a/etna/transforms/__init__.py +++ b/etna/transforms/__init__.py @@ -1,6 +1,7 @@ from etna.transforms.base import PerSegmentWrapper from etna.transforms.base import Transform from etna.transforms.decomposition import BinsegTrendTransform +from etna.transforms.decomposition import ChangePointsSegmentationTransform from etna.transforms.decomposition import ChangePointsTrendTransform from etna.transforms.decomposition import LinearTrendTransform from etna.transforms.decomposition import STLTransform diff --git a/etna/transforms/decomposition/__init__.py b/etna/transforms/decomposition/__init__.py index 0b7777311..138e93bca 100644 --- a/etna/transforms/decomposition/__init__.py +++ b/etna/transforms/decomposition/__init__.py @@ -1,4 +1,6 @@ +from etna.transforms.decomposition.base_change_points import RupturesChangePointsModel from etna.transforms.decomposition.binseg import BinsegTrendTransform +from etna.transforms.decomposition.change_points_segmentation import ChangePointsSegmentationTransform from etna.transforms.decomposition.change_points_trend import ChangePointsTrendTransform from etna.transforms.decomposition.detrend import LinearTrendTransform from etna.transforms.decomposition.detrend import TheilSenTrendTransform diff --git a/etna/transforms/decomposition/base_change_points.py 
b/etna/transforms/decomposition/base_change_points.py new file mode 100644 index 000000000..2f0f562a8 --- /dev/null +++ b/etna/transforms/decomposition/base_change_points.py @@ -0,0 +1,108 @@ +from abc import ABC +from abc import abstractmethod +from typing import List +from typing import Tuple +from typing import Type + +import pandas as pd +from ruptures.base import BaseEstimator +from ruptures.costs import CostLinear +from sklearn.base import RegressorMixin + +TTimestampInterval = Tuple[pd.Timestamp, pd.Timestamp] +TDetrendModel = Type[RegressorMixin] + + +class BaseChangePointsModelAdapter(ABC): + """BaseChangePointsModelAdapter is the base class for change point models adapters.""" + + @abstractmethod + def get_change_points(self, df: pd.DataFrame, in_column: str) -> List[pd.Timestamp]: + """Find change points within one segment. + + Parameters + ---------- + df: + dataframe indexed with timestamp + in_column: + name of column to get change points + + Returns + ------- + change points: + change point timestamps + """ + pass + + @staticmethod + def _build_intervals(change_points: List[pd.Timestamp]) -> List[TTimestampInterval]: + """Create list of stable intervals from list of change points.""" + change_points.extend([pd.Timestamp.min, pd.Timestamp.max]) + change_points = sorted(change_points) + intervals = list(zip(change_points[:-1], change_points[1:])) + return intervals + + def get_change_points_intervals(self, df: pd.DataFrame, in_column: str) -> List[TTimestampInterval]: + """Find change point intervals in given dataframe and column. 
+ + Parameters + ---------- + df: + dataframe indexed with timestamp + in_column: + name of column to get change points + + Returns + ------- + : + change points intervals + """ + change_points = self.get_change_points(df=df, in_column=in_column) + intervals = self._build_intervals(change_points=change_points) + return intervals + + +class RupturesChangePointsModel(BaseChangePointsModelAdapter): + """RupturesChangePointsModel is ruptures change point models adapter.""" + + def __init__(self, change_point_model: BaseEstimator, **change_point_model_predict_params): + """Init RupturesChangePointsModel. + + Parameters + ---------- + change_point_model: + model to get change points + change_point_model_predict_params: + params for ``change_point_model.predict`` method + """ + self.change_point_model = change_point_model + self.model_predict_params = change_point_model_predict_params + + def get_change_points(self, df: pd.DataFrame, in_column: str) -> List[pd.Timestamp]: + """Find change points within one segment. + + Parameters + ---------- + df: + dataframe indexed with timestamp + in_column: + name of column to get change points + + Returns + ------- + change points: + change point timestamps + """ + series = df.loc[df[in_column].first_valid_index() : df[in_column].last_valid_index(), in_column] + if series.isnull().values.any(): + raise ValueError("The input column contains NaNs in the middle of the series! 
Try to use the imputer.") + + signal = series.to_numpy() + if isinstance(self.change_point_model.cost, CostLinear): + signal = signal.reshape((-1, 1)) + timestamp = series.index + self.change_point_model.fit(signal=signal) + # last point in change points is the first index after the series + change_points_indices = self.change_point_model.predict(**self.model_predict_params)[:-1] + change_points = [timestamp[idx] for idx in change_points_indices] + return change_points diff --git a/etna/transforms/decomposition/change_points_segmentation.py b/etna/transforms/decomposition/change_points_segmentation.py new file mode 100644 index 000000000..c2c5fbffb --- /dev/null +++ b/etna/transforms/decomposition/change_points_segmentation.py @@ -0,0 +1,121 @@ +from typing import List +from typing import Optional + +import pandas as pd + +from etna.transforms.base import FutureMixin +from etna.transforms.base import PerSegmentWrapper +from etna.transforms.base import Transform +from etna.transforms.decomposition.base_change_points import BaseChangePointsModelAdapter +from etna.transforms.decomposition.base_change_points import TTimestampInterval + + +class _OneSegmentChangePointsSegmentationTransform(Transform): + """_OneSegmentChangePointsSegmentationTransform make label encoder to change points.""" + + def __init__(self, in_column: str, out_column: str, change_point_model: BaseChangePointsModelAdapter): + """Init _OneSegmentChangePointsSegmentationTransform. + Parameters + ---------- + in_column: + name of column to apply transform to + out_column: + result column name. 
If not given use ``self.__repr__()`` + change_point_model: + model to get change points + """ + self.in_column = in_column + self.out_column = out_column + self.intervals: Optional[List[TTimestampInterval]] = None + self.change_point_model = change_point_model + + def _fill_per_interval(self, series: pd.Series) -> pd.Series: + """Fill values in resulting series.""" + if self.intervals is None: + raise ValueError("Transform is not fitted! Fit the Transform before calling transform method.") + result_series = pd.Series(index=series.index) + for k, interval in enumerate(self.intervals): + tmp_series = series[interval[0] : interval[1]] + if tmp_series.empty: + continue + result_series[tmp_series.index] = k + return result_series.astype(int).astype("category") + + def fit(self, df: pd.DataFrame) -> "_OneSegmentChangePointsSegmentationTransform": + """Fit _OneSegmentChangePointsSegmentationTransform: find change points in ``df`` and build intervals. + + Parameters + ---------- + df: + one segment dataframe indexed with timestamp + + Returns + ------- + : + instance with trained change points + + Raises + ------ + ValueError + If series contains NaNs in the middle + """ + self.intervals = self.change_point_model.get_change_points_intervals(df=df, in_column=self.in_column) + return self + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + """Split df to intervals. + + Parameters + ---------- + df: + one segment dataframe + + Returns + ------- + df: + df with new column + """ + series = df[self.in_column] + result_series = self._fill_per_interval(series=series) + df.loc[:, self.out_column] = result_series + return df + + +class ChangePointsSegmentationTransform(PerSegmentWrapper, FutureMixin): + """ChangePointsSegmentationTransform make label encoder to change points. + + Warning + ------- + This transform can suffer from look-ahead bias. For transforming data at some timestamp + it uses information from the whole train part. 
+ """ + + def __init__( + self, + in_column: str, + change_point_model: BaseChangePointsModelAdapter, + out_column: Optional[str] = None, + ): + """Init ChangePointsSegmentationTransform. + + Parameters + ---------- + in_column: + name of column to fit change point model + out_column: + result column name. If not given use ``self.__repr__()`` + change_point_model: + model to get change points + """ + self.in_column = in_column + self.out_column = out_column + self.change_point_model = change_point_model + if self.out_column is None: + self.out_column = repr(self) + super().__init__( + transform=_OneSegmentChangePointsSegmentationTransform( + in_column=self.in_column, + out_column=self.out_column, + change_point_model=self.change_point_model, + ) + ) diff --git a/etna/transforms/decomposition/change_points_trend.py b/etna/transforms/decomposition/change_points_trend.py index f99e116b6..c179c9ac8 100644 --- a/etna/transforms/decomposition/change_points_trend.py +++ b/etna/transforms/decomposition/change_points_trend.py @@ -10,12 +10,12 @@ from ruptures.base import BaseEstimator from sklearn.base import RegressorMixin -from etna.analysis.change_points_trend.search import _find_change_points_segment from etna.transforms.base import PerSegmentWrapper from etna.transforms.base import Transform +from etna.transforms.decomposition.base_change_points import RupturesChangePointsModel +from etna.transforms.decomposition.base_change_points import TTimestampInterval from etna.transforms.utils import match_target_quantiles -TTimestampInterval = Tuple[pd.Timestamp, pd.Timestamp] TDetrendModel = Type[RegressorMixin] @@ -37,6 +37,7 @@ def __init__( name of column to apply transform to change_point_model: model to get trend change points + TODO: replace this parameters with the instance of BaseChangePointsModelAdapter in ETNA 2.0 detrend_model: model to get trend in data change_point_model_predict_params: @@ -44,25 +45,15 @@ def __init__( """ self.in_column = in_column 
self.out_columns = in_column - self.change_point_model = change_point_model + self.ruptures_change_point_model = RupturesChangePointsModel( + change_point_model=change_point_model, **change_point_model_predict_params + ) self.detrend_model = detrend_model self.per_interval_models: Optional[Dict[TTimestampInterval, TDetrendModel]] = None self.intervals: Optional[List[TTimestampInterval]] = None + self.change_point_model = change_point_model self.change_point_model_predict_params = change_point_model_predict_params - @staticmethod - def _build_trend_intervals(change_points: List[pd.Timestamp]) -> List[TTimestampInterval]: - """Create list of stable trend intervals from list of change points.""" - change_points = sorted(change_points) - left_border = pd.Timestamp.min - intervals = [] - for point in change_points: - right_border = point - intervals.append((left_border, right_border)) - left_border = right_border - intervals.append((left_border, pd.Timestamp.max)) - return intervals - def _init_detrend_models( self, intervals: List[TTimestampInterval] ) -> Dict[Tuple[pd.Timestamp, pd.Timestamp], TDetrendModel]: @@ -112,14 +103,10 @@ def fit(self, df: pd.DataFrame) -> "_OneSegmentChangePointsTrendTransform": ------- : """ - series = df.loc[df[self.in_column].first_valid_index() : df[self.in_column].last_valid_index(), self.in_column] - if series.isnull().values.any(): - raise ValueError("The input column contains NaNs in the middle of the series! 
Try to use the imputer.") - change_points = _find_change_points_segment( - series=series, change_point_model=self.change_point_model, **self.change_point_model_predict_params - ) - self.intervals = self._build_trend_intervals(change_points=change_points) + self.intervals = self.ruptures_change_point_model.get_change_points_intervals(df=df, in_column=self.in_column) self.per_interval_models = self._init_detrend_models(intervals=self.intervals) + + series = df.loc[df[self.in_column].first_valid_index() : df[self.in_column].last_valid_index(), self.in_column] self._fit_per_interval_model(series=series) return self @@ -190,6 +177,7 @@ def __init__( name of column to apply transform to change_point_model: model to get trend change points + TODO: replace this parameters with the instance of BaseChangePointsModelAdapter in ETNA 2.0 detrend_model: model to get trend in data change_point_model_predict_params: diff --git a/tests/test_transforms/test_decomposition/test_base_change_points.py b/tests/test_transforms/test_decomposition/test_base_change_points.py new file mode 100644 index 000000000..852e6cca6 --- /dev/null +++ b/tests/test_transforms/test_decomposition/test_base_change_points.py @@ -0,0 +1,71 @@ +import numpy as np +import pandas as pd +import pytest +from ruptures import Binseg + +from etna.datasets import TSDataset +from etna.datasets import generate_ar_df +from etna.transforms.decomposition.base_change_points import BaseChangePointsModelAdapter +from etna.transforms.decomposition.base_change_points import RupturesChangePointsModel + +N_BKPS = 5 + + +@pytest.fixture +def df_with_nans() -> pd.DataFrame: + """Generate pd.DataFrame with timestamp.""" + df = pd.DataFrame({"timestamp": pd.date_range("2019-12-01", "2019-12-31")}) + tmp = np.zeros(31) + tmp[8] = None + df["target"] = tmp + df["segment"] = "segment_1" + df = TSDataset.to_dataset(df=df) + return df["segment_1"] + + +@pytest.fixture +def simple_ar_df(random_seed): + df = generate_ar_df(periods=125, 
start_time="2021-05-20", n_segments=1, ar_coef=[2], freq="D") + df_ts_format = TSDataset.to_dataset(df)["segment_0"] + return df_ts_format + + +def test_fit_transform_with_nans_in_middle_raise_error(df_with_nans): + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + with pytest.raises(ValueError, match="The input column contains NaNs in the middle of the series!"): + _ = change_point_model.get_change_points_intervals(df=df_with_nans, in_column="target") + + +def test_build_intervals(): + """Check correctness of intervals generation with list of change points.""" + change_points = [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-18"), pd.Timestamp("2020-02-24")] + expected_intervals = [ + (pd.Timestamp.min, pd.Timestamp("2020-01-01")), + (pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-18")), + (pd.Timestamp("2020-01-18"), pd.Timestamp("2020-02-24")), + (pd.Timestamp("2020-02-24"), pd.Timestamp.max), + ] + intervals = BaseChangePointsModelAdapter._build_intervals(change_points=change_points) + assert isinstance(intervals, list) + assert len(intervals) == 4 + for (exp_left, exp_right), (real_left, real_right) in zip(expected_intervals, intervals): + assert exp_left == real_left + assert exp_right == real_right + + +def test_get_change_points_intervals_format(simple_ar_df): + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + intervals = change_point_model.get_change_points_intervals(df=simple_ar_df, in_column="target") + assert isinstance(intervals, list) + assert len(intervals) == N_BKPS + 1 + for interval in intervals: + assert len(interval) == 2 + + +def test_get_change_points_format(simple_ar_df): + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + intervals = change_point_model.get_change_points(df=simple_ar_df, in_column="target") + assert isinstance(intervals, list) + assert len(intervals) == N_BKPS + for interval in intervals: + 
assert isinstance(interval, pd.Timestamp) diff --git a/tests/test_transforms/test_decomposition/test_change_points_segmentation_transform.py b/tests/test_transforms/test_decomposition/test_change_points_segmentation_transform.py new file mode 100644 index 000000000..f8320f68a --- /dev/null +++ b/tests/test_transforms/test_decomposition/test_change_points_segmentation_transform.py @@ -0,0 +1,124 @@ +import numpy as np +import pandas as pd +import pytest +from ruptures import Binseg + +from etna.datasets import TSDataset +from etna.datasets import generate_ar_df +from etna.metrics import SMAPE +from etna.models import CatBoostModelPerSegment +from etna.pipeline import Pipeline +from etna.transforms import ChangePointsSegmentationTransform +from etna.transforms.decomposition.base_change_points import RupturesChangePointsModel +from etna.transforms.decomposition.change_points_segmentation import _OneSegmentChangePointsSegmentationTransform + +OUT_COLUMN = "result" +N_BKPS = 5 + + +@pytest.fixture +def pre_transformed_df() -> pd.DataFrame: + """Generate pd.DataFrame with timestamp.""" + df = pd.DataFrame({"timestamp": pd.date_range("2019-12-01", "2019-12-31")}) + df["target"] = 0 + df["segment"] = "segment_1" + df = TSDataset.to_dataset(df=df) + return df + + +@pytest.fixture +def simple_ar_ts(random_seed): + df = generate_ar_df(periods=125, start_time="2021-05-20", n_segments=3, ar_coef=[2], freq="D") + df_ts_format = TSDataset.to_dataset(df) + return TSDataset(df_ts_format, freq="D") + + +@pytest.fixture +def multitrend_df_with_nans_in_tails(multitrend_df): + multitrend_df.loc[ + [multitrend_df.index[0], multitrend_df.index[1], multitrend_df.index[-2], multitrend_df.index[-1]], + pd.IndexSlice["segment_1", "target"], + ] = None + return multitrend_df + + +def test_fit_one_segment(pre_transformed_df: pd.DataFrame): + """Check that fit method save intervals.""" + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + bs = 
_OneSegmentChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + bs.fit(df=pre_transformed_df["segment_1"]) + assert bs.intervals is not None + + +def test_transform_format_one_segment(pre_transformed_df: pd.DataFrame): + """Check that transform method generate new column.""" + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + bs = _OneSegmentChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + bs.fit(df=pre_transformed_df["segment_1"]) + transformed = bs.transform(df=pre_transformed_df["segment_1"]) + assert set(transformed.columns) == {"target", OUT_COLUMN} + assert transformed[OUT_COLUMN].dtype == "category" + + +def test_monotonously_result(pre_transformed_df: pd.DataFrame): + """Check that resulting column is monotonously non-decreasing.""" + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + bs = _OneSegmentChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + bs.fit(df=pre_transformed_df["segment_1"]) + + transformed = bs.transform(df=pre_transformed_df["segment_1"].copy(deep=True)) + result = transformed[OUT_COLUMN].astype(int).values + assert (result[1:] - result[:-1] >= 0).mean() == 1 + + +def test_transform_raise_error_if_not_fitted(pre_transformed_df: pd.DataFrame): + """Test that transform for one segment raise error when calling transform without being fit.""" + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + transform = _OneSegmentChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + with pytest.raises(ValueError, match="Transform is not fitted!"): + _ = transform.transform(df=pre_transformed_df["segment_1"]) + + +def test_backtest(simple_ar_ts): 
+ model = CatBoostModelPerSegment() + horizon = 3 + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + bs = ChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + pipeline = Pipeline(model=model, transforms=[bs], horizon=horizon) + _, _, _ = pipeline.backtest(ts=simple_ar_ts, metrics=[SMAPE()], n_folds=3) + + +def test_future_and_past_filling(simple_ar_ts): + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + bs = ChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + before, ts = simple_ar_ts.train_test_split(test_start="2021-06-01") + train, after = ts.train_test_split(test_start="2021-08-01") + bs.fit_transform(train.df) + before = bs.transform(before.df) + after = bs.transform(after.df) + for seg in train.segments: + assert np.sum(np.abs(before[seg][OUT_COLUMN].astype(int))) == 0 + assert (after[seg][OUT_COLUMN].astype(int) == 5).all() + + +def test_make_future(simple_ar_ts): + change_point_model = RupturesChangePointsModel(change_point_model=Binseg(), n_bkps=N_BKPS) + bs = ChangePointsSegmentationTransform( + in_column="target", change_point_model=change_point_model, out_column=OUT_COLUMN + ) + simple_ar_ts.fit_transform(transforms=[bs]) + future = simple_ar_ts.make_future(10) + for seg in simple_ar_ts.segments: + assert (future.to_pandas()[seg][OUT_COLUMN].astype(int) == 5).all() diff --git a/tests/test_transforms/test_decomposition/test_change_points_trend_transform.py b/tests/test_transforms/test_decomposition/test_change_points_trend_transform.py index f6b8bb50c..cc2891513 100644 --- a/tests/test_transforms/test_decomposition/test_change_points_trend_transform.py +++ b/tests/test_transforms/test_decomposition/test_change_points_trend_transform.py @@ -38,23 +38,6 @@ def multitrend_df_with_nans_in_tails(multitrend_df): return multitrend_df 
-def test_build_trend_intervals(): - """Check correctness of intervals generation with list of change points.""" - change_points = [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-18"), pd.Timestamp("2020-02-24")] - expected_intervals = [ - (pd.Timestamp.min, pd.Timestamp("2020-01-01")), - (pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-18")), - (pd.Timestamp("2020-01-18"), pd.Timestamp("2020-02-24")), - (pd.Timestamp("2020-02-24"), pd.Timestamp.max), - ] - intervals = _OneSegmentChangePointsTrendTransform._build_trend_intervals(change_points=change_points) - assert isinstance(intervals, list) - assert len(intervals) == 4 - for (exp_left, exp_right), (real_left, real_right) in zip(expected_intervals, intervals): - assert exp_left == real_left - assert exp_right == real_right - - def test_models_after_fit(multitrend_df: pd.DataFrame): """Check that fit method generates correct number of detrend model's copies.""" bs = _OneSegmentChangePointsTrendTransform(