Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Limited support for programmatically running PAPIv≥2.14 #12970

Merged
merged 33 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d0b71f8
Minor improvements to async_context_manager_in_thread() and its tests.
SyntaxColoring Jun 16, 2023
4896dea
Move async_context_manager_in_thread() to opentrons.utils.
SyntaxColoring Jun 16, 2023
a2683be
Collect async_context_manager_in_thread() tests into a class.
SyntaxColoring Jun 16, 2023
91b7b22
Move protocol_engine_in_thread() to opentrons.protocol_engine.
SyntaxColoring Jun 16, 2023
685b801
Consistently take HardwareControlAPI interface.
SyntaxColoring Jun 16, 2023
6b5abf3
Consistently take a full Config instead of trying to guess what the c…
SyntaxColoring Jun 16, 2023
d2745aa
Add a hacky copy_file_like() helper.
SyntaxColoring Jun 29, 2023
ef6fe3e
¯\_(ツ)_/¯
SyntaxColoring Jun 27, 2023
b8eca37
Fix JSON protocols not being able to use any labware.
SyntaxColoring Jul 7, 2023
2dc868c
Fix Python protocols not including Jupyter labware by default.
SyntaxColoring Jul 7, 2023
81a37f9
Minor style fixups and comment clarifications.
SyntaxColoring Jul 7, 2023
61b6763
Bundled data is supported, actually.
SyntaxColoring Jul 7, 2023
56f2c76
Fix confusing error message.
SyntaxColoring Jul 10, 2023
9693d85
Merge branch 'edge' into pe_based_execute
SyntaxColoring Jul 11, 2023
6e14e77
Don't attempt to pass bundled_labware in PAPIv2.14+.
SyntaxColoring Jul 11, 2023
fa47468
Fix: Don't forget to add custom labware to PAPIv2.14+ live contexts.
SyntaxColoring Jul 11, 2023
d6ee2fb
Fix ^D hanging in a REPL after get_protocol_api("2.15").
SyntaxColoring Jul 11, 2023
e9c59d8
Simplify the copy_file_like() hack by trying less.
SyntaxColoring Jul 11, 2023
8754ec7
Ugh.
SyntaxColoring Jul 11, 2023
fcc62c1
Test fixup: Write encoding should match read encoding.
SyntaxColoring Jul 12, 2023
6729d49
Test fixup: Use dummy binary data that's more obviously dummy & binary.
SyntaxColoring Jul 12, 2023
8d6139b
Test fixup: Rename test file to match source.
SyntaxColoring Jul 12, 2023
50a74b2
Test fixup: Remove unused param.
SyntaxColoring Jul 12, 2023
433e9d2
Merge branch 'edge' into pe_based_execute
SyntaxColoring Jul 14, 2023
d49dbe8
Merge branch 'edge' into pe_based_execute
SyntaxColoring Jul 18, 2023
8fe755c
Enable Protocol Engine in test parametrization.
SyntaxColoring Jul 14, 2023
1b35fbf
Undo ":raises Exception:" docs addition.
SyntaxColoring Jul 14, 2023
372c456
Fix errors not being reported.
SyntaxColoring Jul 14, 2023
913d4de
Give up even harder on encodings, I guess.
SyntaxColoring Jul 20, 2023
91e6bd3
Merge branch 'edge' into pe_based_execute
SyntaxColoring Jul 20, 2023
9b25d8e
Don't test with emoji that can't be encoded by Windows' default encod…
SyntaxColoring Jul 24, 2023
4369f9b
Perhaps.
SyntaxColoring Jul 25, 2023
6543c2c
Explain yoself.
SyntaxColoring Jul 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
457 changes: 388 additions & 69 deletions api/src/opentrons/execute.py

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
The main interface is the `ProtocolEngine` class.
"""

from .create_protocol_engine import create_protocol_engine
from .create_protocol_engine import (
create_protocol_engine,
create_protocol_engine_in_thread,
)
from .protocol_engine import ProtocolEngine
from .errors import ProtocolEngineError, ErrorOccurrence
from .commands import (
Expand Down Expand Up @@ -55,6 +58,7 @@
__all__ = [
# main factory and interface exports
"create_protocol_engine",
"create_protocol_engine_in_thread",
"ProtocolEngine",
"StateSummary",
"Config",
Expand Down
61 changes: 59 additions & 2 deletions api/src/opentrons/protocol_engine/create_protocol_engine.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
"""Main ProtocolEngine factory."""
import asyncio
import contextlib
import typing

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import DoorState
from opentrons.protocol_engine.resources.module_data_provider import ModuleDataProvider
from opentrons.util.async_helpers import async_context_manager_in_thread

from .protocol_engine import ProtocolEngine
from .resources import DeckDataProvider
from .resources import DeckDataProvider, ModuleDataProvider
from .state import Config, StateStore


# TODO(mm, 2023-06-16): Arguably, this not being a context manager makes us prone to forgetting to
# clean it up properly, especially in tests. See e.g. https://opentrons.atlassian.net/browse/RSS-222
async def create_protocol_engine(
hardware_api: HardwareControlAPI,
config: Config,
Expand All @@ -32,3 +38,54 @@ async def create_protocol_engine(
)

return ProtocolEngine(state_store=state_store, hardware_api=hardware_api)


@contextlib.contextmanager
def create_protocol_engine_in_thread(
hardware_api: HardwareControlAPI,
config: Config,
drop_tips_and_home_after: bool,
) -> typing.Generator[
typing.Tuple[ProtocolEngine, asyncio.AbstractEventLoop], None, None
]:
"""Run a `ProtocolEngine` in a worker thread.

When this context manager is entered, it:

1. Starts a worker thread.
2. Starts an asyncio event loop in that worker thread.
3. Creates and `.play()`s a `ProtocolEngine` in that event loop.
4. Returns the `ProtocolEngine` and the event loop.
Use functions like `asyncio.run_coroutine_threadsafe()` to safely interact with
the `ProtocolEngine` from your thread.

When this context manager is exited, it:

1. Cleans up the `ProtocolEngine`.
2. Stops and cleans up the event loop.
3. Joins the thread.
"""
with async_context_manager_in_thread(
_protocol_engine(hardware_api, config, drop_tips_and_home_after)
) as (
protocol_engine,
loop,
):
yield protocol_engine, loop


@contextlib.asynccontextmanager
async def _protocol_engine(
hardware_api: HardwareControlAPI,
config: Config,
drop_tips_and_home_after: bool,
) -> typing.AsyncGenerator[ProtocolEngine, None]:
protocol_engine = await create_protocol_engine(
hardware_api=hardware_api,
config=config,
)
try:
protocol_engine.play()
yield protocol_engine
finally:
await protocol_engine.finish(drop_tips_and_home=drop_tips_and_home_after)
114 changes: 110 additions & 4 deletions api/src/opentrons/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@
"""

from functools import wraps
from typing import TypeVar, Callable, Awaitable, cast, Any
from threading import Thread
from typing import (
Any,
AsyncContextManager,
Awaitable,
Callable,
Generator,
Tuple,
TypeVar,
cast,
)

import asyncio
import contextlib
import queue


async def asyncio_yield() -> None:
Expand Down Expand Up @@ -36,10 +48,10 @@ async def and await call() that still effectively "block" other concurrent tasks
await asyncio.sleep(0)


Wrapped = TypeVar("Wrapped", bound=Callable[..., Awaitable[Any]])
_Wrapped = TypeVar("_Wrapped", bound=Callable[..., Awaitable[Any]])


def ensure_yield(async_def_func: Wrapped) -> Wrapped:
def ensure_yield(async_def_func: _Wrapped) -> _Wrapped:
"""
A decorator that makes sure that asyncio_yield() is called after the decorated async
function finishes executing.
Expand All @@ -57,4 +69,98 @@ async def _wrapper(*args: Any, **kwargs: Any) -> Any:
await asyncio_yield()
return ret

return cast(Wrapped, _wrapper)
return cast(_Wrapped, _wrapper)


_ContextManagerResult = TypeVar("_ContextManagerResult")


@contextlib.contextmanager
def async_context_manager_in_thread(
async_context_manager: AsyncContextManager[_ContextManagerResult],
) -> Generator[Tuple[_ContextManagerResult, asyncio.AbstractEventLoop], None, None]:
"""Enter an async context manager in a worker thread.

When you enter this context manager, it:

1. Spawns a worker thread.
2. In that thread, starts an asyncio event loop.
3. In that event loop, enters the context manager that you passed in.
4. Returns: the result of entering that context manager, and the running event loop.
Use functions like `asyncio.run_coroutine_threadsafe()` to safely interact
with the returned object from your thread.

When you exit this context manager, it:

1. In the worker thread's event loop, exits the context manager that you passed in.
2. Stops and cleans up the worker thread's event loop.
3. Joins the worker thread.
"""
with _run_loop_in_thread() as loop_in_thread:
async_object = asyncio.run_coroutine_threadsafe(
async_context_manager.__aenter__(),
loop=loop_in_thread,
).result()

try:
yield async_object, loop_in_thread

finally:
exit = asyncio.run_coroutine_threadsafe(
async_context_manager.__aexit__(None, None, None),
loop=loop_in_thread,
)
exit.result()


@contextlib.contextmanager
def _run_loop_in_thread() -> Generator[asyncio.AbstractEventLoop, None, None]:
"""Run an event loop in a worker thread.

Entering this context manager spawns a thread, starts an asyncio event loop in it,
and returns that loop.

Exiting this context manager stops and cleans up the event loop, and then joins the thread.
"""
loop_mailbox: "queue.SimpleQueue[asyncio.AbstractEventLoop]" = queue.SimpleQueue()

def _in_thread() -> None:
loop = asyncio.new_event_loop()

# We assume that the lines above this will never fail,
# so we will always reach this point to unblock the parent thread.
loop_mailbox.put(loop)

loop.run_forever()

# If we've reached here, the loop has been stopped from outside this thread. Clean it up.
#
# This cleanup is naive because asyncio makes it difficult and confusing to get it right.
# Compare this with asyncio.run()'s cleanup, which:
#
# * Cancels and awaits any remaining tasks
# (according to the source code--this seems undocumented)
# * Shuts down asynchronous generators
# (see asyncio.shutdown_asyncgens())
# * Shuts down the default thread pool executor
# (see https://bugs.python.org/issue34037 and asyncio.shutdown_default_executor())
#
# In Python >=3.11, we should rewrite this to use asyncio.Runner,
# which can take care of these nuances for us.
loop.close()

thread = Thread(
target=_in_thread,
name=f"{__name__} event loop thread",
# This is a load-bearing daemon=True. It avoids @atexit-related deadlocks when this is used
# by opentrons.execute and cleaned up by opentrons.execute's @atexit handler.
# https://github.com/Opentrons/opentrons/pull/12970#issuecomment-1648243785
daemon=True,
)
thread.start()
loop_in_thread = loop_mailbox.get()
try:
yield loop_in_thread
finally:
loop_in_thread.call_soon_threadsafe(loop_in_thread.stop)
thread.join()
37 changes: 36 additions & 1 deletion api/src/opentrons/util/entrypoint_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import logging
from json import JSONDecodeError
import pathlib
from typing import Dict, Sequence, Union, TYPE_CHECKING
import shutil
from typing import BinaryIO, Dict, Sequence, TextIO, Union, TYPE_CHECKING

from jsonschema import ValidationError # type: ignore

Expand Down Expand Up @@ -83,3 +84,37 @@ def datafiles_from_paths(paths: Sequence[Union[str, pathlib.Path]]) -> Dict[str,
else:
log.info(f"ignoring {child} in data path")
return datafiles


# HACK(mm, 2023-06-29): This function is attempting to do something fundamentally wrong.
# Remove it when we fix https://opentrons.atlassian.net/browse/RSS-281.
def copy_file_like(source: Union[BinaryIO, TextIO], destination: pathlib.Path) -> None:
"""Copy a file-like object to a path.

Limitations:
If `source` is text, the new file's encoding may not correctly match its original encoding.
This can matter if it's a Python file and it has an encoding declaration
(https://docs.python.org/3.7/reference/lexical_analysis.html#encoding-declarations).
Also, its newlines may get translated.
"""
# When we read from the source stream, will it give us bytes, or text?
try:
# Experimentally, this is present (but possibly None) on text-mode streams,
# and not present on binary-mode streams.
getattr(source, "encoding")
except AttributeError:
source_is_text = False
else:
source_is_text = True

if source_is_text:
destination_mode = "wt"
else:
destination_mode = "wb"

with open(
destination,
mode=destination_mode,
) as destination_file:
# Use copyfileobj() to limit memory usage.
shutil.copyfileobj(fsrc=source, fdst=destination_file)
98 changes: 0 additions & 98 deletions api/tests/opentrons/async_context_manager_in_thread.py

This file was deleted.

Loading