Leverage async batch run for first async-enabled evaluator - FluencyEvaluator #3542

Merged: 13 commits, Jul 17, 2024
@@ -85,6 +85,14 @@ async def exec_aggregation_async(
) -> AggregationResult:
return self._flow_executor.exec_aggregation(batch_inputs, aggregation_inputs, run_id=run_id)

async def exec_line_async(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
) -> LineResult:
return await self._flow_executor.exec_line_async(inputs, index, run_id)
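
This new `exec_line_async` hook lets the batch engine await individual lines instead of handing the whole batch to a worker pool. As a rough illustration only (the `proxy`, `lines`, and concurrency cap below are hypothetical stand-ins, not the engine's actual scheduling code), a caller could fan lines out like this:

```python
import asyncio
from typing import Any, List, Mapping


async def run_lines_async(proxy, lines: List[Mapping[str, Any]], max_concurrency: int = 4):
    """Await each input line via exec_line_async, bounded by a semaphore."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _one(index: int, inputs: Mapping[str, Any]):
        async with semaphore:
            # Same signature as the method above: (inputs, index, run_id)
            return await proxy.exec_line_async(inputs, index, run_id="sketch-run")

    return await asyncio.gather(*(_one(i, line) for i, line in enumerate(lines)))
```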

async def _exec_batch(
self,
batch_inputs: List[Mapping[str, Any]],
@@ -131,7 +131,7 @@ def _run_bulk(self, run: Run, stream=False, **kwargs):
with flow_overwrite_context(
flow_obj, tuning_node, variant, connections=run.connections, init_kwargs=run.init
) as flow:
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage, **kwargs)

@classmethod
def _validate_inputs(cls, run: Run):
@@ -140,7 +140,7 @@ def _validate_inputs(cls, run: Run):
raise UserErrorException(message=str(error), error=error)

def _submit_bulk_run(
self, flow: Union[Flow, FlexFlow, Prompty], run: Run, local_storage: LocalStorageOperations
self, flow: Union[Flow, FlexFlow, Prompty], run: Run, local_storage: LocalStorageOperations, **kwargs
) -> dict:
logger.info(f"Submitting run {run.name}, log path: {local_storage.logger.file_path}")
run_id = run.name
@@ -183,6 +183,7 @@ def _submit_bulk_run(
storage=local_storage,
log_path=local_storage.logger.file_path,
init_kwargs=run.init,
**kwargs,
)
batch_result = batch_engine.run(
input_dirs=input_dirs,
7 changes: 5 additions & 2 deletions src/promptflow-devkit/promptflow/batch/_batch_engine.py
@@ -151,6 +151,7 @@ def __init__(
self._storage = storage if storage else DefaultRunStorage(base_dir=self._working_dir)
self._kwargs = kwargs

self._batch_use_async = kwargs.get("batch_use_async", False)
self._batch_timeout_sec = batch_timeout_sec or get_int_env_var("PF_BATCH_TIMEOUT_SEC")
self._line_timeout_sec = line_timeout_sec or get_int_env_var("PF_LINE_TIMEOUT_SEC", LINE_TIMEOUT_SEC)
self._worker_count = worker_count or get_int_env_var("PF_WORKER_COUNT")
@@ -472,7 +473,7 @@ async def _exec(

# execute lines
is_timeout = False
if isinstance(self._executor_proxy, PythonExecutorProxy):
if not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy):
results, is_timeout = await self._executor_proxy._exec_batch(
inputs_to_run,
output_dir,
@@ -656,6 +657,8 @@ def _check_eager_flow_and_language_from_yaml(self):

def _batch_timeout_expired(self) -> bool:
# Currently, local PythonExecutorProxy will handle the batch timeout by itself.
if self._batch_timeout_sec is None or isinstance(self._executor_proxy, PythonExecutorProxy):
if self._batch_timeout_sec is None or (
not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy)
):
return False
return (datetime.utcnow() - self._start_time).total_seconds() > self._batch_timeout_sec
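
Taken together, these devkit changes thread an opt-in `batch_use_async` kwarg from `pf_client.run(...)` through `_submit_bulk_run` into `BatchEngine`, where it skips the `PythonExecutorProxy` batch path (and that proxy's self-managed batch timeout) so the engine can drive async line execution itself. A condensed sketch of that gating logic, using stand-in class names rather than the real engine internals:

```python
class PythonExecutorProxySketch:
    """Stand-in for the real PythonExecutorProxy, only to keep the sketch self-contained."""

    async def _exec_batch(self, inputs):
        return [f"legacy:{i}" for i, _ in enumerate(inputs)]

    async def exec_line_async(self, inputs, index=None, run_id=None):
        return f"async:{index}"


class BatchEngineSketch:
    def __init__(self, executor_proxy, **kwargs):
        self._executor_proxy = executor_proxy
        # Opt-in flag forwarded from pf_client.run(..., batch_use_async=True)
        self._batch_use_async = kwargs.get("batch_use_async", False)

    async def _exec(self, inputs):
        if not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxySketch):
            # Legacy path: the Python proxy runs the batch (and its timeout) itself.
            return await self._executor_proxy._exec_batch(inputs)
        # Async path: the engine awaits each line through exec_line_async instead.
        return [await self._executor_proxy.exec_line_async(line, i) for i, line in enumerate(inputs)]
```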
24 changes: 13 additions & 11 deletions src/promptflow-evals/CHANGELOG.md
@@ -1,18 +1,20 @@
# promptflow-evals package
# Release History

Please insert change log into "Next Release" ONLY.

## Next release

## 0.3.2
## v0.3.2 (Upcoming)

### Features Added
- Introduced `JailbreakAdversarialSimulator` for customers who need to run jailbreak and non-jailbreak adversarial simulations at the same time. More info in `/promptflow/evals/synthetic/README.md#jailbreak-simulator`

- The `AdversarialSimulator` responds with `category` of harm in the response.

- Large simulation was causing a jinja exception, this has been fixed
### Bugs Fixed
- Large simulation was causing a jinja exception, this has been fixed.

### Improvements
- Converted built-in evaluators to async-based implementation, leveraging async batch run for performance improvement.
- Parity between evals and Simulator on signature, passing credentials.
- The `AdversarialSimulator` responds with `category` of harm in the response.

## v0.3.1 (2024-07-09)
- This release contains minor bug fixes and improvements.

## 0.0.1
- Introduced package
## v0.3.0 (2024-05-17)
- Initial release of promptflow-evals package.
@@ -8,6 +8,7 @@
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
from .._utils import set_event_loop_policy
from .code_client import CodeClient
from .proxy_client import ProxyClient

@@ -25,6 +26,9 @@ def __enter__(self):
os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"
os.environ[PF_FLOW_META_LOAD_IN_SUBPROCESS] = "false"

# For addressing the issue of asyncio event loop closed on Windows
set_event_loop_policy()

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.client, CodeClient):
recover_openai_api()
@@ -1,6 +1,7 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import inspect
import logging

import numpy as np
@@ -24,8 +25,18 @@ def __init__(self, pf_client: PFClient):
self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")

def run(self, flow, data, column_mapping=None, **kwargs):
flow_to_run = flow
if hasattr(flow, "_to_async"):
flow_to_run = flow._to_async()

batch_use_async = self._should_batch_use_async(flow_to_run)
eval_future = self._thread_pool.submit(
self._pf_client.run, flow, data=data, column_mapping=column_mapping, **kwargs
self._pf_client.run,
flow_to_run,
data=data,
column_mapping=column_mapping,
batch_use_async=batch_use_async,
**kwargs
)
return ProxyRun(run=eval_future)

@@ -38,3 +49,12 @@ def get_details(self, proxy_run, all_results=False):
def get_metrics(self, proxy_run):
run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
return self._pf_client.get_metrics(run)

@staticmethod
def _should_batch_use_async(flow):
if hasattr(flow, "__call__") and inspect.iscoroutinefunction(flow.__call__):
return True
elif inspect.iscoroutinefunction(flow):
return True
else:
return False
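
`_should_batch_use_async` just asks whether the flow object is awaitable, either because it is a coroutine function itself or because it exposes an async `__call__`. A standalone version of the same check (the two evaluator classes are illustrative only):

```python
import inspect


class SyncEvaluator:
    def __call__(self, *, question: str, answer: str):
        return {"score": 1.0}


class AsyncEvaluator:
    async def __call__(self, *, question: str, answer: str):
        return {"score": 1.0}


def should_batch_use_async(flow) -> bool:
    # An async __call__ or a bare coroutine function both qualify for the async batch path.
    return inspect.iscoroutinefunction(getattr(flow, "__call__", None)) or inspect.iscoroutinefunction(flow)


print(should_batch_use_async(SyncEvaluator()))   # False
print(should_batch_use_async(AsyncEvaluator()))  # True
```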
19 changes: 16 additions & 3 deletions src/promptflow-evals/promptflow/evals/evaluate/_utils.py
@@ -14,7 +14,6 @@
from promptflow.evals._constants import DEFAULT_EVALUATION_RESULTS_FILE_NAME, Prefixes
from promptflow.evals.evaluate._eval_run import EvalRun


LOGGER = logging.getLogger(__name__)

AZURE_WORKSPACE_REGEX_FORMAT = (
@@ -62,7 +61,11 @@ def _azure_pf_client_and_triad(trace_destination):


def _log_metrics_and_instance_results(
metrics, instance_results, trace_destination, run, evaluation_name,
metrics,
instance_results,
trace_destination,
run,
evaluation_name,
) -> str:
if trace_destination is None:
LOGGER.error("Unable to log traces as trace destination was not defined.")
@@ -175,7 +178,7 @@ def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace
if match is not None:
pattern = match.group(1)
if pattern.startswith(pattern_prefix):
map_from_key = pattern[len(pattern_prefix):]
map_from_key = pattern[len(pattern_prefix) :]
elif pattern.startswith(run_outputs_prefix):
# Target-generated columns always starts from .outputs.
map_from_key = f"{Prefixes._TGT_OUTPUTS}{pattern[len(run_outputs_prefix) :]}"
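
For context on the reformatted hunk above: `_apply_column_mapping` resolves mapping values of the form `${data.<column>}` against the input dataset and `${run.outputs.<column>}` against a target run's outputs (re-prefixed with the internal target-output prefix). A reduced sketch of that resolution rule, with a made-up mapping, a made-up DataFrame, and `__outputs.` standing in for the internal prefix:

```python
import re

import pandas as pd


def apply_column_mapping_sketch(source_df: pd.DataFrame, mapping_config: dict) -> pd.DataFrame:
    """Simplified resolution: ${data.x} -> column x, ${run.outputs.x} -> column __outputs.x."""
    resolved = {}
    for to_key, value in mapping_config.items():
        match = re.search(r"^\$\{([^{}]+)\}$", value)
        if match is None:
            continue
        pattern = match.group(1)
        if pattern.startswith("data."):
            resolved[to_key] = source_df[pattern[len("data."):]]
        elif pattern.startswith("run.outputs."):
            resolved[to_key] = source_df["__outputs." + pattern[len("run.outputs."):]]
    return pd.DataFrame(resolved)


df = pd.DataFrame({"question": ["q1"], "__outputs.answer": ["a1"]})
print(apply_column_mapping_sketch(df, {"question": "${data.question}", "answer": "${run.outputs.answer}"}))
```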
@@ -199,3 +202,13 @@

def _has_aggregator(evaluator):
return hasattr(evaluator, "__aggregate__")


def set_event_loop_policy():
import asyncio
import platform

if platform.system().lower() == "windows":
# Reference: https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed-when-getting-loop
    # On Windows there seems to be a problem with the default event loop policy; use this snippet to work around it
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
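
The policy switch matters because the default proactor event loop on Windows is prone to "Event loop is closed" errors during transport teardown when loops are created and destroyed repeatedly, as they are in batch runs. A hedged usage sketch, calling the helper added above before any event loop exists:

```python
import asyncio

# Helper added in this PR to promptflow/evals/evaluate/_utils.py
from promptflow.evals.evaluate._utils import set_event_loop_policy


async def probe() -> str:
    await asyncio.sleep(0)
    return "ok"


if __name__ == "__main__":
    # Install the selector policy first so repeated asyncio.run calls on Windows
    # do not hit "Event loop is closed" errors on teardown; this is a no-op elsewhere.
    set_event_loop_policy()
    print(asyncio.run(probe()))
```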
@@ -7,14 +7,48 @@

import numpy as np

from promptflow.client import load_flow
from promptflow.core import AzureOpenAIModelConfiguration
from promptflow._utils.async_utils import async_run_allowing_running_loop
from promptflow.core import AsyncPrompty, AzureOpenAIModelConfiguration

try:
from ..._user_agent import USER_AGENT
except ImportError:
USER_AGENT = None


class _AsyncFluencyEvaluator:
def __init__(self, model_config: AzureOpenAIModelConfiguration):
if model_config.api_version is None:
model_config.api_version = "2024-02-15-preview"

prompty_model_config = {"configuration": model_config}
prompty_model_config.update(
{"parameters": {"extra_headers": {"x-ms-useragent": USER_AGENT}}}
) if USER_AGENT and isinstance(model_config, AzureOpenAIModelConfiguration) else None
current_dir = os.path.dirname(__file__)
prompty_path = os.path.join(current_dir, "fluency.prompty")
self._flow = AsyncPrompty.load(source=prompty_path, model=prompty_model_config)

async def __call__(self, *, question: str, answer: str, **kwargs):
# Validate input parameters
question = str(question or "")
answer = str(answer or "")

if not (question.strip() and answer.strip()):
raise ValueError("Both 'question' and 'answer' must be non-empty strings.")

# Run the evaluation flow
llm_output = await self._flow(question=question, answer=answer)

score = np.nan
if llm_output:
match = re.search(r"\d", llm_output)
if match:
score = float(match.group())

return {"gpt_fluency": float(score)}


class FluencyEvaluator:
"""
Initialize a fluency evaluator configured for a specific Azure OpenAI model.
@@ -41,17 +75,7 @@ class FluencyEvaluator:
"""

def __init__(self, model_config: AzureOpenAIModelConfiguration):
# TODO: Remove this block once the bug is fixed
# https://msdata.visualstudio.com/Vienna/_workitems/edit/3151324
if model_config.api_version is None:
model_config.api_version = "2024-02-15-preview"

prompty_model_config = {"configuration": model_config}
prompty_model_config.update({"parameters": {"extra_headers": {"x-ms-useragent": USER_AGENT}}}) \
if USER_AGENT and isinstance(model_config, AzureOpenAIModelConfiguration) else None
current_dir = os.path.dirname(__file__)
prompty_path = os.path.join(current_dir, "fluency.prompty")
self._flow = load_flow(source=prompty_path, model=prompty_model_config)
self._async_evaluator = _AsyncFluencyEvaluator(model_config)

def __call__(self, *, question: str, answer: str, **kwargs):
"""
@@ -64,20 +88,7 @@ def __call__(self, *, question: str, answer: str, **kwargs):
:return: The fluency score.
:rtype: dict
"""
# Validate input parameters
question = str(question or "")
answer = str(answer or "")
return async_run_allowing_running_loop(self._async_evaluator, question=question, answer=answer, **kwargs)

if not (question.strip() and answer.strip()):
raise ValueError("Both 'question' and 'answer' must be non-empty strings.")

# Run the evaluation flow
llm_output = self._flow(question=question, answer=answer)

score = np.nan
if llm_output:
match = re.search(r"\d", llm_output)
if match:
score = float(match.group())

return {"gpt_fluency": float(score)}
def _to_async(self):
return self._async_evaluator
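
The public surface stays synchronous: `async_run_allowing_running_loop` executes the awaitable implementation whether or not an event loop is already running, so existing call sites keep the same signature and behavior. A minimal sync usage sketch (same placeholder credentials as above):

```python
from promptflow.core import AzureOpenAIModelConfiguration
from promptflow.evals.evaluators import FluencyEvaluator

model_config = AzureOpenAIModelConfiguration(
    azure_endpoint="https://<your-resource>.openai.azure.com",  # placeholder
    api_key="<api-key>",                                        # placeholder
    azure_deployment="gpt-35-turbo",                            # placeholder
)

fluency = FluencyEvaluator(model_config)
# Same synchronous call as before this refactor; the async prompty call happens underneath.
print(fluency(question="What is the capital of France?", answer="Paris is the capital of France."))
```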
11 changes: 7 additions & 4 deletions src/promptflow-evals/tests/evals/conftest.py
@@ -5,7 +5,6 @@
from unittest.mock import patch

import pytest

from pytest_mock import MockerFixture

from promptflow.client import PFClient
@@ -36,6 +35,7 @@ def is_record():
def is_replay():
return False


# Import of optional packages
AZURE_INSTALLED = True
try:
@@ -95,6 +95,8 @@ def model_config() -> dict:
raise ValueError(f"Connection '{conn_name}' not found in dev connections.")

model_config = AzureOpenAIModelConfiguration(**dev_connections[conn_name]["value"])
# Default to gpt-35-turbo for capacity reasons
model_config.azure_deployment = "gpt-35-turbo"

AzureOpenAIModelConfiguration.__repr__ = lambda self: "<sensitive data redacted>"

@@ -368,12 +370,13 @@ def pytest_collection_modifyitems(items):
parents = {}
for item in items:
# Check if parent contains 'localtest' marker and remove it.
if any(mark.name == 'localtest' for mark in item.parent.own_markers) or id(item.parent) in parents:
if any(mark.name == "localtest" for mark in item.parent.own_markers) or id(item.parent) in parents:
if id(item.parent) not in parents:
item.parent.own_markers = [
marker for marker in item.own_markers if getattr(marker, 'name', None) != 'localtest']
marker for marker in item.own_markers if getattr(marker, "name", None) != "localtest"
]
parents[id(item.parent)] = item.parent
if not item.get_closest_marker('azuretest'):
if not item.get_closest_marker("azuretest"):
# If item's parent was marked as 'localtest', mark the child as such, but not if
# it was marked as 'azuretest'.
item.add_marker(pytest.mark.localtest)
34 changes: 33 additions & 1 deletion src/promptflow-evals/tests/evals/e2etests/test_evaluate.py
@@ -1,14 +1,20 @@
import json
import os
import pathlib
import time

import numpy as np
import pandas as pd
import pytest
import requests

from promptflow.evals.evaluate import evaluate
from promptflow.evals.evaluators import ContentSafetyEvaluator, F1ScoreEvaluator, GroundednessEvaluator
from promptflow.evals.evaluators import (
ContentSafetyEvaluator,
F1ScoreEvaluator,
FluencyEvaluator,
GroundednessEvaluator,
)


@pytest.fixture
@@ -154,6 +160,32 @@ def test_evaluate_with_content_safety_evaluator(self, project_scope, data_file,
assert 0 <= metrics.get("content_safety.self_harm_defect_rate") <= 1
assert 0 <= metrics.get("content_safety.hate_unfairness_defect_rate") <= 1

@pytest.mark.performance_test
def test_evaluate_with_async_enabled_evaluator(self, model_config, data_file):
fluency_eval = FluencyEvaluator(model_config)

start_time = time.time()
result = evaluate(
data=data_file,
evaluators={
"fluency": fluency_eval,
},
)
end_time = time.time()
duration = end_time - start_time

row_result_df = pd.DataFrame(result["rows"])
metrics = result["metrics"]

# validate the results
assert result is not None
assert result["rows"] is not None
input_data = pd.read_json(data_file, lines=True)
assert row_result_df.shape[0] == len(input_data)
assert "outputs.fluency.gpt_fluency" in row_result_df.columns.to_list()
assert "fluency.gpt_fluency" in metrics.keys()
assert duration < 10, f"evaluate API call took too long: {duration} seconds"

@pytest.mark.parametrize(
"use_pf_client,function,column",
[