From 60b1b7e9beb1e0c7881119715253421778c81ce0 Mon Sep 17 00:00:00 2001 From: Stainless Bot Date: Mon, 1 Apr 2024 22:39:18 +0000 Subject: [PATCH] feat(api): run polling helpers refactor: rename createAndStream to stream --- README.md | 20 +- api.md | 5 + examples/assistant.py | 27 +- examples/assistant_stream_helpers.py | 2 +- helpers.md | 10 +- .../resources/beta/threads/runs/runs.py | 495 +++++++++++++++++- src/openai/resources/beta/threads/threads.py | 80 +++ tests/api_resources/beta/threads/test_runs.py | 2 + 8 files changed, 610 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 7f053e5429..5264026dc9 100644 --- a/README.md +++ b/README.md @@ -51,12 +51,30 @@ we recommend using [python-dotenv](https://pypi.org/project/python-dotenv/) to add `OPENAI_API_KEY="My API Key"` to your `.env` file so that your API Key is not stored in source control. +### Polling Helpers + +When interacting with the API some actions such as starting a Run may take time to complete. The SDK includes +helper functions which will poll the status until it reaches a terminal state and then return the resulting object. +If an API method results in an action which could benefit from polling there will be a corresponding version of the +method ending in '\_and_poll'. + +For instance to create a Run and poll until it reaches a terminal state you can run: + +```python +run = client.beta.threads.runs.create_and_poll( + thread_id=thread.id, + assistant_id=assistant.id, +) +``` + +More information on the lifecycle of a Run can be found in the [Run Lifecycle Documentation](https://platform.openai.com/docs/assistants/how-it-works/run-lifecycle) + ### Streaming Helpers The SDK also includes helpers to process streams and handle the incoming events. ```python -with client.beta.threads.runs.create_and_stream( +with client.beta.threads.runs.stream( thread_id=thread.id, assistant_id=assistant.id, instructions="Please address the user as Jane Doe. The user has a premium account.", diff --git a/api.md b/api.md index 29392cff13..dbc95cd0b4 100644 --- a/api.md +++ b/api.md @@ -230,6 +230,7 @@ Methods: - client.beta.threads.update(thread_id, \*\*params) -> Thread - client.beta.threads.delete(thread_id) -> ThreadDeleted - client.beta.threads.create_and_run(\*\*params) -> Run +- client.beta.threads.create_and_run_poll(\*args) -> Run - client.beta.threads.create_and_run_stream(\*args) -> AssistantStreamManager[AssistantEventHandler] | AssistantStreamManager[AssistantEventHandlerT] ### Runs @@ -248,7 +249,11 @@ Methods: - client.beta.threads.runs.list(thread_id, \*\*params) -> SyncCursorPage[Run] - client.beta.threads.runs.cancel(run_id, \*, thread_id) -> Run - client.beta.threads.runs.submit_tool_outputs(run_id, \*, thread_id, \*\*params) -> Run +- client.beta.threads.runs.create_and_poll(\*args) -> Run - client.beta.threads.runs.create_and_stream(\*args) -> AssistantStreamManager[AssistantEventHandler] | AssistantStreamManager[AssistantEventHandlerT] +- client.beta.threads.runs.poll(\*args) -> Run +- client.beta.threads.runs.stream(\*args) -> AssistantStreamManager[AssistantEventHandler] | AssistantStreamManager[AssistantEventHandlerT] +- client.beta.threads.runs.submit_tool_outputs_and_poll(\*args) -> Run - client.beta.threads.runs.submit_tool_outputs_stream(\*args) -> AssistantStreamManager[AssistantEventHandler] | AssistantStreamManager[AssistantEventHandlerT] #### Steps diff --git a/examples/assistant.py b/examples/assistant.py index c5fbb82a3a..0631494ecc 100644 --- a/examples/assistant.py +++ b/examples/assistant.py @@ -1,4 +1,3 @@ -import time import openai @@ -20,28 +19,20 @@ content="I need to solve the equation `3x + 11 = 14`. Can you help me?", ) -run = client.beta.threads.runs.create( +run = client.beta.threads.runs.create_and_poll( thread_id=thread.id, assistant_id=assistant.id, instructions="Please address the user as Jane Doe. The user has a premium account.", ) -print("checking assistant status. ") -while True: - run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id) +print("Run completed with status: " + run.status) - if run.status == "completed": - print("done!") - messages = client.beta.threads.messages.list(thread_id=thread.id) +if run.status == "completed": + messages = client.beta.threads.messages.list(thread_id=thread.id) - print("messages: ") - for message in messages: - assert message.content[0].type == "text" - print({"role": message.role, "message": message.content[0].text.value}) + print("messages: ") + for message in messages: + assert message.content[0].type == "text" + print({"role": message.role, "message": message.content[0].text.value}) - client.beta.assistants.delete(assistant.id) - - break - else: - print("in progress...") - time.sleep(5) + client.beta.assistants.delete(assistant.id) diff --git a/examples/assistant_stream_helpers.py b/examples/assistant_stream_helpers.py index 6c2aae0b46..7baec77c72 100644 --- a/examples/assistant_stream_helpers.py +++ b/examples/assistant_stream_helpers.py @@ -63,7 +63,7 @@ def main() -> None: ) print(f"Question: {question}\n") - with client.beta.threads.runs.create_and_stream( + with client.beta.threads.runs.stream( thread_id=thread.id, assistant_id=assistant.id, instructions="Please address the user as Jane Doe. The user has a premium account.", diff --git a/helpers.md b/helpers.md index fed20ee81c..4271cd9ede 100644 --- a/helpers.md +++ b/helpers.md @@ -46,11 +46,11 @@ class EventHandler(AssistantEventHandler): if output.type == "logs": print(f"\n{output.logs}", flush=True) -# Then, we use the `create_and_stream` SDK helper +# Then, we use the `stream` SDK helper # with the `EventHandler` class to create the Run # and stream the response. -with client.beta.threads.runs.create_and_stream( +with client.beta.threads.runs.stream( thread_id="thread_id", assistant_id="assistant_id", event_handler=EventHandler(), @@ -63,7 +63,7 @@ with client.beta.threads.runs.create_and_stream( You can also iterate over all the streamed events. ```python -with client.beta.threads.runs.create_and_stream( +with client.beta.threads.runs.stream( thread_id=thread.id, assistant_id=assistant.id ) as stream: @@ -78,7 +78,7 @@ with client.beta.threads.runs.create_and_stream( You can also iterate over just the text deltas received ```python -with client.beta.threads.runs.create_and_stream( +with client.beta.threads.runs.stream( thread_id=thread.id, assistant_id=assistant.id ) as stream: @@ -91,7 +91,7 @@ with client.beta.threads.runs.create_and_stream( There are three helper methods for creating streams: ```python -client.beta.threads.runs.create_and_stream() +client.beta.threads.runs.stream() ``` This method can be used to start and stream the response to an existing run with an associated thread diff --git a/src/openai/resources/beta/threads/runs/runs.py b/src/openai/resources/beta/threads/runs/runs.py index ab39a96a8d..4529c65025 100644 --- a/src/openai/resources/beta/threads/runs/runs.py +++ b/src/openai/resources/beta/threads/runs/runs.py @@ -2,6 +2,8 @@ from __future__ import annotations +import time +import typing_extensions from typing import Iterable, Optional, overload from functools import partial from typing_extensions import Literal @@ -19,6 +21,7 @@ ) from ....._types import NOT_GIVEN, Body, Query, Headers, NotGiven from ....._utils import ( + is_given, required_args, maybe_transform, async_maybe_transform, @@ -497,7 +500,58 @@ def cancel( cast_to=Run, ) + def create_and_poll( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + thread_id: str, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to create a run an poll for a terminal state. More information on Run + lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + run = self.create( + thread_id=thread_id, + assistant_id=assistant_id, + additional_instructions=additional_instructions, + instructions=instructions, + metadata=metadata, + model=model, + temperature=temperature, + # We assume we are not streaming when polling + stream=False, + tools=tools, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ) + return self.poll( + run.id, + thread_id=thread_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + poll_interval_ms=poll_interval_ms, + timeout=timeout, + ) + @overload + @typing_extensions.deprecated("use `stream` instead") def create_and_stream( self, *, @@ -520,6 +574,7 @@ def create_and_stream( ... @overload + @typing_extensions.deprecated("use `stream` instead") def create_and_stream( self, *, @@ -542,6 +597,7 @@ def create_and_stream( """Create a Run stream""" ... + @typing_extensions.deprecated("use `stream` instead") def create_and_stream( self, *, @@ -596,6 +652,150 @@ def create_and_stream( ) return AssistantStreamManager(make_request, event_handler=event_handler or AssistantEventHandler()) + def poll( + self, + run_id: str, + thread_id: str, + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to poll a run status until it reaches a terminal state. More + information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + extra_headers = {"X-Stainless-Poll-Helper": "true", **(extra_headers or {})} + + if is_given(poll_interval_ms): + extra_headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms) + + terminal_states = {"requires_action", "cancelled", "completed", "failed", "expired"} + while True: + response = self.with_raw_response.retrieve( + thread_id=thread_id, + run_id=run_id, + extra_headers=extra_headers, + extra_body=extra_body, + extra_query=extra_query, + timeout=timeout, + ) + + run = response.parse() + # Return if we reached a terminal state + if run.status in terminal_states: + return run + + if not is_given(poll_interval_ms): + from_header = response.headers.get("openai-poll-after-ms") + if from_header is not None: + poll_interval_ms = int(from_header) + else: + poll_interval_ms = 1000 + + time.sleep(poll_interval_ms / 1000) + + @overload + def stream( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + thread_id: str, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AssistantStreamManager[AssistantEventHandler]: + """Create a Run stream""" + ... + + @overload + def stream( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + thread_id: str, + event_handler: AssistantEventHandlerT, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AssistantStreamManager[AssistantEventHandlerT]: + """Create a Run stream""" + ... + + def stream( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + thread_id: str, + event_handler: AssistantEventHandlerT | None = None, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AssistantStreamManager[AssistantEventHandler] | AssistantStreamManager[AssistantEventHandlerT]: + """Create a Run stream""" + if not thread_id: + raise ValueError(f"Expected a non-empty value for `thread_id` but received {thread_id!r}") + + extra_headers = { + "OpenAI-Beta": "assistants=v1", + "X-Stainless-Stream-Helper": "threads.runs.create_and_stream", + "X-Stainless-Custom-Event-Handler": "true" if event_handler else "false", + **(extra_headers or {}), + } + make_request = partial( + self._post, + f"/threads/{thread_id}/runs", + body=maybe_transform( + { + "assistant_id": assistant_id, + "additional_instructions": additional_instructions, + "instructions": instructions, + "metadata": metadata, + "model": model, + "temperature": temperature, + "stream": True, + "tools": tools, + }, + run_create_params.RunCreateParams, + ), + options=make_request_options( + extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout + ), + cast_to=Run, + stream=True, + stream_cls=Stream[AssistantStreamEvent], + ) + return AssistantStreamManager(make_request, event_handler=event_handler or AssistantEventHandler()) + @overload def submit_tool_outputs( self, @@ -747,6 +947,45 @@ def submit_tool_outputs( stream_cls=Stream[AssistantStreamEvent], ) + def submit_tool_outputs_and_poll( + self, + *, + tool_outputs: Iterable[run_submit_tool_outputs_params.ToolOutput], + run_id: str, + thread_id: str, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to submit a tool output to a run and poll for a terminal run state. + More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + run = self.submit_tool_outputs( + run_id=run_id, + thread_id=thread_id, + tool_outputs=tool_outputs, + stream=False, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ) + return self.poll( + run_id=run.id, + thread_id=thread_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + poll_interval_ms=poll_interval_ms, + ) + @overload def submit_tool_outputs_stream( self, @@ -763,7 +1002,8 @@ def submit_tool_outputs_stream( ) -> AssistantStreamManager[AssistantEventHandler]: """ Submit the tool outputs from a previous run and stream the run to a terminal - state. + state. More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps """ ... @@ -784,7 +1024,8 @@ def submit_tool_outputs_stream( ) -> AssistantStreamManager[AssistantEventHandlerT]: """ Submit the tool outputs from a previous run and stream the run to a terminal - state. + state. More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps """ ... @@ -804,7 +1045,8 @@ def submit_tool_outputs_stream( ) -> AssistantStreamManager[AssistantEventHandler] | AssistantStreamManager[AssistantEventHandlerT]: """ Submit the tool outputs from a previous run and stream the run to a terminal - state. + state. More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps """ if not run_id: raise ValueError(f"Expected a non-empty value for `run_id` but received {run_id!r}") @@ -1283,7 +1525,58 @@ async def cancel( cast_to=Run, ) + async def create_and_poll( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + thread_id: str, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to create a run an poll for a terminal state. More information on Run + lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + run = await self.create( + thread_id=thread_id, + assistant_id=assistant_id, + additional_instructions=additional_instructions, + instructions=instructions, + metadata=metadata, + model=model, + temperature=temperature, + # We assume we are not streaming when polling + stream=False, + tools=tools, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ) + return await self.poll( + run.id, + thread_id=thread_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + poll_interval_ms=poll_interval_ms, + timeout=timeout, + ) + @overload + @typing_extensions.deprecated("use `stream` instead") def create_and_stream( self, *, @@ -1306,6 +1599,7 @@ def create_and_stream( ... @overload + @typing_extensions.deprecated("use `stream` instead") def create_and_stream( self, *, @@ -1328,6 +1622,7 @@ def create_and_stream( """Create a Run stream""" ... + @typing_extensions.deprecated("use `stream` instead") def create_and_stream( self, *, @@ -1384,6 +1679,152 @@ def create_and_stream( ) return AsyncAssistantStreamManager(request, event_handler=event_handler or AsyncAssistantEventHandler()) + async def poll( + self, + run_id: str, + thread_id: str, + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to poll a run status until it reaches a terminal state. More + information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + extra_headers = {"X-Stainless-Poll-Helper": "true", **(extra_headers or {})} + + if is_given(poll_interval_ms): + extra_headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms) + + terminal_states = {"requires_action", "cancelled", "completed", "failed", "expired"} + while True: + response = await self.with_raw_response.retrieve( + thread_id=thread_id, + run_id=run_id, + extra_headers=extra_headers, + extra_body=extra_body, + extra_query=extra_query, + timeout=timeout, + ) + + run = response.parse() + # Return if we reached a terminal state + if run.status in terminal_states: + return run + + if not is_given(poll_interval_ms): + from_header = response.headers.get("openai-poll-after-ms") + if from_header is not None: + poll_interval_ms = int(from_header) + else: + poll_interval_ms = 1000 + + time.sleep(poll_interval_ms / 1000) + + @overload + def stream( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + thread_id: str, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AsyncAssistantStreamManager[AsyncAssistantEventHandler]: + """Create a Run stream""" + ... + + @overload + def stream( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + thread_id: str, + event_handler: AsyncAssistantEventHandlerT, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> AsyncAssistantStreamManager[AsyncAssistantEventHandlerT]: + """Create a Run stream""" + ... + + def stream( + self, + *, + assistant_id: str, + additional_instructions: Optional[str] | NotGiven = NOT_GIVEN, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[AssistantToolParam]] | NotGiven = NOT_GIVEN, + thread_id: str, + event_handler: AsyncAssistantEventHandlerT | None = None, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> ( + AsyncAssistantStreamManager[AsyncAssistantEventHandler] + | AsyncAssistantStreamManager[AsyncAssistantEventHandlerT] + ): + """Create a Run stream""" + if not thread_id: + raise ValueError(f"Expected a non-empty value for `thread_id` but received {thread_id!r}") + + extra_headers = { + "OpenAI-Beta": "assistants=v1", + "X-Stainless-Stream-Helper": "threads.runs.create_and_stream", + "X-Stainless-Custom-Event-Handler": "true" if event_handler else "false", + **(extra_headers or {}), + } + request = self._post( + f"/threads/{thread_id}/runs", + body=maybe_transform( + { + "assistant_id": assistant_id, + "additional_instructions": additional_instructions, + "instructions": instructions, + "metadata": metadata, + "model": model, + "temperature": temperature, + "stream": True, + "tools": tools, + }, + run_create_params.RunCreateParams, + ), + options=make_request_options( + extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout + ), + cast_to=Run, + stream=True, + stream_cls=AsyncStream[AssistantStreamEvent], + ) + return AsyncAssistantStreamManager(request, event_handler=event_handler or AsyncAssistantEventHandler()) + @overload async def submit_tool_outputs( self, @@ -1535,6 +1976,45 @@ async def submit_tool_outputs( stream_cls=AsyncStream[AssistantStreamEvent], ) + async def submit_tool_outputs_and_poll( + self, + *, + tool_outputs: Iterable[run_submit_tool_outputs_params.ToolOutput], + run_id: str, + thread_id: str, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to submit a tool output to a run and poll for a terminal run state. + More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + run = await self.submit_tool_outputs( + run_id=run_id, + thread_id=thread_id, + tool_outputs=tool_outputs, + stream=False, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ) + return await self.poll( + run_id=run.id, + thread_id=thread_id, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + poll_interval_ms=poll_interval_ms, + ) + @overload def submit_tool_outputs_stream( self, @@ -1551,7 +2031,8 @@ def submit_tool_outputs_stream( ) -> AsyncAssistantStreamManager[AsyncAssistantEventHandler]: """ Submit the tool outputs from a previous run and stream the run to a terminal - state. + state. More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps """ ... @@ -1572,7 +2053,8 @@ def submit_tool_outputs_stream( ) -> AsyncAssistantStreamManager[AsyncAssistantEventHandlerT]: """ Submit the tool outputs from a previous run and stream the run to a terminal - state. + state. More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps """ ... @@ -1595,7 +2077,8 @@ def submit_tool_outputs_stream( ): """ Submit the tool outputs from a previous run and stream the run to a terminal - state. + state. More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps """ if not run_id: raise ValueError(f"Expected a non-empty value for `run_id` but received {run_id!r}") diff --git a/src/openai/resources/beta/threads/threads.py b/src/openai/resources/beta/threads/threads.py index c2ad6aca5f..3509267d4f 100644 --- a/src/openai/resources/beta/threads/threads.py +++ b/src/openai/resources/beta/threads/threads.py @@ -467,6 +467,45 @@ def create_and_run( stream_cls=Stream[AssistantStreamEvent], ) + def create_and_run_poll( + self, + *, + assistant_id: str, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + thread: thread_create_and_run_params.Thread | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[thread_create_and_run_params.Tool]] | NotGiven = NOT_GIVEN, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to create a thread, start a run and then poll for a terminal state. + More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + run = self.create_and_run( + assistant_id=assistant_id, + instructions=instructions, + metadata=metadata, + model=model, + temperature=temperature, + stream=False, + thread=thread, + tools=tools, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ) + return self.runs.poll(run.id, run.thread_id, extra_headers, extra_query, extra_body, timeout, poll_interval_ms) + @overload def create_and_run_stream( self, @@ -967,6 +1006,47 @@ async def create_and_run( stream_cls=AsyncStream[AssistantStreamEvent], ) + async def create_and_run_poll( + self, + *, + assistant_id: str, + instructions: Optional[str] | NotGiven = NOT_GIVEN, + metadata: Optional[object] | NotGiven = NOT_GIVEN, + model: Optional[str] | NotGiven = NOT_GIVEN, + temperature: Optional[float] | NotGiven = NOT_GIVEN, + thread: thread_create_and_run_params.Thread | NotGiven = NOT_GIVEN, + tools: Optional[Iterable[thread_create_and_run_params.Tool]] | NotGiven = NOT_GIVEN, + poll_interval_ms: int | NotGiven = NOT_GIVEN, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> Run: + """ + A helper to create a thread, start a run and then poll for a terminal state. + More information on Run lifecycles can be found here: + https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps + """ + run = await self.create_and_run( + assistant_id=assistant_id, + instructions=instructions, + metadata=metadata, + model=model, + temperature=temperature, + stream=False, + thread=thread, + tools=tools, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=timeout, + ) + return await self.runs.poll( + run.id, run.thread_id, extra_headers, extra_query, extra_body, timeout, poll_interval_ms + ) + @overload def create_and_run_stream( self, diff --git a/tests/api_resources/beta/threads/test_runs.py b/tests/api_resources/beta/threads/test_runs.py index aabe2c7fc9..b9f392dc87 100644 --- a/tests/api_resources/beta/threads/test_runs.py +++ b/tests/api_resources/beta/threads/test_runs.py @@ -14,6 +14,8 @@ Run, ) +# pyright: reportDeprecated=false + base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010")