add openai batch example #1667

Merged (17 commits) on May 24, 2024
1 change: 1 addition & 0 deletions docs/index.md
@@ -120,6 +120,7 @@
auto_examples/mmcloud_agent/index
auto_examples/modin_plugin/index
auto_examples/kfmpi_plugin/index
auto_examples/onnx_plugin/index
auto_examples/openai_batch_agent/index
auto_examples/papermill_plugin/index
auto_examples/pandera_plugin/index
auto_examples/kfpytorch_plugin/index
2 changes: 2 additions & 0 deletions docs/integrations.md
@@ -100,6 +100,8 @@
orchestrated by Flyte itself, within its provisioned Kubernetes clusters.
- Run Databricks jobs in your workflows with the Databricks agent.
* - {doc}`Memory Machine Cloud <auto_examples/mmcloud_agent/index>`
- Execute tasks using the MemVerge Memory Machine Cloud agent.
* - {doc}`OpenAI Batch <auto_examples/openai_batch_agent/index>`
- Submit requests for asynchronous batch processing on OpenAI.
* - {doc}`SageMaker Inference <auto_examples/sagemaker_inference_agent/index>`
- Deploy models and create, as well as trigger, inference endpoints on SageMaker.
* - {doc}`Sensor <auto_examples/sensor/index>`
23 changes: 23 additions & 0 deletions examples/openai_batch_agent/Dockerfile
@@ -0,0 +1,23 @@
########################
# NOTE: For CI/CD only #
########################
FROM python:3.11-slim-buster
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytesnacks

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root

# Install Python dependencies
COPY requirements.in /root
RUN pip install -r /root/requirements.in

# Copy the actual code
COPY . /root/

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
48 changes: 48 additions & 0 deletions examples/openai_batch_agent/README.md
@@ -0,0 +1,48 @@
(openai_batch_agent)=

# OpenAI Batch Agent

```{eval-rst}
.. tags:: Integration, Intermediate, OpenAI
```

The Batch API agent allows you to submit requests for asynchronous batch processing on OpenAI.
You can provide either a JSONL file or a JSON iterator, and the agent handles the upload to OpenAI,
creation of the batch, and downloading of the output and error files.
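
For instance, a minimal sketch (mirroring the full example linked below; the organization name and secret are placeholders) looks like this:

```python
from flytekit import Secret, workflow
from flytekit.types.file import JSONLFile
from flytekitplugins.openai import BatchResult, create_batch

# create_batch returns a workflow that uploads the file to OpenAI, creates
# the batch, polls it for completion, and downloads the output/error files.
file_batch = create_batch(
    name="gpt-3.5-turbo",
    openai_organization="your-org",  # placeholder organization
    secret=Secret(group="openai", key="api-key"),  # placeholder secret
    is_json_iterator=False,
)


@workflow
def batch_wf(jsonl_file: JSONLFile) -> BatchResult:
    return file_batch(jsonl_in=jsonl_file)
```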

## Installation

To use the OpenAI Batch agent, run the following command:

```
pip install flytekitplugins-openai
```

## Example usage

For a usage example, see {doc}`OpenAI Batch agent example usage <openai_batch_agent_example_usage>`.

## Local testing

To test an agent locally, create a class for the agent task that inherits from
[SyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L222-L256)
or [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L259-L354).
These mixins can handle synchronous and asynchronous tasks, respectively,
and allow flytekit to mimic FlytePropeller's behavior in calling the agent.
For more information, see "[Testing agents locally](https://docs.flyte.org/en/latest/flyte_agents/testing_agents_locally.html)".
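
To sketch the pattern (the class below is a hypothetical skeleton for illustration; the batch task shipped with `flytekitplugins-openai` already mixes in the appropriate executor, so invoking it locally exercises the agent code path directly):

```python
from flytekit.core.base_task import PythonTask
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin


# Hypothetical skeleton: inheriting the mixin makes flytekit call the agent
# when the task runs locally, mimicking how FlytePropeller calls it in a
# cluster.
class MyAgentTask(AsyncAgentExecutorMixin, PythonTask):
    """Task whose local execution is delegated to the agent."""
```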

## Flyte deployment configuration

```{note}
If you are using a managed deployment of Flyte, you will need to contact your deployment administrator to configure agents in your deployment.
```

To enable the OpenAI Batch agent in your Flyte deployment, refer to the
{ref}`OpenAI Batch agent setup guide <deployment-agent-setup-openai-batch>`.

```{toctree}
:maxdepth: -1
:hidden:

openai_batch_agent_example_usage
```
Empty file.
52 changes: 52 additions & 0 deletions examples/openai_batch_agent/openai_batch_agent/data.jsonl

Large diffs are not rendered by default.

98 changes: 98 additions & 0 deletions examples/openai_batch_agent/openai_batch_agent/openai_batch_agent_example_usage.py
@@ -0,0 +1,98 @@
# %% [markdown]
# (openai_batch_agent_example_usage)=
#
# # Batching Requests for Asynchronous Processing
#
# This example demonstrates how to send a batch of API requests to GPT models for asynchronous processing.
#
# Every batch input should include `custom_id`, `method`, `url`, and `body`.
# You can provide either a `JSONLFile` or `Iterator[JSON]`, and the agent handles the file upload to OpenAI,
# creation of the batch, and downloading of the output and error files.
#
# ## Using `Iterator`
#
# Here's how you can provide an `Iterator` as an input to the agent:
# %%
import os
from typing import Iterator

from flytekit import Secret, workflow
from flytekit.types.file import JSONLFile
from flytekit.types.iterator import JSON
from flytekitplugins.openai import BatchResult, create_batch


def jsons():
for x in [
{
"custom_id": "request-1",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "gpt-3.5-turbo",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is 2+2?"},
],
},
},
{
"custom_id": "request-2",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "gpt-3.5-turbo",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Who won the world series in 2020?"},
],
},
},
]:
yield x


iterator_batch = create_batch(
name="gpt-3.5-turbo",
openai_organization="your-org",
secret=Secret(group="openai", key="api-key"),
)


@workflow
def json_iterator_wf(json_vals: Iterator[JSON] = jsons()) -> BatchResult:
return iterator_batch(jsonl_in=json_vals)


# %% [markdown]
# The `create_batch` function returns an imperative workflow responsible for uploading the JSON data to OpenAI,
# creating a batch, polling the status of the batch to check for completion, and downloading the
# output and error files. It also accepts a `config` parameter, allowing you to provide `metadata`, `endpoint`,
# and `completion_window` values; if you omit them, each falls back to its OpenAI API default.
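#
# As a rough sketch (the exact type that `config` expects isn't shown here, so a
# plain mapping with the documented fields is assumed; consult the
# flytekitplugins-openai source for the precise signature), a configured batch
# might look like:
# %%
configured_batch = create_batch(
    name="gpt-3.5-turbo",
    openai_organization="your-org",
    secret=Secret(group="openai", key="api-key"),
    # Assumed shape for illustration; verify the expected config type in the plugin.
    config={"completion_window": "24h"},
)

# %% [markdown]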
#
# `BatchResult` is a dataclass that contains the paths to the output file and the error file.
#
# ## Using `JSONLFile`
#
# The following code snippet demonstrates how to send a JSONL file to the `create_batch` function:
# %%
file_batch = create_batch(
name="gpt-3.5-turbo",
openai_organization="your-org",
secret=Secret(group="openai", key="api-key"),
is_json_iterator=False,
)


@workflow
def jsonl_wf(
jsonl_file: JSONLFile = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data.jsonl")
) -> BatchResult:
return file_batch(jsonl_in=jsonl_file)


# %% [markdown]
# The iterator **streams JSON objects to a JSONL file**. If you have large batches of requests or generate distinct
# JSON objects that you want to run predictions on, we recommend using the iterator.
#
# You can find more info about the [Batch API in the OpenAI docs](https://help.openai.com/en/articles/9197833-batch-api-faq).
1 change: 1 addition & 0 deletions examples/openai_batch_agent/requirements.in
@@ -0,0 +1 @@
flytekitplugins-openai>=1.12.1b2