Skip to content

Commit

Permalink
Make the promptflow-azure an optional dependency; fix artifact loggin…
Browse files Browse the repository at this point in the history
…g. (#3403)

# Description

- Make promptflow-azure an optional dependency.
- Fix the mlflow-like artifact logging so that it shows in the UI.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [x] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [x] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
  • Loading branch information
nick863 authored Jun 20, 2024
1 parent e28c922 commit 00c720f
Show file tree
Hide file tree
Showing 12 changed files with 1,130 additions and 281 deletions.
192 changes: 132 additions & 60 deletions src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import dataclasses
import json
import logging
import os
import posixpath
import requests
import time
import uuid
from typing import Any, Dict, Optional, Type
from urllib.parse import urlparse

import requests
from azure.storage.blob import BlobClient
from azure.storage.blob import BlobServiceClient
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from promptflow.azure._utils._token_cache import ArmTokenCache
from promptflow.evals._version import VERSION
from promptflow._sdk.entities import Run

from azure.ai.ml.entities._credentials import AccountKeyConfiguration
from azure.ai.ml.entities._datastore.datastore import Datastore

LOGGER = logging.getLogger(__name__)

Expand All @@ -30,15 +32,21 @@ class RunInfo:

run_id: str
experiment_id: str
run_name: str

@staticmethod
def generate() -> "RunInfo":
def generate(run_name: Optional[str]) -> 'RunInfo':
"""
Generate the new RunInfo instance with the RunID and Experiment ID.
**Note:** This code is used when we are in failed state and cannot get a run.
:param run_name: The name of a run.
:type run_name: Optional[str]
"""
return RunInfo(
str(uuid.uuid4()),
str(uuid.uuid4()),
run_name or ""
)


Expand Down Expand Up @@ -79,22 +87,25 @@ class EvalRun(metaclass=Singleton):
:type workspace_name: str
:param ml_client: The ml client used for authentication into Azure.
:type ml_client: MLClient
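:param promptflow_run: The existing promptflow run to which metrics and artifacts
are added; when it is provided, a new run is not started.
:type promptflow_run: Optional[promptflow._sdk.entities.Run]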
"""

_MAX_RETRIES = 5
_BACKOFF_FACTOR = 2
_TIMEOUT = 5
_SCOPE = "https://management.azure.com/.default"

def __init__(
self,
run_name: Optional[str],
tracking_uri: str,
subscription_id: str,
group_name: str,
workspace_name: str,
ml_client,
):
EVALUATION_ARTIFACT = 'instance_results.jsonl'

def __init__(self,
run_name: Optional[str],
tracking_uri: str,
subscription_id: str,
group_name: str,
workspace_name: str,
ml_client: Any,
promptflow_run: Optional[Run] = None,
):
"""
Constructor
"""
Expand All @@ -103,13 +114,29 @@ def __init__(
self._subscription_id: str = subscription_id
self._resource_group_name: str = group_name
self._workspace_name: str = workspace_name
self._ml_client = ml_client
self._url_base = urlparse(self._tracking_uri).netloc
self._is_broken = self._start_run()
self._ml_client: Any = ml_client
self._is_promptflow_run: bool = promptflow_run is not None
self._is_broken = False
if self._tracking_uri is None:
LOGGER.warning("tracking_uri was not provided, "
"The results will be saved locally, but will not be logged to Azure.")
self._url_base = None
self._is_broken = True
self.info = RunInfo.generate(run_name)
else:
self._url_base = urlparse(self._tracking_uri).netloc
if promptflow_run is not None:
self.info = RunInfo(
promptflow_run.name,
promptflow_run._experiment_name,
promptflow_run.name
)
else:
self._is_broken = self._start_run(run_name)

self._is_terminated = False
self.name: str = run_name if run_name else self.info.run_id

def _get_scope(self):
def _get_scope(self) -> str:
"""
Return the scope information for the workspace.
Expand All @@ -126,11 +153,13 @@ def _get_scope(self):
self._workspace_name,
)

def _start_run(self) -> bool:
def _start_run(self, run_name: Optional[str]) -> bool:
"""
Make a request to start the mlflow run. If the run will not start, it will be
marked as broken and the logging will be switched off.
:param run_name: The display name for the run.
:type run_name: Optional[str]
:returns: True if the run has started and False otherwise.
"""
url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/create"
Expand All @@ -140,18 +169,23 @@ def _start_run(self) -> bool:
"start_time": int(time.time() * 1000),
"tags": [{"key": "mlflow.user", "value": "promptflow-evals"}],
}
response = self.request_with_retry(url=url, method="POST", json_dict=body)
if run_name:
body["run_name"] = run_name
response = self.request_with_retry(
url=url,
method='POST',
json_dict=body
)
if response.status_code != 200:
self.info = RunInfo.generate()
LOGGER.error(
f"The run failed to start: {response.status_code}: {response.text}."
"The results will be saved locally, but will not be logged to Azure."
)
self.info = RunInfo.generate(run_name)
LOGGER.warning(f"The run failed to start: {response.status_code}: {response.text}."
"The results will be saved locally, but will not be logged to Azure.")
return True
parsed_response = response.json()
self.info = RunInfo(
run_id=parsed_response["run"]["info"]["run_id"],
experiment_id=parsed_response["run"]["info"]["experiment_id"],
run_id=parsed_response['run']['info']['run_id'],
experiment_id=parsed_response['run']['info']['experiment_id'],
run_name=parsed_response['run']['info']['run_name']
)
return False

Expand All @@ -163,6 +197,9 @@ def end_run(self, status: str) -> None:
:type status: str
:raises: ValueError if the run is not in ("FINISHED", "FAILED", "KILLED")
"""
if self._is_promptflow_run:
# This run is already finished, we just add artifacts/metrics to it.
return
if status not in ("FINISHED", "FAILED", "KILLED"):
raise ValueError(
f"Incorrect terminal status {status}. " 'Valid statuses are "FINISHED", "FAILED" and "KILLED".'
Expand All @@ -171,7 +208,7 @@ def end_run(self, status: str) -> None:
LOGGER.warning("Unable to stop run because it was already terminated.")
return
if self._is_broken:
LOGGER.error("Unable to stop run because the run failed to start.")
LOGGER.warning("Unable to stop run because the run failed to start.")
return
url = f"https://{self._url_base}/mlflow/v2.0" f"{self._get_scope()}/api/2.0/mlflow/runs/update"
body = {
Expand All @@ -182,7 +219,7 @@ def end_run(self, status: str) -> None:
}
response = self.request_with_retry(url=url, method="POST", json_dict=body)
if response.status_code != 200:
LOGGER.error("Unable to terminate the run.")
LOGGER.warning("Unable to terminate the run.")
Singleton.destroy(EvalRun)
self._is_terminated = True

Expand Down Expand Up @@ -210,6 +247,9 @@ def get_metrics_url(self):
return f"https://{self._url_base}" "/mlflow/v2.0" f"{self._get_scope()}" f"/api/2.0/mlflow/runs/log-metric"

def _get_token(self):
    """
    Return the token obtained from ArmTokenCache for the ml client's credential.

    :returns: The result of ``ArmTokenCache().get_token`` called with
        the credential stored on the ml client.
    """
    # promptflow.azure is an optional dependency, so it is imported lazily
    # here rather than at module import time.
    from promptflow.azure._utils._token_cache import ArmTokenCache

    token_cache = ArmTokenCache()
    return token_cache.get_token(self._ml_client._credential)

def request_with_retry(
Expand Down Expand Up @@ -247,7 +287,7 @@ def request_with_retry(
session.mount("https://", adapter)
return session.request(method, url, headers=headers, json=json_dict, timeout=EvalRun._TIMEOUT)

def _log_error(self, failed_op: str, response: requests.Response) -> None:
def _log_warning(self, failed_op: str, response: requests.Response) -> None:
"""
Log a warning if the request was not successful.
Expand All @@ -256,7 +296,7 @@ def _log_error(self, failed_op: str, response: requests.Response) -> None:
:param response: The response to the failed request.
:type response: requests.Response
"""
LOGGER.error(
LOGGER.warning(
f"Unable to {failed_op}, "
f"the request failed with status code {response.status_code}, "
f"{response.text=}."
Expand All @@ -273,51 +313,83 @@ def log_artifact(self, artifact_folder: str) -> None:
:type artifact_folder: str
"""
if self._is_broken:
LOGGER.error("Unable to log artifact because the run failed to start.")
LOGGER.warning("Unable to log artifact because the run failed to start.")
return
# Check if artifact directory is empty or does not exist.
if not os.path.isdir(artifact_folder):
LOGGER.error("The path to the artifact is either not a directory or does not exist.")
LOGGER.warning("The path to the artifact is either not a directory or does not exist.")
return
if not os.listdir(artifact_folder):
LOGGER.error("The path to the artifact is empty.")
LOGGER.warning("The path to the artifact is empty.")
return
if not os.path.isfile(os.path.join(artifact_folder, EvalRun.EVALUATION_ARTIFACT)):
LOGGER.warning("The run results file was not found, skipping artifacts upload.")
return
# First we will list the files and the appropriate remote paths for them.
upload_path = os.path.basename(os.path.normpath(artifact_folder))
remote_paths = {"paths": []}
root_upload_path = posixpath.join("promptflow", 'PromptFlowArtifacts', self.info.run_name)
remote_paths = {'paths': []}
local_paths = []

# Go over the artifact folder and upload all artifacts.
for (root, _, filenames) in os.walk(artifact_folder):
upload_path = root_upload_path
if root != artifact_folder:
rel_path = os.path.relpath(root, artifact_folder)
if rel_path != ".":
upload_path = posixpath.join(upload_path, rel_path)
if rel_path != '.':
upload_path = posixpath.join(root_upload_path, rel_path)
for f in filenames:
remote_file_path = posixpath.join(upload_path, f)
remote_paths["paths"].append({"path": remote_file_path})
local_file_path = os.path.join(root, f)
local_paths.append(local_file_path)
# Now we need to reserve the space for files in the artifact store.
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Content-Length": str(len(json.dumps(remote_paths))),
"x-ms-client-request-id": str(uuid.uuid1()),
}

# We will write the artifacts to the workspaceblobstore
datastore = self._ml_client.datastores.get_default(include_secrets=True)
account_url = f"{datastore.account_name}.blob.{datastore.endpoint}"
svc_client = BlobServiceClient(
account_url=account_url, credential=self._get_datastore_credential(datastore))
for local, remote in zip(local_paths, remote_paths['paths']):
blob_client = svc_client.get_blob_client(
container=datastore.container_name,
blob=remote['path'])
with open(local, 'rb') as fp:
blob_client.upload_blob(fp, overwrite=True)

# To show artifact in UI we will need to register it. If it is a promptflow run,
# we are rewriting already registered artifact and need to skip this step.
if self._is_promptflow_run:
return
url = (
f"https://{self._url_base}/artifact/v2.0/subscriptions/{self._subscription_id}"
f"/resourceGroups/{self._resource_group_name}/providers/"
f"Microsoft.MachineLearningServices/workspaces/{self._workspace_name}/artifacts/register"
)

response = self.request_with_retry(
url=self.get_artifacts_uri(), method="POST", json_dict=remote_paths, headers=headers
url=url,
method="POST",
json_dict={
"origin": "ExperimentRun",
"container": f"dcid.{self.info.run_id}",
"path": EvalRun.EVALUATION_ARTIFACT,
"dataPath": {
"dataStoreName": datastore.name,
"relativePath": posixpath.join(root_upload_path, EvalRun.EVALUATION_ARTIFACT),
},
},
)
if response.status_code != 200:
self._log_error("allocate Blob for the artifact", response)
return
empty_artifacts = response.json()["artifactContentInformation"]
# The response from Azure contains the URL with SAS, that allows to upload file to the
# artifact store.
for local, remote in zip(local_paths, remote_paths["paths"]):
artifact_loc = empty_artifacts[remote["path"]]
blob_client = BlobClient.from_blob_url(artifact_loc["contentUri"], max_single_put_size=32 * 1024 * 1024)
with open(local, "rb") as fp:
blob_client.upload_blob(fp)
self._log_warning('register artifact', response)

def _get_datastore_credential(self, datastore: Datastore):
    """
    Pick the credential to use for the storage behind the given datastore.

    The selection follows the logic referenced in azure.ai.ml._artifact._artifact_utilities:
    https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ml/azure-ai-ml/azure/ai/ml/_artifacts/_artifact_utilities.py#L103

    :param datastore: The datastore whose ``credentials`` attribute is inspected.
    :type datastore: Datastore
    :returns: The account key when the datastore credentials are an
        AccountKeyConfiguration; the SAS token when the credentials object has
        a ``sas_token`` attribute; otherwise the credential held by the
        ml client's datastores operations.
    """
    datastore_creds = datastore.credentials
    if isinstance(datastore_creds, AccountKeyConfiguration):
        return datastore_creds.account_key
    if hasattr(datastore_creds, "sas_token"):
        return datastore_creds.sas_token
    # Neither an account key nor a SAS token is available:
    # fall back to the credential of the datastores operations.
    return self._ml_client.datastores._credential

def log_metric(self, key: str, value: float) -> None:
"""
Expand All @@ -329,7 +401,7 @@ def log_metric(self, key: str, value: float) -> None:
:type value: float
"""
if self._is_broken:
LOGGER.error("Unable to log metric because the run failed to start.")
LOGGER.warning("Unable to log metric because the run failed to start.")
return
body = {
"run_uuid": self.info.run_id,
Expand All @@ -345,7 +417,7 @@ def log_metric(self, key: str, value: float) -> None:
json_dict=body,
)
if response.status_code != 200:
self._log_error("save metrics", response)
self._log_warning("save metrics", response)

@staticmethod
def get_instance(*args, **kwargs) -> "EvalRun":
Expand Down
4 changes: 3 additions & 1 deletion src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,9 @@ def _evaluate(
metrics = _aggregate_metrics(evaluators_result_df, evaluators)
metrics.update(evaluators_metric)

studio_url = _log_metrics_and_instance_results(metrics, result_df, trace_destination, target_run)
studio_url = _log_metrics_and_instance_results(
metrics, result_df, trace_destination, target_run, evaluation_name,
)

result = {"rows": result_df.to_dict("records"), "metrics": metrics, "studio_url": studio_url}

Expand Down
Loading

0 comments on commit 00c720f

Please sign in to comment.