Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Pause run after recoverable errors #14646

Merged
merged 14 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/opentrons/ordered_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,9 @@ def __sub__(
The elements that aren't removed retain their original relative order.
"""
return OrderedSet(e for e in self if e not in other)

def __repr__(self) -> str: # noqa: D105
# Use repr() on the keys view in case it's super long and Python is smart
# enough to abbreviate it.
elements_str = repr(self._elements.keys())
return f"OrderedSet({elements_str})"
10 changes: 3 additions & 7 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from enum import Enum
from typing import Optional, Union
from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryType

from opentrons.protocols.models import LabwareDefinition
from opentrons.hardware_control.types import DoorState
Expand Down Expand Up @@ -129,18 +130,13 @@ class UpdateCommandAction:

@dataclass(frozen=True)
class FailCommandAction:
"""Mark a given command as failed.
"""Mark a given command as failed."""

The given command and all currently queued commands will be marked
as failed due to the given error.
"""

# TODO(mc, 2021-11-12): we'll likely need to add the command params
# to this payload for state reaction purposes
command_id: str
error_id: str
failed_at: datetime
error: EnumeratedError
type: ErrorRecoveryType
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved


@dataclass(frozen=True)
Expand Down
76 changes: 76 additions & 0 deletions api/src/opentrons/protocol_engine/error_recovery_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# noqa: D100

import enum
from typing import Protocol

from opentrons_shared_data.errors import EnumeratedError, ErrorCodes

from opentrons.config import feature_flags as ff
from opentrons.protocol_engine.commands import Command


class ErrorRecoveryType(enum.Enum):
"""Ways to handle a command failure."""

FAIL_RUN = enum.auto()
"""Permanently fail the entire run.

TODO(mm, 2024-03-18): This might be a misnomer because failing the run is not
a decision that's up to Protocol Engine. It's decided by what the caller supplies
to `ProtocolEngine.finish()`. For example, a Python protocol can
theoretically swallow the exception and continue on.
"""

WAIT_FOR_RECOVERY = enum.auto()
"""Stop and wait for the error to be recovered from manually."""

# TODO(mm, 2023-03-18): Add something like this for
# https://opentrons.atlassian.net/browse/EXEC-302.
# CONTINUE = enum.auto()
# """Continue with the run, as if the command never failed."""


class ErrorRecoveryPolicy(Protocol):
"""An interface to decide how to handle a command failure.

This describes a function that Protocol Engine calls after each command failure,
with the details of that failure. The implementation should inspect those details
and return an appropriate `ErrorRecoveryType`.
"""

@staticmethod
def __call__( # noqa: D102
failed_command: Command, exception: Exception
) -> ErrorRecoveryType:
...


def error_recovery_by_ff(
failed_command: Command, exception: Exception
) -> ErrorRecoveryType:
"""Use API feature flags to decide how to handle an error.

This is just for development. This should be replaced by a proper config
system exposed through robot-server's HTTP API.
"""
# todo(mm, 2024-03-18): Do we need to do anything explicit here to disable
# error recovery on the OT-2?
if ff.enable_error_recovery_experiments() and _is_recoverable(
failed_command, exception
):
return ErrorRecoveryType.WAIT_FOR_RECOVERY
else:
return ErrorRecoveryType.FAIL_RUN


def _is_recoverable(failed_command: Command, exception: Exception) -> bool:
if (
failed_command.commandType == "pickUpTip"
and isinstance(exception, EnumeratedError)
# Hack(?): It seems like this should be ErrorCodes.TIP_PICKUP_FAILED, but that's
# not what gets raised in practice.
and exception.code == ErrorCodes.UNEXPECTED_TIP_REMOVAL
):
return True
else:
return False
Comment on lines +66 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can foresee this function getting huge. As we add more recoverable states, will this eventually be replaced with a more standardized approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm also worried about this, and I don't know.

The opposite way to organize this would be to distribute this choice across the command implementations. So, instead of PickUpTipImplementation returning an error, and this deciding whether that error is recoverable, PickUpTipImplementation would return an error and a recoverable boolean (or something like that).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it true that any executed command, that failed and is recoverable, would throw an EnumeratedError exception?

If so, could the command implementations be required to have an is_recoverable(error_code: ErrorCodes) method?

Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are probably places where that's not true today, but we can (and might have to) aim for that to become true as part of this project.

15 changes: 15 additions & 0 deletions api/src/opentrons/protocol_engine/execution/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
PythonException,
)

from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy

from ..state import StateStore
from ..resources import ModelUtils
from ..commands import (
Expand Down Expand Up @@ -82,6 +84,7 @@ def __init__(
run_control: RunControlHandler,
rail_lights: RailLightsHandler,
status_bar: StatusBarHandler,
error_recovery_policy: ErrorRecoveryPolicy,
model_utils: Optional[ModelUtils] = None,
command_note_tracker_provider: Optional[CommandNoteTrackerProvider] = None,
) -> None:
Expand All @@ -102,6 +105,7 @@ def __init__(
self._command_note_tracker_provider = (
command_note_tracker_provider or _NoteTracker
)
self._error_recovery_policy = error_recovery_policy

async def execute(self, command_id: str) -> None:
"""Run a given command's execution procedure.
Expand Down Expand Up @@ -177,6 +181,17 @@ async def execute(self, command_id: str) -> None:
command_id=command_id,
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
# todo(mm, 2024-03-13):
# When a command fails recoverably, and we handle it with
# WAIT_FOR_RECOVERY or CONTINUE, we want to update our logical
# protocol state as if the command succeeded. (e.g. if a tip
# pickup failed, pretend that it succeeded and that the tip is now
# on the pipette.) However, this currently does the opposite,
# acting as if the command never executed.
type=self._error_recovery_policy(
running_command,
error,
),
)
)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""QueueWorker and dependency factory."""
from opentrons.hardware_control import HardwareControlAPI
from opentrons.protocol_engine.error_recovery_policy import ErrorRecoveryPolicy
from opentrons.protocol_engine.execution.rail_lights import RailLightsHandler

from ..state import StateStore
Expand All @@ -20,13 +21,15 @@ def create_queue_worker(
hardware_api: HardwareControlAPI,
state_store: StateStore,
action_dispatcher: ActionDispatcher,
error_recovery_policy: ErrorRecoveryPolicy,
) -> QueueWorker:
"""Create a ready-to-use QueueWorker instance.

Arguments:
hardware_api: Hardware control API to pass down to dependencies.
state_store: StateStore to pass down to dependencies.
action_dispatcher: ActionDispatcher to pass down to dependencies.
error_recovery_policy: ErrorRecoveryPolicy to pass down to dependencies.
"""
gantry_mover = create_gantry_mover(
hardware_api=hardware_api,
Expand Down Expand Up @@ -85,6 +88,7 @@ def create_queue_worker(
run_control=run_control_handler,
rail_lights=rail_lights_handler,
status_bar=status_bar_handler,
error_recovery_policy=error_recovery_policy,
)

return QueueWorker(
Expand Down
13 changes: 11 additions & 2 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
from logging import getLogger
from typing import Dict, Optional, Union
from opentrons.protocol_engine.actions.actions import ResumeFromRecoveryAction
from opentrons.protocol_engine.error_recovery_policy import (
ErrorRecoveryPolicy,
ErrorRecoveryType,
error_recovery_by_ff,
)

from opentrons.protocols.models import LabwareDefinition
from opentrons.hardware_control import HardwareControlAPI
Expand Down Expand Up @@ -90,6 +95,7 @@ def __init__(
hardware_stopper: Optional[HardwareStopper] = None,
door_watcher: Optional[DoorWatcher] = None,
module_data_provider: Optional[ModuleDataProvider] = None,
error_recovery_policy: ErrorRecoveryPolicy = error_recovery_by_ff,
) -> None:
"""Initialize a ProtocolEngine instance.

Expand All @@ -113,6 +119,7 @@ def __init__(
hardware_api=hardware_api,
state_store=self._state_store,
action_dispatcher=self._action_dispatcher,
error_recovery_policy=error_recovery_policy,
)
self._hardware_stopper = hardware_stopper or HardwareStopper(
hardware_api=hardware_api,
Expand Down Expand Up @@ -259,6 +266,7 @@ def estop(self, maintenance_run: bool) -> None:
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
type=ErrorRecoveryType.FAIL_RUN,
)
self._action_dispatcher.dispatch(fail_action)

Expand All @@ -271,6 +279,7 @@ def estop(self, maintenance_run: bool) -> None:
error_id=self._model_utils.generate_id(),
failed_at=self._model_utils.get_timestamp(),
error=EStopActivatedError(message="Estop Activated"),
type=ErrorRecoveryType.FAIL_RUN,
)
self._action_dispatcher.dispatch(fail_action)
self._queue_worker.cancel()
Expand Down Expand Up @@ -316,12 +325,12 @@ async def stop(self) -> None:
async def wait_until_complete(self) -> None:
"""Wait until there are no more commands to execute.

Raises:
CommandExecutionFailedError: if any protocol command failed.
If a command encountered a fatal error, it's raised as an exception.
"""
await self._state_store.wait_for(
condition=self._state_store.commands.get_all_commands_final
)
self._state_store.commands.raise_fatal_command_error()

async def finish(
self,
Expand Down
Loading
Loading