From 83d1cb82c76de672916b3e0e649abc3dd4194a75 Mon Sep 17 00:00:00 2001 From: Max Marrone Date: Wed, 5 Jan 2022 16:32:05 -0500 Subject: [PATCH] WIP --- api/src/opentrons/protocol_engine/plugins.py | 5 +- .../protocol_runner/legacy_context_plugin.py | 107 +++++++++++++++--- .../protocol_runner/protocol_runner.py | 24 +++- .../test_legacy_context_plugin.py | 10 +- 4 files changed, 122 insertions(+), 24 deletions(-) diff --git a/api/src/opentrons/protocol_engine/plugins.py b/api/src/opentrons/protocol_engine/plugins.py index cff921f5146..f2cf410d11e 100644 --- a/api/src/opentrons/protocol_engine/plugins.py +++ b/api/src/opentrons/protocol_engine/plugins.py @@ -52,7 +52,10 @@ def dispatch_threadsafe(self, action: Action) -> None: return from_thread.run_sync(self._action_dispatcher.dispatch, action) def setup(self) -> None: - """Run any necessary setup steps prior to plugin usage.""" + """Run any necessary setup steps prior to plugin usage. + + Called when the plugin is added to a `ProtocolEngine`. + """ ... def teardown(self) -> None: diff --git a/api/src/opentrons/protocol_runner/legacy_context_plugin.py b/api/src/opentrons/protocol_runner/legacy_context_plugin.py index c77f6559292..b499b6531ef 100644 --- a/api/src/opentrons/protocol_runner/legacy_context_plugin.py +++ b/api/src/opentrons/protocol_runner/legacy_context_plugin.py @@ -1,6 +1,12 @@ """Customize the ProtocolEngine to monitor and control legacy (APIv2) protocols.""" from __future__ import annotations -from typing import Callable, Optional, NamedTuple + +import asyncio +from typing import ( + Callable, + Optional, + NamedTuple, +) from opentrons.commands.types import CommandMessage as LegacyCommand from opentrons.hardware_control import HardwareControlAPI @@ -14,6 +20,7 @@ LegacyModuleLoadInfo, ) from .legacy_command_mapper import LegacyCommandMapper +from .thread_async_queue import ThreadAsyncQueue, QueueClosed class ContextUnsubscribe(NamedTuple): @@ -25,6 +32,34 @@ class ContextUnsubscribe(NamedTuple): module_broker: Callable[[], None] +# NOTES: +# +# Worker thread queues up actions at its leisure +# +# Shoveler task doesn't need to support cancellation +# because the queue of actions should be bounded and small at that point. +# +# +# legacy.plugin.teardown() called automatically at some point +# +# Is it okay for tearing down the plugin to be blocking? +# +# How do I get an async event loop task to block until either: +# * a single item can be retrieved from a queue.Queue (not an asyncio.Queue) +# * the queue is closed +# And sleep (yield) while blocking + + +# Before we close the ProtocolEngine, any scheduled actions from the +# legacy mapper must have made it in to that ProtocolEngine + +# .finalize method + +# TODO: Confirm that things get cleaned up when you cancel a protocol. + +# TODO: Run to ground why this is hanging + + class LegacyContextPlugin(AbstractPlugin): """A ProtocolEngine plugin wrapping a legacy ProtocolContext. @@ -56,22 +91,31 @@ def __init__( self._legacy_command_mapper = legacy_command_mapper or LegacyCommandMapper() self._unsubscribe: Optional[ContextUnsubscribe] = None + self._actions_to_dispatch = ThreadAsyncQueue[pe_actions.Action]() + self._action_shoveling_task: Optional[asyncio.Task[None]] = None + def setup(self) -> None: - """Set up subscriptions to the context's message brokers.""" + """Set up the plugin. + + Called by Protocol Engine when the plugin is added to it. + + * Subscribe to the legacy context's message brokers. + * Prepare a background task to pass legacy commands to Protocol Engine. + """ context = self._protocol_context command_unsubscribe = context.broker.subscribe( topic="command", - handler=self._dispatch_legacy_command, + handler=self._handle_legacy_command, ) labware_unsubscribe = context.labware_load_broker.subscribe( - callback=self._dispatch_labware_loaded + callback=self._handle_labware_loaded ) pipette_unsubscribe = context.instrument_load_broker.subscribe( - callback=self._dispatch_instrument_loaded + callback=self._handle_instrument_loaded ) module_unsubscribe = context.module_load_broker.subscribe( - callback=self._dispatch_module_loaded + callback=self._handle_module_loaded ) self._unsubscribe = ContextUnsubscribe( @@ -81,14 +125,33 @@ def setup(self) -> None: module_broker=module_unsubscribe, ) + self._action_shoveling_task = asyncio.create_task(self._shovel_all_actions()) + def teardown(self) -> None: - """Unsubscribe from the context's message brokers.""" + """Unsubscribe from the context's message brokers. + + Called by Protocol Engine when the engine stops. + """ if self._unsubscribe: for unsubscribe in self._unsubscribe: unsubscribe() self._unsubcribe = None + async def finalize(self) -> None: + """Inform the plugin that the APIv2 script has exited. + + This method will wait until the plugin is finished sending commands + into the Protocol Engine, and then return. + You must call this method *before* stopping the Protocol Engine, + so those commands make it in first. + """ + # TODO: Think through whether this can be merged into teardown(). + # TODO: What if the engine is never started? + if self._action_shoveling_task is not None: + self._actions_to_dispatch.done_putting() + await self._action_shoveling_task + def handle_action(self, action: pe_actions.Action) -> None: """React to a ProtocolEngine action.""" if isinstance(action, pe_actions.PlayAction): @@ -100,29 +163,39 @@ def handle_action(self, action: pe_actions.Action) -> None: ): self._hardware_api.pause(HardwarePauseType.PAUSE) - def _dispatch_legacy_command(self, command: LegacyCommand) -> None: + def _handle_legacy_command(self, command: LegacyCommand) -> None: pe_actions = self._legacy_command_mapper.map_command(command=command) for action in pe_actions: - self.dispatch_threadsafe(action) + self._actions_to_dispatch.put(action) - def _dispatch_labware_loaded( - self, labware_load_info: LegacyLabwareLoadInfo - ) -> None: + def _handle_labware_loaded(self, labware_load_info: LegacyLabwareLoadInfo) -> None: pe_command = self._legacy_command_mapper.map_labware_load( labware_load_info=labware_load_info ) - self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command)) + action = pe_actions.UpdateCommandAction(command=pe_command) + self._actions_to_dispatch.put(action) - def _dispatch_instrument_loaded( + def _handle_instrument_loaded( self, instrument_load_info: LegacyInstrumentLoadInfo ) -> None: pe_command = self._legacy_command_mapper.map_instrument_load( instrument_load_info=instrument_load_info ) - self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command)) + action = pe_actions.UpdateCommandAction(command=pe_command) + self._actions_to_dispatch.put(action) - def _dispatch_module_loaded(self, module_load_info: LegacyModuleLoadInfo) -> None: + def _handle_module_loaded(self, module_load_info: LegacyModuleLoadInfo) -> None: pe_command = self._legacy_command_mapper.map_module_load( module_load_info=module_load_info ) - self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command)) + action = pe_actions.UpdateCommandAction(command=pe_command) + self._actions_to_dispatch.put(action) + + async def _shovel_all_actions(self) -> None: + while True: + try: + action = await self._actions_to_dispatch.get_async() + except QueueClosed: + break + else: + self.dispatch(action) diff --git a/api/src/opentrons/protocol_runner/protocol_runner.py b/api/src/opentrons/protocol_runner/protocol_runner.py index d603071bde3..5d85a8acddd 100644 --- a/api/src/opentrons/protocol_runner/protocol_runner.py +++ b/api/src/opentrons/protocol_runner/protocol_runner.py @@ -1,4 +1,5 @@ """Protocol run control and management.""" +import asyncio from dataclasses import dataclass from typing import List, Optional @@ -179,6 +180,9 @@ def _load_python(self, protocol_source: ProtocolSource) -> None: protocol=protocol, context=context, ) + # ensure the engine is stopped gracefully once the + # protocol file stops issuing commands + self._task_queue.set_cleanup_func(self._protocol_engine.finish) def _load_legacy( self, @@ -187,15 +191,25 @@ def _load_legacy( protocol = self._legacy_file_reader.read(protocol_source) context = self._legacy_context_creator.create(protocol) - self._protocol_engine.add_plugin( - LegacyContextPlugin( - hardware_api=self._hardware_api, - protocol_context=context, - ) + legacy_plugin = LegacyContextPlugin( + hardware_api=self._hardware_api, + protocol_context=context, ) + self._protocol_engine.add_plugin(legacy_plugin) + self._task_queue.set_run_func( func=self._legacy_executor.execute, protocol=protocol, context=context, ) + + async def clean_up(error: Optional[Exception]) -> None: + try: + await legacy_plugin.finalize() + finally: + # ensure the engine is stopped gracefully once the + # protocol file stops issuing commands + await self._protocol_engine.finish(error) + + self._task_queue.set_cleanup_func(clean_up) diff --git a/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py b/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py index 053bc1fb168..68deacdcbf3 100644 --- a/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py +++ b/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py @@ -106,13 +106,14 @@ def test_pause_action( decoy.verify(hardware_api.pause(PauseType.PAUSE), times=1) -def test_broker_subscribe_unsubscribe( +async def test_broker_subscribe_unsubscribe( decoy: Decoy, legacy_context: LegacyProtocolContext, legacy_command_mapper: LegacyCommandMapper, subject: LegacyContextPlugin, ) -> None: """It should subscribe to the brokers on setup and unsubscribe on teardown.""" + print("Yes, the test is running") main_unsubscribe: Callable[[], None] = decoy.mock() labware_unsubscribe: Callable[[], None] = decoy.mock() instrument_unsubscribe: Callable[[], None] = decoy.mock() @@ -134,8 +135,12 @@ def test_broker_subscribe_unsubscribe( legacy_context.module_load_broker.subscribe(callback=matchers.Anything()) ).then_return(module_unsubscribe) + print("About to setup") subject.setup() + print("Done setup, about to teardown") + await subject.finalize() subject.teardown() + print("Done teardown") decoy.verify( main_unsubscribe(), @@ -143,6 +148,7 @@ def test_broker_subscribe_unsubscribe( instrument_unsubscribe(), module_unsubscribe(), ) + print("Done verify") async def test_main_broker_messages( @@ -184,6 +190,8 @@ async def test_main_broker_messages( await to_thread.run_sync(handler, legacy_command) + await subject.finalize() + decoy.verify( action_dispatcher.dispatch(pe_actions.UpdateCommandAction(engine_command)) )