From f4215a4c51ceca5c9fb8a6f2946ecc4e18cf81fc Mon Sep 17 00:00:00 2001 From: Lina Tang Date: Mon, 11 Mar 2024 11:10:40 +0800 Subject: [PATCH] [Tracing] Add trace tests to tracing_test (#2252) # Description Add tracing tests to tracing_test. # All Promptflow Contribution checklist: - [x] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [x] Title of the pull request is clear and informative. - [x] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [x] Pull request includes test coverage for the included changes. --------- Co-authored-by: Lina Tang --- .../workflows/promptflow-tracing-e2e-test.yml | 152 ++++++++++ src/promptflow/tests/tracing_test/__init__.py | 0 .../tests/tracing_test/e2etests/__init__.py | 0 .../tracing_test/e2etests/simple_functions.py | 85 ++++++ .../tests/tracing_test/e2etests/test_trace.py | 277 ++++++++++++++++++ src/promptflow/tests/tracing_test/utils.py | 41 +++ 6 files changed, 555 insertions(+) create mode 100644 .github/workflows/promptflow-tracing-e2e-test.yml create mode 100644 src/promptflow/tests/tracing_test/__init__.py create mode 100644 src/promptflow/tests/tracing_test/e2etests/__init__.py create mode 100644 src/promptflow/tests/tracing_test/e2etests/simple_functions.py create mode 100644 src/promptflow/tests/tracing_test/e2etests/test_trace.py create mode 100644 src/promptflow/tests/tracing_test/utils.py diff --git a/.github/workflows/promptflow-tracing-e2e-test.yml b/.github/workflows/promptflow-tracing-e2e-test.yml new file mode 100644 index 00000000000..6c732273984 --- /dev/null +++ b/.github/workflows/promptflow-tracing-e2e-test.yml @@ -0,0 +1,152 @@ +name: promptflow-tracing-e2e-test + +on: + schedule: + - cron: "40 17 * * *" # Every day starting at 1:40 BJT + + pull_request: + paths: + - src/promptflow/** + - scripts/building/** + - .github/workflows/promptflow-tracing-e2e-test.yml + + workflow_dispatch: + + +env: + packageSetupType: promptflow_with_extra + testWorkingDirectory: ${{ github.workspace }}/src/promptflow + PYTHONPATH: ${{ github.workspace }}/src/promptflow + IS_IN_CI_PIPELINE: "true" + + +jobs: + build: + strategy: + fail-fast: false + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v4 + - name: Display and Set Environment Variables + run: | + env | sort >> $GITHUB_OUTPUT + id: display_env + shell: bash -el {0} + - name: Python Setup - ubuntu-latest - Python Version 3.9 + uses: "./.github/actions/step_create_python_environment" + with: + pythonVersion: 3.9 + - name: Build wheel + uses: "./.github/actions/step_sdk_setup" + with: + setupType: promptflow_with_extra + scriptPath: ${{ env.testWorkingDirectory }} + - name: Upload Wheel + if: always() + uses: actions/upload-artifact@v3 + with: + name: wheel + path: | + ${{ github.workspace }}/src/promptflow/dist/*.whl + ${{ github.workspace }}/src/promptflow-tools/dist/*.whl + + tracing_tests: + needs: build + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + pythonVersion: ['3.8', '3.9', '3.10', '3.11'] + runs-on: ${{ matrix.os }} + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: Display and Set Environment Variables + run: | + env | sort >> $GITHUB_OUTPUT + id: display_env + shell: bash -el {0} + + - name: Python Setup - ${{ matrix.os }} - Python Version ${{ matrix.pythonVersion }} + uses: "./.github/actions/step_create_python_environment" + with: + pythonVersion: ${{ matrix.pythonVersion }} + + - name: Download Artifacts + uses: actions/download-artifact@v3 + with: + name: wheel + path: artifacts + + - name: Install wheel + shell: pwsh + working-directory: artifacts + run: | + Set-PSDebug -Trace 1 + pip install -r ${{ github.workspace }}/src/promptflow/dev_requirements.txt + gci ./promptflow -Recurse | % {if ($_.Name.Contains('.whl')) {python -m pip install "$($_.FullName)"}} + gci ./promptflow-tools -Recurse | % {if ($_.Name.Contains('.whl')) {python -m pip install $_.FullName}} + pip freeze + + - name: Azure Login + uses: azure/login@v1 + with: + creds: ${{ secrets.AZURE_CREDENTIALS }} + + - name: Generate Configs + uses: "./.github/actions/step_generate_configs" + with: + targetFolder: ${{ env.testWorkingDirectory }} + + - name: Get number of CPU cores + uses: SimenB/github-actions-cpu-cores@v1 + id: cpu-cores + + - name: run promptflow-tracing test + shell: pwsh + working-directory: ${{ env.testWorkingDirectory }} + run: | + python "../../scripts/building/run_coverage_tests.py" ` + -p promptflow ` + -t ${{ github.workspace }}/src/promptflow/tests/tracing_test/e2etests ` + -l eastus ` + -m "e2etest" ` + -n ${{ steps.cpu-cores.outputs.count }} ` + --coverage-config ${{ github.workspace }}/src/promptflow/tests/tracing_test/.coveragerc ` + -o "${{ env.testWorkingDirectory }}/test-results-tracing.xml" + + - name: Upload Test Results + if: always() + uses: actions/upload-artifact@v3 + with: + name: Test Results (Python ${{ matrix.pythonVersion }}) (OS ${{ matrix.os }}) + path: | + ${{ env.testWorkingDirectory }}/*.xml + ${{ env.testWorkingDirectory }}/htmlcov/ + + + publish-test-results-tracing-test: + needs: tracing_tests + if: always() + + runs-on: ubuntu-latest + permissions: + checks: write + pull-requests: write + contents: read + issues: read + + steps: + - name: checkout + uses: actions/checkout@v4 + - name: Publish Test Results + uses: "./.github/actions/step_publish_test_results" + with: + testActionFileName: promptflow-tracing-e2e-test.yml + testResultTitle: promptflow-tracing e2e test result + osVersion: ubuntu-latest + pythonVersion: 3.9 + coverageThreshold: 40 + context: test/tracing \ No newline at end of file diff --git a/src/promptflow/tests/tracing_test/__init__.py b/src/promptflow/tests/tracing_test/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/promptflow/tests/tracing_test/e2etests/__init__.py b/src/promptflow/tests/tracing_test/e2etests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/promptflow/tests/tracing_test/e2etests/simple_functions.py b/src/promptflow/tests/tracing_test/e2etests/simple_functions.py new file mode 100644 index 00000000000..9e0732637ee --- /dev/null +++ b/src/promptflow/tests/tracing_test/e2etests/simple_functions.py @@ -0,0 +1,85 @@ +import asyncio +from time import sleep +from typing import Union + +from openai import AsyncAzureOpenAI, AzureOpenAI + +from promptflow.contracts.types import PromptTemplate +from promptflow.tracing import trace + + +@trace +def is_valid_name(name): + sleep(0.5) + return len(name) > 0 + + +@trace +def get_user_name(user_id): + sleep(0.5) + user_name = f"User {user_id}" + if not is_valid_name(user_name): + raise ValueError(f"Invalid user name: {user_name}") + + return user_name + + +@trace +def format_greeting(user_name): + sleep(0.5) + return f"Hello, {user_name}!" + + +@trace +def greetings(user_id): + user_name = get_user_name(user_id) + greeting = format_greeting(user_name) + return greeting + + +@trace +async def dummy_llm(prompt: str, model: str): + asyncio.sleep(0.5) + return "dummy_output" + + +@trace +async def dummy_llm_tasks_async(prompt: str, models: list): + tasks = [] + for model in models: + tasks.append(asyncio.create_task(dummy_llm(prompt, model))) + done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + return [task.result() for task in done] + + +@trace +def render_prompt_template(prompt: PromptTemplate, **kwargs): + for k, v in kwargs.items(): + prompt = prompt.replace(f"{{{{{k}}}}}", str(v)) + return prompt + + +@trace +def openai_chat(connection: dict, prompt: str, stream: bool = False): + client = AzureOpenAI(**connection) + + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt} + ] + response = client.chat.completions.create(model="gpt-35-turbo", messages=messages, stream=stream) + return response.choices[0].message.content or "" + + +@trace +def openai_completion(connection: dict, prompt: str): + client = AzureOpenAI(**connection) + response = client.completions.create(model="text-ada-001", prompt=prompt) + return response.choices[0].text or "" + + +@trace +async def openai_embedding_async(connection: dict, input: Union[str, list]): + client = AsyncAzureOpenAI(**connection) + resp = await client.embeddings.create(model="text-embedding-ada-002", input=input) + return resp.data[0].embedding diff --git a/src/promptflow/tests/tracing_test/e2etests/test_trace.py b/src/promptflow/tests/tracing_test/e2etests/test_trace.py new file mode 100644 index 00000000000..0c9fdbcf95f --- /dev/null +++ b/src/promptflow/tests/tracing_test/e2etests/test_trace.py @@ -0,0 +1,277 @@ +import asyncio +import json + +import pytest +from opentelemetry.trace.status import StatusCode + +from promptflow.tracing._openai_injector import inject_openai_api +from promptflow.tracing.contracts.trace import TraceType + +from ..utils import execute_function_in_subprocess, prepare_memory_exporter +from .simple_functions import ( + dummy_llm_tasks_async, + greetings, + openai_chat, + openai_completion, + openai_embedding_async, + render_prompt_template, +) + +LLM_FUNCTION_NAMES = [ + "openai.resources.chat.completions.Completions.create", + "openai.resources.completions.Completions.create", + "openai.resources.chat.completions.AsyncCompletions.create", + "openai.resources.completions.AsyncCompletions.create", +] + +EMBEDDING_FUNCTION_NAMES = [ + "openai.resources.embeddings.Embeddings.create", + "openai.resources.embeddings.AsyncEmbeddings.create", +] + +LLM_TOKEN_NAMES = [ + "llm.token_count.prompt", + "llm.token_count.completion", + "llm.token_count.total", +] + +EMBEDDING_TOKEN_NAMES = [ + "embedding.token_count.prompt", + "embedding.token_count.total", +] + +CUMULATIVE_LLM_TOKEN_NAMES = [ + "__computed__.cumulative_token_count.prompt", + "__computed__.cumulative_token_count.completion", + "__computed__.cumulative_token_count.total", +] + +CUMULATIVE_EMBEDDING_TOKEN_NAMES = [ + "__computed__.cumulative_token_count.prompt", + "__computed__.cumulative_token_count.total", +] + + +@pytest.mark.usefixtures("dev_connections") +@pytest.mark.e2etest +class TestTracing: + @pytest.mark.parametrize( + "func, inputs, expected_span_length", + [ + (greetings, {"user_id": 1}, 4), + (dummy_llm_tasks_async, {"prompt": "Hello", "models": ["model_1", "model_1"]}, 3), + ], + ) + def test_otel_trace(self, func, inputs, expected_span_length): + execute_function_in_subprocess(self.assert_otel_trace, func, inputs, expected_span_length) + + def assert_otel_trace(self, func, inputs, expected_span_length): + exporter = prepare_memory_exporter() + + result = self.run_func(func, inputs) + assert isinstance(result, (str, list)) + span_list = exporter.get_finished_spans() + self.validate_span_list(span_list, expected_span_length) + + @pytest.mark.parametrize( + "func, inputs", + [ + (render_prompt_template, {"prompt": "Hello {{name}}!", "name": "world"}), + ] + ) + def test_otel_trace_with_prompt(self, func, inputs): + execute_function_in_subprocess(self.assert_otel_traces_with_prompt, func, inputs) + + def assert_otel_traces_with_prompt(self, func, inputs): + memory_exporter = prepare_memory_exporter() + + result = self.run_func(func, inputs) + assert result == "Hello world!" + + span_list = memory_exporter.get_finished_spans() + for span in span_list: + assert span.status.status_code == StatusCode.OK + assert isinstance(span.name, str) + if span.attributes.get("function", "") == "render_prompt_template": + assert "prompt.template" in span.attributes + assert span.attributes["prompt.template"] == inputs["prompt"] + assert "prompt.variables" in span.attributes + for var in inputs: + if var == "prompt": + continue + assert var in span.attributes["prompt.variables"] + + @pytest.mark.parametrize( + "func, inputs, expected_span_length", + [ + (openai_chat, {"prompt": "Hello"}, 2), + (openai_completion, {"prompt": "Hello"}, 2), + ], + ) + def test_otel_trace_with_llm(self, dev_connections, func, inputs, expected_span_length): + execute_function_in_subprocess( + self.assert_otel_trace_with_llm, dev_connections, func, inputs, expected_span_length + ) + + def assert_otel_trace_with_llm(self, dev_connections, func, inputs, expected_span_length): + inject_openai_api() + exporter = prepare_memory_exporter() + + inputs = self.add_azure_connection_to_inputs(inputs, dev_connections) + result = self.run_func(func, inputs) + assert isinstance(result, str) + span_list = exporter.get_finished_spans() + self.validate_span_list(span_list, expected_span_length) + self.validate_openai_tokens(span_list) + + @pytest.mark.parametrize( + "func, inputs, expected_span_length", + [ + (openai_embedding_async, {"input": "Hello"}, 2), + # [9906] is the tokenized version of "Hello" + (openai_embedding_async, {"input": [9906]}, 2), + ] + ) + def test_otel_trace_with_embedding( + self, + dev_connections, + func, + inputs, + expected_span_length, + ): + execute_function_in_subprocess( + self.assert_otel_traces_with_embedding, dev_connections, func, inputs, expected_span_length + ) + + def assert_otel_traces_with_embedding(self, dev_connections, func, inputs, expected_span_length): + inject_openai_api() + memory_exporter = prepare_memory_exporter() + + inputs = self.add_azure_connection_to_inputs(inputs, dev_connections) + result = self.run_func(func, inputs) + assert isinstance(result, list) + + span_list = memory_exporter.get_finished_spans() + self.validate_span_list(span_list, expected_span_length) + self.validate_openai_tokens(span_list) + for span in span_list: + if span.attributes.get("function", "") in EMBEDDING_FUNCTION_NAMES: + assert span.attributes.get("embedding.model", "") == "ada" + embeddings = span.attributes.get("embedding.embeddings", "") + assert "embedding.vector" in embeddings + assert "embedding.text" in embeddings + if isinstance(inputs["input"], list): + # If the input is a token array, which is list of int, the attribute should contains + # the length of the token array ''. + assert "dimensional token" in embeddings + else: + # If the input is a string, the attribute should contains the original input string. + assert inputs["input"] in embeddings + + def test_otel_trace_with_multiple_functions(self): + execute_function_in_subprocess(self.assert_otel_traces_with_multiple_functions) + + def assert_otel_traces_with_multiple_functions(self): + memory_exporter = prepare_memory_exporter() + + result = self.run_func(greetings, {"user_id": 1}) + assert isinstance(result, str) + result = self.run_func(dummy_llm_tasks_async, {"prompt": "Hello", "models": ["model_1", "model_1"]}) + assert isinstance(result, list) + + span_list = memory_exporter.get_finished_spans() + assert len(span_list) == 7, f"Got {len(span_list)} spans." # 4 + 3 spans in total + root_spans = [span for span in span_list if span.parent is None] + assert len(root_spans) == 2, f"Expected 2 root spans, got {len(root_spans)}" + assert root_spans[0].attributes["function"] == "greetings" + assert root_spans[1].attributes["function"] == "dummy_llm_tasks_async" + assert root_spans[1] == span_list[-1] # It should be the last span + sub_level_span = span_list[-2] # It should be the second last span + expected_values = { + "framework": "promptflow", + "span_type": "Function", + } + for span in span_list: + for k, v in expected_values.items(): + assert span.attributes[k] == v, f"span.attributes[{k}] = {span.attributes[k]}, expected: {v}" + assert ( + sub_level_span.parent.span_id == root_spans[1].context.span_id + ) # sub_level_span is a child of the second root span + + def run_func(self, func, inputs): + if asyncio.iscoroutinefunction(func): + return asyncio.run(func(**inputs)) + else: + return func(**inputs) + + def add_azure_connection_to_inputs(self, inputs, dev_connections): + conn_name = "azure_open_ai_connection" + if conn_name not in dev_connections: + raise ValueError(f"Connection '{conn_name}' not found in dev connections.") + conn_dict = { + "api_key": dev_connections[conn_name]["value"]["api_key"], + "azure_endpoint": dev_connections[conn_name]["value"]["api_base"], + "api_version": dev_connections[conn_name]["value"]["api_version"], + } + inputs["connection"] = conn_dict + return inputs + + def validate_span_list(self, span_list, expected_span_length): + assert len(span_list) == expected_span_length, f"Got {len(span_list)} spans." + root_spans = [span for span in span_list if span.parent is None] + assert len(root_spans) == 1 + root_span = root_spans[0] + for span in span_list: + assert span.status.status_code == StatusCode.OK + assert isinstance(span.name, str) + assert span.attributes["framework"] == "promptflow" + if span.attributes.get("function", "") in LLM_FUNCTION_NAMES: + expected_span_type = TraceType.LLM + elif span.attributes.get("function", "") in EMBEDDING_FUNCTION_NAMES: + expected_span_type = TraceType.EMBEDDING + else: + expected_span_type = TraceType.FUNCTION + msg = f"span_type: {span.attributes['span_type']}, expected: {expected_span_type}" + assert span.attributes["span_type"] == expected_span_type, msg + if span != root_span: # Non-root spans should have a parent + assert span.attributes["function"] + inputs = json.loads(span.attributes["inputs"]) + output = json.loads(span.attributes["output"]) + assert isinstance(inputs, dict) + assert output is not None + + def validate_openai_tokens(self, span_list): + span_dict = {span.context.span_id: span for span in span_list} + expected_tokens = {} + for span in span_list: + tokens = None + # Validate the openai tokens are correctly set in the llm trace. + if span.attributes.get("function", "") in LLM_FUNCTION_NAMES: + for token_name in LLM_TOKEN_NAMES + CUMULATIVE_LLM_TOKEN_NAMES: + assert token_name in span.attributes + tokens = {token_name: span.attributes[token_name] for token_name in CUMULATIVE_LLM_TOKEN_NAMES} + # Validate the openai tokens are correctly set in the embedding trace. + if span.attributes.get("function", "") in EMBEDDING_FUNCTION_NAMES: + for token_name in EMBEDDING_TOKEN_NAMES + CUMULATIVE_EMBEDDING_TOKEN_NAMES: + assert token_name in span.attributes + tokens = {token_name: span.attributes[token_name] for token_name in CUMULATIVE_EMBEDDING_TOKEN_NAMES} + # Aggregate the tokens to the parent span. + if tokens is not None: + current_span_id = span.context.span_id + while True: + if current_span_id in expected_tokens: + expected_tokens[current_span_id] = { + key: expected_tokens[current_span_id][key] + tokens[key] for key in tokens + } + else: + expected_tokens[current_span_id] = tokens + parent_cxt = getattr(span_dict[current_span_id], "parent", None) + if parent_cxt is None: + break + current_span_id = parent_cxt.span_id + # Validate the aggregated tokens are correctly set in the parent span. + for span in span_list: + span_id = span.context.span_id + if span_id in expected_tokens: + for token_name in expected_tokens[span_id]: + assert span.attributes[token_name] == expected_tokens[span_id][token_name] diff --git a/src/promptflow/tests/tracing_test/utils.py b/src/promptflow/tests/tracing_test/utils.py new file mode 100644 index 00000000000..7654bd1bb67 --- /dev/null +++ b/src/promptflow/tests/tracing_test/utils.py @@ -0,0 +1,41 @@ +import traceback + +from multiprocessing import Queue, get_context + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import set_tracer_provider + + +def execute_function_in_subprocess(func, *args, **kwargs): + """ + Execute a function in a new process and return any exception that occurs. + Replace pickle with dill for better serialization capabilities. + """ + ctx = get_context("spawn") + error_queue = ctx.Queue() + process = ctx.Process(target=_run_in_subprocess, args=(error_queue, func, args, kwargs)) + process.start() + process.join() # Wait for the process to finish + + if not error_queue.empty(): + err, stacktrace_str = error_queue.get() + raise Exception(f"An error occurred in the subprocess: {err}\nStacktrace:\n{stacktrace_str}") + assert process.exitcode == 0, f"Subprocess exited with code {process.exitcode}" + + +def _run_in_subprocess(error_queue: Queue, func, args, kwargs): + try: + func(*args, **kwargs) + except BaseException as e: + error_queue.put((repr(e), traceback.format_exc())) + + +def prepare_memory_exporter(): + provider = TracerProvider() + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider.add_span_processor(processor) + set_tracer_provider(provider) + return exporter