Skip to content

Commit

Permalink
feat(api): add air gap PE command
Browse files Browse the repository at this point in the history
Up until now, we've implemented air gap with an aspirate command. We
can't do that anymore because we need to be able to know when to remove
liquid from the target well and when not to - and when you're air
gapping, you're not removing anything from the well.

Closes EXEC-792
  • Loading branch information
sfoster1 committed Oct 28, 2024
1 parent e0b9daa commit 1e5856f
Show file tree
Hide file tree
Showing 4 changed files with 503 additions and 2 deletions.
180 changes: 180 additions & 0 deletions api/src/opentrons/protocol_engine/commands/air_gap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""AirGap command request, result, and implementation models."""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from opentrons_shared_data.errors.exceptions import PipetteOverpressureError
from typing_extensions import Literal

from .pipetting_common import (
OverpressureError,
PipetteIdMixin,
AspirateVolumeMixin,
FlowRateMixin,
LiquidHandlingWellLocationMixin,
BaseLiquidHandlingResult,
DestinationPositionResult,
)
from .command import (
AbstractCommandImpl,
BaseCommand,
BaseCommandCreate,
DefinedErrorData,
SuccessData,
)
from ..errors.error_occurrence import ErrorOccurrence

from opentrons.hardware_control import HardwareControlAPI

from ..state.update_types import StateUpdate
from ..types import WellLocation, WellOrigin, CurrentWell, DeckPoint

if TYPE_CHECKING:
from ..execution import MovementHandler, PipettingHandler
from ..resources import ModelUtils
from ..state.state import StateView
from ..notes import CommandNoteAdder


AirGapCommandType = Literal["airGap"]


class AirGapParams(
PipetteIdMixin, AspirateVolumeMixin, FlowRateMixin, LiquidHandlingWellLocationMixin
):
"""Parameters required to aspirate from a specific well."""

pass


class AirGapResult(BaseLiquidHandlingResult, DestinationPositionResult):
"""Result data from execution of an AirGap command."""

pass


_ExecuteReturn = Union[
SuccessData[AirGapResult, None],
DefinedErrorData[OverpressureError],
]


class AirGapImplementation(AbstractCommandImpl[AirGapParams, _ExecuteReturn]):
"""AirGap command implementation."""

def __init__(
self,
pipetting: PipettingHandler,
state_view: StateView,
hardware_api: HardwareControlAPI,
movement: MovementHandler,
command_note_adder: CommandNoteAdder,
model_utils: ModelUtils,
**kwargs: object,
) -> None:
self._pipetting = pipetting
self._state_view = state_view
self._hardware_api = hardware_api
self._movement = movement
self._command_note_adder = command_note_adder
self._model_utils = model_utils

async def execute(self, params: AirGapParams) -> _ExecuteReturn:
"""Move to and aspirate from the requested well.
Raises:
TipNotAttachedError: if no tip is attached to the pipette.
"""
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName

ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=pipette_id
)

current_well = None
state_update = StateUpdate()

if not ready_to_aspirate:
await self._movement.move_to_well(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)

await self._pipetting.prepare_for_aspirate(pipette_id=pipette_id)

# set our current deck location to the well now that we've made
# an intermediate move for the "prepare for aspirate" step
current_well = CurrentWell(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
)

position = await self._movement.move_to_well(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
current_well=current_well,
)
deck_point = DeckPoint.construct(x=position.x, y=position.y, z=position.z)
state_update.set_pipette_location(
pipette_id=pipette_id,
new_labware_id=labware_id,
new_well_name=well_name,
new_deck_point=deck_point,
)

try:
volume_aspirated = await self._pipetting.aspirate_in_place(
pipette_id=pipette_id,
volume=params.volume,
flow_rate=params.flowRate,
command_note_adder=self._command_note_adder,
)
except PipetteOverpressureError as e:
return DefinedErrorData(
public=OverpressureError(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
error=e,
)
],
errorInfo={"retryLocation": (position.x, position.y, position.z)},
),
state_update=state_update,
)
else:
return SuccessData(
public=AirGapResult(
volume=volume_aspirated,
position=deck_point,
),
private=None,
state_update=state_update,
)


class AirGap(BaseCommand[AirGapParams, AirGapResult, OverpressureError]):
"""AirGap command model."""

commandType: AirGapCommandType = "airGap"
params: AirGapParams
result: Optional[AirGapResult]

_ImplementationCls: Type[AirGapImplementation] = AirGapImplementation


class AirGapCreate(BaseCommandCreate[AirGapParams]):
"""Create aspirate command request model."""

commandType: AirGapCommandType = "airGap"
params: AirGapParams

_CommandCls: Type[AirGap] = AirGap
158 changes: 158 additions & 0 deletions api/src/opentrons/protocol_engine/commands/air_gap_in_place.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""AirGap in place command request, result, and implementation models."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from typing_extensions import Literal

from opentrons_shared_data.errors.exceptions import PipetteOverpressureError

from opentrons.hardware_control import HardwareControlAPI

from .pipetting_common import (
PipetteIdMixin,
AspirateVolumeMixin,
FlowRateMixin,
BaseLiquidHandlingResult,
OverpressureError,
)
from .command import (
AbstractCommandImpl,
BaseCommand,
BaseCommandCreate,
SuccessData,
DefinedErrorData,
)
from ..errors.error_occurrence import ErrorOccurrence
from ..errors.exceptions import PipetteNotReadyToAspirateError
from ..state.update_types import StateUpdate
from ..types import CurrentWell

if TYPE_CHECKING:
from ..execution import PipettingHandler, GantryMover
from ..resources import ModelUtils
from ..state.state import StateView
from ..notes import CommandNoteAdder

AirGapInPlaceCommandType = Literal["airGapInPlace"]


class AirGapInPlaceParams(PipetteIdMixin, AspirateVolumeMixin, FlowRateMixin):
"""Payload required to aspirate in place."""

pass


class AirGapInPlaceResult(BaseLiquidHandlingResult):
"""Result data from the execution of a AirGapInPlace command."""

pass


_ExecuteReturn = Union[
SuccessData[AirGapInPlaceResult, None],
DefinedErrorData[OverpressureError],
]


class AirGapInPlaceImplementation(
AbstractCommandImpl[AirGapInPlaceParams, _ExecuteReturn]
):
"""AirGapInPlace command implementation."""

def __init__(
self,
pipetting: PipettingHandler,
hardware_api: HardwareControlAPI,
state_view: StateView,
command_note_adder: CommandNoteAdder,
model_utils: ModelUtils,
gantry_mover: GantryMover,
**kwargs: object,
) -> None:
self._pipetting = pipetting
self._state_view = state_view
self._hardware_api = hardware_api
self._command_note_adder = command_note_adder
self._model_utils = model_utils
self._gantry_mover = gantry_mover

async def execute(self, params: AirGapInPlaceParams) -> _ExecuteReturn:
"""AirGap without moving the pipette.
Raises:
TipNotAttachedError: if no tip is attached to the pipette.
PipetteNotReadyToAirGapError: pipette plunger is not ready.
"""
ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=params.pipetteId,
)

if not ready_to_aspirate:
raise PipetteNotReadyToAspirateError(
"Pipette cannot air gap in place because of a previous blow out."
" The first aspirate following a blow-out must be from a specific well"
" so the plunger can be reset in a known safe position."
)

state_update = StateUpdate()
current_location = self._state_view.pipettes.get_current_location()

try:
current_position = await self._gantry_mover.get_position(params.pipetteId)
volume = await self._pipetting.aspirate_in_place(
pipette_id=params.pipetteId,
volume=params.volume,
flow_rate=params.flowRate,
command_note_adder=self._command_note_adder,
)
except PipetteOverpressureError as e:
return DefinedErrorData(
public=OverpressureError(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
error=e,
)
],
errorInfo=(
{
"retryLocation": (
current_position.x,
current_position.y,
current_position.z,
)
}
),
),
state_update=state_update,
)
else:
return SuccessData(
public=AirGapInPlaceResult(volume=volume),
private=None,
state_update=state_update,
)


class AirGapInPlace(
BaseCommand[AirGapInPlaceParams, AirGapInPlaceResult, OverpressureError]
):
"""AirGapInPlace command model."""

commandType: AirGapInPlaceCommandType = "airGapInPlace"
params: AirGapInPlaceParams
result: Optional[AirGapInPlaceResult]

_ImplementationCls: Type[AirGapInPlaceImplementation] = AirGapInPlaceImplementation


class AirGapInPlaceCreate(BaseCommandCreate[AirGapInPlaceParams]):
"""AirGapInPlace command request model."""

commandType: AirGapInPlaceCommandType = "airGapInPlace"
params: AirGapInPlaceParams

_CommandCls: Type[AirGapInPlace] = AirGapInPlace
Loading

0 comments on commit 1e5856f

Please sign in to comment.