Skip to content

Commit

Permalink
feat(api): Limited support for programmatically running PAPIv≥2.14 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxColoring authored and caila-marashaj committed Jul 26, 2023
1 parent 7da9f74 commit d78d500
Show file tree
Hide file tree
Showing 12 changed files with 842 additions and 379 deletions.
457 changes: 388 additions & 69 deletions api/src/opentrons/execute.py

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
The main interface is the `ProtocolEngine` class.
"""

from .create_protocol_engine import create_protocol_engine
from .create_protocol_engine import (
create_protocol_engine,
create_protocol_engine_in_thread,
)
from .protocol_engine import ProtocolEngine
from .errors import ProtocolEngineError, ErrorOccurrence
from .commands import (
Expand Down Expand Up @@ -55,6 +58,7 @@
__all__ = [
# main factory and interface exports
"create_protocol_engine",
"create_protocol_engine_in_thread",
"ProtocolEngine",
"StateSummary",
"Config",
Expand Down
61 changes: 59 additions & 2 deletions api/src/opentrons/protocol_engine/create_protocol_engine.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
"""Main ProtocolEngine factory."""
import asyncio
import contextlib
import typing

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import DoorState
from opentrons.protocol_engine.resources.module_data_provider import ModuleDataProvider
from opentrons.util.async_helpers import async_context_manager_in_thread

from .protocol_engine import ProtocolEngine
from .resources import DeckDataProvider
from .resources import DeckDataProvider, ModuleDataProvider
from .state import Config, StateStore


# TODO(mm, 2023-06-16): Arguably, this not being a context manager makes us prone to forgetting to
# clean it up properly, especially in tests. See e.g. https://opentrons.atlassian.net/browse/RSS-222
async def create_protocol_engine(
hardware_api: HardwareControlAPI,
config: Config,
Expand All @@ -32,3 +38,54 @@ async def create_protocol_engine(
)

return ProtocolEngine(state_store=state_store, hardware_api=hardware_api)


@contextlib.contextmanager
def create_protocol_engine_in_thread(
hardware_api: HardwareControlAPI,
config: Config,
drop_tips_and_home_after: bool,
) -> typing.Generator[
typing.Tuple[ProtocolEngine, asyncio.AbstractEventLoop], None, None
]:
"""Run a `ProtocolEngine` in a worker thread.
When this context manager is entered, it:
1. Starts a worker thread.
2. Starts an asyncio event loop in that worker thread.
3. Creates and `.play()`s a `ProtocolEngine` in that event loop.
4. Returns the `ProtocolEngine` and the event loop.
Use functions like `asyncio.run_coroutine_threadsafe()` to safely interact with
the `ProtocolEngine` from your thread.
When this context manager is exited, it:
1. Cleans up the `ProtocolEngine`.
2. Stops and cleans up the event loop.
3. Joins the thread.
"""
with async_context_manager_in_thread(
_protocol_engine(hardware_api, config, drop_tips_and_home_after)
) as (
protocol_engine,
loop,
):
yield protocol_engine, loop


@contextlib.asynccontextmanager
async def _protocol_engine(
hardware_api: HardwareControlAPI,
config: Config,
drop_tips_and_home_after: bool,
) -> typing.AsyncGenerator[ProtocolEngine, None]:
protocol_engine = await create_protocol_engine(
hardware_api=hardware_api,
config=config,
)
try:
protocol_engine.play()
yield protocol_engine
finally:
await protocol_engine.finish(drop_tips_and_home=drop_tips_and_home_after)
114 changes: 110 additions & 4 deletions api/src/opentrons/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@
"""

from functools import wraps
from typing import TypeVar, Callable, Awaitable, cast, Any
from threading import Thread
from typing import (
Any,
AsyncContextManager,
Awaitable,
Callable,
Generator,
Tuple,
TypeVar,
cast,
)

import asyncio
import contextlib
import queue


async def asyncio_yield() -> None:
Expand Down Expand Up @@ -36,10 +48,10 @@ async def and await call() that still effectively "block" other concurrent tasks
await asyncio.sleep(0)


Wrapped = TypeVar("Wrapped", bound=Callable[..., Awaitable[Any]])
_Wrapped = TypeVar("_Wrapped", bound=Callable[..., Awaitable[Any]])


def ensure_yield(async_def_func: Wrapped) -> Wrapped:
def ensure_yield(async_def_func: _Wrapped) -> _Wrapped:
"""
A decorator that makes sure that asyncio_yield() is called after the decorated async
function finishes executing.
Expand All @@ -57,4 +69,98 @@ async def _wrapper(*args: Any, **kwargs: Any) -> Any:
await asyncio_yield()
return ret

return cast(Wrapped, _wrapper)
return cast(_Wrapped, _wrapper)


_ContextManagerResult = TypeVar("_ContextManagerResult")


@contextlib.contextmanager
def async_context_manager_in_thread(
async_context_manager: AsyncContextManager[_ContextManagerResult],
) -> Generator[Tuple[_ContextManagerResult, asyncio.AbstractEventLoop], None, None]:
"""Enter an async context manager in a worker thread.
When you enter this context manager, it:
1. Spawns a worker thread.
2. In that thread, starts an asyncio event loop.
3. In that event loop, enters the context manager that you passed in.
4. Returns: the result of entering that context manager, and the running event loop.
Use functions like `asyncio.run_coroutine_threadsafe()` to safely interact
with the returned object from your thread.
When you exit this context manager, it:
1. In the worker thread's event loop, exits the context manager that you passed in.
2. Stops and cleans up the worker thread's event loop.
3. Joins the worker thread.
"""
with _run_loop_in_thread() as loop_in_thread:
async_object = asyncio.run_coroutine_threadsafe(
async_context_manager.__aenter__(),
loop=loop_in_thread,
).result()

try:
yield async_object, loop_in_thread

finally:
exit = asyncio.run_coroutine_threadsafe(
async_context_manager.__aexit__(None, None, None),
loop=loop_in_thread,
)
exit.result()


@contextlib.contextmanager
def _run_loop_in_thread() -> Generator[asyncio.AbstractEventLoop, None, None]:
"""Run an event loop in a worker thread.
Entering this context manager spawns a thread, starts an asyncio event loop in it,
and returns that loop.
Exiting this context manager stops and cleans up the event loop, and then joins the thread.
"""
loop_mailbox: "queue.SimpleQueue[asyncio.AbstractEventLoop]" = queue.SimpleQueue()

def _in_thread() -> None:
loop = asyncio.new_event_loop()

# We assume that the lines above this will never fail,
# so we will always reach this point to unblock the parent thread.
loop_mailbox.put(loop)

loop.run_forever()

# If we've reached here, the loop has been stopped from outside this thread. Clean it up.
#
# This cleanup is naive because asyncio makes it difficult and confusing to get it right.
# Compare this with asyncio.run()'s cleanup, which:
#
# * Cancels and awaits any remaining tasks
# (according to the source code--this seems undocumented)
# * Shuts down asynchronous generators
# (see asyncio.shutdown_asyncgens())
# * Shuts down the default thread pool executor
# (see https://bugs.python.org/issue34037 and asyncio.shutdown_default_executor())
#
# In Python >=3.11, we should rewrite this to use asyncio.Runner,
# which can take care of these nuances for us.
loop.close()

thread = Thread(
target=_in_thread,
name=f"{__name__} event loop thread",
# This is a load-bearing daemon=True. It avoids @atexit-related deadlocks when this is used
# by opentrons.execute and cleaned up by opentrons.execute's @atexit handler.
# https://github.com/Opentrons/opentrons/pull/12970#issuecomment-1648243785
daemon=True,
)
thread.start()
loop_in_thread = loop_mailbox.get()
try:
yield loop_in_thread
finally:
loop_in_thread.call_soon_threadsafe(loop_in_thread.stop)
thread.join()
37 changes: 36 additions & 1 deletion api/src/opentrons/util/entrypoint_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import logging
from json import JSONDecodeError
import pathlib
from typing import Dict, Sequence, Union, TYPE_CHECKING
import shutil
from typing import BinaryIO, Dict, Sequence, TextIO, Union, TYPE_CHECKING

from jsonschema import ValidationError # type: ignore

Expand Down Expand Up @@ -83,3 +84,37 @@ def datafiles_from_paths(paths: Sequence[Union[str, pathlib.Path]]) -> Dict[str,
else:
log.info(f"ignoring {child} in data path")
return datafiles


# HACK(mm, 2023-06-29): This function is attempting to do something fundamentally wrong.
# Remove it when we fix https://opentrons.atlassian.net/browse/RSS-281.
def copy_file_like(source: Union[BinaryIO, TextIO], destination: pathlib.Path) -> None:
"""Copy a file-like object to a path.
Limitations:
If `source` is text, the new file's encoding may not correctly match its original encoding.
This can matter if it's a Python file and it has an encoding declaration
(https://docs.python.org/3.7/reference/lexical_analysis.html#encoding-declarations).
Also, its newlines may get translated.
"""
# When we read from the source stream, will it give us bytes, or text?
try:
# Experimentally, this is present (but possibly None) on text-mode streams,
# and not present on binary-mode streams.
getattr(source, "encoding")
except AttributeError:
source_is_text = False
else:
source_is_text = True

if source_is_text:
destination_mode = "wt"
else:
destination_mode = "wb"

with open(
destination,
mode=destination_mode,
) as destination_file:
# Use copyfileobj() to limit memory usage.
shutil.copyfileobj(fsrc=source, fdst=destination_file)
98 changes: 0 additions & 98 deletions api/tests/opentrons/async_context_manager_in_thread.py

This file was deleted.

Loading

0 comments on commit d78d500

Please sign in to comment.