Skip to content

Commit

Permalink
feat(api): Allow treating errors as false-positives (ignore them and …
Browse files Browse the repository at this point in the history
…continue with the run) (#16556)
  • Loading branch information
SyntaxColoring authored Oct 28, 2024
1 parent 288b8a4 commit 24fcc0d
Show file tree
Hide file tree
Showing 36 changed files with 560 additions and 148 deletions.
4 changes: 2 additions & 2 deletions api/src/opentrons/protocol_engine/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
SetPipetteMovementSpeedAction,
AddAbsorbanceReaderLidAction,
)
from .get_state_update import get_state_update
from .get_state_update import get_state_updates

__all__ = [
# action pipeline interface
Expand Down Expand Up @@ -63,5 +63,5 @@
"PauseSource",
"FinishErrorDetails",
# helper functions
"get_state_update",
"get_state_updates",
]
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class StopAction:
class ResumeFromRecoveryAction:
"""See `ProtocolEngine.resume_from_recovery()`."""

pass
state_update: StateUpdate


@dataclasses.dataclass(frozen=True)
Expand Down
29 changes: 23 additions & 6 deletions api/src/opentrons/protocol_engine/actions/get_state_update.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
# noqa: D100


from .actions import Action, SucceedCommandAction, FailCommandAction
from .actions import (
Action,
ResumeFromRecoveryAction,
SucceedCommandAction,
FailCommandAction,
)
from ..commands.command import DefinedErrorData
from ..error_recovery_policy import ErrorRecoveryType
from ..state.update_types import StateUpdate


def get_state_update(action: Action) -> StateUpdate | None:
"""Extract the StateUpdate from an action, if there is one."""
def get_state_updates(action: Action) -> list[StateUpdate]:
"""Extract all the StateUpdates that the StateStores should apply when they apply an action."""
if isinstance(action, SucceedCommandAction):
return action.state_update
return [action.state_update]

elif isinstance(action, FailCommandAction) and isinstance(
action.error, DefinedErrorData
):
return action.error.state_update
if action.type == ErrorRecoveryType.ASSUME_FALSE_POSITIVE_AND_CONTINUE:
return [
action.error.state_update,
action.error.state_update_if_false_positive,
]
else:
return [action.error.state_update]

elif isinstance(action, ResumeFromRecoveryAction):
return [action.state_update]

else:
return None
return []
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_engine/commands/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ class DefinedErrorData(Generic[_ErrorT_co]):
)
"""How the engine state should be updated to reflect this command failure."""

state_update_if_false_positive: StateUpdate = dataclasses.field(
default_factory=StateUpdate
)


class BaseCommand(
GenericModel,
Expand Down
10 changes: 9 additions & 1 deletion api/src/opentrons/protocol_engine/commands/drop_tip.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,15 @@ async def execute(self, params: DropTipParams) -> _ExecuteReturn:
)
],
)
return DefinedErrorData(public=error, state_update=state_update)
state_update_if_false_positive = update_types.StateUpdate()
state_update_if_false_positive.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
)
return DefinedErrorData(
public=error,
state_update=state_update,
state_update_if_false_positive=state_update_if_false_positive,
)
else:
state_update.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
Expand Down
10 changes: 9 additions & 1 deletion api/src/opentrons/protocol_engine/commands/drop_tip_in_place.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ async def execute(self, params: DropTipInPlaceParams) -> _ExecuteReturn:
pipette_id=params.pipetteId, home_after=params.homeAfter
)
except TipAttachedError as exception:
state_update_if_false_positive = update_types.StateUpdate()
state_update_if_false_positive.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
)
error = TipPhysicallyAttachedError(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
Expand All @@ -83,7 +87,11 @@ async def execute(self, params: DropTipInPlaceParams) -> _ExecuteReturn:
)
],
)
return DefinedErrorData(public=error, state_update=state_update)
return DefinedErrorData(
public=error,
state_update=state_update,
state_update_if_false_positive=state_update_if_false_positive,
)
else:
state_update.update_pipette_tip_state(
pipette_id=params.pipetteId, tip_geometry=None
Expand Down
10 changes: 8 additions & 2 deletions api/src/opentrons/protocol_engine/commands/pick_up_tip.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing_extensions import Literal


from ..errors import ErrorOccurrence, TipNotAttachedError
from ..errors import ErrorOccurrence, PickUpTipTipNotAttachedError
from ..resources import ModelUtils
from ..state import update_types
from ..types import PickUpTipWellLocation, DeckPoint
Expand Down Expand Up @@ -140,7 +140,12 @@ async def execute(
labware_id=labware_id,
well_name=well_name,
)
except TipNotAttachedError as e:
except PickUpTipTipNotAttachedError as e:
state_update_if_false_positive = update_types.StateUpdate()
state_update_if_false_positive.update_pipette_tip_state(
pipette_id=pipette_id,
tip_geometry=e.tip_geometry,
)
state_update.mark_tips_as_used(
pipette_id=pipette_id, labware_id=labware_id, well_name=well_name
)
Expand All @@ -157,6 +162,7 @@ async def execute(
],
),
state_update=state_update,
state_update_if_false_positive=state_update_if_false_positive,
)
else:
state_update.update_pipette_tip_state(
Expand Down
31 changes: 28 additions & 3 deletions api/src/opentrons/protocol_engine/create_protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@

from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.types import DoorState
from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy
from opentrons.protocol_engine.execution.error_recovery_hardware_state_synchronizer import (
ErrorRecoveryHardwareStateSynchronizer,
)
from opentrons.util.async_helpers import async_context_manager_in_thread

from opentrons_shared_data.robot import load as load_robot

from .actions.action_dispatcher import ActionDispatcher
from .error_recovery_policy import ErrorRecoveryPolicy
from .execution.door_watcher import DoorWatcher
from .execution.hardware_stopper import HardwareStopper
from .plugins import PluginStarter
from .protocol_engine import ProtocolEngine
from .resources import DeckDataProvider, ModuleDataProvider, FileProvider
from .resources import DeckDataProvider, ModuleDataProvider, FileProvider, ModelUtils
from .state.config import Config
from .state.state import StateStore
from .types import PostRunHardwareState, DeckConfigurationType
Expand Down Expand Up @@ -61,10 +69,27 @@ async def create_protocol_engine(
deck_configuration=deck_configuration,
notify_publishers=notify_publishers,
)
hardware_state_synchronizer = ErrorRecoveryHardwareStateSynchronizer(
hardware_api, state_store
)
action_dispatcher = ActionDispatcher(state_store)
action_dispatcher.add_handler(hardware_state_synchronizer)
plugin_starter = PluginStarter(state_store, action_dispatcher)
model_utils = ModelUtils()
hardware_stopper = HardwareStopper(hardware_api, state_store)
door_watcher = DoorWatcher(state_store, hardware_api, action_dispatcher)
module_data_provider = ModuleDataProvider()
file_provider = file_provider or FileProvider()

return ProtocolEngine(
state_store=state_store,
hardware_api=hardware_api,
state_store=state_store,
action_dispatcher=action_dispatcher,
plugin_starter=plugin_starter,
model_utils=model_utils,
hardware_stopper=hardware_stopper,
door_watcher=door_watcher,
module_data_provider=module_data_provider,
file_provider=file_provider,
)

Expand Down
16 changes: 13 additions & 3 deletions api/src/opentrons/protocol_engine/error_recovery_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,20 @@ class ErrorRecoveryType(enum.Enum):
"""

WAIT_FOR_RECOVERY = enum.auto()
"""Stop and wait for the error to be recovered from manually."""
"""Enter interactive error recovery mode."""

IGNORE_AND_CONTINUE = enum.auto()
"""Continue with the run, as if the command never failed."""
CONTINUE_WITH_ERROR = enum.auto()
"""Continue without interruption, carrying on from whatever error state the failed
command left the engine in.
This is like `ProtocolEngine.resume_from_recovery(reconcile_false_positive=False)`.
"""

ASSUME_FALSE_POSITIVE_AND_CONTINUE = enum.auto()
"""Continue without interruption, acting as if the underlying error was a false positive.
This is like `ProtocolEngine.resume_from_recovery(reconcile_false_positive=True)`.
"""


class ErrorRecoveryPolicy(Protocol):
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
InvalidSpecificationForRobotTypeError,
InvalidLoadPipetteSpecsError,
TipNotAttachedError,
PickUpTipTipNotAttachedError,
TipAttachedError,
CommandDoesNotExistError,
LabwareNotLoadedError,
Expand Down Expand Up @@ -89,6 +90,7 @@
"InvalidSpecificationForRobotTypeError",
"InvalidLoadPipetteSpecsError",
"TipNotAttachedError",
"PickUpTipTipNotAttachedError",
"TipAttachedError",
"CommandDoesNotExistError",
"LabwareNotLoadedError",
Expand Down
23 changes: 22 additions & 1 deletion api/src/opentrons/protocol_engine/errors/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""Protocol engine exceptions."""

from __future__ import annotations

from logging import getLogger
from typing import Any, Dict, Optional, Union, Iterator, Sequence
from typing import Any, Dict, Final, Optional, Union, Iterator, Sequence, TYPE_CHECKING

from opentrons_shared_data.errors import ErrorCodes
from opentrons_shared_data.errors.exceptions import EnumeratedError, PythonException

if TYPE_CHECKING:
from opentrons.protocol_engine.types import TipGeometry


log = getLogger(__name__)


Expand Down Expand Up @@ -132,6 +138,21 @@ def __init__(
super().__init__(ErrorCodes.UNEXPECTED_TIP_REMOVAL, message, details, wrapping)


class PickUpTipTipNotAttachedError(TipNotAttachedError):
"""Raised from TipHandler.pick_up_tip().
This is like TipNotAttachedError except that it carries some extra information
about the attempted operation.
"""

tip_geometry: Final[TipGeometry]
"""The tip geometry that would have been on the pipette, had the operation succeeded."""

def __init__(self, tip_geometry: TipGeometry) -> None:
super().__init__()
self.tip_geometry = tip_geometry


class TipAttachedError(ProtocolEngineError):
"""Raised when a tip shouldn't be attached, but is."""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# noqa: D100


from opentrons.hardware_control import HardwareControlAPI
from opentrons.protocol_engine.actions.action_handler import ActionHandler
from opentrons.protocol_engine.actions.actions import (
Action,
FailCommandAction,
ResumeFromRecoveryAction,
)
from opentrons.protocol_engine.commands.command import DefinedErrorData
from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryType
from opentrons.protocol_engine.execution.tip_handler import HardwareTipHandler
from opentrons.protocol_engine.state import update_types
from opentrons.protocol_engine.state.state import StateView


class ErrorRecoveryHardwareStateSynchronizer(ActionHandler):
"""A hack to keep the hardware API's state correct through certain error recovery flows.
BACKGROUND:
Certain parts of robot state are duplicated between `opentrons.protocol_engine` and
`opentrons.hardware_control`. Stuff like "is there a tip attached."
Normally, Protocol Engine command implementations (`opentrons.protocol_engine.commands`)
mutate hardware API state when they execute; and then when they finish executing,
the Protocol Engine state stores (`opentrons.protocol_engine.state`) update Protocol
Engine state accordingly. So both halves are accounted for. This generally works fine.
However, we need to go out of our way to support
`ProtocolEngine.resume_from_recovery(reconcile_false_positive=True)`.
It wants to apply a second set of state updates to "fix things up" with the
new knowledge that some error was a false positive. The Protocol Engine half of that
is easy for us to apply the normal way, through the state stores; but the
hardware API half of that cannot be applied the normal way, from the command
implementation, because the command in question is no longer running.
THE HACK:
This listens for the same error recovery state updates that the state stores do,
figures out what hardware API state mutations ought to go along with them,
and then does those mutations.
The problem is that hardware API state is now mutated from two different places
(sometimes the command implementations, and sometimes here), which are bound
to grow accidental differences.
TO FIX:
Make Protocol Engine's use of the hardware API less stateful. e.g. supply
tip geometry every time we call a hardware API movement method, instead of
just once when we pick up a tip. Use Protocol Engine state as the single source
of truth.
"""

def __init__(self, hardware_api: HardwareControlAPI, state_view: StateView) -> None:
self._hardware_api = hardware_api
self._state_view = state_view

def handle_action(self, action: Action) -> None:
"""Modify hardware API state in reaction to a Protocol Engine action."""
state_update = _get_state_update(action)
if state_update:
self._synchronize(state_update)

def _synchronize(self, state_update: update_types.StateUpdate) -> None:
tip_handler = HardwareTipHandler(self._state_view, self._hardware_api)

if state_update.pipette_tip_state != update_types.NO_CHANGE:
pipette_id = state_update.pipette_tip_state.pipette_id
tip_geometry = state_update.pipette_tip_state.tip_geometry
if tip_geometry is None:
tip_handler.remove_tip(pipette_id)
else:
tip_handler.cache_tip(pipette_id=pipette_id, tip=tip_geometry)


def _get_state_update(action: Action) -> update_types.StateUpdate | None:
"""Get the mutations that we need to do on the hardware API to stay in sync with an engine action.
The mutations are returned in Protocol Engine terms, as a StateUpdate.
They then need to be converted to hardware API terms.
"""
match action:
case ResumeFromRecoveryAction(state_update=state_update):
return state_update

case FailCommandAction(
error=DefinedErrorData(
state_update_if_false_positive=state_update_if_false_positive
)
):
return (
state_update_if_false_positive
if action.type == ErrorRecoveryType.ASSUME_FALSE_POSITIVE_AND_CONTINUE
else None
)

case _:
return None
Loading

0 comments on commit 24fcc0d

Please sign in to comment.