diff --git a/pyproject.toml b/pyproject.toml index 31cdc0a93a..67c7070a12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,7 @@ dev-dependencies = [ "dirty-equals>=0.6.0", "importlib-metadata>=6.7.0", "rich>=13.7.1", + "nest_asyncio==1.6.0" ] [tool.rye.scripts] diff --git a/requirements-dev.lock b/requirements-dev.lock index 64600eb215..930a286174 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -51,6 +51,7 @@ mdurl==0.1.2 mypy==1.13.0 mypy-extensions==1.0.0 # via mypy +nest-asyncio==1.6.0 nodeenv==1.8.0 # via pyright nox==2023.4.22 diff --git a/src/openai/_utils/_sync.py b/src/openai/_utils/_sync.py index d0d810337e..8b3aaf2b5d 100644 --- a/src/openai/_utils/_sync.py +++ b/src/openai/_utils/_sync.py @@ -1,56 +1,62 @@ from __future__ import annotations +import sys +import asyncio import functools -from typing import TypeVar, Callable, Awaitable +import contextvars +from typing import Any, TypeVar, Callable, Awaitable from typing_extensions import ParamSpec -import anyio -import anyio.to_thread - -from ._reflection import function_has_argument - T_Retval = TypeVar("T_Retval") T_ParamSpec = ParamSpec("T_ParamSpec") -# copied from `asyncer`, https://github.com/tiangolo/asyncer -def asyncify( - function: Callable[T_ParamSpec, T_Retval], - *, - cancellable: bool = False, - limiter: anyio.CapacityLimiter | None = None, -) -> Callable[T_ParamSpec, Awaitable[T_Retval]]: +if sys.version_info >= (3, 9): + to_thread = asyncio.to_thread +else: + # backport of https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread + # for Python 3.8 support + async def to_thread( + func: Callable[T_ParamSpec, T_Retval], /, *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs + ) -> Any: + """Asynchronously run function *func* in a separate thread. + + Any *args and **kwargs supplied for this function are directly passed + to *func*. Also, the current :class:`contextvars.Context` is propagated, + allowing context variables from the main thread to be accessed in the + separate thread. + + Returns a coroutine that can be awaited to get the eventual result of *func*. + """ + loop = asyncio.events.get_running_loop() + ctx = contextvars.copy_context() + func_call = functools.partial(ctx.run, func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) + + +# inspired by `asyncer`, https://github.com/tiangolo/asyncer +def asyncify(function: Callable[T_ParamSpec, T_Retval]) -> Callable[T_ParamSpec, Awaitable[T_Retval]]: """ Take a blocking function and create an async one that receives the same - positional and keyword arguments, and that when called, calls the original function - in a worker thread using `anyio.to_thread.run_sync()`. Internally, - `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports - keyword arguments additional to positional arguments and it adds better support for - autocompletion and inline errors for the arguments of the function called and the - return value. - - If the `cancellable` option is enabled and the task waiting for its completion is - cancelled, the thread will still run its course but its return value (or any raised - exception) will be ignored. + positional and keyword arguments. For python version 3.9 and above, it uses + asyncio.to_thread to run the function in a separate thread. For python version + 3.8, it uses locally defined copy of the asyncio.to_thread function which was + introduced in python 3.9. - Use it like this: + Usage: - ```Python - def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str: - # Do work - return "Some result" + ```python + def blocking_func(arg1, arg2, kwarg1=None): + # blocking code + return result - result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b") - print(result) + result = asyncify(blocking_function)(arg1, arg2, kwarg1=value1) ``` ## Arguments `function`: a blocking regular callable (e.g. a function) - `cancellable`: `True` to allow cancellation of the operation - `limiter`: capacity limiter to use to limit the total amount of threads running - (if omitted, the default limiter is used) ## Return @@ -60,22 +66,6 @@ def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str: """ async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: - partial_f = functools.partial(function, *args, **kwargs) - - # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old - # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid - # surfacing deprecation warnings. - if function_has_argument(anyio.to_thread.run_sync, "abandon_on_cancel"): - return await anyio.to_thread.run_sync( - partial_f, - abandon_on_cancel=cancellable, - limiter=limiter, - ) - - return await anyio.to_thread.run_sync( - partial_f, - cancellable=cancellable, - limiter=limiter, - ) + return await to_thread(function, *args, **kwargs) return wrapper diff --git a/tests/test_client.py b/tests/test_client.py index 7ea2ab38d1..7caa8cb319 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -4,11 +4,14 @@ import gc import os +import sys import json import asyncio import inspect +import subprocess import tracemalloc from typing import Any, Union, cast +from textwrap import dedent from unittest import mock from typing_extensions import Literal @@ -1766,3 +1769,38 @@ def retry_handler(_request: httpx.Request) -> httpx.Response: ) as response: assert response.retries_taken == failures_before_success assert int(response.http_request.headers.get("x-stainless-retry-count")) == failures_before_success + + def test_get_platform(self) -> None: + # A previous implementation of asyncify could leave threads unterminated when + # used with nest_asyncio. + # + # Since nest_asyncio.apply() is global and cannot be un-applied, this + # test is run in a separate process to avoid affecting other tests. + test_code = dedent(""" + import asyncio + import nest_asyncio + import threading + + from openai._utils import asyncify + from openai._base_client import get_platform + + async def test_main() -> None: + result = await asyncify(get_platform)() + print(result) + for thread in threading.enumerate(): + print(thread.name) + + nest_asyncio.apply() + asyncio.run(test_main()) + """) + with subprocess.Popen( + [sys.executable, "-c", test_code], + text=True, + ) as process: + try: + process.wait(2) + if process.returncode: + raise AssertionError("calling get_platform using asyncify resulted in a non-zero exit code") + except subprocess.TimeoutExpired as e: + process.kill() + raise AssertionError("calling get_platform using asyncify resulted in a hung process") from e