From 8546ad9b4bc15c6e469880fbe165e96a04606a47 Mon Sep 17 00:00:00 2001
From: kdestin <101366538+kdestin@users.noreply.github.com>
Date: Tue, 27 Aug 2024 14:21:56 -0400
Subject: [PATCH] refactor: Remove networking imports outside azure core (#3683)

# Description

This pull request removes all networking imports flagged by rule `C4749(networking-import-outside-azure-core-transport)` from [azure-pylint-guidelines-checker](https://github.com/Azure/azure-sdk-tools/tree/2eaf22b95a3a6bbe51380e6efad14c822f1d7d5e/tools/pylint-extensions/azure-pylint-guidelines-checker), and refactors the code that used those imports to make HTTP requests through azure-core. Concretely, this pull request:

* Adds `src/promptflow-evals/promptflow/evals/_http_utils.py`, which includes sync and async versions of azure.core.pipeline.Pipeline that provide a requests-like API for general HTTP requests.
* Refactors the SDK and tests to use those pipelines.
* Removes the dependency on `aiohttp_retry`.
* Removes both duplicate implementations of `AsyncHTTPClientWithRetry`.

## Background

This PR is part of an effort to ready `promptflow-evals` for migration to Azure/azure-sdk-for-python. Azure SDKs are disallowed from directly using networking libraries like `requests`, `aiohttp`, etc. They're instead meant to use `azure-core`, which under the hood can delegate to those libraries.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.**
- [x] **I have read the [contribution guidelines](https://github.com/microsoft/promptflow/blob/main/CONTRIBUTING.md).**
- [x] **I confirm that all new dependencies are compatible with the MIT license.**
- [ ] **Create an issue and link to the pull request to get dedicated review from the promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
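To make the new API concrete for reviewers, here is a minimal usage sketch of the pipelines added in `_http_utils.py` (the `example.invalid` URLs are placeholders, not endpoints used by this PR):

```python
from azure.core.pipeline.policies import RetryMode, RetryPolicy

from promptflow.evals._http_utils import get_http_client

# The pipeline exposes a requests-style API (get/post/put/...) and returns
# azure.core.rest responses with .status_code, .json(), .text(), etc.
client = get_http_client()
response = client.get("https://example.invalid/health", headers={"Accept": "application/json"})
if response.status_code == 200:
    print(response.json())

# Individual policies can be overridden per call site via with_policies(),
# e.g. a fixed-backoff retry like the one _rai_client.py configures:
retrying_client = client.with_policies(
    retry_policy=RetryPolicy(retry_total=6, retry_backoff_factor=5, retry_mode=RetryMode.Fixed)
)
response = retrying_client.post("https://example.invalid/submit", json={"hello": "world"})
```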
--- .../promptflow/evals/_common/rai_service.py | 45 +- .../promptflow/evals/_http_utils.py | 395 ++++++++++++++++++ .../promptflow/evals/evaluate/_eval_run.py | 44 +- .../evals/synthetic/_conversation/__init__.py | 14 +- .../synthetic/_conversation/_conversation.py | 6 +- .../evals/synthetic/_model_tools/__init__.py | 4 +- .../_model_tools/_async_http_client.py | 96 ----- .../_model_tools/_identity_manager.py | 9 +- .../_model_tools/_proxy_completion_model.py | 93 ++--- .../synthetic/_model_tools/_rai_client.py | 26 +- .../evals/synthetic/_model_tools/models.py | 108 ++--- .../evals/synthetic/adversarial_simulator.py | 27 +- src/promptflow-evals/pyproject.toml | 1 - .../test_content_safety_rai_script.py | 180 +++++++- .../tests/evals/unittests/test_eval_run.py | 308 ++++++-------- .../test_synthetic_conversation_bot.py | 17 +- ...st_content_safety_service_unavailable.yaml | 140 ++++++- 17 files changed, 982 insertions(+), 531 deletions(-) create mode 100644 src/promptflow-evals/promptflow/evals/_http_utils.py delete mode 100644 src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_async_http_client.py diff --git a/src/promptflow-evals/promptflow/evals/_common/rai_service.py b/src/promptflow-evals/promptflow/evals/_common/rai_service.py index 529d6063dc5..6c63bcd9e89 100644 --- a/src/promptflow-evals/promptflow/evals/_common/rai_service.py +++ b/src/promptflow-evals/promptflow/evals/_common/rai_service.py @@ -9,7 +9,6 @@ from typing import Dict, List from urllib.parse import urlparse -import httpx import jwt import numpy as np from azure.core.credentials import TokenCredential @@ -22,6 +21,8 @@ from constants import CommonConstants, EvaluationMetrics, RAIService, Tasks from utils import get_harm_severity_level +from promptflow.evals._http_utils import get_async_http_client + try: version = importlib.metadata.version("promptflow-evals") except importlib.metadata.PackageNotFoundError: @@ -61,8 +62,11 @@ async def ensure_service_availability(rai_svc_url: str, token: str, capability: headers = get_common_headers(token) svc_liveness_url = rai_svc_url + "/checkannotation" - async with httpx.AsyncClient() as client: - response = await client.get(svc_liveness_url, headers=headers, timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT) + client = get_async_http_client() + + response = await client.get( # pylint: disable=too-many-function-args,unexpected-keyword-arg + svc_liveness_url, headers=headers, timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT + ) if response.status_code != 200: raise Exception( # pylint: disable=broad-exception-raised @@ -100,8 +104,11 @@ async def submit_request(question: str, answer: str, metric: str, rai_svc_url: s url = rai_svc_url + "/submitannotation" headers = get_common_headers(token) - async with httpx.AsyncClient() as client: - response = await client.post(url, json=payload, headers=headers, timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT) + client = get_async_http_client() + + response = await client.post( # pylint: disable=too-many-function-args,unexpected-keyword-arg + url, json=payload, headers=headers, timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT + ) if response.status_code != 202: print("Fail evaluating '%s' with error message: %s" % (payload["UserTextList"], response.text)) @@ -134,8 +141,11 @@ async def fetch_result(operation_id: str, rai_svc_url: str, credential: TokenCre token = await fetch_or_reuse_token(credential, token) headers = get_common_headers(token) - async with httpx.AsyncClient() as client: - response = await client.get(url, headers=headers, 
timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT) + client = get_async_http_client() + + response = await client.get( # pylint: disable=too-many-function-args,unexpected-keyword-arg + url, headers=headers, timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT + ) if response.status_code == 200: return response.json() @@ -238,15 +248,18 @@ async def _get_service_discovery_url(azure_ai_project: dict, token: str) -> str: :rtype: str """ headers = get_common_headers(token) - async with httpx.AsyncClient() as client: - response = await client.get( - f"https://management.azure.com/subscriptions/{azure_ai_project['subscription_id']}/" - f"resourceGroups/{azure_ai_project['resource_group_name']}/" - f"providers/Microsoft.MachineLearningServices/workspaces/{azure_ai_project['project_name']}?" - f"api-version=2023-08-01-preview", - headers=headers, - timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT, - ) + + client = get_async_http_client() + + response = await client.get( # pylint: disable=too-many-function-args,unexpected-keyword-arg + f"https://management.azure.com/subscriptions/{azure_ai_project['subscription_id']}/" + f"resourceGroups/{azure_ai_project['resource_group_name']}/" + f"providers/Microsoft.MachineLearningServices/workspaces/{azure_ai_project['project_name']}?" + f"api-version=2023-08-01-preview", + headers=headers, + timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT, + ) + if response.status_code != 200: raise Exception("Failed to retrieve the discovery service URL") # pylint: disable=broad-exception-raised base_url = urlparse(response.json()["properties"]["discoveryUrl"]) diff --git a/src/promptflow-evals/promptflow/evals/_http_utils.py b/src/promptflow-evals/promptflow/evals/_http_utils.py new file mode 100644 index 00000000000..ec73f6fcab8 --- /dev/null +++ b/src/promptflow-evals/promptflow/evals/_http_utils.py @@ -0,0 +1,395 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- + + +from functools import wraps +from typing import Any, Awaitable, Callable, Dict, MutableMapping, Optional + +from azure.core.configuration import Configuration +from azure.core.pipeline import AsyncPipeline, Pipeline +from azure.core.pipeline.policies import ( + AsyncRedirectPolicy, + AsyncRetryPolicy, + CustomHookPolicy, + HeadersPolicy, + HttpLoggingPolicy, + NetworkTraceLoggingPolicy, + ProxyPolicy, + RedirectPolicy, + RetryPolicy, + UserAgentPolicy, +) +from azure.core.pipeline.transport import ( # pylint: disable=non-abstract-transport-import,no-name-in-module + AsyncHttpTransport, + AsyncioRequestsTransport, + HttpTransport, + RequestsTransport, +) +from azure.core.rest import AsyncHttpResponse, HttpRequest, HttpResponse +from azure.core.rest._rest_py3 import ContentType, FilesType, ParamsType +from typing_extensions import Self + +from promptflow.evals._user_agent import USER_AGENT + + +def _request_fn(f: Callable[["HttpPipeline"], None]): + """Decorator to generate convenience methods for HTTP method. + + :param Callable[["HttpPipeline"],None] f: A HttpPipeline classmethod to wrap. 
+        The f.__name__ is the HTTP method used + :return: A wrapped callable that sends a `f.__name__` request + :rtype: Callable + """ + + @wraps(f) + def request_fn( + self: "HttpPipeline", + url: str, + *, + params: Optional[ParamsType] = None, + headers: Optional[MutableMapping[str, str]] = None, + json: Any = None, + content: Optional[ContentType] = None, + data: Optional[Dict[str, Any]] = None, + files: Optional[FilesType] = None, + **kwargs, + ) -> HttpResponse: + return self.request( + f.__name__.upper(), + url, + params=params, + headers=headers, + json=json, + content=content, + data=data, + files=files, + **kwargs, + ) + + return request_fn + + +def _async_request_fn(f: Callable[["AsyncHttpPipeline"], Awaitable[None]]): + """Decorator to generate convenience methods for HTTP method. + + :param Callable[["AsyncHttpPipeline"], Awaitable[None]] f: An AsyncHttpPipeline classmethod to wrap. + The f.__name__ is the HTTP method used + :return: A wrapped callable that sends a `f.__name__` request + :rtype: Callable + """ + + @wraps(f) + async def request_fn( + self: "AsyncHttpPipeline", + url: str, + *, + params: Optional[ParamsType] = None, + headers: Optional[MutableMapping[str, str]] = None, + json: Any = None, + content: Optional[ContentType] = None, + data: Optional[Dict[str, Any]] = None, + files: Optional[FilesType] = None, + **kwargs, + ) -> AsyncHttpResponse: + return await self.request( + f.__name__.upper(), + url, + params=params, + headers=headers, + json=json, + content=content, + data=data, + files=files, + **kwargs, + ) + + return request_fn + + +class HttpPipeline(Pipeline): + """A *very* thin wrapper over azure.core.pipeline.Pipeline that facilitates sending miscellaneous http requests by + adding: + + * A requests-style api for sending http requests + * Facilities for populating policies for the client, including defaults, + and re-using policies from an existing client.
+ """ + + def __init__( + self, + *, + transport: Optional[HttpTransport] = None, + config: Optional[Configuration] = None, + user_agent_policy: Optional[UserAgentPolicy] = None, + headers_policy: Optional[HeadersPolicy] = None, + proxy_policy: Optional[ProxyPolicy] = None, + logging_policy: Optional[NetworkTraceLoggingPolicy] = None, + http_logging_policy: Optional[HttpLoggingPolicy] = None, + retry_policy: Optional[RetryPolicy] = None, + custom_hook_policy: Optional[CustomHookPolicy] = None, + redirect_policy: Optional[RedirectPolicy] = None, + **kwargs, + ): + """ + + :param HttpTransport transport: Http Transport used for requests, defaults to RequestsTransport + :param Configuration config: + :param UserAgentPolicy user_agent_policy: + :param HeadersPolicy headers_policy: + :param ProxyPolicy proxy_policy: + :param NetworkTraceLoggingPolicy logging_policy: + :param HttpLoggingPolicy http_logging_policy: + :param RetryPolicy retry_policy: + :param CustomHookPolicy custom_hook_policy: + :param RedirectPolicy redirect_policy: + """ + config = config or Configuration() + config.headers_policy = headers_policy or config.headers_policy or HeadersPolicy(**kwargs) + config.proxy_policy = proxy_policy or config.proxy_policy or ProxyPolicy(**kwargs) + config.redirect_policy = redirect_policy or config.redirect_policy or RedirectPolicy(**kwargs) + config.retry_policy = retry_policy or config.retry_policy or RetryPolicy(**kwargs) + config.custom_hook_policy = custom_hook_policy or config.custom_hook_policy or CustomHookPolicy(**kwargs) + config.logging_policy = logging_policy or config.logging_policy or NetworkTraceLoggingPolicy(**kwargs) + config.http_logging_policy = http_logging_policy or config.http_logging_policy or HttpLoggingPolicy(**kwargs) + config.user_agent_policy = user_agent_policy or config.user_agent_policy or UserAgentPolicy(**kwargs) + config.polling_interval = kwargs.get("polling_interval", 30) + + super().__init__( + # RequestsTransport normally should not be imported outside of azure.core, since transports + # are meant to be user configurable. + # RequestsTransport is only used in this file as the default transport when not user specified. + transport=transport or RequestsTransport(**kwargs), + policies=[ + config.headers_policy, + config.user_agent_policy, + config.proxy_policy, + config.redirect_policy, + config.retry_policy, + config.authentication_policy, + config.custom_hook_policy, + config.logging_policy, + ], + ) + + self._config = config + + def with_policies(self, **kwargs) -> Self: + """A named constructor which facilitates creating a new pipeline using an existing one as a base. 
+ + Accepts the same parameters as __init__ + + :return: new Pipeline object with combined config of current object + and specified overrides + :rtype: Self + """ + cls = self.__class__ + return cls(config=self._config, transport=kwargs.pop("transport", self._transport), **kwargs) + + def request( + self, + method: str, + url: str, + *, + params: Optional[ParamsType] = None, + headers: Optional[MutableMapping[str, str]] = None, + json: Any = None, + content: Optional[ContentType] = None, + data: Optional[Dict[str, Any]] = None, + files: Optional[FilesType] = None, + **kwargs, + ) -> HttpResponse: + + request = HttpRequest( + method, + url, + params=params, + headers=headers, + json=json, + content=content, + data=data, + files=files, + ) + + return self.run(request, **kwargs).http_response + + @_request_fn + def delete(self) -> None: + """Send a DELETE request.""" + + @_request_fn + def put(self) -> None: + """Send a PUT request.""" + + @_request_fn + def get(self) -> None: + """Send a GET request.""" + + @_request_fn + def post(self) -> None: + """Send a POST request.""" + + @_request_fn + def head(self) -> None: + """Send a HEAD request.""" + + @_request_fn + def options(self) -> None: + """Send an OPTIONS request.""" + + @_request_fn + def patch(self) -> None: + """Send a PATCH request.""" + + +class AsyncHttpPipeline(AsyncPipeline): + """A *very* thin wrapper over azure.core.pipeline.AsyncPipeline that facilitates sending miscellaneous + http requests by adding: + + * A requests-style api for sending http requests + * Facilities for populating policies for the client, including defaults, + and re-using policies from an existing client. + """ + + def __init__( + self, + *, + transport: Optional[AsyncHttpTransport] = None, + config: Optional[Configuration] = None, + user_agent_policy: Optional[UserAgentPolicy] = None, + headers_policy: Optional[HeadersPolicy] = None, + proxy_policy: Optional[ProxyPolicy] = None, + logging_policy: Optional[NetworkTraceLoggingPolicy] = None, + http_logging_policy: Optional[HttpLoggingPolicy] = None, + retry_policy: Optional[AsyncRetryPolicy] = None, + custom_hook_policy: Optional[CustomHookPolicy] = None, + redirect_policy: Optional[AsyncRedirectPolicy] = None, + **kwargs, + ): + """ + + :param AsyncHttpTransport transport: Http Transport used for requests, defaults to AsyncioRequestsTransport + :param Configuration config: + :param UserAgentPolicy user_agent_policy: + :param HeadersPolicy headers_policy: + :param ProxyPolicy proxy_policy: + :param NetworkTraceLoggingPolicy logging_policy: + :param HttpLoggingPolicy http_logging_policy: + :param AsyncRetryPolicy retry_policy: + :param CustomHookPolicy custom_hook_policy: + :param AsyncRedirectPolicy redirect_policy: + """ + config = config or Configuration() + config.headers_policy = headers_policy or config.headers_policy or HeadersPolicy(**kwargs) + config.proxy_policy = proxy_policy or config.proxy_policy or ProxyPolicy(**kwargs) + config.redirect_policy = redirect_policy or config.redirect_policy or AsyncRedirectPolicy(**kwargs) + config.retry_policy = retry_policy or config.retry_policy or AsyncRetryPolicy(**kwargs) + config.custom_hook_policy = custom_hook_policy or config.custom_hook_policy or CustomHookPolicy(**kwargs) + config.logging_policy = logging_policy or config.logging_policy or NetworkTraceLoggingPolicy(**kwargs) + config.http_logging_policy = http_logging_policy or config.http_logging_policy or HttpLoggingPolicy(**kwargs) + config.user_agent_policy = user_agent_policy or config.user_agent_policy or
UserAgentPolicy(**kwargs) + config.polling_interval = kwargs.get("polling_interval", 30) + + super().__init__( + # AsyncioRequestsTransport normally should not be imported outside of azure.core, since transports + # are meant to be user configurable. + # AsyncioRequestsTransport is only used in this file as the default transport when not user specified. + transport=transport or AsyncioRequestsTransport(**kwargs), + policies=[ + config.headers_policy, + config.user_agent_policy, + config.proxy_policy, + config.redirect_policy, + config.retry_policy, + config.authentication_policy, + config.custom_hook_policy, + config.logging_policy, + ], + ) + + self._config = config + + def with_policies(self, **kwargs) -> Self: + """A named constructor which facilitates creating a new pipeline using an existing one as a base. + + Accepts the same parameters as __init__ + + :return: new Pipeline object with combined config of current object + and specified overrides + :rtype: Self + """ + cls = self.__class__ + return cls(config=self._config, transport=kwargs.pop("transport", self._transport), **kwargs) + + async def request( + self, + method: str, + url: str, + *, + params: Optional[ParamsType] = None, + headers: Optional[MutableMapping[str, str]] = None, + json: Any = None, + content: Optional[ContentType] = None, + data: Optional[Dict[str, Any]] = None, + files: Optional[FilesType] = None, + **kwargs, + ) -> AsyncHttpResponse: + + request = HttpRequest( + method, + url, + params=params, + headers=headers, + json=json, + content=content, + data=data, + files=files, + ) + + return (await self.run(request, **kwargs)).http_response + + @_async_request_fn + async def delete(self) -> None: + """Send a DELETE request.""" + + @_async_request_fn + async def put(self) -> None: + """Send a PUT request.""" + + @_async_request_fn + async def get(self) -> None: + """Send a GET request.""" + + @_async_request_fn + async def post(self) -> None: + """Send a POST request.""" + + @_async_request_fn + async def head(self) -> None: + """Send a HEAD request.""" + + @_async_request_fn + async def options(self) -> None: + """Send an OPTIONS request.""" + + @_async_request_fn + async def patch(self) -> None: + """Send a PATCH request.""" + + +def get_http_client() -> HttpPipeline: + """Get an HttpPipeline configured with common policies. + + :returns: An HttpPipeline with a set of applied policies: + :rtype: HttpPipeline + """ + return HttpPipeline(user_agent_policy=UserAgentPolicy(base_user_agent=USER_AGENT)) + + +def get_async_http_client() -> AsyncHttpPipeline: + """Get an AsyncHttpPipeline configured with common policies.
+ + :returns: An AsyncHttpPipeline with a set of applied policies: + :rtype: AsyncHttpPipeline + """ + return AsyncHttpPipeline(user_agent_policy=UserAgentPolicy(base_user_agent=USER_AGENT)) diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py index e98f6d050b8..88207e93239 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_eval_run.py @@ -12,11 +12,11 @@ from typing import Any, Dict, Optional, Set from urllib.parse import urlparse -import requests -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry +from azure.core.pipeline.policies import RetryPolicy +from azure.core.rest import HttpResponse from promptflow._sdk.entities import Run +from promptflow.evals._http_utils import get_http_client from promptflow.evals._version import VERSION LOGGER = logging.getLogger(__name__) @@ -25,7 +25,7 @@ # Handle optional import. The azure libraries are only present if # promptflow-azure is installed. try: - from azure.ai.ml.entities._credentials import AccountKeyConfiguration + from azure.ai.ml.entities._credentials import AccountKeyConfiguration # pylint: disable=ungrouped-imports from azure.ai.ml.entities._datastore.datastore import Datastore from azure.storage.blob import BlobServiceClient except (ModuleNotFoundError, ImportError): @@ -180,7 +180,7 @@ def _start_run(self) -> None: if response.status_code != 200: self.info = RunInfo.generate(self._run_name) LOGGER.warning( - f"The run failed to start: {response.status_code}: {response.text}." + f"The run failed to start: {response.status_code}: {response.text()}." "The results will be saved locally, but will not be logged to Azure." ) self._status = RunStatus.BROKEN @@ -279,7 +279,7 @@ def _get_token(self): def request_with_retry( self, url: str, method: str, json_dict: Dict[str, Any], headers: Optional[Dict[str, str]] = None - ) -> requests.Response: + ) -> HttpResponse: """ Send the request with retries. @@ -292,40 +292,38 @@ def request_with_retry( :param headers: The headers to be sent with the request. :type headers: Optional[Dict[str, str]] :return: The response - :rtype: requests.Response + :rtype: HttpResponse """ if headers is None: headers = {} headers["User-Agent"] = f"promptflow/{VERSION}" headers["Authorization"] = f"Bearer {self._get_token()}" - retry = Retry( - total=EvalRun._MAX_RETRIES, - connect=EvalRun._MAX_RETRIES, - read=EvalRun._MAX_RETRIES, - redirect=EvalRun._MAX_RETRIES, - status=EvalRun._MAX_RETRIES, - status_forcelist=(408, 429, 500, 502, 503, 504), - backoff_factor=EvalRun._BACKOFF_FACTOR, - allowed_methods=None, + + session = get_http_client().with_policies( + retry_policy=RetryPolicy( + retry_total=EvalRun._MAX_RETRIES, + retry_connect=EvalRun._MAX_RETRIES, + retry_read=EvalRun._MAX_RETRIES, + retry_status=EvalRun._MAX_RETRIES, + retry_on_status_codes=(408, 429, 500, 502, 503, 504), + retry_backoff_factor=EvalRun._BACKOFF_FACTOR, + ) ) - adapter = HTTPAdapter(max_retries=retry) - session = requests.Session() - session.mount("https://", adapter) return session.request(method, url, headers=headers, json=json_dict, timeout=EvalRun._TIMEOUT) - def _log_warning(self, failed_op: str, response: requests.Response) -> None: + def _log_warning(self, failed_op: str, response: HttpResponse) -> None: """ Log the error if request was not successful. :param failed_op: The user-friendly message for the failed operation. 
:type failed_op: str :param response: The request. - :type response: requests.Response + :type response: HttpResponse """ LOGGER.warning( f"Unable to {failed_op}, " f"the request failed with status code {response.status_code}, " - f"{response.text=}." + f"{response.text()=}." ) def _check_state_and_log(self, action: str, bad_states: Set[RunStatus], should_raise: bool) -> bool: @@ -482,4 +480,4 @@ def write_properties_to_run_history(self, properties: Dict[str, Any]) -> None: json_dict={"runId": self.info.run_id, "properties": properties}, ) if response.status_code != 200: - LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text) + LOGGER.error("Fail writing properties '%s' to run history: %s", properties, response.text()) diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_conversation/__init__.py b/src/promptflow-evals/promptflow/evals/synthetic/_conversation/__init__.py index 41b5936729c..25de120164a 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_conversation/__init__.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_conversation/__init__.py @@ -11,7 +11,9 @@ import jinja2 -from .._model_tools import LLMBase, OpenAIChatCompletionsModel, RetryClient +from promptflow.evals._http_utils import AsyncHttpPipeline + +from .._model_tools import LLMBase, OpenAIChatCompletionsModel from .constants import ConversationRole @@ -33,6 +35,7 @@ class ConversationTurn: :param request: The request. :type request: Optional[Any] """ + role: "ConversationRole" name: Optional[str] = None message: str = "" @@ -140,7 +143,7 @@ def __init__( async def generate_response( self, - session: RetryClient, + session: AsyncHttpPipeline, conversation_history: List[ConversationTurn], max_history: int, turn_number: int = 0, @@ -148,8 +151,8 @@ async def generate_response( """ Prompt the ConversationBot for a response. - :param session: The aiohttp session to use for the request. - :type session: RetryClient + :param session: AsyncHttpPipeline to use for the request. + :type session: AsyncHttpPipeline :param conversation_history: The turns in the conversation so far. :type conversation_history: List[ConversationTurn] :param max_history: Parameters used to query GPT-4 model. @@ -229,6 +232,7 @@ class CallbackConversationBot(ConversationBot): :param kwargs: Optional keyword arguments to pass to the parent class. :type kwargs: Any """ + def __init__( self, callback: Callable, @@ -245,7 +249,7 @@ def __init__( async def generate_response( self, - session: "RetryClient", + session: AsyncHttpPipeline, conversation_history: List[Any], max_history: int, turn_number: int = 0, diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_conversation/_conversation.py b/src/promptflow-evals/promptflow/evals/synthetic/_conversation/_conversation.py index 2e62c45b2b6..351be3d925d 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_conversation/_conversation.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_conversation/_conversation.py @@ -6,7 +6,7 @@ import logging from typing import Callable, Dict, List, Tuple, Union -from .._model_tools import RetryClient +from ..._http_utils import AsyncHttpPipeline from . 
import ConversationBot, ConversationTurn @@ -61,7 +61,7 @@ def is_closing_message_helper(response: str) -> bool: async def simulate_conversation( bots: List[ConversationBot], - session: RetryClient, + session: AsyncHttpPipeline, stopping_criteria: Callable[[str], bool] = is_closing_message, turn_limit: int = 10, history_limit: int = 5, @@ -74,7 +74,7 @@ async def simulate_conversation( :param bots: List of ConversationBot instances participating in the conversation. :type bots: List[ConversationBot] :param session: The session to use for making API calls. - :type session: RetryClient + :type session: AsyncHttpPipeline :param stopping_criteria: A callable that determines when the conversation should stop. :type stopping_criteria: Callable[[str], bool] :param turn_limit: The maximum number of turns in the conversation. Defaults to 10. diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/__init__.py b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/__init__.py index 039478e7bfb..045b4c85c1a 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/__init__.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/__init__.py @@ -8,7 +8,7 @@ from ._proxy_completion_model import ProxyChatCompletionsModel from ._rai_client import RAIClient from ._template_handler import CONTENT_HARM_TEMPLATES_COLLECTION_KEY, AdversarialTemplateHandler -from .models import AsyncHTTPClientWithRetry, LLMBase, OpenAIChatCompletionsModel, RetryClient +from .models import LLMBase, OpenAIChatCompletionsModel __all__ = [ "ManagedIdentityAPITokenManager", @@ -20,6 +20,4 @@ "ProxyChatCompletionsModel", "LLMBase", "OpenAIChatCompletionsModel", - "RetryClient", - "AsyncHTTPClientWithRetry", ] diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_async_http_client.py b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_async_http_client.py deleted file mode 100644 index ae3c4ef445f..00000000000 --- a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_async_http_client.py +++ /dev/null @@ -1,96 +0,0 @@ -# --------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# --------------------------------------------------------- - -import logging - -from aiohttp import TraceConfig -from aiohttp_retry import RandomRetry, RetryClient - - -class AsyncHTTPClientWithRetry: - """Async HTTP client with retry configuration and request logging. - - :param n_retry: Number of retries to attempt - :type n_retry: int - :param retry_timeout: Timeout between retries, in seconds - :type retry_timeout: int - :param logger: The logger object to use for request logging - :type logger: logging.Logger - :param retry_options: The retry options. Defaults to None. 
- :type retry_options: Optional[aiohttp_retry.retry_options.BaseRandomRetry] - """ - def __init__(self, n_retry: int, retry_timeout: int, logger: logging.Logger, retry_options=None): - self.attempts = n_retry - self.logger = logger - - # Set up async HTTP client with retry - - trace_config = TraceConfig() # set up request logging - trace_config.on_request_end.append(self.delete_auth_header) - if retry_options is None: - retry_options = RandomRetry( # set up retry configuration - statuses=[104, 408, 409, 429, 500, 502, 503, 504], # on which statuses to retry - attempts=n_retry, - min_timeout=retry_timeout, - max_timeout=retry_timeout, - ) - - self.client = RetryClient(trace_configs=[trace_config], retry_options=retry_options) - - async def on_request_start(self, session, trace_config_ctx, params): # pylint: disable=unused-argument - """Build a new trace context from the config and log the request. - - :param session: The aiohttp client session. This parameter is not used in this method; - however, it must be included to match the method signature of the parent class. - :type session: aiohttp.ClientSession - :param trace_config_ctx: The trace config context - :type trace_config_ctx: Any - :param params: The request parameters - :type params: Any - """ - current_attempt = trace_config_ctx.trace_request_ctx["current_attempt"] - self.logger.info("[ATTEMPT %s] Sending %s request to %s" % (current_attempt, params.method, params.url)) - - async def delete_auth_header(self, session, trace_config_ctx, params): # pylint: disable=unused-argument - """Delete authorization header from request headers - - If set, the "Authorization" and "api-key" headers is removed from the request headers. - - :param session: The aiohttp client session. This parameter is not used in this method; - however, it must be included to match the method signature of the parent class. - :type session: aiohttp.ClientSession - :param trace_config_ctx: The trace config context. This parameter is not used in this method; - however, it must be included to match the method signature of the parent class. - :type trace_config_ctx: Any - :param params: The request parameters - :type params: Any - """ - request_headers = dict(params.response.request_info.headers) - if "Authorization" in request_headers: - del request_headers["Authorization"] - if "api-key" in request_headers: - del request_headers["api-key"] - - async def on_request_end(self, session, trace_config_ctx, params): # pylint: disable=unused-argument - """ - Retrieve current request trace and log the response. - - :param session: The aiohttp client session. This parameter is not used in this method; - however, it must be included to match the method signature of the parent class. 
- :type session: aiohttp.ClientSession - :param trace_config_ctx: The trace config context - :type trace_config_ctx: Any - :param params: The request parameters - :type params: Any - """ - current_attempt = trace_config_ctx.trace_request_ctx["current_attempt"] - request_headers = dict(params.response.request_info.headers) - if "Authorization" in request_headers: - del request_headers["Authorization"] # hide auth token from logs - if "api-key" in request_headers: - del request_headers["api-key"] - self.logger.info( - "[ATTEMPT %s] For %s request to %s, received response with status %s and request headers: %s" - % (current_attempt, params.method, params.url, params.response.status, request_headers) - ) diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_identity_manager.py b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_identity_manager.py index cff70f37839..d4614b57f9d 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_identity_manager.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_identity_manager.py @@ -17,6 +17,7 @@ class TokenScope(Enum): """Token scopes for Azure endpoints""" + DEFAULT_AZURE_MANAGEMENT = "https://management.azure.com/.default" @@ -30,10 +31,12 @@ class APITokenManager(ABC): :param credential: Azure credential object :type credential: Optional[Union[azure.identity.DefaultAzureCredential, azure.identity.ManagedIdentityCredential] """ + def __init__( - self, logger: logging.Logger, + self, + logger: logging.Logger, auth_header: str = "Bearer", - credential: Optional[Union[DefaultAzureCredential, ManagedIdentityCredential]] = None + credential: Optional[Union[DefaultAzureCredential, ManagedIdentityCredential]] = None, ) -> None: self.logger = logger self.auth_header = auth_header @@ -96,6 +99,7 @@ class ManagedIdentityAPITokenManager(APITokenManager): :keyword kwargs: Additional keyword arguments :paramtype kwargs: Dict """ + def __init__(self, token_scope: TokenScope, logger: logging.Logger, **kwargs: Dict): super().__init__(logger, **kwargs) self.token_scope = token_scope @@ -129,6 +133,7 @@ class PlainTokenManager(APITokenManager): :keyword kwargs: Optional keyword arguments :paramtype kwargs: Dict """ + def __init__(self, openapi_key: str, logger: logging.Logger, **kwargs: Dict): super().__init__(logger, **kwargs) self.token = openapi_key diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_proxy_completion_model.py b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_proxy_completion_model.py index 708aafa8446..8ae7587746e 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_proxy_completion_model.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_proxy_completion_model.py @@ -4,17 +4,17 @@ import asyncio import copy import json -import logging import time import uuid from typing import Dict, List -from aiohttp.web import HTTPException -from aiohttp_retry import JitterRetry, RetryClient +from azure.core.exceptions import HttpResponseError +from azure.core.pipeline.policies import AsyncRetryPolicy, RetryMode +from promptflow.evals._http_utils import AsyncHttpPipeline, get_async_http_client from promptflow.evals._user_agent import USER_AGENT -from .models import AsyncHTTPClientWithRetry, OpenAIChatCompletionsModel +from .models import OpenAIChatCompletionsModel class SimulationRequestDTO: @@ -99,7 +99,7 @@ def format_request_data(self, messages: List[Dict], **request_params) -> Dict: async def 
get_conversation_completion( self, messages: List[Dict], - session: RetryClient, + session: AsyncHttpPipeline, role: str = "assistant", # pylint: disable=unused-argument **request_params, ) -> dict: @@ -109,8 +109,8 @@ async def get_conversation_completion( :param messages: List of messages to query the model with. Expected format: [{"role": "user", "content": "Hello!"}, ...] :type messages: List[Dict] - :param session: aiohttp RetryClient object to query the model with. - :type session: ~promptflow.evals.synthetic._model_tools.RetryClient + :param session: AsyncHttpPipeline object to query the model with. + :type session: ~promptflow.evals._http_utils.AsyncHttpPipeline :param role: The role of the user sending the message. This parameter is not used in this method; however, it must be included to match the method signature of the parent class. Defaults to "assistant". :type role: str @@ -130,14 +130,14 @@ async def get_conversation_completion( async def request_api( self, - session: RetryClient, + session: AsyncHttpPipeline, request_data: dict, ) -> dict: """ Request the model with a body of data. :param session: HTTPS Session for invoking the endpoint. - :type session: RetryClient + :type session: AsyncHttpPipeline :param request_data: Prompt dictionary to query the model with. (Pass {"prompt": prompt} instead of prompt.) :type request_data: Dict[str, Any] :return: A body of data resulting from the model query. @@ -178,54 +178,45 @@ async def request_api( time_start = time.time() full_response = None - async with session.post( - url=self.endpoint_url, headers=proxy_headers, json=sim_request_dto.to_dict() - ) as response: - if response.status == 202: - response = await response.json() - self.result_url = response["location"] - else: - raise HTTPException( - reason=f"Received unexpected HTTP status: {response.status} {await response.text()}" - ) - - retry_options = JitterRetry( # set up retry configuration - statuses=[202], # on which statuses to retry - attempts=7, - start_timeout=10, - max_timeout=180, - retry_all_server_errors=False, - ) + response = await session.post(url=self.endpoint_url, headers=proxy_headers, json=sim_request_dto.to_dict()) + + if response.status_code != 202: + raise HttpResponseError( + message=f"Received unexpected HTTP status: {response.status_code} {response.text()}", response=response + ) + + response = response.json() + self.result_url = response["location"] - exp_retry_client = AsyncHTTPClientWithRetry( - n_retry=None, - retry_timeout=None, - logger=logging.getLogger(), - retry_options=retry_options, + retry_policy = AsyncRetryPolicy( # set up retry configuration + retry_on_status_codes=[202], # on which statuses to retry + retry_total=7, + retry_backoff_factor=10.0, + retry_backoff_max=180, + retry_mode=RetryMode.Exponential, ) + exp_retry_client = get_async_http_client().with_policies(retry_policy=retry_policy) + # initial 10 seconds wait before attempting to fetch result await asyncio.sleep(10) - async with exp_retry_client.client as expsession: - async with expsession.get(url=self.result_url, headers=proxy_headers) as response: - if response.status == 200: - response_data = await response.json() - self.logger.info("Response: %s", response_data) - - # Copy the full response and return it to be saved in jsonl.
- full_response = copy.copy(response_data) - - time_taken = time.time() - time_start - - # pylint: disable=unexpected-keyword-arg - parsed_response = self._parse_response( # type: ignore[call-arg] - response_data, request_data=request_data - ) - else: - raise HTTPException( - reason=f"Received unexpected HTTP status: {response.status} {await response.text()}" - ) + response = await exp_retry_client.get( # pylint: disable=too-many-function-args,unexpected-keyword-arg + self.result_url, headers=proxy_headers + ) + + response.raise_for_status() + + response_data = response.json() + self.logger.info("Response: %s", response_data) + + # Copy the full response and return it to be saved in jsonl. + full_response = copy.copy(response_data) + + time_taken = time.time() - time_start + + # pylint: disable=unexpected-keyword-arg + parsed_response = self._parse_response(response_data, request_data=request_data) # type: ignore[call-arg] return { "request": request_data, diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_rai_client.py b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_rai_client.py index 447373c883f..36b49b0f9f1 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_rai_client.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/_rai_client.py @@ -1,16 +1,15 @@ # --------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -import logging import os from typing import Any, Dict from urllib.parse import urljoin, urlparse -import requests +from azure.core.pipeline.policies import AsyncRetryPolicy, RetryMode +from promptflow.evals._http_utils import AsyncHttpPipeline, get_async_http_client, get_http_client from promptflow.evals._user_agent import USER_AGENT -from ._async_http_client import AsyncHTTPClientWithRetry from ._identity_manager import APITokenManager api_url = None @@ -60,7 +59,8 @@ def __init__(self, azure_ai_project: Dict, token_manager: APITokenManager) -> No def _get_service_discovery_url(self): bearer_token = self.token_manager.get_token() headers = {"Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json"} - response = requests.get( + http_client = get_http_client() + response = http_client.get( # pylint: disable=too-many-function-args,unexpected-keyword-arg f"https://management.azure.com/subscriptions/{self.azure_ai_project['subscription_id']}/" f"resourceGroups/{self.azure_ai_project['resource_group_name']}/" f"providers/Microsoft.MachineLearningServices/workspaces/{self.azure_ai_project['project_name']}?" @@ -73,15 +73,17 @@ def _get_service_discovery_url(self): base_url = urlparse(response.json()["properties"]["discoveryUrl"]) return f"{base_url.scheme}://{base_url.netloc}" - def _create_async_client(self) -> AsyncHTTPClientWithRetry: + def _create_async_client(self) -> AsyncHttpPipeline: """Create an async http client with retry mechanism Number of retries is set to 6, and the timeout is set to 5 seconds. 
:return: The async http client - :rtype: ~promptflow.evals.synthetic._model_tools._async_http_client.AsyncHTTPClientWithRetry + :rtype: ~promptflow.evals._http_utils.AsyncHttpPipeline """ - return AsyncHTTPClientWithRetry(n_retry=6, retry_timeout=5, logger=logging.getLogger()) + return get_async_http_client().with_policies( + retry_policy=AsyncRetryPolicy(retry_total=6, retry_backoff_factor=5, retry_mode=RetryMode.Fixed) + ) async def get_contentharm_parameters(self) -> Any: """Get the content harm parameters, if they exist""" @@ -113,11 +115,11 @@ async def get(self, url: str) -> Any: "User-Agent": USER_AGENT, } - async with self._create_async_client().client as session: - async with session.get(url=url, headers=headers) as response: - if response.status == 200: - response = await response.json() - return response + session = self._create_async_client() + response = await session.get(url=url, headers=headers) # pylint: disable=unexpected-keyword-arg + + if response.status_code == 200: + return response.json() raise ValueError( "Azure safety evaluation service is not available in your current region, " diff --git a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/models.py b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/models.py index 123f1276fb6..8d87964c557 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/models.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/_model_tools/models.py @@ -12,9 +12,7 @@ from typing import Deque, Dict, List, Optional, Union from urllib.parse import urlparse -from aiohttp import TraceConfig -from aiohttp.web import HTTPException -from aiohttp_retry import RandomRetry, RetryClient +from promptflow.evals._http_utils import AsyncHttpPipeline from ._identity_manager import APITokenManager @@ -34,52 +32,6 @@ def get_model_class_from_url(endpoint_url: str): raise ValueError(f"Unknown API type for endpoint {endpoint_url}") -# ===================== HTTP Retry ====================== -class AsyncHTTPClientWithRetry: - def __init__(self, n_retry, retry_timeout, logger, retry_options=None): - self.attempts = n_retry - self.logger = logger - - # Set up async HTTP client with retry - - trace_config = TraceConfig() # set up request logging - trace_config.on_request_end.append(self.delete_auth_header) - # trace_config.on_request_start.append(self.on_request_start) - # trace_config.on_request_end.append(self.on_request_end) - if retry_options is None: - retry_options = RandomRetry( # set up retry configuration - statuses=[104, 408, 409, 424, 429, 500, 502, 503, 504], # on which statuses to retry - attempts=n_retry, - min_timeout=retry_timeout, - max_timeout=retry_timeout, - ) - - self.client = RetryClient(trace_configs=[trace_config], retry_options=retry_options) - - async def on_request_start(self, session, trace_config_ctx, params): - current_attempt = trace_config_ctx.trace_request_ctx["current_attempt"] - self.logger.info("[ATTEMPT %s] Sending %s request to %s" % (current_attempt, params.method, params.url)) - - async def delete_auth_header(self, session, trace_config_ctx, params): - request_headers = dict(params.response.request_info.headers) - if "Authorization" in request_headers: - del request_headers["Authorization"] - if "api-key" in request_headers: - del request_headers["api-key"] - - async def on_request_end(self, session, trace_config_ctx, params): - current_attempt = trace_config_ctx.trace_request_ctx["current_attempt"] - request_headers = dict(params.response.request_info.headers) - if 
"Authorization" in request_headers: - del request_headers["Authorization"] # hide auth token from logs - if "api-key" in request_headers: - del request_headers["api-key"] - self.logger.info( - "[ATTEMPT %s] For %s request to %s, received response with status %s and request headers: %s" - % (current_attempt, params.method, params.url, params.response.status, request_headers) - ) - - # =========================================================== # ===================== LLMBase Class ======================= # =========================================================== @@ -119,7 +71,7 @@ def format_request_data(self, prompt: str, **request_params) -> dict: async def get_completion( self, prompt: str, - session: RetryClient, + session: AsyncHttpPipeline, **request_params, ) -> dict: """ @@ -128,7 +80,7 @@ async def get_completion( Parameters ---------- prompt: Prompt str to query model with. - session: aiohttp RetryClient object to use for the request. + session: AsyncHttpPipeline object to use for the request. **request_params: Additional parameters to pass to the request. """ request_data = self.format_request_data(prompt, **request_params) @@ -141,7 +93,7 @@ async def get_completion( async def get_all_completions( self, prompts: List[str], - session: RetryClient, + session: AsyncHttpPipeline, api_call_max_parallel_count: int, api_call_delay_seconds: float, request_error_rate_threshold: float, @@ -152,7 +104,7 @@ async def get_all_completions( @abstractmethod async def request_api( self, - session: RetryClient, + session: AsyncHttpPipeline, request_data: dict, ) -> dict: pass @@ -161,7 +113,7 @@ async def request_api( async def get_conversation_completion( self, messages: List[dict], - session: RetryClient, + session: AsyncHttpPipeline, role: str, **request_params, ) -> dict: @@ -172,7 +124,7 @@ async def request_api_parallel( self, request_datas: List[dict], output_collector: List, - session: RetryClient, + session: AsyncHttpPipeline, api_call_delay_seconds: float, request_error_rate_threshold: float, ) -> None: @@ -315,7 +267,7 @@ def format_request_data(self, prompt: str, **request_params) -> Dict[str, str]: async def get_conversation_completion( self, messages: List[dict], - session: RetryClient, + session: AsyncHttpPipeline, role: str = "assistant", **request_params, ) -> dict: @@ -326,7 +278,7 @@ async def get_conversation_completion( ---------- messages: List of messages to query the model with. Expected format: [{"role": "user", "content": "Hello!"}, ...] - session: aiohttp RetryClient object to query the model with. + session: AsyncHttpPipeline object to query the model with. role: Role of the user sending the message. request_params: Additional parameters to pass to the model. """ @@ -345,7 +297,7 @@ async def get_conversation_completion( async def get_all_completions( # type: ignore[override] self, prompts: List[Dict[str, str]], - session: RetryClient, + session: AsyncHttpPipeline, api_call_max_parallel_count: int = 1, api_call_delay_seconds: float = 0.1, request_error_rate_threshold: float = 0.5, @@ -357,7 +309,7 @@ async def get_all_completions( # type: ignore[override] Parameters ---------- prompts: List of prompts to query the model with. - session: aiohttp RetryClient to use for the request. + session: AsyncHttpPipeline to use for the request. api_call_max_parallel_count: Number of parallel requests to make to the API. api_call_delay_seconds: Number of seconds to wait between API requests. request_error_rate_threshold: Maximum error rate allowed before raising an error. 
@@ -406,7 +358,7 @@ async def request_api_parallel( self, request_datas: List[dict], output_collector: List, - session: RetryClient, + session: AsyncHttpPipeline, api_call_delay_seconds: float = 0.1, request_error_rate_threshold: float = 0.5, ) -> None: @@ -461,7 +413,7 @@ async def request_api_parallel( async def request_api( self, - session: RetryClient, + session: AsyncHttpPipeline, request_data: dict, ) -> dict: """ @@ -503,21 +455,21 @@ async def request_api( time_start = time.time() full_response = None - async with session.post(url=self.endpoint_url, headers=headers, json=request_data, params=params) as response: - if response.status == 200: - response_data = await response.json() - self.logger.info(f"Response: {response_data}") - # Copy the full response and return it to be saved in jsonl. - full_response = copy.copy(response_data) + response = await session.post(url=self.endpoint_url, headers=headers, json=request_data, params=params) - time_taken = time.time() - time_start + response.raise_for_status() - parsed_response = self._parse_response(response_data, request_data=request_data) - else: - raise HTTPException( - reason=f"Received unexpected HTTP status: {response.status} {await response.text()}" - ) + response_data = response.json() + + self.logger.info(f"Response: {response_data}") + + # Copy the full response and return it to be saved in jsonl. + full_response = copy.copy(response_data) + + time_taken = time.time() - time_start + + parsed_response = self._parse_response(response_data, request_data=request_data) return { "request": request_data, @@ -561,7 +513,7 @@ def format_request_data(self, messages: List[dict], **request_params): # type: async def get_conversation_completion( self, messages: List[dict], - session: RetryClient, + session: AsyncHttpPipeline, role: str = "assistant", **request_params, ) -> dict: @@ -572,7 +524,7 @@ async def get_conversation_completion( ---------- messages: List of messages to query the model with. Expected format: [{"role": "user", "content": "Hello!"}, ...] - session: aiohttp RetryClient object to query the model with. + session: AsyncHttpPipeline object to query the model with. role: Not used for this model, since it is a chat model. request_params: Additional parameters to pass to the model. """ @@ -588,7 +540,7 @@ async def get_conversation_completion( async def get_completion( self, prompt: str, - session: RetryClient, + session: AsyncHttpPipeline, **request_params, ) -> dict: """ @@ -597,7 +549,7 @@ async def get_completion( Parameters ---------- prompt: Prompt str to query model with. - session: aiohttp RetryClient object to use for the request. + session: AsyncHttpPipeline object to use for the request. **request_params: Additional parameters to pass to the request. 
""" messages = [{"role": "system", "content": prompt}] @@ -611,7 +563,7 @@ async def get_completion( async def get_all_completions( self, prompts: List[str], # type: ignore[override] - session: RetryClient, + session: AsyncHttpPipeline, api_call_max_parallel_count: int = 1, api_call_delay_seconds: float = 0.1, request_error_rate_threshold: float = 0.5, diff --git a/src/promptflow-evals/promptflow/evals/synthetic/adversarial_simulator.py b/src/promptflow-evals/promptflow/evals/synthetic/adversarial_simulator.py index a6d92933672..825a5a428bf 100644 --- a/src/promptflow-evals/promptflow/evals/synthetic/adversarial_simulator.py +++ b/src/promptflow-evals/promptflow/evals/synthetic/adversarial_simulator.py @@ -8,17 +8,18 @@ import random from typing import Any, Callable, Dict, List +from azure.core.pipeline.policies import AsyncRetryPolicy, RetryMode from azure.identity import DefaultAzureCredential from tqdm import tqdm from promptflow._sdk._telemetry import ActivityType, monitor_operation +from promptflow.evals._http_utils import get_async_http_client from promptflow.evals.synthetic.adversarial_scenario import AdversarialScenario from ._conversation import CallbackConversationBot, ConversationBot, ConversationRole from ._conversation._conversation import simulate_conversation from ._model_tools import ( AdversarialTemplateHandler, - AsyncHTTPClientWithRetry, ManagedIdentityAPITokenManager, ProxyChatCompletionsModel, RAIClient, @@ -285,19 +286,21 @@ async def _simulate_async( target=target, role=ConversationRole.ASSISTANT, template=template, parameters=parameters ) bots = [user_bot, system_bot] - asyncHttpClient = AsyncHTTPClientWithRetry( - n_retry=api_call_retry_limit, - retry_timeout=api_call_retry_sleep_sec, - logger=logger, + session = get_async_http_client().with_policies( + retry_policy=AsyncRetryPolicy( + retry_total=api_call_retry_limit, + retry_backoff_factor=api_call_retry_sleep_sec, + retry_mode=RetryMode.Fixed, + ) ) + async with semaphore: - async with asyncHttpClient.client as session: - _, conversation_history = await simulate_conversation( - bots=bots, - session=session, - turn_limit=max_conversation_turns, - api_call_delay_sec=api_call_delay_sec, - ) + _, conversation_history = await simulate_conversation( + bots=bots, + session=session, + turn_limit=max_conversation_turns, + api_call_delay_sec=api_call_delay_sec, + ) return self._to_chat_protocol(conversation_history=conversation_history, template_parameters=parameters) def _get_user_proxy_completion_model(self, template_key, template_parameters): diff --git a/src/promptflow-evals/pyproject.toml b/src/promptflow-evals/pyproject.toml index b826012e732..794037c3698 100644 --- a/src/promptflow-evals/pyproject.toml +++ b/src/promptflow-evals/pyproject.toml @@ -41,7 +41,6 @@ packages = [ python = "<4.0,>=3.8" promptflow-devkit = "<2.0.0,>=1.15.0" promptflow-core = "<2.0.0,>=1.15.0" -aiohttp_retry = ">=2.8.3" websocket-client = ">=1.2.0" jsonpath_ng = ">=1.5.0" urllib3 = ">1.26.17" diff --git a/src/promptflow-evals/tests/evals/unittests/test_content_safety_rai_script.py b/src/promptflow-evals/tests/evals/unittests/test_content_safety_rai_script.py index a2113b1061c..c87c06f6c05 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_content_safety_rai_script.py +++ b/src/promptflow-evals/tests/evals/unittests/test_content_safety_rai_script.py @@ -1,10 +1,13 @@ +import http import os import pathlib +from typing import Any, Iterator, MutableMapping, Optional from unittest.mock import MagicMock, patch -import httpx import numpy 
as np import pytest +from azure.core.exceptions import HttpResponseError +from azure.core.rest import AsyncHttpResponse, HttpRequest from azure.identity import DefaultAzureCredential from promptflow.evals._common.constants import EvaluationMetrics, HarmSeverityLevel, RAIService @@ -26,6 +29,95 @@ def data_file(): return os.path.join(data_path, "evaluate_test_data.jsonl") +class MockAsyncHttpResponse(AsyncHttpResponse): + """A mocked implementation of azure.core.rest.HttpResponse.""" + + def __init__( + self, + status_code: int, + *, + text: Optional[str] = None, + json: Optional[Any] = None, + headers: Optional[MutableMapping[str, str]] = None, + request: Optional[HttpRequest] = None, + content_type: Optional[str] = None, + ) -> None: + self._status_code = status_code + self._text = text or "" + self._json = json + self._request = request + self._headers = headers or {} + self._content_type = content_type + + def json(self) -> Any: + return self._json + + def text(self, encoding: Optional[str] = None) -> str: + return self._text + + @property + def status_code(self) -> int: + return self._status_code + + @property + def request(self) -> HttpRequest: + return self._request + + @property + def reason(self) -> str: + return f"{self.status_code} {http.client.responses[self.status_code]}" + + @property + def headers(self) -> MutableMapping[str, str]: + return self._headers + + @property + def content_type(self) -> Optional[str]: + return self._content_type + + @property + def is_closed(self) -> bool: + return True + + @property + def is_stream_consumed(self) -> bool: + return True + + @property + def encoding(self) -> Optional[str]: + return None + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise HttpResponseError(response=self) + + async def close(self) -> None: + pass + + async def __aenter__(self) -> object: + raise NotImplementedError() + + async def __aexit__(self, *args) -> None: + raise NotImplementedError() + + @property + def url(self) -> str: + raise NotImplementedError() + + @property + def content(self) -> bytes: + raise NotImplementedError() + + async def read(self) -> bytes: + raise NotImplementedError() + + async def iter_bytes(self, **kwargs) -> Iterator[bytes]: + raise NotImplementedError() + + async def iter_raw(self, **kwargs) -> Iterator[bytes]: + raise NotImplementedError() + + @pytest.mark.usefixtures("mock_project_scope") @pytest.mark.unittest class TestContentSafetyEvaluator: @@ -43,26 +135,33 @@ def test_rai_subscript_functions(self): ensure_service_availability()""" @pytest.mark.asyncio - @patch("httpx.AsyncClient.get", return_value=httpx.Response(200, json={})) + @patch("promptflow.evals._http_utils.AsyncHttpPipeline.get", return_value=MockAsyncHttpResponse(200, json={})) async def test_ensure_service_availability(self, client_mock): _ = await ensure_service_availability("dummy_url", "dummy_token") - client_mock.return_value.status_code = 9001 + assert client_mock._mock_await_count == 1 + + @pytest.mark.asyncio + @patch("promptflow.evals._http_utils.AsyncHttpPipeline.get", return_value=MockAsyncHttpResponse(9001, json={})) + async def test_ensure_service_availability_service_unavailable(self, client_mock): with pytest.raises(Exception) as exc_info: _ = await ensure_service_availability("dummy_url", "dummy_token") assert "RAI service is not available in this region. 
Status Code: 9001" in str(exc_info._excinfo[1]) - client_mock.return_value.status_code = 200 + assert client_mock._mock_await_count == 1 + + @pytest.mark.asyncio + @patch("promptflow.evals._http_utils.AsyncHttpPipeline.get", return_value=MockAsyncHttpResponse(200, json={})) + async def test_ensure_service_availability_exception_capability_unavailable(self, client_mock): with pytest.raises(Exception) as exc_info: _ = await ensure_service_availability("dummy_url", "dummy_token", capability="does not exist") assert "Capability 'does not exist' is not available in this region" in str(exc_info._excinfo[1]) - assert client_mock._mock_await_count == 3 + assert client_mock._mock_await_count == 1 @pytest.mark.asyncio @patch( - "httpx.AsyncClient.post", - return_value=httpx.Response( + "promptflow.evals._http_utils.AsyncHttpPipeline.post", + return_value=MockAsyncHttpResponse( 202, json={"location": "this/is/the/dummy-operation-id"}, - request=httpx.Request("POST", "test"), ), ) async def test_submit_request(self, client_mock): @@ -74,8 +173,18 @@ async def test_submit_request(self, client_mock): token="dummy", ) assert result == "dummy-operation-id" - client_mock.return_value.status_code = 404 - with pytest.raises(httpx.HTTPStatusError) as exc_info: + + @pytest.mark.asyncio + @patch( + "promptflow.evals._http_utils.AsyncHttpPipeline.post", + return_value=MockAsyncHttpResponse( + 404, + json={"location": "this/is/the/dummy-operation-id"}, + content_type="application/json", + ), + ) + async def test_submit_request_not_found(self, client_mock): + with pytest.raises(HttpResponseError) as exc_info: _ = await submit_request( question="What is the meaning of life", answer="42", @@ -83,8 +192,7 @@ async def test_submit_request(self, client_mock): rai_svc_url="www.notarealurl.com", token="dummy", ) - assert "Client error '404 Not Found' for url 'test'" in str(exc_info._excinfo[1]) - assert client_mock._mock_await_count == 2 + assert "Operation returned an invalid status '404 Not Found'" in str(exc_info._excinfo[1]) @pytest.mark.usefixtures("mock_token") @pytest.mark.usefixtures("mock_expired_token") @@ -102,7 +210,10 @@ async def test_fetch_or_reuse_token(self, mock_token, mock_expired_token): res = await fetch_or_reuse_token(credential=mock_cred, token="not-a-token") assert res == 100 - @patch("httpx.AsyncClient.get", return_value=httpx.Response(200, json={"result": "stuff"})) + @patch( + "promptflow.evals._http_utils.AsyncHttpPipeline.get", + return_value=MockAsyncHttpResponse(200, json={"result": "stuff"}), + ) @patch("promptflow.evals._common.constants.RAIService.TIMEOUT", 1) @patch("promptflow.evals._common.constants.RAIService.SLEEP_TIME", 1.2) @pytest.mark.usefixtures("mock_token") @@ -118,13 +229,21 @@ async def test_fetch_result(self, client_mock, mock_token): assert client_mock._mock_await_count == 1 assert res["result"] == "stuff" - client_mock.return_value.status_code = 404 + @patch( + "promptflow.evals._http_utils.AsyncHttpPipeline.get", + return_value=MockAsyncHttpResponse(404, json={"result": "stuff"}), + ) + @patch("promptflow.evals._common.constants.RAIService.TIMEOUT", 1) + @patch("promptflow.evals._common.constants.RAIService.SLEEP_TIME", 1.2) + @pytest.mark.usefixtures("mock_token") + @pytest.mark.asyncio + async def test_fetch_result_timeout(self, client_mock, mock_token): with pytest.raises(TimeoutError) as exc_info: _ = await fetch_result( operation_id="op-id", rai_svc_url="www.notarealurl.com", credential=None, token=mock_token ) - # We expect 2 more calls; the initial call, then one 
more ~2 seconds later. - assert client_mock._mock_await_count == 3 + # We expect 2 calls; the initial call, then one more ~2 seconds later. + assert client_mock._mock_await_count == 2 # Don't bother checking exact time beyond seconds, that's never going to be consistent across machines. assert "Fetching annotation result 2 times out after 1" in str(exc_info._excinfo[1]) @@ -201,8 +320,10 @@ def test_parse_response(self): @pytest.mark.asyncio @patch( - "httpx.AsyncClient.get", - return_value=httpx.Response(200, json={"properties": {"discoveryUrl": "https://www.url.com:123/thePath"}}), + "promptflow.evals._http_utils.AsyncHttpPipeline.get", + return_value=MockAsyncHttpResponse( + 200, json={"properties": {"discoveryUrl": "https://www.url.com:123/thePath"}} + ), ) async def test_get_service_discovery_url(self, client_mock): @@ -216,16 +337,31 @@ async def test_get_service_discovery_url(self, client_mock): url = await _get_service_discovery_url(azure_ai_project=azure_ai_project, token=token) assert url == "https://www.url.com:123" - client_mock.return_value.status_code = 201 + @pytest.mark.asyncio + @patch( + "promptflow.evals._http_utils.AsyncHttpPipeline.get", + return_value=MockAsyncHttpResponse( + 201, json={"properties": {"discoveryUrl": "https://www.url.com:123/thePath"}} + ), + ) + async def test_get_service_discovery_url_exception(self, client_mock): + token = "fake-token" + azure_ai_project = { + "subscription_id": "fake-id", + "project_name": "fake-name", + "resource_group_name": "fake-group", + } + with pytest.raises(Exception) as exc_info: _ = await _get_service_discovery_url(azure_ai_project=azure_ai_project, token=token) assert "Failed to retrieve the discovery service URL" in str(exc_info._excinfo[1]) - assert client_mock._mock_await_count == 2 @pytest.mark.asyncio @patch( - "httpx.AsyncClient.get", - return_value=httpx.Response(200, json={"properties": {"discoveryUrl": "https://www.url.com:123/thePath"}}), + "promptflow.evals._http_utils.AsyncHttpPipeline.get", + return_value=MockAsyncHttpResponse( + 200, json={"properties": {"discoveryUrl": "https://www.url.com:123/thePath"}} + ), ) @patch( "promptflow.evals._common.rai_service._get_service_discovery_url", diff --git a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py index 1ba8e2983de..ce124747151 100644 --- a/src/promptflow-evals/tests/evals/unittests/test_eval_run.py +++ b/src/promptflow-evals/tests/evals/unittests/test_eval_run.py @@ -41,16 +41,10 @@ def _get_mock_create_resonse(self, status=200): mock_response = MagicMock() mock_response.status_code = status if status != 200: - mock_response.text = "Mock error" + mock_response.text = lambda: "Mock error" else: mock_response.json.return_value = { - "run": { - "info": { - "run_id": str(uuid4()), - "experiment_id": str(uuid4()), - "run_name": str(uuid4()) - } - } + "run": {"info": {"run_id": str(uuid4()), "experiment_id": str(uuid4()), "run_name": str(uuid4())}} } return mock_response @@ -58,7 +52,7 @@ def _get_mock_end_response(self, status=200): """Get the mock end run response.""" mock_response = MagicMock() mock_response.status_code = status - mock_response.text = 'Everything good' if status == 200 else 'Everything bad' + mock_response.text = lambda: "Everything good" if status == 200 else "Everything bad" return mock_response @pytest.mark.parametrize( @@ -66,13 +60,8 @@ def _get_mock_end_response(self, status=200): ) def test_end_raises(self, token_mock, status, should_raise, caplog): """Test that 
end run raises an exception if an incorrect status is set."""
-        mock_session = MagicMock()
-        mock_session.request.return_value = self._get_mock_create_resonse()
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
-            with EvalRun(
-                run_name=None,
-                **TestEvalRun._MOCK_CREDS
-            ) as run:
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=self._get_mock_create_resonse()):
+            with EvalRun(run_name=None, **TestEvalRun._MOCK_CREDS) as run:
                 if should_raise:
                     with pytest.raises(ValueError) as cm:
                         run._end_run(status)
@@ -83,9 +72,7 @@ def test_run_logs_if_terminated(self, token_mock, caplog):
         """Test that run warns the user if we are trying to terminate it twice."""
-        mock_session = MagicMock()
-        mock_session.request.return_value = self._get_mock_create_resonse()
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=self._get_mock_create_resonse()):
             logger = logging.getLogger(EvalRun.__module__)
             # All loggers, having promptflow. prefix will have "promptflow" logger
             # as a parent. This logger does not propagate the logs and cannot be
@@ -107,10 +94,10 @@ def test_end_logs_if_fails(self, token_mock, caplog):
         """Test that if setting the terminal status failed, it is logged."""
-        mock_session = MagicMock()
-        mock_session.request.side_effect = [self._get_mock_create_resonse(),
-                                            self._get_mock_end_response(500)]
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
+        with patch(
+            "promptflow.evals._http_utils.HttpPipeline.request",
+            side_effect=[self._get_mock_create_resonse(), self._get_mock_end_response(500)],
+        ):
             logger = logging.getLogger(EvalRun.__module__)
             # All loggers, having promptflow. prefix will have "promptflow" logger
             # as a parent. This logger does not propagate the logs and cannot be
@@ -130,12 +117,10 @@ def test_start_run_fails(self, token_mock, caplog):
         """Test that there are log messages if the run was not started."""
-        mock_session = MagicMock()
         mock_response_start = MagicMock()
         mock_response_start.status_code = 500
-        mock_response_start.text = "Mock internal service error."
-        mock_session.request.return_value = mock_response_start
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
+        mock_response_start.text = lambda: "Mock internal service error."
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=mock_response_start):
             logger = logging.getLogger(EvalRun.__module__)
             # All loggers, having promptflow. prefix will have "promptflow" logger
             # as a parent. This logger does not propagate the logs and cannot be
@@ -152,7 +137,7 @@
             run._start_run()
         assert len(caplog.records) == 1
         assert "500" in caplog.records[0].message
-        assert mock_response_start.text in caplog.records[0].message
+        assert mock_response_start.text() in caplog.records[0].message
         assert "The results will be saved locally" in caplog.records[0].message
         caplog.clear()
         # Log artifact
@@ -171,62 +156,47 @@
         assert "Unable to stop run due to Run status=RunStatus.BROKEN." in caplog.records[0].message
         caplog.clear()
 
-    @patch("promptflow.evals.evaluate._eval_run.requests.Session")
-    def test_run_name(self, mock_session_cls, token_mock):
+    def test_run_name(self, token_mock):
         """Test that the run name is the same as ID if name is not given."""
-        mock_session = MagicMock()
         mock_response = self._get_mock_create_resonse()
-        mock_session.request.return_value = mock_response
-        mock_session_cls.return_value = mock_session
-        with EvalRun(
-            run_name=None,
-            tracking_uri="www.microsoft.com",
-            subscription_id="mock",
-            group_name="mock",
-            workspace_name="mock",
-            ml_client=MagicMock(),
-        ) as run:
-            pass
-        assert run.info.run_id == mock_response.json.return_value['run']['info']['run_id']
-        assert run.info.experiment_id == mock_response.json.return_value[
-            'run']['info']['experiment_id']
-        assert run.info.run_name == mock_response.json.return_value['run']['info']["run_name"]
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=mock_response):
+            with EvalRun(
+                run_name=None,
+                tracking_uri="www.microsoft.com",
+                subscription_id="mock",
+                group_name="mock",
+                workspace_name="mock",
+                ml_client=MagicMock(),
+            ) as run:
+                pass
+        assert run.info.run_id == mock_response.json.return_value["run"]["info"]["run_id"]
+        assert run.info.experiment_id == mock_response.json.return_value["run"]["info"]["experiment_id"]
+        assert run.info.run_name == mock_response.json.return_value["run"]["info"]["run_name"]
 
-    @patch("promptflow.evals.evaluate._eval_run.requests.Session")
-    def test_run_with_name(self, mock_session_cls, token_mock):
+    def test_run_with_name(self, token_mock):
         """Test that the run name is not the same as ID if it is given."""
         mock_response = self._get_mock_create_resonse()
-        mock_response.json.return_value['run']['info']['run_name'] = 'test'
-        mock_session = MagicMock()
-        mock_session.request.return_value = mock_response
-        mock_session_cls.return_value = mock_session
-        with EvalRun(
-            run_name="test",
-            tracking_uri="www.microsoft.com",
-            subscription_id="mock",
-            group_name="mock",
-            workspace_name="mock",
-            ml_client=MagicMock(),
-        ) as run:
-            pass
-        assert run.info.run_id == mock_response.json.return_value['run']['info']['run_id']
-        assert run.info.experiment_id == mock_response.json.return_value[
-            'run']['info']['experiment_id']
-        assert run.info.run_name == 'test'
+        mock_response.json.return_value["run"]["info"]["run_name"] = "test"
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=mock_response):
+            with EvalRun(
+                run_name="test",
+                tracking_uri="www.microsoft.com",
+                subscription_id="mock",
+                group_name="mock",
+                workspace_name="mock",
+                ml_client=MagicMock(),
+            ) as run:
+                pass
+        assert run.info.run_id == mock_response.json.return_value["run"]["info"]["run_id"]
+        assert run.info.experiment_id == mock_response.json.return_value["run"]["info"]["experiment_id"]
+        assert run.info.run_name == "test"
         assert run.info.run_name != run.info.run_id
 
-    @patch("promptflow.evals.evaluate._eval_run.requests.Session")
-    def test_get_urls(self, mock_session_cls, token_mock):
+    def test_get_urls(self, token_mock):
         """Test getting URLs from eval run."""
-        mock_response = self._get_mock_create_resonse()
-        mock_session = MagicMock()
-        mock_session.request.return_value = mock_response
-        mock_session_cls.return_value = mock_session
-        with EvalRun(
-            run_name="test",
-            **TestEvalRun._MOCK_CREDS
-        ) as run:
-            pass
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=self._get_mock_create_resonse()):
+            with EvalRun(run_name="test", **TestEvalRun._MOCK_CREDS) as run:
+                pass
         assert run.get_run_history_uri() == (
             "https://region.api.azureml.ms/history/v1.0/subscriptions"
             "/000000-0000-0000-0000-0000000/resourceGroups/mock-rg-region"
@@ -254,39 +224,38 @@ )
     def test_log_artifacts_logs_error(self, token_mock, tmp_path, caplog, log_function, expected_str):
         """Test that the error is logged."""
-        mock_session = MagicMock()
         mock_response = MagicMock()
         mock_response.status_code = 404
-        mock_response.text = "Mock not found error."
+        mock_response.text = lambda: "Mock not found error."
         if log_function == "log_artifact":
             with open(os.path.join(tmp_path, "test.json"), "w") as fp:
                 json.dump({"f1": 0.5}, fp)
-        mock_session.request.side_effect = [
-            self._get_mock_create_resonse(),
-            mock_response,
-            self._get_mock_end_response()
-        ]
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
+
+        with patch(
+            "promptflow.evals._http_utils.HttpPipeline.request",
+            side_effect=[
+                self._get_mock_create_resonse(),
+                mock_response,
+                self._get_mock_end_response(),
+            ],
+        ):
             logger = logging.getLogger(EvalRun.__module__)
             # All loggers, having promptflow. prefix will have "promptflow" logger
             # as a parent. This logger does not propagate the logs and cannot be
             # captured by caplog. Here we will skip this logger to capture logs.
             logger.parent = logging.root
-            with EvalRun(
-                run_name="test",
-                **TestEvalRun._MOCK_CREDS
-            ) as run:
+            with EvalRun(run_name="test", **TestEvalRun._MOCK_CREDS) as run:
                 fn = getattr(run, log_function)
-                if log_function == 'log_artifact':
-                    with open(os.path.join(tmp_path, EvalRun.EVALUATION_ARTIFACT), 'w') as fp:
-                        fp.write('42')
-                    kwargs = {'artifact_folder': tmp_path}
+                if log_function == "log_artifact":
+                    with open(os.path.join(tmp_path, EvalRun.EVALUATION_ARTIFACT), "w") as fp:
+                        fp.write("42")
+                    kwargs = {"artifact_folder": tmp_path}
                 else:
-                    kwargs = {'key': 'f1', 'value': 0.5}
-                with patch('promptflow.evals.evaluate._eval_run.BlobServiceClient', return_value=MagicMock()):
+                    kwargs = {"key": "f1", "value": 0.5}
+                with patch("promptflow.evals.evaluate._eval_run.BlobServiceClient", return_value=MagicMock()):
                     fn(**kwargs)
         assert len(caplog.records) == 1
-        assert mock_response.text in caplog.records[0].message
+        assert mock_response.text() in caplog.records[0].message
         assert "404" in caplog.records[0].message
         assert expected_str in caplog.records[0].message
@@ -308,13 +277,8 @@ def test_wrong_artifact_path(
         expected_error,
     ):
         """Test that if artifact path is empty, or does not exist, we are logging the error."""
-        mock_session = MagicMock()
-        mock_session.request.return_value = self._get_mock_create_resonse()
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
-            with EvalRun(
-                run_name="test",
-                **TestEvalRun._MOCK_CREDS
-            ) as run:
+        with patch("promptflow.evals._http_utils.HttpPipeline.request", return_value=self._get_mock_create_resonse()):
+            with EvalRun(run_name="test", **TestEvalRun._MOCK_CREDS) as run:
                 logger = logging.getLogger(EvalRun.__module__)
                 # All loggers, having promptflow. prefix will have "promptflow" logger
                 # as a parent.
This logger does not propagate the logs and cannot be @@ -322,7 +286,7 @@ def test_wrong_artifact_path( logger.parent = logging.root artifact_folder = tmp_path if dir_exists else "wrong_path_567" if not dir_empty: - with open(os.path.join(tmp_path, "test.txt"), 'w') as fp: + with open(os.path.join(tmp_path, "test.txt"), "w") as fp: fp.write("42") run.log_artifact(artifact_folder) assert len(caplog.records) == 1 @@ -355,51 +319,50 @@ def test_run_broken_if_no_tracking_uri(self, token_mock, caplog): with EvalRun( run_name=None, tracking_uri=None, - subscription_id='mock', - group_name='mock', - workspace_name='mock', - ml_client=MagicMock() + subscription_id="mock", + group_name="mock", + workspace_name="mock", + ml_client=MagicMock(), ) as run: assert len(caplog.records) == 1 assert "The results will be saved locally, but will not be logged to Azure." in caplog.records[0].message - with patch('promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry') as mock_request: - run.log_artifact('mock_dir') - run.log_metric('foo', 42) - run.write_properties_to_run_history({'foo': 'bar'}) + with patch("promptflow.evals.evaluate._eval_run.EvalRun.request_with_retry") as mock_request: + run.log_artifact("mock_dir") + run.log_metric("foo", 42) + run.write_properties_to_run_history({"foo": "bar"}) mock_request.assert_not_called() - @pytest.mark.parametrize('status_code,pf_run', [ - (401, False), - (200, False), - (401, True), - (200, True), - ]) + @pytest.mark.parametrize( + "status_code,pf_run", + [ + (401, False), + (200, False), + (401, True), + (200, True), + ], + ) def test_lifecycle(self, token_mock, status_code, pf_run): """Test the run statuses throughout its life cycle.""" pf_run_mock = None if pf_run: pf_run_mock = MagicMock() - pf_run_mock.name = 'mock_pf_run' - pf_run_mock._experiment_name = 'mock_pf_experiment' - mock_session = MagicMock() - mock_session.request.return_value = self._get_mock_create_resonse(status_code) - with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session): - run = EvalRun( - run_name="test", - **TestEvalRun._MOCK_CREDS, - promptflow_run=pf_run_mock - ) - assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}' + pf_run_mock.name = "mock_pf_run" + pf_run_mock._experiment_name = "mock_pf_experiment" + with patch( + "promptflow.evals._http_utils.HttpPipeline.request", return_value=self._get_mock_create_resonse(status_code) + ): + run = EvalRun(run_name="test", **TestEvalRun._MOCK_CREDS, promptflow_run=pf_run_mock) + assert run.status == RunStatus.NOT_STARTED, f"Get {run.status}, expected {RunStatus.NOT_STARTED}" run._start_run() if status_code == 200 or pf_run: - assert run.status == RunStatus.STARTED, f'Get {run.status}, expected {RunStatus.STARTED}' + assert run.status == RunStatus.STARTED, f"Get {run.status}, expected {RunStatus.STARTED}" else: - assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' + assert run.status == RunStatus.BROKEN, f"Get {run.status}, expected {RunStatus.BROKEN}" run._end_run("FINISHED") if status_code == 200 or pf_run: - assert run.status == RunStatus.TERMINATED, f'Get {run.status}, expected {RunStatus.TERMINATED}' + assert run.status == RunStatus.TERMINATED, f"Get {run.status}, expected {RunStatus.TERMINATED}" else: - assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}' + assert run.status == RunStatus.BROKEN, f"Get {run.status}, expected {RunStatus.BROKEN}" def test_local_lifecycle(self, 
token_mock):
         """Test that the local run has correct statuses."""
@@ -411,34 +374,28 @@ def test_local_lifecycle(self, token_mock):
             workspace_name="mock",
             ml_client=MagicMock(),
         )
-        assert run.status == RunStatus.NOT_STARTED, f'Get {run.status}, expected {RunStatus.NOT_STARTED}'
+        assert run.status == RunStatus.NOT_STARTED, f"Get {run.status}, expected {RunStatus.NOT_STARTED}"
         run._start_run()
-        assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}'
+        assert run.status == RunStatus.BROKEN, f"Get {run.status}, expected {RunStatus.BROKEN}"
         run._end_run("FINISHED")
-        assert run.status == RunStatus.BROKEN, f'Get {run.status}, expected {RunStatus.BROKEN}'
+        assert run.status == RunStatus.BROKEN, f"Get {run.status}, expected {RunStatus.BROKEN}"
 
-    @pytest.mark.parametrize('status_code', [200, 401])
+    @pytest.mark.parametrize("status_code", [200, 401])
     def test_write_properties(self, token_mock, caplog, status_code):
         """Test writing properties to the eval run."""
         mock_write = MagicMock()
         mock_write.status_code = status_code
-        mock_write.text = 'Mock error'
-        mock_session = MagicMock()
-        mock_session.request.side_effect = [
-            self._get_mock_create_resonse(),
-            mock_write,
-            self._get_mock_end_response()
-        ]
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
-            with EvalRun(
-                run_name="test",
-                **TestEvalRun._MOCK_CREDS
-            ) as run:
-                run.write_properties_to_run_history({'foo': 'bar'})
+        mock_write.text = lambda: "Mock error"
+        with patch(
+            "promptflow.evals._http_utils.HttpPipeline.request",
+            side_effect=[self._get_mock_create_resonse(), mock_write, self._get_mock_end_response()],
+        ):
+            with EvalRun(run_name="test", **TestEvalRun._MOCK_CREDS) as run:
+                run.write_properties_to_run_history({"foo": "bar"})
         if status_code != 200:
             assert len(caplog.records) == 1
-            assert 'Fail writing properties' in caplog.records[0].message
-            assert mock_write.text in caplog.records[0].message
+            assert "Fail writing properties" in caplog.records[0].message
+            assert mock_write.text() in caplog.records[0].message
         else:
             assert len(caplog.records) == 0
@@ -452,24 +409,24 @@ def test_write_properties_to_run_history_logs_error(self, token_mock, caplog):
         with EvalRun(
             run_name=None,
             tracking_uri=None,
-            subscription_id='mock',
-            group_name='mock',
-            workspace_name='mock',
-            ml_client=MagicMock()
+            subscription_id="mock",
+            group_name="mock",
+            workspace_name="mock",
+            ml_client=MagicMock(),
         ) as run:
-            run.write_properties_to_run_history({'foo': 'bar'})
+            run.write_properties_to_run_history({"foo": "bar"})
         assert len(caplog.records) == 3
         assert "tracking_uri was not provided," in caplog.records[0].message
         assert "Unable to write properties due to Run status=RunStatus.BROKEN." in caplog.records[1].message
         assert "Unable to stop run due to Run status=RunStatus.BROKEN." in caplog.records[2].message
 
     @pytest.mark.parametrize(
-        'function_literal,args,expected_action',
+        "function_literal,args,expected_action",
         [
-            ('write_properties_to_run_history', ({'foo': 'bar'}), 'write properties'),
-            ('log_metric', ('foo', 42), 'log metric'),
-            ('log_artifact', ('mock_folder',), 'log artifact')
-        ]
+            ("write_properties_to_run_history", ({"foo": "bar"}), "write properties"),
+            ("log_metric", ("foo", 42), "log metric"),
+            ("log_artifact", ("mock_folder",), "log artifact"),
+        ],
     )
     def test_logs_if_not_started(self, token_mock, caplog, function_literal, args, expected_action):
         """Test that all public functions raise an exception if the run is not started."""
@@ -478,32 +435,25 @@ def test_logs_if_not_started(self, token_mock, caplog, function_literal, args, e
         # as a parent. This logger does not propagate the logs and cannot be
         # captured by caplog. Here we will skip this logger to capture logs.
         logger.parent = logging.root
-        run = EvalRun(
-            run_name=None,
-            **TestEvalRun._MOCK_CREDS
-        )
+        run = EvalRun(run_name=None, **TestEvalRun._MOCK_CREDS)
         getattr(run, function_literal)(*args)
         assert len(caplog.records) == 1
         assert expected_action in caplog.records[0].message, caplog.records[0].message
-        assert f"Unable to {expected_action} due to Run status=RunStatus.NOT_STARTED" in caplog.records[
-            0].message, caplog.records[0].message
+        assert (
+            f"Unable to {expected_action} due to Run status=RunStatus.NOT_STARTED" in caplog.records[0].message
+        ), caplog.records[0].message
 
-    @pytest.mark.parametrize(
-        'status',
-        [RunStatus.STARTED, RunStatus.BROKEN, RunStatus.TERMINATED]
-    )
+    @pytest.mark.parametrize("status", [RunStatus.STARTED, RunStatus.BROKEN, RunStatus.TERMINATED])
     def test_starting_started_run(self, token_mock, status):
         """Test that an exception is raised if the run was already started."""
-        run = EvalRun(
-            run_name=None,
-            **TestEvalRun._MOCK_CREDS
-        )
-        mock_session = MagicMock()
-        mock_session.request.return_value = self._get_mock_create_resonse(500 if status == RunStatus.BROKEN else 200)
-        with patch("promptflow.evals.evaluate._eval_run.requests.Session", return_value=mock_session):
+        run = EvalRun(run_name=None, **TestEvalRun._MOCK_CREDS)
+        with patch(
+            "promptflow.evals._http_utils.HttpPipeline.request",
+            return_value=self._get_mock_create_resonse(500 if status == RunStatus.BROKEN else 200),
+        ):
             run._start_run()
         if status == RunStatus.TERMINATED:
-            run._end_run('FINISHED')
+            run._end_run("FINISHED")
         with pytest.raises(RuntimeError) as cm:
             run._start_run()
         assert f"Unable to start run due to Run status={status}" in cm.value.args[0], cm.value.args[0]
diff --git a/src/promptflow-evals/tests/evals/unittests/test_synthetic_conversation_bot.py b/src/promptflow-evals/tests/evals/unittests/test_synthetic_conversation_bot.py
index 6d0f0b9b379..5a41a3fe008 100644
--- a/src/promptflow-evals/tests/evals/unittests/test_synthetic_conversation_bot.py
+++ b/src/promptflow-evals/tests/evals/unittests/test_synthetic_conversation_bot.py
@@ -2,7 +2,9 @@
 import jinja2
 import pytest
+from azure.core.pipeline.policies import AsyncRetryPolicy, RetryMode
 
+from promptflow.evals._http_utils import get_async_http_client
 from promptflow.evals.synthetic._conversation import (
     ConversationBot,
     ConversationRole,
@@ -10,7 +12,6 @@
     LLMBase,
     OpenAIChatCompletionsModel,
 )
-from promptflow.evals.synthetic._model_tools import AsyncHTTPClientWithRetry
 
 
 # Mock classes for dependencies
@@ -74,12 +75,15 @@ async def test_conversation_bot_initialization_user_invalid_jinja(self, bot_inva
         assert
isinstance(bot.conversation_template, jinja2.Template) assert isinstance(bot.conversation_starter, str) assert bot.conversation_starter is not None - asyncHttpClient = AsyncHTTPClientWithRetry( - n_retry=1, - retry_timeout=0, - logger=None, + + client = get_async_http_client().with_policies( + retry_policy=AsyncRetryPolicy( + retry_total=1, + retry_backoff_factor=0, + retry_mode=RetryMode.Fixed, + ) ) - client = asyncHttpClient.client + parsed_response, req, time_taken, full_response = await bot.generate_response( session=client, conversation_history=[], max_history=0, turn_number=0 ) @@ -87,7 +91,6 @@ async def test_conversation_bot_initialization_user_invalid_jinja(self, bot_inva parsed_response["samples"][0] == bot_invalid_jinja_params["instantiation_parameters"]["conversation_starter"] ) - client.close() @pytest.mark.asyncio async def test_conversation_bot_initialization_assistant(self, bot_assistant_params): diff --git a/src/promptflow-evals/tests/recordings/azure/test_builtin_evaluators_TestBuiltInEvaluators_test_content_safety_service_unavailable.yaml b/src/promptflow-evals/tests/recordings/azure/test_builtin_evaluators_TestBuiltInEvaluators_test_content_safety_service_unavailable.yaml index 3fc9c167124..08dc274ae0b 100644 --- a/src/promptflow-evals/tests/recordings/azure/test_builtin_evaluators_TestBuiltInEvaluators_test_content_safety_service_unavailable.yaml +++ b/src/promptflow-evals/tests/recordings/azure/test_builtin_evaluators_TestBuiltInEvaluators_test_content_safety_service_unavailable.yaml @@ -1,18 +1,16 @@ interactions: - request: - body: '' + body: null headers: - accept: + Accept: - '*/*' - accept-encoding: + Accept-Encoding: - gzip, deflate - connection: + Connection: - close - content-type: + Content-Type: - application/json - host: - - management.azure.com - user-agent: + User-Agent: - promptflow-evals/0.1.0.dev0 method: GET uri: https://management.azure.com/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000?api-version=2023-08-01-preview @@ -20,16 +18,16 @@ interactions: body: string: '{"id": "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000", "name": "00000", "type": "Microsoft.MachineLearningServices/workspaces", "location": - "westus2", "tags": {}, "etag": null, "kind": "Default", "sku": {"name": "Basic", - "tier": "Basic"}, "properties": {"discoveryUrl": "https://westus2.api.azureml.ms/discovery", - "mlFlowTrackingUri": "azureml://westus2.api.azureml.ms/mlflow/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000"}}' + "westus", "tags": {}, "etag": null, "kind": "Project", "sku": {"name": "Basic", + "tier": "Basic"}, "properties": {"discoveryUrl": "https://westus.api.azureml.ms/discovery", + "mlFlowTrackingUri": "azureml://westus.api.azureml.ms/mlflow/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000"}}' headers: cache-control: - no-cache connection: - close content-length: - - '2911' + - '2971' content-type: - application/json; charset=utf-8 expires: @@ -45,27 +43,127 @@ interactions: x-content-type-options: - nosniff x-request-time: - - '0.038' + - '0.032' status: code: 200 message: OK - request: - body: '' + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - close + Content-Type: + 
- application/json + User-Agent: + - promptflow-evals/0.1.0.dev0 + method: GET + uri: https://westus.api.azureml.ms/raisvc/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000/checkannotation + response: + body: + string: unknown to cluster + headers: + connection: + - close + content-length: + - '18' + content-type: + - application/octet-stream + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-content-type-options: + - nosniff + x-request-time: + - '0.006' + status: + code: 530 + message: +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - close + Content-Type: + - application/json + User-Agent: + - promptflow-evals/0.1.0.dev0 + method: GET + uri: https://westus.api.azureml.ms/raisvc/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000/checkannotation + response: + body: + string: unknown to cluster + headers: + connection: + - close + content-length: + - '18' + content-type: + - application/octet-stream + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-content-type-options: + - nosniff + x-request-time: + - '0.006' + status: + code: 530 + message: +- request: + body: null headers: - accept: + Accept: - '*/*' - accept-encoding: + Accept-Encoding: - gzip, deflate + Connection: + - close + Content-Type: + - application/json + User-Agent: + - promptflow-evals/0.1.0.dev0 + method: GET + uri: https://westus.api.azureml.ms/raisvc/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000/checkannotation + response: + body: + string: unknown to cluster + headers: connection: - close + content-length: + - '18' content-type: + - application/octet-stream + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-content-type-options: + - nosniff + x-request-time: + - '0.008' + status: + code: 530 + message: +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - close + Content-Type: - application/json - host: - - westus2.api.azureml.ms - user-agent: + User-Agent: - promptflow-evals/0.1.0.dev0 method: GET - uri: https://westus2.api.azureml.ms/raisvc/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000/checkannotation + uri: https://westus.api.azureml.ms/raisvc/v1.0/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/00000/providers/Microsoft.MachineLearningServices/workspaces/00000/checkannotation response: body: string: unknown to cluster @@ -81,7 +179,7 @@ interactions: x-content-type-options: - nosniff x-request-time: - - '0.004' + - '0.005' status: code: 530 message:
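
Usage sketch (illustrative only, not part of the patch): the snippet below shows the request pattern this change standardizes on, assuming the `get_async_http_client().with_policies(...)` helper keeps the requests-like `get` signature used in the hunks above. The endpoint and token are placeholders.

import asyncio

from azure.core.pipeline.policies import AsyncRetryPolicy, RetryMode

from promptflow.evals._http_utils import get_async_http_client


async def check_annotation(rai_svc_url: str, token: str) -> dict:
    # Layer a fixed-interval retry policy over the shared async pipeline,
    # mirroring what adversarial_simulator.py now does in _simulate_async.
    client = get_async_http_client().with_policies(
        retry_policy=AsyncRetryPolicy(
            retry_total=3,  # up to 3 retries on retryable failures
            retry_backoff_factor=1,  # roughly one second between attempts
            retry_mode=RetryMode.Fixed,
        )
    )
    response = await client.get(
        rai_svc_url + "/checkannotation",
        headers={"Authorization": f"Bearer {token}"},
    )
    # azure-core responses raise HttpResponseError on 4xx/5xx here, which is
    # the behavior the updated unit tests assert via MockAsyncHttpResponse.
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    # Placeholder values; a real call needs a live RAI service endpoint.
    asyncio.run(check_annotation("https://example.invalid/raisvc", "dummy-token"))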