Skip to content

Commit

Permalink
Merge branch 'chore_release-7.1.0' into OT2_and_2_15_trash_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
CaseyBatten committed Dec 7, 2023
2 parents d9d03f2 + 5141a35 commit d1e2c7d
Show file tree
Hide file tree
Showing 74 changed files with 1,569 additions and 709 deletions.
230 changes: 228 additions & 2 deletions api/src/opentrons/protocol_api/core/engine/deck_conflict.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,51 @@
"""A Protocol-Engine-friendly wrapper for opentrons.motion_planning.deck_conflict."""

import itertools
from typing import Collection, Dict, Optional, Tuple, overload
import logging
from typing import Collection, Dict, Optional, Tuple, overload, Union

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError

from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.modules.types import ModuleType
from opentrons.motion_planning import deck_conflict as wrapped_deck_conflict
from opentrons.motion_planning.adjacent_slots_getters import (
get_north_slot,
get_west_slot,
)
from opentrons.protocol_engine import (
StateView,
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
OFF_DECK_LOCATION,
WellLocation,
DropTipWellLocation,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName
from opentrons.types import DeckSlotName, Point


class PartialTipMovementNotAllowedError(MotionPlanningFailureError):
"""Error raised when trying to perform a partial tip movement to an illegal location."""

def __init__(self, message: str) -> None:
super().__init__(
message=message,
)


_log = logging.getLogger(__name__)

# TODO (spp, 2023-12-06): move this to a location like motion planning where we can
# derive these values from geometry definitions
# Bounding box measurements
A12_column_front_left_bound = Point(x=-1.8, y=2)
A12_column_back_right_bound = Point(x=592, y=506.2)

# Arbitrary safety margin in z-direction
Z_SAFETY_MARGIN = 10


@overload
Expand Down Expand Up @@ -106,6 +137,201 @@ def check(
)


def check_safe_for_pipette_movement(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if the labware is safe to move to with a pipette in partial tip configuration.
Args:
engine_state: engine state view
pipette_id: ID of the pipette to be moved
labware_id: ID of the labware we are moving to
well_name: Name of the well to move to
well_location: exact location within the well to move to
"""
# TODO: either hide unsupported configurations behind an advance setting
# or log a warning that deck conflicts cannot be checked for tip config other than
# column config with A12 primary nozzle for the 96 channel
# or single tip config for 8-channel.
if engine_state.pipettes.get_channels(pipette_id) == 96:
_check_deck_conflict_for_96_channel(
engine_state=engine_state,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
elif engine_state.pipettes.get_channels(pipette_id) == 8:
_check_deck_conflict_for_8_channel(
engine_state=engine_state,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)


def _check_deck_conflict_for_96_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 96-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.COLUMN
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "A12"
):
# Checking deck conflicts only for 12th column config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
raise PartialTipMovementNotAllowedError(
"Requested motion with A12 nozzle column configuration"
" is outside of robot bounds for the 96-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
west_slot_number = get_west_slot(
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot))
)
if west_slot_number is None:
return

west_slot = DeckSlotName.from_primitive(
west_slot_number
).to_equivalent_for_robot_type(engine_state.config.robot_type)

west_slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=west_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if (
west_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length
): # a safe margin magic number
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}"
f" with a Column nozzle configuration will result in collision with"
f" items in deck slot {west_slot}."
)


def _check_deck_conflict_for_8_channel(
engine_state: StateView,
pipette_id: str,
labware_id: str,
well_name: str,
well_location: Union[WellLocation, DropTipWellLocation],
) -> None:
"""Check if there are any conflicts moving to the given labware with the configuration of 8-ch pipette."""
if not (
engine_state.pipettes.get_nozzle_layout_type(pipette_id)
== NozzleConfigurationType.SINGLE
and engine_state.pipettes.get_primary_nozzle(pipette_id) == "H1"
):
# Checking deck conflicts only for H1 single tip config
return

if isinstance(well_location, DropTipWellLocation):
# convert to WellLocation
well_location = engine_state.geometry.get_checked_tip_drop_location(
pipette_id=pipette_id,
labware_id=labware_id,
well_location=well_location,
partially_configured=True,
)

well_location_point = engine_state.geometry.get_well_position(
labware_id=labware_id, well_name=well_name, well_location=well_location
)

if not _is_within_pipette_extents(
engine_state=engine_state, pipette_id=pipette_id, location=well_location_point
):
# WARNING: (spp, 2023-11-30: this needs to be wired up to check for
# 8-channel pipette extents on both OT2 & Flex!!)
raise PartialTipMovementNotAllowedError(
"Requested motion with single H1 nozzle configuration"
" is outside of robot bounds for the 8-channel."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
north_slot_number = get_north_slot(
_deck_slot_to_int(DeckSlotLocation(slotName=labware_slot))
)
if north_slot_number is None:
return

north_slot = DeckSlotName.from_primitive(
north_slot_number
).to_equivalent_for_robot_type(engine_state.config.robot_type)

north_slot_highest_z = engine_state.geometry.get_highest_z_in_slot(
DeckSlotLocation(slotName=north_slot)
)

pipette_tip = engine_state.pipettes.get_attached_tip(pipette_id)
tip_length = pipette_tip.length if pipette_tip else 0.0

if north_slot_highest_z + Z_SAFETY_MARGIN > well_location_point.z + tip_length:
raise PartialTipMovementNotAllowedError(
f"Moving to {engine_state.labware.get_load_name(labware_id)} in slot {labware_slot}"
f" with a Single nozzle configuration will result in collision with"
f" items in deck slot {north_slot}."
)


def _is_within_pipette_extents(
engine_state: StateView,
pipette_id: str,
location: Point,
) -> bool:
"""Whether a given point is within the extents of a configured pipette on the specified robot."""
robot_type = engine_state.config.robot_type
pipette_channels = engine_state.pipettes.get_channels(pipette_id)
nozzle_config = engine_state.pipettes.get_nozzle_layout_type(pipette_id)
primary_nozzle = engine_state.pipettes.get_primary_nozzle(pipette_id)
if robot_type == "OT-3 Standard":
if (
pipette_channels == 96
and nozzle_config == NozzleConfigurationType.COLUMN
and primary_nozzle == "A12"
):
return (
A12_column_front_left_bound.x
< location.x
< A12_column_back_right_bound.x
and A12_column_front_left_bound.y
< location.y
< A12_column_back_right_bound.y
)
# TODO (spp, 2023-11-07): check for 8-channel nozzle H1 extents on Flex & OT2
return True


def _map_labware(
engine_state: StateView,
labware_id: str,
Expand Down
58 changes: 48 additions & 10 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opentrons_shared_data.pipette.dev_types import PipetteNameType
from opentrons.protocol_api._nozzle_layout import NozzleLayout
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from . import deck_conflict

from ..instrument import AbstractInstrument
from .well import WellCore
Expand Down Expand Up @@ -141,7 +142,13 @@ def aspirate(
absolute_point=location.point,
)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.aspirate(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -209,7 +216,13 @@ def dispense(
absolute_point=location.point,
)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.dispense(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -274,7 +287,13 @@ def blow_out(
absolute_point=location.point,
)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.blow_out(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -316,7 +335,13 @@ def touch_tip(
well_location = WellLocation(
origin=WellOrigin.TOP, offset=WellOffset(x=0, y=0, z=z_offset)
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.touch_tip(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -358,7 +383,13 @@ def pick_up_tip(
well_name=well_name,
absolute_point=location.point,
)

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
)
self._engine_client.pick_up_tip(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -403,7 +434,13 @@ def drop_tip(
)
else:
well_location = DropTipWellLocation()

deck_conflict.check_safe_for_pipette_movement(
engine_state=self._engine_client.state,
pipette_id=self._pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(),
)
self._engine_client.drop_tip(
pipette_id=self._pipette_id,
labware_id=labware_id,
Expand Down Expand Up @@ -440,10 +477,11 @@ def _move_to_disposal_location(
addressable_area_name = disposal_location._addressable_area_name
if isinstance(disposal_location, WasteChute):
num_channels = self.get_channels()
if num_channels == 96:
addressable_area_name = "96ChannelWasteChute"
else:
addressable_area_name = "1and8ChannelWasteChute"
addressable_area_name = {
1: "1ChannelWasteChute",
8: "8ChannelWasteChute",
96: "96ChannelWasteChute",
}[num_channels]

self._engine_client.move_to_addressable_area(
pipette_id=self._pipette_id,
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/protocol_api/core/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def configure_for_volume(self, volume: float) -> None:
"""Configure the pipette for a specific volume.
Args:
volume: The volume to preppare to handle.
volume: The volume to prepare to handle.
"""
...

Expand All @@ -269,7 +269,7 @@ def configure_nozzle_layout(
Args:
style: The type of configuration you wish to build.
primary_nozzle: The nozzle that will determine a pipettes critical point.
primary_nozzle: The nozzle that will determine a pipette's critical point.
front_right_nozzle: The front right most nozzle in the requested layout.
"""
...
Expand Down
Loading

0 comments on commit d1e2c7d

Please sign in to comment.