Skip to content

Commit

Permalink
feat(api): add deck conflict checks for trash bins loaded via address…
Browse files Browse the repository at this point in the history
…able areas (#14325)
  • Loading branch information
jbleon95 authored and ncdiehl11 committed Feb 1, 2024
1 parent 908198d commit 99e9949
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 17 deletions.
20 changes: 17 additions & 3 deletions api/src/opentrons/motion_planning/deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class Labware:
is_fixed_trash: bool


@dataclass
class TrashBin:
"""A non-labware trash bin (loaded via api level 2.16 and above)."""

name_for_errors: str


@dataclass
class _Module:
name_for_errors: str
Expand Down Expand Up @@ -98,6 +105,7 @@ class OtherModule(_Module):
MagneticBlockModule,
ThermocyclerModule,
OtherModule,
TrashBin,
]


Expand Down Expand Up @@ -129,6 +137,10 @@ def is_allowed(self, item: DeckItem) -> bool:
return item.highest_z < self.max_height
elif isinstance(item, _Module):
return item.highest_z_including_labware < self.max_height
elif isinstance(item, TrashBin):
# Since this is a restriction for OT-2 only and OT-2 trashes exceeded the height limit, always return False
# TODO(jbl 2024-01-16) Include trash height and use that for check for more robustness
return False


class _NoModule(NamedTuple):
Expand Down Expand Up @@ -234,7 +246,7 @@ def _create_ot2_restrictions( # noqa: C901
)
)

if _is_fixed_trash(item):
if _is_ot2_fixed_trash(item):
# A Heater-Shaker can't safely be placed just south of the fixed trash,
# because the fixed trash blocks access to the screw that locks the
# Heater-Shaker onto the deck.
Expand Down Expand Up @@ -379,5 +391,7 @@ def _flex_slots_covered_by_thermocycler() -> Set[DeckSlotName]:
return {DeckSlotName.SLOT_B1, DeckSlotName.SLOT_A1}


def _is_fixed_trash(item: DeckItem) -> bool:
return isinstance(item, Labware) and item.is_fixed_trash
def _is_ot2_fixed_trash(item: DeckItem) -> bool:
return (isinstance(item, Labware) and item.is_fixed_trash) or isinstance(
item, TrashBin
)
10 changes: 10 additions & 0 deletions api/src/opentrons/protocol_api/_trash_bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,13 @@ class TrashBin:
def __init__(self, location: DeckSlotName, addressable_area_name: str) -> None:
self._location = location
self._addressable_area_name = addressable_area_name

@property
def location(self) -> DeckSlotName:
"""Location of the trash bin.
:meta private:
This is intended for Opentrons internal use only and is not a guarenteed API.
"""
return self._location
49 changes: 46 additions & 3 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""A Protocol-Engine-friendly wrapper for opentrons.motion_planning.deck_conflict."""

from __future__ import annotations
import itertools
import logging
from typing import Collection, Dict, Optional, Tuple, overload, Union
from typing import Collection, Dict, Optional, Tuple, overload, Union, TYPE_CHECKING

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError

Expand All @@ -27,6 +27,11 @@
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName, StagingSlotName, Point
from ..._trash_bin import TrashBin
from ..._waste_chute import WasteChute

if TYPE_CHECKING:
from ...labware import Labware


class PartialTipMovementNotAllowedError(MotionPlanningFailureError):
Expand Down Expand Up @@ -74,6 +79,7 @@ def check(
engine_state: StateView,
existing_labware_ids: Collection[str],
existing_module_ids: Collection[str],
existing_disposal_locations: Collection[Union[Labware, WasteChute, TrashBin]],
new_labware_id: str,
) -> None:
pass
Expand All @@ -85,22 +91,37 @@ def check(
engine_state: StateView,
existing_labware_ids: Collection[str],
existing_module_ids: Collection[str],
existing_disposal_locations: Collection[Union[Labware, WasteChute, TrashBin]],
new_module_id: str,
) -> None:
pass


@overload
def check(
*,
engine_state: StateView,
existing_labware_ids: Collection[str],
existing_module_ids: Collection[str],
existing_disposal_locations: Collection[Union[Labware, WasteChute, TrashBin]],
new_trash_bin: TrashBin,
) -> None:
pass


def check(
*,
engine_state: StateView,
existing_labware_ids: Collection[str],
existing_module_ids: Collection[str],
existing_disposal_locations: Collection[Union[Labware, WasteChute, TrashBin]],
# TODO(mm, 2023-02-23): This interface is impossible to use correctly. In order
# to have new_labware_id or new_module_id, the caller needs to have already loaded
# the new item into Protocol Engine--but then, it's too late to do deck conflict.
# checking. Find a way to do deck conflict checking before the new item is loaded.
new_labware_id: Optional[str] = None,
new_module_id: Optional[str] = None,
new_trash_bin: Optional[TrashBin] = None,
) -> None:
"""Check for conflicts between items on the deck.
Expand All @@ -125,6 +146,8 @@ def check(
new_location_and_item = _map_labware(engine_state, new_labware_id)
if new_module_id is not None:
new_location_and_item = _map_module(engine_state, new_module_id)
if new_trash_bin is not None:
new_location_and_item = _map_disposal_location(new_trash_bin)

if new_location_and_item is None:
# The new item should be excluded from deck conflict checking. Nothing to do.
Expand All @@ -142,11 +165,19 @@ def check(
)
mapped_existing_modules = (m for m in all_existing_modules if m is not None)

all_exisiting_disposal_locations = (
_map_disposal_location(disposal_location)
for disposal_location in existing_disposal_locations
)
mapped_disposal_locations = (
m for m in all_exisiting_disposal_locations if m is not None
)

existing_items: Dict[
Union[DeckSlotName, StagingSlotName], wrapped_deck_conflict.DeckItem
] = {}
for existing_location, existing_item in itertools.chain(
mapped_existing_labware, mapped_existing_modules
mapped_existing_labware, mapped_existing_modules, mapped_disposal_locations
):
assert existing_location not in existing_items
existing_items[existing_location] = existing_item
Expand Down Expand Up @@ -556,6 +587,18 @@ def _map_module(
)


def _map_disposal_location(
disposal_location: Union[Labware, WasteChute, TrashBin],
) -> Optional[Tuple[DeckSlotName, wrapped_deck_conflict.DeckItem]]:
if isinstance(disposal_location, TrashBin):
return (
disposal_location.location,
wrapped_deck_conflict.TrashBin(name_for_errors="trash bin"),
)
else:
return None


def _deck_slot_to_int(deck_slot_location: DeckSlotLocation) -> int:
return deck_slot_location.slotName.as_int()

Expand Down
37 changes: 26 additions & 11 deletions api/src/opentrons/protocol_api/core/engine/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,22 @@ def append_disposal_location(
self, disposal_location: Union[Labware, TrashBin, WasteChute]
) -> None:
"""Append a disposal location object to the core"""
if isinstance(disposal_location, TrashBin):
deck_conflict.check(
engine_state=self._engine_client.state,
new_trash_bin=disposal_location,
existing_disposal_locations=self._disposal_locations,
# TODO: We can now fetch these IDs from engine too.
# See comment in self.load_labware().
#
# Wrapping .keys() in list() is just to make Decoy verification easier.
existing_labware_ids=list(self._labware_cores_by_id.keys()),
existing_module_ids=list(self._module_cores_by_id.keys()),
)
self._disposal_locations.append(disposal_location)

def get_disposal_locations(self) -> List[Union[Labware, TrashBin, WasteChute]]:
"""Get disposal locations."""

return self._disposal_locations

def get_max_speeds(self) -> AxisMaxSpeeds:
Expand Down Expand Up @@ -212,11 +223,12 @@ def load_labware(
deck_conflict.check(
engine_state=self._engine_client.state,
new_labware_id=load_result.labwareId,
# It's important that we don't fetch these IDs from Protocol Engine, and
# use our own bookkeeping instead. If we fetched these IDs from Protocol
# Engine, it would have leaked state from Labware Position Check in the
# same HTTP run.
#
existing_disposal_locations=self._disposal_locations,
# TODO (spp, 2023-11-27): We've been using IDs from _labware_cores_by_id
# and _module_cores_by_id instead of getting the lists directly from engine
# because of the chance of engine carrying labware IDs from LPC too.
# But with https://github.com/Opentrons/opentrons/pull/13943,
# & LPC in maintenance runs, we can now rely on engine state for these IDs too.
# Wrapping .keys() in list() is just to make Decoy verification easier.
existing_labware_ids=list(self._labware_cores_by_id.keys()),
existing_module_ids=list(self._module_cores_by_id.keys()),
Expand Down Expand Up @@ -266,11 +278,10 @@ def load_adapter(
deck_conflict.check(
engine_state=self._engine_client.state,
new_labware_id=load_result.labwareId,
# TODO (spp, 2023-11-27): We've been using IDs from _labware_cores_by_id
# and _module_cores_by_id instead of getting the lists directly from engine
# because of the chance of engine carrying labware IDs from LPC too.
# But with https://github.com/Opentrons/opentrons/pull/13943,
# & LPC in maintenance runs, we can now rely on engine state for these IDs too.
existing_disposal_locations=self._disposal_locations,
# TODO: We can now fetch these IDs from engine too.
# See comment in self.load_labware().
#
# Wrapping .keys() in list() is just to make Decoy verification easier.
existing_labware_ids=list(self._labware_cores_by_id.keys()),
existing_module_ids=list(self._module_cores_by_id.keys()),
Expand Down Expand Up @@ -343,6 +354,9 @@ def move_labware(
deck_conflict.check(
engine_state=self._engine_client.state,
new_labware_id=labware_core.labware_id,
existing_disposal_locations=self._disposal_locations,
# TODO: We can now fetch these IDs from engine too.
# See comment in self.load_labware().
existing_labware_ids=[
labware_id
for labware_id in self._labware_cores_by_id
Expand Down Expand Up @@ -400,6 +414,7 @@ def load_module(
deck_conflict.check(
engine_state=self._engine_client.state,
new_module_id=result.moduleId,
existing_disposal_locations=self._disposal_locations,
# TODO: We can now fetch these IDs from engine too.
# See comment in self.load_labware().
#
Expand Down
72 changes: 72 additions & 0 deletions api/tests/opentrons/motion_planning/test_deck_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,41 @@ def test_flex_labware_when_thermocycler(
)


def test_flex_trash_bin_blocks_thermocycler() -> None:
"""It should prevent loading a thermocycler when there is a trash in A1 and vice-versa."""
thermocycler = deck_conflict.ThermocyclerModule(
name_for_errors="some_thermocycler",
highest_z_including_labware=123,
is_semi_configuration=False,
)
trash = deck_conflict.TrashBin(name_for_errors="some_trash_bin")

with pytest.raises(
deck_conflict.DeckConflictError,
match=(
"some_trash_bin in slot A1 prevents some_thermocycler from using slot B1"
),
):
deck_conflict.check(
existing_items={DeckSlotName.SLOT_A1: trash},
new_item=thermocycler,
new_location=DeckSlotName.SLOT_B1,
robot_type="OT-3 Standard",
)
with pytest.raises(
deck_conflict.DeckConflictError,
match=(
"some_thermocycler in slot B1 prevents some_trash_bin from using slot A1"
),
):
deck_conflict.check(
existing_items={DeckSlotName.SLOT_B1: thermocycler},
new_item=trash,
new_location=DeckSlotName.SLOT_A1,
robot_type="OT-3 Standard",
)


@pytest.mark.parametrize(
("heater_shaker_location", "labware_location"),
[
Expand Down Expand Up @@ -496,6 +531,43 @@ def test_no_heater_shaker_south_of_trash() -> None:
)


def test_heater_shaker_restrictions_trash_bin_addressable_area() -> None:
"""It should prevent loading a Heater-Shaker adjacent of a non-labware trash bin.
This is for the OT-2 only and for slot 11 and slot 9
"""
heater_shaker = deck_conflict.HeaterShakerModule(
highest_z_including_labware=123, name_for_errors="some_heater_shaker"
)
trash = deck_conflict.TrashBin(name_for_errors="some_trash_bin")

with pytest.raises(
deck_conflict.DeckConflictError,
match=(
"some_trash_bin in slot 12" " prevents some_heater_shaker from using slot 9"
),
):
deck_conflict.check(
existing_items={DeckSlotName.FIXED_TRASH: trash},
new_item=heater_shaker,
new_location=DeckSlotName.SLOT_9,
robot_type="OT-2 Standard",
)
with pytest.raises(
deck_conflict.DeckConflictError,
match=(
"some_trash_bin in slot 12"
" prevents some_heater_shaker from using slot 11"
),
):
deck_conflict.check(
existing_items={DeckSlotName.FIXED_TRASH: trash},
new_item=heater_shaker,
new_location=DeckSlotName.SLOT_11,
robot_type="OT-2 Standard",
)


@pytest.mark.parametrize(
("deck_slot_name", "adjacent_staging_slot", "non_adjacent_staging_slot"),
[
Expand Down
Loading

0 comments on commit 99e9949

Please sign in to comment.