Leverage async batch run for first async-enabled evaluator - FluencyEvaluator #3542

Merged (13 commits, Jul 17, 2024)
@@ -85,6 +85,14 @@ async def exec_aggregation_async(
) -> AggregationResult:
return self._flow_executor.exec_aggregation(batch_inputs, aggregation_inputs, run_id=run_id)

async def exec_line_async(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
) -> LineResult:
return await self._flow_executor.exec_line_async(inputs, index, run_id)

async def _exec_batch(
self,
batch_inputs: List[Mapping[str, Any]],
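For orientation, here is a minimal sketch of the proxy pattern this hunk extends; the class and method names below are illustrative stand-ins, not code from the promptflow packages. The point is that a line-level async entry point simply awaits the underlying executor, so a batch driver can schedule many lines concurrently on one event loop.

import asyncio
from typing import Any, Mapping, Optional


class _FakeExecutor:
    # Stand-in for the real flow executor.
    async def exec_line_async(self, inputs: Mapping[str, Any], index: Optional[int], run_id: Optional[str]):
        await asyncio.sleep(0)  # placeholder for real line execution
        return {"index": index, "output": inputs}


class _FakeProxy:
    def __init__(self):
        self._flow_executor = _FakeExecutor()

    async def exec_line_async(self, inputs, index=None, run_id=None):
        # Mirrors the new method above: delegate straight to the executor.
        return await self._flow_executor.exec_line_async(inputs, index, run_id)


async def _demo():
    proxy = _FakeProxy()
    # Because each line is awaitable, lines can run concurrently.
    results = await asyncio.gather(*(proxy.exec_line_async({"q": i}, index=i) for i in range(3)))
    print(results)


asyncio.run(_demo())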
@@ -131,7 +131,7 @@ def _run_bulk(self, run: Run, stream=False, **kwargs):
with flow_overwrite_context(
flow_obj, tuning_node, variant, connections=run.connections, init_kwargs=run.init
) as flow:
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage, **kwargs)

@classmethod
def _validate_inputs(cls, run: Run):
@@ -140,7 +140,7 @@ def _validate_inputs(cls, run: Run):
raise UserErrorException(message=str(error), error=error)

def _submit_bulk_run(
self, flow: Union[Flow, FlexFlow, Prompty], run: Run, local_storage: LocalStorageOperations
self, flow: Union[Flow, FlexFlow, Prompty], run: Run, local_storage: LocalStorageOperations, **kwargs
) -> dict:
logger.info(f"Submitting run {run.name}, log path: {local_storage.logger.file_path}")
run_id = run.name
@@ -183,6 +183,7 @@ def _submit_bulk_run(
storage=local_storage,
log_path=local_storage.logger.file_path,
init_kwargs=run.init,
**kwargs,
)
batch_result = batch_engine.run(
input_dirs=input_dirs,
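The change above threads `**kwargs` from `_run_bulk` through `_submit_bulk_run` into the batch engine constructor. A hedged sketch of that forwarding pattern, with hypothetical class names rather than the promptflow ones, showing how an option such as `batch_use_async` can be handed down without every intermediate layer knowing about it:

class Engine:
    def __init__(self, flow, **kwargs):
        # Only the engine interprets the option.
        self.use_async = kwargs.get("batch_use_async", False)


class Submitter:
    def _submit(self, flow, **kwargs):
        return Engine(flow, **kwargs)

    def run(self, flow, **kwargs):
        # Intermediate layers pass kwargs through untouched.
        return self._submit(flow, **kwargs)


engine = Submitter().run(flow="my_flow", batch_use_async=True)
print(engine.use_async)  # True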
7 changes: 5 additions & 2 deletions src/promptflow-devkit/promptflow/batch/_batch_engine.py
@@ -151,6 +151,7 @@ def __init__(
self._storage = storage if storage else DefaultRunStorage(base_dir=self._working_dir)
self._kwargs = kwargs

self._batch_use_async = kwargs.get("batch_use_async", False)
self._batch_timeout_sec = batch_timeout_sec or get_int_env_var("PF_BATCH_TIMEOUT_SEC")
self._line_timeout_sec = line_timeout_sec or get_int_env_var("PF_LINE_TIMEOUT_SEC", LINE_TIMEOUT_SEC)
self._worker_count = worker_count or get_int_env_var("PF_WORKER_COUNT")
@@ -472,7 +473,7 @@ async def _exec(

# execute lines
is_timeout = False
if isinstance(self._executor_proxy, PythonExecutorProxy):
if not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy):
results, is_timeout = await self._executor_proxy._exec_batch(
inputs_to_run,
output_dir,
@@ -656,6 +657,8 @@ def _check_eager_flow_and_language_from_yaml(self):

def _batch_timeout_expired(self) -> bool:
# Currently, local PythonExecutorProxy will handle the batch timeout by itself.
if self._batch_timeout_sec is None or isinstance(self._executor_proxy, PythonExecutorProxy):
if self._batch_timeout_sec is None or (
not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy)
):
return False
return (datetime.utcnow() - self._start_time).total_seconds() > self._batch_timeout_sec
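Taken together, these two edits mean that when `batch_use_async` is set, a local Python flow no longer goes through `PythonExecutorProxy._exec_batch`, and the engine-level batch timeout check applies to it again. A condensed, illustrative restatement of that gating (not the promptflow code itself):

def choose_execution_path(batch_use_async: bool, is_python_proxy: bool) -> str:
    # Mirrors the branch conditions added in this hunk.
    if not batch_use_async and is_python_proxy:
        return "proxy-managed batch (proxy also owns the timeout)"
    return "engine-managed async line execution (engine owns the timeout)"


for flag in (False, True):
    print(flag, "->", choose_execution_path(batch_use_async=flag, is_python_proxy=True))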
18 changes: 13 additions & 5 deletions src/promptflow-evals/CHANGELOG.md
@@ -1,8 +1,16 @@
# promptflow-evals package
# Release History

Please insert change log into "Next Release" ONLY.
## v0.3.2 (Upcoming)

## Next release
### Features Added

## 0.0.1
- Introduced package
### Bugs Fixed

### Improvements
- Converted built-in evaluators to async-based implementation, leveraging async batch run for performance improvement.

## v0.3.1 (2024-07-09)
- This release contains minor bug fixes and improvements.

## v0.3.0 (2024-05-17)
- Initial release of promptflow-evals package.
@@ -5,6 +5,7 @@

from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP, PF_FLOW_META_LOAD_IN_SUBPROCESS
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow.azure._utils.general import set_event_loop_policy
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
@@ -25,6 +26,10 @@ def __enter__(self):
os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"
os.environ[PF_FLOW_META_LOAD_IN_SUBPROCESS] = "false"

# For addressing the issue of asyncio event loop closed on Windows
# Reference: https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed-when-getting-loop
set_event_loop_policy()

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.client, CodeClient):
recover_openai_api()
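The body of the imported `set_event_loop_policy` helper is not shown in this diff; the Stack Overflow thread cited in the comment points at the usual remedy, which looks roughly like the sketch below. This is an assumption about what such a helper typically contains, not a copy of the promptflow implementation.

import asyncio
import sys


def set_event_loop_policy():
    # On Windows, the default Proactor event loop can report
    # "RuntimeError: Event loop is closed" while async clients are torn down;
    # switching to the selector-based policy avoids that failure mode.
    if sys.platform.startswith("win"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())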
@@ -1,6 +1,7 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import inspect
import logging

import numpy as np
@@ -24,8 +25,18 @@ def __init__(self, pf_client: PFClient):
self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")

def run(self, flow, data, column_mapping=None, **kwargs):
flow_to_run = flow
if hasattr(flow, "_to_async"):
flow_to_run = flow._to_async()

batch_use_async = self._should_batch_use_async(flow_to_run)
eval_future = self._thread_pool.submit(
self._pf_client.run, flow, data=data, column_mapping=column_mapping, **kwargs
self._pf_client.run,
flow_to_run,
data=data,
column_mapping=column_mapping,
batch_use_async=batch_use_async,
**kwargs
)
return ProxyRun(run=eval_future)

@@ -38,3 +49,12 @@ def get_details(self, proxy_run, all_results=False):
def get_metrics(self, proxy_run):
run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
return self._pf_client.get_metrics(run)

@staticmethod
def _should_batch_use_async(flow):
if hasattr(flow, "__call__") and inspect.iscoroutinefunction(flow.__call__):
return True
elif inspect.iscoroutinefunction(flow):
return True
else:
return False
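`_should_batch_use_async` accepts both callables whose `__call__` is a coroutine function (class-based evaluators) and plain `async def` functions. A quick self-contained check of that detection logic:

import inspect


class SyncEvaluator:
    def __call__(self, *, question: str, answer: str):
        return {"score": 1.0}


class AsyncEvaluator:
    async def __call__(self, *, question: str, answer: str):
        return {"score": 1.0}


async def async_flow(question: str, answer: str):
    return {"score": 1.0}


print(inspect.iscoroutinefunction(AsyncEvaluator().__call__))  # True  -> async batch run
print(inspect.iscoroutinefunction(async_flow))                 # True  -> async batch run
print(inspect.iscoroutinefunction(SyncEvaluator().__call__))   # False -> existing path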
@@ -7,14 +7,48 @@

import numpy as np

from promptflow.client import load_flow
from promptflow.core import AzureOpenAIModelConfiguration
from promptflow._utils.async_utils import async_run_allowing_running_loop
from promptflow.core import AsyncPrompty, AzureOpenAIModelConfiguration

try:
from ..._user_agent import USER_AGENT
except ImportError:
USER_AGENT = None


class _AsyncFluencyEvaluator:
def __init__(self, model_config: AzureOpenAIModelConfiguration):
if model_config.api_version is None:
model_config.api_version = "2024-02-15-preview"

prompty_model_config = {"configuration": model_config}
prompty_model_config.update(
{"parameters": {"extra_headers": {"x-ms-user-agent": USER_AGENT}}}
) if USER_AGENT and isinstance(model_config, AzureOpenAIModelConfiguration) else None
current_dir = os.path.dirname(__file__)
prompty_path = os.path.join(current_dir, "fluency.prompty")
self._flow = AsyncPrompty.load(source=prompty_path, model=prompty_model_config)

async def __call__(self, *, question: str, answer: str, **kwargs):
# Validate input parameters
question = str(question or "")
answer = str(answer or "")

if not (question.strip() and answer.strip()):
raise ValueError("Both 'question' and 'answer' must be non-empty strings.")

# Run the evaluation flow
llm_output = await self._flow(question=question, answer=answer)

score = np.nan
if llm_output:
match = re.search(r"\d", llm_output)
if match:
score = float(match.group())

return {"gpt_fluency": float(score)}


class FluencyEvaluator:
"""
Initialize a fluency evaluator configured for a specific Azure OpenAI model.
@@ -41,17 +75,7 @@ class FluencyEvaluator:
"""

def __init__(self, model_config: AzureOpenAIModelConfiguration):
# TODO: Remove this block once the bug is fixed
# https://msdata.visualstudio.com/Vienna/_workitems/edit/3151324
if model_config.api_version is None:
model_config.api_version = "2024-02-15-preview"

prompty_model_config = {"configuration": model_config}
prompty_model_config.update({"parameters": {"extra_headers": {"x-ms-user-agent": USER_AGENT}}}) \
if USER_AGENT and isinstance(model_config, AzureOpenAIModelConfiguration) else None
current_dir = os.path.dirname(__file__)
prompty_path = os.path.join(current_dir, "fluency.prompty")
self._flow = load_flow(source=prompty_path, model=prompty_model_config)
self._async_evaluator = _AsyncFluencyEvaluator(model_config)

def __call__(self, *, question: str, answer: str, **kwargs):
"""
@@ -64,20 +88,7 @@ def __call__(self, *, question: str, answer: str, **kwargs):
:return: The fluency score.
:rtype: dict
"""
# Validate input parameters
question = str(question or "")
answer = str(answer or "")
return async_run_allowing_running_loop(self._async_evaluator, question=question, answer=answer, **kwargs)

if not (question.strip() and answer.strip()):
raise ValueError("Both 'question' and 'answer' must be non-empty strings.")

# Run the evaluation flow
llm_output = self._flow(question=question, answer=answer)

score = np.nan
if llm_output:
match = re.search(r"\d", llm_output)
if match:
score = float(match.group())

return {"gpt_fluency": float(score)}
def _to_async(self):
return self._async_evaluator
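End-to-end, the public surface stays synchronous: callers invoke the evaluator as before, while the batch runner can pick up `_to_async()` and drive the async implementation. A hedged usage sketch; the endpoint, key, and deployment values are placeholders, and the printed result is only an example shape.

from promptflow.core import AzureOpenAIModelConfiguration
from promptflow.evals.evaluators import FluencyEvaluator

model_config = AzureOpenAIModelConfiguration(
    azure_endpoint="https://<your-resource>.openai.azure.com",
    api_key="<your-api-key>",
    azure_deployment="<your-deployment>",
)

fluency = FluencyEvaluator(model_config)

# Synchronous call, internally bridged to the async evaluator via
# async_run_allowing_running_loop.
result = fluency(
    question="What is the capital of Japan?",
    answer="The capital of Japan is Tokyo.",
)
print(result)  # e.g. {"gpt_fluency": 5.0}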
34 changes: 33 additions & 1 deletion src/promptflow-evals/tests/evals/e2etests/test_evaluate.py
@@ -1,6 +1,7 @@
import json
import os
import pathlib
import time

import numpy as np
import pandas as pd
@@ -9,7 +10,12 @@
from azure.identity import DefaultAzureCredential

from promptflow.evals.evaluate import evaluate
from promptflow.evals.evaluators import ContentSafetyEvaluator, F1ScoreEvaluator, GroundednessEvaluator
from promptflow.evals.evaluators import (
ContentSafetyEvaluator,
F1ScoreEvaluator,
FluencyEvaluator,
GroundednessEvaluator,
)


@pytest.fixture
@@ -153,6 +159,32 @@ def test_evaluate_with_content_safety_evaluator(self, project_scope, data_file,
assert 0 <= metrics.get("content_safety.self_harm_defect_rate") <= 1
assert 0 <= metrics.get("content_safety.hate_unfairness_defect_rate") <= 1

@pytest.mark.performance_test
def test_evaluate_with_async_enabled_evaluator(self, model_config, data_file):
fluency_eval = FluencyEvaluator(model_config)

start_time = time.time()
result = evaluate(
data=data_file,
evaluators={
"fluency": fluency_eval,
},
)
end_time = time.time()
duration = end_time - start_time

row_result_df = pd.DataFrame(result["rows"])
metrics = result["metrics"]

# validate the results
assert result is not None
assert result["rows"] is not None
input_data = pd.read_json(data_file, lines=True)
assert row_result_df.shape[0] == len(input_data)
assert "outputs.fluency.gpt_fluency" in row_result_df.columns.to_list()
assert "fluency.gpt_fluency" in metrics.keys()
assert duration < 10, f"evaluate API call took too long: {duration} seconds"

@pytest.mark.parametrize(
"use_pf_client,function,column",
[
@@ -5,12 +5,16 @@
from promptflow.evals.evaluators import FluencyEvaluator


async def fluency_async_mock():
return "1"


@pytest.mark.usefixtures("mock_model_config")
@pytest.mark.unittest
class TestBuiltInEvaluators:
def test_fluency_evaluator(self, mock_model_config):
fluency_eval = FluencyEvaluator(model_config=mock_model_config)
fluency_eval._flow = MagicMock(return_value="1")
fluency_eval._async_evaluator._flow = MagicMock(return_value=fluency_async_mock())

score = fluency_eval(question="What is the capital of Japan?", answer="The capital of Japan is Tokyo.")

@@ -19,7 +23,7 @@ def test_fluency_evaluator(self, mock_model_config):

def test_fluency_evaluator_non_string_inputs(self, mock_model_config):
fluency_eval = FluencyEvaluator(model_config=mock_model_config)
fluency_eval._flow = MagicMock(return_value="1")
fluency_eval._async_evaluator._flow = MagicMock(return_value=fluency_async_mock())

score = fluency_eval(question={"foo": 1}, answer={"bar": "2"})

@@ -28,7 +32,7 @@ def test_fluency_evaluator_non_string_inputs(self, mock_model_config):

def test_fluency_evaluator_empty_string(self, mock_model_config):
fluency_eval = FluencyEvaluator(model_config=mock_model_config)
fluency_eval._flow = MagicMock(return_value="1")
fluency_eval._async_evaluator._flow = MagicMock(return_value=fluency_async_mock())

with pytest.raises(ValueError) as exc_info:
fluency_eval(question="What is the capital of Japan?", answer=None)
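These tests patch the async flow by wrapping a pre-created coroutine in `MagicMock(return_value=...)`, which works because each test awaits the flow exactly once (a coroutine can only be awaited a single time). If a test ever needed multiple calls, `unittest.mock.AsyncMock` returns a fresh awaitable per call; the sketch below is a suggestion reusing the module's existing `mock_model_config` fixture, not a change made in this PR.

from unittest.mock import AsyncMock

from promptflow.evals.evaluators import FluencyEvaluator


def test_fluency_with_asyncmock(mock_model_config):
    fluency_eval = FluencyEvaluator(model_config=mock_model_config)
    # AsyncMock produces a new awaitable on every call, so the mocked flow
    # can be awaited more than once within the same test.
    fluency_eval._async_evaluator._flow = AsyncMock(return_value="1")

    score = fluency_eval(question="What is the capital of Japan?", answer="Tokyo.")
    assert score["gpt_fluency"] == 1.0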
@@ -41,3 +41,10 @@
'a4c1141c2441edb1a62856177868a2242292a0be', (167936, 3388)
'3cd3028235ab4f02d7ee074fbc5667dd90d8a282', (171520, 3574)
'99493a54841e1a28fb054b71d2adc27e0be2ff5e', (175104, 3573)
'50f3e636259f7dfe9c86d070b4c0752fdcc1cfdd', (178688, 4400)
'cb45b5a6e3897d4a687e5f673a6924eebccb529c', (183296, 3456)
'089d2dc2622d2035e182f44dfebfce832c32be91', (186880, 5044)
'b74db0f7d27659cd5160bee77fd1402490fc0764', (192000, 3454)
'9da70c55984adfd99de7d7d35452bb119706a14c', (195584, 3417)
'70d94a59cf7aca95a8fe7faa2e8db14a05cf1773', (199168, 3438)
'7771928ea1d8a376edd1ac6ab344d3d1855b015e', (202752, 3431)
@@ -41,3 +41,10 @@
'a4c1141c2441edb1a62856177868a2242292a0be', (167936, 3388)
'3cd3028235ab4f02d7ee074fbc5667dd90d8a282', (171520, 3574)
'99493a54841e1a28fb054b71d2adc27e0be2ff5e', (175104, 3573)
'50f3e636259f7dfe9c86d070b4c0752fdcc1cfdd', (178688, 4400)
'cb45b5a6e3897d4a687e5f673a6924eebccb529c', (183296, 3456)
'089d2dc2622d2035e182f44dfebfce832c32be91', (186880, 5044)
'b74db0f7d27659cd5160bee77fd1402490fc0764', (192000, 3454)
'9da70c55984adfd99de7d7d35452bb119706a14c', (195584, 3417)
'70d94a59cf7aca95a8fe7faa2e8db14a05cf1773', (199168, 3438)
'7771928ea1d8a376edd1ac6ab344d3d1855b015e', (202752, 3431)