diff --git a/demo/runners/support/agent.py b/demo/runners/support/agent.py index 06d1765eb0..c6c91e5b57 100644 --- a/demo/runners/support/agent.py +++ b/demo/runners/support/agent.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor import asyncpg import base64 import functools @@ -185,6 +186,7 @@ def __init__( self.params = params self.proc = None self.client_session: ClientSession = ClientSession() + self.thread_pool_executor = ThreadPoolExecutor(20) if self.endorser_role and self.endorser_role == "author": seed = None @@ -695,13 +697,13 @@ def _process(self, args, env, loop): close_fds=True, ) loop.run_in_executor( - None, + self.thread_pool_executor, output_reader, proc.stdout, functools.partial(self.handle_output, source="stdout"), ) loop.run_in_executor( - None, + self.thread_pool_executor, output_reader, proc.stderr, functools.partial(self.handle_output, source="stderr"), @@ -726,7 +728,9 @@ async def start_process(self, python_path: str = None, wait: bool = True): # start agent sub-process loop = asyncio.get_event_loop() - future = loop.run_in_executor(None, self._process, agent_args, my_env, loop) + future = loop.run_in_executor( + self.thread_pool_executor, self._process, agent_args, my_env, loop + ) self.proc = await asyncio.wait_for(future, 20, loop=loop) if wait: await asyncio.sleep(1.0) @@ -753,7 +757,7 @@ async def terminate(self): # now shut down the agent loop = asyncio.get_event_loop() if self.proc: - future = loop.run_in_executor(None, self._terminate) + future = loop.run_in_executor(self.thread_pool_executor, self._terminate) result = await asyncio.wait_for(future, 10, loop=loop) async def listen_webhooks(self, webhook_port):