diff --git a/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py b/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py index 799abea65a..14e00252ee 100644 --- a/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py +++ b/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py @@ -10,6 +10,7 @@ from .cmd import Cmd from .cmd_result import CmdResult from .ten_env import TenEnv +from typing import AsyncGenerator class AsyncTenEnv(TenEnv): @@ -25,25 +26,38 @@ def __init__( def __del__(self) -> None: pass - async def send_cmd(self, cmd: Cmd) -> CmdResult: - q = asyncio.Queue(1) + async def send_cmd(self, cmd: Cmd) -> AsyncGenerator[CmdResult, None]: + q = asyncio.Queue(maxsize=10) self._internal.send_cmd( cmd, - lambda ten_env, result: asyncio.run_coroutine_threadsafe( + lambda _, result: asyncio.run_coroutine_threadsafe( q.put(result), self._ten_loop - ), # type: ignore + ), ) - return await q.get() - async def send_json(self, json_str: str) -> CmdResult: - q = asyncio.Queue(1) + while True: + result: CmdResult = await q.get() + if result.is_completed(): + yield result + # This is the final result, so break the while loop. + break + yield result + + async def send_json(self, json_str: str) -> AsyncGenerator[CmdResult, None]: + q = asyncio.Queue(maxsize=10) self._internal.send_json( json_str, lambda ten_env, result: asyncio.run_coroutine_threadsafe( q.put(result), self._ten_loop - ), # type: ignore + ), ) - return await q.get() + while True: + result: CmdResult = await q.get() + if result.is_completed(): + yield result + # This is the final result, so break the while loop. + break + yield result def _deinit_routine(self) -> None: # Wait for the internal thread to finish. diff --git a/packages/example_extensions/aio_http_server_python/main.py b/packages/example_extensions/aio_http_server_python/main.py index a6e11e17b9..b3441258e7 100644 --- a/packages/example_extensions/aio_http_server_python/main.py +++ b/packages/example_extensions/aio_http_server_python/main.py @@ -41,7 +41,9 @@ async def default_handler(self, request: web_request.Request): '{"_ten":{"type":"close_app",' '"dest":[{"app":"localhost"}]}}' ) - asyncio.create_task(self.ten_env.send_json(close_app_cmd_json)) + asyncio.create_task( + anext(self.ten_env.send_json(close_app_cmd_json)) + ) return web.Response(status=200, text="OK") elif "name" in data["_ten"]: # Send the command to the TEN runtime. @@ -53,7 +55,7 @@ async def default_handler(self, request: web_request.Request): if cmd is None: return web.Response(status=400, text="Bad request") - cmd_result = await self.ten_env.send_cmd(cmd) + cmd_result = await anext(self.ten_env.send_cmd(cmd)) else: return web.Response(status=404, text="Not found") diff --git a/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py b/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py index d6741075dc..3ee59bccb9 100644 --- a/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py @@ -48,7 +48,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: # Send a new command to other extensions and wait for the result. The # result will be returned to the original sender. new_cmd = Cmd.create("hello") - cmd_result = await ten_env.send_cmd(new_cmd) + cmd_result = await anext(ten_env.send_cmd(new_cmd)) ten_env.return_result(cmd_result, cmd) async def on_stop(self, ten_env: AsyncTenEnv) -> None: diff --git a/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py index 914ba86fa1..731062862b 100644 --- a/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py @@ -33,7 +33,7 @@ async def stop_thread(self): async def send_cmd_async(self, ten_env: TenEnv, cmd: Cmd) -> CmdResult: print("DefaultExtension send_cmd_async") - q = asyncio.Queue(1) + q = asyncio.Queue(maxsize=10) ten_env.send_cmd( cmd, lambda ten_env, result: asyncio.run_coroutine_threadsafe( diff --git a/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py index 0508b37919..e9acdfc597 100644 --- a/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py @@ -51,9 +51,7 @@ async def greeting(self, ten_env: AsyncTenEnv) -> CmdResult: await asyncio.sleep(1) new_cmd = Cmd.create("greeting") - return await ten_env.send_cmd( - new_cmd, - ) + return await anext(ten_env.send_cmd(new_cmd)) async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: cmd_json = cmd.to_json() @@ -66,7 +64,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: await asyncio.sleep(0.5) - result = await ten_env.send_cmd(new_cmd) + result = await anext(ten_env.send_cmd(new_cmd)) statusCode = result.get_status_code() detail = result.get_property_string("detail") diff --git a/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py index b1c8e767cb..fd9e5dd409 100644 --- a/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py @@ -41,7 +41,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: # Send a new command to other extensions and wait for the result. The # result will be returned to the original sender. new_cmd = Cmd.create("hello") - cmd_result = await ten_env.send_cmd(new_cmd) + cmd_result = await anext(ten_env.send_cmd(new_cmd)) ten_env.return_result(cmd_result, cmd) async def on_stop(self, ten_env: AsyncTenEnv) -> None: diff --git a/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py index 84ce80c070..637eda5945 100644 --- a/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py @@ -42,7 +42,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: # Send a new command to other extensions and wait for the result. The # result will be returned to the original sender. new_cmd = Cmd.create("hello") - cmd_result = await ten_env.send_cmd(new_cmd) + cmd_result = await anext(ten_env.send_cmd(new_cmd)) ten_env.return_result(cmd_result, cmd) async def on_stop(self, ten_env: AsyncTenEnv) -> None: