Skip to content

Commit

Permalink
feat(api): add monadic error passing to protocol engine (#16788)
Browse files Browse the repository at this point in the history
Let's make an intermediate layer of command execution, inside the
command domain so we don't have to move a bunch of types, that calls
execution layer functions and returns a new Maybe class that has nice
monadic chaining calls like a JS promise or some other burrito-style
implementations.

As an example, let's use it for overpressure errors in
PrepareToAspirate.

## reviews
- does this seem worth it? in your mind's eye, imagine doing this for a
new `gantry_common.move_to()` that wraps `gantry_mover.move_to()` and
returns a stall defined error.

## testing
- none, this is a refactor that passes tests with few changes

Closes EXEC-830
  • Loading branch information
sfoster1 authored Nov 14, 2024
1 parent 72178ca commit 5f9d2a3
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 46 deletions.
246 changes: 243 additions & 3 deletions api/src/opentrons/protocol_engine/commands/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@
import dataclasses
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
import enum
from typing import (
cast,
TYPE_CHECKING,
Generic,
Optional,
TypeVar,
List,
Type,
Union,
Callable,
Awaitable,
Literal,
Final,
TypeAlias,
)

from pydantic import BaseModel, Field
Expand All @@ -41,7 +47,7 @@
_ErrorT_co = TypeVar("_ErrorT_co", bound=ErrorOccurrence, covariant=True)


class CommandStatus(str, Enum):
class CommandStatus(str, enum.Enum):
"""Command execution status."""

QUEUED = "queued"
Expand All @@ -50,7 +56,7 @@ class CommandStatus(str, Enum):
FAILED = "failed"


class CommandIntent(str, Enum):
class CommandIntent(str, enum.Enum):
"""Run intent for a given command.
Props:
Expand Down Expand Up @@ -242,6 +248,240 @@ class BaseCommand(
]


class IsErrorValue(Exception):
"""Panic exception if a Maybe contains an Error."""

pass


class _NothingEnum(enum.Enum):
_NOTHING = enum.auto()


NOTHING: Final = _NothingEnum._NOTHING
NothingT: TypeAlias = Literal[_NothingEnum._NOTHING]


class _UnknownEnum(enum.Enum):
_UNKNOWN = enum.auto()


UNKNOWN: Final = _UnknownEnum._UNKNOWN
UnknownT: TypeAlias = Literal[_UnknownEnum._UNKNOWN]

_ResultT_co_general = TypeVar("_ResultT_co_general", covariant=True)
_ErrorT_co_general = TypeVar("_ErrorT_co_general", covariant=True)


_SecondResultT_co_general = TypeVar("_SecondResultT_co_general", covariant=True)
_SecondErrorT_co_general = TypeVar("_SecondErrorT_co_general", covariant=True)


@dataclasses.dataclass
class Maybe(Generic[_ResultT_co_general, _ErrorT_co_general]):
"""Represents an possibly completed, possibly errored result.
By using this class's chaining methods like and_then or or_else, you can build
functions that preserve previous defined errors and augment them or transform them
and transform the results.
Build objects of this type using from_result or from_error on fully type-qualified
aliases. For instance,
MyFunctionReturn = Maybe[SuccessData[SomeSuccessModel], DefinedErrorData[SomeErrorKind]]
def my_function(args...) -> MyFunctionReturn:
try:
do_thing(args...)
except SomeException as e:
return MyFunctionReturn.from_error(ErrorOccurrence.from_error(e))
else:
return MyFunctionReturn.from_result(SuccessData(SomeSuccessModel(args...)))
Then, in the calling function, you can react to the results and unwrap to a union:
OuterMaybe = Maybe[SuccessData[SomeOtherModel], DefinedErrorData[SomeErrors]]
OuterReturn = Union[SuccessData[SomeOtherModel], DefinedErrorData[SomeErrors]]
def my_calling_function(args...) -> OuterReturn:
def handle_result(result: SuccessData[SomeSuccessModel]) -> OuterMaybe:
return OuterMaybe.from_result(result=some_result_transformer(result))
return do_thing.and_then(handle_result).unwrap()
"""

_contents: tuple[_ResultT_co_general, NothingT] | tuple[
NothingT, _ErrorT_co_general
]

_CtorErrorT = TypeVar("_CtorErrorT")
_CtorResultT = TypeVar("_CtorResultT")

@classmethod
def from_result(
cls: Type[Maybe[_CtorResultT, _CtorErrorT]], result: _CtorResultT
) -> Maybe[_CtorResultT, _CtorErrorT]:
"""Build a Maybe from a valid result."""
return cls(_contents=(result, NOTHING))

@classmethod
def from_error(
cls: Type[Maybe[_CtorResultT, _CtorErrorT]], error: _CtorErrorT
) -> Maybe[_CtorResultT, _CtorErrorT]:
"""Build a Maybe from a known error."""
return cls(_contents=(NOTHING, error))

def result_or_panic(self) -> _ResultT_co_general:
"""Unwrap to a result or throw if the Maybe is an error."""
contents = self._contents
if contents[1] is NOTHING:
# https://github.com/python/mypy/issues/12364
return cast(_ResultT_co_general, contents[0])
else:
raise IsErrorValue()

def unwrap(self) -> _ResultT_co_general | _ErrorT_co_general:
"""Unwrap to a union, which is useful for command returns."""
# https://github.com/python/mypy/issues/12364
if self._contents[1] is NOTHING:
return cast(_ResultT_co_general, self._contents[0])
else:
return self._contents[1]

# note: casts in these methods are because of https://github.com/python/mypy/issues/11730
def and_then(
self,
functor: Callable[
[_ResultT_co_general],
Maybe[_SecondResultT_co_general, _SecondErrorT_co_general],
],
) -> Maybe[
_SecondResultT_co_general, _ErrorT_co_general | _SecondErrorT_co_general
]:
"""Conditionally execute functor if the Maybe contains a result.
Functor should take the result type and return a new Maybe. Since this function returns
a Maybe, it can be chained. The result type will have only the Result type of the Maybe
returned by the functor, but the error type is the union of the error type in the Maybe
returned by the functor and the error type in this Maybe, since the functor may not have
actually been called.
"""
match self._contents:
case (result, _NothingEnum._NOTHING):
return cast(
Maybe[
_SecondResultT_co_general,
_ErrorT_co_general | _SecondErrorT_co_general,
],
functor(cast(_ResultT_co_general, result)),
)
case _:
return cast(
Maybe[
_SecondResultT_co_general,
_ErrorT_co_general | _SecondErrorT_co_general,
],
self,
)

def or_else(
self,
functor: Callable[
[_ErrorT_co_general],
Maybe[_SecondResultT_co_general, _SecondErrorT_co_general],
],
) -> Maybe[
_SecondResultT_co_general | _ResultT_co_general, _SecondErrorT_co_general
]:
"""Conditionally execute functor if the Maybe contains an error.
The functor should take the error type and return a new Maybe. Since this function returns
a Maybe, it can be chained. The result type will have only the Error type of the Maybe
returned by the functor, but the result type is the union of the Result of the Maybe returned
by the functor and the Result of this Maybe, since the functor may not have been called.
"""
match self._contents:
case (_NothingEnum._NOTHING, error):
return cast(
Maybe[
_ResultT_co_general | _SecondResultT_co_general,
_SecondErrorT_co_general,
],
functor(cast(_ErrorT_co_general, error)),
)
case _:
return cast(
Maybe[
_ResultT_co_general | _SecondResultT_co_general,
_SecondErrorT_co_general,
],
self,
)

async def and_then_async(
self,
functor: Callable[
[_ResultT_co_general],
Awaitable[Maybe[_SecondResultT_co_general, _SecondErrorT_co_general]],
],
) -> Awaitable[
Maybe[_SecondResultT_co_general, _ErrorT_co_general | _SecondErrorT_co_general]
]:
"""As and_then, but for an async functor."""
match self._contents:
case (result, _NothingEnum._NOTHING):
return cast(
Awaitable[
Maybe[
_SecondResultT_co_general,
_ErrorT_co_general | _SecondErrorT_co_general,
]
],
await functor(cast(_ResultT_co_general, result)),
)
case _:
return cast(
Awaitable[
Maybe[
_SecondResultT_co_general,
_ErrorT_co_general | _SecondErrorT_co_general,
]
],
self,
)

async def or_else_async(
self,
functor: Callable[
[_ErrorT_co_general],
Awaitable[Maybe[_SecondResultT_co_general, _SecondErrorT_co_general]],
],
) -> Awaitable[
Maybe[_SecondResultT_co_general | _ResultT_co_general, _SecondErrorT_co_general]
]:
"""As or_else, but for an async functor."""
match self._contents:
case (_NothingEnum._NOTHING, error):
return cast(
Awaitable[
Maybe[
_ResultT_co_general | _SecondResultT_co_general,
_SecondErrorT_co_general,
]
],
await functor(cast(_ErrorT_co_general, error)),
)
case _:
return cast(
Awaitable[
Maybe[
_ResultT_co_general | _SecondResultT_co_general,
_SecondErrorT_co_general,
]
],
self,
)


_ExecuteReturnT_co = TypeVar(
"_ExecuteReturnT_co",
bound=Union[
Expand Down
51 changes: 50 additions & 1 deletion api/src/opentrons/protocol_engine/commands/pipetting_common.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
"""Common pipetting command base models."""
from __future__ import annotations
from opentrons_shared_data.errors import ErrorCodes
from pydantic import BaseModel, Field
from typing import Literal, Optional, Tuple, TypedDict
from typing import Literal, Optional, Tuple, TypedDict, TYPE_CHECKING

from opentrons.protocol_engine.errors.error_occurrence import ErrorOccurrence
from opentrons_shared_data.errors.exceptions import PipetteOverpressureError
from .command import Maybe, DefinedErrorData, SuccessData
from opentrons.protocol_engine.state.update_types import StateUpdate

from ..types import WellLocation, LiquidHandlingWellLocation, DeckPoint

if TYPE_CHECKING:
from ..execution.pipetting import PipettingHandler
from ..resources import ModelUtils


class PipetteIdMixin(BaseModel):
"""Mixin for command requests that take a pipette ID."""
Expand Down Expand Up @@ -201,3 +209,44 @@ class TipPhysicallyAttachedError(ErrorOccurrence):

errorCode: str = ErrorCodes.TIP_DROP_FAILED.value.code
detail: str = ErrorCodes.TIP_DROP_FAILED.value.detail


PrepareForAspirateReturn = Maybe[
SuccessData[BaseModel], DefinedErrorData[OverpressureError]
]


async def prepare_for_aspirate(
pipette_id: str,
pipetting: PipettingHandler,
model_utils: ModelUtils,
location_if_error: ErrorLocationInfo,
) -> PrepareForAspirateReturn:
"""Execute pipetting.prepare_for_aspirate, handle errors, and marshal success."""
state_update = StateUpdate()
try:
await pipetting.prepare_for_aspirate(pipette_id)
except PipetteOverpressureError as e:
state_update.set_fluid_unknown(pipette_id=pipette_id)
return PrepareForAspirateReturn.from_error(
DefinedErrorData(
public=OverpressureError(
id=model_utils.generate_id(),
createdAt=model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=model_utils.generate_id(),
createdAt=model_utils.get_timestamp(),
error=e,
)
],
errorInfo=location_if_error,
),
state_update=state_update,
)
)
else:
state_update.set_fluid_empty(pipette_id=pipette_id)
return PrepareForAspirateReturn.from_result(
SuccessData(public=BaseModel(), state_update=state_update)
)
Loading

0 comments on commit 5f9d2a3

Please sign in to comment.