diff --git a/CHANGELOG.md b/CHANGELOG.md index 4abcbf57f..682e34606 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,9 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Method `set_params` to change parameters of ETNA objects [#1102](https://github.com/tinkoff-ai/etna/pull/1102) - ### Changed - + +- Create `AutoBase` and `AutoAbstract` classes, some of `Auto` class's logic moved there ([#1114](https://github.com/tinkoff-ai/etna/pull/1114)) - Impose specific order of columns on return value of TSDataset.to_flatten ([#1095](https://github.com/tinkoff-ai/etna/pull/1095)) -- ### Fixed - Fix bug in `GaleShapleyFeatureSelectionTransform` with wrong number of remaining features ([#1110](https://github.com/tinkoff-ai/etna/pull/1110)) diff --git a/etna/auto/auto.py b/etna/auto/auto.py index 1188a135e..c7286fc3b 100644 --- a/etna/auto/auto.py +++ b/etna/auto/auto.py @@ -1,3 +1,5 @@ +from abc import ABC +from abc import abstractmethod from typing import Callable from typing import List from typing import Optional @@ -37,8 +39,96 @@ def __call__(self, pipeline: Pipeline) -> None: ... -class Auto: - """Automatic pipeline selection via defined or custom pipeline pool.""" +class AutoAbstract(ABC): + """Interface for ``Auto`` object.""" + + @abstractmethod + def fit( + self, + ts: TSDataset, + timeout: Optional[int] = None, + n_trials: Optional[int] = None, + initializer: Optional[_Initializer] = None, + callback: Optional[_Callback] = None, + **optuna_kwargs, + ) -> Pipeline: + """ + Start automatic pipeline selection. + + Parameters + ---------- + ts: + tsdataset to fit on + timeout: + timeout for optuna. N.B. this is timeout for each worker + n_trials: + number of trials for optuna. N.B. this is number of trials for each worker + initializer: + is called before each pipeline backtest, can be used to initialize loggers + callback: + is called after each pipeline backtest, can be used to log extra metrics + optuna_kwargs: + additional kwargs for optuna :py:meth:`optuna.study.Study.optimize` + """ + pass + + @abstractmethod + def _init_optuna(self): + """Initialize optuna.""" + + @abstractmethod + def summary(self) -> pd.DataFrame: + """Get Auto trials summary.""" + pass + + @abstractmethod + def top_k(self, k: int = 5) -> List[Pipeline]: + """ + Get top k pipelines. + + Parameters + ---------- + k: + number of pipelines to return + """ + pass + + @staticmethod + @abstractmethod + def objective( + ts: TSDataset, + target_metric: Metric, + metric_aggregation: MetricAggregationStatistics, + metrics: List[Metric], + backtest_params: dict, + initializer: Optional[_Initializer] = None, + callback: Optional[_Callback] = None, + ) -> Callable[[Trial], float]: + """ + Optuna objective wrapper. + + Parameters + ---------- + ts: + tsdataset to fit on + target_metric: + metric to optimize + metric_aggregation: + aggregation method for per-segment metrics + metrics: + list of metrics to compute + backtest_params: + custom parameters for backtest instead of default backtest parameters + initializer: + is called before each pipeline backtest, can be used to initialize loggers + callback: + is called after each pipeline backtest, can be used to log extra metrics + """ + pass + + +class AutoBase(AutoAbstract): + """Base Class for ``Auto`` and ``Tune``, implementing core logic behind these classes.""" def __init__( self, @@ -47,13 +137,12 @@ def __init__( metric_aggregation: MetricAggregationStatistics = "mean", backtest_params: Optional[dict] = None, experiment_folder: Optional[str] = None, - pool: Union[Pool, List[Pipeline]] = Pool.default, runner: Optional[AbstractRunner] = None, storage: Optional[BaseStorage] = None, metrics: Optional[List[Metric]] = None, ): """ - Initialize Auto class. + Initialize AutoBase class. Parameters ---------- @@ -67,8 +156,6 @@ def __init__( custom parameters for backtest instead of default backtest parameters experiment_folder: folder to store experiment results and name for optuna study - pool: - pool of pipelines to choose from runner: runner to use for distributed training storage: @@ -79,12 +166,11 @@ def __init__( if target_metric.greater_is_better is None: raise ValueError("target_metric.greater_is_better is None") self.target_metric = target_metric - - self.metric_aggregation = metric_aggregation - self.backtest_params = {} if backtest_params is None else backtest_params self.horizon = horizon + self.metric_aggregation: MetricAggregationStatistics = metric_aggregation + self.backtest_params = {} if backtest_params is None else backtest_params self.experiment_folder = experiment_folder - self.pool = pool + self.runner = LocalRunner() if runner is None else runner self.storage = RDBStorage("sqlite:///etna-auto.db") if storage is None else storage @@ -94,6 +180,94 @@ def __init__( self.metrics = metrics self._optuna: Optional[Optuna] = None + def summary(self) -> pd.DataFrame: + """Get Auto trials summary. + + Returns + ------- + study_dataframe: + dataframe with detailed info on each performed trial + """ + if self._optuna is None: + self._optuna = self._init_optuna() + + study = self._optuna.study.get_trials() + + study_params = [ + {**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state} + for trial in study + ] + + return pd.DataFrame(study_params) + + def top_k(self, k: int = 5) -> List[Pipeline]: + """ + Get top k pipelines. + + Parameters + ---------- + k: + number of pipelines to return + """ + summary = self.summary() + df = summary.sort_values( + by=[f"{self.target_metric.name}_{self.metric_aggregation}"], + ascending=(not self.target_metric.greater_is_better), + ) + return [pipeline for pipeline in df["pipeline"].values[:k]] # noqa: C416 + + +class Auto(AutoBase): + """Automatic pipeline selection via defined or custom pipeline pool.""" + + def __init__( + self, + target_metric: Metric, + horizon: int, + metric_aggregation: MetricAggregationStatistics = "mean", + backtest_params: Optional[dict] = None, + experiment_folder: Optional[str] = None, + pool: Union[Pool, List[Pipeline]] = Pool.default, + runner: Optional[AbstractRunner] = None, + storage: Optional[BaseStorage] = None, + metrics: Optional[List[Metric]] = None, + ): + """ + Initialize Auto class. + + Parameters + ---------- + target_metric: + metric to optimize + horizon: + horizon to forecast for + metric_aggregation: + aggregation method for per-segment metrics + backtest_params: + custom parameters for backtest instead of default backtest parameters + experiment_folder: + folder to store experiment results and name for optuna study + pool: + pool of pipelines to choose from + runner: + runner to use for distributed training + storage: + optuna storage to use + metrics: + list of metrics to compute + """ + super().__init__( + target_metric=target_metric, + horizon=horizon, + metric_aggregation=metric_aggregation, + backtest_params=backtest_params, + experiment_folder=experiment_folder, + runner=runner, + storage=storage, + metrics=metrics, + ) + self.pool = pool + def fit( self, ts: TSDataset, @@ -142,53 +316,6 @@ def fit( return get_from_params(**self._optuna.study.best_trial.user_attrs["pipeline"]) - def _init_optuna(self): - """Initialize optuna.""" - if isinstance(self.pool, Pool): - pool: List[Pipeline] = self.pool.value.generate(horizon=self.horizon) - else: - pool = self.pool - - pool_ = [pipeline.to_dict() for pipeline in pool] - - optuna = Optuna( - direction="maximize" if self.target_metric.greater_is_better else "minimize", - study_name=self.experiment_folder, - storage=self.storage, - sampler=ConfigSampler(configs=pool_), - ) - return optuna - - def summary(self) -> pd.DataFrame: - """Get Auto trials summary.""" - if self._optuna is None: - self._optuna = self._init_optuna() - - study = self._optuna.study.get_trials() - - study_params = [ - {**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state} - for trial in study - ] - - return pd.DataFrame(study_params) - - def top_k(self, k: int = 5) -> List[Pipeline]: - """ - Get top k pipelines. - - Parameters - ---------- - k: - number of pipelines to return - """ - summary = self.summary() - df = summary.sort_values( - by=[f"{self.target_metric.name}_{self.metric_aggregation}"], - ascending=(not self.target_metric.greater_is_better), - ) - return [pipeline for pipeline in df["pipeline"].values[:k]] # noqa: C416 - @staticmethod def objective( ts: TSDataset, @@ -243,3 +370,20 @@ def _objective(trial: Trial) -> float: return aggregated_metrics[f"{target_metric.name}_{metric_aggregation}"] return _objective + + def _init_optuna(self): + """Initialize optuna.""" + if isinstance(self.pool, Pool): + pool: List[Pipeline] = self.pool.value.generate(horizon=self.horizon) + else: + pool = self.pool + + pool_ = [pipeline.to_dict() for pipeline in pool] + + optuna = Optuna( + direction="maximize" if self.target_metric.greater_is_better else "minimize", + study_name=self.experiment_folder, + storage=self.storage, + sampler=ConfigSampler(configs=pool_), + ) + return optuna diff --git a/tests/test_auto/test_auto.py b/tests/test_auto/test_auto.py index 6d56b5bf8..4061aec55 100644 --- a/tests/test_auto/test_auto.py +++ b/tests/test_auto/test_auto.py @@ -8,6 +8,7 @@ from typing_extensions import NamedTuple from etna.auto import Auto +from etna.auto.auto import AutoBase from etna.auto.auto import _Callback from etna.auto.auto import _Initializer from etna.metrics import MAE @@ -124,7 +125,7 @@ def test_summary( auto=MagicMock(), ): auto._optuna.study.get_trials.return_value = trials - df_summary = Auto.summary(self=auto) + df_summary = AutoBase.summary(self=auto) assert len(df_summary) == len(trials) assert list(df_summary["SMAPE_median"].values) == [trial.user_attrs["SMAPE_median"] for trial in trials] @@ -140,8 +141,8 @@ def test_top_k( auto.metric_aggregation = "median" auto.target_metric.greater_is_better = False - df_summary = Auto.summary(self=auto) + df_summary = AutoBase.summary(self=auto) auto.summary = MagicMock(return_value=df_summary) - top_k = Auto.top_k(auto, k=k) + top_k = AutoBase.top_k(auto, k=k) assert len(top_k) == k assert [pipeline.model.lag for pipeline in top_k] == [i for i in range(k)] # noqa C416