From c3dc40ece62775065b1c8700084c37af7cf12f1d Mon Sep 17 00:00:00 2001 From: Jeremy Leon Date: Mon, 11 Jul 2022 14:03:21 -0400 Subject: [PATCH] feat(engine): pause protocol execution if door is opened (#11021) Adds pausing protocol execution if a door state hardware notification is detected to protocol engine --- .../protocol_engine/actions/__init__.py | 4 +- .../protocol_engine/actions/actions.py | 8 +-- .../protocol_engine/execution/__init__.py | 4 +- ...are_event_forwarder.py => door_watcher.py} | 58 ++++++++++++------- .../protocol_engine/protocol_engine.py | 18 +++--- .../protocol_engine/state/commands.py | 15 ++--- .../protocol_runner/legacy_context_plugin.py | 12 +--- ...vent_forwarder.py => test_door_watcher.py} | 55 +++++++++++++----- .../state/test_command_store.py | 18 ++---- .../protocol_engine/test_protocol_engine.py | 20 +++---- .../test_legacy_context_plugin.py | 24 -------- 11 files changed, 119 insertions(+), 117 deletions(-) rename api/src/opentrons/protocol_engine/execution/{hardware_event_forwarder.py => door_watcher.py} (62%) rename api/tests/opentrons/protocol_engine/execution/{test_hardware_event_forwarder.py => test_door_watcher.py} (59%) diff --git a/api/src/opentrons/protocol_engine/actions/__init__.py b/api/src/opentrons/protocol_engine/actions/__init__.py index 63ef37282e1..ee6eec7a529 100644 --- a/api/src/opentrons/protocol_engine/actions/__init__.py +++ b/api/src/opentrons/protocol_engine/actions/__init__.py @@ -20,7 +20,7 @@ AddLabwareDefinitionAction, AddModuleAction, FinishErrorDetails, - HardwareEventAction, + DoorChangeAction, ) __all__ = [ @@ -41,7 +41,7 @@ "AddLabwareOffsetAction", "AddLabwareDefinitionAction", "AddModuleAction", - "HardwareEventAction", + "DoorChangeAction", # action payload values "PauseSource", "FinishErrorDetails", diff --git a/api/src/opentrons/protocol_engine/actions/actions.py b/api/src/opentrons/protocol_engine/actions/actions.py index 71daa25d5bd..fe67309bb1b 100644 --- a/api/src/opentrons/protocol_engine/actions/actions.py +++ b/api/src/opentrons/protocol_engine/actions/actions.py @@ -9,7 +9,7 @@ from typing import Optional, Union from opentrons.protocols.models import LabwareDefinition -from opentrons.hardware_control.types import HardwareEvent +from opentrons.hardware_control.types import DoorState from opentrons.hardware_control.modules import LiveData from ..commands import Command, CommandCreate @@ -80,10 +80,10 @@ class HardwareStoppedAction: @dataclass(frozen=True) -class HardwareEventAction: +class DoorChangeAction: """Handle events coming in from hardware control.""" - event: HardwareEvent + door_state: DoorState @dataclass(frozen=True) @@ -150,7 +150,7 @@ class AddModuleAction: StopAction, FinishAction, HardwareStoppedAction, - HardwareEventAction, + DoorChangeAction, QueueCommandAction, UpdateCommandAction, FailCommandAction, diff --git a/api/src/opentrons/protocol_engine/execution/__init__.py b/api/src/opentrons/protocol_engine/execution/__init__.py index a42a50bfe9b..96f090308ae 100644 --- a/api/src/opentrons/protocol_engine/execution/__init__.py +++ b/api/src/opentrons/protocol_engine/execution/__init__.py @@ -14,7 +14,7 @@ from .rail_lights import RailLightsHandler from .run_control import RunControlHandler from .hardware_stopper import HardwareStopper -from .hardware_event_forwarder import HardwareEventForwarder +from .door_watcher import DoorWatcher # .thermocycler_movement_flagger omitted from package's public interface. @@ -33,6 +33,6 @@ "QueueWorker", "RunControlHandler", "HardwareStopper", - "HardwareEventForwarder", + "DoorWatcher", "RailLightsHandler", ] diff --git a/api/src/opentrons/protocol_engine/execution/hardware_event_forwarder.py b/api/src/opentrons/protocol_engine/execution/door_watcher.py similarity index 62% rename from api/src/opentrons/protocol_engine/execution/hardware_event_forwarder.py rename to api/src/opentrons/protocol_engine/execution/door_watcher.py index 94ea56eaaf0..cc2c9e2766f 100644 --- a/api/src/opentrons/protocol_engine/execution/hardware_event_forwarder.py +++ b/api/src/opentrons/protocol_engine/execution/door_watcher.py @@ -1,4 +1,4 @@ -"""Forward events from a `HardwareControlAPI` into a `ProtocolEngine`.""" +"""Forward door events from a `HardwareControlAPI` into a `ProtocolEngine`.""" from __future__ import annotations @@ -6,31 +6,41 @@ from typing import Callable, Optional from opentrons.hardware_control import HardwareControlAPI -from opentrons.hardware_control.types import HardwareEvent +from opentrons.hardware_control.types import ( + HardwareEvent, + DoorStateNotification, + DoorState, + PauseType, +) -from opentrons.protocol_engine.actions import ActionDispatcher, HardwareEventAction +from opentrons.protocol_engine.actions import ActionDispatcher, DoorChangeAction + +from ..state import StateStore _UnsubscribeCallback = Callable[[], None] -class HardwareEventForwarder: - """Forward events from a `HardwareControlAPI` into a `ProtocolEngine`.""" +class DoorWatcher: + """Forward door events from a `HardwareControlAPI` into a `ProtocolEngine`.""" def __init__( self, + state_store: StateStore, hardware_api: HardwareControlAPI, action_dispatcher: ActionDispatcher, ) -> None: - """Initialize the HardwareEventForwarder. + """Initialize the DoorWatcher. Args: + state_store: The StateStore to check if the protocol queue is running. hardware_api: The HardwareControlAPI whose events we will listen for. Assumed to be running in a separate thread from action_dispatcher. action_dispatcher: The ActionDispatcher to dispatch actions into. Assumed to be owned by the same event loop that this - HardwareEventForwarder was constructed in. + DoorWatcher was constructed in. """ + self._state_store = state_store self._hardware_api = hardware_api self._action_dispatcher = action_dispatcher self._loop = get_running_loop() @@ -40,7 +50,7 @@ def start(self) -> None: """Subscribe to hardware events and start forwarding them as PE actions.""" if self._unsubscribe_callback is None: self._unsubscribe_callback = self._hardware_api.register_callback( - self._handle_hardware_event + self._handle_hardware_door_event ) # todo(mm, 2022-02-01): @@ -60,8 +70,8 @@ def stop_soon(self) -> None: self._unsubscribe_callback() self._unsubscribe_callback = None - def _handle_hardware_event(self, event: HardwareEvent) -> None: - """Handle a hardware event, ensuring thread-safety. + def _handle_hardware_door_event(self, event: HardwareEvent) -> None: + """Handle a door state hardware event, ensuring thread-safety. This is used as a callback for HardwareControlAPI.register_callback(), and it's run inside the hardware thread. @@ -71,22 +81,30 @@ def _handle_hardware_event(self, event: HardwareEvent) -> None: is running in, that may take a moment, and the hardware thread may be blocked. This method will deadlock if it's ever run from the same thread that - owns the event loop that this HardwareEventForwarder was constructed in. + owns the event loop that this DoorWatcher was constructed in. """ - action = HardwareEventAction(event=event) - coroutine = self._dispatch_action(action) - future = run_coroutine_threadsafe(coroutine, self._loop) - # Wait for the dispatch to complete before returning, - # which is important for ordering guarantees. - future.result() - - async def _dispatch_action(self, action: HardwareEventAction) -> None: + if isinstance(event, DoorStateNotification): + action = DoorChangeAction(door_state=event.new_state) + coroutine = self._dispatch_action(action) + future = run_coroutine_threadsafe(coroutine, self._loop) + # Wait for the dispatch to complete before returning, + # which is important for ordering guarantees. + future.result() + + async def _dispatch_action(self, action: DoorChangeAction) -> None: """Dispatch an action into self._action_dispatcher. This must run in the event loop that owns self._action_dispatcher, for safety. - Defined as an async function so we can use this with + Defined as an async function, so we can use this with run_coroutine_threadsafe(), which lets us block until the dispatch completes. """ + if ( + self._state_store.commands.get_is_running() + and action.door_state == DoorState.OPEN + and self._state_store.config.block_on_door_open + ): + self._hardware_api.pause(PauseType.PAUSE) + self._action_dispatcher.dispatch(action) diff --git a/api/src/opentrons/protocol_engine/protocol_engine.py b/api/src/opentrons/protocol_engine/protocol_engine.py index b1c88b27101..107f10fdcd1 100644 --- a/api/src/opentrons/protocol_engine/protocol_engine.py +++ b/api/src/opentrons/protocol_engine/protocol_engine.py @@ -12,7 +12,7 @@ from .execution import ( QueueWorker, create_queue_worker, - HardwareEventForwarder, + DoorWatcher, HardwareStopper, ) from .state import StateStore, StateView @@ -50,7 +50,7 @@ def __init__( queue_worker: Optional[QueueWorker] = None, model_utils: Optional[ModelUtils] = None, hardware_stopper: Optional[HardwareStopper] = None, - hardware_event_forwarder: Optional[HardwareEventForwarder] = None, + door_watcher: Optional[DoorWatcher] = None, module_data_provider: Optional[ModuleDataProvider] = None, ) -> None: """Initialize a ProtocolEngine instance. @@ -79,17 +79,15 @@ def __init__( self._hardware_stopper = hardware_stopper or HardwareStopper( hardware_api=hardware_api, state_store=state_store ) - self._hardware_event_forwarder = ( - hardware_event_forwarder - or HardwareEventForwarder( - hardware_api=hardware_api, - action_dispatcher=self._action_dispatcher, - ) + self._door_watcher = door_watcher or DoorWatcher( + state_store=state_store, + hardware_api=hardware_api, + action_dispatcher=self._action_dispatcher, ) self._module_data_provider = module_data_provider or ModuleDataProvider() self._queue_worker.start() - self._hardware_event_forwarder.start() + self._door_watcher.start() @property def state_view(self) -> StateView: @@ -241,7 +239,7 @@ async def finish( # Note: After we stop listening, straggling events might be processed # concurrently to the below lines in this .finish() call, # or even after this .finish() call completes. - self._hardware_event_forwarder.stop_soon() + self._door_watcher.stop_soon() await self._hardware_stopper.do_stop_and_recover(drop_tips_and_home) diff --git a/api/src/opentrons/protocol_engine/state/commands.py b/api/src/opentrons/protocol_engine/state/commands.py index e3cab95ae15..72a8a7551d2 100644 --- a/api/src/opentrons/protocol_engine/state/commands.py +++ b/api/src/opentrons/protocol_engine/state/commands.py @@ -8,7 +8,7 @@ from opentrons.ordered_set import OrderedSet -from opentrons.hardware_control.types import DoorStateNotification, DoorState +from opentrons.hardware_control.types import DoorState from ..actions import ( Action, @@ -20,7 +20,7 @@ StopAction, FinishAction, HardwareStoppedAction, - HardwareEventAction, + DoorChangeAction, ) from ..commands import Command, CommandStatus, CommandIntent @@ -340,16 +340,13 @@ def handle_action(self, action: Action) -> None: # noqa: C901 self._state.run_result = self._state.run_result or RunResult.STOPPED self._state.run_completed_at = action.completed_at - elif isinstance(action, HardwareEventAction): - if ( - isinstance(action.event, DoorStateNotification) - and self._config.block_on_door_open - ): - if action.event.new_state == DoorState.OPEN: + elif isinstance(action, DoorChangeAction): + if self._config.block_on_door_open: + if action.door_state == DoorState.OPEN: self._state.is_door_blocking = True if self._state.queue_status != QueueStatus.SETUP: self._state.queue_status = QueueStatus.PAUSED - elif action.event.new_state == DoorState.CLOSED: + elif action.door_state == DoorState.CLOSED: self._state.is_door_blocking = False diff --git a/api/src/opentrons/protocol_runner/legacy_context_plugin.py b/api/src/opentrons/protocol_runner/legacy_context_plugin.py index 09e013d1ff1..49089060a81 100644 --- a/api/src/opentrons/protocol_runner/legacy_context_plugin.py +++ b/api/src/opentrons/protocol_runner/legacy_context_plugin.py @@ -7,10 +7,6 @@ from opentrons.commands.types import CommandMessage as LegacyCommand from opentrons.hardware_control import HardwareControlAPI -from opentrons.hardware_control.types import ( - DoorStateNotification, - PauseType as HardwarePauseType, -) from opentrons.protocol_engine import AbstractPlugin, actions as pe_actions @@ -115,12 +111,8 @@ async def teardown(self) -> None: def handle_action(self, action: pe_actions.Action) -> None: """React to a ProtocolEngine action.""" - if ( - isinstance(action, pe_actions.HardwareEventAction) - and not self.state.commands.get_is_implicitly_active() - and isinstance(action.event, DoorStateNotification) - ): - self._hardware_api.pause(HardwarePauseType.PAUSE) + # TODO(jbl 2022-07-06) handle_action stub should be completely removed + pass def _handle_legacy_command(self, command: LegacyCommand) -> None: """Handle a command reported by the APIv2 protocol. diff --git a/api/tests/opentrons/protocol_engine/execution/test_hardware_event_forwarder.py b/api/tests/opentrons/protocol_engine/execution/test_door_watcher.py similarity index 59% rename from api/tests/opentrons/protocol_engine/execution/test_hardware_event_forwarder.py rename to api/tests/opentrons/protocol_engine/execution/test_door_watcher.py index d3c28ed7895..e287f991ace 100644 --- a/api/tests/opentrons/protocol_engine/execution/test_hardware_event_forwarder.py +++ b/api/tests/opentrons/protocol_engine/execution/test_door_watcher.py @@ -1,4 +1,4 @@ -"""Tests for hardware_event_forwarder.""" +"""Tests for door_watcher.""" from typing import cast @@ -12,14 +12,22 @@ DoorStateNotification, DoorState, HardwareEventHandler, + PauseType, ) -from opentrons.protocol_engine.actions import ActionDispatcher, HardwareEventAction -from opentrons.protocol_engine.execution.hardware_event_forwarder import ( - HardwareEventForwarder, +from opentrons.protocol_engine.actions import ActionDispatcher, DoorChangeAction +from opentrons.protocol_engine.state import StateStore +from opentrons.protocol_engine.execution.door_watcher import ( + DoorWatcher, ) +@pytest.fixture +def state_store(decoy: Decoy) -> StateStore: + """Get a mock in the shape of a StateStore.""" + return decoy.mock(cls=StateStore) + + @pytest.fixture def hardware_control_api( decoy: Decoy, @@ -36,20 +44,25 @@ def action_dispatcher(decoy: Decoy) -> ActionDispatcher: @pytest.fixture async def subject( - hardware_control_api: HardwareControlAPI, action_dispatcher: ActionDispatcher -) -> HardwareEventForwarder: - """Return a HardwareEventForwarder with mocked dependencies. + state_store: StateStore, + hardware_control_api: HardwareControlAPI, + action_dispatcher: ActionDispatcher, +) -> DoorWatcher: + """Return a DoorWatcher with mocked dependencies. - Async because HardwareEventForwarder's initializer requires a running event loop. + Async because DoorWatcher's initializer requires a running event loop. """ - return HardwareEventForwarder( - hardware_api=hardware_control_api, action_dispatcher=action_dispatcher + return DoorWatcher( + state_store=state_store, + hardware_api=hardware_control_api, + action_dispatcher=action_dispatcher, ) async def test_event_forwarding( decoy: Decoy, - subject: HardwareEventForwarder, + subject: DoorWatcher, + state_store: StateStore, hardware_control_api: HardwareControlAPI, action_dispatcher: ActionDispatcher, ) -> None: @@ -60,21 +73,35 @@ async def test_event_forwarding( unsubscribe_callback ) + decoy.when(state_store.commands.get_is_running()).then_return(True) + subject.start() captured_handler = cast(HardwareEventHandler, handler_captor.value) input_event = DoorStateNotification(new_state=DoorState.OPEN) - expected_action_to_forward = HardwareEventAction(input_event) + expected_action_to_forward = DoorChangeAction(DoorState.OPEN) await to_thread.run_sync(captured_handler, input_event) - decoy.verify(action_dispatcher.dispatch(expected_action_to_forward)) + decoy.verify( + hardware_control_api.pause(PauseType.PAUSE), + action_dispatcher.dispatch(expected_action_to_forward), + times=1, + ) + + decoy.reset() + input_event = DoorStateNotification(new_state=DoorState.CLOSED) + await to_thread.run_sync(captured_handler, input_event) + decoy.verify( + hardware_control_api.pause(PauseType.PAUSE), + times=0, + ) async def test_one_subscribe_one_unsubscribe( decoy: Decoy, hardware_control_api: HardwareControlAPI, - subject: HardwareEventForwarder, + subject: DoorWatcher, ) -> None: """Multiple start()s and stop()s should be collapsed.""" unsubscribe = decoy.mock() diff --git a/api/tests/opentrons/protocol_engine/state/test_command_store.py b/api/tests/opentrons/protocol_engine/state/test_command_store.py index d1057d94315..296ccae37a6 100644 --- a/api/tests/opentrons/protocol_engine/state/test_command_store.py +++ b/api/tests/opentrons/protocol_engine/state/test_command_store.py @@ -6,7 +6,7 @@ from opentrons.ordered_set import OrderedSet from opentrons.types import MountType, DeckSlotName -from opentrons.hardware_control.types import DoorStateNotification, DoorState +from opentrons.hardware_control.types import DoorState from opentrons.protocol_engine import commands, errors from opentrons.protocol_engine.types import DeckSlotLocation, PipetteName, WellLocation @@ -30,7 +30,7 @@ FinishErrorDetails, StopAction, HardwareStoppedAction, - HardwareEventAction, + DoorChangeAction, ) from .command_fixtures import ( @@ -913,15 +913,12 @@ def test_handles_door_open_and_close_event_before_play( """It should update state but not pause on door open whenis setup.""" subject = CommandStore(is_door_open=False, config=config) - door_open_event = DoorStateNotification(new_state=DoorState.OPEN) - door_close_event = DoorStateNotification(new_state=DoorState.CLOSED) - - subject.handle_action(HardwareEventAction(event=door_open_event)) + subject.handle_action(DoorChangeAction(door_state=DoorState.OPEN)) assert subject.state.queue_status == QueueStatus.SETUP assert subject.state.is_door_blocking is expected_is_door_blocking - subject.handle_action(HardwareEventAction(event=door_close_event)) + subject.handle_action(DoorChangeAction(door_state=DoorState.CLOSED)) assert subject.state.queue_status == QueueStatus.SETUP assert subject.state.is_door_blocking is False @@ -940,16 +937,13 @@ def test_handles_door_open_and_close_event_after_play( """It should update state when door opened and closed after run is played.""" subject = CommandStore(is_door_open=False, config=config) - door_open_event = DoorStateNotification(new_state=DoorState.OPEN) - door_close_event = DoorStateNotification(new_state=DoorState.CLOSED) - subject.handle_action(PlayAction(requested_at=datetime(year=2021, month=1, day=1))) - subject.handle_action(HardwareEventAction(event=door_open_event)) + subject.handle_action(DoorChangeAction(door_state=DoorState.OPEN)) assert subject.state.queue_status == expected_queue_status assert subject.state.is_door_blocking is expected_is_door_blocking - subject.handle_action(HardwareEventAction(event=door_close_event)) + subject.handle_action(DoorChangeAction(door_state=DoorState.CLOSED)) assert subject.state.queue_status == expected_queue_status assert subject.state.is_door_blocking is False diff --git a/api/tests/opentrons/protocol_engine/test_protocol_engine.py b/api/tests/opentrons/protocol_engine/test_protocol_engine.py index 0183ed04d44..4db72a0f910 100644 --- a/api/tests/opentrons/protocol_engine/test_protocol_engine.py +++ b/api/tests/opentrons/protocol_engine/test_protocol_engine.py @@ -25,7 +25,7 @@ from opentrons.protocol_engine.execution import ( QueueWorker, HardwareStopper, - HardwareEventForwarder, + DoorWatcher, ) from opentrons.protocol_engine.resources import ModelUtils, ModuleDataProvider from opentrons.protocol_engine.state import StateStore @@ -90,9 +90,9 @@ def hardware_stopper(decoy: Decoy) -> HardwareStopper: @pytest.fixture -def hardware_event_forwarder(decoy: Decoy) -> HardwareEventForwarder: - """Get a mock HardwareListener.""" - return decoy.mock(cls=HardwareEventForwarder) +def door_watcher(decoy: Decoy) -> DoorWatcher: + """Get a mock DoorWatcher.""" + return decoy.mock(cls=DoorWatcher) @pytest.fixture @@ -110,7 +110,7 @@ def subject( queue_worker: QueueWorker, model_utils: ModelUtils, hardware_stopper: HardwareStopper, - hardware_event_forwarder: HardwareEventForwarder, + door_watcher: DoorWatcher, module_data_provider: ModuleDataProvider, ) -> ProtocolEngine: """Get a ProtocolEngine test subject with its dependencies stubbed out.""" @@ -122,7 +122,7 @@ def subject( queue_worker=queue_worker, model_utils=model_utils, hardware_stopper=hardware_stopper, - hardware_event_forwarder=hardware_event_forwarder, + door_watcher=door_watcher, module_data_provider=module_data_provider, ) @@ -130,11 +130,11 @@ def subject( def test_create_starts_background_tasks( decoy: Decoy, queue_worker: QueueWorker, - hardware_event_forwarder: HardwareEventForwarder, + door_watcher: DoorWatcher, subject: ProtocolEngine, ) -> None: """It should start the queue worker upon creation.""" - decoy.verify(queue_worker.start(), hardware_event_forwarder.start()) + decoy.verify(queue_worker.start(), door_watcher.start()) def test_add_command( @@ -445,7 +445,7 @@ async def test_finish_stops_hardware_if_queue_worker_join_fails( queue_worker: QueueWorker, hardware_api: HardwareControlAPI, hardware_stopper: HardwareStopper, - hardware_event_forwarder: HardwareEventForwarder, + door_watcher: DoorWatcher, action_dispatcher: ActionDispatcher, plugin_starter: PluginStarter, subject: ProtocolEngine, @@ -464,7 +464,7 @@ async def test_finish_stops_hardware_if_queue_worker_join_fails( await subject.finish() decoy.verify( - hardware_event_forwarder.stop_soon(), + door_watcher.stop_soon(), await hardware_stopper.do_stop_and_recover(drop_tips_and_home=True), action_dispatcher.dispatch(HardwareStoppedAction(completed_at=completed_at)), await plugin_starter.stop(), diff --git a/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py b/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py index 4e6e5f80462..beb4efc1830 100644 --- a/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py +++ b/api/tests/opentrons/protocol_runner/test_legacy_context_plugin.py @@ -7,11 +7,6 @@ from opentrons.commands.types import CommandMessage as LegacyCommand, PauseMessage from opentrons.hardware_control import API as HardwareAPI -from opentrons.hardware_control.types import ( - PauseType, - DoorStateNotification, - DoorState, -) from opentrons.protocol_engine import ( StateView, actions as pe_actions, @@ -80,25 +75,6 @@ def subject( return plugin -def test_hardware_event_action( - decoy: Decoy, - hardware_api: HardwareAPI, - state_view: StateView, - subject: LegacyContextPlugin, -) -> None: - """It should pause the hardware controller upon a blocking HardwareEventAction.""" - door_open_event = DoorStateNotification(new_state=DoorState.OPEN) - decoy.when(state_view.commands.get_is_implicitly_active()).then_return(True) - subject.handle_action(pe_actions.HardwareEventAction(event=door_open_event)) - # Should not pause when engine queue is implicitly active - decoy.verify(hardware_api.pause(PauseType.PAUSE), times=0) - - decoy.when(state_view.commands.get_is_implicitly_active()).then_return(False) - subject.handle_action(pe_actions.HardwareEventAction(event=door_open_event)) - # Should pause - decoy.verify(hardware_api.pause(PauseType.PAUSE), times=1) - - async def test_broker_subscribe_unsubscribe( decoy: Decoy, legacy_context: LegacyProtocolContext,