New experiment run command. #37

Draft: wants to merge 1 commit into main
27 changes: 26 additions & 1 deletion training/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions training/pyproject.toml
@@ -21,6 +21,7 @@ requests = "2.28.2" # core dependency
tinydb = "4.7.1" # core dependency
prettytable = "3.6.0" # command line interface
coloredlogs = "15.0.1" # command line interface
omegaconf = "2.3.0" # core dependency

[tool.poetry.group.catboost.dependencies]
catboost = "1.1.1"
9 changes: 5 additions & 4 deletions training/tests/stages/test_search_hp.py
@@ -15,11 +15,12 @@
###

from xtime.stages.search_hp import (
_DEFAULT_CONFIGURATION,
_get_metrics_for_best_trial,
_init_search_algorithm,
_init_tune_config,
_run,
_save_best_trial_info,
_save_summary,
_set_run_status,
_set_tags,
search_hp,
create_example_config,
run,
)
17 changes: 17 additions & 0 deletions training/tests/stages/test_train.py
@@ -0,0 +1,17 @@
###
# Copyright (2023) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

from xtime.stages.train import _DEFAULT_CONFIGURATION, create_example_config, run
4 changes: 4 additions & 0 deletions training/tests/test_cli.py
@@ -29,7 +29,9 @@
dataset_list,
dataset_save,
datasets,
experiment_create,
experiment_describe,
experiment_run,
experiment_search_hp,
experiment_train,
experiments,
@@ -53,6 +55,8 @@ def test_help(self) -> None:
experiment_train,
experiment_search_hp,
experiment_describe,
experiment_run,
experiment_create,
datasets,
dataset_describe,
dataset_save,
97 changes: 63 additions & 34 deletions training/xtime/contrib/mlflow_ext.py
@@ -20,40 +20,17 @@

import mlflow
from mlflow import MlflowClient
from mlflow.entities import Experiment, Run
from mlflow.entities import Experiment, LifecycleStage, Run
from mlflow.store.entities import PagedList
from mlflow.utils.file_utils import local_file_uri_to_path
from omegaconf import DictConfig, OmegaConf

from xtime import hparams
from xtime.datasets import parse_dataset_name
from xtime.ml import Task
from xtime.run import RunType

__all__ = ["MLflow"]


class MLflow(object):
@staticmethod
def set_tags(
dataset: t.Optional[str] = None, run_type: t.Optional[RunType] = None, task: t.Optional[Task] = None, **kwargs
) -> None:
"""Helper method that sets tags for active MLflow run.

Args:
dataset: Dataset name and version in format "name[:version]".
run_type: Type of this MLflow run (`train`, `hpo` etc.).
task: Task being solved in this run.
kwargs: dictionary of additional tags to set.
"""
if dataset:
dataset_name, dataset_version = parse_dataset_name(dataset)
mlflow.set_tags({"dataset_name": dataset_name, "dataset_version": dataset_version})
if run_type:
mlflow.set_tag("run_type", run_type.value)
if task:
mlflow.set_tag("task", task.type.value)
mlflow.set_tags(kwargs)

@staticmethod
def get_experiment_ids(client: t.Optional[MlflowClient] = None) -> t.List[str]:
"""Return all MLflow experiment IDs.
@@ -112,20 +89,36 @@ def get_runs(
return runs

@staticmethod
def create_experiment(client: t.Optional[MlflowClient] = None) -> None:
def create_experiment(client: t.Optional[MlflowClient] = None, name: t.Optional[str] = None) -> t.Optional[str]:
"""Create a new MLflow experiment with name specified in `MLFLOW_EXPERIMENT_NAME` environment variable.

Args:
client: MLflow client to use. If not provided, a default client will be used.
name: Name of the experiment. If not provided, the `MLFLOW_EXPERIMENT_NAME` environment variable is used.
Returns:
Experiment ID, or None if `name` is None or empty and `MLFLOW_EXPERIMENT_NAME` is also not set.
"""
if client is None:
client = MlflowClient()

from mlflow.tracking import _EXPERIMENT_NAME_ENV_VAR
# First, try to use the name provided to this function; if it is not available, fall back to the standard
# MLflow environment variable.
name = (name or "").strip()
if not name:
from mlflow.tracking import _EXPERIMENT_NAME_ENV_VAR

name = (os.environ.get(_EXPERIMENT_NAME_ENV_VAR, None) or "").strip()

if not name:
return None

name = os.environ.get(_EXPERIMENT_NAME_ENV_VAR, None)
if name and client.get_experiment_by_name(name) is None:
mlflow.create_experiment(name)
experiment: t.Optional[Experiment] = client.get_experiment_by_name(name)
if experiment is not None:
if experiment.lifecycle_stage == LifecycleStage.DELETED:
# The experiment exists but was deleted; restore it so its ID can be reused.
client.restore_experiment(experiment.experiment_id)
return experiment.experiment_id

return mlflow.create_experiment(name)
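
A minimal usage sketch (the experiment names here are made up): the helper resolves the name from the explicit argument first, then from `MLFLOW_EXPERIMENT_NAME`, and returns None when neither is set.

import os
from xtime.contrib.mlflow_ext import MLflow

exp_id = MLflow.create_experiment(name="xtime-demo")  # explicit name wins

os.environ["MLFLOW_EXPERIMENT_NAME"] = "xtime-default"
exp_id = MLflow.create_experiment()  # falls back to the environment variable

del os.environ["MLFLOW_EXPERIMENT_NAME"]
assert MLflow.create_experiment() is None  # neither source set a name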

@staticmethod
def get_tags_from_env() -> t.Dict:
@@ -162,16 +155,28 @@ def get_artifact_path(run: t.Optional[Run] = None, ensure_exists: bool = True) -
return local_dir

@staticmethod
def init_run(run: t.Optional[Run]) -> None:
def init_run(
run: t.Optional[Run], set_tags_from_env: bool = False, user_tags: t.Optional[DictConfig] = None
) -> None:
"""Initialize MLflow run.

It ensures that the artifact directory exists and, optionally, sets run tags taken from the user environment
and from a user-provided configuration.

Args:
run: MLflow run to initialize. If none, currently active run will be used.
set_tags_from_env: If true, read tags from user environment and set them.
user_tags: Additional run tags from a configuration file, command line, etc.
"""
_ = MLflow.get_artifact_path(run, ensure_exists=True)
if run is not None:
# TODO sergey: decide whether checking for `run` is needed - we must make sure an active run is present, or
# refactor this implementation to set tags for this particular run using the client API.
if set_tags_from_env:
mlflow.set_tags(MLflow.get_tags_from_env())
if user_tags:
assert isinstance(user_tags, DictConfig), f"Invalid `user_tags` type ({type(user_tags)})."
mlflow.set_tags(OmegaConf.to_container(user_tags, resolve=True))
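
A minimal sketch of the extended init_run; the tag keys below are hypothetical, and `user_tags` must be an OmegaConf DictConfig, as the assertion above requires.

import mlflow
from omegaconf import OmegaConf
from xtime.contrib.mlflow_ext import MLflow

user_tags = OmegaConf.create({"team": "xtime", "purpose": "smoke-test"})
with mlflow.start_run() as active_run:
    # Ensures the artifact directory exists, then sets tags from the user
    # environment and from the configuration above.
    MLflow.init_run(active_run, set_tags_from_env=True, user_tags=user_tags)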

@staticmethod
def log_metrics(metrics: t.Dict[str, t.Any]) -> None:
@@ -186,13 +191,37 @@ def log_metrics(metrics: t.Dict[str, t.Any]) -> None:
"""
# Some metrics produced by Ray Tune we are not interested in.
_metrics_to_ignore = {
"timesteps_total", "time_this_iter_s", "timesteps_total", "episodes_total", "training_iteration",
"timestamp", "time_total_s", "pid", "time_since_restore", "timesteps_since_restore",
"iterations_since_restore", "warmup_time"
"timesteps_total",
"time_this_iter_s",
"timesteps_total",
"episodes_total",
"training_iteration",
"timestamp",
"time_total_s",
"pid",
"time_since_restore",
"timesteps_since_restore",
"iterations_since_restore",
"warmup_time",
}
for name, value in metrics.items():
try:
if isinstance(value, (int, float)) and name not in _metrics_to_ignore:
mlflow.log_metric(name, value)
except mlflow.MlflowException:
continue
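
For illustration (the metric names are made up): only numeric values whose names fall outside the ignore set are logged, and a per-metric MlflowException is swallowed.

import mlflow
from xtime.contrib.mlflow_ext import MLflow

metrics = {
    "valid_loss": 0.127,       # logged: numeric and not in the ignore set
    "training_iteration": 42,  # skipped: Ray Tune bookkeeping metric
    "config": {"lr": 0.1},     # skipped: not an int or float
}
with mlflow.start_run():
    MLflow.log_metrics(metrics)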

@staticmethod
def set_status_tag_from_trial_counts(num_trials: int, num_failed_trials: int) -> None:
"""Set `status` tag given number of trials(sub-experiments) and their fail/success status.

Args:
num_trials: Total number of trials within the current MLflow run.
num_failed_trials: Number of failed trials for the current MLflow run.
"""
if num_failed_trials == 0:
mlflow.set_tag("status", "COMPLETED")
elif num_failed_trials == num_trials:
mlflow.set_tag("status", "FAILED")
else:
mlflow.set_tag("status", "PARTIALLY_COMPLETED")
2 changes: 1 addition & 1 deletion training/xtime/contrib/tune_ext.py
@@ -270,7 +270,7 @@ def _check_mode(mode: str) -> str:

class YamlEncoder:
@staticmethod
def represent(dumper: yaml.representer.BaseRepresenter, rv: RandomVarDomain) -> yaml.nodes.MappingNode:
def represent(dumper: yaml.representer.BaseRepresenter, rv: t.Union[RandomVarDomain]) -> yaml.nodes.MappingNode:
"""Represent given random variable for yaml dumper."""
sampler: t.Dict = YamlEncoder.sampler_to_dict(rv.sampler)
if isinstance(rv, sample.Integer):
54 changes: 54 additions & 0 deletions training/xtime/contrib/utils.py
@@ -0,0 +1,54 @@
###
# Copyright (2023) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
import logging
import typing as t

__all__ = ["Text", "normalize_str", "log_deprecate_msg_for_run_inputs"]

from omegaconf import DictConfig, ListConfig, OmegaConf


class Text:
"""Helper class to manipulate text chunks to create text documents."""

def __init__(self, content: t.Optional[str] = None) -> None:
self.content = normalize_str(content)

def __str__(self) -> str:
return self.content

@classmethod
def from_chunks(cls, *chunks: t.Optional[str], sep: str = "\n") -> "Text":
return cls(sep.join([c for c in (normalize_str(_c) for _c in chunks) if c]))


def normalize_str(s: t.Optional[str]) -> str:
"""Normalize input string.
Args:
s: Optional string.
Returns:
An empty string if `s` is None, otherwise the input string with leading and trailing whitespace
(including newline and tab characters) removed.
"""
return (s or "").strip()


def log_deprecate_msg_for_run_inputs(logger: logging.Logger) -> None:
"""This is an internal temporary function that will be removed."""
logger.warning(
"The `run_inputs.yaml` is deprecated and will not be generated in the future versions. "
"Start using the `experiment.yaml` file instead."
)
22 changes: 19 additions & 3 deletions training/xtime/estimators/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
roc_auc_score,
)

from xtime.contrib.mlflow_ext import MLflow
from xtime.datasets import Dataset, DatasetMetadata, DatasetSplit
from xtime.datasets.dataset import build_dataset
from xtime.io import IO, encode
@@ -71,11 +70,22 @@ def after_test(self, dataset: Dataset, estimator: "Estimator", metrics: t.Dict)

class MLflowCallback(Callback):
def __init__(self, hparams: t.Dict, ctx: Context) -> None:
from xtime.datasets.dataset import parse_dataset_name

mlflow.log_params(hparams)
MLflow.set_tags(dataset=ctx.metadata.dataset, run_type=ctx.metadata.run_type, model=ctx.metadata.model)

dataset_name, dataset_version = parse_dataset_name(ctx.metadata.dataset)
mlflow.set_tags(
{
"dataset_name": dataset_name,
"dataset_version": dataset_version,
"run_type": ctx.metadata.run_type.value,
"model": ctx.metadata.model,
}
)

def before_fit(self, dataset: Dataset, estimator: "Estimator") -> None:
MLflow.set_tags(task=dataset.metadata.task)
mlflow.set_tag("task", dataset.metadata.task.type.value)

def after_test(self, dataset: Dataset, estimator: "Estimator", metrics: t.Dict) -> None:
mlflow.log_metrics({name: float(metrics[name]) for name in METRICS[dataset.metadata.task.type]})
@@ -144,8 +154,14 @@ def fit(cls, hparams: t.Dict, ctx: Context) -> t.Dict:
else:
callback = TrainCallback(IO.work_dir(), hparams, ctx)
if mlflow.active_run() is not None:
# When running with Ray Tune, there will be no active run.
callback = ContainerCallback([callback, MLflowCallback(hparams, ctx)])

if isinstance(callback, ContainerCallback):
print("Estimator callbacks =", [c.__class__.__name__ for c in callback.callbacks])
else:
print("Estimator callbacks =", callback.__class__.__name__)

estimator = cls(hparams, ctx.dataset.metadata)

callback.before_fit(ctx.dataset, estimator)
2 changes: 2 additions & 0 deletions training/xtime/hparams/__init__.py
@@ -21,6 +21,7 @@
HParamsSpec,
JsonEncoder,
ValueSpec,
default_hparams,
from_auto,
from_file,
from_mlflow,
@@ -40,4 +41,5 @@
"from_auto",
"from_string",
"from_file",
"default_hparams",
]