Skip to content

Add auto base and auto abstract #1114

Merged
merged 16 commits into from
Feb 22, 2023
Merged
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Method `set_params` to change parameters of ETNA objects [#1102](https://github.com/tinkoff-ai/etna/pull/1102)
-
### Changed


- Create `AutoBase` and `AutoAbstract` classes, some of `Auto` class's logic moved there ([#1114](https://github.com/tinkoff-ai/etna/pull/1114))
- Impose specific order of columns on return value of TSDataset.to_flatten ([#1095](https://github.com/tinkoff-ai/etna/pull/1095))
-
### Fixed

- Fix bug in `GaleShapleyFeatureSelectionTransform` with wrong number of remaining features ([#1110](https://github.com/tinkoff-ai/etna/pull/1110))
Expand Down
258 changes: 201 additions & 57 deletions etna/auto/auto.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from abc import ABC
from abc import abstractmethod
from typing import Callable
from typing import List
from typing import Optional
Expand Down Expand Up @@ -37,8 +39,96 @@ def __call__(self, pipeline: Pipeline) -> None:
...


class Auto:
"""Automatic pipeline selection via defined or custom pipeline pool."""
class AutoAbstract(ABC):
"""Interface for ``Auto`` object."""

@abstractmethod
def fit(
self,
ts: TSDataset,
timeout: Optional[int] = None,
n_trials: Optional[int] = None,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
**optuna_kwargs,
) -> Pipeline:
"""
Start automatic pipeline selection.

Parameters
----------
ts:
tsdataset to fit on
timeout:
timeout for optuna. N.B. this is timeout for each worker
n_trials:
number of trials for optuna. N.B. this is number of trials for each worker
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
optuna_kwargs:
additional kwargs for optuna :py:meth:`optuna.study.Study.optimize`
"""
pass

@abstractmethod
def _init_optuna(self):
"""Initialize optuna."""

@abstractmethod
def summary(self) -> pd.DataFrame:
"""Get Auto trials summary."""
pass

@abstractmethod
def top_k(self, k: int = 5) -> List[Pipeline]:
"""
Get top k pipelines.

Parameters
----------
k:
number of pipelines to return
"""
pass

@staticmethod
@abstractmethod
def objective(
ts: TSDataset,
target_metric: Metric,
metric_aggregation: MetricAggregationStatistics,
metrics: List[Metric],
backtest_params: dict,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
) -> Callable[[Trial], float]:
"""
Optuna objective wrapper.

Parameters
----------
ts:
tsdataset to fit on
target_metric:
metric to optimize
metric_aggregation:
aggregation method for per-segment metrics
metrics:
list of metrics to compute
backtest_params:
custom parameters for backtest instead of default backtest parameters
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
"""
pass


class AutoBase(AutoAbstract):
GooseIt marked this conversation as resolved.
Show resolved Hide resolved
"""Base Class for ``Auto`` and ``Tune``, implementing core logic behind these classes."""

def __init__(
self,
Expand All @@ -47,13 +137,12 @@ def __init__(
metric_aggregation: MetricAggregationStatistics = "mean",
backtest_params: Optional[dict] = None,
experiment_folder: Optional[str] = None,
pool: Union[Pool, List[Pipeline]] = Pool.default,
runner: Optional[AbstractRunner] = None,
storage: Optional[BaseStorage] = None,
metrics: Optional[List[Metric]] = None,
):
"""
Initialize Auto class.
Initialize AutoBase class.

Parameters
----------
Expand All @@ -67,8 +156,6 @@ def __init__(
custom parameters for backtest instead of default backtest parameters
experiment_folder:
folder to store experiment results and name for optuna study
pool:
pool of pipelines to choose from
runner:
runner to use for distributed training
storage:
Expand All @@ -79,12 +166,11 @@ def __init__(
if target_metric.greater_is_better is None:
raise ValueError("target_metric.greater_is_better is None")
self.target_metric = target_metric

self.metric_aggregation = metric_aggregation
self.backtest_params = {} if backtest_params is None else backtest_params
self.horizon = horizon
self.metric_aggregation: MetricAggregationStatistics = metric_aggregation
self.backtest_params = {} if backtest_params is None else backtest_params
self.experiment_folder = experiment_folder
self.pool = pool

self.runner = LocalRunner() if runner is None else runner
self.storage = RDBStorage("sqlite:///etna-auto.db") if storage is None else storage

Expand All @@ -94,6 +180,94 @@ def __init__(
self.metrics = metrics
self._optuna: Optional[Optuna] = None

def summary(self) -> pd.DataFrame:
"""Get Auto trials summary.

Returns
-------
study_dataframe:
dataframe with detailed info on each performed trial
"""
if self._optuna is None:
self._optuna = self._init_optuna()

study = self._optuna.study.get_trials()

study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
for trial in study
]

return pd.DataFrame(study_params)

def top_k(self, k: int = 5) -> List[Pipeline]:
"""
Get top k pipelines.

Parameters
----------
k:
number of pipelines to return
"""
summary = self.summary()
df = summary.sort_values(
by=[f"{self.target_metric.name}_{self.metric_aggregation}"],
ascending=(not self.target_metric.greater_is_better),
)
return [pipeline for pipeline in df["pipeline"].values[:k]] # noqa: C416


class Auto(AutoBase):
"""Automatic pipeline selection via defined or custom pipeline pool."""

def __init__(
self,
target_metric: Metric,
horizon: int,
metric_aggregation: MetricAggregationStatistics = "mean",
backtest_params: Optional[dict] = None,
experiment_folder: Optional[str] = None,
pool: Union[Pool, List[Pipeline]] = Pool.default,
runner: Optional[AbstractRunner] = None,
storage: Optional[BaseStorage] = None,
metrics: Optional[List[Metric]] = None,
):
"""
Initialize Auto class.

Parameters
----------
target_metric:
metric to optimize
horizon:
horizon to forecast for
metric_aggregation:
aggregation method for per-segment metrics
backtest_params:
custom parameters for backtest instead of default backtest parameters
experiment_folder:
folder to store experiment results and name for optuna study
pool:
pool of pipelines to choose from
runner:
runner to use for distributed training
storage:
optuna storage to use
metrics:
list of metrics to compute
"""
super().__init__(
target_metric=target_metric,
horizon=horizon,
metric_aggregation=metric_aggregation,
backtest_params=backtest_params,
experiment_folder=experiment_folder,
runner=runner,
storage=storage,
metrics=metrics,
)
self.pool = pool

def fit(
self,
ts: TSDataset,
Expand Down Expand Up @@ -142,53 +316,6 @@ def fit(

return get_from_params(**self._optuna.study.best_trial.user_attrs["pipeline"])

def _init_optuna(self):
"""Initialize optuna."""
if isinstance(self.pool, Pool):
pool: List[Pipeline] = self.pool.value.generate(horizon=self.horizon)
else:
pool = self.pool

pool_ = [pipeline.to_dict() for pipeline in pool]

optuna = Optuna(
direction="maximize" if self.target_metric.greater_is_better else "minimize",
study_name=self.experiment_folder,
storage=self.storage,
sampler=ConfigSampler(configs=pool_),
)
return optuna

def summary(self) -> pd.DataFrame:
"""Get Auto trials summary."""
if self._optuna is None:
self._optuna = self._init_optuna()

study = self._optuna.study.get_trials()

study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
for trial in study
]

return pd.DataFrame(study_params)

def top_k(self, k: int = 5) -> List[Pipeline]:
"""
Get top k pipelines.

Parameters
----------
k:
number of pipelines to return
"""
summary = self.summary()
df = summary.sort_values(
by=[f"{self.target_metric.name}_{self.metric_aggregation}"],
ascending=(not self.target_metric.greater_is_better),
)
return [pipeline for pipeline in df["pipeline"].values[:k]] # noqa: C416

@staticmethod
def objective(
ts: TSDataset,
Expand Down Expand Up @@ -243,3 +370,20 @@ def _objective(trial: Trial) -> float:
return aggregated_metrics[f"{target_metric.name}_{metric_aggregation}"]

return _objective

def _init_optuna(self):
"""Initialize optuna."""
if isinstance(self.pool, Pool):
pool: List[Pipeline] = self.pool.value.generate(horizon=self.horizon)
else:
pool = self.pool

pool_ = [pipeline.to_dict() for pipeline in pool]

optuna = Optuna(
direction="maximize" if self.target_metric.greater_is_better else "minimize",
study_name=self.experiment_folder,
storage=self.storage,
sampler=ConfigSampler(configs=pool_),
)
return optuna
7 changes: 4 additions & 3 deletions tests/test_auto/test_auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing_extensions import NamedTuple

from etna.auto import Auto
from etna.auto.auto import AutoBase
from etna.auto.auto import _Callback
from etna.auto.auto import _Initializer
from etna.metrics import MAE
Expand Down Expand Up @@ -124,7 +125,7 @@ def test_summary(
auto=MagicMock(),
):
auto._optuna.study.get_trials.return_value = trials
df_summary = Auto.summary(self=auto)
df_summary = AutoBase.summary(self=auto)
assert len(df_summary) == len(trials)
assert list(df_summary["SMAPE_median"].values) == [trial.user_attrs["SMAPE_median"] for trial in trials]

Expand All @@ -140,8 +141,8 @@ def test_top_k(
auto.metric_aggregation = "median"
auto.target_metric.greater_is_better = False

df_summary = Auto.summary(self=auto)
df_summary = AutoBase.summary(self=auto)
auto.summary = MagicMock(return_value=df_summary)
top_k = Auto.top_k(auto, k=k)
top_k = AutoBase.top_k(auto, k=k)
assert len(top_k) == k
assert [pipeline.model.lag for pipeline in top_k] == [i for i in range(k)] # noqa C416