Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): implement liquid probe protocol engine command #15351

Merged
merged 22 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
PipetteOverpressureError,
FirmwareUpdateRequiredError,
FailedGripperPickupError,
LiquidNotFoundError,
PipetteLiquidNotFoundError,
CommunicationError,
PythonException,
UnsupportedHardwareCommand,
Expand Down Expand Up @@ -1412,7 +1412,7 @@ async def liquid_probe(
or positions[head_node].move_ack
== MoveCompleteAck.complete_without_condition
):
raise LiquidNotFoundError(
raise PipetteLiquidNotFoundError(
"Liquid not found during probe.",
{
str(node_to_axis(node)): str(point.motor_position)
Expand Down
23 changes: 12 additions & 11 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
GripperNotPresentError,
InvalidActuator,
FirmwareUpdateFailedError,
LiquidNotFoundError,
PipetteLiquidNotFoundError,
)

from .util import use_or_initialize_loop, check_motion_bounds
Expand Down Expand Up @@ -2594,7 +2594,8 @@ def _get_probe_distances(

async def liquid_probe(
self,
mount: OT3Mount,
mount: Union[top_types.Mount, OT3Mount],
max_z_dist: float,
probe_settings: Optional[LiquidProbeSettings] = None,
probe: Optional[InstrumentProbeType] = None,
) -> float:
Expand All @@ -2605,7 +2606,7 @@ async def liquid_probe(
reading from the pressure sensor.

If the move is completed without the specified threshold being triggered, a
LiquidNotFoundError error will be thrown.
PipetteLiquidNotFoundError error will be thrown.

Otherwise, the function will stop moving once the threshold is triggered,
and return the position of the
Expand All @@ -2622,21 +2623,21 @@ async def liquid_probe(
if not probe_settings:
probe_settings = self.config.liquid_sense

pos = await self.gantry_position(mount, refresh=True)
pos = await self.gantry_position(checked_mount, refresh=True)
probe_start_pos = pos._replace(z=probe_settings.starting_mount_height)
await self.move_to(mount, probe_start_pos)
total_z_travel = probe_settings.max_z_distance
pmoegenburg marked this conversation as resolved.
Show resolved Hide resolved
await self.move_to(checked_mount, probe_start_pos)
total_z_travel = max_z_dist
z_travels = self._get_probe_distances(
checked_mount,
total_z_travel,
probe_settings.plunger_speed,
probe_settings.mount_speed,
)
error: Optional[LiquidNotFoundError] = None
error: Optional[PipetteLiquidNotFoundError] = None
for z_travel in z_travels:

if probe_settings.aspirate_while_sensing:
await self._move_to_plunger_bottom(mount, rate=1.0)
await self._move_to_plunger_bottom(checked_mount, rate=1.0)
else:
# find the ideal travel distance by multiplying the plunger speed
# by the time it will take to complete the z move.
Expand All @@ -2656,17 +2657,17 @@ async def liquid_probe(
await self._move(target_pos, speed=speed, acquire_lock=True)
try:
height = await self._liquid_probe_pass(
mount,
checked_mount,
probe_settings,
probe if probe else InstrumentProbeType.PRIMARY,
z_travel,
)
# if we made it here without an error we found the liquid
error = None
break
except LiquidNotFoundError as lnfe:
except PipetteLiquidNotFoundError as lnfe:
error = lnfe
await self.move_to(mount, probe_start_pos)
await self.move_to(checked_mount, probe_start_pos)
if error is not None:
# if we never found an liquid raise an error
raise error
Expand Down
12 changes: 12 additions & 0 deletions api/src/opentrons/hardware_control/protocols/liquid_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,15 @@ async def drop_tip(
the ejector shroud after a drop.
"""
...

async def liquid_probe(
self,
mount: MountArgType,
max_z_dist: float,
) -> float:
"""Search for and return liquid level height using this pipette
at the current location.

mount : Mount.LEFT or Mount.RIGHT
pmoegenburg marked this conversation as resolved.
Show resolved Hide resolved
"""
...
14 changes: 14 additions & 0 deletions api/src/opentrons/protocol_engine/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@
VerifyTipPresenceCommandType,
)

from .liquid_probe import (
LiquidProbe,
LiquidProbeParams,
LiquidProbeCreate,
LiquidProbeResult,
LiquidProbeCommandType,
)

__all__ = [
# command type unions
"Command",
Expand Down Expand Up @@ -566,4 +574,10 @@
"VerifyTipPresenceParams",
"VerifyTipPresenceResult",
"VerifyTipPresenceCommandType",
# liquid probe command bundle
"LiquidProbe",
"LiquidProbeParams",
"LiquidProbeCreate",
"LiquidProbeResult",
"LiquidProbeCommandType",
]
21 changes: 20 additions & 1 deletion api/src/opentrons/protocol_engine/commands/command_unions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from opentrons.util.get_union_elements import get_union_elements

from .command import DefinedErrorData
from .pipetting_common import OverpressureError, OverpressureErrorInternalData
from .pipetting_common import (
OverpressureError,
OverpressureErrorInternalData,
LiquidNotFoundError,
LiquidNotFoundErrorInternalData,
)

from . import absorbance_reader
from . import heater_shaker
Expand Down Expand Up @@ -302,6 +307,14 @@
GetTipPresenceCommandType,
)

from .liquid_probe import (
LiquidProbe,
LiquidProbeParams,
LiquidProbeCreate,
LiquidProbeResult,
LiquidProbeCommandType,
)

Command = Annotated[
Union[
Aspirate,
Expand Down Expand Up @@ -339,6 +352,7 @@
SetStatusBar,
VerifyTipPresence,
GetTipPresence,
LiquidProbe,
heater_shaker.WaitForTemperature,
heater_shaker.SetTargetTemperature,
heater_shaker.DeactivateHeater,
Expand Down Expand Up @@ -406,6 +420,7 @@
SetStatusBarParams,
VerifyTipPresenceParams,
GetTipPresenceParams,
LiquidProbeParams,
heater_shaker.WaitForTemperatureParams,
heater_shaker.SetTargetTemperatureParams,
heater_shaker.DeactivateHeaterParams,
Expand Down Expand Up @@ -471,6 +486,7 @@
SetStatusBarCommandType,
VerifyTipPresenceCommandType,
GetTipPresenceCommandType,
LiquidProbeCommandType,
heater_shaker.WaitForTemperatureCommandType,
heater_shaker.SetTargetTemperatureCommandType,
heater_shaker.DeactivateHeaterCommandType,
Expand Down Expand Up @@ -537,6 +553,7 @@
SetStatusBarCreate,
VerifyTipPresenceCreate,
GetTipPresenceCreate,
LiquidProbeCreate,
heater_shaker.WaitForTemperatureCreate,
heater_shaker.SetTargetTemperatureCreate,
heater_shaker.DeactivateHeaterCreate,
Expand Down Expand Up @@ -604,6 +621,7 @@
SetStatusBarResult,
VerifyTipPresenceResult,
GetTipPresenceResult,
LiquidProbeResult,
heater_shaker.WaitForTemperatureResult,
heater_shaker.SetTargetTemperatureResult,
heater_shaker.DeactivateHeaterResult,
Expand Down Expand Up @@ -648,6 +666,7 @@
CommandDefinedErrorData = Union[
DefinedErrorData[TipPhysicallyMissingError, TipPhysicallyMissingErrorInternalData],
DefinedErrorData[OverpressureError, OverpressureErrorInternalData],
DefinedErrorData[LiquidNotFoundError, LiquidNotFoundErrorInternalData],
]


Expand Down
152 changes: 152 additions & 0 deletions api/src/opentrons/protocol_engine/commands/liquid_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Liquid-probe command for OT3 hardware. request, result, and implementation models."""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from opentrons_shared_data.errors.exceptions import PipetteLiquidNotFoundError
from typing_extensions import Literal

from pydantic import Field

from ..types import WellLocation, WellOrigin, CurrentWell, DeckPoint
from .pipetting_common import (
LiquidNotFoundError,
LiquidNotFoundErrorInternalData,
PipetteIdMixin,
WellLocationMixin,
DestinationPositionResult,
)
from .command import (
AbstractCommandImpl,
BaseCommand,
BaseCommandCreate,
DefinedErrorData,
SuccessData,
)
from ..errors.error_occurrence import ErrorOccurrence

if TYPE_CHECKING:
from ..execution import MovementHandler, PipettingHandler
from ..resources import ModelUtils


LiquidProbeCommandType = Literal["liquidProbe"]


class LiquidProbeParams(PipetteIdMixin, WellLocationMixin):
"""Parameters required to liquid probe a specific well."""

pass


class LiquidProbeResult(DestinationPositionResult):
"""Result data from the execution of a liquid-probe command."""

z_position: float = Field(..., description="Z position of the found liquid.")
pmoegenburg marked this conversation as resolved.
Show resolved Hide resolved


_ExecuteReturn = Union[
SuccessData[LiquidProbeResult, None],
DefinedErrorData[LiquidNotFoundError, LiquidNotFoundErrorInternalData],
]


class LiquidProbeImplementation(AbstractCommandImpl[LiquidProbeParams, _ExecuteReturn]):
"""The implementation of a `liquidProbe` command."""

def __init__(
self,
movement: MovementHandler,
pipetting: PipettingHandler,
model_utils: ModelUtils,
**kwargs: object,
) -> None:
self._movement = movement
self._pipetting = pipetting
self._model_utils = model_utils

async def execute(self, params: LiquidProbeParams) -> _ExecuteReturn:
"""Move to and liquid probe the requested well.

Return the z-position of the found liquid.

Raises:
LiquidNotFoundError: if liquid is not found during the probe process.
"""
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName

ready_to_probe = self._pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)

current_well = None

if not ready_to_probe:
await self._movement.move_to_well(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)

current_well = CurrentWell(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
)

# liquid_probe process start position
position = await self._movement.move_to_well(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=params.wellLocation,
current_well=current_well,
)

try:
z_pos = await self._pipetting.liquid_probe_in_place(
pipette_id=pipette_id, labware_id=labware_id, well_name=well_name
)
except PipetteLiquidNotFoundError as e:
return DefinedErrorData(
public=LiquidNotFoundError(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=self._model_utils.generate_id(),
createdAt=self._model_utils.get_timestamp(),
error=e,
)
],
),
private=LiquidNotFoundErrorInternalData(
position=DeckPoint(x=position.x, y=position.y, z=position.z)
),
)
else:
return SuccessData(
public=LiquidProbeResult(
z_position=z_pos,
position=DeckPoint(x=position.x, y=position.y, z=position.z),
),
private=None,
)


class LiquidProbe(BaseCommand[LiquidProbeParams, LiquidProbeResult, ErrorOccurrence]):
"""LiquidProbe command model."""

commandType: LiquidProbeCommandType = "liquidProbe"
params: LiquidProbeParams
result: Optional[LiquidProbeResult]

_ImplementationCls: Type[LiquidProbeImplementation] = LiquidProbeImplementation


class LiquidProbeCreate(BaseCommandCreate[LiquidProbeParams]):
"""Create LiquidProbe command request model."""

commandType: LiquidProbeCommandType = "liquidProbe"
params: LiquidProbeParams

_CommandCls: Type[LiquidProbe] = LiquidProbe
22 changes: 22 additions & 0 deletions api/src/opentrons/protocol_engine/commands/pipetting_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,25 @@ class OverpressureErrorInternalData:

position: DeckPoint
"""Same meaning as DestinationPositionResult.position."""


class LiquidNotFoundError(ErrorOccurrence):
"""Returned when no liquid is detected during the liquid probe process/move.

After a failed probing, the pipette returns to the process start position.
"""

isDefined: bool = True

errorType: Literal["LiquidNotFound"] = "LiquidNotFound"

errorCode: str = ErrorCodes.PIPETTE_LIQUID_NOT_FOUND.value.code
detail: str = ErrorCodes.PIPETTE_LIQUID_NOT_FOUND.value.detail


@dataclass(frozen=True)
class LiquidNotFoundErrorInternalData:
"""Internal-to-ProtocolEngine data about a LiquidNotFoundError."""

position: DeckPoint
"""Same meaning as DestinationPositionResult.position."""
Loading
Loading