Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): raise an error if pipette nozzles might collide with thermocycler lid clips #14522

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 92 additions & 36 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
overload,
Union,
TYPE_CHECKING,
List,
)

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError
from opentrons_shared_data.module import FLEX_TC_LID_CLIP_POSITIONS_IN_DECK_COORDINATES

from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.modules.types import ModuleType
Expand All @@ -30,7 +32,10 @@
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.protocol_engine.types import StagingSlotLocation, Dimensions
from opentrons.protocol_engine.types import (
StagingSlotLocation,
Dimensions,
)
from opentrons.types import DeckSlotName, StagingSlotName, Point
from ..._trash_bin import TrashBin
from ..._waste_chute import WasteChute
Expand Down Expand Up @@ -195,7 +200,8 @@ def check(
)


def check_safe_for_pipette_movement( # noqa: C901
# TODO (spp, 2023-02-16): move pipette movement safety checks to its own separate file.
def check_safe_for_pipette_movement(
engine_state: StateView,
pipette_id: str,
labware_id: str,
Expand Down Expand Up @@ -249,44 +255,94 @@ def check_safe_for_pipette_movement( # noqa: C901
slot=labware_slot.as_int(), robot_type=engine_state.config.robot_type
)

def _check_conflict_with_slot_item(
surrounding_slot: Union[DeckSlotName, StagingSlotName],
) -> None:
"""Raises error if the pipette is expected to collide with surrounding slot items."""
# Check if slot overlaps with pipette position
slot_pos = engine_state.addressable_areas.get_addressable_area_position(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
)
slot_bounds = engine_state.addressable_areas.get_addressable_area_bounding_box(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
if _will_collide_with_thermocycler_lid(
engine_state=engine_state,
pipette_bounds=pipette_bounds_at_well_location,
surrounding_regular_slots=surrounding_slots.regular_slots,
):
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with thermocycler lid in deck slot A1."
)
for bound_vertex in pipette_bounds_at_well_location:
if not _point_overlaps_with_slot(
slot_pos, slot_bounds, nozzle_point=bound_vertex
):
continue
# Check z-height of items in overlapping slot
if isinstance(surrounding_slot, DeckSlotName):
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
else:
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
StagingSlotLocation(slotName=surrounding_slot)
)
if slot_highest_z + Z_SAFETY_MARGIN > pipette_bounds_at_well_location[0].z:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in deck slot {surrounding_slot}."
)

for regular_slot in surrounding_slots.regular_slots:
_check_conflict_with_slot_item(regular_slot)
if _slot_has_potential_colliding_object(
engine_state=engine_state,
pipette_bounds=pipette_bounds_at_well_location,
surrounding_slot=regular_slot,
):
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in deck slot {regular_slot}."
)
for staging_slot in surrounding_slots.staging_slots:
_check_conflict_with_slot_item(staging_slot)
if _slot_has_potential_colliding_object(
engine_state=engine_state,
pipette_bounds=pipette_bounds_at_well_location,
surrounding_slot=staging_slot,
):
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with {primary_nozzle} nozzle partial configuration"
f" will result in collision with items in staging slot {staging_slot}."
)


def _slot_has_potential_colliding_object(
engine_state: StateView,
pipette_bounds: Tuple[Point, Point, Point, Point],
surrounding_slot: Union[DeckSlotName, StagingSlotName],
) -> bool:
"""Return the slot, if any, that has an item that the pipette might collide into."""
# Check if slot overlaps with pipette position
slot_pos = engine_state.addressable_areas.get_addressable_area_position(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
)
slot_bounds = engine_state.addressable_areas.get_addressable_area_bounding_box(
addressable_area_name=surrounding_slot.id,
do_compatibility_check=False,
)
for bound_vertex in pipette_bounds:
if not _point_overlaps_with_slot(
slot_pos, slot_bounds, nozzle_point=bound_vertex
):
continue
# Check z-height of items in overlapping slot
if isinstance(surrounding_slot, DeckSlotName):
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
else:
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
StagingSlotLocation(slotName=surrounding_slot)
)
if slot_highest_z + Z_SAFETY_MARGIN > pipette_bounds[0].z:
return True
return False


def _will_collide_with_thermocycler_lid(
engine_state: StateView,
pipette_bounds: Tuple[Point, Point, Point, Point],
surrounding_regular_slots: List[DeckSlotName],
) -> bool:
"""Return whether the pipette might collide with thermocycler's lid on a Flex."""
if (
DeckSlotName.SLOT_A1 in surrounding_regular_slots
and engine_state.modules.is_flex_deck_with_thermocycler()
):
tc_right_clip_pos = FLEX_TC_LID_CLIP_POSITIONS_IN_DECK_COORDINATES["right_clip"]
for bound_vertex in pipette_bounds:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So currently we're only doing this check on the outside edge of the right clip. What happens in the following case:
If a single channel pipette were to move inside the thermocycler then manually offsets to a position in which it would collide with the inside edge (so left edge) of the right clip, would we not catch it?

Same question applies to anything that would collide with the left clip.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check looks for anything left of the right edge of the right clip so it covers your case. That said, these checks are currently only enabled for partially configured pipettes so it won't raise for single channels.

But it just made me realize that this might actually raise an unnecessary error if an 8-channel is to the left of the left clip. Not sure if it's actually possible for an 8-channel to be in that nook safely. But I can update this check regardless.

if (
bound_vertex.x <= tc_right_clip_pos["x"]
and bound_vertex.y >= tc_right_clip_pos["y"]
and bound_vertex.z <= tc_right_clip_pos["z"]
):
return True
return False


def _point_overlaps_with_slot(
Expand Down
14 changes: 14 additions & 0 deletions api/src/opentrons/protocol_engine/state/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1049,3 +1049,17 @@ def get_overflowed_module_in_slot(
return self.get(module_id)

return None

def is_flex_deck_with_thermocycler(self) -> bool:
"""Return if this is a Flex deck with a thermocycler loaded in B1-A1 slots."""
maybe_module = self.get_by_slot(
DeckSlotName.SLOT_A1
) or self.get_overflowed_module_in_slot(DeckSlotName.SLOT_A1)
if (
self._state.deck_type == DeckType.OT3_STANDARD
and maybe_module
and maybe_module.model == ModuleModel.THERMOCYCLER_MODULE_V2
):
return True
else:
return False
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def test_maps_trash_bins(decoy: Decoy, mock_state_view: StateView) -> None:
),
pytest.raises(
deck_conflict.PartialTipMovementNotAllowedError,
match="collision with items in deck slot C4",
match="collision with items in staging slot C4",
),
),
],
Expand Down Expand Up @@ -536,6 +536,73 @@ def test_deck_conflict_raises_for_bad_pipette_move(
)


@pytest.mark.parametrize(
("robot_type", "deck_type"),
[("OT-3 Standard", DeckType.OT3_STANDARD)],
)
def test_deck_conflict_raises_for_collision_with_tc_lid(
decoy: Decoy,
mock_state_view: StateView,
) -> None:
"""It should raise an error if pipette might collide with thermocycler lid on the Flex."""
destination_well_point = Point(x=123, y=123, z=123)
nozzle_bounds_at_destination = (
Point(x=50, y=150, z=60),
Point(x=150, y=50, z=60),
Point(x=97, y=403, z=204.5),
Point(x=50, y=50, z=60),
)

decoy.when(
mock_state_view.pipettes.get_is_partially_configured("pipette-id")
).then_return(True)
decoy.when(mock_state_view.pipettes.get_primary_nozzle("pipette-id")).then_return(
"A12"
)
decoy.when(
mock_state_view.geometry.get_ancestor_slot_name("destination-labware-id")
).then_return(DeckSlotName.SLOT_C2)

decoy.when(
mock_state_view.geometry.get_well_position(
labware_id="destination-labware-id",
well_name="A2",
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)
decoy.when(
mock_state_view.pipettes.get_nozzle_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
)
).then_return(nozzle_bounds_at_destination)

decoy.when(
adjacent_slots_getters.get_surrounding_slots(5, robot_type="OT-3 Standard")
).then_return(
_MixedTypeSlots(
regular_slots=[
DeckSlotName.SLOT_A1,
DeckSlotName.SLOT_B1,
],
staging_slots=[StagingSlotName.SLOT_C4],
)
)
decoy.when(mock_state_view.modules.is_flex_deck_with_thermocycler()).then_return(
True
)
with pytest.raises(
deck_conflict.PartialTipMovementNotAllowedError,
match="collision with thermocycler lid in deck slot A1.",
):
deck_conflict.check_safe_for_pipette_movement(
engine_state=mock_state_view,
pipette_id="pipette-id",
labware_id="destination-labware-id",
well_name="A2",
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)


@pytest.mark.parametrize(
("robot_type", "deck_type"),
[("OT-3 Standard", DeckType.OT3_STANDARD)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def test_deck_conflicts_for_96_ch_a12_column_configuration() -> None:
)

thermocycler = protocol_context.load_module("thermocyclerModuleV2")
tc_adjacent_plate = protocol_context.load_labware(
"opentrons_96_wellplate_200ul_pcr_full_skirt", "A2"
)
accessible_plate = thermocycler.load_labware(
"opentrons_96_wellplate_200ul_pcr_full_skirt"
)
Expand Down Expand Up @@ -67,8 +70,13 @@ def test_deck_conflicts_for_96_ch_a12_column_configuration() -> None:
):
instrument.dispense(50, badly_placed_labware.wells()[0])

# Currently does not raise a 'collision with thermocycler lid' error`
# because it's the pipette outer cover that hits the lid, but we don't include
# the cover in pipette dimensions yet.
instrument.dispense(10, tc_adjacent_plate.wells_by_name()["A1"])

# No error cuz dispensing from high above plate, so it clears tuberack in west slot
instrument.dispense(25, badly_placed_labware.wells_by_name()["A1"].top(150))
instrument.dispense(15, badly_placed_labware.wells_by_name()["A1"].top(150))

thermocycler.open_lid() # type: ignore[union-attr]

Expand Down
52 changes: 52 additions & 0 deletions api/tests/opentrons/protocol_engine/state/test_module_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -1819,3 +1819,55 @@ def test_get_overflowed_module_in_slot(tempdeck_v1_def: ModuleDefinition) -> Non
location=DeckSlotLocation(slotName=DeckSlotName.SLOT_1),
serialNumber="serial-number",
)


@pytest.mark.parametrize(
argnames=["deck_type", "module_def", "module_slot", "expected_result"],
argvalues=[
(
DeckType.OT3_STANDARD,
lazy_fixture("thermocycler_v2_def"),
DeckSlotName.SLOT_A1,
True,
),
(
DeckType.OT3_STANDARD,
lazy_fixture("tempdeck_v1_def"),
DeckSlotName.SLOT_A1,
False,
),
(
DeckType.OT3_STANDARD,
lazy_fixture("thermocycler_v2_def"),
DeckSlotName.SLOT_1,
False,
),
(
DeckType.OT2_STANDARD,
lazy_fixture("thermocycler_v2_def"),
DeckSlotName.SLOT_A1,
False,
),
],
)
def test_is_flex_deck_with_thermocycler(
deck_type: DeckType,
module_def: ModuleDefinition,
module_slot: DeckSlotName,
expected_result: bool,
) -> None:
"""It should return True if there is a thermocycler on Flex."""
subject = make_module_view(
slot_by_module_id={"module-id": DeckSlotName.SLOT_B1},
hardware_by_module_id={
"module-id": HardwareModule(
serial_number="serial-number",
definition=module_def,
)
},
additional_slots_occupied_by_module_id={
"module-id": [module_slot, DeckSlotName.SLOT_C1],
},
deck_type=deck_type,
)
assert subject.is_flex_deck_with_thermocycler() == expected_result
7 changes: 7 additions & 0 deletions shared-data/python/opentrons_shared_data/module/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@

OLD_TC_GEN2_LABWARE_OFFSET = {"x": 0, "y": 68.06, "z": 98.26}

# TODO (spp, 2023-02-14): make these a part of thermocycler/ deck definitions
FLEX_TC_LID_CLIP_POSITIONS_IN_DECK_COORDINATES = {
"left_clip": {"x": -3.25, "y": 402, "z": 205},
"right_clip": {"x": 97.75, "y": 402, "z": 205},
}
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved

# TODO (spp, 2022-05-12): Python has a built-in error called `ModuleNotFoundError` so,
# maybe rename this one?


class ModuleNotFoundError(KeyError):
def __init__(self, version: str, model_or_loadname: str):
super().__init__(model_or_loadname)
Expand Down
Loading