diff --git a/src/promptflow-evals/CHANGELOG.md b/src/promptflow-evals/CHANGELOG.md index 11a52d8bcd7..8052f67d06b 100644 --- a/src/promptflow-evals/CHANGELOG.md +++ b/src/promptflow-evals/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features Added - Introduced `JailbreakAdversarialSimulator` for customers who need to do run jailbreak and non jailbreak adversarial simulations at the same time. More info in the README.md in `/promptflow/evals/synthetic/README.md#jailbreak-simulator` +- Exposed the batch evaluation run timeout via the "PF_BATCH_TIMEOUT_SEC" environment variable. The timeout applies to the batch run of each evaluator and of the target separately; it does not limit the entire API call. ### Bugs Fixed - Large simulation was causing a jinja exception, this has been fixed. diff --git a/src/promptflow-evals/promptflow/evals/_constants.py b/src/promptflow-evals/promptflow/evals/_constants.py index c31b88322e3..5166f6a464e 100644 --- a/src/promptflow-evals/promptflow/evals/_constants.py +++ b/src/promptflow-evals/promptflow/evals/_constants.py @@ -23,4 +23,5 @@ class Prefixes: CONTENT_SAFETY_DEFECT_RATE_THRESHOLD_DEFAULT = 4 -BATCH_RUN_TIMEOUT = 3600 +PF_BATCH_TIMEOUT_SEC_DEFAULT = 3600 +PF_BATCH_TIMEOUT_SEC = "PF_BATCH_TIMEOUT_SEC" diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/batch_run_context.py b/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/batch_run_context.py index 9b810bf09c5..1d188c98ce7 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/batch_run_context.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/batch_run_context.py @@ -5,6 +5,7 @@ from promptflow._sdk._constants import PF_FLOW_ENTRY_IN_TMP, PF_FLOW_META_LOAD_IN_SUBPROCESS from promptflow._utils.user_agent_utils import ClientUserAgentUtil +from promptflow.evals._constants import PF_BATCH_TIMEOUT_SEC, PF_BATCH_TIMEOUT_SEC_DEFAULT from promptflow.tracing._integrations._openai_injector import 
inject_openai_api, recover_openai_api from ..._user_agent import USER_AGENT @@ -16,6 +17,7 @@ class BatchRunContext: def __init__(self, client): self.client = client + self._is_timeout_set_by_system = False def __enter__(self): if isinstance(self.client, CodeClient): @@ -26,6 +28,10 @@ def __enter__(self): os.environ[PF_FLOW_ENTRY_IN_TMP] = "true" os.environ[PF_FLOW_META_LOAD_IN_SUBPROCESS] = "false" + if os.environ.get(PF_BATCH_TIMEOUT_SEC) is None: + os.environ[PF_BATCH_TIMEOUT_SEC] = str(PF_BATCH_TIMEOUT_SEC_DEFAULT) + self._is_timeout_set_by_system = True + # For addressing the issue of asyncio event loop closed on Windows set_event_loop_policy() @@ -36,3 +42,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(self.client, ProxyClient): os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None) os.environ.pop(PF_FLOW_META_LOAD_IN_SUBPROCESS, None) + + if self._is_timeout_set_by_system: + os.environ.pop(PF_BATCH_TIMEOUT_SEC, None) + self._is_timeout_set_by_system = False diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/code_client.py b/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/code_client.py index 10336f80ecb..e791d35bdc8 100644 --- a/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/code_client.py +++ b/src/promptflow-evals/promptflow/evals/evaluate/_batch_run_client/code_client.py @@ -8,10 +8,10 @@ import pandas as pd from promptflow.contracts.types import AttrDict -from promptflow.evals.evaluate._utils import _apply_column_mapping, _has_aggregator, load_jsonl +from promptflow.evals.evaluate._utils import _apply_column_mapping, _has_aggregator, get_int_env_var, load_jsonl from promptflow.tracing import ThreadPoolExecutorWithContext as ThreadPoolExecutor -from ..._constants import BATCH_RUN_TIMEOUT +from ..._constants import PF_BATCH_TIMEOUT_SEC, PF_BATCH_TIMEOUT_SEC_DEFAULT LOGGER = logging.getLogger(__name__) @@ -24,15 +24,17 @@ def __init__(self, run, input_data, evaluator_name=None, 
def get_details(self, proxy_run, all_results=False):
    """Return the details table of a proxy run.

    Waits for the future stored in ``proxy_run.run`` (no timeout is passed at
    this layer), fetches the details through the promptflow client, and
    replaces every ``"(Failed)"`` cell with NaN in place.

    :param proxy_run: Object whose ``run`` attribute is a future resolving to the run.
    :param all_results: Forwarded unchanged to the client's ``get_details``.
    :return: The object returned by the client's ``get_details``, modified in place.
    """
    finished_run = proxy_run.run.result()
    details = self._pf_client.get_details(finished_run, all_results=all_results)
    # NOTE(review): "(Failed)" is presumably the marker used for failed lines - confirm.
    details.replace("(Failed)", np.nan, inplace=True)
    return details


def get_metrics(self, proxy_run):
    """Return the metrics of a proxy run.

    Waits for the future stored in ``proxy_run.run`` (no timeout is passed at
    this layer) and forwards the resolved run to the client's ``get_metrics``.

    :param proxy_run: Object whose ``run`` attribute is a future resolving to the run.
    :return: Whatever the client's ``get_metrics`` returns for that run.
    """
    return self._pf_client.get_metrics(proxy_run.run.result())
def get_int_env_var(env_var_name, default_value=None):
    """Read an environment variable and convert it to an integer.

    When the variable is not set, ``default_value`` is the value that gets
    converted. If no value is available, or the value cannot be converted to
    an integer, ``default_value`` is returned unchanged, so the result is
    only guaranteed to be an ``int`` when the default itself is one.

    :param env_var_name: Name of the environment variable to read.
    :param default_value: Fallback used when the variable is unset or does not
        hold a valid integer.
    :return: The parsed integer, or ``default_value`` when conversion is not possible.
    :raises TypeError: If ``env_var_name`` is not a string (raised by ``os.environ``).
    """
    raw_value = os.environ.get(env_var_name, default_value)
    if raw_value is None:
        # Variable unset and no default supplied: there is nothing to convert.
        return default_value
    try:
        return int(raw_value)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "use the default"; any other error is
        # a genuine problem and is no longer swallowed.
        import logging

        logging.getLogger(__name__).warning(
            "Could not interpret the value for %s as an integer; falling back to %r.",
            env_var_name,
            default_value,
        )
        return default_value
def test_batch_timeout_default(self, monkeypatch):
    """The default timeout is exported only while the context is active."""
    # Establish the precondition instead of assuming it: a value left behind
    # by the shell or by another test must not make this test fail.
    monkeypatch.delenv(PF_BATCH_TIMEOUT_SEC, raising=False)

    with BatchRunContext(ProxyClient(PFClient)):
        during_timeout = int(os.environ[PF_BATCH_TIMEOUT_SEC])
        assert during_timeout == PF_BATCH_TIMEOUT_SEC_DEFAULT

    # Default timeout should be reset after exiting BatchRunContext
    assert os.environ.get(PF_BATCH_TIMEOUT_SEC) is None


def test_batch_timeout_custom(self, monkeypatch):
    """A user-provided timeout is left untouched by the context."""
    custom_timeout = 1000
    # monkeypatch restores the previous environment at teardown, so the custom
    # value cannot leak into tests that run afterwards.
    monkeypatch.setenv(PF_BATCH_TIMEOUT_SEC, str(custom_timeout))

    with BatchRunContext(ProxyClient(PFClient)):
        during_timeout = int(os.environ[PF_BATCH_TIMEOUT_SEC])
        assert during_timeout == custom_timeout

    # Custom timeouts should not be reset after exiting BatchRunContext
    after_timeout = int(os.environ[PF_BATCH_TIMEOUT_SEC])
    assert after_timeout == custom_timeout