diff --git a/training/poetry.lock b/training/poetry.lock
index ae5432b..a011f2d 100644
--- a/training/poetry.lock
+++ b/training/poetry.lock
@@ -154,6 +154,16 @@ files = [
     {file = "ansicon-1.89.0.tar.gz", hash = "sha256:e4d039def5768a47e4afec8e89e83ec3ae5a26bf00ad851f914d1240b444d2b1"},
 ]

+[[package]]
+name = "antlr4-python3-runtime"
+version = "4.9.3"
+description = "ANTLR 4.9.3 runtime for Python 3.7"
+optional = false
+python-versions = "*"
+files = [
+    {file = "antlr4-python3-runtime-4.9.3.tar.gz", hash = "sha256:f224469b4168294902bb1efa80a8bf7855f24c99aef99cbefc1bcd3cce77881b"},
+]
+
 [[package]]
 name = "anyio"
 version = "4.2.0"
@@ -3030,6 +3040,21 @@ rsa = ["cryptography (>=3.0.0)"]
 signals = ["blinker (>=1.4.0)"]
 signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]

+[[package]]
+name = "omegaconf"
+version = "2.3.0"
+description = "A flexible configuration library"
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "omegaconf-2.3.0-py3-none-any.whl", hash = "sha256:7b4df175cdb08ba400f45cae3bdcae7ba8365db4d165fc65fd04b050ab63b46b"},
+    {file = "omegaconf-2.3.0.tar.gz", hash = "sha256:d5d4b6d29955cc50ad50c46dc269bcd92c6e00f5f90d23ab5fee7bfca4ba4cc7"},
+]
+
+[package.dependencies]
+antlr4-python3-runtime = "==4.9.*"
+PyYAML = ">=5.1.0"
+
 [[package]]
 name = "opencensus"
 version = "0.11.4"
@@ -5637,4 +5662,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9"
-content-hash = "bd0affce210edc8ba611ef24ad330464591c2dff3a61d6ac67bfcb29a0968b32"
+content-hash = "0b86d93861c2ea33c1ec7e7799f635da4269146e415d169a738f6c13a3dd3c09"
diff --git a/training/pyproject.toml b/training/pyproject.toml
index 8313f64..8c9275b 100644
--- a/training/pyproject.toml
+++ b/training/pyproject.toml
@@ -21,6 +21,7 @@ requests = "2.28.2"          # core dependency
 tinydb = "4.7.1"             # core dependency
 prettytable = "3.6.0"        # command line interface
 coloredlogs = "15.0.1"       # command line interface
+omegaconf = "2.3.0"          # core dependency

 [tool.poetry.group.catboost.dependencies]
 catboost = "1.1.1"
diff --git a/training/tests/stages/test_search_hp.py b/training/tests/stages/test_search_hp.py
index dd96d5f..bfc76f8 100644
--- a/training/tests/stages/test_search_hp.py
+++ b/training/tests/stages/test_search_hp.py
@@ -15,11 +15,12 @@
 ###

 from xtime.stages.search_hp import (
+    _DEFAULT_CONFIGURATION,
     _get_metrics_for_best_trial,
-    _init_search_algorithm,
+    _init_tune_config,
+    _run,
     _save_best_trial_info,
     _save_summary,
-    _set_run_status,
-    _set_tags,
-    search_hp,
+    create_example_config,
+    run,
 )
diff --git a/training/tests/stages/test_train.py b/training/tests/stages/test_train.py
new file mode 100644
index 0000000..ebdb72b
--- /dev/null
+++ b/training/tests/stages/test_train.py
@@ -0,0 +1,17 @@
+###
+# Copyright (2023) Hewlett Packard Enterprise Development LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+from xtime.stages.train import _DEFAULT_CONFIGURATION, create_example_config, run
diff --git a/training/tests/test_cli.py b/training/tests/test_cli.py
index 5d85d21..138fba1 100644
--- a/training/tests/test_cli.py
+++ b/training/tests/test_cli.py
@@ -29,7 +29,9 @@
     dataset_list,
     dataset_save,
     datasets,
+    experiment_create,
     experiment_describe,
+    experiment_run,
     experiment_search_hp,
     experiment_train,
     experiments,
@@ -53,6 +55,8 @@ def test_help(self) -> None:
             experiment_train,
             experiment_search_hp,
             experiment_describe,
+            experiment_run,
+            experiment_create,
             datasets,
             dataset_describe,
             dataset_save,
diff --git a/training/xtime/contrib/mlflow_ext.py b/training/xtime/contrib/mlflow_ext.py
index 6f896f6..bc7b5b3 100644
--- a/training/xtime/contrib/mlflow_ext.py
+++ b/training/xtime/contrib/mlflow_ext.py
@@ -20,40 +20,17 @@

 import mlflow
 from mlflow import MlflowClient
-from mlflow.entities import Experiment, Run
+from mlflow.entities import Experiment, LifecycleStage, Run
 from mlflow.store.entities import PagedList
 from mlflow.utils.file_utils import local_file_uri_to_path
+from omegaconf import DictConfig, OmegaConf

 from xtime import hparams
-from xtime.datasets import parse_dataset_name
-from xtime.ml import Task
-from xtime.run import RunType

 __all__ = ["MLflow"]


 class MLflow(object):
-    @staticmethod
-    def set_tags(
-        dataset: t.Optional[str] = None, run_type: t.Optional[RunType] = None, task: t.Optional[Task] = None, **kwargs
-    ) -> None:
-        """Helper method that sets tags for active MLflow run.
-
-        Args:
-            dataset: Dataset name and version in format "name[:version]".
-            run_type: Type of this MLflow run (`train`, `hpo` etc.).
-            task: Task being solved in this run.
-            kwargs: dictionary of additional tags to set.
-        """
-        if dataset:
-            dataset_name, dataset_version = parse_dataset_name(dataset)
-            mlflow.set_tags({"dataset_name": dataset_name, "dataset_version": dataset_version})
-        if run_type:
-            mlflow.set_tag("run_type", run_type.value)
-        if task:
-            mlflow.set_tag("task", task.type.value)
-        mlflow.set_tags(kwargs)
-
     @staticmethod
     def get_experiment_ids(client: t.Optional[MlflowClient] = None) -> t.List[str]:
         """Return all MLflow experiment IDs.
@@ -112,20 +89,36 @@ def get_runs(
         return runs

     @staticmethod
-    def create_experiment(client: t.Optional[MlflowClient] = None) -> None:
-        """Create a new MLflow experiment with name specified in `MLFLOW_EXPERIMENT_NAME` environment variable.
+    def create_experiment(client: t.Optional[MlflowClient] = None, name: t.Optional[str] = None) -> t.Optional[str]:
+        """Create a new MLflow experiment with the given name or the name in `MLFLOW_EXPERIMENT_NAME` env variable.

         Args:
             client: MLflow client to use. If not provided, a default client will be used.
+            name: Name of the experiment.
+
+        Returns:
+            Experiment ID, or None if `name` is none or empty and `MLFLOW_EXPERIMENT_NAME` is also not set.
         """
         if client is None:
             client = MlflowClient()

-        from mlflow.tracking import _EXPERIMENT_NAME_ENV_VAR
+        # First, try to use the name provided to this function; if it is not available, fall back to the
+        # standard MLflow environment variable.
+        name = (name or "").strip()
+        if not name:
+            from mlflow.tracking import _EXPERIMENT_NAME_ENV_VAR
+
+            name = (os.environ.get(_EXPERIMENT_NAME_ENV_VAR, None) or "").strip()
+
+        if not name:
+            return None

-        name = os.environ.get(_EXPERIMENT_NAME_ENV_VAR, None)
-        if name and client.get_experiment_by_name(name) is None:
-            mlflow.create_experiment(name)
+        experiment: t.Optional[Experiment] = client.get_experiment_by_name(name)
+        if experiment is not None:
+            if experiment.lifecycle_stage == LifecycleStage.DELETED:
+                # Restore the deleted experiment so that new runs can be logged under it.
+                client.restore_experiment(experiment.experiment_id)
+            return experiment.experiment_id
+
+        return mlflow.create_experiment(name)

     @staticmethod
     def get_tags_from_env() -> t.Dict:
@@ -162,7 +155,9 @@ def get_artifact_path(run: t.Optional[Run] = None, ensure_exists: bool = True) -
         return local_dir

     @staticmethod
-    def init_run(run: t.Optional[Run]) -> None:
+    def init_run(
+        run: t.Optional[Run], set_tags_from_env: bool = False, user_tags: t.Optional[DictConfig] = None
+    ) -> None:
         """Initialize MLflow run.

         For now, it does not do a lot of things, mainly ensuring that the artifact directory exists. So, it's a wrapper
@@ -170,8 +165,18 @@ def init_run(run: t.Optional[Run]) -> None:

         Args:
             run: MLflow run to initialize. If none, currently active run will be used.
+            set_tags_from_env: If true, read tags from user environment and set them.
+            user_tags: Additional run tags from a configuration file, command line, etc.
         """
         _ = MLflow.get_artifact_path(run, ensure_exists=True)
+        if run is not None:
+            # TODO sergey: think if we need to check for `run` - need to make sure active run is present, or
+            #  refactor this implementation to set tags for this particular run using the client API.
+            if set_tags_from_env:
+                mlflow.set_tags(MLflow.get_tags_from_env())
+            if user_tags:
+                assert isinstance(user_tags, DictConfig), f"Invalid `user_tags` type ({type(user_tags)})."
+                mlflow.set_tags(OmegaConf.to_container(user_tags, resolve=True))

     @staticmethod
     def log_metrics(metrics: t.Dict[str, t.Any]) -> None:
@@ -186,9 +191,18 @@ def log_metrics(metrics: t.Dict[str, t.Any]) -> None:
         """
         # Some metrics produced by Ray Tune we are not interested in.
         _metrics_to_ignore = {
-            "timesteps_total", "time_this_iter_s", "timesteps_total", "episodes_total", "training_iteration",
-            "timestamp", "time_total_s", "pid", "time_since_restore", "timesteps_since_restore",
-            "iterations_since_restore", "warmup_time"
+            "timesteps_total",
+            "time_this_iter_s",
+            "episodes_total",
+            "training_iteration",
+            "timestamp",
+            "time_total_s",
+            "pid",
+            "time_since_restore",
+            "timesteps_since_restore",
+            "iterations_since_restore",
+            "warmup_time",
         }
         for name, value in metrics.items():
             try:
@@ -196,3 +210,18 @@ def log_metrics(metrics: t.Dict[str, t.Any]) -> None:
                 mlflow.log_metric(name, value)
             except mlflow.MlflowException:
                 continue
+
+    @staticmethod
+    def set_status_tag_from_trial_counts(num_trials: int, num_failed_trials: int) -> None:
+        """Set the `status` tag given the number of trials (sub-experiments) and their fail/success counts.
+
+        Args:
+            num_trials: Total number of trials within current MLflow run.
+            num_failed_trials: Number of failed trials for current MLflow run.
+        """
+        if num_failed_trials == 0:
+            mlflow.set_tag("status", "COMPLETED")
+        elif num_failed_trials == num_trials:
+            mlflow.set_tag("status", "FAILED")
+        else:
+            mlflow.set_tag("status", "PARTIALLY_COMPLETED")
diff --git a/training/xtime/contrib/tune_ext.py b/training/xtime/contrib/tune_ext.py
index 878cad6..d7da2ff 100644
--- a/training/xtime/contrib/tune_ext.py
+++ b/training/xtime/contrib/tune_ext.py
@@ -270,7 +270,7 @@ def _check_mode(mode: str) -> str:

 class YamlEncoder:
     @staticmethod
-    def represent(dumper: yaml.representer.BaseRepresenter, rv: RandomVarDomain) -> yaml.nodes.MappingNode:
+    def represent(dumper: yaml.representer.BaseRepresenter, rv: t.Union[RandomVarDomain]) -> yaml.nodes.MappingNode:
         """Represent given random variable for yaml dumper."""
         sampler: t.Dict = YamlEncoder.sampler_to_dict(rv.sampler)
         if isinstance(rv, sample.Integer):
diff --git a/training/xtime/contrib/utils.py b/training/xtime/contrib/utils.py
new file mode 100644
index 0000000..d9d4e7e
--- /dev/null
+++ b/training/xtime/contrib/utils.py
@@ -0,0 +1,54 @@
+###
+# Copyright (2023) Hewlett Packard Enterprise Development LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+import logging
+import typing as t
+
+__all__ = ["Text", "normalize_str", "log_deprecate_msg_for_run_inputs"]
+
+
+class Text:
+    """Helper class to manipulate text chunks to create text documents."""
+
+    def __init__(self, content: t.Optional[str] = None) -> None:
+        self.content = normalize_str(content)
+
+    def __str__(self) -> str:
+        return self.content
+
+    @classmethod
+    def from_chunks(cls, *chunks: t.Optional[str], sep: str = "\n") -> "Text":
+        return cls(sep.join([c for c in (normalize_str(_c) for _c in chunks) if c]))
+
+
+def normalize_str(s: t.Optional[str]) -> str:
+    """Normalize input string.
+
+    Args:
+        s: Optional string.
+    Returns:
+        Empty string if `s` is None, else the input string with leading and trailing whitespace
+        (including newline and tab characters) removed.
+    """
+    return (s or "").strip()
+
+
+def log_deprecate_msg_for_run_inputs(logger: logging.Logger) -> None:
+    """This is an internal temporary function that will be removed."""
+    logger.warning(
+        "The `run_inputs.yaml` file is deprecated and will not be generated in future versions. "
+        "Start using the `experiment.yaml` file instead."
+    )
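The `Text` helper is what the stages below use to assemble MLflow run descriptions from optional pieces. A quick, self-contained illustration of its semantics (derived only from the code above):

    from xtime.contrib.utils import Text, normalize_str

    assert normalize_str(None) == ""
    assert normalize_str("  hello\n") == "hello"
    # None/empty chunks are dropped; the remaining normalized chunks are joined with `sep`.
    assert str(Text.from_chunks("auto", None, "  default ", sep=":")) == "auto:default"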
diff --git a/training/xtime/estimators/estimator.py b/training/xtime/estimators/estimator.py
index 3797d36..6e989a5 100644
--- a/training/xtime/estimators/estimator.py
+++ b/training/xtime/estimators/estimator.py
@@ -33,7 +33,6 @@
     roc_auc_score,
 )

-from xtime.contrib.mlflow_ext import MLflow
 from xtime.datasets import Dataset, DatasetMetadata, DatasetSplit
 from xtime.datasets.dataset import build_dataset
 from xtime.io import IO, encode
@@ -71,11 +70,22 @@ def after_test(self, dataset: Dataset, estimator: "Estimator", metrics: t.Dict) 

 class MLflowCallback(Callback):
     def __init__(self, hparams: t.Dict, ctx: Context) -> None:
+        from xtime.datasets.dataset import parse_dataset_name
+
         mlflow.log_params(hparams)
-        MLflow.set_tags(dataset=ctx.metadata.dataset, run_type=ctx.metadata.run_type, model=ctx.metadata.model)
+
+        dataset_name, dataset_version = parse_dataset_name(ctx.metadata.dataset)
+        mlflow.set_tags(
+            {
+                "dataset_name": dataset_name,
+                "dataset_version": dataset_version,
+                "run_type": ctx.metadata.run_type.value,
+                "model": ctx.metadata.model,
+            }
+        )

     def before_fit(self, dataset: Dataset, estimator: "Estimator") -> None:
-        MLflow.set_tags(task=dataset.metadata.task)
+        mlflow.set_tag("task", dataset.metadata.task.type.value)

     def after_test(self, dataset: Dataset, estimator: "Estimator", metrics: t.Dict) -> None:
         mlflow.log_metrics({name: float(metrics[name]) for name in METRICS[dataset.metadata.task.type]})
@@ -144,8 +154,14 @@ def fit(cls, hparams: t.Dict, ctx: Context) -> t.Dict:
         else:
             callback = TrainCallback(IO.work_dir(), hparams, ctx)
             if mlflow.active_run() is not None:
+                # When running with ray tune, there will be no active run.
                 callback = ContainerCallback([callback, MLflowCallback(hparams, ctx)])

+        if isinstance(callback, ContainerCallback):
+            print("Estimator callbacks =", [c.__class__.__name__ for c in callback.callbacks])
+        else:
+            print("Estimator callbacks =", callback.__class__.__name__)
+
         estimator = cls(hparams, ctx.dataset.metadata)

         callback.before_fit(ctx.dataset, estimator)
diff --git a/training/xtime/hparams/__init__.py b/training/xtime/hparams/__init__.py
index 1cf9a3a..3b8f6ec 100644
--- a/training/xtime/hparams/__init__.py
+++ b/training/xtime/hparams/__init__.py
@@ -21,6 +21,7 @@
     HParamsSpec,
     JsonEncoder,
     ValueSpec,
+    default_hparams,
     from_auto,
     from_file,
     from_mlflow,
@@ -40,4 +41,5 @@
     "from_auto",
     "from_string",
     "from_file",
+    "default_hparams",
 ]
diff --git a/training/xtime/hparams/_hparams.py b/training/xtime/hparams/_hparams.py
index d7652aa..24140d0 100644
--- a/training/xtime/hparams/_hparams.py
+++ b/training/xtime/hparams/_hparams.py
@@ -18,9 +18,11 @@
 import json
 import os
 import typing as t
+from enum import Enum
 from pathlib import Path

 import numpy as np
+from omegaconf import DictConfig, ListConfig, OmegaConf
 from ray.tune.search.sample import Domain
 from ray.tune.search.variant_generator import generate_variants

@@ -39,10 +41,11 @@
     "from_mlflow",
     "from_string",
     "from_file",
+    "default_hparams",
 ]


-HParamsSource = t.Union[t.Dict, str, t.Iterable[t.Union[str, t.Dict]]]
+HParamsSource = t.Union[t.Dict, str, t.Iterable[t.Union[str, t.Dict, DictConfig]]]
 """Specification options for hyperparameters (HP).

 - dict: Ready-to-use dictionary of HPs mapping HP names to values.
@@ -141,7 +144,10 @@ def get_hparams(source: t.Optional[HParamsSource] = None) -> t.Dict:
     elif isinstance(source, t.Dict):
         # It is assumed that source is already a valid hyperparameter dictionary.
         hp_dict = copy.deepcopy(source)
-    elif isinstance(source, (t.List, t.Tuple)):
+    elif isinstance(source, DictConfig):
+        # It is assumed that source is already a valid hyperparameter dictionary.
+        hp_dict = OmegaConf.to_container(source, resolve=True)
+    elif isinstance(source, (t.List, t.Tuple, ListConfig)):
         # This is a list of multiple sources.
         hp_dict: t.Dict = {}
         for one_source in source:
@@ -248,6 +254,8 @@ def from_string(params: t.Optional[str] = None) -> t.Dict:
     if not params:
         return {}

+    # TODO sergey: check if params string starts with a misspelled protocol (e.g., hparams:)
+
     # These imports may be required by the `eval` call below.
     import math  # noqa # pylint: disable=unused-import

@@ -339,6 +347,26 @@ def from_file(url: PathLike) -> t.Dict:
     return hp_dict


+def default_hparams(**query) -> str:
+    """Construct a string to retrieve hparams from a default hparams recommender.
+
+    Args:
+        query: Search constraints, e.g., `model=xgboost,task=binary_classification`, etc.
+    Returns:
+        An hparams spec to query a default hparams recommender.
+    """
+    from xtime.contrib.utils import Text
+    from xtime.ml import Task
+
+    query = copy.deepcopy(query)
+    for k, v in query.items():
+        if isinstance(v, Task):
+            query[k] = v.type.value
+        elif isinstance(v, Enum):
+            query[k] = v.value
+
+    return str(Text.from_chunks("auto", "default", ";".join(f"{k}={v}" for k, v in query.items()), sep=":"))
+
+
 def _str_content(str_val: t.Optional[str], scheme: str) -> str:
     assert str_val is None or isinstance(str_val, str), f"Invalid `str_val` type ({type(str_val)})."
     str_val = str_val.strip() if isinstance(str_val, str) else ""
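`default_hparams` only builds a query string; nothing here fetches values yet. A sketch of what it produces (the constraint values are illustrative):

    from xtime.hparams import default_hparams

    spec = default_hparams(model="xgboost", task="binary_classification", run_type="hpo")
    assert spec == "auto:default:model=xgboost;task=binary_classification;run_type=hpo"
    # The spec can then be passed to `get_hparams`, which resolves `auto:` sources.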
diff --git a/training/xtime/io.py b/training/xtime/io.py
index 140dbd3..c67504b 100644
--- a/training/xtime/io.py
+++ b/training/xtime/io.py
@@ -28,6 +28,8 @@

 __all__ = ["encode", "PathLike", "to_path", "IO"]

+from omegaconf import DictConfig, ListConfig, OmegaConf
+
 logger = logging.getLogger(__name__)


@@ -61,6 +63,8 @@ def encode(obj: t.Any) -> t.Any:
         return obj.tolist()
     if isinstance(obj, np.float64):
         return float(obj)
+    if isinstance(obj, (DictConfig, ListConfig)):
+        return OmegaConf.to_container(obj, resolve=True)
     return obj


@@ -127,8 +131,8 @@ def save_yaml(data: t.Any, file_path: t.Union[str, Path], raise_on_error: bool =
     try:
         with open(file_path, "w") as stream:
             yaml.dump(data, stream, Dumper=yaml.SafeDumper)
-    except yaml.representer.RepresenterError:
-        logger.warning("YAML representation error (file_path=%s, data=%s).", file_path, data)
+    except yaml.representer.RepresenterError as err:
+        logger.warning("YAML representation error (file_path='%s', data='%s', err='%s').", file_path, data, err)
         if raise_on_error:
             raise
diff --git a/training/xtime/main.py b/training/xtime/main.py
index 7ad8740..ad80833 100644
--- a/training/xtime/main.py
+++ b/training/xtime/main.py
@@ -19,10 +19,11 @@
 import sys
 import typing as t
 from multiprocessing import Process
+from pathlib import Path

 import click
 import coloredlogs
-from ray import tune
+from omegaconf import DictConfig, OmegaConf

 from xtime.datasets import (
     Dataset,
@@ -81,16 +82,34 @@ def _run_search_hp_pipeline(
     num_validate_trials: int = 0,
     gpu: bool = False,
 ) -> None:
-    from xtime.stages.search_hp import search_hp
-
-    """Run an ML pipeline that includes (1) hyperparameter search and (2) analysis how stable hyperparameters are."""
-    mlflow_uri: str = search_hp(dataset, model, algorithm, hparams, num_search_trials, gpu)
-    if num_validate_trials > 0:
-        validate_hparams = [
-            mlflow_uri,  # Take the best hyperparameters from this MLFlow run.
-            {"random_state": tune.randint(0, int(2**32 - 1))},  # And vary random seed to validate these HPs are stable.
-        ]
-        search_hp(dataset, model, "random", validate_hparams, num_validate_trials, gpu)
+    """Run an ML pipeline that includes (1) hyperparameter search and (2) analysis of how stable hyperparameters are."""
+    from xtime.stages import search_hp
+
+    # fmt: off
+    config = OmegaConf.create({
+        "stage": "search_hp",
+
+        "dataset": dataset,
+        "model": model,
+        "hparams": hparams if len(hparams) > 0 else None,
+
+        "tune": {
+            "tune_config": {
+                "search_alg": {"_type": algorithm},
+                "num_samples": num_search_trials
+            },
+            "trial_resources": {
+                "gpu": 0 if not gpu else 1
+            }
+        },
+        "validation": {
+            "max_concurrent_trials": 0,
+            "num_samples": num_validate_trials
+        }
+    })
+    # fmt: on
+    search_hp.run(config)


 @click.group(name="xtime", help="Machine Learning benchmarks for tabular data for XTIME project.")
@@ -129,12 +148,15 @@ def experiments() -> None: ...
 @model_arg
 @params_option
 def experiment_train(dataset: str, model: str, params: t.Tuple[str]) -> None:
-    from xtime.stages.train import train
+    from xtime.stages import train

     try:
-        # When no --params are provided, the `params` will be empty. Setting no None here
-        # will enable the train function to retrieve default parameters in this case.
-        train(dataset, model, params if len(params) > 0 else None)
+        logger.warning("This command is deprecated and will be removed soon. Use `experiment run` instead.")
+        train.run(
+            OmegaConf.create(
+                {"stage": "train", "dataset": dataset, "model": model, "hparams": params if len(params) > 0 else None}
+            )
+        )
     except Exception as err:
         print_err_and_exit(err)

@@ -231,6 +253,57 @@ def experiment_describe(report_type: str, run: t.Optional[str] = None, file: t.O
         print_err_and_exit(err)


+@experiments.command(
+    name="run",
+    help="Run one of the supported ML experiments (train, search_hp, etc.) by providing an experiment configuration file.",
+)
+@click.argument(
+    "config_file", required=False, metavar="CONFIG_FILE", type=str, default=(Path.cwd() / "experiment.yaml").as_posix()
+)
+def experiment_run(config_file: str) -> None:
+    from xtime.stages import search_hp, train
+
+    config: DictConfig = OmegaConf.load(config_file)
+    stages = {"train": train, "search_hp": search_hp}
+    if config.stage not in stages:
+        print(f"Unsupported stage (stage = {config.stage}).")
+        exit(1)
+    stages[config.stage].run(config)
+
+
+@experiments.command(name="create", help="Create an experiment configuration file for the given type of experiment.")
+@click.argument(
+    "experiment_type",
+    required=False,
+    metavar="EXPERIMENT_TYPE",
+    type=click.Choice(["train", "search_hp"]),
+    default="train",
+)
+@click.option(
+    "--file",
+    "-f",
+    metavar="FILE",
+    required=False,
+    type=str,
+    default="experiment.yaml",
+    help="Configuration file for an experiment.",
+)
+def experiment_create(experiment_type: str, file: str) -> None:
+    file_path = Path(file).absolute()
+    if file_path.exists():
+        print(f"Experiment file already exists ({file_path}).")
+        exit(1)
+
+    from xtime.stages import search_hp, train
+
+    stages = {"train": train, "search_hp": search_hp}
+    if experiment_type not in stages:
+        print(f"Unsupported experiment type (experiment_type = {experiment_type}).")
+        exit(1)
+
+    OmegaConf.save(stages[experiment_type].create_example_config(), file, resolve=False)
+
+
 @cli.group("dataset", help="Dataset-related commands (explore available datasets with these commands).")
 def datasets() -> None: ...
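Taken together, `experiment create` writes a template and `experiment run` executes it. The programmatic equivalent, using only APIs introduced in this diff (the file name and the edits are up to the user; this is a sketch, not part of the change):

    from omegaconf import OmegaConf
    from xtime.stages import train

    # Equivalent of `experiment create train -f experiment.yaml`.
    OmegaConf.save(train.create_example_config(), "experiment.yaml", resolve=False)
    # ... edit experiment.yaml: set `dataset`, `model`, optionally `hparams` and `mlflow.*` ...
    # Equivalent of `experiment run experiment.yaml`.
    train.run(OmegaConf.load("experiment.yaml"))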
diff --git a/training/xtime/stages/search_hp.py b/training/xtime/stages/search_hp.py
index d6ce28c..5bbd22b 100644
--- a/training/xtime/stages/search_hp.py
+++ b/training/xtime/stages/search_hp.py
@@ -16,7 +16,6 @@

 import copy
 import logging
-import os
 import sys
 import typing as t
 from pathlib import Path
@@ -24,6 +23,7 @@
 import mlflow
 import ray
 from mlflow import ActiveRun
+from omegaconf import DictConfig, OmegaConf
 from ray import tune
 from ray.air import Result, RunConfig
 from ray.tune import ResultGrid, TuneConfig
@@ -31,75 +31,119 @@
 from ray.tune.search.hyperopt import HyperOptSearch

 import xtime.contrib.tune_ext as ray_tune_extensions
-import xtime.hparams as hp
 from xtime.contrib.mlflow_ext import MLflow
 from xtime.contrib.tune_ext import Analysis, RayTuneDriverToMLflowLoggerCallback
+from xtime.contrib.utils import Text, log_deprecate_msg_for_run_inputs
 from xtime.datasets import build_dataset
 from xtime.estimators import Estimator, get_estimator
-from xtime.hparams import HParamsSource, get_hparams
+from xtime.hparams import HParamsSource, default_hparams, get_hparams
 from xtime.io import IO, encode
 from xtime.ml import METRICS
 from xtime.run import Context, Metadata, RunType

+__all__ = ["run", "create_example_config"]
+
+
 logger = logging.getLogger(__name__)


-def search_hp(
-    dataset: str, model: str, algorithm: str, hparams: t.Optional[HParamsSource], num_trials: int, gpu: bool = False
-) -> str:
-    estimator: t.Type[Estimator] = get_estimator(model)
+def run(config: DictConfig) -> None:
+    config = OmegaConf.merge(_DEFAULT_CONFIGURATION, config)
+    assert config.stage == "search_hp", f"Invalid stage {config.stage}."
+    validation_config: DictConfig = config.pop("validation")
+
+    # Run the main hyperparameter optimization experiment.
+    run_uri: str = _run(config)
+
+    # Run the validation (sensitivity analysis) experiment.
+    if validation_config.num_samples and validation_config.num_samples > 0:
+        validation_config.update({"search_alg": {"_type": "random"}})
+        config = OmegaConf.merge(
+            config,
+            OmegaConf.create(
+                {
+                    # Take the best hyperparameters from the main MLflow run, and vary the random seed to
+                    # validate that these HPs are stable.
+                    "hparams": [run_uri, "params:random_state=tune.randint(0, int(2**32 - 1))"],
+                    "tune": {"tune_config": validation_config},
+                }
+            ),
+        )
+        _: str = _run(config)
+
+
+def _run(config: DictConfig) -> str:
+    assert config.stage == "search_hp", f"Invalid stage {config.stage}."
+
     ray.init()
     ray_tune_extensions.add_representers()
-    MLflow.create_experiment()
-    with mlflow.start_run(description=" ".join(sys.argv)) as active_run:
+    experiment_id: t.Optional[str] = MLflow.create_experiment(name=config.mlflow.experiment_name)
+
+    description: Text = Text.from_chunks(config.mlflow.description, " ".join(sys.argv))
+    with mlflow.start_run(description=str(description), experiment_id=experiment_id) as active_run:
         # This MLflow run tracks Ray Tune hyperparameter search. Individual trials won't have their own MLflow runs.
-        MLflow.init_run(active_run)
+        MLflow.init_run(active_run, set_tags_from_env=True, user_tags=config.mlflow.tags)
+
+        log_deprecate_msg_for_run_inputs(logger)
         IO.save_yaml(
-            data={
-                "dataset": dataset,
-                "model": model,
-                "algorithm": algorithm,
-                "hparams": hparams,
-                "num_trials": num_trials,
-                "gpu": gpu,
-            },
+            data=encode(
+                {
+                    "dataset": config.dataset,
+                    "model": config.model,
+                    "algorithm": config.tune.tune_config.search_alg.get("_type"),
+                    "hparams": config.hparams,
+                    "num_trials": config.tune.tune_config.num_samples,
+                    "gpu": _gpu_resource(config) > 0,
+                }
+            ),
             file_path=MLflow.get_artifact_path(active_run) / "run_inputs.yaml",
             raise_on_error=False,
         )
+        OmegaConf.save(config, MLflow.get_artifact_path(active_run) / "experiment.yaml", resolve=False)
+
         artifact_path: Path = MLflow.get_artifact_path(active_run)
         run_id: str = active_run.info.run_id

-        ctx = Context(Metadata(dataset=dataset, model=model, run_type=RunType.HPO), dataset=build_dataset(dataset))
+        ctx = Context(
+            Metadata(dataset=config.dataset, model=config.model, run_type=RunType.HPO),
+            dataset=build_dataset(config.dataset),
+        )
         IO.save_yaml(ctx.dataset.metadata.to_json(), artifact_path / "dataset_info.yaml", raise_on_error=False)
-        _set_tags(
-            dataset=dataset,
-            model=model,
-            run_type=RunType.HPO,
-            algorithm=algorithm,
-            task=ctx.dataset.metadata.task,
-            framework="tune",
+
+        mlflow.set_tags(
+            encode(
+                {
+                    "dataset": config.dataset,
+                    "model": config.model,
+                    "run_type": RunType.HPO.value,
+                    "algorithm": config.tune.tune_config.search_alg.get("_type"),
+                    "task": ctx.dataset.metadata.task.type.value,
+                    "framework": "tune",
+                }
+            )
+        )
+        mlflow.log_params(
+            encode(
+                {
+                    "dataset": config.dataset,
+                    "model": config.model,
+                    "num_trials": config.tune.tune_config.num_samples,
+                    "algorithm": config.tune.tune_config.search_alg.get("_type"),
+                }
+            )
         )
-        mlflow.log_params({"dataset": dataset, "model": model, "algorithm": algorithm, "num_trials": num_trials})

-        if hparams is None:
-            hparams = f"auto:default:model={model};task={ctx.dataset.metadata.task.type.value};run_type=hpo"
-            logger.info(f"Hyperparameters are not provided, using default ones: '%s'.", hparams)
+        hparams: HParamsSource = config.hparams
+        if config.hparams is None:
+            hparams = default_hparams(model=config.model, task=ctx.dataset.metadata.task, run_type=RunType.HPO)
+
+        param_space: t.Dict = get_hparams(hparams)
         logger.info("Hyperparameter search space resolved to: '%s'", param_space)

-        # Set any `tune_config` parameters before calling the `_init_search_algorithm` method. The reason for this is
-        # there maybe duplicate parameters in the `search_alg` instance that will not be set (e.g.,
-        # BasicVariantGenerator's max_concurrent parameter).
-        tune_config = _init_search_algorithm(
-            # The `max_concurrent_trials` can be overriden in `_init_search_algorithm`
-            TuneConfig(
-                metric=METRICS.get_primary_metric(ctx.dataset.metadata.task),
-                mode="min",
-                num_samples=num_trials,
-                max_concurrent_trials=0,
-            ),
-            algorithm,
+        # Set any `tune_config` parameters before calling the `_init_tune_config` method. The reason for this is
+        # that there may be duplicate parameters in the `search_alg` instance that will not be set (e.g.,
+        # BasicVariantGenerator's max_concurrent parameter).
+        tune_config = _init_tune_config(
+            config.tune.tune_config, metric=METRICS.get_primary_metric(ctx.dataset.metadata.task), mode="min"
         )
         run_config = RunConfig(
             name="ray_tune",
@@ -108,60 +152,62 @@ def search_hp(
             callbacks=[RayTuneDriverToMLflowLoggerCallback(tune_config.metric, tune_config.mode)],
         )

+        estimator: t.Type[Estimator] = get_estimator(config.model)
         objective_fn = tune.with_parameters(estimator.fit, ctx=ctx)
-        if gpu:
-            objective_fn = tune.with_resources(objective_fn, {"gpu": 1})
+        if config.tune.trial_resources:
+            objective_fn = tune.with_resources(
+                objective_fn, OmegaConf.to_container(config.tune.trial_resources, resolve=True)
+            )
+
         tuner = tune.Tuner(objective_fn, param_space=param_space, tune_config=tune_config, run_config=run_config)
         results: ResultGrid = tuner.fit()

-        _set_run_status(results)
+        MLflow.set_status_tag_from_trial_counts(len(results), results.num_errors)
+
         best_trial_metrics: t.Dict = _get_metrics_for_best_trial(results, ctx)
         MLflow.log_metrics(best_trial_metrics)
         _save_best_trial_info(results, artifact_path, best_trial_metrics, active_run)
         _save_summary(artifact_path, active_run)

-        print(f"MLFlow run URI: mlflow:///{active_run.info.run_id}")
+        print(f"MLFlow run URI: mlflow:///{run_id}")
     ray.shutdown()
     return f"mlflow:///{run_id}"


-def _init_search_algorithm(tune_config: TuneConfig, algorithm: str) -> TuneConfig:
-    """
-
-    - Setting `tune_config.max_concurrent_trials` wraps search algorithm with `ConcurrencyLimiter`.
-    """
-    # None and 0 means the same thing - no limit, so set to 0 to simplify subsequent comparison.
-    if tune_config.max_concurrent_trials is None:
-        tune_config.max_concurrent_trials = 0
-
-    if algorithm == "random":
-        tune_config.search_alg = BasicVariantGenerator(random_state=1, max_concurrent=tune_config.max_concurrent_trials)
-    elif algorithm == "hyperopt":
-        tune_config.search_alg = HyperOptSearch(
-            metric=tune_config.metric, mode=tune_config.mode, n_initial_points=20, random_state_seed=1
-        )
-        if tune_config.max_concurrent_trials == 0:
-            logger.info("TuneConfig's max_concurrent_trials is unlimited - setting to 2 for HyperOptSearch algorithm.")
-            tune_config.max_concurrent_trials = 2
-    else:
-        raise ValueError(f"Unsupported hyperparameter optimization algorithm: {algorithm}")
-    return tune_config
+def _gpu_resource(config: DictConfig) -> float:
+    trial_resources: t.Optional[DictConfig] = config.tune.get("trial_resources", None)
+    if trial_resources is None:
+        return 0
+    assert isinstance(trial_resources, DictConfig), "Invalid trial resources definition."
+    gpu: t.Optional[float] = trial_resources.get("gpu", None)
+    if gpu is None or gpu <= 0:
+        return 0
+    return gpu


-def _set_tags(**tags) -> None:
-    if "MLFLOW_TAGS" in os.environ:
-        tags = copy.deepcopy(tags)
-        tags.update(hp.get_hparams(f"params:{os.environ['MLFLOW_TAGS']}"))
-    MLflow.set_tags(**tags)
+def _init_tune_config(cfg: DictConfig, metric: str, mode: str) -> TuneConfig:
+    cfg = cfg.copy()
+    search_alg: t.Optional[DictConfig] = cfg.pop("search_alg", None)

+    # Build the TuneConfig first so that parameters such as `max_concurrent_trials` are resolved.
+    tune_config = TuneConfig(
+        **OmegaConf.to_container(OmegaConf.merge(cfg, {"metric": metric, "mode": mode}), resolve=True)
+    )

-def _set_run_status(results: ResultGrid) -> None:
-    num_failed_trials: int = results.num_errors
-    if num_failed_trials == 0:
-        mlflow.set_tag("status", "COMPLETED")
-    elif num_failed_trials == len(results):
-        mlflow.set_tag("status", "FAILED")
+    # Then instantiate the search algorithm from its specs (`_type` plus __init__ kwargs).
+    if search_alg is None:
+        search_alg = OmegaConf.create({"_type": "random"})
+    search_alg_type: t.Optional[str] = search_alg.pop("_type", None)
+
+    if search_alg_type in {None, "random", "BasicVariantGenerator"}:
+        search_alg.max_concurrent = tune_config.max_concurrent_trials
+        tune_config.search_alg = BasicVariantGenerator(**OmegaConf.to_container(search_alg, resolve=True))
+    elif search_alg_type == "hyperopt":
+        search_alg.update(metric=tune_config.metric, mode=tune_config.mode)
+        tune_config.search_alg = HyperOptSearch(**OmegaConf.to_container(search_alg, resolve=True))
     else:
-        mlflow.set_tag("status", "PARTIALLY_COMPLETED")
+        raise ValueError(f"Unsupported hyperparameter optimization algorithm: {search_alg_type}.")
+
+    return tune_config


 def _get_metrics_for_best_trial(results: ResultGrid, ctx: Context) -> t.Dict:
@@ -200,3 +246,59 @@ def _save_best_trial_info(results: ResultGrid, local_dir: Path, metrics: t.Dict,

 def _save_summary(local_dir: Path, active_run: ActiveRun) -> None:
     IO.save_to_file(Analysis.get_summary(active_run.info.run_id), (local_dir / "summary.yaml").as_posix())
+
+
+def create_example_config() -> DictConfig:
+    """Create a template to be used with the `experiment run` command.
+
+    Returns:
+        An example configuration for the `search_hp` experiment.
+    """
+    # fmt: off
+    return OmegaConf.merge(
+        _DEFAULT_CONFIGURATION,
+        {
+            "stage": "search_hp",
+
+            "dataset": "churn_modelling:default",
+            "model": "xgboost",
+            "hparams": "auto:default:model=xgboost;task=binary_classification;run_type=hpo",
+        },
+    )
+    # fmt: on
+
+
+# fmt: off
+_DEFAULT_CONFIGURATION = OmegaConf.create(
+    {
+        "stage": "???",              # Name of this stage, must be `search_hp`.
+
+        "dataset": "???",            # Dataset fully-qualified name (e.g., {name}:{version}).
+        "model": "???",              # Model name, e.g., `xgboost`.
+        "hparams": None,             # Hyperparameter specs (see the HParamsSource docstring for possible values).
+
+        "tune": {                    # Configuration for the Ray Tune framework.
+            "tune_config": {         # Configuration for the TuneConfig instance (__init__ kwargs except `search_alg`).
+                "search_alg": {      # Search algorithm specs - everything here is __init__ kwargs except `_type`.
+                    "_type": "random"    # Algorithm name (random / hyperopt).
+                },
+                "max_concurrent_trials": 0,
+                "num_samples": 100
+            },
+            "trial_resources": {}    # Trial resources to be used with `tune.with_resources`.
+        },
+
+        "validation": {              # TuneConfig configuration for the validation run (sensitivity analysis).
+            "max_concurrent_trials": 0,
+            "num_samples": None      # If None or 0, validation is disabled.
+        },
+
+        "mlflow": {                  # Parameters for the MLflow tracking API.
+            "experiment_name": "${oc.env:MLFLOW_EXPERIMENT_NAME, null}",
+            "description": "",
+            "tags": {}
+        },
+    }
+)
+# fmt: on
+"""Default configuration template (`???` means mandatory value, null - None)."""
diff --git a/training/xtime/stages/train.py b/training/xtime/stages/train.py
index a9dc3bc..6d996d2 100644
--- a/training/xtime/stages/train.py
+++ b/training/xtime/stages/train.py
@@ -19,43 +19,81 @@
 import typing as t

 import mlflow
+from omegaconf import DictConfig, OmegaConf

 import xtime.contrib.tune_ext as ray_tune_extensions
 from xtime.contrib.mlflow_ext import MLflow
+from xtime.contrib.utils import Text, log_deprecate_msg_for_run_inputs
 from xtime.datasets import build_dataset
 from xtime.estimators import Estimator, get_estimator
-from xtime.hparams import HParamsSource, get_hparams
-from xtime.io import IO
+from xtime.hparams import HParamsSource, default_hparams, get_hparams
+from xtime.io import IO, encode
 from xtime.run import Context, Metadata, RunType

 logger = logging.getLogger(__name__)

+__all__ = ["run", "create_example_config"]

-def train(dataset: str, model: str, hparams: t.Optional[HParamsSource]) -> None:
-    """Train a model for a given problem using default, HP-optimized or pre-defined parameters.

-    Enable GPUs by setting CUDA_VISIBLE_DEVICES variable (e.g., CUDA_VISIBLE_DEVICES=0 python -m xtime ...).
-    """
+def run(config: DictConfig) -> None:
+    config = OmegaConf.merge(_DEFAULT_CONFIGURATION, config)
+    assert config.stage == "train", f"Invalid stage {config.stage}."
+
     ray_tune_extensions.add_representers()
-    MLflow.create_experiment()
-    with mlflow.start_run(description=" ".join(sys.argv)) as active_run:
-        # This MLflow run tracks model training.
-        MLflow.init_run(active_run)
+    experiment_id: t.Optional[str] = MLflow.create_experiment(name=config.mlflow.experiment_name)
+
+    description: Text = Text.from_chunks(config.mlflow.description, " ".join(sys.argv))
+    with mlflow.start_run(description=str(description), experiment_id=experiment_id) as active_run:
+        MLflow.init_run(active_run, set_tags_from_env=True, user_tags=config.mlflow.tags)
+
+        log_deprecate_msg_for_run_inputs(logger)
         IO.save_yaml(
-            data={"dataset": dataset, "model": model, "hparams": hparams},
+            data=encode({"dataset": config.dataset, "model": config.model, "hparams": config.hparams}),
             file_path=MLflow.get_artifact_path(active_run) / "run_inputs.yaml",
             raise_on_error=False,
         )
+        OmegaConf.save(config, MLflow.get_artifact_path(active_run) / "experiment.yaml", resolve=False)
+
         context = Context(
-            metadata=Metadata(dataset=dataset, model=model, run_type=RunType.TRAIN), dataset=build_dataset(dataset)
+            metadata=Metadata(dataset=config.dataset, model=config.model, run_type=RunType.TRAIN),
+            dataset=build_dataset(config.dataset),
         )
-        if hparams is None:
-            hparams = f"auto:default:model={model};task={context.dataset.metadata.task.type.value};run_type=train"
-            logger.info(f"Hyperparameters are not provided, using default ones: '%s'.", hparams)
+
+        hparams: HParamsSource = config.hparams
+        if config.hparams is None:
+            hparams = default_hparams(model=config.model, task=context.dataset.metadata.task, run_type=RunType.TRAIN)
+
         hp_dict: t.Dict = get_hparams(hparams)
         logger.info("Hyperparameters resolved to: '%s'", hp_dict)
-        estimator: t.Type[Estimator] = get_estimator(model)
+
+        estimator: t.Type[Estimator] = get_estimator(config.model)
         _ = estimator.fit(hp_dict, context)
+
         print(f"MLflowRun uri=mlflow:///{active_run.info.run_id}")
+
+
+def create_example_config() -> DictConfig:
+    """Create a template to be used with the `experiment run` command.
+
+    Returns:
+        An example configuration for the `train` experiment.
+    """
+    return OmegaConf.merge(
+        _DEFAULT_CONFIGURATION,
+        {
+            "stage": "train",
+            "dataset": "churn_modelling:default",
+            "model": "xgboost",
+            "hparams": "auto:default:model=xgboost;task=binary_classification;run_type=train",
+        },
+    )
+
+
+_DEFAULT_CONFIGURATION = OmegaConf.create(
+    {
+        "stage": "???",
+        "dataset": "???",
+        "model": "???",
+        "hparams": None,
+        "mlflow": {"experiment_name": "${oc.env:MLFLOW_EXPERIMENT_NAME, null}", "description": "", "tags": {}},
+    }
+)
+"""Default configuration template (`???` means mandatory value, null - None)."""
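Finally, the train stage now mirrors search_hp and can also be driven directly with an inline config. A sketch only — the dataset/model values are the illustrative ones from `create_example_config`, and the `params:` spec reuses the same protocol the validation run above relies on:

    from omegaconf import OmegaConf
    from xtime.stages import train

    # Hyperparameters can be pinned explicitly with a `params:` spec instead of the `auto:default:` lookup.
    train.run(OmegaConf.create({
        "stage": "train",
        "dataset": "churn_modelling:default",
        "model": "xgboost",
        "hparams": "params:random_state=1",
    }))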