Skip to content

Commit

Permalink
feat(engine): pause protocol execution if door is opened (#11021)
Browse files Browse the repository at this point in the history
Adds pausing protocol execution if a door state hardware notification is detected to protocol engine
  • Loading branch information
jbleon95 authored Jul 11, 2022
1 parent 4b958e7 commit c3dc40e
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 117 deletions.
4 changes: 2 additions & 2 deletions api/src/opentrons/protocol_engine/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
AddLabwareDefinitionAction,
AddModuleAction,
FinishErrorDetails,
HardwareEventAction,
DoorChangeAction,
)

__all__ = [
Expand All @@ -41,7 +41,7 @@
"AddLabwareOffsetAction",
"AddLabwareDefinitionAction",
"AddModuleAction",
"HardwareEventAction",
"DoorChangeAction",
# action payload values
"PauseSource",
"FinishErrorDetails",
Expand Down
8 changes: 4 additions & 4 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Optional, Union

from opentrons.protocols.models import LabwareDefinition
from opentrons.hardware_control.types import HardwareEvent
from opentrons.hardware_control.types import DoorState
from opentrons.hardware_control.modules import LiveData

from ..commands import Command, CommandCreate
Expand Down Expand Up @@ -80,10 +80,10 @@ class HardwareStoppedAction:


@dataclass(frozen=True)
class HardwareEventAction:
class DoorChangeAction:
"""Handle events coming in from hardware control."""

event: HardwareEvent
door_state: DoorState


@dataclass(frozen=True)
Expand Down Expand Up @@ -150,7 +150,7 @@ class AddModuleAction:
StopAction,
FinishAction,
HardwareStoppedAction,
HardwareEventAction,
DoorChangeAction,
QueueCommandAction,
UpdateCommandAction,
FailCommandAction,
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/protocol_engine/execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .rail_lights import RailLightsHandler
from .run_control import RunControlHandler
from .hardware_stopper import HardwareStopper
from .hardware_event_forwarder import HardwareEventForwarder
from .door_watcher import DoorWatcher

# .thermocycler_movement_flagger omitted from package's public interface.

Expand All @@ -33,6 +33,6 @@
"QueueWorker",
"RunControlHandler",
"HardwareStopper",
"HardwareEventForwarder",
"DoorWatcher",
"RailLightsHandler",
]
Original file line number Diff line number Diff line change
@@ -1,36 +1,46 @@
"""Forward events from a `HardwareControlAPI` into a `ProtocolEngine`."""
"""Forward door events from a `HardwareControlAPI` into a `ProtocolEngine`."""

from __future__ import annotations

from asyncio import get_running_loop, run_coroutine_threadsafe
from typing import Callable, Optional

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import HardwareEvent
from opentrons.hardware_control.types import (
HardwareEvent,
DoorStateNotification,
DoorState,
PauseType,
)

from opentrons.protocol_engine.actions import ActionDispatcher, HardwareEventAction
from opentrons.protocol_engine.actions import ActionDispatcher, DoorChangeAction

from ..state import StateStore


_UnsubscribeCallback = Callable[[], None]


class HardwareEventForwarder:
"""Forward events from a `HardwareControlAPI` into a `ProtocolEngine`."""
class DoorWatcher:
"""Forward door events from a `HardwareControlAPI` into a `ProtocolEngine`."""

def __init__(
self,
state_store: StateStore,
hardware_api: HardwareControlAPI,
action_dispatcher: ActionDispatcher,
) -> None:
"""Initialize the HardwareEventForwarder.
"""Initialize the DoorWatcher.
Args:
state_store: The StateStore to check if the protocol queue is running.
hardware_api: The HardwareControlAPI whose events we will listen for.
Assumed to be running in a separate thread from action_dispatcher.
action_dispatcher: The ActionDispatcher to dispatch actions into.
Assumed to be owned by the same event loop that this
HardwareEventForwarder was constructed in.
DoorWatcher was constructed in.
"""
self._state_store = state_store
self._hardware_api = hardware_api
self._action_dispatcher = action_dispatcher
self._loop = get_running_loop()
Expand All @@ -40,7 +50,7 @@ def start(self) -> None:
"""Subscribe to hardware events and start forwarding them as PE actions."""
if self._unsubscribe_callback is None:
self._unsubscribe_callback = self._hardware_api.register_callback(
self._handle_hardware_event
self._handle_hardware_door_event
)

# todo(mm, 2022-02-01):
Expand All @@ -60,8 +70,8 @@ def stop_soon(self) -> None:
self._unsubscribe_callback()
self._unsubscribe_callback = None

def _handle_hardware_event(self, event: HardwareEvent) -> None:
"""Handle a hardware event, ensuring thread-safety.
def _handle_hardware_door_event(self, event: HardwareEvent) -> None:
"""Handle a door state hardware event, ensuring thread-safety.
This is used as a callback for HardwareControlAPI.register_callback(),
and it's run inside the hardware thread.
Expand All @@ -71,22 +81,30 @@ def _handle_hardware_event(self, event: HardwareEvent) -> None:
is running in, that may take a moment, and the hardware thread may be blocked.
This method will deadlock if it's ever run from the same thread that
owns the event loop that this HardwareEventForwarder was constructed in.
owns the event loop that this DoorWatcher was constructed in.
"""
action = HardwareEventAction(event=event)
coroutine = self._dispatch_action(action)
future = run_coroutine_threadsafe(coroutine, self._loop)
# Wait for the dispatch to complete before returning,
# which is important for ordering guarantees.
future.result()

async def _dispatch_action(self, action: HardwareEventAction) -> None:
if isinstance(event, DoorStateNotification):
action = DoorChangeAction(door_state=event.new_state)
coroutine = self._dispatch_action(action)
future = run_coroutine_threadsafe(coroutine, self._loop)
# Wait for the dispatch to complete before returning,
# which is important for ordering guarantees.
future.result()

async def _dispatch_action(self, action: DoorChangeAction) -> None:
"""Dispatch an action into self._action_dispatcher.
This must run in the event loop that owns self._action_dispatcher, for safety.
Defined as an async function so we can use this with
Defined as an async function, so we can use this with
run_coroutine_threadsafe(), which lets us block until
the dispatch completes.
"""
if (
self._state_store.commands.get_is_running()
and action.door_state == DoorState.OPEN
and self._state_store.config.block_on_door_open
):
self._hardware_api.pause(PauseType.PAUSE)

self._action_dispatcher.dispatch(action)
18 changes: 8 additions & 10 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .execution import (
QueueWorker,
create_queue_worker,
HardwareEventForwarder,
DoorWatcher,
HardwareStopper,
)
from .state import StateStore, StateView
Expand Down Expand Up @@ -50,7 +50,7 @@ def __init__(
queue_worker: Optional[QueueWorker] = None,
model_utils: Optional[ModelUtils] = None,
hardware_stopper: Optional[HardwareStopper] = None,
hardware_event_forwarder: Optional[HardwareEventForwarder] = None,
door_watcher: Optional[DoorWatcher] = None,
module_data_provider: Optional[ModuleDataProvider] = None,
) -> None:
"""Initialize a ProtocolEngine instance.
Expand Down Expand Up @@ -79,17 +79,15 @@ def __init__(
self._hardware_stopper = hardware_stopper or HardwareStopper(
hardware_api=hardware_api, state_store=state_store
)
self._hardware_event_forwarder = (
hardware_event_forwarder
or HardwareEventForwarder(
hardware_api=hardware_api,
action_dispatcher=self._action_dispatcher,
)
self._door_watcher = door_watcher or DoorWatcher(
state_store=state_store,
hardware_api=hardware_api,
action_dispatcher=self._action_dispatcher,
)
self._module_data_provider = module_data_provider or ModuleDataProvider()

self._queue_worker.start()
self._hardware_event_forwarder.start()
self._door_watcher.start()

@property
def state_view(self) -> StateView:
Expand Down Expand Up @@ -241,7 +239,7 @@ async def finish(
# Note: After we stop listening, straggling events might be processed
# concurrently to the below lines in this .finish() call,
# or even after this .finish() call completes.
self._hardware_event_forwarder.stop_soon()
self._door_watcher.stop_soon()

await self._hardware_stopper.do_stop_and_recover(drop_tips_and_home)

Expand Down
15 changes: 6 additions & 9 deletions api/src/opentrons/protocol_engine/state/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from opentrons.ordered_set import OrderedSet

from opentrons.hardware_control.types import DoorStateNotification, DoorState
from opentrons.hardware_control.types import DoorState

from ..actions import (
Action,
Expand All @@ -20,7 +20,7 @@
StopAction,
FinishAction,
HardwareStoppedAction,
HardwareEventAction,
DoorChangeAction,
)

from ..commands import Command, CommandStatus, CommandIntent
Expand Down Expand Up @@ -340,16 +340,13 @@ def handle_action(self, action: Action) -> None: # noqa: C901
self._state.run_result = self._state.run_result or RunResult.STOPPED
self._state.run_completed_at = action.completed_at

elif isinstance(action, HardwareEventAction):
if (
isinstance(action.event, DoorStateNotification)
and self._config.block_on_door_open
):
if action.event.new_state == DoorState.OPEN:
elif isinstance(action, DoorChangeAction):
if self._config.block_on_door_open:
if action.door_state == DoorState.OPEN:
self._state.is_door_blocking = True
if self._state.queue_status != QueueStatus.SETUP:
self._state.queue_status = QueueStatus.PAUSED
elif action.event.new_state == DoorState.CLOSED:
elif action.door_state == DoorState.CLOSED:
self._state.is_door_blocking = False


Expand Down
12 changes: 2 additions & 10 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@

from opentrons.commands.types import CommandMessage as LegacyCommand
from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import (
DoorStateNotification,
PauseType as HardwarePauseType,
)

from opentrons.protocol_engine import AbstractPlugin, actions as pe_actions

Expand Down Expand Up @@ -115,12 +111,8 @@ async def teardown(self) -> None:

def handle_action(self, action: pe_actions.Action) -> None:
"""React to a ProtocolEngine action."""
if (
isinstance(action, pe_actions.HardwareEventAction)
and not self.state.commands.get_is_implicitly_active()
and isinstance(action.event, DoorStateNotification)
):
self._hardware_api.pause(HardwarePauseType.PAUSE)
# TODO(jbl 2022-07-06) handle_action stub should be completely removed
pass

def _handle_legacy_command(self, command: LegacyCommand) -> None:
"""Handle a command reported by the APIv2 protocol.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Tests for hardware_event_forwarder."""
"""Tests for door_watcher."""


from typing import cast
Expand All @@ -12,14 +12,22 @@
DoorStateNotification,
DoorState,
HardwareEventHandler,
PauseType,
)

from opentrons.protocol_engine.actions import ActionDispatcher, HardwareEventAction
from opentrons.protocol_engine.execution.hardware_event_forwarder import (
HardwareEventForwarder,
from opentrons.protocol_engine.actions import ActionDispatcher, DoorChangeAction
from opentrons.protocol_engine.state import StateStore
from opentrons.protocol_engine.execution.door_watcher import (
DoorWatcher,
)


@pytest.fixture
def state_store(decoy: Decoy) -> StateStore:
"""Get a mock in the shape of a StateStore."""
return decoy.mock(cls=StateStore)


@pytest.fixture
def hardware_control_api(
decoy: Decoy,
Expand All @@ -36,20 +44,25 @@ def action_dispatcher(decoy: Decoy) -> ActionDispatcher:

@pytest.fixture
async def subject(
hardware_control_api: HardwareControlAPI, action_dispatcher: ActionDispatcher
) -> HardwareEventForwarder:
"""Return a HardwareEventForwarder with mocked dependencies.
state_store: StateStore,
hardware_control_api: HardwareControlAPI,
action_dispatcher: ActionDispatcher,
) -> DoorWatcher:
"""Return a DoorWatcher with mocked dependencies.
Async because HardwareEventForwarder's initializer requires a running event loop.
Async because DoorWatcher's initializer requires a running event loop.
"""
return HardwareEventForwarder(
hardware_api=hardware_control_api, action_dispatcher=action_dispatcher
return DoorWatcher(
state_store=state_store,
hardware_api=hardware_control_api,
action_dispatcher=action_dispatcher,
)


async def test_event_forwarding(
decoy: Decoy,
subject: HardwareEventForwarder,
subject: DoorWatcher,
state_store: StateStore,
hardware_control_api: HardwareControlAPI,
action_dispatcher: ActionDispatcher,
) -> None:
Expand All @@ -60,21 +73,35 @@ async def test_event_forwarding(
unsubscribe_callback
)

decoy.when(state_store.commands.get_is_running()).then_return(True)

subject.start()

captured_handler = cast(HardwareEventHandler, handler_captor.value)

input_event = DoorStateNotification(new_state=DoorState.OPEN)
expected_action_to_forward = HardwareEventAction(input_event)
expected_action_to_forward = DoorChangeAction(DoorState.OPEN)

await to_thread.run_sync(captured_handler, input_event)
decoy.verify(action_dispatcher.dispatch(expected_action_to_forward))
decoy.verify(
hardware_control_api.pause(PauseType.PAUSE),
action_dispatcher.dispatch(expected_action_to_forward),
times=1,
)

decoy.reset()
input_event = DoorStateNotification(new_state=DoorState.CLOSED)
await to_thread.run_sync(captured_handler, input_event)
decoy.verify(
hardware_control_api.pause(PauseType.PAUSE),
times=0,
)


async def test_one_subscribe_one_unsubscribe(
decoy: Decoy,
hardware_control_api: HardwareControlAPI,
subject: HardwareEventForwarder,
subject: DoorWatcher,
) -> None:
"""Multiple start()s and stop()s should be collapsed."""
unsubscribe = decoy.mock()
Expand Down
Loading

0 comments on commit c3dc40e

Please sign in to comment.