Skip to content

Commit

Permalink
feat(papiv2): retract pipettes before shaking or opening latch (#11268)
Browse files Browse the repository at this point in the history
Closes #11210, closes #11209

Co-authored-by: Mike Cousins <[email protected]>
  • Loading branch information
sanni-t and mcous authored Aug 2, 2022
1 parent e09f782 commit f0913d7
Show file tree
Hide file tree
Showing 11 changed files with 397 additions and 37 deletions.
46 changes: 41 additions & 5 deletions api/src/opentrons/protocol_api/module_contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,12 +925,16 @@ def set_and_wait_for_shake_speed(self, rpm: int) -> None:
Set the heater shaker's target speed and wait until the specified speed has
reached. Delays protocol execution until the target speed has been achieved.
NOTE: Before shaking, this command will retract the pipettes up if they are
parked adjacent to the heater-shaker.
"""
if (
self._module.labware_latch_status
== HeaterShakerLabwareLatchStatus.IDLE_CLOSED
):
validated_speed = validate_heater_shaker_speed(rpm=rpm)
self._prepare_for_shake()
self._module.set_speed(rpm=validated_speed)
else:
# TODO: Figure out whether to issue close latch behind the scenes instead
Expand All @@ -943,11 +947,13 @@ def set_and_wait_for_shake_speed(self, rpm: int) -> None:
def open_labware_latch(self) -> None:
"""Open the Heater-Shaker's labware latch.
Note that the labware latch needs to be closed before:
* Shaking
* Pipetting to or from the labware on the Heater-Shaker
* Pipetting to or from labware to the left or right of the Heater-Shaker
NOTE:
1. This command will retract the pipettes up if they are parked east or west
of the Heater-Shaker.
2. The labware latch needs to be closed before:
* Shaking
* Pipetting to or from the labware on the Heater-Shaker
* Pipetting to or from labware to the left or right of the Heater-Shaker
Raises an error when attempting to open the latch while the Heater-Shaker is shaking.
"""
Expand All @@ -956,6 +962,7 @@ def open_labware_latch(self) -> None:
raise CannotPerformModuleAction(
"""Cannot open labware latch while module is shaking."""
)
self._prepare_for_latch_open()
self._module.open_labware_latch()

# TODO: add API version requirement
Expand Down Expand Up @@ -1019,3 +1026,32 @@ def flag_unsafe_move(
is_plate_shaking=is_plate_shaking,
is_labware_latch_closed=is_labware_latch_closed,
)

def _prepare_for_shake(self) -> None:
"""
Before shaking, retracts pipettes if they're parked over a slot
adjacent to the heater-shaker.
"""

if cast(HeaterShakerGeometry, self.geometry).is_pipette_blocking_shake_movement(
pipette_location=self._ctx.location_cache
):
ctx_implementation = self._ctx._implementation
hardware = ctx_implementation.get_hardware()
for mount in types.Mount:
hardware.retract(mount=mount)
self._ctx.location_cache = None

def _prepare_for_latch_open(self) -> None:
"""
Before opening latch, retracts pipettes if they're parked over a slot
east/ west of the heater-shaker.
"""
if cast(HeaterShakerGeometry, self.geometry).is_pipette_blocking_latch_movement(
pipette_location=self._ctx.location_cache
):
ctx_implementation = self._ctx._implementation
hardware = ctx_implementation.get_hardware()
for mount in types.Mount:
hardware.retract(mount=mount)
self._ctx.location_cache = None
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ class OpenLabwareLatchParams(BaseModel):
class OpenLabwareLatchResult(BaseModel):
"""Result data from opening a Heater-Shaker's labware latch."""

pipetteRetracted: bool = Field(
...,
description="Whether the pipette was retracted/ homed before starting shake.",
)


class OpenLabwareLatchImpl(
AbstractCommandImpl[OpenLabwareLatchParams, OpenLabwareLatchResult]
Expand All @@ -51,10 +56,13 @@ async def execute(self, params: OpenLabwareLatchParams) -> OpenLabwareLatchResul

hs_module_substate.raise_if_shaking()

# Move pipette away if it is close to the heater-shaker
if self._state_view.motion.check_pipette_blocking_hs_latch(
hs_module_substate.module_id
):
pipette_should_retract = (
self._state_view.motion.check_pipette_blocking_hs_latch(
hs_module_substate.module_id
)
)
if pipette_should_retract:
# Move pipette away if it is close to the heater-shaker
# TODO(jbl 2022-07-28) replace home movement with a retract movement
await self._movement.home(
[
Expand All @@ -71,7 +79,7 @@ async def execute(self, params: OpenLabwareLatchParams) -> OpenLabwareLatchResul
if hs_hardware_module is not None:
await hs_hardware_module.open_labware_latch()

return OpenLabwareLatchResult()
return OpenLabwareLatchResult(pipetteRetracted=pipette_should_retract)


class OpenLabwareLatch(BaseCommand[OpenLabwareLatchParams, OpenLabwareLatchResult]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ class SetAndWaitForShakeSpeedParams(BaseModel):
class SetAndWaitForShakeSpeedResult(BaseModel):
"""Result data from setting and waiting for a Heater-Shaker's shake speed."""

pipetteRetracted: bool = Field(
...,
description="Whether the pipette was retracted/ homed before starting shake.",
)


class SetAndWaitForShakeSpeedImpl(
AbstractCommandImpl[SetAndWaitForShakeSpeedParams, SetAndWaitForShakeSpeedResult]
Expand Down Expand Up @@ -60,10 +65,13 @@ async def execute(
# Verify speed from hs module view
validated_speed = hs_module_substate.validate_target_speed(params.rpm)

# Move pipette away if it is close to the heater-shaker
if self._state_view.motion.check_pipette_blocking_hs_shaker(
hs_module_substate.module_id
):
pipette_should_retract = (
self._state_view.motion.check_pipette_blocking_hs_shaker(
hs_module_substate.module_id
)
)
if pipette_should_retract:
# Move pipette away if it is close to the heater-shaker
# TODO(jbl 2022-07-28) replace home movement with a retract movement
await self._movement.home(
[
Expand All @@ -80,7 +88,7 @@ async def execute(
if hs_hardware_module is not None:
await hs_hardware_module.set_speed(rpm=validated_speed)

return SetAndWaitForShakeSpeedResult()
return SetAndWaitForShakeSpeedResult(pipetteRetracted=pipette_should_retract)


class SetAndWaitForShakeSpeed(
Expand Down
33 changes: 31 additions & 2 deletions api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Basic pipette data state and store."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional
from typing import Dict, List, Mapping, Optional, Union

from opentrons.hardware_control.dev_types import PipetteDict
from opentrons.types import MountType, Mount as HwMount
Expand All @@ -22,6 +22,7 @@
BlowOutResult,
TouchTipResult,
thermocycler,
heater_shaker,
)
from ..actions import Action, UpdateCommandAction
from .abstract_store import HasState, HandlesActions
Expand Down Expand Up @@ -100,12 +101,14 @@ def _handle_command(self, command: Command) -> None:
MoveToCoordinatesResult,
thermocycler.OpenLidResult,
thermocycler.CloseLidResult,
heater_shaker.SetAndWaitForShakeSpeedResult,
heater_shaker.OpenLabwareLatchResult,
),
):
# A command left the pipette in a place that we can't associate
# with a logical well location. Set the current well to None
# to reflect the fact that it's now unknown.
self._state.current_well = None
self._handle_current_well_clearing_commands(command_result=command.result)

if isinstance(command.result, LoadPipetteResult):
pipette_id = command.result.pipetteId
Expand Down Expand Up @@ -146,6 +149,32 @@ def _handle_command(self, command: Command) -> None:
pipette_id = command.params.pipetteId
self._state.aspirated_volume_by_id[pipette_id] = 0

def _handle_current_well_clearing_commands(
self,
command_result: Union[
HomeResult,
MoveToCoordinatesResult,
thermocycler.OpenLidResult,
thermocycler.CloseLidResult,
heater_shaker.SetAndWaitForShakeSpeedResult,
heater_shaker.OpenLabwareLatchResult,
],
) -> None:
if (
not isinstance(
command_result,
(
heater_shaker.SetAndWaitForShakeSpeedResult,
heater_shaker.OpenLabwareLatchResult,
),
)
or command_result.pipetteRetracted
):
# Clear current_well for all above commands except h/s commands.
# For h/s commands, clear current_well only if pipettes were moved before
# command execution for safety.
self._state.current_well = None


class PipetteView(HasState[PipetteState]):
"""Read-only view of computed pipettes state."""
Expand Down
63 changes: 63 additions & 0 deletions api/src/opentrons/protocols/geometry/module_geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from opentrons.motion_planning.adjacent_slots_getters import (
get_north_south_slots,
get_east_west_slots,
get_adjacent_slots,
)

from opentrons.hardware_control.modules.types import (
Expand Down Expand Up @@ -436,6 +437,68 @@ def flag_unsafe_move(
"Cannot move multi-channel pipette to non-tip rack labware north or south of Heater-Shaker"
)

def is_pipette_blocking_shake_movement(
self, pipette_location: Optional[Location]
) -> bool:
"""Check whether pipette is parked adjacent to heater-shaker.
Returns True if pipette's last known location was on east/west/north/south of or
on the heater-shaker. Also returns True if last location is not known or is
not associated with a slot.
"""
if pipette_location is None:
# If we don't know the pipette's latest location then let's be extra
# cautious and call it blocking
return True

pipette_location_slot = pipette_location.labware.first_parent()
if pipette_location_slot is None:
# If a location is not associated w/ a slot (e.g., if it has labware=None)
# then we don't know if it's close to the h/s, so, we will be cautious
# and call it blocking
return True

heater_shaker_slot = self.parent

assert isinstance(
heater_shaker_slot, str
), "Could not determine module slot location"

return heater_shaker_slot == pipette_location_slot or int(
pipette_location_slot
) in get_adjacent_slots(int(heater_shaker_slot))

def is_pipette_blocking_latch_movement(
self, pipette_location: Optional[Location]
) -> bool:
"""Check whether pipette is parked east or west of heater-shaker.
Returns True is pipette's last known location was on east/west of or on the
heater-shaker. Also returns True if last location is not known or is not
associated with a slot.
"""
if pipette_location is None:
# If we don't know the pipette's latest location then let's be extra
# cautious and call it blocking
return True

pipette_location_slot = pipette_location.labware.first_parent()
if pipette_location_slot is None:
# If a location is not associated w/ a slot (e.g., if it has labware=None)
# then we don't know if it's close to the h/s, so, we will be cautious
# and call it blocking
return True

heater_shaker_slot = self.parent

assert isinstance(
heater_shaker_slot, str
), "Could not determine module slot location"

return heater_shaker_slot == pipette_location_slot or int(
pipette_location_slot
) in get_east_west_slots(int(heater_shaker_slot))


def _load_from_v1(
definition: "ModuleDefinitionV1", parent: Location, api_level: APIVersion
Expand Down
Loading

0 comments on commit f0913d7

Please sign in to comment.