Skip to content

Commit

Permalink
updated integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sanni-t committed Feb 6, 2024
1 parent ed0a766 commit a4a4473
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 187 deletions.
174 changes: 10 additions & 164 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning.adjacent_slots_getters import (
get_north_slot,
get_west_slot,
get_east_slot,
get_south_slot,
get_surrounding_slots,
get_surrounding_staging_slots,
)
Expand All @@ -28,6 +24,7 @@
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.protocol_engine.types import StagingSlotLocation
from opentrons.types import DeckSlotName, StagingSlotName, Point
from ..._trash_bin import TrashBin
from ..._waste_chute import WasteChute
Expand Down Expand Up @@ -259,15 +256,15 @@ def _check_conflict_with_slot_item(
):
continue

slot_location: Union[DeckSlotLocation, StagingSlotLocation]
# Check z-height of items in overlapping slot
# TODO (spp): err.. this needs to handle staging slots too!
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0
if isinstance(surrounding_slot, DeckSlotName):
slot_location = DeckSlotLocation(slotName=surrounding_slot)
else:
slot_location = StagingSlotLocation(slotName=surrounding_slot)
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(slot_location)

if slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length:
if slot_highest_z + Z_SAFETY_MARGIN > pipette_bounds_at_well_location[0].z:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with pipette column {primary_nozzle} nozzle configuration"
Expand All @@ -280,9 +277,8 @@ def _check_conflict_with_slot_item(
engine_state.config.robot_type
)
)
for slot in surrounding_staging_slots:
_check_conflict_with_slot_item(slot)
# TODO (spp, 2024-02-05): check potential crash w/ trash bin & waste chute too!
for staging_slot in surrounding_staging_slots:
_check_conflict_with_slot_item(staging_slot)


def check_safe_for_tip_pickup_and_return(
Expand Down Expand Up @@ -339,156 +335,6 @@ def check_safe_for_tip_pickup_and_return(
)


# TODO (spp): remove this. No longer used
def _check_deck_conflict_for_96_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 96-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.COLUMN
):
# Checking deck conflicts only for column config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
raise PartialTipMovementNotAllowedError(
f"Requested motion with the {primary_nozzle} nozzle column configuration"
f" is outside of robot bounds for the 96-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)

destination_slot_num = labware_slot.as_int()
adjacent_slot_num = None
# TODO (spp, 2023-12-18): change this eventually to "column 1"/"column 12"
# via the column mappings in the pipette geometry definitions.
if primary_nozzle == "A12":
adjacent_slot_num = get_west_slot(destination_slot_num)
elif primary_nozzle == "A1":
adjacent_slot_num = get_east_slot(destination_slot_num)

def _check_conflict_with_slot_item(
adjacent_slot: DeckSlotName,
) -> None:
"""Raises error if the pipette is expected to collide with adjacent slot items."""
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=adjacent_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with pipette column {primary_nozzle} nozzle configuration"
f" will result in collision with items in deck slot {adjacent_slot}."
)

if adjacent_slot_num is None:
return
_check_conflict_with_slot_item(
adjacent_slot=DeckSlotName.from_primitive(
adjacent_slot_num
).to_equivalent_for_robot_type(engine_state.config.robot_type)
)


# TODO (spp): remove this. No longer used
def _check_deck_conflict_for_8_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 8-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.SINGLE
):
# Checking deck conflicts only for single tip config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
# WARNING: (spp, 2023-11-30: this needs to be wired up to check for
# 8-channel pipette extents on both OT2 & Flex!!)
raise PartialTipMovementNotAllowedError(
f"Requested motion with single {primary_nozzle} nozzle configuration"
f" is outside of robot bounds for the 8-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
destination_slot = labware_slot.as_int()
adjacent_slot_num = None
# TODO (spp, 2023-12-18): change this eventually to use nozzles from mappings in
# the pipette geometry definitions.
if primary_nozzle == "H1":
adjacent_slot_num = get_north_slot(destination_slot)
elif primary_nozzle == "A1":
adjacent_slot_num = get_south_slot(destination_slot)

def _check_conflict_with_slot_item(adjacent_slot: DeckSlotName) -> None:
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=adjacent_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with pipette nozzle {primary_nozzle} configuration"
f" will result in collision with items in deck slot {adjacent_slot}."
)

if adjacent_slot_num is None:
return
_check_conflict_with_slot_item(
adjacent_slot=DeckSlotName.from_primitive(
adjacent_slot_num
).to_equivalent_for_robot_type(engine_state.config.robot_type)
)


def _is_within_pipette_extents(
engine_state: StateView,
pipette_id: str,
Expand Down
5 changes: 4 additions & 1 deletion api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
OnDeckLabwareLocation,
AddressableAreaLocation,
AddressableOffsetVector,
StagingSlotLocation,
)
from .config import Config
from .labware import LabwareView
Expand Down Expand Up @@ -148,7 +149,9 @@ def get_all_obstacle_highest_z(self) -> float:
highest_fixture_z,
)

def get_highest_z_in_slot(self, slot: DeckSlotLocation) -> float:
def get_highest_z_in_slot(
self, slot: Union[DeckSlotLocation, StagingSlotLocation]
) -> float:
"""Get the highest Z-point of all items stacked in the given deck slot."""
slot_item = self.get_slot_item(slot.slotName)
if isinstance(slot_item, LoadedModule):
Expand Down
1 change: 0 additions & 1 deletion api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ def get_nozzle_bounds_at_specified_move_to_position(
bounding_nozzles_offsets = self.get_pipette_bounding_nozzle_offsets(pipette_id)

# TODO (spp): add a margin to these bounds
# Note: the z doesn't matter and is not being used.
pip_back_left_bound = (
primary_nozzle_position
- primary_nozzle_offset
Expand Down
16 changes: 15 additions & 1 deletion api/src/opentrons/protocol_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing_extensions import Literal, TypeGuard

from opentrons_shared_data.pipette.dev_types import PipetteNameType
from opentrons.types import MountType, DeckSlotName
from opentrons.types import MountType, DeckSlotName, StagingSlotName
from opentrons.hardware_control.types import TipStateType as HwTipStateType
from opentrons.hardware_control.modules import (
ModuleType as ModuleType,
Expand Down Expand Up @@ -56,6 +56,20 @@ class DeckSlotLocation(BaseModel):
)


class StagingSlotLocation(BaseModel):
"""The location of something placed in a single staging slot."""

slotName: StagingSlotName = Field(
...,
description=(
# This description should be kept in sync with LabwareOffsetLocation.slotName.
"A slot on the robot's staging area."
"\n\n"
"These apply only to the Flex. The OT-2 has no staging slots."
),
)


class AddressableAreaLocation(BaseModel):
"""The location of something place in an addressable area. This is a superset of deck slots."""

Expand Down
41 changes: 23 additions & 18 deletions api/tests/opentrons/protocol_api/core/engine/test_deck_conflict.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Unit tests for the deck_conflict module."""

import pytest
from typing import ContextManager, Any, NamedTuple, List
from typing import ContextManager, Any, NamedTuple, List, Tuple
from decoy import Decoy
from contextlib import nullcontext as does_not_raise
from opentrons_shared_data.labware.dev_types import LabwareUri
Expand Down Expand Up @@ -354,7 +354,7 @@ def test_maps_trash_bins(decoy: Decoy, mock_state_view: StateView) -> None:
[("OT-3 Standard", DeckType.OT3_STANDARD)],
)
@pytest.mark.parametrize(
["destination_well_point", "expected_raise"],
["nozzle_bounds", "expected_raise"],
[
(Point(x=100, y=100, z=60), does_not_raise()),
# Z-collisions
Expand Down Expand Up @@ -406,43 +406,48 @@ def test_maps_trash_bins(decoy: Decoy, mock_state_view: StateView) -> None:
def test_deck_conflict_raises_for_bad_partial_96_channel_move(
decoy: Decoy,
mock_state_view: StateView,
destination_well_point: Point,
nozzle_bounds: Tuple[Point, Point, Point, Point],
expected_raise: ContextManager[Any],
) -> None:
"""It should raise errors when moving to locations with restrictions for partial tip 96-channel movement.
Test premise:
- we are using a pipette configured for COLUMN nozzle layout with primary nozzle A12
- there's a labware of height 50mm in C1
- there are labware of height 50mm in C1, D1 & D2
- we are checking for conflicts when moving to a labware in C2.
For each test case, we are moving to a different point in the destination labware,
with the same pipette and tip (tip length is 10mm)
with the same pipette and tip
"""
decoy.when(mock_state_view.pipettes.get_channels("pipette-id")).then_return(96)
decoy.when(
mock_state_view.pipettes.get_nozzle_layout_type("pipette-id")
).then_return(NozzleConfigurationType.COLUMN)
# decoy.when(mock_state_view.pipettes.get_channels("pipette-id")).then_return(96)
# decoy.when(
# mock_state_view.pipettes.get_nozzle_layout_type("pipette-id")
# ).then_return(NozzleConfigurationType.COLUMN)
destination_well_point = Point(x=123, y=123, z=123)
decoy.when(mock_state_view.pipettes.get_primary_nozzle("pipette-id")).then_return(
"A12"
)
decoy.when(
mock_state_view.geometry.get_ancestor_slot_name("destination-labware-id")
).then_return(DeckSlotName.SLOT_C2)
decoy.when(
mock_state_view.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=DeckSlotName.SLOT_C1)
)
).then_return(50)
for slot_name in [DeckSlotName.SLOT_C1, DeckSlotName.SLOT_D1, DeckSlotName.SLOT_D2]:
decoy.when(
mock_state_view.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=slot_name)
)
).then_return(50)

decoy.when(
mock_state_view.geometry.get_well_position(
labware_id="destination-labware-id",
well_name="A2",
well_location=WellLocation(origin=WellOrigin.TOP, offset=WellOffset(z=10)),
)
).then_return(destination_well_point)
decoy.when(mock_state_view.pipettes.get_attached_tip("pipette-id")).then_return(
TipGeometry(length=10, diameter=100, volume=0)
)
decoy.when(
mock_state_view.pipettes.get_nozzle_bounds_at_specified_move_to_position(
pipette_id="pipette-id", destination_position=destination_well_point
)
).then_return(nozzle_bounds)

with expected_raise:
deck_conflict.check_safe_for_pipette_movement(
Expand Down Expand Up @@ -483,7 +488,7 @@ def test_deck_conflict_raises_for_bad_partial_96_channel_move_with_fixtures(
- there's a waste chute with in D3
- we are checking for conflicts when moving to column A12 of a labware in D2.
"""
decoy.when(mock_state_view.pipettes.get_channels("pipette-id")).then_return(96)
# decoy.when(mock_state_view.pipettes.get_channels("pipette-id")).then_return(96)
decoy.when(
mock_state_view.labware.get_display_name("destination-labware-id")
).then_return("destination-labware")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_deck_conflicts_for_96_ch_a1_column_configuration() -> None:
with pytest.raises(
PartialTipMovementNotAllowedError, match="collision with items in deck slot"
):
instrument.aspirate(25, badly_placed_plate.wells_by_name()["A1"])
instrument.aspirate(25, badly_placed_plate.wells_by_name()["A10"])

with pytest.raises(
PartialTipMovementNotAllowedError, match="outside of robot bounds"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Tests for pipette state accessors in the protocol_engine state store."""
from collections import OrderedDict
from dataclasses import dataclass

import pytest
from typing import cast, Dict, List, Optional, Tuple, NamedTuple
Expand Down

0 comments on commit a4a4473

Please sign in to comment.