Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(api): Do not wait for the event loop to be free when reporting APIv2 commands #9238

Merged
merged 24 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bbd6bb2
Add `ThreadAsyncQueue`.
SyntaxColoring Jan 11, 2022
ba05ee0
Add hint to `ProtocolEngine.stop()` docstring.
SyntaxColoring Jan 14, 2022
8578f21
Delete obsolete `dispatch_threadsafe()`.
SyntaxColoring Jan 14, 2022
00a0df1
Make setup and teardown async in calling code.
SyntaxColoring Jan 14, 2022
97bb7aa
Use a queue to pass actions to the ProtocolEngine.
SyntaxColoring Jan 14, 2022
5cbcae1
Clarify comment in `legacy_wrappers`.
SyntaxColoring Jan 14, 2022
6f6a68a
WIP: Update tests and convert more stuff to async?
SyntaxColoring Jan 14, 2022
cc10f9c
More async fixups.
SyntaxColoring Jan 18, 2022
bca11bd
Add todo for making `add_plugin` synchronous.
SyntaxColoring Jan 18, 2022
2f3fb6c
Lint.
SyntaxColoring Jan 18, 2022
5c4566e
Add missing await.
SyntaxColoring Jan 18, 2022
7044dea
More test fixups.
SyntaxColoring Jan 18, 2022
9e1f889
Add perf debug logs.
SyntaxColoring Jan 18, 2022
731befe
Add more debug logs.
SyntaxColoring Jan 19, 2022
de14507
Revert debug logs.
SyntaxColoring Jan 20, 2022
2a836b8
Use async iteration to simplify consuming from the queue.
SyntaxColoring Jan 20, 2022
ebe737c
Minor comment, docstring, and naming revisions.
SyntaxColoring Jan 20, 2022
9c73d5e
Delete `.putting()` in favor of making the object itself a context ma…
SyntaxColoring Jan 20, 2022
5fb3c06
Simplify `get_async_until_closed()`.
SyntaxColoring Jan 21, 2022
7f94b1b
Add missing docstrings.
SyntaxColoring Jan 21, 2022
ac85811
Simplify docstrings.
SyntaxColoring Jan 21, 2022
bb61b51
Make setup, loading, etc. non-async again.
SyntaxColoring Jan 21, 2022
ca8cf1e
Siimplify `LegacyContextPlugin` tests at the cost of duplication.
SyntaxColoring Jan 21, 2022
873eeba
Delete spurious `# todo`.
SyntaxColoring Jan 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions api/src/opentrons/protocol_engine/plugins.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Protocol engine plugin interface."""
from __future__ import annotations
from abc import ABC, abstractmethod
from anyio import from_thread
from typing import List
from typing_extensions import final

Expand Down Expand Up @@ -37,25 +36,11 @@ def dispatch(self, action: Action) -> None:
"""
return self._action_dispatcher.dispatch(action)

@final
def dispatch_threadsafe(self, action: Action) -> None:
"""Dispatch an action into the action pipeline from a child thread.

Child thread must be created with `anyio.to_thread`.

Arguments:
action: A new ProtocolEngine action to send into the pipeline.
This action will flow through all plugins, including
this one, so be careful to avoid infinite loops. In general,
do not dispatch an action your plugin will react to.
"""
return from_thread.run_sync(self._action_dispatcher.dispatch, action)

def setup(self) -> None:
"""Run any necessary setup steps prior to plugin usage."""
...

def teardown(self) -> None:
async def teardown(self) -> None:
"""Run any necessary teardown steps once the engine is stopped."""
...

Expand Down Expand Up @@ -111,9 +96,9 @@ def start(self, plugin: AbstractPlugin) -> None:
self._action_dispatcher.add_handler(plugin)
plugin.setup()

def stop(self) -> None:
async def stop(self) -> None:
"""Stop any configured plugins."""
for p in self._plugins:
p.teardown()
await p.teardown()
Comment on lines -114 to +102
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the easy way out by changing plugin.teardown() and plugin_starter.stop() to be async. I haven't totally thought this through yet, but in another PR, I'd like to refactor this to make it easier to follow the lifetime of the LegacyContextPlugin.

Currently, cleaning up the LegacyContextPlugin works like this:

  1. The protocol thread exits, either gracefully or by exception.
  2. ProtocolRunner calls ProtocolEngine.finish().
  3. ProtocolEngine.finish() calls PluginStarter.stop().
  4. PluginStarter.stop() calls .teardown() on all plugins, which is currently just the LegacyContextPlugin.

But all LegacyContextPlugin really cares about is:

  • That the protocol thread has exited, so it knows there are no more events coming in.
  • That the ProtocolEngine is still open, so it knows it can still dispatch actions into the ProtocolEngine as it drains its ThreadAsyncQueue.

So in principle, we shouldn't need to involve ProtocolEngine.finish(). And I think involving ProtocolEngine.finish() makes things a little more fragile.


self._plugins = []
5 changes: 4 additions & 1 deletion api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ async def stop(self) -> None:
"""Stop execution immediately, halting all motion and cancelling future commands.

After an engine has been `stop`'ed, it cannot be restarted.

After a `stop`, you must still call `finish` to give the engine a chance
to clean up resources and propagate errors.
"""
self._state_store.commands.raise_if_stop_requested()
self._action_dispatcher.dispatch(StopAction())
Expand Down Expand Up @@ -193,7 +196,7 @@ async def finish(
finally:
await self._hardware_stopper.do_stop_and_recover(drop_tips_and_home)
self._action_dispatcher.dispatch(HardwareStoppedAction())
self._plugin_starter.stop()
await self._plugin_starter.stop()

def add_labware_offset(self, request: LabwareOffsetCreate) -> LabwareOffset:
"""Add a new labware offset and return it.
Expand Down
166 changes: 114 additions & 52 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Customize the ProtocolEngine to monitor and control legacy (APIv2) protocols."""
from __future__ import annotations
from typing import Callable, Optional, NamedTuple

from asyncio import create_task, Task
from contextlib import ExitStack
from typing import Optional

from opentrons.commands.types import CommandMessage as LegacyCommand
from opentrons.hardware_control import HardwareControlAPI
Expand All @@ -14,19 +17,11 @@
LegacyModuleLoadInfo,
)
from .legacy_command_mapper import LegacyCommandMapper


class ContextUnsubscribe(NamedTuple):
"""Unsubscribe functions for broker messages."""

command_broker: Callable[[], None]
labware_broker: Callable[[], None]
pipette_broker: Callable[[], None]
module_broker: Callable[[], None]
from .thread_async_queue import ThreadAsyncQueue


class LegacyContextPlugin(AbstractPlugin):
"""A ProtocolEngine plugin wrapping a legacy ProtocolContext.
"""A ProtocolEngine plugin to monitor and control an APIv2 protocol.

In the legacy ProtocolContext, protocol execution is accomplished
by direct communication with the HardwareControlAPI, as opposed to an
Expand Down Expand Up @@ -54,40 +49,81 @@ def __init__(
self._hardware_api = hardware_api
self._protocol_context = protocol_context
self._legacy_command_mapper = legacy_command_mapper or LegacyCommandMapper()
self._unsubscribe: Optional[ContextUnsubscribe] = None

def setup(self) -> None:
"""Set up subscriptions to the context's message brokers."""
context = self._protocol_context

command_unsubscribe = context.broker.subscribe(
topic="command",
handler=self._dispatch_legacy_command,
)
labware_unsubscribe = context.labware_load_broker.subscribe(
callback=self._dispatch_labware_loaded
)
pipette_unsubscribe = context.instrument_load_broker.subscribe(
callback=self._dispatch_instrument_loaded
)
module_unsubscribe = context.module_load_broker.subscribe(
callback=self._dispatch_module_loaded
)
# We use a non-blocking queue to communicate activity
# from the APIv2 protocol, which is running in its own thread,
# to the ProtocolEngine, which is running in the main thread's async event loop.
#
# The queue being non-blocking lets the protocol communicate its activity
# instantly *even if the event loop is currently occupied by something else.*
# Various things can accidentally occupy the event loop for too long.
# So if the protocol had to wait for the event loop to be free
# every time it reported some activity,
# it could visibly stall for a moment, making its motion jittery.
self._actions_to_dispatch = ThreadAsyncQueue[pe_actions.Action]()
self._action_dispatching_task: Optional[Task[None]] = None

self._subscription_exit_stack: Optional[ExitStack] = None

self._unsubscribe = ContextUnsubscribe(
command_broker=command_unsubscribe,
labware_broker=labware_unsubscribe,
pipette_broker=pipette_unsubscribe,
module_broker=module_unsubscribe,
)
def setup(self) -> None:
"""Set up the plugin.

def teardown(self) -> None:
"""Unsubscribe from the context's message brokers."""
if self._unsubscribe:
for unsubscribe in self._unsubscribe:
unsubscribe()
* Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
* Kick off a background task to inform Protocol Engine of that activity.
"""
context = self._protocol_context

self._unsubcribe = None
# Subscribe to activity on the APIv2 context,
# and arrange to unsubscribe when this plugin is torn down.
# Use an exit stack so if any part of this setup fails,
# we clean up the parts that succeeded in reverse order.
with ExitStack() as exit_stack:
command_unsubscribe = context.broker.subscribe(
topic="command",
handler=self._handle_legacy_command,
)
exit_stack.callback(command_unsubscribe)

labware_unsubscribe = context.labware_load_broker.subscribe(
callback=self._handle_labware_loaded
)
exit_stack.callback(labware_unsubscribe)

pipette_unsubscribe = context.instrument_load_broker.subscribe(
callback=self._handle_instrument_loaded
)
exit_stack.callback(pipette_unsubscribe)

module_unsubscribe = context.module_load_broker.subscribe(
callback=self._handle_module_loaded
)
exit_stack.callback(module_unsubscribe)

# All subscriptions succeeded.
# Save the exit stack so our teardown method can use it later
# to clean up these subscriptions.
self._subscription_exit_stack = exit_stack.pop_all()

# Kick off a background task to report activity to the ProtocolEngine.
self._action_dispatching_task = create_task(self._dispatch_all_actions())

async def teardown(self) -> None:
"""Tear down the plugin, undoing the work done in `setup()`.

Called by Protocol Engine.
At this point, the APIv2 protocol script must have exited.
"""
# todo:
self._actions_to_dispatch.done_putting()
try:
if self._action_dispatching_task is not None:
await self._action_dispatching_task
self._action_dispatching_task = None
finally:
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None

def handle_action(self, action: pe_actions.Action) -> None:
"""React to a ProtocolEngine action."""
Expand All @@ -100,29 +136,55 @@ def handle_action(self, action: pe_actions.Action) -> None:
):
self._hardware_api.pause(HardwarePauseType.PAUSE)

def _dispatch_legacy_command(self, command: LegacyCommand) -> None:
def _handle_legacy_command(self, command: LegacyCommand) -> None:
"""Handle a command reported by the APIv2 protocol.

Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_command(command=command)
for action in pe_actions:
self.dispatch_threadsafe(action)
for pe_action in pe_actions:
self._actions_to_dispatch.put(pe_action)

def _dispatch_labware_loaded(
self, labware_load_info: LegacyLabwareLoadInfo
) -> None:
def _handle_labware_loaded(self, labware_load_info: LegacyLabwareLoadInfo) -> None:
"""Handle a labware load reported by the APIv2 protocol.

Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_command = self._legacy_command_mapper.map_labware_load(
labware_load_info=labware_load_info
)
self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command))
pe_action = pe_actions.UpdateCommandAction(command=pe_command)
self._actions_to_dispatch.put(pe_action)

def _dispatch_instrument_loaded(
def _handle_instrument_loaded(
self, instrument_load_info: LegacyInstrumentLoadInfo
) -> None:
"""Handle an instrument (pipette) load reported by the APIv2 protocol.

Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_command = self._legacy_command_mapper.map_instrument_load(
instrument_load_info=instrument_load_info
)
self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command))
pe_action = pe_actions.UpdateCommandAction(command=pe_command)
self._actions_to_dispatch.put(pe_action)

def _handle_module_loaded(self, module_load_info: LegacyModuleLoadInfo) -> None:
"""Handle a module load reported by the APIv2 protocol.

def _dispatch_module_loaded(self, module_load_info: LegacyModuleLoadInfo) -> None:
Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_command = self._legacy_command_mapper.map_module_load(
module_load_info=module_load_info
)
self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command))
pe_action = pe_actions.UpdateCommandAction(command=pe_command)
self._actions_to_dispatch.put(pe_action)

async def _dispatch_all_actions(self) -> None:
"""Dispatch all actions to the `ProtocolEngine`.

Exits only when `self._actions_to_dispatch` is closed
(or an unexpected exception is raised).
"""
async for action in self._actions_to_dispatch.get_async_until_closed():
self.dispatch(action)
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_runner/legacy_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def execute(


__all__ = [
# Re-exports of main public API stuff:
# Re-exports of user-facing Python Protocol APIv2 stuff:
"LegacyProtocolContext",
"LegacyLabware",
"LegacyWell",
Expand Down
Loading