[fundamental] Recording support metrics #1762

Merged
merged 12 commits · Jan 30, 2024
Changes from 11 commits
1 change: 1 addition & 0 deletions .github/workflows/promptflow-sdk-cli-test.yml
@@ -154,6 +154,7 @@ jobs:
path: |
${{ env.testWorkingDirectory }}/*.xml
${{ env.testWorkingDirectory }}/htmlcov/
${{ env.testWorkingDirectory }}/tests/sdk_cli_test/count.json


publish-test-results-sdk-cli-test:
10 changes: 0 additions & 10 deletions src/promptflow/tests/conftest.py
@@ -25,7 +25,6 @@
from promptflow._cli._utils import AzureMLWorkspaceTriad
from promptflow._constants import PROMPTFLOW_CONNECTIONS
from promptflow._core.connection_manager import ConnectionManager
from promptflow._core.openai_injector import inject_openai_api
from promptflow._utils.context_utils import _change_working_dir
from promptflow.connections import AzureOpenAIConnection

@@ -52,15 +51,6 @@ def mock_build_info():
yield m


@pytest.fixture(autouse=True, scope="session")
def inject_api():
"""Inject OpenAI API during test session.

AOAI call in promptflow should involve trace logging and header injection. Inject
function to API call in test scenario."""
inject_openai_api()


@pytest.fixture
def dev_connections() -> dict:
with open(CONNECTION_FILE, "r") as f:
12 changes: 12 additions & 0 deletions src/promptflow/tests/executor/conftest.py
@@ -0,0 +1,12 @@
import pytest

from promptflow._core.openai_injector import inject_openai_api


@pytest.fixture(autouse=True, scope="session")
def inject_api_executor():
"""Inject OpenAI API during test session.

AOAI call in promptflow should involve trace logging and header injection. Inject
function to API call in test scenario."""
inject_openai_api()
52 changes: 39 additions & 13 deletions src/promptflow/tests/sdk_cli_test/conftest.py
@@ -11,6 +11,7 @@
from sqlalchemy import create_engine

from promptflow import PFClient
from promptflow._core.openai_injector import inject_openai_api
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._constants import EXPERIMENT_CREATED_ON_INDEX_NAME, EXPERIMENT_TABLE_NAME, LOCAL_MGMT_DB_PATH
from promptflow._sdk._serving.app import create_app as create_serving_app
@@ -19,7 +20,17 @@
from promptflow.executor._line_execution_process_pool import _process_wrapper
from promptflow.executor._process_manager import create_spawned_fork_process_manager

from .recording_utilities import RecordStorage, mock_tool, recording_array_extend, recording_array_reset
from .recording_utilities import (
RecordStorage,
delete_count_lock_file,
inject_async_with_recording,
inject_sync_with_recording,
is_live,
is_record,
is_replay,
mock_tool,
recording_array_reset,
)

PROMOTFLOW_ROOT = Path(__file__) / "../../.."
RUNTIME_TEST_CONFIGS_ROOT = Path(PROMOTFLOW_ROOT / "tests/test_configs/runtime")
@@ -192,13 +203,10 @@ def serving_client_with_environment_variables(mocker: MockerFixture):
)


@pytest.fixture
def recording_file_override(request: pytest.FixtureRequest, mocker: MockerFixture):
if RecordStorage.is_replaying_mode() or RecordStorage.is_recording_mode():
file_path = RECORDINGS_TEST_CONFIGS_ROOT / "node_cache.shelve"
RecordStorage.get_instance(file_path)
yield

# ==================== Recording injection ====================
# To inject patches in subprocesses, add new mock methods in setup_recording_injection_if_enabled.
# In fork mode, this is enabled automatically.
# In spawn mode, we need to declare recording in each process separately.

SpawnProcess = multiprocessing.get_context("spawn").Process

@@ -213,7 +221,7 @@ def __init__(self, group=None, target=None, *args, **kwargs):


@pytest.fixture
def recording_injection(mocker: MockerFixture, recording_file_override):
def recording_injection(mocker: MockerFixture):
original_process_class = multiprocessing.get_context("spawn").Process
multiprocessing.get_context("spawn").Process = MockSpawnProcess
if "spawn" == multiprocessing.get_start_method():
@@ -222,10 +230,10 @@ def recording_injection(mocker: MockerFixture, recording_file_override):
patches = setup_recording_injection_if_enabled()

try:
yield (RecordStorage.is_replaying_mode() or RecordStorage.is_recording_mode(), recording_array_extend)
yield
finally:
if RecordStorage.is_replaying_mode() or RecordStorage.is_recording_mode():
RecordStorage.get_instance().delete_lock_file()
RecordStorage.get_instance().delete_lock_file()
delete_count_lock_file()
recording_array_reset()

multiprocessing.get_context("spawn").Process = original_process_class
@@ -238,7 +246,7 @@ def recording_injection(mocker: MockerFixture, recording_file_override):

def setup_recording_injection_if_enabled():
patches = []
if RecordStorage.is_replaying_mode() or RecordStorage.is_recording_mode():
if is_replay() or is_record():
file_path = RECORDINGS_TEST_CONFIGS_ROOT / "node_cache.shelve"
RecordStorage.get_instance(file_path)

@@ -251,6 +259,24 @@ def setup_recording_injection_if_enabled():
patcher = patch(target, mocked_tool)
patches.append(patcher)
patcher.start()

patcher = patch("promptflow._core.openai_injector.inject_sync", inject_sync_with_recording)
patches.append(patcher)
patcher.start()

patcher = patch("promptflow._core.openai_injector.inject_async", inject_async_with_recording)
patches.append(patcher)
patcher.start()
if is_live():
patcher = patch("promptflow._core.openai_injector.inject_sync", inject_sync_with_recording)
patches.append(patcher)
patcher.start()

patcher = patch("promptflow._core.openai_injector.inject_async", inject_async_with_recording)
patches.append(patcher)
patcher.start()

inject_openai_api()
return patches


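
As a side note on usage: recording_injection above is an ordinary (non-autouse) fixture, so a test module has to request it explicitly. A minimal sketch of how that could look; the test class, flow path, and assertion are hypothetical and not taken from this PR, only the fixture names (recording_injection, pf) come from the test suite:

import pytest


@pytest.mark.usefixtures("recording_injection")
class TestFlowRunWithRecording:
    def test_basic_flow(self, pf):
        # While the fixture is active, patched tools route LLM calls through
        # RecordStorage (replay/record) or the live-call counter, and any process
        # created via the spawn context uses MockSpawnProcess so the same patches
        # are re-applied inside the child process.
        run = pf.run(flow="flows/web_classification", data="data/webClassification1.jsonl")
        assert run.status == "Completed"
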
3 changes: 0 additions & 3 deletions src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
@@ -37,8 +37,6 @@
from promptflow.connections import AzureOpenAIConnection
from promptflow.exceptions import UserErrorException

from ..recording_utilities import RecordStorage

PROMOTFLOW_ROOT = Path(__file__) / "../../../.."

TEST_ROOT = Path(__file__).parent.parent.parent
@@ -901,7 +899,6 @@ def test_error_message_dump(self, pf):
assert "error" in run_dict
assert run_dict["error"] == exception

@pytest.mark.skipif(RecordStorage.is_replaying_mode(), reason="System metrics not supported in replaying mode")
def test_system_metrics_in_properties(self, pf) -> None:
run = create_run_against_multi_line_data(pf)
assert FlowRunProperties.SYSTEM_METRICS in run.properties
src/promptflow/tests/sdk_cli_test/recording_utilities/__init__.py
@@ -1,14 +1,27 @@
from .constants import ENVIRON_TEST_MODE, RecordMode
from .mock_tool import mock_tool, recording_array_extend, recording_array_reset
from .record_storage import RecordFileMissingException, RecordItemMissingException, RecordStorage
from .mock_tool import delete_count_lock_file, mock_tool, recording_array_extend, recording_array_reset
from .openai_inject_recording import inject_async_with_recording, inject_sync_with_recording
from .record_storage import (
Counter,
RecordFileMissingException,
RecordItemMissingException,
RecordStorage,
is_live,
is_record,
is_replay,
)

__all__ = [
"Counter",
"RecordStorage",
"RecordMode",
"ENVIRON_TEST_MODE",
"RecordFileMissingException",
"RecordItemMissingException",
"mock_tool",
"recording_array_extend",
"recording_array_reset",
"inject_async_with_recording",
"inject_sync_with_recording",
"is_live",
"is_record",
"is_replay",
"delete_count_lock_file",
]
src/promptflow/tests/sdk_cli_test/recording_utilities/mock_tool.py
@@ -1,10 +1,22 @@
import functools
import inspect
import os
from pathlib import Path

from promptflow._core.tool import STREAMING_OPTION_PARAMETER_ATTR, ToolType
from promptflow._core.tracer import TraceType, _create_trace_from_function_call

from .record_storage import RecordFileMissingException, RecordItemMissingException, RecordStorage
from .record_storage import (
Counter,
RecordFileMissingException,
RecordItemMissingException,
RecordStorage,
is_live,
is_record,
is_replay,
)

COUNT_RECORD = (Path(__file__) / "../../count.json").resolve()

# recording array is a global variable to store the function names that need to be recorded
recording_array = ["fetch_text_content_from_url", "my_python_tool"]
@@ -46,52 +58,53 @@ def _replace_tool_rule(func):
func_wo_partial = func.func
else:
func_wo_partial = func
if func_wo_partial.__qualname__.startswith("AzureOpenAI"):
return True
elif func_wo_partial.__qualname__.startswith("OpenAI"):
return True
elif func_wo_partial.__module__ == "promptflow.tools.aoai":
return True
elif func_wo_partial.__module__ == "promptflow.tools.openai_gpt4v":
return True
elif func_wo_partial.__module__ == "promptflow.tools.openai":
return True
elif func_wo_partial.__qualname__ in recording_array:

if func_wo_partial.__qualname__ in recording_array:
return True
else:
return False


def call_func(func, args, kwargs):
input_dict = _prepare_input_dict(func, args, kwargs)
if RecordStorage.is_replaying_mode():
if is_replay():
return RecordStorage.get_instance().get_record(input_dict)
# Record mode will record item to record file
elif RecordStorage.is_recording_mode():
elif is_record():
try:
# prevent recording the same item twice
obj = RecordStorage.get_instance().get_record(input_dict)
except (RecordItemMissingException, RecordFileMissingException):
# recording the item
obj = RecordStorage.get_instance().set_record(input_dict, func(*args, **kwargs))
elif is_live():
obj = Counter.get_instance().set_file_record_count(COUNT_RECORD, func(*args, **kwargs))
return obj


async def call_func_async(func, args, kwargs):
input_dict = _prepare_input_dict(func, args, kwargs)
if RecordStorage.is_replaying_mode():
if is_replay():
return RecordStorage.get_instance().get_record(input_dict)
# Record mode will record item to record file
elif RecordStorage.is_recording_mode():
elif is_record():
try:
# prevent recording the same item twice
obj = RecordStorage.get_instance().get_record(input_dict)
except (RecordItemMissingException, RecordFileMissingException):
# recording the item
obj = RecordStorage.get_instance().set_record(input_dict, await func(*args, **kwargs))
elif is_live():
obj = Counter.get_instance().set_file_record_count(COUNT_RECORD, await func(*args, **kwargs))
return obj


def delete_count_lock_file():
lock_file = str(COUNT_RECORD) + ".lock"
if os.path.isfile(lock_file):
os.remove(lock_file)


def mock_tool(original_tool):
"""
Basically this is the original tool decorator.
@@ -175,7 +188,9 @@ def decorated_tool(*args, **kwargs):

# tool replacements.
if func is not None:
if not _replace_tool_rule(func):
if _replace_tool_rule(func):
return tool_decorator(func)
else:
return original_tool(
func,
*args_mock,
@@ -185,7 +200,6 @@ def decorated_tool(*args, **kwargs):
input_settings=input_settings,
**kwargs_mock,
)
return tool_decorator(func)
return original_tool( # no recording for @tool(name="func_name")
func,
*args_mock,
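
The is_live branch above is what produces the count.json artifact now uploaded by the workflow change at the top of this diff. The Counter class itself lives in record_storage.py, which this diff view does not show, so the following is only a rough sketch of the behavior implied by set_file_record_count: pass the real response through unchanged and persist a per-call counter next to the recordings. The keying scheme and JSON layout are assumptions, and the real Counter is also what the count.json.lock file removed by delete_count_lock_file belongs to.

import json
from pathlib import Path


class CounterSketch:
    """Illustrative stand-in for recording_utilities.record_storage.Counter."""

    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def set_file_record_count(self, count_file: Path, obj):
        counts = {}
        if count_file.is_file():
            counts = json.loads(count_file.read_text())
        key = type(obj).__name__  # assumption: the real Counter may key records differently
        counts[key] = counts.get(key, 0) + 1
        count_file.write_text(json.dumps(counts, indent=2))
        return obj  # live mode still hands the real response back to the caller
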
src/promptflow/tests/sdk_cli_test/recording_utilities/openai_inject_recording.py
@@ -0,0 +1,38 @@
import asyncio
import functools

from promptflow._core.openai_injector import inject_function_async, inject_function_sync, inject_operation_headers

from .mock_tool import call_func, call_func_async


def inject_recording(f):
if asyncio.iscoroutinefunction(f):

@functools.wraps(f)
async def wrapper(*args, **kwargs):
return await call_func_async(f, args, kwargs)

else:

@functools.wraps(f)
def wrapper(*args, **kwargs):
return call_func(f, args, kwargs)

return wrapper


def inject_async_with_recording(f):
wrapper_fun = inject_operation_headers(
(inject_function_async(args_to_ignore=["api_key", "headers", "extra_headers"])(inject_recording(f)))
)
wrapper_fun._original = f
return wrapper_fun


def inject_sync_with_recording(f):
wrapper_fun = inject_operation_headers(
(inject_function_sync(args_to_ignore=["api_key", "headers", "extra_headers"])(inject_recording(f)))
)
wrapper_fun._original = f
return wrapper_fun
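
To make the wiring concrete: once setup_recording_injection_if_enabled patches inject_sync/inject_async with these variants and then calls inject_openai_api(), every injected OpenAI entry point passes through inject_recording. A rough illustration of what that wrapper does to a plain callable; fake_chat_completion is a stand-in, not a promptflow or OpenAI API, and the import path assumes the recording_utilities package from this PR is importable (e.g. when running from src/promptflow/tests/sdk_cli_test):

from recording_utilities.openai_inject_recording import inject_recording


def fake_chat_completion(prompt: str) -> str:  # stand-in for an OpenAI SDK method
    return f"echo: {prompt}"


wrapped = inject_recording(fake_chat_completion)

# Replay mode: returns the payload stored in node_cache.shelve for these inputs.
# Record mode: calls through once and persists the result for later replays.
# Live mode:   calls through and bumps the per-call count written to count.json.
print(wrapped("hello"))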