Skip to content

Commit

Permalink
feat(engine): move pipettes away if blocking heater-shaker open latch…
Browse files Browse the repository at this point in the history
… or start shake (#11248)
  • Loading branch information
jbleon95 authored Jul 29, 2022
1 parent 41a5a39 commit e77add5
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from pydantic import BaseModel, Field

from opentrons.protocol_engine.types import MotorAxis

from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate

if TYPE_CHECKING:
from opentrons.protocol_engine.state import StateView
from opentrons.protocol_engine.execution import EquipmentHandler
from opentrons.protocol_engine.execution import EquipmentHandler, MovementHandler

OpenLabwareLatchCommandType = Literal["heaterShaker/openLabwareLatch"]

Expand All @@ -33,10 +35,12 @@ def __init__(
self,
state_view: StateView,
equipment: EquipmentHandler,
movement: MovementHandler,
**unused_dependencies: object,
) -> None:
self._state_view = state_view
self._equipment = equipment
self._movement = movement

async def execute(self, params: OpenLabwareLatchParams) -> OpenLabwareLatchResult:
"""Open a Heater-Shaker's labware latch."""
Expand All @@ -47,6 +51,18 @@ async def execute(self, params: OpenLabwareLatchParams) -> OpenLabwareLatchResul

hs_module_substate.raise_if_shaking()

# Move pipette away if it is close to the heater-shaker
if self._state_view.motion.check_pipette_blocking_hs_latch(
hs_module_substate.module_id
):
# TODO(jbl 2022-07-28) replace home movement with a retract movement
await self._movement.home(
[
MotorAxis.RIGHT_Z,
MotorAxis.LEFT_Z,
]
)

# Allow propagation of ModuleNotAttachedError.
hs_hardware_module = self._equipment.get_module_hardware_api(
hs_module_substate.module_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

from pydantic import BaseModel, Field

from opentrons.protocol_engine.types import MotorAxis

from ..command import AbstractCommandImpl, BaseCommand, BaseCommandCreate

if TYPE_CHECKING:
from opentrons.protocol_engine.state import StateView
from opentrons.protocol_engine.execution import EquipmentHandler
from opentrons.protocol_engine.execution import EquipmentHandler, MovementHandler

SetAndWaitForShakeSpeedCommandType = Literal["heaterShaker/setAndWaitForShakeSpeed"]

Expand All @@ -36,10 +38,12 @@ def __init__(
self,
state_view: StateView,
equipment: EquipmentHandler,
movement: MovementHandler,
**unused_dependencies: object,
) -> None:
self._state_view = state_view
self._equipment = equipment
self._movement = movement

async def execute(
self,
Expand All @@ -56,6 +60,18 @@ async def execute(
# Verify speed from hs module view
validated_speed = hs_module_substate.validate_target_speed(params.rpm)

# Move pipette away if it is close to the heater-shaker
if self._state_view.motion.check_pipette_blocking_hs_shaker(
hs_module_substate.module_id
):
# TODO(jbl 2022-07-28) replace home movement with a retract movement
await self._movement.home(
[
MotorAxis.RIGHT_Z,
MotorAxis.LEFT_Z,
]
)

# Allow propagation of ModuleNotAttachedError.
hs_hardware_module = self._equipment.get_module_hardware_api(
hs_module_substate.module_id
Expand Down
35 changes: 35 additions & 0 deletions api/src/opentrons/protocol_engine/state/motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from opentrons.types import MountType, Point, DeckSlotName
from opentrons.hardware_control.types import CriticalPoint
from opentrons.motion_planning.adjacent_slots_getters import (
get_east_west_slots,
get_adjacent_slots,
)
from opentrons import motion_planning

from .. import errors
Expand All @@ -12,6 +16,7 @@
from .pipettes import PipetteView, CurrentWell
from .geometry import GeometryView
from .modules import ModuleView
from .module_substates import HeaterShakerModuleId


@dataclass(frozen=True)
Expand Down Expand Up @@ -175,3 +180,33 @@ def get_movement_waypoints_to_coords(
)
except motion_planning.MotionPlanningError as error:
raise errors.FailedToPlanMoveError(str(error))

def check_pipette_blocking_hs_latch(
self, hs_module_id: HeaterShakerModuleId
) -> bool:
"""Check if pipette would block h/s latch from opening if it is east, west or on module."""
pipette_blocking = True
current_well = self._pipettes.get_current_well()
if current_well is not None:
pipette_deck_slot = int(
self._geometry.get_ancestor_slot_name(current_well.labware_id)
)
hs_deck_slot = int(self._module.get_location(hs_module_id).slotName)
conflicting_slots = get_east_west_slots(hs_deck_slot) + [hs_deck_slot]
pipette_blocking = pipette_deck_slot in conflicting_slots
return pipette_blocking

def check_pipette_blocking_hs_shaker(
self, hs_module_id: HeaterShakerModuleId
) -> bool:
"""Check if pipette would block h/s latch from starting shake if it is adjacent or on module."""
pipette_blocking = True
current_well = self._pipettes.get_current_well()
if current_well is not None:
pipette_deck_slot = int(
self._geometry.get_ancestor_slot_name(current_well.labware_id)
)
hs_deck_slot = int(self._module.get_location(hs_module_id).slotName)
conflicting_slots = get_adjacent_slots(hs_deck_slot) + [hs_deck_slot]
pipette_blocking = pipette_deck_slot in conflicting_slots
return pipette_blocking
13 changes: 12 additions & 1 deletion api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
DropTipResult,
HomeResult,
BlowOutResult,
TouchTipResult,
thermocycler,
)
from ..actions import Action, UpdateCommandAction
from .abstract_store import HasState, HandlesActions
Expand Down Expand Up @@ -81,6 +83,7 @@ def _handle_command(self, command: Command) -> None:
AspirateResult,
DispenseResult,
BlowOutResult,
TouchTipResult,
),
):
self._state.current_well = CurrentWell(
Expand All @@ -90,7 +93,15 @@ def _handle_command(self, command: Command) -> None:
)

# TODO(mc, 2021-11-12): wipe out current_well on movement failures, too
elif isinstance(command.result, (HomeResult, MoveToCoordinatesResult)):
elif isinstance(
command.result,
(
HomeResult,
MoveToCoordinatesResult,
thermocycler.OpenLidResult,
thermocycler.CloseLidResult,
),
):
# A command left the pipette in a place that we can't associate
# with a logical well location. Set the current well to None
# to reflect the fact that it's now unknown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@
HeaterShakerModuleSubState,
HeaterShakerModuleId,
)
from opentrons.protocol_engine.execution import EquipmentHandler
from opentrons.protocol_engine.execution import EquipmentHandler, MovementHandler
from opentrons.protocol_engine.commands import heater_shaker
from opentrons.protocol_engine.commands.heater_shaker.open_labware_latch import (
OpenLabwareLatchImpl,
)
from opentrons.protocol_engine.types import MotorAxis


async def test_open_labware_latch(
decoy: Decoy,
state_view: StateView,
equipment: EquipmentHandler,
movement: MovementHandler,
) -> None:
"""It should be able to open the module's labware latch."""
subject = OpenLabwareLatchImpl(state_view=state_view, equipment=equipment)
subject = OpenLabwareLatchImpl(
state_view=state_view, equipment=equipment, movement=movement
)
data = heater_shaker.OpenLabwareLatchParams(moduleId="input-heater-shaker-id")

hs_module_substate = decoy.mock(cls=HeaterShakerModuleSubState)
Expand All @@ -37,13 +41,21 @@ async def test_open_labware_latch(
HeaterShakerModuleId("heater-shaker-id")
)

decoy.when(
state_view.motion.check_pipette_blocking_hs_latch(
HeaterShakerModuleId("heater-shaker-id")
)
).then_return(True)

# Get stubbed hardware module
decoy.when(
equipment.get_module_hardware_api(HeaterShakerModuleId("heater-shaker-id"))
).then_return(hs_hardware)

result = await subject.execute(data)
decoy.verify(
hs_module_substate.raise_if_shaking(), await hs_hardware.open_labware_latch()
hs_module_substate.raise_if_shaking(),
await movement.home([MotorAxis.RIGHT_Z, MotorAxis.LEFT_Z]),
await hs_hardware.open_labware_latch(),
)
assert result == heater_shaker.OpenLabwareLatchResult()
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@
HeaterShakerModuleSubState,
HeaterShakerModuleId,
)
from opentrons.protocol_engine.execution import EquipmentHandler
from opentrons.protocol_engine.execution import EquipmentHandler, MovementHandler
from opentrons.protocol_engine.commands import heater_shaker
from opentrons.protocol_engine.commands.heater_shaker.set_and_wait_for_shake_speed import (
SetAndWaitForShakeSpeedImpl,
)
from opentrons.protocol_engine.types import MotorAxis


async def test_set_and_wait_for_shake_speed(
decoy: Decoy,
state_view: StateView,
equipment: EquipmentHandler,
movement: MovementHandler,
) -> None:
"""It should be able to set the module's shake speed."""
subject = SetAndWaitForShakeSpeedImpl(state_view=state_view, equipment=equipment)
subject = SetAndWaitForShakeSpeedImpl(
state_view=state_view, equipment=equipment, movement=movement
)
data = heater_shaker.SetAndWaitForShakeSpeedParams(
moduleId="input-heater-shaker-id",
rpm=1234.56,
Expand All @@ -40,6 +44,12 @@ async def test_set_and_wait_for_shake_speed(
HeaterShakerModuleId("heater-shaker-id")
)

decoy.when(
state_view.motion.check_pipette_blocking_hs_shaker(
HeaterShakerModuleId("heater-shaker-id")
)
).then_return(True)

# Stub speed validation from hs module view
decoy.when(hs_module_substate.validate_target_speed(rpm=1234.56)).then_return(1234)

Expand All @@ -51,6 +61,7 @@ async def test_set_and_wait_for_shake_speed(
result = await subject.execute(data)
decoy.verify(
hs_module_substate.raise_if_labware_latch_not_closed(),
await movement.home([MotorAxis.RIGHT_Z, MotorAxis.LEFT_Z]),
await hs_hardware.set_speed(rpm=1234),
)
assert result == heater_shaker.SetAndWaitForShakeSpeedResult()
82 changes: 82 additions & 0 deletions api/tests/opentrons/protocol_engine/state/test_motion_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
WellOffset,
PipetteName,
LoadedPipette,
DeckSlotLocation,
)
from opentrons.protocol_engine.state import PipetteLocationData
from opentrons.protocol_engine.state.labware import LabwareView
from opentrons.protocol_engine.state.pipettes import PipetteView, CurrentWell
from opentrons.protocol_engine.state.geometry import GeometryView
from opentrons.protocol_engine.state.motion import MotionView
from opentrons.protocol_engine.state.modules import ModuleView
from opentrons.protocol_engine.state.module_substates import HeaterShakerModuleId


@pytest.fixture
Expand Down Expand Up @@ -571,3 +573,83 @@ def test_get_movement_waypoints_to_coords_raises(
direct=False,
additional_min_travel_z=None,
)


@pytest.mark.parametrize(
("labware_deck_slot", "expected_result"),
[
(DeckSlotName.SLOT_4, True),
(DeckSlotName.SLOT_5, True),
(DeckSlotName.SLOT_6, True),
(DeckSlotName.SLOT_2, False),
(DeckSlotName.SLOT_8, False),
(DeckSlotName.SLOT_1, False),
],
)
def test_check_pipette_blocking_hs_latch(
decoy: Decoy,
geometry_view: GeometryView,
pipette_view: PipetteView,
mock_module_view: ModuleView,
subject: MotionView,
labware_deck_slot: DeckSlotName,
expected_result: bool,
) -> None:
"""It should return True if pipette is blocking opening the latch."""
decoy.when(pipette_view.get_current_well()).then_return(
CurrentWell(pipette_id="pipette-id", labware_id="labware-id", well_name="A1")
)

decoy.when(geometry_view.get_ancestor_slot_name("labware-id")).then_return(
labware_deck_slot
)

decoy.when(
mock_module_view.get_location(HeaterShakerModuleId("heater-shaker-id"))
).then_return(DeckSlotLocation(slotName=DeckSlotName.SLOT_5))

result = subject.check_pipette_blocking_hs_latch(
HeaterShakerModuleId("heater-shaker-id")
)

assert result == expected_result


@pytest.mark.parametrize(
("labware_deck_slot", "expected_result"),
[
(DeckSlotName.SLOT_4, True),
(DeckSlotName.SLOT_5, True),
(DeckSlotName.SLOT_6, True),
(DeckSlotName.SLOT_2, True),
(DeckSlotName.SLOT_8, True),
(DeckSlotName.SLOT_1, False),
],
)
def test_check_pipette_blocking_hs_shake(
decoy: Decoy,
geometry_view: GeometryView,
pipette_view: PipetteView,
mock_module_view: ModuleView,
subject: MotionView,
labware_deck_slot: DeckSlotName,
expected_result: bool,
) -> None:
"""It should return True if pipette is blocking the h/s from shaking."""
decoy.when(pipette_view.get_current_well()).then_return(
CurrentWell(pipette_id="pipette-id", labware_id="labware-id", well_name="A1")
)

decoy.when(geometry_view.get_ancestor_slot_name("labware-id")).then_return(
labware_deck_slot
)

decoy.when(
mock_module_view.get_location(HeaterShakerModuleId("heater-shaker-id"))
).then_return(DeckSlotLocation(slotName=DeckSlotName.SLOT_5))

result = subject.check_pipette_blocking_hs_shaker(
HeaterShakerModuleId("heater-shaker-id")
)

assert result == expected_result

0 comments on commit e77add5

Please sign in to comment.