From 208bd1be202f7752c2386bcb9a9a0bed1c0adde0 Mon Sep 17 00:00:00 2001
From: Sergey Serebryakov
Date: Thu, 8 Feb 2024 00:50:43 +0000
Subject: [PATCH] New `experiment run` command.

This commit adds a new `experiment run` command that simplifies (to some
extent) the process of running ML experiments. This command accepts a single
argument - the path to an experiment configuration file in YAML format. The
previous mechanism for running experiments (the `experiment train` and
`experiment search_hp` commands) is still supported, but will likely be
removed in future releases. For more details, run the following command:
```shell
python -m xtime.main experiment --help
```
1. Create a template experiment configuration file by running
```shell
python -m xtime.main experiment create EXPERIMENT_TYPE -f EXPERIMENT_DEFINITION_FILE
```
where EXPERIMENT_TYPE is the name of one of the supported experiments
(`train` or `search_hp`) and EXPERIMENT_DEFINITION_FILE is the path to the
YAML file that will be created.
2. Edit that file. The source code of these two experiments provides a
detailed description of each parameter in this file; many parameters have
the same semantics as the current CLI options.
3. Run the experiment by executing the following command:
```shell
python -m xtime.main experiment run EXPERIMENT_DEFINITION_FILE
```
This file will be serialized into the experiment's artifact directory, so
the experiment can later be reproduced.

This commit also fixes a bug:
- In some cases, the code ignored the experiment name provided by users and
  used the `default` experiment instead. This is fixed now.

This commit adds a new dependency:
- New Python dependency - the `OmegaConf` library, used to interact with
  experiment configurations.
---
 training/poetry.lock                    |  27 ++-
 training/pyproject.toml                 |   1 +
 training/tests/stages/test_search_hp.py |   9 +-
 training/tests/stages/test_train.py     |  17 ++
 training/tests/test_cli.py              |   4 +
 training/xtime/contrib/mlflow_ext.py    |  97 +++++----
 training/xtime/contrib/tune_ext.py      |   2 +-
 training/xtime/contrib/utils.py         |  54 +++++
 training/xtime/estimators/estimator.py  |  22 ++-
 training/xtime/hparams/__init__.py      |   2 +
 training/xtime/hparams/_hparams.py      |  32 ++-
 training/xtime/io.py                    |   8 +-
 training/xtime/main.py                  |  99 ++++++++--
 training/xtime/stages/search_hp.py      | 252 +++++++++++++++++-------
 training/xtime/stages/train.py          |  70 +++++--
 15 files changed, 545 insertions(+), 151 deletions(-)
 create mode 100644 training/tests/stages/test_train.py
 create mode 100644 training/xtime/contrib/utils.py

diff --git a/training/poetry.lock b/training/poetry.lock
index ae5432b..a011f2d 100644
--- a/training/poetry.lock
+++ b/training/poetry.lock
@@ -154,6 +154,16 @@ files = [
     {file = "ansicon-1.89.0.tar.gz", hash = "sha256:e4d039def5768a47e4afec8e89e83ec3ae5a26bf00ad851f914d1240b444d2b1"},
 ]
 
+[[package]]
+name = "antlr4-python3-runtime"
+version = "4.9.3"
+description = "ANTLR 4.9.3 runtime for Python 3.7"
+optional = false
+python-versions = "*"
+files = [
+    {file = "antlr4-python3-runtime-4.9.3.tar.gz", hash = "sha256:f224469b4168294902bb1efa80a8bf7855f24c99aef99cbefc1bcd3cce77881b"},
+]
+
 [[package]]
 name = "anyio"
 version = "4.2.0"
@@ -3030,6 +3040,21 @@ rsa = ["cryptography (>=3.0.0)"]
 signals = ["blinker (>=1.4.0)"]
 signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]
 
+[[package]]
+name = "omegaconf"
+version = "2.3.0"
+description = "A flexible configuration library"
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "omegaconf-2.3.0-py3-none-any.whl", hash = "sha256:7b4df175cdb08ba400f45cae3bdcae7ba8365db4d165fc65fd04b050ab63b46b"},
+    {file = 
"omegaconf-2.3.0.tar.gz", hash = "sha256:d5d4b6d29955cc50ad50c46dc269bcd92c6e00f5f90d23ab5fee7bfca4ba4cc7"}, +] + +[package.dependencies] +antlr4-python3-runtime = "==4.9.*" +PyYAML = ">=5.1.0" + [[package]] name = "opencensus" version = "0.11.4" @@ -5637,4 +5662,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "bd0affce210edc8ba611ef24ad330464591c2dff3a61d6ac67bfcb29a0968b32" +content-hash = "0b86d93861c2ea33c1ec7e7799f635da4269146e415d169a738f6c13a3dd3c09" diff --git a/training/pyproject.toml b/training/pyproject.toml index 8313f64..8c9275b 100644 --- a/training/pyproject.toml +++ b/training/pyproject.toml @@ -21,6 +21,7 @@ requests = "2.28.2" # core dependency tinydb = "4.7.1" # core dependency prettytable = "3.6.0" # command line interface coloredlogs = "15.0.1" # command line interface +omegaconf = "2.3.0" # core dependency [tool.poetry.group.catboost.dependencies] catboost = "1.1.1" diff --git a/training/tests/stages/test_search_hp.py b/training/tests/stages/test_search_hp.py index dd96d5f..bfc76f8 100644 --- a/training/tests/stages/test_search_hp.py +++ b/training/tests/stages/test_search_hp.py @@ -15,11 +15,12 @@ ### from xtime.stages.search_hp import ( + _DEFAULT_CONFIGURATION, _get_metrics_for_best_trial, - _init_search_algorithm, + _init_tune_config, + _run, _save_best_trial_info, _save_summary, - _set_run_status, - _set_tags, - search_hp, + create_example_config, + run, ) diff --git a/training/tests/stages/test_train.py b/training/tests/stages/test_train.py new file mode 100644 index 0000000..ebdb72b --- /dev/null +++ b/training/tests/stages/test_train.py @@ -0,0 +1,17 @@ +### +# Copyright (2023) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+### + +from xtime.stages.train import _DEFAULT_CONFIGURATION, create_example_config, run diff --git a/training/tests/test_cli.py b/training/tests/test_cli.py index 5d85d21..138fba1 100644 --- a/training/tests/test_cli.py +++ b/training/tests/test_cli.py @@ -29,7 +29,9 @@ dataset_list, dataset_save, datasets, + experiment_create, experiment_describe, + experiment_run, experiment_search_hp, experiment_train, experiments, @@ -53,6 +55,8 @@ def test_help(self) -> None: experiment_train, experiment_search_hp, experiment_describe, + experiment_run, + experiment_create, datasets, dataset_describe, dataset_save, diff --git a/training/xtime/contrib/mlflow_ext.py b/training/xtime/contrib/mlflow_ext.py index 6f896f6..bc7b5b3 100644 --- a/training/xtime/contrib/mlflow_ext.py +++ b/training/xtime/contrib/mlflow_ext.py @@ -20,40 +20,17 @@ import mlflow from mlflow import MlflowClient -from mlflow.entities import Experiment, Run +from mlflow.entities import Experiment, LifecycleStage, Run from mlflow.store.entities import PagedList from mlflow.utils.file_utils import local_file_uri_to_path +from omegaconf import DictConfig, OmegaConf from xtime import hparams -from xtime.datasets import parse_dataset_name -from xtime.ml import Task -from xtime.run import RunType __all__ = ["MLflow"] class MLflow(object): - @staticmethod - def set_tags( - dataset: t.Optional[str] = None, run_type: t.Optional[RunType] = None, task: t.Optional[Task] = None, **kwargs - ) -> None: - """Helper method that sets tags for active MLflow run. - - Args: - dataset: Dataset name and version in format "name[:version]". - run_type: Type of this MLflow run (`train`, `hpo` etc.). - task: Task being solved in this run. - kwargs: dictionary of additional tags to set. - """ - if dataset: - dataset_name, dataset_version = parse_dataset_name(dataset) - mlflow.set_tags({"dataset_name": dataset_name, "dataset_version": dataset_version}) - if run_type: - mlflow.set_tag("run_type", run_type.value) - if task: - mlflow.set_tag("task", task.type.value) - mlflow.set_tags(kwargs) - @staticmethod def get_experiment_ids(client: t.Optional[MlflowClient] = None) -> t.List[str]: """Return all MLflow experiment IDs. @@ -112,20 +89,36 @@ def get_runs( return runs @staticmethod - def create_experiment(client: t.Optional[MlflowClient] = None) -> None: + def create_experiment(client: t.Optional[MlflowClient] = None, name: t.Optional[str] = None) -> t.Optional[str]: """Create a new MLflow experiment with name specified in `MLFLOW_EXPERIMENT_NAME` environment variable. Args: client: MLflow client to use. If not provided, a default client will be used. + name: Name of the experiment + Returns: + Experiment ID or None if name is none or empty, and MLFLOW_EXPERIMENT_NAME is also not set. 
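+
+        Example (the experiment name below is illustrative):
+            >>> experiment_id = MLflow.create_experiment(name="my-experiment")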
""" if client is None: client = MlflowClient() - from mlflow.tracking import _EXPERIMENT_NAME_ENV_VAR + # First, try got use the name possible provided to this function, if not available, try standard MLflow + # environment variable + name = (name or "").strip() + if not name: + from mlflow.tracking import _EXPERIMENT_NAME_ENV_VAR + + name = (os.environ.get(_EXPERIMENT_NAME_ENV_VAR, None) or "").strip() + + if not name: + return None - name = os.environ.get(_EXPERIMENT_NAME_ENV_VAR, None) - if name and client.get_experiment_by_name(name) is None: - mlflow.create_experiment(name) + experiment: t.Optional[Experiment] = client.get_experiment_by_name(name) + if experiment is not None: + if experiment.lifecycle_stage == LifecycleStage.DELETED: + mlflow.MlflowClient().set_experiment_tag() + return experiment.experiment_id + + return mlflow.create_experiment(name) @staticmethod def get_tags_from_env() -> t.Dict: @@ -162,7 +155,9 @@ def get_artifact_path(run: t.Optional[Run] = None, ensure_exists: bool = True) - return local_dir @staticmethod - def init_run(run: t.Optional[Run]) -> None: + def init_run( + run: t.Optional[Run], set_tags_from_env: bool = False, user_tags: t.Optional[DictConfig] = None + ) -> None: """Initialize MLflow run. For now, it does not do a lot of things, mainly ensuring that the artifact directory exists. So, it's a wrapper @@ -170,8 +165,18 @@ def init_run(run: t.Optional[Run]) -> None: Args: run: MLflow run to initialize. If none, currently active run will be used. + set_tags_from_env: If true, read tags from user environment and set them. + user_tags: Additional run tags from a configuration file, command line, etc. """ _ = MLflow.get_artifact_path(run, ensure_exists=True) + if run is not None: + # TODO sergey: think if we need to check for `run` - need to make sure active run is present or + # refactor this implementation to set tags for this particular run using client API. + if set_tags_from_env: + mlflow.set_tags(MLflow.get_tags_from_env()) + if user_tags: + assert isinstance(user_tags, DictConfig), f"Invalid `user_tags` type ({type(user_tags)})." + mlflow.set_tags(OmegaConf.to_container(user_tags, resolve=True)) @staticmethod def log_metrics(metrics: t.Dict[str, t.Any]) -> None: @@ -186,9 +191,18 @@ def log_metrics(metrics: t.Dict[str, t.Any]) -> None: """ # Some metrics produced by Ray Tune we are not interested in. _metrics_to_ignore = { - "timesteps_total", "time_this_iter_s", "timesteps_total", "episodes_total", "training_iteration", - "timestamp", "time_total_s", "pid", "time_since_restore", "timesteps_since_restore", - "iterations_since_restore", "warmup_time" + "timesteps_total", + "time_this_iter_s", + "timesteps_total", + "episodes_total", + "training_iteration", + "timestamp", + "time_total_s", + "pid", + "time_since_restore", + "timesteps_since_restore", + "iterations_since_restore", + "warmup_time", } for name, value in metrics.items(): try: @@ -196,3 +210,18 @@ def log_metrics(metrics: t.Dict[str, t.Any]) -> None: mlflow.log_metric(name, value) except mlflow.MlflowException: continue + + @staticmethod + def set_status_tag_from_trial_counts(num_trials: int, num_failed_trials: int) -> None: + """Set `status` tag given number of trials(sub-experiments) and their fail/success status. + + Args: + num_trials: Total number of trials within current MLflow run. + num_failed_trials: Number of failed trials for current mlflow run. 
+ """ + if num_failed_trials == 0: + mlflow.set_tag("status", "COMPLETED") + elif num_failed_trials == num_trials: + mlflow.set_tag("status", "FAILED") + else: + mlflow.set_tag("status", "PARTIALLY_COMPLETED") diff --git a/training/xtime/contrib/tune_ext.py b/training/xtime/contrib/tune_ext.py index 878cad6..d7da2ff 100644 --- a/training/xtime/contrib/tune_ext.py +++ b/training/xtime/contrib/tune_ext.py @@ -270,7 +270,7 @@ def _check_mode(mode: str) -> str: class YamlEncoder: @staticmethod - def represent(dumper: yaml.representer.BaseRepresenter, rv: RandomVarDomain) -> yaml.nodes.MappingNode: + def represent(dumper: yaml.representer.BaseRepresenter, rv: t.Union[RandomVarDomain]) -> yaml.nodes.MappingNode: """Represent given random variable for yaml dumper.""" sampler: t.Dict = YamlEncoder.sampler_to_dict(rv.sampler) if isinstance(rv, sample.Integer): diff --git a/training/xtime/contrib/utils.py b/training/xtime/contrib/utils.py new file mode 100644 index 0000000..d9d4e7e --- /dev/null +++ b/training/xtime/contrib/utils.py @@ -0,0 +1,54 @@ +### +# Copyright (2023) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +import logging +import typing as t + +__all__ = ["Text", "normalize_str", "log_deprecate_msg_for_run_inputs"] + +from omegaconf import DictConfig, ListConfig, OmegaConf + + +class Text: + """Helper class to manipulate text chunks to create text documents.""" + + def __init__(self, content: t.Optional[str] = None) -> None: + self.content = normalize_str(content) + + def __str__(self) -> str: + return self.content + + @classmethod + def from_chunks(cls, *chunks: t.Optional[str], sep: str = "\n") -> "Text": + return cls(sep.join([c for c in (normalize_str(_c) for _c in chunks) if c])) + + +def normalize_str(s: t.Optional[str]) -> str: + """Normalize input string. + Args: + s: Optional string. + Returns: + Empty string if s is None, else input string with removed leading and trailing whitespaces, + new line and tab characters. + """ + return (s or "").strip() + + +def log_deprecate_msg_for_run_inputs(logger: logging.Logger) -> None: + """This is an internal temporary function that will be removed.""" + logger.warning( + "The `run_inputs.yaml` is deprecated and will not be generated in the future versions. " + "Start using the `experiment.yaml` file instead." 
+ ) diff --git a/training/xtime/estimators/estimator.py b/training/xtime/estimators/estimator.py index 3797d36..6e989a5 100644 --- a/training/xtime/estimators/estimator.py +++ b/training/xtime/estimators/estimator.py @@ -33,7 +33,6 @@ roc_auc_score, ) -from xtime.contrib.mlflow_ext import MLflow from xtime.datasets import Dataset, DatasetMetadata, DatasetSplit from xtime.datasets.dataset import build_dataset from xtime.io import IO, encode @@ -71,11 +70,22 @@ def after_test(self, dataset: Dataset, estimator: "Estimator", metrics: t.Dict) class MLflowCallback(Callback): def __init__(self, hparams: t.Dict, ctx: Context) -> None: + from xtime.datasets.dataset import parse_dataset_name + mlflow.log_params(hparams) - MLflow.set_tags(dataset=ctx.metadata.dataset, run_type=ctx.metadata.run_type, model=ctx.metadata.model) + + dataset_name, dataset_version = parse_dataset_name(ctx.metadata.dataset) + mlflow.set_tags( + { + "dataset_name": dataset_name, + "dataset_version": dataset_version, + "run_type": ctx.metadata.run_type.value, + "model": ctx.metadata.model, + } + ) def before_fit(self, dataset: Dataset, estimator: "Estimator") -> None: - MLflow.set_tags(task=dataset.metadata.task) + mlflow.set_tag("task", dataset.metadata.task.type.value) def after_test(self, dataset: Dataset, estimator: "Estimator", metrics: t.Dict) -> None: mlflow.log_metrics({name: float(metrics[name]) for name in METRICS[dataset.metadata.task.type]}) @@ -144,8 +154,14 @@ def fit(cls, hparams: t.Dict, ctx: Context) -> t.Dict: else: callback = TrainCallback(IO.work_dir(), hparams, ctx) if mlflow.active_run() is not None: + # When running with ray tune, there will be no active run. callback = ContainerCallback([callback, MLflowCallback(hparams, ctx)]) + if isinstance(callback, ContainerCallback): + print("Estimator callbacks =", [c.__class__.__name__ for c in callback.callbacks]) + else: + print("Estimator callbacks =", callback.__class__.__name__) + estimator = cls(hparams, ctx.dataset.metadata) callback.before_fit(ctx.dataset, estimator) diff --git a/training/xtime/hparams/__init__.py b/training/xtime/hparams/__init__.py index 1cf9a3a..3b8f6ec 100644 --- a/training/xtime/hparams/__init__.py +++ b/training/xtime/hparams/__init__.py @@ -21,6 +21,7 @@ HParamsSpec, JsonEncoder, ValueSpec, + default_hparams, from_auto, from_file, from_mlflow, @@ -40,4 +41,5 @@ "from_auto", "from_string", "from_file", + "default_hparams", ] diff --git a/training/xtime/hparams/_hparams.py b/training/xtime/hparams/_hparams.py index d7652aa..24140d0 100644 --- a/training/xtime/hparams/_hparams.py +++ b/training/xtime/hparams/_hparams.py @@ -18,9 +18,11 @@ import json import os import typing as t +from enum import Enum from pathlib import Path import numpy as np +from omegaconf import DictConfig, ListConfig, OmegaConf from ray.tune.search.sample import Domain from ray.tune.search.variant_generator import generate_variants @@ -39,10 +41,11 @@ "from_mlflow", "from_string", "from_file", + "default_hparams", ] -HParamsSource = t.Union[t.Dict, str, t.Iterable[t.Union[str, t.Dict]]] +HParamsSource = t.Union[t.Dict, str, t.Iterable[t.Union[str, t.Dict, DictConfig]]] """Specification options for hyperparameters (HP). - dict: Ready-to-use dictionary of HPs mapping HP names to values. @@ -141,7 +144,10 @@ def get_hparams(source: t.Optional[HParamsSource] = None) -> t.Dict: elif isinstance(source, t.Dict): # It is assumed that source is already a valid hyperparameter dictionary. 
hp_dict = copy.deepcopy(source) - elif isinstance(source, (t.List, t.Tuple)): + elif isinstance(source, DictConfig): + # It is assumed that source is already a valid hyperparameter dictionary. + hp_dict = OmegaConf.to_container(source, resolve=True) + elif isinstance(source, (t.List, t.Tuple, ListConfig)): # This is a list of multiple sources. hp_dict: t.Dict = {} for one_source in source: @@ -248,6 +254,8 @@ def from_string(params: t.Optional[str] = None) -> t.Dict: if not params: return {} + # TODO sergey: check if params string starts with a misspelled protocol (e.g., hparams:) + # These imports may be required by the `eval` call below. import math # noqa # pylint: disable=unused-import @@ -339,6 +347,26 @@ def from_file(url: PathLike) -> t.Dict: return hp_dict +def default_hparams(**query) -> str: + """Construct a string to retrieve hparams from a default hparams recommender. + Args: + query: search constraints, e.g., `model=xgboost,task=binary_classification`, etc. + Returns: + An hparams spec to query default hparam recommender. + """ + from xtime.contrib.utils import Text + from xtime.ml import Task + + query = copy.deepcopy(query) + for k, v in query.items(): + if isinstance(v, Task): + query[k] = v.type.value + elif isinstance(v, Enum): + query[k] = v.value + + return str(Text.from_chunks("auto", "default", ";".join(f"{k}={v}" for k, v in query.items()), sep=":")) + + def _str_content(str_val: t.Optional[str], scheme: str) -> str: assert str_val is None or isinstance(str_val, str), f"Invalid `str_val` type ({type(str_val)})." str_val = str_val.strip() if isinstance(str_val, str) else "" diff --git a/training/xtime/io.py b/training/xtime/io.py index 140dbd3..c67504b 100644 --- a/training/xtime/io.py +++ b/training/xtime/io.py @@ -28,6 +28,8 @@ __all__ = ["encode", "PathLike", "to_path", "IO"] +from omegaconf import DictConfig, ListConfig, OmegaConf + logger = logging.getLogger(__name__) @@ -61,6 +63,8 @@ def encode(obj: t.Any) -> t.Any: return obj.tolist() if isinstance(obj, np.float64): return float(obj) + if isinstance(obj, (DictConfig, ListConfig)): + return OmegaConf.to_container(obj, resolve=True) return obj @@ -127,8 +131,8 @@ def save_yaml(data: t.Any, file_path: t.Union[str, Path], raise_on_error: bool = try: with open(file_path, "w") as stream: yaml.dump(data, stream, Dumper=yaml.SafeDumper) - except yaml.representer.RepresenterError: - logger.warning("YAML representation error (file_path=%s, data=%s).", file_path, data) + except yaml.representer.RepresenterError as err: + logger.warning("YAML representation error (file_path='%s', data='%s', err='%s').", file_path, data, err) if raise_on_error: raise diff --git a/training/xtime/main.py b/training/xtime/main.py index 7ad8740..ad80833 100644 --- a/training/xtime/main.py +++ b/training/xtime/main.py @@ -19,10 +19,11 @@ import sys import typing as t from multiprocessing import Process +from pathlib import Path import click import coloredlogs -from ray import tune +from omegaconf import DictConfig, OmegaConf from xtime.datasets import ( Dataset, @@ -81,16 +82,34 @@ def _run_search_hp_pipeline( num_validate_trials: int = 0, gpu: bool = False, ) -> None: - from xtime.stages.search_hp import search_hp + from xtime.stages import search_hp """Run an ML pipeline that includes (1) hyperparameter search and (2) analysis how stable hyperparameters are.""" - mlflow_uri: str = search_hp(dataset, model, algorithm, hparams, num_search_trials, gpu) - if num_validate_trials > 0: - validate_hparams = [ - mlflow_uri, # Take the best 
hyperparameters from this MLFlow run.
-        {"random_state": tune.randint(0, int(2**32 - 1))},  # And vary random seed to validate these HPs are stable.
-    ]
-    search_hp(dataset, model, "random", validate_hparams, num_validate_trials, gpu)
+
+    # fmt: off
+    config = OmegaConf.create({
+        "stage": "search_hp",
+
+        "dataset": dataset,
+        "model": model,
+        "hparams": hparams if len(hparams) > 0 else None,
+
+        "tune": {
+            "tune_config": {
+                "search_alg": {"_type": algorithm},
+                "num_samples": num_search_trials
+            },
+            "trial_resources": {
+                "gpu": 0 if not gpu else 1
+            }
+        },
+        "validation": {
+            "max_concurrent_trials": 0,
+            "num_samples": num_validate_trials
+        }
+    })
+    # fmt: on
+    search_hp.run(config)
 
 
 @click.group(name="xtime", help="Machine Learning benchmarks for tabular data for XTIME project.")
@@ -129,12 +148,15 @@ def experiments() -> None: ...
 @model_arg
 @params_option
 def experiment_train(dataset: str, model: str, params: t.Tuple[str]) -> None:
-    from xtime.stages.train import train
+    from xtime.stages import train
 
     try:
-        # When no --params are provided, the `params` will be empty. Setting no None here
-        # will enable the train function to retrieve default parameters in this case.
-        train(dataset, model, params if len(params) > 0 else None)
+        logger.warning("This command is deprecated and will be removed soon. Use `experiment run` instead.")
+        train.run(
+            OmegaConf.create(
+                {"stage": "train", "dataset": dataset, "model": model, "hparams": params if len(params) > 0 else None}
+            )
+        )
     except Exception as err:
         print_err_and_exit(err)
@@ -231,6 +253,57 @@ def experiment_describe(report_type: str, run: t.Optional[str] = None, file: t.O
         print_err_and_exit(err)
 
 
+@experiments.command(
+    name="run",
+    help="Run one of the supported ML experiments (train, search_hp, etc.) given an experiment configuration file.",
+)
+@click.argument(
+    "config_file", required=False, metavar="CONFIG_FILE", type=str, default=(Path.cwd() / "experiment.yaml").as_posix()
+)
+def experiment_run(config_file: str) -> None:
+    from xtime.stages import search_hp, train
+
+    config: DictConfig = OmegaConf.load(config_file)
+    stages = {"train": train, "search_hp": search_hp}
+    if config.stage not in stages:
+        print(f"Unsupported stage (stage = {config.stage})")
+        exit(1)
+    stages[config.stage].run(config)
+
+
+@experiments.command(name="create", help="Create an experiment configuration file for the given type of experiment.")
+@click.argument(
+    "experiment_type",
+    required=False,
+    metavar="EXPERIMENT_TYPE",
+    type=click.Choice(["train", "search_hp"]),
+    default="train",
+)
+@click.option(
+    "--file",
+    "-f",
+    metavar="FILE",
+    required=False,
+    type=str,
+    default="experiment.yaml",
+    help="Configuration file for an experiment",
+)
+def experiment_create(experiment_type: str, file: str) -> None:
+    file_path = Path(file).absolute()
+    if file_path.exists():
+        print(f"Experiment file already exists ({file_path}).")
+        exit(1)
+
+    from xtime.stages import search_hp, train
+
+    stages = {"train": train, "search_hp": search_hp}
+    if experiment_type not in stages:
+        print(f"Unsupported experiment type (experiment_type = {experiment_type})")
+        exit(1)
+
+    OmegaConf.save(stages[experiment_type].create_example_config(), file, resolve=False)
+
+
 @cli.group("dataset", help="Dataset-related commands (explore available datasets with these commands).")
 def datasets() -> None: ...
diff --git a/training/xtime/stages/search_hp.py b/training/xtime/stages/search_hp.py index d6ce28c..5bbd22b 100644 --- a/training/xtime/stages/search_hp.py +++ b/training/xtime/stages/search_hp.py @@ -16,7 +16,6 @@ import copy import logging -import os import sys import typing as t from pathlib import Path @@ -24,6 +23,7 @@ import mlflow import ray from mlflow import ActiveRun +from omegaconf import DictConfig, OmegaConf from ray import tune from ray.air import Result, RunConfig from ray.tune import ResultGrid, TuneConfig @@ -31,75 +31,119 @@ from ray.tune.search.hyperopt import HyperOptSearch import xtime.contrib.tune_ext as ray_tune_extensions -import xtime.hparams as hp from xtime.contrib.mlflow_ext import MLflow from xtime.contrib.tune_ext import Analysis, RayTuneDriverToMLflowLoggerCallback +from xtime.contrib.utils import Text, log_deprecate_msg_for_run_inputs from xtime.datasets import build_dataset from xtime.estimators import Estimator, get_estimator -from xtime.hparams import HParamsSource, get_hparams +from xtime.hparams import HParamsSource, default_hparams, get_hparams from xtime.io import IO, encode from xtime.ml import METRICS from xtime.run import Context, Metadata, RunType +__all__ = ["run", "create_example_config"] + + logger = logging.getLogger(__name__) -def search_hp( - dataset: str, model: str, algorithm: str, hparams: t.Optional[HParamsSource], num_trials: int, gpu: bool = False -) -> str: - estimator: t.Type[Estimator] = get_estimator(model) +def run(config: DictConfig) -> None: + config = OmegaConf.merge(_DEFAULT_CONFIGURATION, config) + assert config.stage == "search_hp", f"Invalid stage {config.stage}." + validation_config: DictConfig = config.pop("validation") + + # Run main hyperparameter optimization experiment. + run_uri: str = _run(config) + # Run validation (sensitivity analysis) experiment. + if validation_config.num_samples > 0: + validation_config.update({"search_alg": {"_type": "random"}}) + config = OmegaConf.merge( + config, + OmegaConf.create( + { + # Take the best hyperparameters from this MLFlow run. And vary random seed to validate + # these HPs are stable. + "hparams": [run_uri, "params:random_state=tune.randint(0, int(2**32 - 1))"], + "tune": {"tune_config": validation_config}, + } + ), + ) + _: str = _run(config) + + +def _run(config: DictConfig) -> str: + assert config.stage == "search_hp", f"Invalid stage {config.stage}." ray.init() ray_tune_extensions.add_representers() - MLflow.create_experiment() - with mlflow.start_run(description=" ".join(sys.argv)) as active_run: + experiment_id: t.Optional[str] = MLflow.create_experiment(name=config.mlflow.experiment_name) + + description: Text = Text.from_chunks(config.mlflow.description, " ".join(sys.argv)) + with mlflow.start_run(description=str(description), experiment_id=experiment_id) as active_run: # This MLflow run tracks Ray Tune hyperparameter search. Individual trials won't have their own MLflow runs. 
- MLflow.init_run(active_run) + MLflow.init_run(active_run, set_tags_from_env=True, user_tags=config.mlflow.tags) + + log_deprecate_msg_for_run_inputs(logger) IO.save_yaml( - data={ - "dataset": dataset, - "model": model, - "algorithm": algorithm, - "hparams": hparams, - "num_trials": num_trials, - "gpu": gpu, - }, + data=encode( + { + "dataset": config.dataset, + "model": config.model, + "algorithm": config.tune.tune_config.search_alg.get("_type"), + "hparams": config.hparams, + "num_trials": config.tune.tune_config.num_samples, + "gpu": _gpu_resource(config) > 0, + } + ), file_path=MLflow.get_artifact_path(active_run) / "run_inputs.yaml", raise_on_error=False, ) + OmegaConf.save(config, MLflow.get_artifact_path(active_run) / "experiment.yaml", resolve=False) + artifact_path: Path = MLflow.get_artifact_path(active_run) run_id: str = active_run.info.run_id - ctx = Context(Metadata(dataset=dataset, model=model, run_type=RunType.HPO), dataset=build_dataset(dataset)) + ctx = Context( + Metadata(dataset=config.dataset, model=config.model, run_type=RunType.HPO), + dataset=build_dataset(config.dataset), + ) IO.save_yaml(ctx.dataset.metadata.to_json(), artifact_path / "dataset_info.yaml", raise_on_error=False) - _set_tags( - dataset=dataset, - model=model, - run_type=RunType.HPO, - algorithm=algorithm, - task=ctx.dataset.metadata.task, - framework="tune", + + mlflow.set_tags( + encode( + { + "dataset": config.dataset, + "model": config.model, + "run_type": RunType.HPO.value, + "algorithm": config.tune.tune_config.search_alg.get("_type"), + "task": ctx.dataset.metadata.task.type.value, + "framework": "tune", + } + ) + ) + mlflow.log_params( + encode( + { + "dataset": config.dataset, + "model": config.model, + "num_trials": config.tune.tune_config.num_samples, + "algorithm": config.tune.tune_config.search_alg.get("_type"), + } + ) ) - mlflow.log_params({"dataset": dataset, "model": model, "algorithm": algorithm, "num_trials": num_trials}) - if hparams is None: - hparams = f"auto:default:model={model};task={ctx.dataset.metadata.task.type.value};run_type=hpo" - logger.info(f"Hyperparameters are not provided, using default ones: '%s'.", hparams) + hparams: HParamsSource = config.hparams + if config.hparams is None: + hparams = default_hparams(model=config.model, task=ctx.dataset.metadata.task, run_type=RunType.HPO) + param_space: t.Dict = get_hparams(hparams) logger.info("Hyperparameter search space resolved to: '%s'", param_space) # Set any `tune_config` parameters before calling the `_init_search_algorithm` method. The reason for this is # there maybe duplicate parameters in the `search_alg` instance that will not be set (e.g., # BasicVariantGenerator's max_concurrent parameter). 
-    tune_config = _init_search_algorithm(
-        # The `max_concurrent_trials` can be overriden in `_init_search_algorithm`
-        TuneConfig(
-            metric=METRICS.get_primary_metric(ctx.dataset.metadata.task),
-            mode="min",
-            num_samples=num_trials,
-            max_concurrent_trials=0,
-        ),
-        algorithm,
+    tune_config = _init_tune_config(
+        config.tune.tune_config, metric=METRICS.get_primary_metric(ctx.dataset.metadata.task), mode="min"
     )
     run_config = RunConfig(
         name="ray_tune",
@@ -108,60 +152,62 @@ def search_hp(
         callbacks=[RayTuneDriverToMLflowLoggerCallback(tune_config.metric, tune_config.mode)],
     )
 
+    estimator: t.Type[Estimator] = get_estimator(config.model)
     objective_fn = tune.with_parameters(estimator.fit, ctx=ctx)
-    if gpu:
-        objective_fn = tune.with_resources(objective_fn, {"gpu": 1})
+    if config.tune.trial_resources:
+        objective_fn = tune.with_resources(
+            objective_fn, OmegaConf.to_container(config.tune.trial_resources, resolve=True)
+        )
+
     tuner = tune.Tuner(objective_fn, param_space=param_space, tune_config=tune_config, run_config=run_config)
     results: ResultGrid = tuner.fit()
 
-    _set_run_status(results)
+    MLflow.set_status_tag_from_trial_counts(len(results), results.num_errors)
+
     best_trial_metrics: t.Dict = _get_metrics_for_best_trial(results, ctx)
     MLflow.log_metrics(best_trial_metrics)
     _save_best_trial_info(results, artifact_path, best_trial_metrics, active_run)
     _save_summary(artifact_path, active_run)
 
-    print(f"MLFlow run URI: mlflow:///{active_run.info.run_id}")
+    print(f"MLFlow run URI: mlflow:///{run_id}")
 
     ray.shutdown()
     return f"mlflow:///{run_id}"
 
 
-def _init_search_algorithm(tune_config: TuneConfig, algorithm: str) -> TuneConfig:
-    """
-    - Setting `tune_config.max_concurrent_trials` wraps search algorithm with `ConcurrencyLimiter`.
-    """
-    # None and 0 means the same thing - no limit, so set to 0 to simplify subsequent comparison.
-    if tune_config.max_concurrent_trials is None:
-        tune_config.max_concurrent_trials = 0
-
-    if algorithm == "random":
-        tune_config.search_alg = BasicVariantGenerator(random_state=1, max_concurrent=tune_config.max_concurrent_trials)
-    elif algorithm == "hyperopt":
-        tune_config.search_alg = HyperOptSearch(
-            metric=tune_config.metric, mode=tune_config.mode, n_initial_points=20, random_state_seed=1
-        )
-        if tune_config.max_concurrent_trials == 0:
-            logger.info("TuneConfig's max_concurrent_trials is unlimited - setting to 2 for HyperOptSearch algorithm.")
-            tune_config.max_concurrent_trials = 2
-    else:
-        raise ValueError(f"Unsupported hyperparameter optimization algorithm: {algorithm}")
-    return tune_config
 
 
+def _gpu_resource(config: DictConfig) -> float:
+    trial_resources: t.Optional[DictConfig] = config.tune.get("trial_resources", None)
+    if trial_resources is None:
+        return 0
+    assert isinstance(trial_resources, DictConfig), "Invalid trial resources definition."
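+    # Note: Ray Tune supports fractional GPU requests (e.g., `gpu: 0.5` lets two trials share one GPU),
+    # which is why the GPU count is handled here as a float rather than an int.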
+    gpu: t.Optional[float] = trial_resources.get("gpu", None)
+    if gpu is None or gpu <= 0:
+        return 0
+    return gpu
 
 
-def _set_tags(**tags) -> None:
-    if "MLFLOW_TAGS" in os.environ:
-        tags = copy.deepcopy(tags)
-        tags.update(hp.get_hparams(f"params:{os.environ['MLFLOW_TAGS']}"))
-    MLflow.set_tags(**tags)
+def _init_tune_config(cfg: DictConfig, metric: str, mode: str) -> TuneConfig:
+    cfg = cfg.copy()
+    search_alg: t.Optional[DictConfig] = cfg.pop("search_alg", None)
+    #
+    tune_config = TuneConfig(
+        **OmegaConf.to_container(OmegaConf.merge(cfg, {"metric": metric, "mode": mode}), resolve=True)
+    )
+    #
+    if search_alg is None:
+        search_alg = OmegaConf.create({"_type": "random"})
+    search_alg_type: t.Optional[str] = search_alg.pop("_type", None)
 
-def _set_run_status(results: ResultGrid) -> None:
-    num_failed_trials: int = results.num_errors
-    if num_failed_trials == 0:
-        mlflow.set_tag("status", "COMPLETED")
-    elif num_failed_trials == len(results):
-        mlflow.set_tag("status", "FAILED")
+    if search_alg_type in {None, "random", "BasicVariantGenerator"}:
+        search_alg.max_concurrent = tune_config.max_concurrent_trials
+        tune_config.search_alg = BasicVariantGenerator(**OmegaConf.to_container(search_alg, resolve=True))
+    elif search_alg_type == "hyperopt":
+        search_alg.update(metric=tune_config.metric, mode=tune_config.mode)
+        tune_config.search_alg = HyperOptSearch(**OmegaConf.to_container(search_alg, resolve=True))
     else:
-        mlflow.set_tag("status", "PARTIALLY_COMPLETED")
+        raise ValueError(f"Unsupported hyperparameter optimization algorithm: {search_alg_type}.")
+
+    return tune_config
 
 
 def _get_metrics_for_best_trial(results: ResultGrid, ctx: Context) -> t.Dict:
@@ -200,3 +246,59 @@ def _save_best_trial_info(results: ResultGrid, local_dir: Path, metrics: t.Dict,
 
 def _save_summary(local_dir: Path, active_run: ActiveRun) -> None:
     IO.save_to_file(Analysis.get_summary(active_run.info.run_id), (local_dir / "summary.yaml").as_posix())
+
+
+def create_example_config() -> DictConfig:
+    """Create a template to be used with the `experiment run` command.
+
+    Returns:
+        An example configuration file for the `search_hp` experiment.
+    """
+    # fmt: off
+    return OmegaConf.merge(
+        _DEFAULT_CONFIGURATION,
+        {
+            "stage": "search_hp",
+
+            "dataset": "churn_modelling:default",
+            "model": "xgboost",
+            "hparams": "auto:default:model=xgboost;task=binary_classification;run_type=hpo",
+        },
+    )
+    # fmt: on
+
+
+# fmt: off
+_DEFAULT_CONFIGURATION = OmegaConf.create(
+    {
+        "stage": "???",  # Name of this stage must be `search_hp`.
+
+        "dataset": "???",  # Dataset fully-qualified name (e.g., {name}:{version}).
+        "model": "???",  # Model name, e.g., `xgboost`.
+        "hparams": None,  # Hyperparameter specs (see HParamsSource docstring for possible values).
+
+        "tune": {  # Configuration for Ray Tune framework.
+            "tune_config": {  # Configuration for TuneConfig instance (__init__ kwargs except `search_alg`).
+                "search_alg": {  # Search algorithm specs - everything here is __init__ kwargs except `_type`.
+                    "_type": "random"  # Algorithm name (random / hyperopt).
+                },
+                "max_concurrent_trials": 0,
+                "num_samples": 100
+            },
+            "trial_resources": {}  # Trial resources passed to `tune.with_resources`, e.g., {"gpu": 1}.
+        },
+
+        "validation": {  # TuneConfig configuration for the validation run (sensitivity analysis).
+            "max_concurrent_trials": 0,
+            "num_samples": None  # If None or 0, validation is disabled.
+        },
+
+        "mlflow": {  # Parameters for MLflow tracking API.
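+            # `experiment_name` uses OmegaConf's `oc.env` resolver: when not set explicitly in this file,
+            # it falls back to the MLFLOW_EXPERIMENT_NAME environment variable (or null if that is unset too).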
+ "experiment_name": "${oc.env:MLFLOW_EXPERIMENT_NAME, null}", + "description": "", + "tags": {} + }, + } +) +# fmt: on +"""Default configuration template (`???` means mandatory value, null - None).""" diff --git a/training/xtime/stages/train.py b/training/xtime/stages/train.py index a9dc3bc..6d996d2 100644 --- a/training/xtime/stages/train.py +++ b/training/xtime/stages/train.py @@ -19,43 +19,81 @@ import typing as t import mlflow +from omegaconf import DictConfig, OmegaConf import xtime.contrib.tune_ext as ray_tune_extensions from xtime.contrib.mlflow_ext import MLflow +from xtime.contrib.utils import Text, log_deprecate_msg_for_run_inputs from xtime.datasets import build_dataset from xtime.estimators import Estimator, get_estimator -from xtime.hparams import HParamsSource, get_hparams -from xtime.io import IO +from xtime.hparams import HParamsSource, default_hparams, get_hparams +from xtime.io import IO, encode from xtime.run import Context, Metadata, RunType logger = logging.getLogger(__name__) +__all__ = ["run", "create_example_config"] -def train(dataset: str, model: str, hparams: t.Optional[HParamsSource]) -> None: - """Train a model for a given problem using default, HP-optimized or pre-defined parameters. - Enable GPUs by setting CUDA_VISIBLE_DEVICES variable (e.g., CUDA_VISIBLE_DEVICES=0 python -m xtime ...). - """ +def run(config: DictConfig) -> None: + config = OmegaConf.merge(_DEFAULT_CONFIGURATION, config) + assert config.stage == "train", f"Invalid stage {config.stage}." + ray_tune_extensions.add_representers() - MLflow.create_experiment() - with mlflow.start_run(description=" ".join(sys.argv)) as active_run: - # This MLflow run tracks model training. - MLflow.init_run(active_run) + experiment_id: t.Optional[str] = MLflow.create_experiment(name=config.mlflow.experiment_name) + + description: Text = Text.from_chunks(config.mlflow.description, " ".join(sys.argv)) + with mlflow.start_run(description=str(description), experiment_id=experiment_id) as active_run: + MLflow.init_run(active_run, set_tags_from_env=True, user_tags=config.mlflow.tags) + + log_deprecate_msg_for_run_inputs(logger) IO.save_yaml( - data={"dataset": dataset, "model": model, "hparams": hparams}, + data=encode({"dataset": config.dataset, "model": config.model, "hparams": config.hparams}), file_path=MLflow.get_artifact_path(active_run) / "run_inputs.yaml", raise_on_error=False, ) + OmegaConf.save(config, MLflow.get_artifact_path(active_run) / "experiment.yaml", resolve=False) + context = Context( - metadata=Metadata(dataset=dataset, model=model, run_type=RunType.TRAIN), dataset=build_dataset(dataset) + metadata=Metadata(dataset=config.dataset, model=config.model, run_type=RunType.TRAIN), + dataset=build_dataset(config.dataset), ) - if hparams is None: - hparams = f"auto:default:model={model};task={context.dataset.metadata.task.type.value};run_type=train" - logger.info(f"Hyperparameters are not provided, using default ones: '%s'.", hparams) + hparams: HParamsSource = config.hparams + if config.hparams is None: + hparams = default_hparams(model=config.model, task=context.dataset.metadata.task, run_type=RunType.TRAIN) hp_dict: t.Dict = get_hparams(hparams) logger.info("Hyperparameters resolved to: '%s'", hp_dict) - estimator: t.Type[Estimator] = get_estimator(model) + estimator: t.Type[Estimator] = get_estimator(config.model) _ = estimator.fit(hp_dict, context) print(f"MLflowRun uri=mlflow:///{active_run.info.run_id}") + + +def create_example_config() -> DictConfig: + """Create a template to be used with 
`experiment run` command. + + Returns: + An example configuration file for the `train` experiment. + """ + return OmegaConf.merge( + _DEFAULT_CONFIGURATION, + { + "stage": "train", + "dataset": "churn_modelling:default", + "model": "xgboost", + "hparams": "auto:default:model=xgboost;task=binary_classification;run_type=train", + }, + ) + + +_DEFAULT_CONFIGURATION = OmegaConf.create( + { + "stage": "???", + "dataset": "???", + "model": "???", + "hparams": None, + "mlflow": {"experiment_name": "${oc.env:MLFLOW_EXPERIMENT_NAME, null}", "description": "", "tags": {}}, + } +) +"""Default configuration template (`???` means mandatory value, null - None)."""
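+#
+# For reference, a minimal `experiment.yaml` consumed by `experiment run` for this stage could look like
+# this (the `mlflow` values below are illustrative; the other values mirror `create_example_config()`):
+#
+#   stage: train
+#   dataset: churn_modelling:default
+#   model: xgboost
+#   hparams: auto:default:model=xgboost;task=binary_classification;run_type=train
+#   mlflow:
+#     experiment_name: my_experiment
+#     description: Train XGBoost on the churn modelling dataset.
+#     tags:
+#       team: xtime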