diff --git a/api/src/opentrons/protocol_engine/commands/command.py b/api/src/opentrons/protocol_engine/commands/command.py index 1fefcbf7315..fe47c9dbbcc 100644 --- a/api/src/opentrons/protocol_engine/commands/command.py +++ b/api/src/opentrons/protocol_engine/commands/command.py @@ -6,8 +6,9 @@ import dataclasses from abc import ABC, abstractmethod from datetime import datetime -from enum import Enum +import enum from typing import ( + cast, TYPE_CHECKING, Generic, Optional, @@ -15,6 +16,11 @@ List, Type, Union, + Callable, + Awaitable, + Literal, + Final, + TypeAlias, ) from pydantic import BaseModel, Field @@ -41,7 +47,7 @@ _ErrorT_co = TypeVar("_ErrorT_co", bound=ErrorOccurrence, covariant=True) -class CommandStatus(str, Enum): +class CommandStatus(str, enum.Enum): """Command execution status.""" QUEUED = "queued" @@ -50,7 +56,7 @@ class CommandStatus(str, Enum): FAILED = "failed" -class CommandIntent(str, Enum): +class CommandIntent(str, enum.Enum): """Run intent for a given command. Props: @@ -242,6 +248,240 @@ class BaseCommand( ] +class IsErrorValue(Exception): + """Panic exception if a Maybe contains an Error.""" + + pass + + +class _NothingEnum(enum.Enum): + _NOTHING = enum.auto() + + +NOTHING: Final = _NothingEnum._NOTHING +NothingT: TypeAlias = Literal[_NothingEnum._NOTHING] + + +class _UnknownEnum(enum.Enum): + _UNKNOWN = enum.auto() + + +UNKNOWN: Final = _UnknownEnum._UNKNOWN +UnknownT: TypeAlias = Literal[_UnknownEnum._UNKNOWN] + +_ResultT_co_general = TypeVar("_ResultT_co_general", covariant=True) +_ErrorT_co_general = TypeVar("_ErrorT_co_general", covariant=True) + + +_SecondResultT_co_general = TypeVar("_SecondResultT_co_general", covariant=True) +_SecondErrorT_co_general = TypeVar("_SecondErrorT_co_general", covariant=True) + + +@dataclasses.dataclass +class Maybe(Generic[_ResultT_co_general, _ErrorT_co_general]): + """Represents an possibly completed, possibly errored result. + + By using this class's chaining methods like and_then or or_else, you can build + functions that preserve previous defined errors and augment them or transform them + and transform the results. + + Build objects of this type using from_result or from_error on fully type-qualified + aliases. For instance, + + MyFunctionReturn = Maybe[SuccessData[SomeSuccessModel], DefinedErrorData[SomeErrorKind]] + + def my_function(args...) -> MyFunctionReturn: + try: + do_thing(args...) + except SomeException as e: + return MyFunctionReturn.from_error(ErrorOccurrence.from_error(e)) + else: + return MyFunctionReturn.from_result(SuccessData(SomeSuccessModel(args...))) + + Then, in the calling function, you can react to the results and unwrap to a union: + + OuterMaybe = Maybe[SuccessData[SomeOtherModel], DefinedErrorData[SomeErrors]] + OuterReturn = Union[SuccessData[SomeOtherModel], DefinedErrorData[SomeErrors]] + + def my_calling_function(args...) -> OuterReturn: + def handle_result(result: SuccessData[SomeSuccessModel]) -> OuterMaybe: + return OuterMaybe.from_result(result=some_result_transformer(result)) + return do_thing.and_then(handle_result).unwrap() + """ + + _contents: tuple[_ResultT_co_general, NothingT] | tuple[ + NothingT, _ErrorT_co_general + ] + + _CtorErrorT = TypeVar("_CtorErrorT") + _CtorResultT = TypeVar("_CtorResultT") + + @classmethod + def from_result( + cls: Type[Maybe[_CtorResultT, _CtorErrorT]], result: _CtorResultT + ) -> Maybe[_CtorResultT, _CtorErrorT]: + """Build a Maybe from a valid result.""" + return cls(_contents=(result, NOTHING)) + + @classmethod + def from_error( + cls: Type[Maybe[_CtorResultT, _CtorErrorT]], error: _CtorErrorT + ) -> Maybe[_CtorResultT, _CtorErrorT]: + """Build a Maybe from a known error.""" + return cls(_contents=(NOTHING, error)) + + def result_or_panic(self) -> _ResultT_co_general: + """Unwrap to a result or throw if the Maybe is an error.""" + contents = self._contents + if contents[1] is NOTHING: + # https://github.com/python/mypy/issues/12364 + return cast(_ResultT_co_general, contents[0]) + else: + raise IsErrorValue() + + def unwrap(self) -> _ResultT_co_general | _ErrorT_co_general: + """Unwrap to a union, which is useful for command returns.""" + # https://github.com/python/mypy/issues/12364 + if self._contents[1] is NOTHING: + return cast(_ResultT_co_general, self._contents[0]) + else: + return self._contents[1] + + # note: casts in these methods are because of https://github.com/python/mypy/issues/11730 + def and_then( + self, + functor: Callable[ + [_ResultT_co_general], + Maybe[_SecondResultT_co_general, _SecondErrorT_co_general], + ], + ) -> Maybe[ + _SecondResultT_co_general, _ErrorT_co_general | _SecondErrorT_co_general + ]: + """Conditionally execute functor if the Maybe contains a result. + + Functor should take the result type and return a new Maybe. Since this function returns + a Maybe, it can be chained. The result type will have only the Result type of the Maybe + returned by the functor, but the error type is the union of the error type in the Maybe + returned by the functor and the error type in this Maybe, since the functor may not have + actually been called. + """ + match self._contents: + case (result, _NothingEnum._NOTHING): + return cast( + Maybe[ + _SecondResultT_co_general, + _ErrorT_co_general | _SecondErrorT_co_general, + ], + functor(cast(_ResultT_co_general, result)), + ) + case _: + return cast( + Maybe[ + _SecondResultT_co_general, + _ErrorT_co_general | _SecondErrorT_co_general, + ], + self, + ) + + def or_else( + self, + functor: Callable[ + [_ErrorT_co_general], + Maybe[_SecondResultT_co_general, _SecondErrorT_co_general], + ], + ) -> Maybe[ + _SecondResultT_co_general | _ResultT_co_general, _SecondErrorT_co_general + ]: + """Conditionally execute functor if the Maybe contains an error. + + The functor should take the error type and return a new Maybe. Since this function returns + a Maybe, it can be chained. The result type will have only the Error type of the Maybe + returned by the functor, but the result type is the union of the Result of the Maybe returned + by the functor and the Result of this Maybe, since the functor may not have been called. + """ + match self._contents: + case (_NothingEnum._NOTHING, error): + return cast( + Maybe[ + _ResultT_co_general | _SecondResultT_co_general, + _SecondErrorT_co_general, + ], + functor(cast(_ErrorT_co_general, error)), + ) + case _: + return cast( + Maybe[ + _ResultT_co_general | _SecondResultT_co_general, + _SecondErrorT_co_general, + ], + self, + ) + + async def and_then_async( + self, + functor: Callable[ + [_ResultT_co_general], + Awaitable[Maybe[_SecondResultT_co_general, _SecondErrorT_co_general]], + ], + ) -> Awaitable[ + Maybe[_SecondResultT_co_general, _ErrorT_co_general | _SecondErrorT_co_general] + ]: + """As and_then, but for an async functor.""" + match self._contents: + case (result, _NothingEnum._NOTHING): + return cast( + Awaitable[ + Maybe[ + _SecondResultT_co_general, + _ErrorT_co_general | _SecondErrorT_co_general, + ] + ], + await functor(cast(_ResultT_co_general, result)), + ) + case _: + return cast( + Awaitable[ + Maybe[ + _SecondResultT_co_general, + _ErrorT_co_general | _SecondErrorT_co_general, + ] + ], + self, + ) + + async def or_else_async( + self, + functor: Callable[ + [_ErrorT_co_general], + Awaitable[Maybe[_SecondResultT_co_general, _SecondErrorT_co_general]], + ], + ) -> Awaitable[ + Maybe[_SecondResultT_co_general | _ResultT_co_general, _SecondErrorT_co_general] + ]: + """As or_else, but for an async functor.""" + match self._contents: + case (_NothingEnum._NOTHING, error): + return cast( + Awaitable[ + Maybe[ + _ResultT_co_general | _SecondResultT_co_general, + _SecondErrorT_co_general, + ] + ], + await functor(cast(_ErrorT_co_general, error)), + ) + case _: + return cast( + Awaitable[ + Maybe[ + _ResultT_co_general | _SecondResultT_co_general, + _SecondErrorT_co_general, + ] + ], + self, + ) + + _ExecuteReturnT_co = TypeVar( "_ExecuteReturnT_co", bound=Union[ diff --git a/api/src/opentrons/protocol_engine/commands/pipetting_common.py b/api/src/opentrons/protocol_engine/commands/pipetting_common.py index 2dafb4c81b2..6e0064211fa 100644 --- a/api/src/opentrons/protocol_engine/commands/pipetting_common.py +++ b/api/src/opentrons/protocol_engine/commands/pipetting_common.py @@ -1,12 +1,20 @@ """Common pipetting command base models.""" +from __future__ import annotations from opentrons_shared_data.errors import ErrorCodes from pydantic import BaseModel, Field -from typing import Literal, Optional, Tuple, TypedDict +from typing import Literal, Optional, Tuple, TypedDict, TYPE_CHECKING from opentrons.protocol_engine.errors.error_occurrence import ErrorOccurrence +from opentrons_shared_data.errors.exceptions import PipetteOverpressureError +from .command import Maybe, DefinedErrorData, SuccessData +from opentrons.protocol_engine.state.update_types import StateUpdate from ..types import WellLocation, LiquidHandlingWellLocation, DeckPoint +if TYPE_CHECKING: + from ..execution.pipetting import PipettingHandler + from ..resources import ModelUtils + class PipetteIdMixin(BaseModel): """Mixin for command requests that take a pipette ID.""" @@ -201,3 +209,44 @@ class TipPhysicallyAttachedError(ErrorOccurrence): errorCode: str = ErrorCodes.TIP_DROP_FAILED.value.code detail: str = ErrorCodes.TIP_DROP_FAILED.value.detail + + +PrepareForAspirateReturn = Maybe[ + SuccessData[BaseModel], DefinedErrorData[OverpressureError] +] + + +async def prepare_for_aspirate( + pipette_id: str, + pipetting: PipettingHandler, + model_utils: ModelUtils, + location_if_error: ErrorLocationInfo, +) -> PrepareForAspirateReturn: + """Execute pipetting.prepare_for_aspirate, handle errors, and marshal success.""" + state_update = StateUpdate() + try: + await pipetting.prepare_for_aspirate(pipette_id) + except PipetteOverpressureError as e: + state_update.set_fluid_unknown(pipette_id=pipette_id) + return PrepareForAspirateReturn.from_error( + DefinedErrorData( + public=OverpressureError( + id=model_utils.generate_id(), + createdAt=model_utils.get_timestamp(), + wrappedErrors=[ + ErrorOccurrence.from_failed( + id=model_utils.generate_id(), + createdAt=model_utils.get_timestamp(), + error=e, + ) + ], + errorInfo=location_if_error, + ), + state_update=state_update, + ) + ) + else: + state_update.set_fluid_empty(pipette_id=pipette_id) + return PrepareForAspirateReturn.from_result( + SuccessData(public=BaseModel(), state_update=state_update) + ) diff --git a/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py b/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py index f5525b3c90e..38f3a60516a 100644 --- a/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py +++ b/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py @@ -1,24 +1,20 @@ """Prepare to aspirate command request, result, and implementation models.""" from __future__ import annotations -from opentrons_shared_data.errors.exceptions import PipetteOverpressureError from pydantic import BaseModel from typing import TYPE_CHECKING, Optional, Type, Union from typing_extensions import Literal -from .pipetting_common import ( - OverpressureError, - PipetteIdMixin, -) +from .pipetting_common import OverpressureError, PipetteIdMixin, prepare_for_aspirate from .command import ( AbstractCommandImpl, BaseCommand, BaseCommandCreate, DefinedErrorData, SuccessData, + Maybe, ) from ..errors.error_occurrence import ErrorOccurrence -from ..state import update_types if TYPE_CHECKING: from ..execution import PipettingHandler, GantryMover @@ -46,6 +42,11 @@ class PrepareToAspirateResult(BaseModel): ] +_ExecuteMaybe = Maybe[ + SuccessData[PrepareToAspirateResult], DefinedErrorData[OverpressureError] +] + + class PrepareToAspirateImplementation( AbstractCommandImpl[PrepareToAspirateParams, _ExecuteReturn] ): @@ -62,44 +63,29 @@ def __init__( self._model_utils = model_utils self._gantry_mover = gantry_mover + def _transform_result(self, result: SuccessData[BaseModel]) -> _ExecuteMaybe: + return _ExecuteMaybe.from_result( + SuccessData( + public=PrepareToAspirateResult(), state_update=result.state_update + ) + ) + async def execute(self, params: PrepareToAspirateParams) -> _ExecuteReturn: """Prepare the pipette to aspirate.""" current_position = await self._gantry_mover.get_position(params.pipetteId) - state_update = update_types.StateUpdate() - try: - await self._pipetting_handler.prepare_for_aspirate( - pipette_id=params.pipetteId, - ) - except PipetteOverpressureError as e: - state_update.set_fluid_unknown(pipette_id=params.pipetteId) - return DefinedErrorData( - public=OverpressureError( - id=self._model_utils.generate_id(), - createdAt=self._model_utils.get_timestamp(), - wrappedErrors=[ - ErrorOccurrence.from_failed( - id=self._model_utils.generate_id(), - createdAt=self._model_utils.get_timestamp(), - error=e, - ) - ], - errorInfo=( - { - "retryLocation": ( - current_position.x, - current_position.y, - current_position.z, - ) - } - ), - ), - state_update=state_update, - ) - else: - state_update.set_fluid_empty(pipette_id=params.pipetteId) - return SuccessData( - public=PrepareToAspirateResult(), state_update=state_update - ) + prepare_result = await prepare_for_aspirate( + pipette_id=params.pipetteId, + pipetting=self._pipetting_handler, + model_utils=self._model_utils, + location_if_error={ + "retryLocation": ( + current_position.x, + current_position.y, + current_position.z, + ) + }, + ) + return prepare_result.and_then(self._transform_result).unwrap() class PrepareToAspirate( diff --git a/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py b/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py index 2de35e38332..f9eded1ffa0 100644 --- a/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py +++ b/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py @@ -34,14 +34,19 @@ def subject( async def test_prepare_to_aspirate_implementation( - decoy: Decoy, subject: PrepareToAspirateImplementation, pipetting: PipettingHandler + decoy: Decoy, + gantry_mover: GantryMover, + subject: PrepareToAspirateImplementation, + pipetting: PipettingHandler, ) -> None: """A PrepareToAspirate command should have an executing implementation.""" data = PrepareToAspirateParams(pipetteId="some id") + position = Point(x=1, y=2, z=3) decoy.when(await pipetting.prepare_for_aspirate(pipette_id="some id")).then_return( None ) + decoy.when(await gantry_mover.get_position("some id")).then_return(position) result = await subject.execute(data) assert result == SuccessData(