diff --git a/google/cloud/aiplatform/__init__.py b/google/cloud/aiplatform/__init__.py index 53775d7c56..1b26fcfea1 100644 --- a/google/cloud/aiplatform/__init__.py +++ b/google/cloud/aiplatform/__init__.py @@ -94,6 +94,7 @@ log_model = metadata.metadata._experiment_tracker.log_model get_experiment_df = metadata.metadata._experiment_tracker.get_experiment_df start_run = metadata.metadata._experiment_tracker.start_run +autolog = metadata.metadata._experiment_tracker.autolog start_execution = metadata.metadata._experiment_tracker.start_execution log = metadata.metadata._experiment_tracker.log log_time_series_metrics = metadata.metadata._experiment_tracker.log_time_series_metrics diff --git a/google/cloud/aiplatform/_mlflow_plugin/__init__.py b/google/cloud/aiplatform/_mlflow_plugin/__init__.py new file mode 100644 index 0000000000..f1790ebd04 --- /dev/null +++ b/google/cloud/aiplatform/_mlflow_plugin/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/google/cloud/aiplatform/_mlflow_plugin/_vertex_mlflow_tracking.py b/google/cloud/aiplatform/_mlflow_plugin/_vertex_mlflow_tracking.py new file mode 100644 index 0000000000..539ca57b55 --- /dev/null +++ b/google/cloud/aiplatform/_mlflow_plugin/_vertex_mlflow_tracking.py @@ -0,0 +1,471 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from collections import defaultdict +from typing import Any, Dict, List, NamedTuple, Optional, Union + +from mlflow import entities as mlflow_entities +from mlflow.store.tracking import abstract_store +from mlflow import exceptions as mlflow_exceptions + +from google.cloud import aiplatform +from google.cloud.aiplatform import base +from google.cloud.aiplatform import utils +from google.cloud.aiplatform.compat.types import execution as execution_v1 + +_LOGGER = base.Logger(__name__) + +# MLFlow RunStatus: +# https://www.mlflow.org/docs/latest/python_api/mlflow.entities.html#mlflow.entities.RunStatus +_MLFLOW_RUN_TO_VERTEX_RUN_STATUS = { + mlflow_entities.RunStatus.FINISHED: execution_v1.Execution.State.COMPLETE, + mlflow_entities.RunStatus.FAILED: execution_v1.Execution.State.FAILED, + mlflow_entities.RunStatus.RUNNING: execution_v1.Execution.State.RUNNING, + mlflow_entities.RunStatus.KILLED: execution_v1.Execution.State.CANCELLED, + mlflow_entities.RunStatus.SCHEDULED: execution_v1.Execution.State.NEW, +} +mlflow_to_vertex_run_default = defaultdict( + lambda: execution_v1.Execution.State.STATE_UNSPECIFIED +) +for mlflow_status in _MLFLOW_RUN_TO_VERTEX_RUN_STATUS: + mlflow_to_vertex_run_default[mlflow_status] = _MLFLOW_RUN_TO_VERTEX_RUN_STATUS[ + mlflow_status + ] + +# Mapping of Vertex run status to MLFlow run status (inverse of _MLFLOW_RUN_TO_VERTEX_RUN_STATUS) +_VERTEX_RUN_TO_MLFLOW_RUN_STATUS = { + v: k for k, v in _MLFLOW_RUN_TO_VERTEX_RUN_STATUS.items() +} +vertex_run_to_mflow_default = defaultdict(lambda: mlflow_entities.RunStatus.FAILED) +for vertex_status in _VERTEX_RUN_TO_MLFLOW_RUN_STATUS: + vertex_run_to_mflow_default[vertex_status] = _VERTEX_RUN_TO_MLFLOW_RUN_STATUS[ + vertex_status + ] + +_MLFLOW_TERMINAL_RUN_STATES = [ + mlflow_entities.RunStatus.FINISHED, + mlflow_entities.RunStatus.FAILED, + mlflow_entities.RunStatus.KILLED, +] + + +class _RunTracker(NamedTuple): + """Tracks the current Vertex ExperimentRun. + + Stores the current ExperimentRun the plugin is writing to and whether or + not this run is autocreated. + + Attributes: + autocreate (bool): + Whether the Vertex ExperimentRun should be autocreated. If False, + the plugin writes to the currently active run created via + `aiplatform.start_run()`. + experiment_run (aiplatform.ExperimentRun): + The currently set ExperimentRun. + """ + + autocreate: bool + experiment_run: "aiplatform.ExperimentRun" + + +class _VertexMlflowTracking(abstract_store.AbstractStore): + """Vertex plugin implementation of MLFlow's AbstractStore class.""" + + def _to_mlflow_metric( + self, + vertex_metrics: Dict[str, Union[float, int, str]], + ) -> Optional[List[mlflow_entities.Metric]]: + """Helper method to convert Vertex metrics to mlflow.entities.Metric type. + + Args: + vertex_metrics (Dict[str, Union[float, int, str]]): + Required. A dictionary of Vertex metrics returned from + ExperimentRun.get_metrics() + Returns: + List[mlflow_entities.Metric] - A list of metrics converted to MLFlow's + Metric type. + """ + + mlflow_metrics = [] + + if vertex_metrics: + for metric_key in vertex_metrics: + mlflow_metric = mlflow_entities.Metric( + key=metric_key, + value=vertex_metrics[metric_key], + step=0, + timestamp=0, + ) + mlflow_metrics.append(mlflow_metric) + else: + return None + + return mlflow_metrics + + def _to_mlflow_params( + self, vertex_params: Dict[str, Union[float, int, str]] + ) -> Optional[mlflow_entities.Param]: + """Helper method to convert Vertex params to mlflow.entities.Param type. 
+ + Args: + vertex_params (Dict[str, Union[float, int, str]]): + Required. A dictionary of Vertex params returned from + ExperimentRun.get_params() + Returns: + List[mlflow_entities.Param] - A list of params converted to MLFlow's + Param type. + """ + + mlflow_params = [] + + if vertex_params: + for param_key in vertex_params: + mlflow_param = mlflow_entities.Param( + key=param_key, value=vertex_params[param_key] + ) + mlflow_params.append(mlflow_param) + else: + return None + + return mlflow_params + + def _to_mlflow_entity( + self, + vertex_exp: "aiplatform.Experiment", + vertex_run: "aiplatform.ExperimentRun", + ) -> mlflow_entities.Run: + """Helper method to convert data to required MLFlow type. + + This converts data into MLFlow's mlflow_entities.Run type, which is a + required return type for some methods we're overriding in this plugin. + + Args: + vertex_exp (aiplatform.Experiment): + Required. The current Vertex Experiment. + vertex_run (aiplatform.ExperimentRun): + Required. The active Vertex ExperimentRun + Returns: + mlflow_entities.Run - The data from the currently active run + converted to MLFLow's mlflow_entities.Run type. + + https://www.mlflow.org/docs/latest/python_api/mlflow.entities.html#mlflow.entities.Run + """ + + run_info = mlflow_entities.RunInfo( + run_id=f"{vertex_exp.name}-{vertex_run.name}", + run_uuid=f"{vertex_exp.name}-{vertex_run.name}", + experiment_id=vertex_exp.name, + user_id="", + status=vertex_run_to_mflow_default[vertex_run.state], + start_time=1, + end_time=2, + lifecycle_stage=mlflow_entities.LifecycleStage.ACTIVE, + artifact_uri="file:///tmp/", # The plugin will fail if artifact_uri is not set to a valid filepath string + ) + + run_data = mlflow_entities.RunData( + metrics=self._to_mlflow_metric(vertex_run.get_metrics()), + params=self._to_mlflow_params(vertex_run.get_params()), + tags={}, + ) + + return mlflow_entities.Run(run_info=run_info, run_data=run_data) + + def __init__(self, store_uri: Optional[str], artifact_uri: Optional[str]) -> None: + """Initializes the Vertex MLFlow plugin. + + This plugin overrides MLFlow's AbstractStore class to write metrics and + parameters from model training code to Vertex Experiments. This plugin + is private and should not be instantiated outside the Vertex SDK. + + The _run_map instance property is a dict mapping MLFlow run_id to an + instance of _RunTracker with data on the corresponding Vertex + ExperimentRun. + + For example: { + 'sklearn-12345': _RunTracker(autocreate=True, experiment_run=aiplatform.ExperimentRun(...)) + } + + Until autologging and Experiments supports nested runs, _nested_run_tracker + is used to ensure the plugin shows a warning log exactly once every time it + encounters a model that produces nested runs, like sklearn GridSearchCV and + RandomizedSearchCV models. It is a mapping of parent_run_id to the number of + child runs for that parent. When exactly 1 child run is found, the warning + log is shown. + + Args: + store_uri (str): + The tracking store uri used by MLFlow to write parameters and + metrics for a run. This plugin ignores store_uri since we are + writing data to Vertex Experiments. For this plugin, the value + of store_uri will always be `vertex-mlflow-plugin://`. + artifact_uri (str): + The artifact uri used by MLFlow to write artifacts generated by + a run. This plugin ignores artifact_uri since it doesn't write + any artifacts to Vertex. 
+ """ + + self._run_map = {} + self._vertex_experiment = None + self._nested_run_tracker = {} + super(_VertexMlflowTracking, self).__init__() + + @property + def run_map(self) -> Dict[str, Any]: + return self._run_map + + @property + def vertex_experiment(self) -> "aiplatform.Experiment": + return self._vertex_experiment + + def create_run( + self, + experiment_id: str, + user_id: str, + start_time: str, + tags: List[mlflow_entities.RunTag], + run_name: str, + ) -> mlflow_entities.Run: + """Creates a new ExperimentRun in Vertex if no run is active. + + This overrides the behavior of MLFlow's `create_run()` method to check + if there is a currently active ExperimentRun. If no ExperimentRun is + active, a new Vertex ExperimentRun will be created with the name + `-`. If aiplatform.start_run() has been + invoked and there is an active run, no run will be created and the + currently active ExperimentRun will be returned as an MLFlow Run + entity. + + Args: + experiment_id (str): + The ID of the currently set MLFlow Experiment. Not used by this + plugin. + user_id (str): + The ID of the MLFlow user. Not used by this plugin. + start_time (int): + The start time of the run, in milliseconds since the UNIX + epoch. Not used by this plugin. + tags (List[mlflow_entities.RunTag]): + The tags provided by MLFlow. Only the `mlflow.autologging` tag + is used by this plugin. + run_name (str): + The name of the MLFlow run. Not used by this plugin. + Returns: + mlflow_entities.Run - The created run returned as MLFLow's run + type. + Raises: + RuntimeError: + If a second model training call is made to a manually created + run created via `aiplatform.start_run()` that has already been + used to autolog metrics and parameters in this session. + """ + + self._vertex_experiment = ( + aiplatform.metadata.metadata._experiment_tracker._experiment + ) + + currently_active_run = ( + aiplatform.metadata.metadata._experiment_tracker._experiment_run + ) + + parent_run_id = None + + for tag in tags: + if tag.key == "mlflow.parentRunId" and tag.value is not None: + parent_run_id = tag.value + if parent_run_id in self._nested_run_tracker: + self._nested_run_tracker[parent_run_id] += 1 + else: + self._nested_run_tracker[parent_run_id] = 1 + _LOGGER.warning( + f"This model creates nested runs. No additional ExperimentRun resources will be created for nested runs, summary metrics and parameters will be logged to the parent ExperimentRun: {parent_run_id}." + ) + + if currently_active_run: + if ( + f"{currently_active_run.resource_id}" in self._run_map + and not parent_run_id + ): + _LOGGER.warning( + "Metrics and parameters have already been logged to this run. Call aiplatform.end_run() to end the current run before training a new model." + ) + raise mlflow_exceptions.MlflowException( + "Metrics and parameters have already been logged to this run. Call aiplatform.end_run() to end the current run before training a new model." + ) + elif not parent_run_id: + run_tracker = _RunTracker( + autocreate=False, experiment_run=currently_active_run + ) + current_run_id = currently_active_run.name + + # nested run case + else: + raise mlflow_exceptions.MlflowException( + f"This model creates nested runs. No additional ExperimentRun resources will be created for nested runs, summary metrics and parameters will be logged to the {parent_run_id}: ExperimentRun." 
+ ) + + # Create a new run if aiplatform.start_run() hasn't been called + else: + framework = "" + + for tag in tags: + if tag.key == "mlflow.autologging": + framework = tag.value + + current_run_id = f"{framework}-{utils.timestamped_unique_name()}" + currently_active_run = aiplatform.start_run(run=current_run_id) + run_tracker = _RunTracker( + autocreate=True, experiment_run=currently_active_run + ) + + self._run_map[currently_active_run.resource_id] = run_tracker + + return self._to_mlflow_entity( + vertex_exp=self._vertex_experiment, + vertex_run=run_tracker.experiment_run, + ) + + def update_run_info( + self, + run_id: str, + run_status: mlflow_entities.RunStatus, + end_time: int, + run_name: str, + ) -> mlflow_entities.RunInfo: + """Updates the ExperimentRun status with the status provided by MLFlow. + + Args: + run_id (str): + The ID of the currently set MLFlow run. This is mapped to the + corresponding ExperimentRun in self._run_map. + run_status (mlflow_entities.RunStatus): + The run status provided by MLFlow. + end_time (int): + The end time of the run. Not used by this plugin. + run_name (str): + The name of the MLFlow run. Not used by this plugin. + Returns: + mlflow_entities.RunInfo - Info about the updated run in MLFlow's + required RunInfo format. + """ + + # The if block below does the following: + # - Ends autocreated ExperimentRuns when MLFlow returns a terminal RunStatus. + # - For other autocreated runs or runs where MLFlow returns a non-terminal + # RunStatus, this updates the ExperimentRun with the corresponding + # _MLFLOW_RUN_TO_VERTEX_RUN_STATUS. + # - Non-autocreated ExperimentRuns with a terminal status are not ended. + + if ( + self._run_map[run_id].autocreate + and run_status in _MLFLOW_TERMINAL_RUN_STATES + and self._run_map[run_id].experiment_run + is aiplatform.metadata.metadata._experiment_tracker._experiment_run + ): + aiplatform.metadata.metadata._experiment_tracker.end_run( + state=execution_v1.Execution.State.COMPLETE + ) + elif ( + self._run_map[run_id].autocreate + or run_status not in _MLFLOW_TERMINAL_RUN_STATES + ): + self._run_map[run_id].experiment_run.update_state( + state=mlflow_to_vertex_run_default[run_status] + ) + + return mlflow_entities.RunInfo( + run_uuid=run_id, + run_id=run_id, + status=run_status, + end_time=end_time, + experiment_id=self._vertex_experiment, + user_id="", + start_time=1, + lifecycle_stage=mlflow_entities.LifecycleStage.ACTIVE, + artifact_uri="file:///tmp/", + ) + + def log_batch( + self, + run_id: str, + metrics: List[mlflow_entities.Metric], + params: List[mlflow_entities.Param], + tags: List[mlflow_entities.RunTag], + ) -> None: + """The primary logging method used by MLFlow. + + This plugin overrides this method to write the metrics and parameters + provided by MLFlow to the active Vertex ExperimentRun. + Args: + run_id (str): + The ID of the MLFlow run to write metrics to. This is mapped to + the corresponding ExperimentRun in self._run_map. + metrics (List[mlflow_entities.Metric]): + A list of MLFlow metrics generated from the current model + training run. + params (List[mlflow_entities.Param]): + A list of MLFlow params generated from the current model + training run. + tags (List[mlflow_entities.RunTag]): + The tags provided by MLFlow. Not used by this plugin.
+ """ + + summary_metrics = {} + summary_params = {} + time_series_metrics = {} + + # Get the run to write to + vertex_run = self._run_map[run_id].experiment_run + + for metric in metrics: + if metric.step: + if metric.step not in time_series_metrics: + time_series_metrics[metric.step] = {metric.key: metric.value} + else: + time_series_metrics[metric.step][metric.key] = metric.value + else: + summary_metrics[metric.key] = metric.value + + for param in params: + summary_params[param.key] = param.value + + if summary_metrics: + vertex_run.log_metrics(metrics=summary_metrics) + + if summary_params: + vertex_run.log_params(params=summary_params) + + # TODO(b/261722623): batch these calls + if time_series_metrics: + for step in time_series_metrics: + vertex_run.log_time_series_metrics(time_series_metrics[step], step) + + def get_run(self, run_id: str) -> mlflow_entities.Run: + """Gets the currently active run. + + Args: + run_id (str): + The ID of the currently set MLFlow run. This is mapped to the + corresponding ExperimentRun in self._run_map. + Returns: + mlflow_entities.Run - The currently active Vertex ExperimentRun, + returned as MLFLow's run type. + """ + return self._to_mlflow_entity( + vertex_exp=self._vertex_experiment, + vertex_run=self._run_map[run_id].experiment_run, + ) diff --git a/google/cloud/aiplatform/metadata/artifact.py b/google/cloud/aiplatform/metadata/artifact.py index 7245fcde37..ca74722315 100644 --- a/google/cloud/aiplatform/metadata/artifact.py +++ b/google/cloud/aiplatform/metadata/artifact.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ from typing import Optional, Dict, Union import proto +import threading from google.auth import credentials as auth_credentials @@ -217,6 +218,7 @@ def _create( project=project, location=location, credentials=credentials ) self._gca_resource = resource + self._threading_lock = threading.Lock() return self diff --git a/google/cloud/aiplatform/metadata/context.py b/google/cloud/aiplatform/metadata/context.py index a77d4cb62d..88e83c40be 100644 --- a/google/cloud/aiplatform/metadata/context.py +++ b/google/cloud/aiplatform/metadata/context.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ from typing import Optional, Dict, List, Sequence import proto +import threading from google.auth import credentials as auth_credentials @@ -248,6 +249,7 @@ def _create( project=project, location=location, credentials=credentials ) self._gca_resource = resource + self._threading_lock = threading.Lock() return self diff --git a/google/cloud/aiplatform/metadata/metadata.py b/google/cloud/aiplatform/metadata/metadata.py index 301a2d7ca5..07b04ded85 100644 --- a/google/cloud/aiplatform/metadata/metadata.py +++ b/google/cloud/aiplatform/metadata/metadata.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,8 @@ # limitations under the License. 
# -from typing import Any, Dict, List, Optional, Union +import logging +from typing import Dict, Union, Optional, Any, List from google.api_core import exceptions from google.auth import credentials as auth_credentials @@ -33,12 +34,23 @@ artifact_schema as google_artifact_schema, ) from google.cloud.aiplatform.tensorboard import tensorboard_resource +from google.cloud.aiplatform.utils import autologging_utils from google.cloud.aiplatform_v1.types import execution as execution_v1 _LOGGER = base.Logger(__name__) +class _MLFlowLogFilter(logging.Filter): + """Log filter to only show MLFlow logs for unsupported framework versions.""" + + def filter(self, record) -> bool: + if record.msg.startswith("You are using an unsupported version"): + return True + else: + return False + + def _get_experiment_schema_version() -> str: """Helper method to get experiment schema version @@ -190,6 +202,7 @@ def __init__(self): self._experiment: Optional[experiment_resources.Experiment] = None self._experiment_run: Optional[experiment_run_resource.ExperimentRun] = None self._global_tensorboard: Optional[tensorboard_resource.Tensorboard] = None + self._existing_tracking_uri: Optional[str] = None def reset(self): """Resets this experiment tracker, clearing the current experiment and run.""" @@ -248,6 +261,16 @@ def set_experiment( self._experiment = experiment + if ( + not current_backing_tb + and not backing_tb + and autologging_utils._is_autologging_enabled() + ): + logging.warning( + "Disabling autologging since the current Experiment doesn't have a backing Tensorboard." + ) + self.autolog(disable=True) + def set_tensorboard( self, tensorboard: Union[ @@ -280,6 +303,40 @@ def set_tensorboard( self._global_tensorboard = tensorboard + def _initialize_mlflow_plugin(): + """Invokes the Vertex MLFlow plugin. + + Adding our log filter to MLFlow before calling mlflow.autolog() with + silent=False will only surface warning logs when the installed ML + framework version used for autologging is not supported by MLFlow. + """ + + import mlflow + from mlflow.tracking._tracking_service import utils as mlflow_tracking_utils + from google.cloud.aiplatform._mlflow_plugin._vertex_mlflow_tracking import ( + _VertexMlflowTracking, + ) + + # Only show MLFlow warning logs for ML framework version mismatches + logging.getLogger("mlflow").setLevel(logging.WARNING) + logging.getLogger("mlflow.tracking.fluent").disabled = True + logging.getLogger("mlflow.utils.autologging_utils").addFilter( + _MLFlowLogFilter() + ) + + mlflow_tracking_utils._tracking_store_registry.register( + "vertex-mlflow-plugin", _VertexMlflowTracking + ) + + mlflow.set_tracking_uri("vertex-mlflow-plugin://") + + mlflow.autolog( + log_input_examples=False, + log_model_signatures=False, + log_models=False, + silent=False, # using False to show unsupported framework version warnings with _MLFlowLogFilter + ) + def start_run( self, run: str, @@ -371,13 +428,77 @@ def end_run( try: self._experiment_run.end_run(state=state) except exceptions.NotFound: - _LOGGER.warn( + _LOGGER.warning( f"Experiment run {self._experiment_run.name} was not found." "It may have been deleted" ) finally: self._experiment_run = None + def autolog(self, disable=False): + """Enables autologging of parameters and metrics to Vertex Experiments. + + After calling `aiplatform.autolog()`, any metrics and parameters from + model training calls with supported ML frameworks will be automatically + logged to Vertex Experiments. 
+ + Using autologging requires setting an experiment and experiment_tensorboard. + + Args: + disable (bool): + Optional. Whether to disable autologging. Defaults to False. + If set to True, this resets the MLFlow tracking URI to its + previous state before autologging was called and removes logging + filters. + Raises: + ImportError: + If MLFlow is not installed. MLFlow is required to use + autologging in Vertex. + ValueError: + If experiment or experiment_tensorboard is not set. + If `disable` is passed and autologging hasn't been enabled. + """ + + try: + import mlflow + except ImportError: + raise ImportError( + "MLFlow is not installed. Please install MLFlow using pip install google-cloud-aiplatform[autologging] to use autologging in the Vertex SDK." + ) + + if disable: + if not autologging_utils._is_autologging_enabled(): + raise ValueError( + "Autologging is not enabled. Enable autologging by calling aiplatform.autolog()." + ) + if self._existing_tracking_uri: + mlflow.set_tracking_uri(self._existing_tracking_uri) + mlflow.autolog(disable=True) + + # Remove the log filters we applied in the plugin + logging.getLogger("mlflow").setLevel(logging.INFO) + logging.getLogger("mlflow.tracking.fluent").disabled = False + logging.getLogger("mlflow.utils.autologging_utils").removeFilter( + _MLFlowLogFilter() + ) + elif not self._experiment: + raise ValueError( + "No experiment set. Make sure to call aiplatform.init(experiment='my-experiment') " + "before calling aiplatform.autolog()." + ) + elif not self.experiment._metadata_context.metadata.get( + constants._BACKING_TENSORBOARD_RESOURCE_KEY + ): + raise ValueError( + "Setting an experiment tensorboard is required to use autologging. " + "Please set a backing tensorboard resource by calling " + "aiplatform.init(experiment_tensorboard=aiplatform.Tensorboard(...))." + ) + else: + self._existing_tracking_uri = mlflow.get_tracking_uri() + + _ExperimentTracker._initialize_mlflow_plugin() + def log_params(self, params: Dict[str, Union[float, int, str]]): """Log single or multiple parameters with specified key and value pairs. @@ -810,7 +931,7 @@ def start_execution( if self.experiment_run: if self.experiment_run._is_legacy_experiment_run(): - _LOGGER.warn( + _LOGGER.warning( f"{self.experiment_run._run_name} is an Experiment run created in Vertex Experiment Preview", " and does not support tracking Executions." " Please create a new Experiment run to track executions against an Experiment run.", diff --git a/google/cloud/aiplatform/metadata/resource.py b/google/cloud/aiplatform/metadata/resource.py index 63ad89208e..3e4d0ddc23 100644 --- a/google/cloud/aiplatform/metadata/resource.py +++ b/google/cloud/aiplatform/metadata/resource.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2021 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import abc import collections import re +import threading from copy import deepcopy from typing import Dict, Optional, Union, Any, List @@ -102,6 +103,8 @@ def __init__( name=full_resource_name, retry=base._DEFAULT_RETRY ) + self._threading_lock = threading.Lock() + @property def metadata(self) -> Dict: return self.to_dict()["metadata"] @@ -295,20 +298,22 @@ def update( credentials set in aiplatform.init.
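Putting the new `autolog()` method together with its docstring, a typical calling sequence could look like the sketch below. The project, location, and resource names are placeholders, and running it requires a real GCP project with the `autologging` extra installed:

```python
# Hypothetical end-to-end usage of the autologging entry point added above.
import numpy as np
from sklearn.linear_model import LinearRegression
from google.cloud import aiplatform

aiplatform.init(
    project="my-project",        # placeholder
    location="us-central1",
    experiment="my-experiment",  # placeholder
    experiment_tensorboard=aiplatform.Tensorboard.create(display_name="my-tb"),
)

aiplatform.autolog()  # points MLFlow at the vertex-mlflow-plugin:// tracking store

X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]])
y = np.dot(X, np.array([1, 2])) + 3
LinearRegression().fit(X, y)  # params/metrics land in an autocreated "sklearn-..." ExperimentRun

aiplatform.autolog(disable=True)  # restores the previous MLFlow tracking URI and log filters
```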
""" - gca_resource = deepcopy(self._gca_resource) - if metadata: - self._nested_update_metadata(gca_resource=gca_resource, metadata=metadata) - if description: - gca_resource.description = description - - api_client = self._instantiate_client(credentials=credentials) - - # TODO: if etag is not valid sync and retry - update_gca_resource = self._update_resource( - client=api_client, - resource=gca_resource, - ) - self._gca_resource = update_gca_resource + with self._threading_lock: + gca_resource = deepcopy(self._gca_resource) + if metadata: + self._nested_update_metadata( + gca_resource=gca_resource, metadata=metadata + ) + if description: + gca_resource.description = description + + api_client = self._instantiate_client(credentials=credentials) + # TODO: if etag is not valid sync and retry + update_gca_resource = self._update_resource( + client=api_client, + resource=gca_resource, + ) + self._gca_resource = update_gca_resource @classmethod def list( @@ -453,7 +458,7 @@ def _create( ) self._gca_resource = resource - + self._threading_lock = threading.Lock() return self @classmethod diff --git a/google/cloud/aiplatform/utils/autologging_utils.py b/google/cloud/aiplatform/utils/autologging_utils.py new file mode 100644 index 0000000000..556fa74464 --- /dev/null +++ b/google/cloud/aiplatform/utils/autologging_utils.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +def _is_autologging_enabled() -> bool: + try: + import mlflow + + if mlflow.get_tracking_uri() == "vertex-mlflow-plugin://": + return True + else: + return False + except ImportError: + return False diff --git a/setup.py b/setup.py index 5382dbaf55..47248bfdde 100644 --- a/setup.py +++ b/setup.py @@ -75,6 +75,9 @@ endpoint_extra_require = ["requests >= 2.28.1"] private_endpoints_extra_require = ["urllib3 >=1.21.1, <1.27", "requests >= 2.28.1"] + +autologging_extra_require = ["mlflow>=1.27.0,<=2.1.1"] + full_extra_require = list( set( tensorboard_extra_require @@ -88,6 +91,7 @@ + vizier_extra_require + prediction_extra_require + private_endpoints_extra_require + + autologging_extra_require ) ) testing_extra_require = ( @@ -143,6 +147,7 @@ "prediction": prediction_extra_require, "datasets": datasets_extra_require, "private_endpoints": private_endpoints_extra_require, + "autologging": autologging_extra_require, }, python_requires=">=3.7", classifiers=[ diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index bd36017074..8f27a26d3d 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -10,5 +10,5 @@ proto-plus==1.22.0 protobuf==3.19.5 mock==4.0.2 google-cloud-storage==1.32.0 -packaging==14.3 +packaging==20.0 # Increased for compatibility with MLFlow grpcio-testing==1.34.0 diff --git a/tests/system/aiplatform/test_autologging.py b/tests/system/aiplatform/test_autologging.py new file mode 100644 index 0000000000..e45231d87e --- /dev/null +++ b/tests/system/aiplatform/test_autologging.py @@ -0,0 +1,329 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
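Since MLFlow now ships behind the new `autologging` extra (`pip install "google-cloud-aiplatform[autologging]"`, per the setup.py change above), `_is_autologging_enabled()` has to degrade gracefully when the import fails. When MLFlow is present, the check is just a tracking-URI comparison, as this small sketch shows:

```python
import mlflow
from google.cloud.aiplatform.utils import autologging_utils

# The helper returns True only while MLFlow's tracking URI points at the plugin scheme.
mlflow.set_tracking_uri("vertex-mlflow-plugin://")
assert autologging_utils._is_autologging_enabled()

mlflow.set_tracking_uri("file:///tmp/mlruns")  # any other URI means autologging is off
assert not autologging_utils._is_autologging_enabled()
```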
+# + +import mlflow # noqa: F401 +import numpy as np +import os +import pytest + +from google.cloud import aiplatform +from tests.system.aiplatform import e2e_base + +_RUN = "run-1" +_PARAMS = {"sdk-param-test-1": 0.1, "sdk-param-test-2": 0.2} +_METRICS = {"sdk-metric-test-1": 0.8, "sdk-metric-test-2": 100.0} + +_RUN_2 = "run-2" +_PARAMS_2 = {"sdk-param-test-1": 0.2, "sdk-param-test-2": 0.4} +_METRICS_2 = {"sdk-metric-test-1": 1.6, "sdk-metric-test-2": 200.0} + +_TIME_SERIES_METRIC_KEY = "accuracy" + +_CLASSIFICATION_METRICS = { + "display_name": "my-classification-metrics", + "labels": ["cat", "dog"], + "matrix": [[9, 1], [1, 9]], + "fpr": [0.1, 0.5, 0.9], + "tpr": [0.1, 0.7, 0.9], + "threshold": [0.9, 0.5, 0.1], +} + + +def build_and_train_test_tf_model(): + import tensorflow as tf + + X = np.array( + [ + [1, 1], + [1, 2], + [2, 2], + [2, 3], + [1, 1], + [1, 2], + [2, 2], + [2, 3], + [1, 1], + [1, 2], + [2, 2], + [2, 3], + ] + ) + y = np.dot(X, np.array([1, 2])) + 3 + + model = tf.keras.models.Sequential( + [ + tf.keras.layers.Flatten(input_shape=(2,)), + tf.keras.layers.Dense(128, activation="relu"), + tf.keras.layers.Dropout(0.2), + tf.keras.layers.Dense(1), + ] + ) + + model.compile( + optimizer="adam", + loss=tf.keras.losses.CategoricalCrossentropy(), + metrics=["accuracy"], + ) + + model.fit(X, y, epochs=5) + + +def build_and_train_test_scikit_model(): + from sklearn.linear_model import LinearRegression + + X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]]) + y = np.dot(X, np.array([1, 2])) + 3 + + model = LinearRegression() + model.fit(X, y) + + +def train_test_scikit_model_and_get_score(): + from sklearn.linear_model import LinearRegression + + X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]]) + y = np.dot(X, np.array([1, 2])) + 3 + + model = LinearRegression() + model.fit(X, y) + + model.score(X, y) + + +def build_and_train_test_sklearn_gridsearch_model(): + from sklearn import svm, datasets + from sklearn.linear_model import LinearRegression # noqa: F401 + from sklearn.model_selection import GridSearchCV, RandomizedSearchCV # noqa: F401 + + iris = datasets.load_iris() + parameters = {"kernel": ("linear", "rbf"), "C": [1, 10]} + svc = svm.SVC() + clf = GridSearchCV(svc, parameters) + clf.fit(iris.data, iris.target) + return clf + + +@pytest.mark.usefixtures( + "prepare_staging_bucket", "delete_staging_bucket", "tear_down_resources" +) +class TestAutologging(e2e_base.TestEndToEnd): + + _temp_prefix = "tmpvrtxsdk-e2e" + + def setup_class(cls): + cls._experiment_autocreate_scikit = cls._make_display_name("")[:64] + cls._experiment_autocreate_tf = cls._make_display_name("")[:64] + cls._experiment_manual_scikit = cls._make_display_name("")[:64] + cls._experiment_nested_run = cls._make_display_name("")[:64] + cls._experiment_disable_test = cls._make_display_name("")[:64] + cls._tensorboard_autorun = aiplatform.Tensorboard.create( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + display_name=cls._make_display_name("")[:64], + ) + cls._tensorboard_manual_run = aiplatform.Tensorboard.create( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + display_name=cls._make_display_name("")[:64], + ) + cls._tensorboard_nested_run = aiplatform.Tensorboard.create( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + display_name=cls._make_display_name("")[:64], + ) + cls._tensorboard_enable_test = aiplatform.Tensorboard.create( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + display_name=cls._make_display_name("")[:64], + ) + + cls._experiment_enable_name = 
cls._make_display_name("")[:64] + cls._experiment_enable_test = aiplatform.Experiment.get_or_create( + experiment_name=cls._experiment_enable_name + ) + cls._experiment_enable_test.assign_backing_tensorboard( + cls._tensorboard_enable_test + ) + + def test_autologging_with_autorun_creation(self, shared_state): + + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + experiment=self._experiment_autocreate_scikit, + experiment_tensorboard=self._tensorboard_autorun, + ) + + shared_state["resources"] = [self._tensorboard_autorun] + + shared_state["resources"].append( + aiplatform.metadata.metadata._experiment_tracker.experiment + ) + + aiplatform.autolog() + + build_and_train_test_scikit_model() + + # Confirm sklearn run, params, and metrics exist + experiment_df_scikit = aiplatform.get_experiment_df() + assert experiment_df_scikit["run_name"][0].startswith("sklearn-") + assert experiment_df_scikit["param.fit_intercept"][0] == "True" + assert experiment_df_scikit["metric.training_mae"][0] > 0 + + # Write post-training metrics to a scikit-learn model + assert "metric.LinearRegression_score_X" not in experiment_df_scikit.columns + train_test_scikit_model_and_get_score() + + experiment_df_scikit = aiplatform.get_experiment_df() + assert "metric.LinearRegression_score_X" in experiment_df_scikit.columns + + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + experiment=self._experiment_autocreate_tf, + experiment_tensorboard=self._tensorboard_autorun, + ) + + shared_state["resources"].append( + aiplatform.metadata.metadata._experiment_tracker.experiment + ) + build_and_train_test_tf_model() + + experiment_df_tf = aiplatform.get_experiment_df() + + # Confirm tf run, params, metrics, and time series metrics exist + assert experiment_df_tf["run_name"][0].startswith("tensorflow-") + assert experiment_df_tf["param.steps_per_epoch"][0] == "None" + assert experiment_df_tf["metric.loss"][0] > 0 + assert "time_series_metric.accuracy" in experiment_df_tf.columns + + # No data should be written locally + assert not os.path.isdir("mlruns") + + # training a model after disabling autologging should not create additional ExperimentRuns + assert len(experiment_df_tf) == 1 + aiplatform.autolog(disable=True) + build_and_train_test_scikit_model() + + # No additional experiment runs should be created after disabling autologging + experiment_df_after_disable = aiplatform.get_experiment_df() + assert len(experiment_df_after_disable) == 1 + + def test_autologging_with_manual_run_creation(self, shared_state): + + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + experiment=self._experiment_manual_scikit, + experiment_tensorboard=self._tensorboard_manual_run, + ) + + shared_state["resources"] = [self._tensorboard_manual_run] + + shared_state["resources"].append( + aiplatform.metadata.metadata._experiment_tracker.experiment + ) + + aiplatform.autolog() + run = aiplatform.start_run(_RUN) + + build_and_train_test_scikit_model() + experiment_df = aiplatform.get_experiment_df() + + # The run shouldn't be ended until end_run() is called + assert len(experiment_df) == 1 + assert experiment_df["run_name"][0] == _RUN + assert experiment_df["param.fit_intercept"][0] == "True" + assert run.state == aiplatform.gapic.Execution.State.RUNNING + + aiplatform.end_run() + assert run.state == aiplatform.gapic.Execution.State.COMPLETE + + new_experiment = self._make_display_name("exp-2") + + aiplatform.init( + project=e2e_base._PROJECT, + 
location=e2e_base._LOCATION, + experiment=new_experiment, + experiment_tensorboard=self._tensorboard_manual_run, + ) + + shared_state["resources"].append( + aiplatform.metadata.metadata._experiment_tracker.experiment + ) + # Ending the manually created run and training a new model + # should auto-create a new ExperimentRun + build_and_train_test_scikit_model() + experiment_df_after_autocreate = aiplatform.get_experiment_df() + + assert len(experiment_df_after_autocreate) == 1 + assert experiment_df_after_autocreate["run_name"][0].startswith("sklearn-") + + def test_autologging_nested_run_model(self, shared_state, caplog): + + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + experiment=self._experiment_nested_run, + experiment_tensorboard=self._tensorboard_nested_run, + ) + + shared_state["resources"] = [self._tensorboard_nested_run] + + shared_state["resources"].append( + aiplatform.metadata.metadata._experiment_tracker.experiment + ) + + aiplatform.autolog() + + build_and_train_test_sklearn_gridsearch_model() + experiment_df = aiplatform.get_experiment_df() + + assert len(experiment_df) == 1 + + assert "This model creates nested runs." in caplog.text + caplog.clear() + + def test_autologging_enable_disable_check(self, shared_state, caplog): + + # first enable autologging with provided tb-backed experiment + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + experiment=self._experiment_enable_name, + ) + + shared_state["resources"] = [ + aiplatform.metadata.metadata._experiment_tracker.experiment + ] + shared_state["resources"].append(self._tensorboard_enable_test) + + aiplatform.autolog() + + assert aiplatform.utils.autologging_utils._is_autologging_enabled() + + # re-initializing without tb-backed experiment should disable autologging + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + experiment=self._experiment_disable_test, + ) + + assert "Disabling" in caplog.text + caplog.clear() + + assert not aiplatform.utils.autologging_utils._is_autologging_enabled() diff --git a/tests/unit/aiplatform/test_autologging.py b/tests/unit/aiplatform/test_autologging.py new file mode 100644 index 0000000000..a408c12eb1 --- /dev/null +++ b/tests/unit/aiplatform/test_autologging.py @@ -0,0 +1,1011 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
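The unit tests below exercise the status translation tables defined in `_vertex_mlflow_tracking.py` earlier in this diff. For reference, those tables behave like defaultdicts with a fallback for unknown statuses, roughly as follows:

```python
# Mirrors _MLFLOW_RUN_TO_VERTEX_RUN_STATUS / mlflow_to_vertex_run_default from the plugin module.
from collections import defaultdict
from mlflow import entities as mlflow_entities
from google.cloud.aiplatform.compat.types import execution as execution_v1

mlflow_to_vertex = defaultdict(
    lambda: execution_v1.Execution.State.STATE_UNSPECIFIED,  # fallback for unmapped statuses
    {
        mlflow_entities.RunStatus.FINISHED: execution_v1.Execution.State.COMPLETE,
        mlflow_entities.RunStatus.FAILED: execution_v1.Execution.State.FAILED,
        mlflow_entities.RunStatus.RUNNING: execution_v1.Execution.State.RUNNING,
        mlflow_entities.RunStatus.KILLED: execution_v1.Execution.State.CANCELLED,
        mlflow_entities.RunStatus.SCHEDULED: execution_v1.Execution.State.NEW,
    },
)

assert mlflow_to_vertex[mlflow_entities.RunStatus.FINISHED] == execution_v1.Execution.State.COMPLETE
assert mlflow_to_vertex["not-a-real-status"] == execution_v1.Execution.State.STATE_UNSPECIFIED
```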
+# + +import copy +import datetime +from importlib import reload +import os +from unittest import mock +from unittest.mock import patch + + +from mlflow import entities as mlflow_entities +from google.cloud.aiplatform._mlflow_plugin import _vertex_mlflow_tracking +from google.cloud.aiplatform.utils import autologging_utils + +import pytest +from google.api_core import exceptions + + +from google.cloud import aiplatform +from google.cloud.aiplatform import initializer +from google.cloud.aiplatform import base +from google.cloud.aiplatform_v1 import ( + AddContextArtifactsAndExecutionsResponse, + Artifact as GapicArtifact, + Context as GapicContext, + Execution as GapicExecution, + MetadataServiceClient, + MetadataStore as GapicMetadataStore, + TensorboardServiceClient, +) +from google.cloud.aiplatform.compat.types import execution as gca_execution +from google.cloud.aiplatform.compat.types import ( + tensorboard_run as gca_tensorboard_run, +) +from google.cloud.aiplatform.compat.types import ( + tensorboard_time_series as gca_tensorboard_time_series, +) +from google.cloud.aiplatform.metadata import constants + +from google.cloud.aiplatform.compat.services import ( + tensorboard_service_client, +) + +from google.cloud.aiplatform.compat.types import ( + encryption_spec as gca_encryption_spec, + tensorboard as gca_tensorboard, +) + +import test_tensorboard +import test_metadata + +import numpy as np + +_TEST_PROJECT = "test-project" +_TEST_OTHER_PROJECT = "test-project-1" +_TEST_LOCATION = "us-central1" +_TEST_PARENT = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/metadataStores/default" +) +_TEST_EXPERIMENT = "test-experiment" +_TEST_OTHER_EXPERIMENT = "test-other-experiment" +_TEST_EXPERIMENT_DESCRIPTION = "test-experiment-description" +_TEST_OTHER_EXPERIMENT_DESCRIPTION = "test-other-experiment-description" +_TEST_PIPELINE = _TEST_EXPERIMENT +_TEST_RUN = "run-1" +_TEST_OTHER_RUN = "run-2" +_TEST_DISPLAY_NAME = "test-display-name" + +# resource attributes +_TEST_METADATA = {"test-param1": 1, "test-param2": "test-value", "test-param3": True} + +# metadataStore +_TEST_METADATASTORE = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/metadataStores/default" +) + +# context +_TEST_CONTEXT_ID = _TEST_EXPERIMENT +_TEST_CONTEXT_NAME = f"{_TEST_PARENT}/contexts/{_TEST_CONTEXT_ID}" + +# execution +_TEST_EXECUTION_ID = f"{_TEST_EXPERIMENT}-{_TEST_RUN}" +_TEST_EXECUTION_NAME = f"{_TEST_PARENT}/executions/{_TEST_EXECUTION_ID}" +_TEST_OTHER_EXECUTION_ID = f"{_TEST_EXPERIMENT}-{_TEST_OTHER_RUN}" +_TEST_OTHER_EXECUTION_NAME = f"{_TEST_PARENT}/executions/{_TEST_OTHER_EXECUTION_ID}" +_TEST_SCHEMA_TITLE = "test.Schema" + +_TEST_EXECUTION = GapicExecution( + name=_TEST_EXECUTION_NAME, + schema_title=_TEST_SCHEMA_TITLE, + display_name=_TEST_DISPLAY_NAME, + metadata=_TEST_METADATA, + state=GapicExecution.State.RUNNING, +) + +# artifact +_TEST_ARTIFACT_ID = f"{_TEST_EXPERIMENT}-{_TEST_RUN}-metrics" +_TEST_ARTIFACT_NAME = f"{_TEST_PARENT}/artifacts/{_TEST_ARTIFACT_ID}" +_TEST_OTHER_ARTIFACT_ID = f"{_TEST_EXPERIMENT}-{_TEST_OTHER_RUN}-metrics" +_TEST_OTHER_ARTIFACT_NAME = f"{_TEST_PARENT}/artifacts/{_TEST_OTHER_ARTIFACT_ID}" + +# parameters +_TEST_PARAM_KEY_1 = "learning_rate" +_TEST_PARAM_KEY_2 = "dropout" +_TEST_PARAMS = {_TEST_PARAM_KEY_1: 0.01, _TEST_PARAM_KEY_2: 0.2} +_TEST_OTHER_PARAMS = {_TEST_PARAM_KEY_1: 0.02, _TEST_PARAM_KEY_2: 0.3} + +# metrics +_TEST_METRIC_KEY_1 = "rmse" +_TEST_METRIC_KEY_2 = "accuracy" +_TEST_METRICS = {_TEST_METRIC_KEY_1: 222, _TEST_METRIC_KEY_2: 1} 
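For context, the `_to_mlflow_metric` / `_to_mlflow_params` helpers defined earlier in this diff turn dictionaries like `_TEST_METRICS` and `_TEST_PARAMS` above into MLFlow entity lists. A minimal sketch of that conversion:

```python
# Illustrative only: the dict-to-entity conversion the plugin helpers perform.
from mlflow import entities as mlflow_entities

vertex_metrics = {"rmse": 222, "accuracy": 1}            # e.g. _TEST_METRICS above
vertex_params = {"learning_rate": 0.01, "dropout": 0.2}  # e.g. _TEST_PARAMS above

mlflow_metrics = [
    mlflow_entities.Metric(key=k, value=v, step=0, timestamp=0)
    for k, v in vertex_metrics.items()
]
mlflow_params = [
    mlflow_entities.Param(key=k, value=v) for k, v in vertex_params.items()
]
```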
+_TEST_OTHER_METRICS = {_TEST_METRIC_KEY_2: 0.9} + +# classification_metrics +_TEST_CLASSIFICATION_METRICS = { + "display_name": "my-classification-metrics", + "labels": ["cat", "dog"], + "matrix": [[9, 1], [1, 9]], + "fpr": [0.1, 0.5, 0.9], + "tpr": [0.1, 0.7, 0.9], + "threshold": [0.9, 0.5, 0.1], +} + +# schema +_TEST_WRONG_SCHEMA_TITLE = "system.WrongSchema" + +# tf model autologging +_TEST_TF_EXPERIMENT_RUN_PARAMS = { + "batch_size": "None", + "class_weight": "None", + "epochs": "5", + "initial_epoch": "0", + "max_queue_size": "10", + "sample_weight": "None", + "shuffle": "True", + "steps_per_epoch": "None", + "use_multiprocessing": "False", + "validation_batch_size": "None", + "validation_freq": "1", + "validation_split": "0.0", + "validation_steps": "None", + "workers": "1", +} +_TEST_TF_EXPERIMENT_RUN_METRICS = { + "accuracy": 0.0, + "loss": 1.013, +} + +# tensorboard +_TEST_TB_ID = "1028944691210842416" +_TEST_TENSORBOARD_NAME = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/tensorboards/{_TEST_TB_ID}" +) +_TEST_TB_DISPLAY_NAME = "my_tensorboard_1234" +_TEST_ENCRYPTION_KEY_NAME = "key_1234" +_TEST_ENCRYPTION_SPEC = gca_encryption_spec.EncryptionSpec( + kms_key_name=_TEST_ENCRYPTION_KEY_NAME +) +_TEST_TB_NAME = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/tensorboards/{_TEST_TB_ID}" +) +_TEST_TENSORBOARD_EXPERIMENT_ID = "test-experiment" +_TEST_TENSORBOARD_EXPERIMENT_NAME = ( + f"{_TEST_TB_NAME}/experiments/{_TEST_TENSORBOARD_EXPERIMENT_ID}" +) + +_TEST_TENSORBOARD_RUN_ID = "run-1" +_TEST_TENSORBOARD_RUN_NAME = ( + f"{_TEST_TENSORBOARD_EXPERIMENT_NAME}/runs/{_TEST_TENSORBOARD_RUN_ID}" +) + +_TEST_TENSORBOARD_RUN = gca_tensorboard_run.TensorboardRun( + name=_TEST_TENSORBOARD_RUN_NAME, + display_name=_TEST_DISPLAY_NAME, +) +_TEST_TIME_SERIES_DISPLAY_NAME = "loss" +_TEST_TIME_SERIES_DISPLAY_NAME_2 = "accuracy" +_TEST_TENSORBOARD_TIME_SERIES_ID = "test-time-series" +_TEST_TENSORBOARD_TIME_SERIES_NAME = ( + f"{_TEST_TENSORBOARD_RUN_NAME}/timeSeries/{_TEST_TENSORBOARD_TIME_SERIES_ID}" +) + +_TEST_TENSORBOARD_TIME_SERIES_LIST = [ + gca_tensorboard_time_series.TensorboardTimeSeries( + name=_TEST_TENSORBOARD_TIME_SERIES_NAME, + display_name=_TEST_TIME_SERIES_DISPLAY_NAME, + value_type=gca_tensorboard_time_series.TensorboardTimeSeries.ValueType.SCALAR, + ), + gca_tensorboard_time_series.TensorboardTimeSeries( + name=_TEST_TENSORBOARD_TIME_SERIES_NAME, + display_name=_TEST_TIME_SERIES_DISPLAY_NAME_2, + value_type=gca_tensorboard_time_series.TensorboardTimeSeries.ValueType.SCALAR, + ), +] + +# mlflow +_TEST_MLFLOW_TRACKING_URI = "file://my-test-tracking-uri" +_TEST_MLFLOW_CREATE_RUN_TIMESTAMP = int(datetime.datetime.now().timestamp()) +_TEST_MLFLOW_RUN_ID = f"tensorflow-{_TEST_MLFLOW_CREATE_RUN_TIMESTAMP}" + +_MOCK_MLFLOW_RUN_INFO = mlflow_entities.RunInfo( + run_uuid=_TEST_MLFLOW_RUN_ID, + run_id=_TEST_MLFLOW_RUN_ID, + experiment_id=_TEST_EXPERIMENT, + user_id="", + status=gca_execution.Execution.State.RUNNING, + start_time=1, + end_time=2, + lifecycle_stage=mlflow_entities.LifecycleStage.ACTIVE, + artifact_uri="file:///tmp/", +) + +_MOCK_MLFLOW_RUN_INFO_COMPLETE = mlflow_entities.RunInfo( + run_uuid=_TEST_MLFLOW_RUN_ID, + run_id=_TEST_MLFLOW_RUN_ID, + experiment_id=_TEST_EXPERIMENT, + user_id="", + status=gca_execution.Execution.State.COMPLETE, + start_time=1, + end_time=2, + lifecycle_stage=mlflow_entities.LifecycleStage.ACTIVE, + artifact_uri="file:///tmp/", +) + + +_MOCK_MLFLOW_RUN_DATA = mlflow_entities.RunData( + metrics=[ + mlflow_entities.Metric(key=k, value=v, 
step=0, timestamp=0) + for k, v in _TEST_TF_EXPERIMENT_RUN_METRICS.items() + ], + params=[ + mlflow_entities.Param(key=k, value=v) + for k, v in _TEST_TF_EXPERIMENT_RUN_PARAMS.items() + ], + tags={}, +) + + +@pytest.fixture +def mlflow_plugin_create_run_mock(): + with patch.object( + _vertex_mlflow_tracking._VertexMlflowTracking, "create_run" + ) as create_vertex_run_mock: + create_vertex_run_mock.return_value = mlflow_entities.Run( + run_info=_MOCK_MLFLOW_RUN_INFO, run_data=_MOCK_MLFLOW_RUN_DATA + ) + yield create_vertex_run_mock + + +@pytest.fixture +def mlflow_plugin_get_run_mock(): + with patch.object( + _vertex_mlflow_tracking._VertexMlflowTracking, "get_run" + ) as get_vertex_run_mock: + get_vertex_run_mock.return_value = mlflow_entities.Run( + run_info=_MOCK_MLFLOW_RUN_INFO, run_data=_MOCK_MLFLOW_RUN_DATA + ) + yield get_vertex_run_mock + + +@pytest.fixture +def mlflow_plugin_update_run_info_mock(): + with patch.object( + _vertex_mlflow_tracking._VertexMlflowTracking, "update_run_info" + ) as update_run_mock: + update_run_mock.return_value = _MOCK_MLFLOW_RUN_INFO_COMPLETE + yield update_run_mock + + +@pytest.fixture +def mock_experiment_run(): + exp_run_mock = mock.MagicMock(aiplatform.ExperimentRun) + exp_run_mock.run_name = _TEST_MLFLOW_RUN_ID + exp_run_mock.experiment = _EXPERIMENT_MOCK + return exp_run_mock + + +@pytest.fixture +def mlflow_plugin_run_map_mock(mock_experiment_run): + with patch.object( + _vertex_mlflow_tracking._VertexMlflowTracking, + "run_map", + new_callable=mock.PropertyMock, + ) as run_map_mock: + run_map_mock.return_value = { + _TEST_MLFLOW_RUN_ID: _vertex_mlflow_tracking._RunTracker( + autocreate=True, experiment_run=mock_experiment_run + ) + } + yield run_map_mock + + +@pytest.fixture +def mlflow_plugin_vertex_experiment_mock(mock_experiment_run): + with patch.object( + _vertex_mlflow_tracking._VertexMlflowTracking, + "vertex_experiment", + new_callable=mock.PropertyMock, + ) as vertex_experiment_mock: + vertex_experiment_mock.return_value = _EXPERIMENT_MOCK + yield vertex_experiment_mock + + +@pytest.fixture +def get_tensorboard_mock(): + with patch.object( + tensorboard_service_client.TensorboardServiceClient, "get_tensorboard" + ) as get_tensorboard_mock: + get_tensorboard_mock.return_value = gca_tensorboard.Tensorboard( + name=_TEST_TENSORBOARD_NAME, + display_name=_TEST_DISPLAY_NAME, + encryption_spec=_TEST_ENCRYPTION_SPEC, + ) + yield get_tensorboard_mock + + +@pytest.fixture +def get_tensorboard_run_mock(): + with patch.object( + tensorboard_service_client.TensorboardServiceClient, + "get_tensorboard_run", + ) as get_tensorboard_run_mock: + get_tensorboard_run_mock.return_value = _TEST_TENSORBOARD_RUN + yield get_tensorboard_run_mock + + +@pytest.fixture +def list_tensorboard_time_series_mock(): + with patch.object( + tensorboard_service_client.TensorboardServiceClient, + "list_tensorboard_time_series", + ) as list_tensorboard_time_series_mock: + list_tensorboard_time_series_mock.return_value = ( + _TEST_TENSORBOARD_TIME_SERIES_LIST + ) + yield list_tensorboard_time_series_mock + + +create_tensorboard_experiment_mock = test_tensorboard.create_tensorboard_experiment_mock +write_tensorboard_run_data_mock = test_tensorboard.write_tensorboard_run_data_mock +get_tensorboard_time_series_mock = test_tensorboard.get_tensorboard_time_series_mock + +create_tensorboard_run_artifact_mock = ( + test_metadata.create_tensorboard_run_artifact_mock +) +add_context_artifacts_and_executions_mock = ( + test_metadata.add_context_artifacts_and_executions_mock +) + + 
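The `mlflow_plugin_update_run_info_mock` fixture above replaces a method whose real branching (see `update_run_info` earlier in this diff) decides whether a run gets ended, updated, or left alone. Condensed into a sketch that ignores the additional currently-active-run check performed by the real method:

```python
from mlflow import entities as mlflow_entities

_TERMINAL = {
    mlflow_entities.RunStatus.FINISHED,
    mlflow_entities.RunStatus.FAILED,
    mlflow_entities.RunStatus.KILLED,
}

def describe_update(autocreated: bool, run_status) -> str:
    """Names the action update_run_info takes for a tracker/status combination."""
    if autocreated and run_status in _TERMINAL:
        return "end the autocreated ExperimentRun"
    if autocreated or run_status not in _TERMINAL:
        return "update the ExperimentRun state via the status mapping"
    return "leave the manually created ExperimentRun open for aiplatform.end_run()"

assert describe_update(True, mlflow_entities.RunStatus.FINISHED) == "end the autocreated ExperimentRun"
assert describe_update(False, mlflow_entities.RunStatus.FINISHED).startswith("leave")
```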
+@pytest.fixture +def get_metadata_store_mock(): + with patch.object( + MetadataServiceClient, "get_metadata_store" + ) as get_metadata_store_mock: + get_metadata_store_mock.return_value = GapicMetadataStore( + name=_TEST_METADATASTORE, + ) + yield get_metadata_store_mock + + +_TEST_EXPERIMENT_CONTEXT = GapicContext( + name=_TEST_CONTEXT_NAME, + display_name=_TEST_EXPERIMENT, + description=_TEST_EXPERIMENT_DESCRIPTION, + schema_title=constants.SYSTEM_EXPERIMENT, + schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_EXPERIMENT], + metadata={ + **constants.EXPERIMENT_METADATA, + constants._BACKING_TENSORBOARD_RESOURCE_KEY: test_tensorboard._TEST_NAME, + }, +) + + +@pytest.fixture +def add_context_children_mock(): + with patch.object( + MetadataServiceClient, "add_context_children" + ) as add_context_children_mock: + yield add_context_children_mock + + +@pytest.fixture +def add_context_artifacts_and_executions_mock(): + with patch.object( + MetadataServiceClient, "add_context_artifacts_and_executions" + ) as add_context_artifacts_and_executions_mock: + add_context_artifacts_and_executions_mock.return_value = ( + AddContextArtifactsAndExecutionsResponse() + ) + yield add_context_artifacts_and_executions_mock + + +@pytest.fixture +def get_tensorboard_run_not_found_mock(): + with patch.object( + TensorboardServiceClient, "get_tensorboard_run" + ) as get_tensorboard_run_mock: + get_tensorboard_run_mock.side_effect = [ + exceptions.NotFound(""), + test_tensorboard._TEST_TENSORBOARD_RUN, + ] + yield get_tensorboard_run_mock + + +@pytest.fixture +def get_tensorboard_experiment_not_found_mock(): + with patch.object( + TensorboardServiceClient, "get_tensorboard_experiment" + ) as get_tensorboard_experiment_mock: + get_tensorboard_experiment_mock.side_effect = [ + exceptions.NotFound(""), + test_tensorboard._TEST_TENSORBOARD_EXPERIMENT, + ] + yield get_tensorboard_experiment_mock + + +@pytest.fixture +def get_artifact_mock(): + with patch.object(MetadataServiceClient, "get_artifact") as get_artifact_mock: + get_artifact_mock.return_value = GapicArtifact( + name=_TEST_ARTIFACT_NAME, + display_name=_TEST_ARTIFACT_ID, + schema_title=constants.SYSTEM_METRICS, + schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_METRICS], + ) + yield get_artifact_mock + + +@pytest.fixture +def get_artifact_not_found_mock(): + with patch.object(MetadataServiceClient, "get_artifact") as get_artifact_mock: + get_artifact_mock.side_effect = exceptions.NotFound("") + yield get_artifact_mock + + +@pytest.fixture +def update_context_mock(): + with patch.object(MetadataServiceClient, "update_context") as update_context_mock: + update_context_mock.return_value = _TEST_EXPERIMENT_CONTEXT + yield update_context_mock + + +_TEST_EXPERIMENT_RUN_CONTEXT_NAME = f"{_TEST_PARENT}/contexts/{_TEST_EXECUTION_ID}" +_TEST_OTHER_EXPERIMENT_RUN_CONTEXT_NAME = ( + f"{_TEST_PARENT}/contexts/{_TEST_OTHER_EXECUTION_ID}" +) + +_EXPERIMENT_MOCK = GapicContext( + name=_TEST_CONTEXT_NAME, + display_name=_TEST_EXPERIMENT, + description=_TEST_EXPERIMENT_DESCRIPTION, + schema_title=constants.SYSTEM_EXPERIMENT, + schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_EXPERIMENT], + metadata={ + **constants.EXPERIMENT_METADATA, + constants._BACKING_TENSORBOARD_RESOURCE_KEY: _TEST_TENSORBOARD_NAME, + }, +) +_EXPERIMENT_MOCK_WITHOUT_TB_SET = GapicContext( + name=_TEST_CONTEXT_NAME, + display_name=_TEST_EXPERIMENT, + description=_TEST_EXPERIMENT_DESCRIPTION, + schema_title=constants.SYSTEM_EXPERIMENT, + 
schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_EXPERIMENT], + metadata={ + **constants.EXPERIMENT_METADATA, + }, +) +_EXPERIMENT_RUN_MOCK = GapicContext( + name=_TEST_EXPERIMENT_RUN_CONTEXT_NAME, + display_name=_TEST_RUN, + schema_title=constants.SYSTEM_EXPERIMENT_RUN, + schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_EXPERIMENT_RUN], + metadata={ + constants._PARAM_KEY: _TEST_TF_EXPERIMENT_RUN_PARAMS, + constants._METRIC_KEY: _TEST_TF_EXPERIMENT_RUN_METRICS, + constants._STATE_KEY: gca_execution.Execution.State.RUNNING.name, + }, +) +_EXPERIMENT_RUN_MOCK_WITH_BACKING_TB = GapicContext( + name=_TEST_EXPERIMENT_RUN_CONTEXT_NAME, + display_name=_TEST_RUN, + schema_title=constants.SYSTEM_EXPERIMENT_RUN, + schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_EXPERIMENT_RUN], + metadata={ + constants._PARAM_KEY: _TEST_TF_EXPERIMENT_RUN_PARAMS, + constants._METRIC_KEY: _TEST_TF_EXPERIMENT_RUN_METRICS, + constants._STATE_KEY: gca_execution.Execution.State.RUNNING.name, + **constants.EXPERIMENT_METADATA, + constants._BACKING_TENSORBOARD_RESOURCE_KEY: _TEST_TENSORBOARD_NAME, + }, +) + +_EXPERIMENT_RUN_MOCK_WITH_PARENT_EXPERIMENT = copy.deepcopy(_EXPERIMENT_RUN_MOCK) +_EXPERIMENT_RUN_MOCK_WITH_PARENT_EXPERIMENT.parent_contexts = [_TEST_CONTEXT_NAME] + + +@pytest.fixture +def get_experiment_mock(): + with patch.object(MetadataServiceClient, "get_context") as get_context_mock: + get_context_mock.return_value = _EXPERIMENT_MOCK + yield get_context_mock + + +@pytest.fixture +def get_experiment_mock_without_tensorboard(): + with patch.object(MetadataServiceClient, "get_context") as get_context_mock: + get_context_mock.return_value = _EXPERIMENT_MOCK_WITHOUT_TB_SET + yield get_context_mock + + +@pytest.fixture +def get_experiment_run_run_mock(): + with patch.object(MetadataServiceClient, "get_context") as get_context_mock: + get_context_mock.side_effect = [ + _EXPERIMENT_MOCK, + _EXPERIMENT_RUN_MOCK, + _EXPERIMENT_RUN_MOCK_WITH_PARENT_EXPERIMENT, + ] + + yield get_context_mock + + +@pytest.fixture +def get_experiment_run_mock(): + with patch.object(MetadataServiceClient, "get_context") as get_context_mock: + get_context_mock.side_effect = [ + _EXPERIMENT_MOCK, + _EXPERIMENT_RUN_MOCK_WITH_PARENT_EXPERIMENT, + ] + + yield get_context_mock + + +@pytest.fixture +def create_experiment_context_mock(): + with patch.object(MetadataServiceClient, "create_context") as create_context_mock: + create_context_mock.side_effect = [_TEST_EXPERIMENT_CONTEXT] + yield create_context_mock + + +@pytest.fixture +def create_experiment_run_context_mock(): + with patch.object(MetadataServiceClient, "create_context") as create_context_mock: + create_context_mock.side_effect = [_EXPERIMENT_RUN_MOCK] + yield create_context_mock + + +_TEST_TENSORBOARD_TIME_SERIES = gca_tensorboard_time_series.TensorboardTimeSeries( + name=_TEST_TENSORBOARD_TIME_SERIES_NAME, + display_name=_TEST_TIME_SERIES_DISPLAY_NAME, + value_type=gca_tensorboard_time_series.TensorboardTimeSeries.ValueType.SCALAR, +) + + +@pytest.fixture +def list_tensorboard_time_series_mock_empty(): + with patch.object( + TensorboardServiceClient, + "list_tensorboard_time_series", + ) as list_tensorboard_time_series_mock: + list_tensorboard_time_series_mock.side_effect = [ + [], # initially empty + [], + [_TEST_TENSORBOARD_TIME_SERIES], + ] + yield list_tensorboard_time_series_mock + + +def build_and_train_test_tf_model(): + import tensorflow as tf + + X = np.array( + [ + [1, 1], + [1, 2], + [2, 2], + [2, 3], + [1, 1], + [1, 2], + [2, 2], + [2, 3], + [1, 
1],
+            [1, 2],
+            [2, 2],
+            [2, 3],
+        ]
+    )
+    y = np.dot(X, np.array([1, 2])) + 3
+
+    model = tf.keras.models.Sequential(
+        [
+            tf.keras.layers.Flatten(input_shape=(2,)),
+            tf.keras.layers.Dense(128, activation="relu"),
+            tf.keras.layers.Dropout(0.2),
+            tf.keras.layers.Dense(1),
+        ]
+    )
+
+    model.compile(
+        optimizer="adam",
+        loss=tf.keras.losses.CategoricalCrossentropy(),
+        metrics=["accuracy"],
+    )
+
+    model.fit(X, y, epochs=5)
+
+
+@pytest.mark.usefixtures("google_auth_mock")
+class TestAutologging:
+    def setup_method(self):
+        reload(initializer)
+        reload(aiplatform)
+
+        if autologging_utils._is_autologging_enabled():
+            aiplatform.autolog(disable=True)
+
+    def teardown_method(self):
+        initializer.global_pool.shutdown(wait=True)
+
+    @pytest.mark.usefixtures(
+        "update_context_mock",
+        "get_tensorboard_time_series_mock",
+        "get_tensorboard_run_not_found_mock",
+        "get_tensorboard_experiment_not_found_mock",
+        "list_tensorboard_time_series_mock",
+        "list_tensorboard_time_series_mock_empty",
+    )
+    def test_autologging_init(
+        self,
+        get_experiment_mock,
+        get_metadata_store_mock,
+        get_tensorboard_mock,
+    ):
+
+        try:
+            import mlflow  # noqa: F401
+        except ImportError:
+            raise ImportError(
+                "MLFlow is not installed and is required to test autologging. "
+                'Please install the SDK using "pip install google-cloud-aiplatform[autologging]"'
+            )
+        try:
+            import tensorflow as tf  # noqa: F401
+        except ImportError:
+            raise ImportError(
+                "TensorFlow is not installed and is required to test autologging. "
+                "Please install it before running autologging tests."
+            )
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            location=_TEST_LOCATION,
+            experiment=_TEST_EXPERIMENT,
+            experiment_tensorboard=_TEST_TENSORBOARD_NAME,
+        )
+
+        aiplatform.autolog()
+
+        get_tensorboard_mock.assert_called_with(
+            name=_TEST_TENSORBOARD_NAME,
+            retry=base._DEFAULT_RETRY,
+        )
+
+        assert get_tensorboard_mock.call_count == 1
+
+        get_experiment_mock.assert_called_once_with(
+            name=_TEST_CONTEXT_NAME, retry=base._DEFAULT_RETRY
+        )
+
+        get_metadata_store_mock.assert_called_once_with(
+            name=_TEST_METADATASTORE,
+            retry=base._DEFAULT_RETRY,
+        )
+
+    @pytest.mark.usefixtures(
+        "get_experiment_mock",
+        "get_metadata_store_mock",
+    )
+    def test_autologging_raises_if_experiment_not_set(
+        self,
+    ):
+        aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)
+
+        with pytest.raises(ValueError):
+            aiplatform.autolog()
+
+    @pytest.mark.usefixtures(
+        "get_experiment_mock_without_tensorboard",
+        "get_metadata_store_mock",
+        "update_context_mock",
+    )
+    def test_autologging_raises_if_experiment_tensorboard_not_set(
+        self,
+    ):
+
+        # unset the global tensorboard
+        aiplatform.metadata.metadata._experiment_tracker._global_tensorboard = None
+
+        aiplatform.init(
+            project=_TEST_PROJECT, location=_TEST_LOCATION, experiment=_TEST_EXPERIMENT
+        )
+
+        with pytest.raises(ValueError):
+            aiplatform.autolog()
+
+    @pytest.mark.usefixtures(
+        "get_experiment_mock",
+        "update_context_mock",
+        "get_metadata_store_mock",
+        "create_experiment_run_context_mock",
+        "get_tensorboard_mock",
+        "get_tensorboard_time_series_mock",
+        "get_tensorboard_run_not_found_mock",
+        "get_tensorboard_experiment_not_found_mock",
+        "list_tensorboard_time_series_mock",
+        "get_artifact_not_found_mock",
+        "list_tensorboard_time_series_mock_empty",
+    )
+    def test_autologging_sets_and_resets_mlflow_tracking_uri(
+        self,
+    ):
+        import mlflow  # noqa: F401
+
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            location=_TEST_LOCATION,
+            experiment=_TEST_EXPERIMENT,
+            
experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + mlflow.set_tracking_uri(_TEST_MLFLOW_TRACKING_URI) + + aiplatform.autolog() + + assert mlflow.get_tracking_uri() == "vertex-mlflow-plugin://" + + aiplatform.autolog(disable=True) + + assert mlflow.get_tracking_uri() == _TEST_MLFLOW_TRACKING_URI + + @pytest.mark.usefixtures( + "get_experiment_mock", + "update_context_mock", + "get_metadata_store_mock", + "create_experiment_run_context_mock", + "get_tensorboard_mock", + "get_tensorboard_time_series_mock", + "get_tensorboard_run_not_found_mock", + "get_tensorboard_experiment_not_found_mock", + "list_tensorboard_time_series_mock", + "get_artifact_not_found_mock", + "list_tensorboard_time_series_mock_empty", + ) + def test_autologging_enabled_check( + self, + ): + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + experiment=_TEST_EXPERIMENT, + experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + + aiplatform.autolog() + + assert aiplatform.utils.autologging_utils._is_autologging_enabled() + + aiplatform.autolog(disable=True) + + assert not aiplatform.utils.autologging_utils._is_autologging_enabled() + + @pytest.mark.usefixtures( + "get_experiment_mock", + "update_context_mock", + "get_metadata_store_mock", + "create_experiment_run_context_mock", + "get_tensorboard_mock", + "get_tensorboard_time_series_mock", + "get_tensorboard_run_not_found_mock", + "get_tensorboard_experiment_not_found_mock", + "list_tensorboard_time_series_mock", + "get_artifact_not_found_mock", + "list_tensorboard_time_series_mock_empty", + ) + def test_calling_autolog_with_disable_raises_if_not_enabled( + self, + ): + + import mlflow # noqa: F401 + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + experiment=_TEST_EXPERIMENT, + experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + + with pytest.raises(ValueError): + aiplatform.autolog(disable=True) + + @pytest.mark.usefixtures( + "get_metadata_store_mock", + "add_context_children_mock", + "get_experiment_mock", + "get_experiment_run_run_mock", + "get_tensorboard_mock", + "create_tensorboard_experiment_mock", + "write_tensorboard_run_data_mock", + "get_tensorboard_experiment_not_found_mock", + "get_artifact_not_found_mock", + "list_tensorboard_time_series_mock", + "create_tensorboard_run_artifact_mock", + "get_tensorboard_time_series_mock", + "get_tensorboard_run_mock", + "update_context_mock", + "list_tensorboard_time_series_mock_empty", + "add_context_artifacts_and_executions_mock", + ) + def test_autologging_plugin_autocreates_run_id( + self, + create_experiment_run_context_mock, + ): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + experiment=_TEST_EXPERIMENT, + experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + + aiplatform.autolog() + + build_and_train_test_tf_model() + + # An ExperimentRun should be created with an auto-generated ID + for args, kwargs in create_experiment_run_context_mock.call_args_list: + assert kwargs["context"].display_name.startswith("tensorflow-") + assert kwargs["context_id"].startswith(f"{_TEST_EXPERIMENT}-tensorflow-") + + @pytest.mark.usefixtures( + "get_metadata_store_mock", + "add_context_children_mock", + "get_experiment_mock", + "create_experiment_context_mock", + "get_experiment_run_run_mock", + "get_tensorboard_mock", + "create_tensorboard_experiment_mock", + "write_tensorboard_run_data_mock", + "get_tensorboard_experiment_not_found_mock", + "get_artifact_not_found_mock", + "list_tensorboard_time_series_mock", + "create_tensorboard_run_artifact_mock", + 
"get_tensorboard_time_series_mock", + "get_tensorboard_run_mock", + "update_context_mock", + "list_tensorboard_time_series_mock_empty", + "add_context_artifacts_and_executions_mock", + "mlflow_plugin_get_run_mock", + "mlflow_plugin_run_map_mock", + "mlflow_plugin_create_run_mock", + "mlflow_plugin_vertex_experiment_mock", + ) + def test_autologging_plugin_with_auto_run_creation( + self, + mlflow_plugin_create_run_mock, + mlflow_plugin_get_run_mock, + mlflow_plugin_update_run_info_mock, + ): + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + experiment=_TEST_EXPERIMENT, + experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + + aiplatform.autolog() + + build_and_train_test_tf_model() + + assert mlflow_plugin_create_run_mock.call_count == 1 + + build_and_train_test_tf_model() + + # a subsequent model.fit() call should create another ExperimentRun + assert mlflow_plugin_create_run_mock.call_count == 2 + + assert ( + mlflow_plugin_update_run_info_mock.call_args_list[0][0][0] + == _TEST_MLFLOW_RUN_ID + ) + + # the above model.fit() calls should not result in any data being written locally + assert not os.path.isdir("mlruns") + + # training a model after disabling autologging should not create additional ExperimentRuns + # and the plugin should not be invoked + aiplatform.autolog(disable=True) + build_and_train_test_tf_model() + assert mlflow_plugin_create_run_mock.call_count == 2 + + @pytest.mark.usefixtures( + "get_metadata_store_mock", + "add_context_children_mock", + "get_experiment_mock", + "get_experiment_run_run_mock", + "create_experiment_context_mock", + "get_tensorboard_mock", + "create_tensorboard_experiment_mock", + "write_tensorboard_run_data_mock", + "get_tensorboard_experiment_not_found_mock", + "get_artifact_not_found_mock", + "list_tensorboard_time_series_mock", + "create_tensorboard_run_artifact_mock", + "get_tensorboard_time_series_mock", + "get_tensorboard_run_mock", + "update_context_mock", + "list_tensorboard_time_series_mock_empty", + "add_context_artifacts_and_executions_mock", + ) + def test_autologging_with_manual_run_creation( + self, + create_experiment_run_context_mock, + caplog, + ): + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + experiment=_TEST_EXPERIMENT, + experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + + aiplatform.autolog() + + aiplatform.start_run(_TEST_RUN) + build_and_train_test_tf_model() + assert create_experiment_run_context_mock.call_count == 1 + + # metrics and params from additional training calls will not be logged + # and no new ExperimentRun will be created + # a warning will be logged with details + build_and_train_test_tf_model() + assert create_experiment_run_context_mock.call_count == 1 + assert ( + "Metrics and parameters have already been logged to this run" in caplog.text + ) + + # ending the run and training a new model should result in an auto-created run + aiplatform.end_run() + + build_and_train_test_tf_model() + assert create_experiment_run_context_mock.call_count == 2 + + caplog.clear() + + @pytest.mark.usefixtures( + "get_metadata_store_mock", + "add_context_children_mock", + "get_experiment_mock", + "create_experiment_run_context_mock", + "get_experiment_run_run_mock", + "get_tensorboard_mock", + "create_tensorboard_experiment_mock", + "write_tensorboard_run_data_mock", + "get_tensorboard_experiment_not_found_mock", + "get_artifact_not_found_mock", + "list_tensorboard_time_series_mock", + "create_tensorboard_run_artifact_mock", + "get_tensorboard_time_series_mock", + 
"get_tensorboard_run_mock", + "update_context_mock", + "list_tensorboard_time_series_mock_empty", + "add_context_artifacts_and_executions_mock", + ) + def test_mlflow_log_filter_only_shows_framework_warning_logs( + self, + caplog, + ): + + import tensorflow # noqa: F401 + + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + experiment=_TEST_EXPERIMENT, + experiment_tensorboard=_TEST_TENSORBOARD_NAME, + ) + + aiplatform.autolog() + + # Tests that no INFO logs are being surfaced from MLFlow + # We can't test for the unsupported version warning log since + # MLFlow changes supported versions regularly + assert "INFO mlflow" not in caplog.text + + caplog.clear() diff --git a/tests/unit/aiplatform/test_logging.py b/tests/unit/aiplatform/test_logging.py index e18cbe623f..010b6b670d 100644 --- a/tests/unit/aiplatform/test_logging.py +++ b/tests/unit/aiplatform/test_logging.py @@ -17,9 +17,15 @@ from google.cloud.aiplatform import base import logging +import pytest +import sys class TestLogging: + @pytest.mark.skipif( + sys.version_info < (3, 8), + reason="requires python3.8 or higher to work with MLFlow", + ) def test_no_root_logging_handler_override(self, caplog): # Users should be able to control the root logger in their apps # The aiplatform module import should not override their root logger config @@ -33,6 +39,10 @@ def test_no_root_logging_handler_override(self, caplog): assert "Info level\n" in caplog.text assert "Critical level\n" in caplog.text + @pytest.mark.skipif( + sys.version_info < (3, 8), + reason="requires python3.8 or higher to work with MLFlow", + ) def test_log_level_coexistance(self, caplog): # The aiplatform module and the root logger can have different log levels. aip_logger = base.Logger(__name__)