Skip to content

Commit

Permalink
added pipette bounds fetch and height check on overlapping slots
Browse files Browse the repository at this point in the history
  • Loading branch information
sanni-t committed Feb 5, 2024
1 parent 35a8b78 commit ff9cb42
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 17 deletions.
14 changes: 14 additions & 0 deletions api/src/opentrons/hardware_control/nozzle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,20 @@ def front_nozzle_offset(self) -> Point:
front_left = next(iter(self.columns.values()))[-1]
return self.map_store[front_left]

@property
def front_right_nozzle_offset(self) -> Point:
"""The offset for the front_right nozzle."""
# Front-right-most nozzle of the 96 channel in a given configuration
# and Front-most nozzle of the 8-channel
return self.map_store[self.front_right]

@property
def back_left_nozzle_offset(self) -> Point:
"""The offset for the back_left nozzle."""
# Back-left-most nozzle of the 96-channel in a given configuration
# and back-most nozzle of the 8-channel
return self.map_store[self.back_left]

@property
def tip_count(self) -> int:
"""The total number of active nozzles in the configuration, and thus the number of tips that will be picked up."""
Expand Down
73 changes: 72 additions & 1 deletion api/src/opentrons/motion_planning/adjacent_slots_getters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Getters for specific adjacent slots."""

from typing import Optional, List, Dict
from typing import Optional, List, Dict, Union

from opentrons.types import DeckSlotName, StagingSlotName

Expand Down Expand Up @@ -37,6 +37,56 @@ def get_west_slot(slot: int) -> Optional[int]:
return slot - 1


def get_north_west_slot(slot: int) -> Optional[int]:
"""Get the slot that's north-west of the given slot."""
if slot in [1, 4, 7, 10, 11, 12]:
return None
else:
north_slot = get_north_slot(slot)
return north_slot - 1 if north_slot else None


def get_north_east_slot(slot: int) -> Optional[int]:
"""Get the slot that's north-east of the given slot."""
if slot in [3, 6, 9, 10, 11, 12]:
return None
else:
north_slot = get_north_slot(slot)
return north_slot + 1 if north_slot else None


def get_south_west_slot(slot: int) -> Optional[int]:
"""Get the slot that's south-west of the given slot."""
if slot in [1, 2, 3, 4, 7, 10]:
return None
else:
south_slot = get_south_slot(slot)
return south_slot - 1 if south_slot else None


def get_south_east_slot(slot: int) -> Optional[int]:
"""Get the slot that's south-east of the given slot."""
if slot in [1, 2, 3, 6, 9, 12]:
return None
else:
south_slot = get_south_slot(slot)
return south_slot + 1 if south_slot else None


def get_surrounding_slots(slot: int) -> List[int]:
"""Get all the surrounding slots, i.e., adjacent slots as well as corner slots."""
corner_slots: List[Union[int, None]] = [
get_north_west_slot(slot),
get_north_east_slot(slot),
get_south_west_slot(slot),
get_south_east_slot(slot),
]

return get_adjacent_slots(slot) + [
maybe_slot for maybe_slot in corner_slots if maybe_slot is not None
]


_WEST_OF_STAGING_SLOT_MAP: Dict[StagingSlotName, DeckSlotName] = {
StagingSlotName.SLOT_A4: DeckSlotName.SLOT_A3,
StagingSlotName.SLOT_B4: DeckSlotName.SLOT_B3,
Expand All @@ -50,6 +100,22 @@ def get_west_slot(slot: int) -> Optional[int]:
}


_SURROUNDING_STAGING_SLOTS_MAP: Dict[DeckSlotName, List[StagingSlotName]] = {
DeckSlotName.SLOT_D3: [StagingSlotName.SLOT_D4, StagingSlotName.SLOT_C4],
DeckSlotName.SLOT_C3: [
StagingSlotName.SLOT_D4,
StagingSlotName.SLOT_C4,
StagingSlotName.SLOT_B4,
],
DeckSlotName.SLOT_B3: [
StagingSlotName.SLOT_C4,
StagingSlotName.SLOT_B4,
StagingSlotName.SLOT_A4,
],
DeckSlotName.SLOT_A3: [StagingSlotName.SLOT_B4, StagingSlotName.SLOT_A4],
}


def get_west_of_staging_slot(staging_slot: StagingSlotName) -> DeckSlotName:
"""Get slot west of a staging slot."""
return _WEST_OF_STAGING_SLOT_MAP[staging_slot]
Expand All @@ -60,6 +126,11 @@ def get_adjacent_staging_slot(deck_slot: DeckSlotName) -> Optional[StagingSlotNa
return _EAST_OF_FLEX_COLUMN_3_MAP.get(deck_slot)


def get_surrounding_staging_slots(deck_slot: DeckSlotName) -> List[StagingSlotName]:
"""Get the staging slots surrounding the given deck slot."""
return _SURROUNDING_STAGING_SLOTS_MAP.get(deck_slot, [])


def get_east_west_slots(slot: int) -> List[int]:
"""Get slots east & west of the given slot."""
east = get_east_slot(slot)
Expand Down
88 changes: 74 additions & 14 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
get_west_slot,
get_east_slot,
get_south_slot,
get_surrounding_slots,
get_surrounding_staging_slots,
)
from opentrons.protocol_engine import (
StateView,
Expand Down Expand Up @@ -205,26 +207,82 @@ def check_safe_for_pipette_movement(
well_name: Name of the well to move to
well_location: exact location within the well to move to
"""
# TODO: either hide unsupported configurations behind an advance setting
# or log a warning that deck conflicts cannot be checked for tip config other than
# column config with A12 primary nozzle for the 96 channel
# or single tip config for 8-channel.
if engine_state.pipettes.get_channels(pipette_id) == 96:
_check_deck_conflict_for_96_channel(
engine_state=engine_state,
if not engine_state.pipettes.get_is_partially_configured(pipette_id):
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
partially_configured=True,
)
well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
raise PartialTipMovementNotAllowedError(
f"Requested motion with the {primary_nozzle} nozzle column configuration"
f" is outside of robot bounds for the pipette."
)
elif engine_state.pipettes.get_channels(pipette_id) == 8:
_check_deck_conflict_for_8_channel(
engine_state=engine_state,

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
pipette_bounds_at_well_location = (
engine_state.pipettes.get_nozzle_bounds_at_specified_move_to_position(
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
destination_position=well_location_point,
)
)
surrounding_regular_slots = get_surrounding_slots(labware_slot.as_int())
surrounding_staging_slots = get_surrounding_staging_slots(labware_slot)

def _check_conflict_with_slot_item(
surrounding_slot: Union[DeckSlotName, StagingSlotName],
) -> None:
"""Raises error if the pipette is expected to collide with adjacent slot items."""
# Check if slot overlaps with pipette position
slot_pos = engine_state.addressable_areas.get_addressable_area_position(
surrounding_slot.id
)
slot_bounds = engine_state.addressable_areas.get_addressable_area_bounding_box(
surrounding_slot.id
)
for bound_vertex in pipette_bounds_at_well_location:
if not (
slot_pos.x < bound_vertex.x < slot_pos.x + slot_bounds.x
and slot_pos.y < bound_vertex.y < slot_pos.y + slot_bounds.y
):
continue

# Check z-height of items in overlapping slot
# TODO (spp): err.. this needs to handle staging slots too!
slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=surrounding_slot)
)
pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_display_name(labware_id)} in slot"
f" {labware_slot} with pipette column {primary_nozzle} nozzle configuration"
f" will result in collision with items in deck slot {surrounding_slot}."
)

for slot in surrounding_regular_slots:
_check_conflict_with_slot_item(
DeckSlotName.from_primitive(slot).to_equivalent_for_robot_type(
engine_state.config.robot_type
)
)
for slot in surrounding_staging_slots:
_check_conflict_with_slot_item(slot)
# TODO (spp, 2024-02-05): check potential crash w/ trash bin & waste chute too!


def check_safe_for_tip_pickup_and_return(
Expand Down Expand Up @@ -281,6 +339,7 @@ def check_safe_for_tip_pickup_and_return(
)


# TODO (spp): remove this. No longer used
def _check_deck_conflict_for_96_channel(
engine_state: StateView,
pipette_id: str,
Expand Down Expand Up @@ -356,6 +415,7 @@ def _check_conflict_with_slot_item(
)


# TODO (spp): remove this. No longer used
def _check_deck_conflict_for_8_channel(
engine_state: StateView,
pipette_id: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)

from ..types import FlowRates
from ...types import Point


@dataclass(frozen=True)
Expand All @@ -36,6 +37,8 @@ class LoadedStaticPipetteData:
float, pipette_definition.SupportedTipsDefinition
]
nominal_tip_overlap: Dict[str, float]
back_left_nozzle_offset: Point
front_right_nozzle_offset: Point


class VirtualPipetteDataProvider:
Expand Down Expand Up @@ -147,6 +150,7 @@ def _get_virtual_pipette_static_config_by_model(
tip_type
]

nozzle_manager = NozzleConfigurationManager.build_from_config(config)
return LoadedStaticPipetteData(
model=str(pipette_model),
display_name=config.display_name,
Expand All @@ -169,6 +173,8 @@ def _get_virtual_pipette_static_config_by_model(
nominal_tip_overlap=config.liquid_properties[
liquid_class
].tip_overlap_dictionary,
back_left_nozzle_offset=nozzle_manager.current_configuration.back_left_nozzle_offset,
front_right_nozzle_offset=nozzle_manager.current_configuration.front_right_nozzle_offset,
)

def get_virtual_pipette_static_config(
Expand Down Expand Up @@ -202,4 +208,11 @@ def get_pipette_static_config(pipette_dict: PipetteDict) -> LoadedStaticPipetteD
# https://opentrons.atlassian.net/browse/RCORE-655
home_position=0,
nozzle_offset_z=0,
# TODO (spp): Confirm that the nozzle map is populated by the hardware api by default
back_left_nozzle_offset=pipette_dict[
"current_nozzle_map"
].back_left_nozzle_offset,
front_right_nozzle_offset=pipette_dict[
"current_nozzle_map"
].front_right_nozzle_offset,
)
15 changes: 15 additions & 0 deletions api/src/opentrons/protocol_engine/state/addressable_areas.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AddressableArea,
PotentialCutoutFixture,
DeckConfigurationType,
Dimensions,
)
from ..actions import Action, UpdateCommandAction, PlayAction, AddAddressableAreaAction
from .config import Config
Expand Down Expand Up @@ -438,6 +439,20 @@ def get_addressable_area_position(self, addressable_area_name: str) -> Point:
position = addressable_area.position
return Point(x=position.x, y=position.y, z=position.z)

def get_addressable_area_bounding_box(
self, addressable_area_name: str
) -> Dimensions:
"""Get the bounding box of an addressable area.
This does not require the addressable area to be in the deck configuration.
For movement purposes, this should only be called for
areas that have been pre-validated, otherwise there could be the risk of collision.
"""
addressable_area = self._get_addressable_area_from_deck_data(
addressable_area_name
)
return addressable_area.bounding_box

def get_addressable_area_move_to_location(
self, addressable_area_name: str
) -> Point:
Expand Down
1 change: 0 additions & 1 deletion api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from opentrons.types import Point, DeckSlotName, StagingSlotName, MountType
from opentrons_shared_data.labware.constants import WELL_NAME_PATTERN


from .. import errors
from ..errors import (
LabwareNotLoadedOnLabwareError,
Expand Down
Loading

0 comments on commit ff9cb42

Please sign in to comment.