diff --git a/.github/workflows/promptflow-tracing-e2e-test.yml b/.github/workflows/promptflow-tracing-e2e-test.yml
new file mode 100644
index 000000000000..8b48064dafa7
--- /dev/null
+++ b/.github/workflows/promptflow-tracing-e2e-test.yml
@@ -0,0 +1,138 @@
+name: promptflow-tracing-e2e-test
+
+on:
+  schedule:
+    - cron: "40 17 * * *"  # Every day starting at 1:40 BJT
+
+  pull_request:
+    paths:
+      - src/promptflow/**
+      - scripts/building/**
+      - .github/workflows/promptflow-tracing-e2e-test.yml
+
+  workflow_dispatch:
+
+
+env:
+  packageSetupType: promptflow_with_extra
+  testWorkingDirectory: ${{ github.workspace }}/src/promptflow
+  PYTHONPATH: ${{ github.workspace }}/src/promptflow
+  IS_IN_CI_PIPELINE: "true"
+
+
+jobs:
+  build:
+    strategy:
+      fail-fast: false
+    runs-on: ubuntu-latest
+    steps:
+      - name: checkout
+        uses: actions/checkout@v4
+      - name: Display and Set Environment Variables
+        run: |
+          env | sort >> $GITHUB_OUTPUT
+        id: display_env
+        shell: bash -el {0}
+      - name: Python Setup - ubuntu-latest - Python Version 3.9
+        uses: "./.github/actions/step_create_python_environment"
+        with:
+          pythonVersion: 3.9
+      - name: Build wheel
+        uses: "./.github/actions/step_sdk_setup"
+        with:
+          setupType: promptflow_with_extra
+          scriptPath: ${{ env.testWorkingDirectory }}
+      - name: Upload Wheel
+        if: always()
+        uses: actions/upload-artifact@v3
+        with:
+          name: wheel
+          path: |
+            ${{ github.workspace }}/src/promptflow/dist/*.whl
+            ${{ github.workspace }}/src/promptflow-tools/dist/*.whl
+
+  tracing_tests:
+    needs: build
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ubuntu-latest]
+        pythonVersion: ['3.8', '3.9', '3.10', '3.11']
+    runs-on: ${{ matrix.os }}
+    steps:
+      - name: checkout
+        uses: actions/checkout@v4
+
+      - name: Display and Set Environment Variables
+        run: |
+          env | sort >> $GITHUB_OUTPUT
+        id: display_env
+        shell: bash -el {0}
+
+      - name: Python Setup - ${{ matrix.os }} - Python Version ${{ matrix.pythonVersion }}
+        uses: "./.github/actions/step_create_python_environment"
+        with:
+          pythonVersion: ${{ matrix.pythonVersion }}
+
+      - name: Download Artifacts
+        uses: actions/download-artifact@v3
+        with:
+          name: wheel
+          path: artifacts
+
+      - name: Install wheel
+        shell: pwsh
+        working-directory: artifacts
+        run: |
+          Set-PSDebug -Trace 1
+          pip install -r ${{ github.workspace }}/src/promptflow/dev_requirements.txt
+          gci ./promptflow -Recurse | % {if ($_.Name.Contains('.whl')) {python -m pip install "$($_.FullName)"}}
+          gci ./promptflow-tools -Recurse | % {if ($_.Name.Contains('.whl')) {python -m pip install $_.FullName}}
+          pip freeze
+
+      - name: run promptflow-tracing test
+        shell: pwsh
+        working-directory: ${{ env.testWorkingDirectory }}
+        run: |
+          python "../../scripts/building/run_coverage_tests.py" `
+            -p promptflow `
+            -t ${{ github.workspace }}/src/promptflow/tests/tracing_test/e2etests `
+            -l eastus `
+            -m "e2etest" `
+            -n ${{ steps.cpu-cores.outputs.count }} `
+            --coverage-config ${{ github.workspace }}/src/promptflow/tests/tracing_test/.coveragerc `
+            -o "${{ env.testWorkingDirectory }}/test-results-tracing.xml"
+
+      - name: Upload Test Results
+        if: always()
+        uses: actions/upload-artifact@v3
+        with:
+          name: Test Results (Python ${{ matrix.pythonVersion }}) (OS ${{ matrix.os }})
+          path: |
+            ${{ env.testWorkingDirectory }}/*.xml
+            ${{ env.testWorkingDirectory }}/htmlcov/
+
+
+  publish-test-results-tracing-test:
+    needs: tracing_tests
+    if: always()
+
+    runs-on: ubuntu-latest
+    permissions:
+      checks: write
+      pull-requests: write
+      contents: read
+      issues: read
+
+    steps:
+      - name: checkout
+        uses: actions/checkout@v4
+      - name: Publish Test Results
+        uses: "./.github/actions/step_publish_test_results"
+        with:
+          testActionFileName: promptflow-tracing-e2e-test.yml
+          testResultTitle: promptflow-tracing e2e test result
+          osVersion: ubuntu-latest
+          pythonVersion: 3.9
+          coverageThreshold: 40
+          context: test/tracing
\ No newline at end of file
diff --git a/src/promptflow/tests/tracing_test/__init__.py b/src/promptflow/tests/tracing_test/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/src/promptflow/tests/tracing_test/e2etests/__init__.py b/src/promptflow/tests/tracing_test/e2etests/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/src/promptflow/tests/tracing_test/e2etests/simple_functions.py b/src/promptflow/tests/tracing_test/e2etests/simple_functions.py
new file mode 100644
index 000000000000..4a9355023616
--- /dev/null
+++ b/src/promptflow/tests/tracing_test/e2etests/simple_functions.py
@@ -0,0 +1,85 @@
+import asyncio
+from time import sleep
+from typing import Union
+
+from openai import AsyncAzureOpenAI, AzureOpenAI
+
+from promptflow.contracts.types import PromptTemplate
+from promptflow.tracing import trace
+
+
+@trace
+def is_valid_name(name):
+    sleep(0.5)
+    return len(name) > 0
+
+
+@trace
+def get_user_name(user_id):
+    sleep(0.5)
+    user_name = f"User {user_id}"
+    if not is_valid_name(user_name):
+        raise ValueError(f"Invalid user name: {user_name}")
+
+    return user_name
+
+
+@trace
+def format_greeting(user_name):
+    sleep(0.5)
+    return f"Hello, {user_name}!"
+
+
+@trace
+def greetings(user_id):
+    user_name = get_user_name(user_id)
+    greeting = format_greeting(user_name)
+    return greeting
+
+
+@trace
+async def dummy_llm(prompt: str, model: str):
+    await asyncio.sleep(0.5)
+    return "dummy_output"
+
+
+@trace
+async def dummy_llm_tasks_async(prompt: str, models: list):
+    tasks = []
+    for model in models:
+        tasks.append(asyncio.create_task(dummy_llm(prompt, model)))
+    done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
+    return [task.result() for task in done]
+
+
+@trace
+def render_prompt_template(prompt: PromptTemplate, **kwargs):
+    for k, v in kwargs.items():
+        prompt = prompt.replace(f"{{{{{k}}}}}", str(v))
+    return prompt
+
+
+@trace
+def openai_chat(connection: dict, prompt: str, stream: bool = False):
+    client = AzureOpenAI(**connection)
+
+    messages = [
+        {"role": "system", "content": "You are a helpful assistant."},
+        {"role": "user", "content": prompt}
+    ]
+    response = client.chat.completions.create(model="gpt-35-turbo", messages=messages, stream=stream)
+    return response.choices[0].message.content or ""
+
+
+@trace
+def openai_completion(connection: dict, prompt: str):
+    client = AzureOpenAI(**connection)
+    response = client.completions.create(model="text-ada-001", prompt=prompt)
+    return response.choices[0].text or ""
+
+
+@trace
+async def openai_embedding_async(connection: dict, input: Union[str, list]):
+    client = AsyncAzureOpenAI(**connection)
+    resp = await client.embeddings.create(model="text-embedding-ada-002", input=input)
+    return resp.data[0].embedding
diff --git a/src/promptflow/tests/tracing_test/e2etests/test_trace.py b/src/promptflow/tests/tracing_test/e2etests/test_trace.py
new file mode 100644
index 000000000000..3fedb19ed4d6
--- /dev/null
+++ b/src/promptflow/tests/tracing_test/e2etests/test_trace.py
@@ -0,0 +1,275 @@
+import asyncio
+import json
+
+import pytest
+from opentelemetry.trace.status import StatusCode
+
+from promptflow._core.openai_injector import inject_openai_api
+from promptflow.tracing.contracts.trace import TraceType
+
+from ..utils import execute_function_in_subprocess, prepare_memory_exporter
+from .simple_functions import (
+    dummy_llm_tasks_async,
+    greetings,
+    openai_chat,
+    openai_completion,
+    openai_embedding_async,
+    render_prompt_template,
+)
+
+LLM_FUNCTION_NAMES = [
+    "openai.resources.chat.completions.Completions.create",
+    "openai.resources.completions.Completions.create",
+    "openai.resources.chat.completions.AsyncCompletions.create",
+    "openai.resources.completions.AsyncCompletions.create",
+]
+
+EMBEDDING_FUNCTION_NAMES = [
+    "openai.resources.embeddings.Embeddings.create",
+    "openai.resources.embeddings.AsyncEmbeddings.create",
+]
+
+LLM_TOKEN_NAMES = [
+    "llm.token_count.prompt",
+    "llm.token_count.completion",
+    "llm.token_count.total",
+]
+
+EMBEDDING_TOKEN_NAMES = [
+    "embedding.token_count.prompt",
+    "embedding.token_count.total",
+]
+
+CUMULATIVE_LLM_TOKEN_NAMES = [
+    "__computed__.cumulative_token_count.prompt",
+    "__computed__.cumulative_token_count.completion",
+    "__computed__.cumulative_token_count.total",
+]
+
+CUMULATIVE_EMBEDDING_TOKEN_NAMES = [
+    "__computed__.cumulative_token_count.prompt",
+    "__computed__.cumulative_token_count.total",
+]
+
+
+@pytest.mark.usefixtures("dev_connections")
+@pytest.mark.e2etest
+class TestTracing:
+    @pytest.mark.parametrize(
+        "func, inputs, expected_span_length",
+        [
+            (greetings, {"user_id": 1}, 4),
+            (dummy_llm_tasks_async, {"prompt": "Hello", "models": ["model_1", "model_1"]}, 3),
+        ],
+    )
+    def test_otel_trace(self, func, inputs, expected_span_length):
+        execute_function_in_subprocess(self.assert_otel_trace, func, inputs, expected_span_length)
+
+    def assert_otel_trace(self, func, inputs, expected_span_length):
+        exporter = prepare_memory_exporter()
+
+        result = self.run_func(func, inputs)
+        assert isinstance(result, (str, list))
+        span_list = exporter.get_finished_spans()
+        self.validate_span_list(span_list, expected_span_length)
+
+    @pytest.mark.parametrize(
+        "func, inputs",
+        [
+            (render_prompt_template, {"prompt": "Hello {{name}}!", "name": "world"}),
+        ]
+    )
+    def test_otel_trace_with_prompt(self, func, inputs):
+        execute_function_in_subprocess(self.assert_otel_traces_with_prompt, func, inputs)
+
+    def assert_otel_traces_with_prompt(self, func, inputs):
+        memory_exporter = prepare_memory_exporter()
+
+        result = self.run_func(func, inputs)
+        assert result == "Hello world!"
+
+        span_list = memory_exporter.get_finished_spans()
+        for span in span_list:
+            assert span.status.status_code == StatusCode.OK
+            assert isinstance(span.name, str)
+            if span.attributes.get("function", "") == "render_prompt_template":
+                assert "prompt.template" in span.attributes
+                assert span.attributes["prompt.template"] == inputs["prompt"]
+                assert "prompt.variables" in span.attributes
+                for var in inputs:
+                    if var == "prompt":
+                        continue
+                    assert var in span.attributes["prompt.variables"]
+
+    @pytest.mark.parametrize(
+        "func, inputs, expected_span_length",
+        [
+            (openai_chat, {"prompt": "Hello"}, 2),
+            (openai_completion, {"prompt": "Hello"}, 2),
+        ],
+    )
+    def test_otel_trace_with_llm(self, dev_connections, func, inputs, expected_span_length):
+        execute_function_in_subprocess(self.assert_otel_trace_with_llm, dev_connections, func, inputs, expected_span_length)
+
+    def assert_otel_trace_with_llm(self, dev_connections, func, inputs, expected_span_length):
+        inject_openai_api()
+        exporter = prepare_memory_exporter()
+
+        inputs = self.add_azure_connection_to_inputs(inputs, dev_connections)
+        result = self.run_func(func, inputs)
+        assert isinstance(result, str)
+        span_list = exporter.get_finished_spans()
+        self.validate_span_list(span_list, expected_span_length)
+        self.validate_openai_tokens(span_list)
+
+    @pytest.mark.parametrize(
+        "func, inputs, expected_span_length",
+        [
+            (openai_embedding_async, {"input": "Hello"}, 2),
+            # [9906] is the tokenized version of "Hello"
+            (openai_embedding_async, {"input": [9906]}, 2),
+        ]
+    )
+    def test_otel_trace_with_embedding(
+        self,
+        dev_connections,
+        func,
+        inputs,
+        expected_span_length,
+    ):
+        execute_function_in_subprocess(
+            self.assert_otel_traces_with_embedding, dev_connections, func, inputs, expected_span_length
+        )
+
+    def assert_otel_traces_with_embedding(self, dev_connections, func, inputs, expected_span_length):
+        inject_openai_api()
+        memory_exporter = prepare_memory_exporter()
+
+        inputs = self.add_azure_connection_to_inputs(inputs, dev_connections)
+        result = self.run_func(func, inputs)
+        assert isinstance(result, list)
+
+        span_list = memory_exporter.get_finished_spans()
+        self.validate_span_list(span_list, expected_span_length)
+        self.validate_openai_tokens(span_list)
+        for span in span_list:
+            if span.attributes.get("function", "") in EMBEDDING_FUNCTION_NAMES:
+                assert span.attributes.get("embedding.model", "") == "ada"
+                embeddings = span.attributes.get("embedding.embeddings", "")
+                assert "embedding.vector" in embeddings
+                assert "embedding.text" in embeddings
+                if isinstance(inputs["input"], list):
+                    # If the input is a token array (a list of ints), the attribute should contain
+                    # a summary with the length of the token array ("... dimensional token").
+                    assert "dimensional token" in embeddings
+                else:
+                    # If the input is a string, the attribute should contain the original input string.
+                    assert inputs["input"] in embeddings
+
+    def test_otel_trace_with_multiple_functions(self):
+        execute_function_in_subprocess(self.assert_otel_traces_with_multiple_functions)
+
+    def assert_otel_traces_with_multiple_functions(self):
+        memory_exporter = prepare_memory_exporter()
+
+        result = self.run_func(greetings, {"user_id": 1})
+        assert isinstance(result, str)
+        result = self.run_func(dummy_llm_tasks_async, {"prompt": "Hello", "models": ["model_1", "model_1"]})
+        assert isinstance(result, list)
+
+        span_list = memory_exporter.get_finished_spans()
+        assert len(span_list) == 7, f"Got {len(span_list)} spans."  # 4 + 3 spans in total
+        root_spans = [span for span in span_list if span.parent is None]
+        assert len(root_spans) == 2, f"Expected 2 root spans, got {len(root_spans)}"
+        assert root_spans[0].attributes["function"] == "greetings"
+        assert root_spans[1].attributes["function"] == "dummy_llm_tasks_async"
+        assert root_spans[1] == span_list[-1]  # It should be the last span
+        sub_level_span = span_list[-2]  # It should be the second last span
+        expected_values = {
+            "framework": "promptflow",
+            "span_type": "Function",
+        }
+        for span in span_list:
+            for k, v in expected_values.items():
+                assert span.attributes[k] == v, f"span.attributes[{k}] = {span.attributes[k]}, expected: {v}"
+        assert (
+            sub_level_span.parent.span_id == root_spans[1].context.span_id
+        )  # sub_level_span is a child of the second root span
+
+    def run_func(self, func, inputs):
+        if asyncio.iscoroutinefunction(func):
+            return asyncio.run(func(**inputs))
+        else:
+            return func(**inputs)
+
+    def add_azure_connection_to_inputs(self, inputs, dev_connections):
+        conn_name = "azure_open_ai_connection"
+        if conn_name not in dev_connections:
+            raise ValueError(f"Connection '{conn_name}' not found in dev connections.")
+        conn_dict = {
+            "api_key": dev_connections[conn_name]["value"]["api_key"],
+            "azure_endpoint": dev_connections[conn_name]["value"]["api_base"],
+            "api_version": dev_connections[conn_name]["value"]["api_version"],
+        }
+        inputs["connection"] = conn_dict
+        return inputs
+
+    def validate_span_list(self, span_list, expected_span_length):
+        assert len(span_list) == expected_span_length, f"Got {len(span_list)} spans."
+        root_spans = [span for span in span_list if span.parent is None]
+        assert len(root_spans) == 1
+        root_span = root_spans[0]
+        for span in span_list:
+            assert span.status.status_code == StatusCode.OK
+            assert isinstance(span.name, str)
+            assert span.attributes["framework"] == "promptflow"
+            if span.attributes.get("function", "") in LLM_FUNCTION_NAMES:
+                expected_span_type = TraceType.LLM
+            elif span.attributes.get("function", "") in EMBEDDING_FUNCTION_NAMES:
+                expected_span_type = TraceType.EMBEDDING
+            else:
+                expected_span_type = TraceType.FUNCTION
+            msg = f"span_type: {span.attributes['span_type']}, expected: {expected_span_type}"
+            assert span.attributes["span_type"] == expected_span_type, msg
+            if span != root_span:  # Non-root spans should record the traced function name
+                assert span.attributes["function"]
+            inputs = json.loads(span.attributes["inputs"])
+            output = json.loads(span.attributes["output"])
+            assert isinstance(inputs, dict)
+            assert output is not None
+
+    def validate_openai_tokens(self, span_list):
+        span_dict = {span.context.span_id: span for span in span_list}
+        expected_tokens = {}
+        for span in span_list:
+            tokens = None
+            # Validate the OpenAI tokens are correctly set on the LLM trace.
+            if span.attributes.get("function", "") in LLM_FUNCTION_NAMES:
+                for token_name in LLM_TOKEN_NAMES + CUMULATIVE_LLM_TOKEN_NAMES:
+                    assert token_name in span.attributes
+                tokens = {token_name: span.attributes[token_name] for token_name in CUMULATIVE_LLM_TOKEN_NAMES}
+            # Validate the OpenAI tokens are correctly set on the embedding trace.
+            if span.attributes.get("function", "") in EMBEDDING_FUNCTION_NAMES:
+                for token_name in EMBEDDING_TOKEN_NAMES + CUMULATIVE_EMBEDDING_TOKEN_NAMES:
+                    assert token_name in span.attributes
+                tokens = {token_name: span.attributes[token_name] for token_name in CUMULATIVE_EMBEDDING_TOKEN_NAMES}
+            # Aggregate the tokens up the chain of parent spans.
+            if tokens is not None:
+                current_span_id = span.context.span_id
+                while True:
+                    if current_span_id in expected_tokens:
+                        expected_tokens[current_span_id] = {
+                            key: expected_tokens[current_span_id][key] + tokens[key] for key in tokens
+                        }
+                    else:
+                        expected_tokens[current_span_id] = tokens
+                    parent_cxt = getattr(span_dict[current_span_id], "parent", None)
+                    if parent_cxt is None:
+                        break
+                    current_span_id = parent_cxt.span_id
+        # Validate the aggregated tokens are correctly set in the parent span.
+        for span in span_list:
+            span_id = span.context.span_id
+            if span_id in expected_tokens:
+                for token_name in expected_tokens[span_id]:
+                    assert span.attributes[token_name] == expected_tokens[span_id][token_name]
diff --git a/src/promptflow/tests/tracing_test/utils.py b/src/promptflow/tests/tracing_test/utils.py
new file mode 100644
index 000000000000..7654bd1bb679
--- /dev/null
+++ b/src/promptflow/tests/tracing_test/utils.py
@@ -0,0 +1,41 @@
+import traceback
+
+from multiprocessing import Queue, get_context
+
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+from opentelemetry.trace import set_tracer_provider
+
+
+def execute_function_in_subprocess(func, *args, **kwargs):
+    """
+    Execute a function in a freshly spawned process and re-raise any exception from it in the parent.
+    Running each case in its own process keeps the global tracer provider and OpenAI injection state isolated.
+    """
+    ctx = get_context("spawn")
+    error_queue = ctx.Queue()
+    process = ctx.Process(target=_run_in_subprocess, args=(error_queue, func, args, kwargs))
+    process.start()
+    process.join()  # Wait for the process to finish
+
+    if not error_queue.empty():
+        err, stacktrace_str = error_queue.get()
+        raise Exception(f"An error occurred in the subprocess: {err}\nStacktrace:\n{stacktrace_str}")
+    assert process.exitcode == 0, f"Subprocess exited with code {process.exitcode}"
+
+
+def _run_in_subprocess(error_queue: Queue, func, args, kwargs):
+    try:
+        func(*args, **kwargs)
+    except BaseException as e:
+        error_queue.put((repr(e), traceback.format_exc()))
+
+
+def prepare_memory_exporter():
+    provider = TracerProvider()
+    exporter = InMemorySpanExporter()
+    processor = SimpleSpanProcessor(exporter)
+    provider.add_span_processor(processor)
+    set_tracer_provider(provider)
+    return exporter
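
For context, a minimal sketch of how the pieces added above fit together (not part of the diff, and only an assumption about intended usage): prepare_memory_exporter() installs an in-memory span exporter on a fresh tracer provider, a @trace-decorated function emits OpenTelemetry spans when called, and get_finished_spans() returns them for inspection, which is the pattern the e2e tests rely on. The add() helper below is hypothetical and exists only for illustration; it assumes the promptflow package from this repository and opentelemetry-sdk are installed.

    # Hypothetical standalone script; add() is illustrative and not part of this PR.
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
    from opentelemetry.trace import set_tracer_provider

    from promptflow.tracing import trace


    def prepare_memory_exporter():
        # Same setup as src/promptflow/tests/tracing_test/utils.py: route every span to memory.
        provider = TracerProvider()
        exporter = InMemorySpanExporter()
        provider.add_span_processor(SimpleSpanProcessor(exporter))
        set_tracer_provider(provider)
        return exporter


    @trace
    def add(a, b):
        return a + b


    if __name__ == "__main__":
        exporter = prepare_memory_exporter()
        add(1, 2)
        for span in exporter.get_finished_spans():
            # Each traced call is expected to surface as one span carrying promptflow attributes.
            print(span.name, dict(span.attributes))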