diff --git a/mlflow/sklearn/__init__.py b/mlflow/sklearn/__init__.py index 94660dec2c2d64..8e7b411f0e447f 100644 --- a/mlflow/sklearn/__init__.py +++ b/mlflow/sklearn/__init__.py @@ -539,15 +539,9 @@ def autolog(): - A fitted estimator (logged by :py:func:`mlflow.sklearn.log_model()`). **How does autologging work for meta estimators?** - When a meta estimator (e.g. `Pipeline`_, `GridSearchCV`_) calls ``fit()``, it internally calls - ``fit()`` on its child estimators. Autologging does NOT perform logging on these constituent - ``fit()`` calls. - - **Parameter search** - In addition to recording the information discussed above, autologging for parameter - search meta estimators (`GridSearchCV`_ and `RandomizedSearchCV`_) records child runs - with metrics for each set of explored parameters, as well as artifacts and parameters - for the best model (if available). + When a meta estimator (e.g. `Pipeline`_, `GridSearchCV`_) calls ``fit``, it internally calls + ``fit`` on its child estimators. Autologging does NOT perform logging on these constituent + ``fit`` calls. **Supported estimators** All estimators obtained by `sklearn.utils.all_estimators`_ (including meta estimators). @@ -561,9 +555,6 @@ def autolog(): .. _GridSearchCV: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html - .. _RandomizedSearchCV: - https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.RandomizedSearchCV.html - **Example** .. code-block:: python @@ -663,55 +654,7 @@ def fit_mlflow(self, func_name, *args, **kwargs): raise e - _log_posttraining_metadata(self, *args, **kwargs) - - if should_start_run: - try_mlflow_log(mlflow.end_run) - - return fit_output - - def _log_pretraining_metadata(estimator, *args, **kwargs): - """ - Records metadata (e.g., params and tags) for a scikit-learn estimator prior to training. - This is intended to be invoked within a patched scikit-learn training routine - (e.g., `fit()`, `fit_transform()`, ...) 
and assumes the existence of an active - MLflow run that can be referenced via the fluent Tracking API. - - :param estimator: The scikit-learn estimator for which to log metadata. - :param args: The arguments passed to the scikit-learn training routine (e.g., - `fit()`, `fit_transform()`, ...). - :param kwargs: The keyword arguments passed to the scikit-learn training routine. - """ - # Deep parameter logging includes parameters from children of a given - # estimator. For some meta estimators (e.g., pipelines), recording - # these parameters is desirable. For parameter search estimators, - # however, child estimators act as seeds for the parameter search - # process; accordingly, we avoid logging initial, untuned parameters - # for these seed estimators. - should_log_params_deeply = not _is_parameter_search_estimator(estimator) - # Chunk model parameters to avoid hitting the log_batch API limit - for chunk in _chunk_dict( - estimator.get_params(deep=should_log_params_deeply), - chunk_size=MAX_PARAMS_TAGS_PER_BATCH, - ): - truncated = _truncate_dict(chunk, MAX_ENTITY_KEY_LENGTH, MAX_PARAM_VAL_LENGTH) - try_mlflow_log(mlflow.log_params, truncated) - - try_mlflow_log(mlflow.set_tags, _get_estimator_info_tags(estimator)) - - def _log_posttraining_metadata(estimator, *args, **kwargs): - """ - Records metadata for a scikit-learn estimator after training has completed. - This is intended to be invoked within a patched scikit-learn training routine - (e.g., `fit()`, `fit_transform()`, ...) and assumes the existence of an active - MLflow run that can be referenced via the fluent Tracking API. - - :param estimator: The scikit-learn estimator for which to log metadata. - :param args: The arguments passed to the scikit-learn training routine (e.g., - `fit()`, `fit_transform()`, ...). - :param kwargs: The keyword arguments passed to the scikit-learn training routine. 
- """ - if hasattr(estimator, "score"): + if hasattr(self, "score"): try: score_args = _get_args_for_score(self.score, self.fit, args, kwargs) training_score = self.score(*score_args) @@ -727,45 +670,10 @@ def _log_posttraining_metadata(estimator, *args, **kwargs): try_mlflow_log(log_model, self, artifact_path="model") - if _is_parameter_search_estimator(estimator): - if hasattr(estimator, "best_estimator_"): - try_mlflow_log(log_model, estimator.best_estimator_, artifact_path="best_estimator") - - if hasattr(estimator, "best_params_"): - best_params = { - f"best_{param_name}": param_value - for param_name, param_value in estimator.best_params_.items() - } - try_mlflow_log(mlflow.log_params, best_params) - - if hasattr(estimator, "cv_results_"): - try: - # Fetch environment-specific tags (e.g., user and source) to ensure that lineage - # information is consistent with the parent run - environment_tags = context_registry.resolve_tags() - _create_child_runs_for_parameter_search( - cv_estimator=estimator, - parent_run=mlflow.active_run(), - child_tags=environment_tags, - ) - except Exception as e: - msg = ( - "Encountered exception during creation of child runs for parameter search." - " Child runs may be missing. Exception: {}".format(str(e)) - ) - _logger.warning(msg) - - try: - cv_results_df = pd.DataFrame.from_dict(estimator.cv_results_) - _log_parameter_search_results_as_artifact( - cv_results_df, mlflow.active_run().info.run_id - ) - except Exception as e: - msg = ( - "Failed to log parameter search results as an artifact." 
- " Exception: {}".format(str(e)) - ) - _logger.warning(msg) + if should_start_run: + try_mlflow_log(mlflow.end_run) + + return fit_output def patched_fit(self, func_name, *args, **kwargs): """ @@ -785,21 +693,8 @@ def f(self, *args, **kwargs): return f - from sklearn.model_selection import GridSearchCV, RandomizedSearchCV - patch_settings = gorilla.Settings(allow_hit=True, store_hit=True) - try: - from sklearn.utils import all_estimators - except ImportError: - all_estimators = _all_estimators - - _, estimators_to_patch = zip(*all_estimators()) - # Ensure that relevant meta estimators (e.g. GridSearchCV, Pipeline) are selected - # for patching if they are not already included in the output of `all_estimators()` - estimators_to_patch = set(estimators_to_patch).union( - set(_get_meta_estimators_for_autologging()) - ) - for class_def in estimators_to_patch: + for _, class_def in _all_estimators(): for func_name in ["fit", "fit_transform", "fit_predict"]: if hasattr(class_def, func_name): original = getattr(class_def, func_name) @@ -828,118 +723,3 @@ def f(self, *args, **kwargs): patch_func = functools.wraps(original)(patch_func) patch = gorilla.Patch(class_def, func_name, patch_func, settings=patch_settings) gorilla.apply(patch) - - def fit_predict_cv(self, *args, **kwargs): - return patched_fit_cv(self, 'fit_predict', *args, **kwargs) - - def fit_transform_cv(self, *args, **kwargs): - return patched_fit_cv(self, 'fit_transform', *args, **kwargs) - - def fit_cv(self, *args, **kwargs): - return patched_fit_cv(self, 'fit', *args, **kwargs) - - def patched_fit_cv(self, fn_name, *args, **kwargs): - """ - To be applied to a sklearn model class that defines a `fit` - method and inherits from `BaseEstimator` (thereby defining - the `get_params()` method) - """ - with _SklearnTrainingSession(allow_children=False, clazz=self.__class__) as t: - if t.should_log(): - return fit_mlflow_cv(self, fn_name, *args, **kwargs) - else: - original_fit = 
gorilla.get_original_attribute(self, fn_name) - return original_fit(*args, **kwargs) - - def fit_mlflow_cv(self, fn_name, *args, **kwargs): - try_mlflow_log(mlflow.start_run, nested=True) - # Perform shallow parameter logging for hyperparameter search APIs (e.g., GridSearchCV - # and RandomizedSearchCV) to avoid logging superfluous parameters from the seed - # `estimator` constructor argument; we will log the set of optimal estimator - # parameters, if available, once training completes - try_mlflow_log(mlflow.log_params, self.get_params(deep=False)) - try_mlflow_log(mlflow.set_tag, "estimator_name", self.__class__.__name__) - try_mlflow_log(mlflow.set_tag, "estimator_class", self.__class__) - - original_fit = gorilla.get_original_attribute(self, fn_name) - fit_output = original_fit(*args, **kwargs) - - if hasattr(self, 'score'): - try: - training_score = self.score(args[0], args[1]) - try_mlflow_log(mlflow.log_metric, "training_score", training_score) - except Exception as e: - print("Failed to collect scoring metrics!") - print(e) - - try_mlflow_log(log_model, self, artifact_path='model') - if hasattr(self, 'best_estimator_'): - try_mlflow_log(log_model, self.best_estimator_, artifact_path="best_estimator") - - if hasattr(self, 'best_params_'): - best_params = { - f"best_{param_name}": param_value - for param_name, param_value in self.best_params_.items() - } - try_mlflow_log(mlflow.log_params, best_params) - - _create_child_cv_runs(cv_estimator=self) - - try_mlflow_log(mlflow.end_run) - - def _create_child_cv_runs(cv_estimator): - from mlflow.tracking.client import MlflowClient - from mlflow.entities import Metric, RunTag, Param - import pandas as pd - import time - from numbers import Number - - client = MlflowClient() - metrics_timestamp = int(time.time() * 1000) - - cv_results_df = pd.DataFrame.from_dict(cv_estimator.cv_results_) - for _, result_row in cv_results_df.iterrows(): - with mlflow.start_run(nested=True): - params = [ - Param(str(key), str(value)) 
for key, value in result_row.get("params", {}).items() - ] - metrics = { - Metric( - key=key, - value=value, - timestamp=metrics_timestamp, - step=0, - ) - for key, value in result_row.iteritems() - # Parameters values are recorded twice in the set of search `cv_results`: - # once within a `params` column with dictionary values and once within - # a separate dataframe column that is created for each parameter. To prevent - # duplication of parameters, we log the consolidated values from the parameter - # dictionary column and filter out the other parameter-specific columns with - # names of the form `param_{param_name}`. - if not key.startswith("param") and isinstance(value, Number) - } - tags = [ - RunTag(key, value) for key, value in { - "estimator_name": str(cv_estimator.estimator.__class__.__name__), - "estimator_class": str(cv_estimator.estimator.__class__), - }.items() - ] - - client.log_batch( - run_id=mlflow.active_run().info.run_id, - params=params, - metrics=metrics, - tags=tags, - ) - - for class_def in [GridSearchCV, RandomizedSearchCV]: - if hasattr(class_def, 'fit'): - patch = gorilla.Patch(class_def, 'fit', fit_cv, settings=patch_settings) - gorilla.apply(patch) - if hasattr(class_def, 'fit_transform'): - patch = gorilla.Patch(class_def, 'fit_transform_cv', fit_transform_cv, settings=patch_settings) - gorilla.apply(patch) - if hasattr(class_def, 'fit_predict'): - patch = gorilla.Patch(class_def, 'fit_predict_cv', fit_predict_cv, settings=patch_settings) - gorilla.apply(patch) diff --git a/mlflow/sklearn/utils.py b/mlflow/sklearn/utils.py index 81f16271dc3a85..3fbe3411006d0a 100644 --- a/mlflow/sklearn/utils.py +++ b/mlflow/sklearn/utils.py @@ -12,17 +12,6 @@ _SAMPLE_WEIGHT = "sample_weight" -def _get_estimator_info_tags(estimator): - """ - :return: A dictionary of MLflow run tag keys and values - describing the specified estimator. 
- """ - return { - "estimator_name": estimator.__class__.__name__, - "estimator_class": (estimator.__class__.__module__ + "." + estimator.__class__.__name__), - } - - def _get_Xy(args, kwargs, X_var_name, y_var_name): # corresponds to: model.fit(X, y) if len(args) >= 2: @@ -97,9 +86,6 @@ def _chunk_dict(d, chunk_size): def _truncate_dict(d, max_key_length=None, max_value_length=None): - def _truncate_and_ellipsize(value, max_length): - return str(value)[: (max_length - 3)] + "..." - key_is_none = max_key_length is None val_is_none = max_value_length is None @@ -126,161 +112,6 @@ def _truncate_and_ellipsize(value, max_length): return truncated -def _get_meta_estimators_for_autologging(): - """ - :return: A list of meta estimator class definitions - (e.g., `sklearn.model_selection.GridSearchCV`) that should be included - when patching training functions for autologging - """ - from sklearn.model_selection import GridSearchCV, RandomizedSearchCV - from sklearn.pipeline import Pipeline - - return [ - GridSearchCV, - RandomizedSearchCV, - Pipeline, - ] - - -def _is_parameter_search_estimator(estimator): - """ - :return: `True` if the specified scikit-learn estimator is a parameter search estimator, - such as `GridSearchCV`. `False` otherwise. - """ - from sklearn.model_selection import GridSearchCV, RandomizedSearchCV - - parameter_search_estimators = [ - GridSearchCV, - RandomizedSearchCV, - ] - - return any( - [ - isinstance(estimator, param_search_estimator) - for param_search_estimator in parameter_search_estimators - ] - ) - - -def _log_parameter_search_results_as_artifact(cv_results_df, run_id): - """ - Records a collection of parameter search results as an MLflow artifact - for the specified run. - - :param cv_results_df: A Pandas DataFrame containing the results of a parameter search - training session, which may be obtained by parsing the `cv_results_` - attribute of a trained parameter search estimator such as - `GridSearchCV`. 
- :param run_id: The ID of the MLflow Run to which the artifact should be recorded. - """ - with TempDir() as t: - results_path = t.path("cv_results.csv") - cv_results_df.to_csv(results_path) - try_mlflow_log(MlflowClient().log_artifact, run_id, results_path) - - -def _create_child_runs_for_parameter_search(cv_estimator, parent_run, child_tags=None): - """ - Creates a collection of child runs for a parameter search training session. - Runs are reconstructed from the `cv_results_` attribute of the specified trained - parameter search estimator - `cv_estimator`, which provides relevant performance - metrics for each point in the parameter search space. One child run is created - for each point in the parameter search space. For additional information, see - `https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html`_. # noqa: E501 - - :param cv_estimator: The trained parameter search estimator for which to create - child runs. - :param parent_run: A py:class:`mlflow.entities.Run` object referring to the parent - parameter search run for which child runs should be created. - :param child_tags: An optional dictionary of MLflow tag keys and values to log - for each child run. 
- """ - import pandas as pd - - client = MlflowClient() - # Use the start time of the parent parameter search run as a rough estimate for the - # start time of child runs, since we cannot precisely determine when each point - # in the parameter search space was explored - child_run_start_time = parent_run.info.start_time - child_run_end_time = int(time.time() * 1000) - - seed_estimator = cv_estimator.estimator - # In the unlikely case that a seed of a parameter search estimator is, - # itself a parameter search estimator, we should avoid logging the untuned - # parameters of the seeds's seed estimator - should_log_params_deeply = not _is_parameter_search_estimator(seed_estimator) - # Each row of `cv_results_` only provides parameters that vary across - # the user-specified parameter grid. In order to log the complete set - # of parameters for each child run, we fetch the parameters defined by - # the seed estimator and update them with parameter subset specified - # in the result row - base_params = seed_estimator.get_params(deep=should_log_params_deeply) - - cv_results_df = pd.DataFrame.from_dict(cv_estimator.cv_results_) - for _, result_row in cv_results_df.iterrows(): - tags_to_log = dict(child_tags) if child_tags else {} - tags_to_log.update( - {MLFLOW_PARENT_RUN_ID: parent_run.info.run_id,} - ) - tags_to_log.update(_get_estimator_info_tags(seed_estimator)) - child_run = client.create_run( - experiment_id=parent_run.info.experiment_id, - start_time=child_run_start_time, - tags=tags_to_log, - ) - - from itertools import zip_longest - - params_to_log = dict(base_params) - params_to_log.update(result_row.get("params", {})) - param_batches_to_log = _chunk_dict(params_to_log, chunk_size=MAX_PARAMS_TAGS_PER_BATCH) - - # Parameters values are recorded twice in the set of search `cv_results_`: - # once within a `params` column with dictionary values and once within - # a separate dataframe column that is created for each parameter. 
To prevent - # duplication of parameters, we log the consolidated values from the parameter - # dictionary column and filter out the other parameter-specific columns with - # names of the form `param_{param_name}`. Additionally, `cv_results_` produces - # metrics for each training split, which is fairly verbose; accordingly, we filter - # out per-split metrics in favor of aggregate metrics (mean, std, etc.) - excluded_metric_prefixes = ["param", "split"] - metric_batches_to_log = _chunk_dict( - { - key: value - for key, value in result_row.iteritems() - if not any([key.startswith(prefix) for prefix in excluded_metric_prefixes]) - and isinstance(value, Number) - }, - chunk_size=min( - MAX_ENTITIES_PER_BATCH - MAX_PARAMS_TAGS_PER_BATCH, MAX_METRICS_PER_BATCH - ), - ) - - for params_batch, metrics_batch in zip_longest( - param_batches_to_log, metric_batches_to_log, fillvalue={} - ): - # Trim any parameter keys / values and metric keys that exceed the limits - # imposed by corresponding MLflow Tracking APIs (e.g., LogParam, LogMetric) - truncated_params_batch = _truncate_dict( - params_batch, MAX_ENTITY_KEY_LENGTH, MAX_PARAM_VAL_LENGTH - ) - truncated_metrics_batch = _truncate_dict( - metrics_batch, max_key_length=MAX_ENTITY_KEY_LENGTH - ) - client.log_batch( - run_id=child_run.info.run_id, - params=[ - Param(str(key), str(value)) for key, value in truncated_params_batch.items() - ], - metrics=[ - Metric(key=key, value=value, timestamp=child_run_end_time, step=0,) - for key, value in truncated_metrics_batch.items() - ], - ) - - client.set_terminated(run_id=child_run.info.run_id, end_time=child_run_end_time) - - def _is_supported_version(): import sklearn diff --git a/tests/sklearn/test_sklearn_autolog.py b/tests/sklearn/test_sklearn_autolog.py index 100517cf185de5..c7e95b144c1ffe 100644 --- a/tests/sklearn/test_sklearn_autolog.py +++ b/tests/sklearn/test_sklearn_autolog.py @@ -527,119 +527,6 @@ def test_meta_estimator_fit_performs_logging_only_once(): assert 
len(mlflow.search_runs([run._info.experiment_id], query)) == 0 -@pytest.mark.parametrize( - "estimator_class_and_space", - [ - (sklearn.model_selection.GridSearchCV, {"kernel": ("linear", "rbf"), "C": [1, 5, 10]}), - (sklearn.model_selection.RandomizedSearchCV, {"C": uniform(loc=0, scale=4)}), - ], -) -@pytest.mark.parametrize("backend", [None, "threading", "loky"]) -def test_parameter_search_estimators_produce_expected_outputs(estimator_class_and_space, backend): - mlflow.sklearn.autolog() - - estimator_class, search_space = estimator_class_and_space - svc = sklearn.svm.SVC() - cv_model = estimator_class(svc, search_space, n_jobs=1, return_train_score=True) - X, y = get_iris() - - def train_cv_model(): - if backend is None: - cv_model.fit(X, y) - else: - with sklearn.utils.parallel_backend(backend=backend): - cv_model.fit(X, y) - - with mlflow.start_run() as run: - cv_model.fit(X, y) - run_id = run.info.run_id - - params, metrics, tags, artifacts = get_run_data(run_id) - expected_cv_params = truncate_dict(stringify_dict_values(cv_model.get_params(deep=False))) - assert expected_cv_params.items() <= params.items() - assert metrics == {TRAINING_SCORE: cv_model.score(X, y)} - assert tags == get_expected_class_tags(cv_model) - assert MODEL_DIR in artifacts - assert "best_estimator" in artifacts - assert "cv_results.csv" in artifacts - - client = mlflow.tracking.MlflowClient() - child_runs = client.search_runs( - run.info.experiment_id, f"tags.`mlflow.parentRunId` = '{run_id}'" - ) - cv_results = pd.DataFrame.from_dict(cv_model.cv_results_) - # We expect to have created a child run for each point in the parameter search space - assert len(child_runs) == len(cv_results) - - # Verify that each set of parameter search results has a corresponding MLflow run - # with the expected data - for _, result in cv_results.iterrows(): - result_params = result.get("params", {}) - params_search_clause = " and ".join( - [f"params.`{key}` = '{value}'" for key, value in 
result_params.items()] - ) - search_filter = f"tags.`mlflow.parentRunId` = '{run_id}' and {params_search_clause}" - child_runs = client.search_runs(run.info.experiment_id, search_filter) - assert len(child_runs) == 1 - child_run = child_runs[0] - - child_params, child_metrics, child_tags, child_artifacts = get_run_data( - child_run.info.run_id - ) - assert child_tags == get_expected_class_tags(svc) - assert "mean_test_score" in child_metrics.keys() - assert "std_test_score" in child_metrics.keys() - # Ensure that we do not capture separate metrics for each cross validation split, which - # would produce very noisy metrics results - assert len([metric for metric in child_metrics.keys() if metric.startswith("split")]) == 0 - - -def test_parameter_search_handles_large_volume_of_metric_outputs(): - mlflow.sklearn.autolog() - - metrics_size = MAX_METRICS_PER_BATCH + 10 - metrics_to_log = { - "score_{}".format(i): sklearn.metrics.make_scorer(lambda y, y_pred, **kwargs: i) - for i in range(metrics_size) - } - - with mlflow.start_run() as run: - svc = sklearn.svm.SVC() - cv_model = sklearn.model_selection.GridSearchCV( - svc, {"C": [1]}, n_jobs=1, scoring=metrics_to_log, refit=False - ) - cv_model.fit(*get_iris()) - run_id = run.info.run_id - - client = mlflow.tracking.MlflowClient() - child_runs = client.search_runs( - run.info.experiment_id, f"tags.`mlflow.parentRunId` = '{run_id}'" - ) - assert len(child_runs) == 1 - child_run = child_runs[0] - - assert len(child_run.data.metrics) >= metrics_size - - -@pytest.mark.disable_force_try_mlflow_log_to_fail -@pytest.mark.parametrize( - "failing_specialization", - [ - "mlflow.sklearn.utils._log_parameter_search_results_as_artifact", - "mlflow.sklearn.utils._create_child_runs_for_parameter_search", - ], -) -def test_autolog_does_not_throw_when_parameter_search_logging_fails(failing_specialization): - with mock.patch(failing_specialization, side_effect=Exception("Failed")) as mock_func: - # Enable autologging after mocking the 
parameter search specialization function - # to ensure that the mock is applied before the function is imported - mlflow.sklearn.autolog() - svc = sklearn.svm.SVC() - cv_model = sklearn.model_selection.GridSearchCV(svc, {"C": [1]}, n_jobs=1) - cv_model.fit(*get_iris()) - mock_func.assert_called_once() - - @pytest.mark.disable_force_try_mlflow_log_to_fail @pytest.mark.parametrize( "func_to_fail",