-
Notifications
You must be signed in to change notification settings - Fork 180
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test(api): Configure ctx fixture as PAPIv2.14 when it's configured as…
… an OT-3 (#12567)
- Loading branch information
1 parent
c594cf2
commit e39d251
Showing
9 changed files
with
338 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
"""A test helper to enter an async context manager in a worker thread.""" | ||
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
import contextlib | ||
import queue | ||
import typing | ||
|
||
from concurrent.futures import ThreadPoolExecutor | ||
|
||
|
||
_T = typing.TypeVar("_T") | ||
|
||
|
||
@contextlib.contextmanager | ||
def async_context_manager_in_thread( | ||
async_context_manager: typing.AsyncContextManager[_T], | ||
) -> typing.Generator[typing.Tuple[_T, asyncio.AbstractEventLoop], None, None]: | ||
"""Enter an async context manager in a worker thread. | ||
When you enter this context manager, it: | ||
1. Spawns a worker thread. | ||
2. In that thread, starts an asyncio event loop. | ||
3. In that event loop, enters the context manager that you passed in. | ||
4. Returns: the result of entering that context manager, and the running event loop. | ||
Use functions like `asyncio.run_coroutine_threadsafe()` to safely interact | ||
with the returned object from your thread. | ||
When you exit this context manager, it: | ||
1. In the worker thread's event loop, exits the context manager that you passed in. | ||
2. Stops and cleans up the worker thread's event loop. | ||
3. Joins the worker thread. | ||
""" | ||
with _run_loop_in_thread() as loop_in_thread: | ||
async_object = asyncio.run_coroutine_threadsafe( | ||
async_context_manager.__aenter__(), | ||
loop=loop_in_thread, | ||
).result() | ||
|
||
try: | ||
yield async_object, loop_in_thread | ||
|
||
finally: | ||
exit = asyncio.run_coroutine_threadsafe( | ||
async_context_manager.__aexit__(None, None, None), | ||
loop=loop_in_thread, | ||
) | ||
exit.result() | ||
|
||
|
||
@contextlib.contextmanager | ||
def _run_loop_in_thread() -> typing.Generator[asyncio.AbstractEventLoop, None, None]: | ||
"""Run an event loop in a worker thread. | ||
Entering this context manager spawns a thread, starts an asyncio event loop in it, | ||
and returns that loop. | ||
Exiting this context manager stops and cleans up the event loop, and then joins the thread. | ||
""" | ||
loop_queue: "queue.SimpleQueue[asyncio.AbstractEventLoop]" = queue.SimpleQueue() | ||
|
||
def _in_thread() -> None: | ||
loop = asyncio.new_event_loop() | ||
|
||
# We assume that the lines above this will never fail, | ||
# so we will always reach this point to unblock the parent thread. | ||
loop_queue.put(loop) | ||
|
||
loop.run_forever() | ||
|
||
# If we've reached here, the loop has been stopped from outside this thread. Clean it up. | ||
# | ||
# This cleanup is naive because asyncio makes it difficult and confusing to get it right. | ||
# Compare this with asyncio.run()'s cleanup, which: | ||
# | ||
# * Cancels and awaits any remaining tasks | ||
# (according to the source code--this seems undocumented) | ||
# * Shuts down asynchronous generators | ||
# (see asyncio.shutdown_asyncgens()) | ||
# * Shuts down the default thread pool executor | ||
# (see https://bugs.python.org/issue34037 and asyncio.shutdown_default_executor()) | ||
# | ||
# In Python >=3.11, we should rewrite this to use asyncio.Runner, | ||
# which can take care of these nuances for us. | ||
loop.close() | ||
|
||
with ThreadPoolExecutor(max_workers=1) as executor: | ||
executor.submit(_in_thread) | ||
|
||
loop_in_thread = loop_queue.get() | ||
|
||
try: | ||
yield loop_in_thread | ||
finally: | ||
loop_in_thread.call_soon_threadsafe(loop_in_thread.stop) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
"""Run a `ProtocolEngine` in a worker thread.""" | ||
|
||
import asyncio | ||
import contextlib | ||
import typing | ||
|
||
from opentrons.hardware_control import ThreadManagedHardware | ||
from opentrons.protocol_engine import create_protocol_engine, ProtocolEngine, Config | ||
|
||
from .async_context_manager_in_thread import async_context_manager_in_thread | ||
|
||
|
||
@contextlib.contextmanager | ||
def protocol_engine_in_thread( | ||
hardware: ThreadManagedHardware, | ||
) -> typing.Generator[ | ||
typing.Tuple[ProtocolEngine, asyncio.AbstractEventLoop], None, None | ||
]: | ||
"""Run a `ProtocolEngine` in a worker thread. | ||
When this context manager is entered, it: | ||
1. Starts a worker thread. | ||
2. Starts an asyncio event loop in that worker thread. | ||
3. Creates and `.play()`s a `ProtocolEngine` in that event loop. | ||
4. Returns the `ProtocolEngine` and the event loop. | ||
Use functions like `asyncio.run_coroutine_threadsafe()` to safely interact with | ||
the `ProtocolEngine` from your thread. | ||
When this context manager is exited, it: | ||
1. Cleans up the `ProtocolEngine`. | ||
2. Stops and cleans up the event loop. | ||
3. Joins the thread. | ||
""" | ||
with async_context_manager_in_thread(_protocol_engine(hardware)) as ( | ||
protocol_engine, | ||
loop, | ||
): | ||
yield protocol_engine, loop | ||
|
||
|
||
@contextlib.asynccontextmanager | ||
async def _protocol_engine( | ||
hardware: ThreadManagedHardware, | ||
) -> typing.AsyncGenerator[ProtocolEngine, None]: | ||
protocol_engine = await create_protocol_engine( | ||
hardware_api=hardware.wrapped(), | ||
config=Config( | ||
robot_type="OT-3 Standard", | ||
ignore_pause=True, | ||
use_virtual_pipettes=True, | ||
use_virtual_modules=True, | ||
use_virtual_gripper=True, | ||
block_on_door_open=False, | ||
), | ||
) | ||
try: | ||
protocol_engine.play() | ||
yield protocol_engine | ||
finally: | ||
await protocol_engine.finish() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.