From 2d97353a6ad4f14e740c38b78b3d75821e2d77c4 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 1 Nov 2018 17:18:25 -0400 Subject: [PATCH] refactor(api): Change motion target locations and add arcs (#2598) * refactor(api): Change motion target locations and add arcs The motion commands in the protocol API take a new type, Location, which has a reference to a labware or well it is related to. This labware or well is used by a new function, geometry.plan_arc, to break down individual motion commands into either arc or direct moves depending on if the target is in the same well or labware as the target of the last move. In addition, change the Well class position accessors (top(), bottom(), center()) to return a Location instead of just a Point, and build Wells with references to their parent Labwares. --- .../opentrons/hardware_control/__init__.py | 11 + api/src/opentrons/protocol_api/__init__.py | 8 +- api/src/opentrons/protocol_api/contexts.py | 238 +++++------------- api/src/opentrons/protocol_api/geometry.py | 95 +++++-- api/src/opentrons/protocol_api/labware.py | 78 ++++-- api/src/opentrons/types.py | 50 +++- .../protocol_api/test_accessor_fn.py | 10 + .../protocol_api/test_back_compat.py | 40 +++ .../opentrons/protocol_api/test_context.py | 133 +++++----- .../opentrons/protocol_api/test_geometry.py | 147 +++++++++++ .../opentrons/protocol_api/test_labware.py | 51 ++-- .../opentrons/protocol_api/test_offsets.py | 5 + 12 files changed, 546 insertions(+), 320 deletions(-) create mode 100644 api/tests/opentrons/protocol_api/test_back_compat.py create mode 100644 api/tests/opentrons/protocol_api/test_geometry.py diff --git a/api/src/opentrons/hardware_control/__init__.py b/api/src/opentrons/hardware_control/__init__.py index 0849bb67fbc..a426cbbb821 100644 --- a/api/src/opentrons/hardware_control/__init__.py +++ b/api/src/opentrons/hardware_control/__init__.py @@ -306,6 +306,17 @@ def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]: plunger_ax: self._current_position[plunger_ax] } + def gantry_position(self, mount: top_types.Mount) -> top_types.Point: + """ Return the position of the critical point as pertains to the gantry + + This ignores the plunger position and gives the Z-axis a predictable + name (as :py:attr:`.Point.z`). + """ + cur_pos = self.current_position(mount) + return top_types.Point(x=cur_pos[Axis.X], + y=cur_pos[Axis.Y], + z=cur_pos[Axis.by_mount(mount)]) + @_log_call async def move_to( self, mount: top_types.Mount, abs_position: top_types.Point, diff --git a/api/src/opentrons/protocol_api/__init__.py b/api/src/opentrons/protocol_api/__init__.py index 28a1bb6b47b..3622c31bb2b 100644 --- a/api/src/opentrons/protocol_api/__init__.py +++ b/api/src/opentrons/protocol_api/__init__.py @@ -7,7 +7,7 @@ import logging import os -from . import back_compat +from . import back_compat, labware from .contexts import ProtocolContext, InstrumentContext @@ -45,4 +45,8 @@ def run(protocol_bytes: bytes = None, pass -__all__ = ['run', 'ProtocolContext', 'InstrumentContext', 'back_compat'] +__all__ = ['run', + 'ProtocolContext', + 'InstrumentContext', + 'back_compat', + 'labware'] diff --git a/api/src/opentrons/protocol_api/contexts.py b/api/src/opentrons/protocol_api/contexts.py index 96d516a6287..cf63cc5a409 100644 --- a/api/src/opentrons/protocol_api/contexts.py +++ b/api/src/opentrons/protocol_api/contexts.py @@ -38,6 +38,7 @@ def __init__(self, self._instruments: Dict[types.Mount, Optional[InstrumentContext]]\ = {mount: None for mount in types.Mount} self._last_moved_instrument: Optional[types.Mount] = None + self._location_cache: Optional[types.Location] = None self._hardware = self._build_hardware_adapter(self._loop) self._log = MODULE_LOG.getChild(self.__class__.__name__) @@ -197,24 +198,41 @@ def update_config(self, **kwargs): self._hardware.update_config(**kwargs) def move_to(self, mount: types.Mount, - location: geometry.Location, - strategy: types.MotionStrategy): - where = geometry.point_from_location(location) - if self._last_moved_instrument\ - and self._last_moved_instrument != mount: + location: types.Location): + """ Implement motions of the robot. + + This should not need to be called by the user; it is called by + :py:meth:`InstrumentContext.move_to` (and thus all other + :py:class:`InstrumentContext` methods that involve moving, such as + :py:meth:`InstrumentContext.aspirate`) to move the pipettes around. + + It encapsulates location caching and ensures that all moves are safe. + It does this by taking a :py:class:`.types.Location` that can have + a position attached to it, and its behavior depends on the state of + that location cache and the passed location. + """ + switching_instr = self._last_moved_instrument\ + and self._last_moved_instrument != mount + if switching_instr: # TODO: Is 10 the right number here? This is what’s used in # robot since it’s a default to an argument that is never # changed self._log.debug("retract {}".format(self._last_moved_instrument)) self._hardware.retract(self._last_moved_instrument, 10) - if strategy == types.MotionStrategy.DIRECT: - self._hardware.move_to(mount, where) - self._log.debug("move {} direct {}".format(mount.name, where)) + + if self._location_cache and not switching_instr: + from_loc = self._location_cache else: - self._log.debug("move {} arc {}".format(mount.name, where)) - self._log.warn( - "Arc moves are not implemented, following back to direct") - self._hardware.move_to(mount, where) + from_loc = types.Location( + point=self._hardware.gantry_position(mount), + labware=None) + moves = geometry.plan_moves(from_loc, location, self._deck_layout) + self._log.debug("planned moves for {}->{}: {}" + .format(from_loc, location, moves)) + self._location_cache = location + self._last_moved_instrument = mount + for move in moves: + self._hardware.move_to(mount, move) def home(self): """ Homes the robot. @@ -261,7 +279,7 @@ def __init__(self, def aspirate(self, volume: float = None, - location: geometry.Location = None, + location: types.Location = None, rate: float = 1.0): """ Aspirate a volume of liquid (in microliters/uL) using this pipette @@ -271,58 +289,36 @@ def aspirate(self, from its current position. If only a location is passed, :py:meth:`aspirate` will default to its :py:attr:`max_volume`. - The location may be a :py:class:`.Well`, or a specific position in - relation to a :py:class:`.Well`, such as the return value of - :py:meth:`.Well.top`. If a :py:class:`.Well` is specified without - calling a position method (such as :py:meth:`.Well.top` or - :py:meth:`.Well.bottom`), this method will aspirate from the bottom - of the well. + If the :py:class:`.types.Location` passed in `location` has an + associated labware, that labware will be saved until another motion + is commanded. This is used to optimize motions - for instance, moving + between two wells requires much less Z-distance to avoid collisions + than moving between two pieces of labware. :param volume: The volume to aspirate, in microliters. If not specified, :py:attr:`max_volume`. :type volume: int or float - :param location: Where to aspirate from. A :py:class:`.Well` or - position (e.g. the return value from - :py:meth:`.Well.top` or :py:meth:`.Well.bottom`). If - unspecified, the current position. For advanced usage, - an (x, y, z) tuple or instance of - :py:class:`types.Point` may be passed. This is a - location in :ref:`protocol-api-deck-coords`. - :type location: Well or tuple[float, float, float] or types.Point + :param location: Where to aspirate from. If unspecified, the + current position. :param rate: The relative plunger speed for this aspirate. During this aspirate, the speed of the plunger will be `rate` * :py:attr:`aspirate_speed`. If not specified, defaults to 1.0 (speed will not be modified). :type rate: float :returns: This instance. - - Examples - -------- - .. - >>> from opentrons import instruments, labware, robot # doctest: +SKIP - >>> robot.reset() # doctest: +SKIP - >>> plate = labware.load('96-flat', '2') # doctest: +SKIP - >>> p300 = instruments.P300_Single(mount='right') # doctest: +SKIP - >>> p300.pick_up_tip() # doctest: +SKIP - # aspirate 50uL from a Well - >>> p300.aspirate(50, plate[0]) # doctest: +SKIP - # aspirate 50uL from the center of a well - >>> p300.aspirate(50, plate[1].bottom()) # doctest: +SKIP - >>> # aspirate 20uL in place, twice as fast - >>> p300.aspirate(20, rate=2.0) # doctest: +SKIP - >>> # aspirate the pipette's remaining volume (80uL) from a Well - >>> p300.aspirate(plate[2]) # doctest: +SKIP - """ - where = self._get_point_and_cache(location, 'bottom') + """ self._log.debug("aspirate {} from {} at {}" - .format(volume, where, rate)) - self._ctx.move_to(self._mount, where, types.MotionStrategy.ARC) + .format(volume, + location if location else 'current position', + rate)) + if location: + self.move_to(location) self._hardware.aspirate(self._mount, volume, rate) return self def dispense(self, volume: float = None, - location: geometry.Location = None, + location: types.Location = None, rate: float = 1.0): """ Dispense a volume of liquid (in microliters/uL) using this pipette @@ -342,44 +338,21 @@ def dispense(self, :param volume: The volume of liquid to dispense, in microliters. If not specified, defaults to :py:attr:`current_volume`. :type volume: int or float - :param location: Where to dispense into. A :py:class:`.Well` or - position (e.g. the return value from - :py:meth:`.Well.top` or :py:meth:`.Well.bottom`). If - unspecified, the bottom of the current well. For - advanced usage, an `(x, y, z)` tuple or instance of - :py:class:`types.Point` may be passed containing a - location in :ref:`protocol-api-deck-coords`. - :type location: .Well or tuple[float, float, float] or types.Point + :param location: Where to dispense into. If unspecified, the + current position. :param rate: The relative plunger speed for this aspirate. During this aspirate, the speed of the plunger will be `rate` * :py:attr:`aspirate_speed`. If not specified, defaults to 1.0 (speed will not be modified). :type rate: float :returns: This instance. - - Examples - -------- - .. - >>> from opentrons import instruments, labware, robot # doctest: +SKIP - >>> robot.reset() # doctest: +SKIP - >>> plate = labware.load('96-flat', '3') # doctest: +SKIP - >>> p300 = instruments.P300_Single(mount='left') # doctest: +SKIP - # fill the pipette with liquid (200uL) - >>> p300.aspirate(plate[0]) # doctest: +SKIP - # dispense 50uL to a Well - >>> p300.dispense(50, plate[0]) # doctest: +SKIP - # dispense 50uL to the center of a well - >>> relative_vector = plate[1].center() # doctest: +SKIP - >>> p300.dispense(50, (plate[1], relative_vector)) # doctest: +SKIP - # dispense 20uL in place, at half the speed - >>> p300.dispense(20, rate=0.5) # doctest: +SKIP - # dispense the pipette's remaining volume (80uL) to a Well - >>> p300.dispense(plate[2]) # doctest: +SKIP - """ - where = self._get_point_and_cache(location, 'bottom') + """ self._log.debug("dispense {} from {} at {}" - .format(volume, where, rate)) - self._ctx.move_to(self._mount, where, types.MotionStrategy.ARC) + .format(volume, + location if location else 'current position', + rate)) + if location: + self.move_to(location) self._hardware.dispense(self._mount, volume, rate) return self @@ -446,55 +419,14 @@ def transfer(self, **kwargs): raise NotImplementedError - def move_to(self, - location: geometry.Location, - strategy: Union[types.MotionStrategy, str] = None): + def move_to(self, location: types.Location): """ Move this pipette to a specific location on the deck. - :param location: Where to move to. This can be a :py:class:`.Well`, in - which case the pipette will move to its - :py:meth:`.Well.top`; a :py:class:`Labware`, in which - case the pipette will move to the top of its first - well; or a position, whether specified by the result - of a call to something like :py:class:`.Well.top` or - directly specified as a `tuple` or - :py:attr:`types.Point` in - :ref:`protocol-api-deck-coords`. - :type location: Well or tuple[float, float, float] or types.Point - :param strategy: How to move. This can be a member of - :py:class:`types.MotionStrategy` or one of the strings - `'arc'` and `'direct'` (with any capitalization). - The `'arc'` strategy (default) will pick the head up - on Z axis, then move over to the XY destination, then - finally down to the Z destination. This avoids any - obstacles, like labware, and is suitable for moving - around between different areas on the deck. The - `'direct'` strategy will simply move in a straight - line from the current position to the destination, - and is suitable for smaller motions, for instance - plate streaking or custom touch tip implementations. + :param location: Where to move to. :raises ValueError: if an argument is incorrect. """ - if None is strategy: - strategy = types.MotionStrategy.DIRECT - if strategy not in types.MotionStrategy: - # ignore the type here because mypy isn’t quite good enough - # to catch that if strategy is not in types.MotionStrategy - # it definitely isn’t an instance of types.MotionStrategy - name = strategy.upper() # type: ignore - try: - checked_strategy = types.MotionStrategy[name] - except KeyError: - raise ValueError( - "invalid motion strategy {}. Please use 'arc', 'direct'" - "opentrons.types.MotionStrategy.ARC or " - "opentrons.types.MotionStrategy.DIRECT" - .format(strategy)) - else: - # Same reason for the type: ignore as above - checked_strategy = strategy # type: ignore - where = self._get_point_and_cache(location, 'top') - self._ctx.move_to(self._mount, where, checked_strategy) + self._log.debug("move to {}".format(location)) + self._ctx.move_to(self._mount, location) return self @property @@ -613,62 +545,6 @@ def hw_pipette(self) -> Optional[Dict[str, Any]]: """ return self._hardware.attached_instruments[self._mount] - def _get_point_and_cache(self, - location: Optional[geometry.Location], - accessor: str) -> types.Point: - """ Take a location and turn it into a point, caching the labware. - - This method resolves a :py:class:`.Point` in absolute coordinates. - If given a :py:class:`.Point` (or a 3-tuple of coordinates) it will - use the input directly; otherwise it will attempt to resolve a - :py:class:`.Well` (by taking the first well of a passed - :py:class:`.Labware` if necessary) and use the specified `accessor` - on it. - - If `location` is `None` and there is a cached location, the cache - will be used. - - If a :py:class:`.Well` could be resolved (i.e. `location` was a - :py:class:`.Labware` or :py:class:`.Well` or it was `None` and there - was a cached location) the resolved :py:class:`.Well` will be cached. - If `location` did not resolve to a :py:class:`.Well` the location - cache will be invalidated. - - If `location` is `None` and nothing is cached, raises. - - :param location: The location to resolve (see above). - :param accessor: The name of the position accessor on - :py:class:`.Well` (e.g. 'top' or 'bottom') to use - on the eventually-resolved :py:class:`.Well`. - :raises RuntimeError: If `location` was `None` and no location is - cached. - """ - if None is location: - if self._last_location: - location = self._last_location - else: - raise RuntimeError( - 'Locationless move specified but no location cached') - if isinstance(location, Labware): - well: Optional[Well] = location.wells()[0] - point: types.Point = getattr(well, accessor)() - elif isinstance(location, Well): - well = location - point = getattr(well, accessor)() - elif isinstance(location, types.Point): - point = location - well = None - elif isinstance(location, tuple): - point = types.Point(location[0], location[1], location[2]) - well = None - else: - raise TypeError( - 'Bad location {}, must be None, Labware, Well, Point or tuple' - .format(location)) - - self._last_location = well - return point - def __repr__(self): return '<{}: {} in {}>'.format(self.__class__.__name__, self.hw_pipette['name'], diff --git a/api/src/opentrons/protocol_api/geometry.py b/api/src/opentrons/protocol_api/geometry.py index 0681e0fc14a..66191adddf7 100644 --- a/api/src/opentrons/protocol_api/geometry.py +++ b/api/src/opentrons/protocol_api/geometry.py @@ -1,33 +1,84 @@ from collections import UserDict +import functools import logging -from typing import Union, Tuple +from typing import List, Optional, Tuple -from opentrons.protocol_api.labware import Labware, Well from opentrons import types +from .labware import Labware, Well + MODULE_LOG = logging.getLogger(__name__) -Location = Union[Labware, Well, types.Point, Tuple[float, float, float]] +def max_many(*args): + return functools.reduce(max, args[1:], args[0]) + + +def plan_moves(from_loc: types.Location, + to_loc: types.Location, + deck: 'Deck', + well_z_margin: float = 5.0, + lw_z_margin: float = 20.0) -> List[types.Point]: + """ Plan moves between one :py:class:`.Location` and another. + + Each :py:class:`.Location` instance might or might not have a specific + kind of geometry attached. This function is intended to return series + of moves that contain the minimum safe retractions to avoid (known) + labware on the specified :py:class:`Deck`. + + :param from_loc: The last location. + :param to_loc: The location to move to. + :param deck: The :py:class:`Deck` instance describing the robot. + :param float well_z_margin: How much extra Z margin to raise the cp by over + the bare minimum to clear wells within the same + labware. Default: 5mm + :param float lw_z_margin: How much extra Z margin to raise the cp by over + the bare minimum to clear different pieces of + labware. Default: 20mm + + :returns: A list of :py:class:`.Point` to move through. + """ + + def _split_loc_labware( + loc: types.Location) -> Tuple[Optional[Labware], Optional[Well]]: + if isinstance(loc.labware, Labware): + return loc.labware, None + elif isinstance(loc.labware, Well): + return loc.labware.parent, loc.labware + else: + return None, None -def point_from_location(location: Location) -> types.Point: - """ Build a deck-abs point from anything the user passes in """ + to_point = to_loc.point + to_lw, to_well = _split_loc_labware(to_loc) + from_point = from_loc.point + from_lw, from_well = _split_loc_labware(from_loc) - # Defined with an inner function like this to make logging the result - # a bit less tedious and reasonably mypy-compliant - def _point(loc: Location) -> types.Point: - if isinstance(location, Well): - return location.top() - elif isinstance(location, Labware): - return location.wells()[0].top() - elif isinstance(location, tuple): - return types.Point(*location[:3]) + if to_lw and to_lw == from_lw: + # Two valid labwares. We’ll either raise to clear a well or go direct + if to_well and to_well == from_well: + return [to_point] else: - return location - - point = _point(location) - MODULE_LOG.debug("Location {} -> {}".format(location, point)) - return point + if to_well: + to_safety = to_well.top().point.z + well_z_margin + else: + to_safety = to_lw.highest_z + well_z_margin + if from_well: + from_safety = from_well.top().point.z + well_z_margin + else: + from_safety = from_lw.highest_z + well_z_margin + safe = max_many( + to_point.z, + from_point.z, + to_safety, + from_safety) + else: + # For now, the only fallback is to clear all known labware + safe = max_many(to_point.z, + from_point.z, + deck.highest_z + lw_z_margin) + return [from_point._replace(z=safe), + to_point._replace(z=safe), + to_point] class Deck(UserDict): @@ -65,7 +116,7 @@ def _check_name(self, key: object) -> int: else: return key_int - def __getitem__(self, key: types.DeckLocation) -> Labware: + def __getitem__(self, key: types.DeckLocation) -> 'Labware': return self.data[self._check_name(key)] def __delitem__(self, key: types.DeckLocation) -> None: @@ -77,13 +128,13 @@ def __delitem__(self, key: types.DeckLocation) -> None: for item in [lw for lw in self.data.values() if lw]: self._highest_z = max(item.wells()[0].top().z, self._highest_z) - def __setitem__(self, key: types.DeckLocation, val: Labware) -> None: + def __setitem__(self, key: types.DeckLocation, val: 'Labware') -> None: key_int = self._check_name(key) if self.data.get(key_int) is not None: raise ValueError('Deck location {} already has an item: {}' .format(key, self.data[key_int])) self.data[key_int] = val - self._highest_z = max(val.wells()[0].top().z, self._highest_z) + self._highest_z = max(val.wells()[0].top().point.z, self._highest_z) def __contains__(self, key: object) -> bool: try: diff --git a/api/src/opentrons/protocol_api/labware.py b/api/src/opentrons/protocol_api/labware.py index 05bfb182040..e4fa1fccf45 100644 --- a/api/src/opentrons/protocol_api/labware.py +++ b/api/src/opentrons/protocol_api/labware.py @@ -1,13 +1,14 @@ """This module will replace Placeable""" -import re -import os +from collections import defaultdict +from enum import Enum, auto import json +import os +import re import time from typing import List, Dict -from enum import Enum, auto -from opentrons.types import Point + +from opentrons.types import Point, Location from opentrons.util import environment as env -from collections import defaultdict class WellShape(Enum): @@ -25,7 +26,8 @@ class WellShape(Enum): class Well: def __init__( - self, well_props: dict, parent: Point, display_name: str) -> None: + self, well_props: dict, parent: Location, display_name: str)\ + -> None: """ Create a well, and track the Point corresponding to the top-center of the well (this Point is in absolute deck coordinates) @@ -37,15 +39,19 @@ def __init__( This is created by the caller and passed in, so here it is just saved and made available. :param well_props: a dict that conforms to the json-schema for a Well - :param parent: a Point representing the absolute position of the parent - of the Well (usually the lower-left corner of a labware) + :param parent: a :py:class:`.Location` Point representing the absolute + position of the parent of the Well (usually the + lower-left corner of a labware) """ self._display_name = display_name - self._position = Point( - x=well_props['x'] + parent.x, - y=well_props['y'] + parent.y, - z=well_props['z'] + well_props['depth'] + parent.z) - + self._position\ + = Point(well_props['x'], + well_props['y'], + well_props['z'] + well_props['depth']) + parent.point + + if not parent.labware: + raise ValueError("Wells must have a parent") + self._parent = parent.labware self._shape = well_shapes.get(well_props['shape']) if self._shape is WellShape.RECTANGULAR: self._length = well_props['length'] @@ -62,33 +68,37 @@ def __init__( self._depth = well_props['depth'] - def top(self) -> Point: + @property + def parent(self) -> 'Labware': + return self._parent + + def top(self) -> Location: """ :return: a Point corresponding to the absolute position of the top-center of the well relative to the deck (with the lower-left corner of slot 1 as (0,0,0)) """ - return self._position + return Location(self._position, self) - def bottom(self) -> Point: + def bottom(self) -> Location: """ :return: a Point corresponding to the absolute position of the bottom-center of the well (with the lower-left corner of slot 1 as (0,0,0)) """ top = self.top() - bottom_z = top.z - self._depth - return Point(x=top.x, y=top.y, z=bottom_z) + bottom_z = top.point.z - self._depth + return Location(Point(x=top.point.x, y=top.point.y, z=bottom_z), self) - def center(self) -> Point: + def center(self) -> Location: """ :return: a Point corresponding to the absolute position of the center of the well relative to the deck (with the lower-left corner of slot 1 as (0,0,0)) """ top = self.top() - center_z = top.z - (self._depth / 2.0) - return Point(x=top.x, y=top.y, z=center_z) + center_z = top.point.z - (self._depth / 2.0) + return Location(Point(x=top.point.x, y=top.point.y, z=center_z), self) def _from_center_cartesian( self, x: float, y: float, z: float) -> Point: @@ -121,9 +131,9 @@ def _from_center_cartesian( z_size = self._depth return Point( - x=center.x + (x * (x_size / 2.0)), - y=center.y + (y * (y_size / 2.0)), - z=center.z + (z * (z_size / 2.0))) + x=center.point.x + (x * (x_size / 2.0)), + y=center.point.y + (y * (y_size / 2.0)), + z=center.point.z + (z * (z_size / 2.0))) def __str__(self): return self._display_name @@ -135,7 +145,7 @@ def __eq__(self, other: object) -> bool: """ if not isinstance(other, Well): return NotImplemented - return self.top() == other.top() + return self.top().point == other.top().point class Labware: @@ -170,6 +180,7 @@ def __init__( self._id = definition['otId'] self._parameters = definition['parameters'] offset = definition['cornerOffsetFromSlot'] + self._dimensions = definition['dimensions'] # Inferred from definition self._ordering = [well for col in definition['ordering'] @@ -179,6 +190,7 @@ def __init__( z=offset['z'] + parent.z) # Applied properties self.set_calibration(self._calibrated_offset) + self._pattern = re.compile(r'^([A-Z]+)([1-9][0-9]*)$', re.X) def _build_wells(self) -> List[Well]: @@ -190,7 +202,7 @@ def _build_wells(self) -> List[Well]: return [ Well( self._well_definition[well], - self._calibrated_offset, + Location(self._calibrated_offset, self), "{} of {}".format(well, self._display_name)) for well in self._ordering] @@ -217,6 +229,10 @@ def set_calibration(self, delta: Point): z=self._offset.z + delta.z) self._wells = self._build_wells() + @property + def calibrated_offset(self) -> Point: + return self._calibrated_offset + def well(self, idx) -> Well: """Deprecated---use result of `wells` or `wells_by_index`""" if isinstance(idx, int): @@ -357,6 +373,16 @@ def cols(self, *args): """Deprecated--use `columns`""" return self.columns(*args) + @property + def highest_z(self) -> float: + """ + The z-coordinate of the tallest single point anywhere on the labware. + + This is drawn from the 'dimensions'/'overallHeight' elements of the + labware definition and takes into account the calibration offset. + """ + return self._dimensions['overallHeight'] + self._calibrated_offset.z + def __repr__(self): return self._display_name diff --git a/api/src/opentrons/types.py b/api/src/opentrons/types.py index 52f13dc0602..745741c5ddb 100644 --- a/api/src/opentrons/types.py +++ b/api/src/opentrons/types.py @@ -1,12 +1,17 @@ import enum -from typing import Union -from collections import namedtuple -from typing import Any +from typing import Any, NamedTuple, TYPE_CHECKING, Union -_PointTuple = namedtuple('Point', ['x', 'y', 'z']) +if TYPE_CHECKING: + from typing import (Optional, # noqa(F401) Used for typechecking + Tuple) + from .labware import Labware, Well # noqa(F401) Used for typechecking -class Point(_PointTuple): +class Point(NamedTuple): + x: float + y: float + z: float + def __eq__(self, other: Any) -> bool: if not isinstance(other, Point): return False @@ -23,14 +28,39 @@ def __sub__(self, other: Any) -> 'Point': return Point(self.x - other.x, self.y - other.y, self.z - other.z) +class Location(NamedTuple): + """ A location to target as a motion in the :ref:`protocol-api`. + + The location contains a :py:class:`.Point` (in + :ref:`protocol-api-deck-coordinates`) and possibly an associated + :py:class:`.Labware` or :py:class:`.Well` instance. + + It should rarely be constructed directly by the user; rather, it is the + return type of most :py:class:`.Well` accessors like :py:meth:`.Well.top` + and is passed directly into a method like + :py:meth:`InstrumentContext.aspirate`. + + .. warning:: + The :py:attr:`labware` attribute of this class is used by the protocol + API internals to, among other things, determine safe heights to retract + the instruments to when moving between locations. If constructing an + instance of this class manually, be sure to either specify `None` as the + labware (so the robot does its worst case retraction) or specify the + correct labware for the :py:attr:`point` attribute. + + + .. warning:: + The `==` operation compares both the position and associated labware. + If you only need to compare locations, compare the :py:attr:`point` + of each item. + """ + point: Point + labware: 'Union[Labware, Well, None]' + + class Mount(enum.Enum): LEFT = enum.auto() RIGHT = enum.auto() DeckLocation = Union[int, str] - - -class MotionStrategy(enum.Enum): - DIRECT = enum.auto() - ARC = enum.auto() diff --git a/api/tests/opentrons/protocol_api/test_accessor_fn.py b/api/tests/opentrons/protocol_api/test_accessor_fn.py index aa671a30969..6be2e340811 100644 --- a/api/tests/opentrons/protocol_api/test_accessor_fn.py +++ b/api/tests/opentrons/protocol_api/test_accessor_fn.py @@ -34,6 +34,11 @@ "z": 0, "shape": "circular" } + }, + "dimensions": { + "overallLength": 1.0, + "overallWidth": 2.0, + "overallHeight": 3.0 } } @@ -106,6 +111,11 @@ "z": 0, "shape": "circular" } + }, + "dimensions": { + "overallLength": 1.0, + "overallWidth": 2.0, + "overallHeight": 3.0 } } diff --git a/api/tests/opentrons/protocol_api/test_back_compat.py b/api/tests/opentrons/protocol_api/test_back_compat.py new file mode 100644 index 00000000000..1df61599af8 --- /dev/null +++ b/api/tests/opentrons/protocol_api/test_back_compat.py @@ -0,0 +1,40 @@ +from opentrons.protocol_api import back_compat # , ProtocolContext +from opentrons.types import Mount + + +def test_add_instrument(loop, monkeypatch): + requested_instr, requested_mount = None, None + + def fake_load(instr_name, mount): + nonlocal requested_instr + nonlocal requested_mount + requested_instr = instr_name + requested_mount = mount + + monkeypatch.setattr(back_compat.instruments._ctx, + 'load_instrument', fake_load) + + back_compat.instruments.P1000_Single('left') + assert requested_instr == 'p1000_single' + assert requested_mount == Mount.LEFT + back_compat.instruments.P10_Single('right') + assert requested_instr == 'p10_single' + assert requested_mount == Mount.RIGHT + back_compat.instruments.P10_Multi('left') + assert requested_instr == 'p10_multi' + assert requested_mount == Mount.LEFT + back_compat.instruments.P50_Single('right') + assert requested_instr == 'p50_single' + assert requested_mount == Mount.RIGHT + back_compat.instruments.P50_Multi('left') + assert requested_instr == 'p50_multi' + assert requested_mount == Mount.LEFT + back_compat.instruments.P300_Single('right') + assert requested_instr == 'p300_single' + assert requested_mount == Mount.RIGHT + back_compat.instruments.P300_Multi('left') + assert requested_instr == 'p300_multi' + assert requested_mount == Mount.LEFT + back_compat.instruments.P1000_Single('right') + assert requested_instr == 'p1000_single' + assert requested_mount == Mount.RIGHT diff --git a/api/tests/opentrons/protocol_api/test_context.py b/api/tests/opentrons/protocol_api/test_context.py index 70049b51ce4..fce57e52922 100644 --- a/api/tests/opentrons/protocol_api/test_context.py +++ b/api/tests/opentrons/protocol_api/test_context.py @@ -4,9 +4,7 @@ import pkgutil import opentrons.protocol_api as papi -from opentrons.protocol_api.geometry import Deck -from opentrons.protocol_api.labware import load -from opentrons.types import MotionStrategy, Mount, Point +from opentrons.types import Mount, Point, Location from opentrons.hardware_control import API from opentrons.hardware_control.types import Axis from opentrons.config.pipette_config import configs @@ -14,6 +12,7 @@ import pytest +# TODO: Remove once load_labware_by_name is implemented labware_name = 'generic_96_wellPlate_380_uL' labware_def = json.loads( pkgutil.get_data('opentrons', @@ -27,36 +26,6 @@ def dummy_load(labware): monkeypatch.setattr(papi.labware, '_load_definition_by_name', dummy_load) -def test_slot_names(load_my_labware): - slots_by_int = list(range(1, 13)) - slots_by_str = [str(idx) for idx in slots_by_int] - for method in (slots_by_int, slots_by_str): - d = Deck() - for idx, slot in enumerate(method): - lw = load(labware_name, d.position_for(slot), str(slot)) - assert slot in d - d[slot] = lw - with pytest.raises(ValueError): - d[slot] = 'not this time boyo' - del d[slot] - assert slot in d - assert d[slot] is None - - assert 'hasasdaia' not in d - with pytest.raises(ValueError): - d['ahgoasia'] = 'nope' - - -def test_highest_z(load_my_labware): - deck = Deck() - assert deck.highest_z == 0 - lw = load(labware_name, deck.position_for(1), '1') - deck[1] = lw - assert deck.highest_z == lw.wells()[0].top().z - del deck[1] - assert deck.highest_z == 0 - - def test_load_instrument(loop): ctx = papi.ProtocolContext(loop=loop) for config in configs: @@ -73,35 +42,71 @@ def test_motion(loop): ctx.connect(hardware) instr = ctx.load_instrument('p10_single', Mount.RIGHT) instr.home() - assert instr.move_to((0, 0, 0)) is instr + assert instr.move_to(Location(Point(0, 0, 0), None)) is instr assert hardware.current_position(instr._mount) == {Axis.X: 0, Axis.Y: 0, Axis.A: 0, Axis.C: 19} -def test_location_parsing(loop, load_my_labware): +def test_location_cache(loop, monkeypatch, load_my_labware): + hardware = API.build_hardware_simulator(loop=loop) ctx = papi.ProtocolContext(loop) - lw = ctx.load_labware_by_name('generic_96_wellPlate_380_uL', '1') - instr = ctx.load_instrument('p10_single', Mount.RIGHT) - w0 = lw.wells()[0] - assert instr._get_point_and_cache(w0, 'top') == w0.top() - assert instr._last_location is w0 - assert instr._get_point_and_cache(w0, 'bottom') == w0.bottom() - assert instr._last_location is w0 - assert instr._get_point_and_cache(w0, 'center') == w0.center() - assert instr._last_location is w0 - assert instr._get_point_and_cache(w0.bottom(), 'top') == w0.bottom() - assert instr._last_location is None - assert instr._get_point_and_cache(lw, 'top') == lw.wells()[0].top() - assert instr._last_location == lw.wells()[0] - assert instr._get_point_and_cache(None, 'top') == w0.top() - assert instr._last_location == lw.wells()[0] - assert instr._get_point_and_cache((0, 1, 2), 'bottom') == Point(0, 1, 2) - with pytest.raises(RuntimeError): - instr._get_point_and_cache(None, 'top') - with pytest.raises(TypeError): - instr._get_point_and_cache(2, 'bottom') + ctx.connect(hardware) + right = ctx.load_instrument('p10_single', Mount.RIGHT) + left = ctx.load_instrument('p300_multi', Mount.LEFT) + lw = ctx.load_labware_by_name('generic_96_wellPlate_380_uL', 1) + ctx.home() + + test_args = None + + def fake_plan_move(from_loc, to_loc, deck, + well_z_margin=None, + lw_z_margin=None): + nonlocal test_args + test_args = (from_loc, to_loc, deck, well_z_margin, lw_z_margin) + return [Point(0, 1, 10), Point(1, 2, 10), Point(1, 2, 3)] + + monkeypatch.setattr(papi.geometry, 'plan_moves', fake_plan_move) + # When we move without a cache, the from location should be the gantry + # position + right.move_to(lw.wells()[0].top()) + # The home position from hardware_control/simulator.py, taking into account + # that the right pipette is a p10 single which is a different height than + # the reference p300 single + assert test_args[0].point == Point(418, 353, 205) + assert test_args[0].labware is None + + # Once we have a location cache, that should be our from_loc + right.move_to(lw.wells()[1].top()) + assert test_args[0] == lw.wells()[0].top() + + # If we switch instruments, we should ignore the cache + here = hardware.gantry_position(Mount.LEFT) + left.move_to(lw.wells()[1].top()) + assert test_args[0].point == here + assert test_args[0].labware is None + + +def test_move_uses_arc(loop, monkeypatch, load_my_labware): + hardware = API.build_hardware_simulator(loop=loop) + ctx = papi.ProtocolContext(loop) + ctx.connect(hardware) + right = ctx.load_instrument('p10_single', Mount.RIGHT) + lw = ctx.load_labware_by_name('generic_96_wellPlate_380_uL', 1) + ctx.home() + + targets = [] + + async def fake_move(mount, target_pos): + nonlocal targets + targets.append((mount, target_pos)) + monkeypatch.setattr(hardware, 'move_to', fake_move) + + right.move_to(lw.wells()[0].top()) + assert len(targets) == 3 + assert targets[-1][0] == Mount.RIGHT + assert targets[-1][1] == lw.wells()[0].top().point def test_pipette_info(loop): @@ -128,18 +133,17 @@ async def fake_hw_aspirate(mount, volume=None, rate=1.0): move_called_with = None - def fake_move(mount, loc, strat): + def fake_move(mount, loc): nonlocal move_called_with - move_called_with = (mount, loc, strat) + move_called_with = (mount, loc) monkeypatch.setattr(ctx._hardware._api, 'aspirate', fake_hw_aspirate) monkeypatch.setattr(ctx, 'move_to', fake_move) - instr.aspirate(2.0, lw) + instr.aspirate(2.0, lw.wells()[0].bottom()) assert asp_called_with == (Mount.RIGHT, 2.0, 1.0) - assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom(), - MotionStrategy.ARC) + assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom()) def test_dispense(loop, load_my_labware, monkeypatch): @@ -155,15 +159,14 @@ async def fake_hw_dispense(mount, volume=None, rate=1.0): move_called_with = None - def fake_move(mount, loc, strat): + def fake_move(mount, loc): nonlocal move_called_with - move_called_with = (mount, loc, strat) + move_called_with = (mount, loc) monkeypatch.setattr(ctx._hardware._api, 'dispense', fake_hw_dispense) monkeypatch.setattr(ctx, 'move_to', fake_move) - instr.dispense(2.0, lw) + instr.dispense(2.0, lw.wells()[0].bottom()) assert disp_called_with == (Mount.RIGHT, 2.0, 1.0) - assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom(), - MotionStrategy.ARC) + assert move_called_with == (Mount.RIGHT, lw.wells()[0].bottom()) diff --git a/api/tests/opentrons/protocol_api/test_geometry.py b/api/tests/opentrons/protocol_api/test_geometry.py new file mode 100644 index 00000000000..ddeaa843dc6 --- /dev/null +++ b/api/tests/opentrons/protocol_api/test_geometry.py @@ -0,0 +1,147 @@ +import json +import pkgutil + +import pytest + +import opentrons.protocol_api as papi +from opentrons.protocol_api.geometry import Deck, plan_moves +from opentrons.protocol_api.labware import load + +# TODO: Remove once load_labware_by_name is implemented +labware_name = 'generic_96_wellPlate_380_uL' +labware_def = json.loads( + pkgutil.get_data('opentrons', + 'shared_data/definitions2/{}.json'.format(labware_name))) + + +@pytest.fixture +def load_my_labware(monkeypatch): + def dummy_load(labware): + return labware_def + monkeypatch.setattr(papi.labware, '_load_definition_by_name', dummy_load) + + +def test_slot_names(load_my_labware): + slots_by_int = list(range(1, 13)) + slots_by_str = [str(idx) for idx in slots_by_int] + for method in (slots_by_int, slots_by_str): + d = Deck() + for idx, slot in enumerate(method): + lw = load(labware_name, d.position_for(slot), str(slot)) + assert slot in d + d[slot] = lw + with pytest.raises(ValueError): + d[slot] = 'not this time boyo' + del d[slot] + assert slot in d + assert d[slot] is None + + assert 'hasasdaia' not in d + with pytest.raises(ValueError): + d['ahgoasia'] = 'nope' + + +def test_highest_z(load_my_labware): + deck = Deck() + assert deck.highest_z == 0 + lw = load(labware_name, deck.position_for(1), '1') + deck[1] = lw + assert deck.highest_z == lw.wells()[0].top().point.z + del deck[1] + assert deck.highest_z == 0 + + +def check_arc_basic(arc, from_loc, to_loc): + """ Check the tests that should always be true for different-well moves + - we should always go only up, then only xy, then only down + - we should have three moves + """ + assert len(arc) == 3 + assert arc[0]._replace(z=0) == from_loc.point._replace(z=0) + assert arc[0].z >= from_loc.point.z + assert arc[0].z == arc[1].z + assert arc[1]._replace(z=0) == to_loc.point._replace(z=0) + assert arc[1].z >= to_loc.point.z + assert arc[2] == to_loc.point + + +def test_direct_movs(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + + same_place = plan_moves(lw1.wells()[0].top(), lw1.wells()[0].top(), deck) + assert same_place == [lw1.wells()[0].top().point] + + same_well = plan_moves(lw1.wells()[0].top(), lw1.wells()[0].bottom(), deck) + assert same_well == [lw1.wells()[0].bottom().point] + + +def test_basic_arc(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + lw2 = load(labware_name, deck.position_for(2), 'lw2') + # same-labware moves should use the smaller safe z + same_lw = plan_moves(lw1.wells()[0].top(), + lw1.wells()[8].bottom(), + deck, + 7.0, 15.0) + check_arc_basic(same_lw, lw1.wells()[0].top(), lw1.wells()[8].bottom()) + assert same_lw[0].z == lw1.wells()[0].top().point.z + 7.0 + + # different-labware moves, or moves with no labware attached, + # should use the larger safe z and the global z + different_lw = plan_moves(lw1.wells()[0].top(), + lw2.wells()[0].bottom(), + deck, + 7.0, 15.0) + check_arc_basic(different_lw, + lw1.wells()[0].top(), lw2.wells()[0].bottom()) + assert different_lw[0].z == deck.highest_z + 15.0 + + +def test_no_labware_loc(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + lw2 = load(labware_name, deck.position_for(2), 'lw2') + # Various flavors of locations without labware should work + no_lw = lw1.wells()[0].top()._replace(labware=None) + + no_from = plan_moves(no_lw, lw2.wells()[0].bottom(), deck, 7.0, 15.0) + check_arc_basic(no_from, no_lw, lw2.wells()[0].bottom()) + assert no_from[0].z == deck.highest_z + 15.0 + + no_to = plan_moves(lw1.wells()[0].bottom(), no_lw, deck, 7.0, 15.0) + check_arc_basic(no_to, lw1.wells()[0].bottom(), no_lw) + assert no_from[0].z == deck.highest_z + 15.0 + + no_well = lw1.wells()[0].top()._replace(labware=lw1) + + no_from_well = plan_moves(no_well, lw1.wells()[1].top(), deck, 7.0, 15.0) + check_arc_basic(no_from_well, no_well, lw1.wells()[1].top()) + assert no_from_well[0].z\ + == labware_def['dimensions']['overallHeight'] + 7.0 + + no_to_well = plan_moves(lw1.wells()[1].top(), no_well, deck, 7.0, 15.0) + check_arc_basic(no_to_well, lw1.wells()[1].top(), no_well) + assert no_to_well[0].z == labware_def['dimensions']['overallHeight'] + 7.0 + + +def test_arc_tall_point(load_my_labware): + deck = Deck() + lw1 = load(labware_name, deck.position_for(1), 'lw1') + tall_z = 100 + old_top = lw1.wells()[0].top() + tall_point = old_top.point._replace(z=tall_z) + tall_top = old_top._replace(point=tall_point) + to_tall = plan_moves(lw1.wells()[2].top(), tall_top, deck, 7.0, 15.0) + check_arc_basic(to_tall, lw1.wells()[2].top(), tall_top) + assert to_tall[0].z == tall_z + + from_tall = plan_moves(tall_top, lw1.wells()[3].top(), deck, 7.0, 15.0) + check_arc_basic(from_tall, tall_top, lw1.wells()[3].top()) + assert from_tall[0].z == tall_z + + no_well = tall_top._replace(labware=lw1) + from_tall_lw = plan_moves(no_well, lw1.wells()[4].bottom(), deck, + 7.0, 15.0) + check_arc_basic(from_tall_lw, no_well, lw1.wells()[4].bottom()) diff --git a/api/tests/opentrons/protocol_api/test_labware.py b/api/tests/opentrons/protocol_api/test_labware.py index 8855edc4039..6e919df1abc 100644 --- a/api/tests/opentrons/protocol_api/test_labware.py +++ b/api/tests/opentrons/protocol_api/test_labware.py @@ -1,7 +1,7 @@ import json import pkgutil from opentrons.protocol_api import labware -from opentrons.types import Point +from opentrons.types import Point, Location test_data = { 'circular_well_json': { @@ -27,7 +27,7 @@ def test_well_init(): - slot = Point(1, 2, 3) + slot = Location(Point(1, 2, 3), 1) well_name = 'circular_well_json' well1 = labware.Well(test_data[well_name], slot, well_name) assert well1._diameter == test_data[well_name]['diameter'] @@ -42,29 +42,31 @@ def test_well_init(): def test_top(): - slot = Point(4, 5, 6) + slot = Location(Point(4, 5, 6), 1) well_name = 'circular_well_json' well = labware.Well(test_data[well_name], slot, well_name) well_data = test_data[well_name] - expected_x = well_data['x'] + slot.x - expected_y = well_data['y'] + slot.y - expected_z = well_data['z'] + well_data['depth'] + slot.z - assert well.top() == Point(expected_x, expected_y, expected_z) + expected_x = well_data['x'] + slot.point.x + expected_y = well_data['y'] + slot.point.y + expected_z = well_data['z'] + well_data['depth'] + slot.point.z + assert well.top() == Location(Point(expected_x, expected_y, expected_z), + well) def test_bottom(): - slot = Point(7, 8, 9) + slot = Location(Point(7, 8, 9), 1) well_name = 'rectangular_well_json' well = labware.Well(test_data[well_name], slot, well_name) well_data = test_data[well_name] - expected_x = well_data['x'] + slot.x - expected_y = well_data['y'] + slot.y - expected_z = well_data['z'] + slot.z - assert well.bottom() == Point(expected_x, expected_y, expected_z) + expected_x = well_data['x'] + slot.point.x + expected_y = well_data['y'] + slot.point.y + expected_z = well_data['z'] + slot.point.z + assert well.bottom() == Location(Point(expected_x, expected_y, expected_z), + well) def test_from_center_cartesian(): - slot1 = Point(10, 11, 12) + slot1 = Location(Point(10, 11, 12), 1) well_name = 'circular_well_json' well1 = labware.Well(test_data[well_name], slot1, well_name) @@ -84,7 +86,7 @@ def test_from_center_cartesian(): assert point1.y == expected_y assert point1.z == expected_z - slot2 = Point(13, 14, 15) + slot2 = Location(Point(13, 14, 15), 1) well2_name = 'rectangular_well_json' well2 = labware.Well(test_data[well2_name], slot2, well2_name) percent2_x = -0.25 @@ -165,3 +167,24 @@ def test_backcompat(): w11 = lw.columns('2', '3', '6') assert len(w11) == 3 assert repr(w11[1][2]) == well_c3_name + + +def test_well_parent(): + labware_name = 'generic_96_wellPlate_380_uL' + labware_def = json.loads( + pkgutil.get_data('opentrons', + 'shared_data/definitions2/{}.json'.format( + labware_name))) + lw = labware.Labware(labware_def, Point(0, 0, 0), 'Test Slot') + parent = Location(Point(7, 8, 9), lw) + well_name = 'circular_well_json' + well = labware.Well(test_data[well_name], + parent, + well_name) + assert well.parent is lw + assert well.top().labware is well + assert well.top().labware.parent is lw + assert well.bottom().labware is well + assert well.bottom().labware.parent is lw + assert well.center().labware is well + assert well.center().labware.parent is lw diff --git a/api/tests/opentrons/protocol_api/test_offsets.py b/api/tests/opentrons/protocol_api/test_offsets.py index d2660cfa5dc..dabb941320b 100644 --- a/api/tests/opentrons/protocol_api/test_offsets.py +++ b/api/tests/opentrons/protocol_api/test_offsets.py @@ -39,6 +39,11 @@ "z": 0, "shape": "circular" } + }, + "dimensions": { + "overallLength": 1.0, + "overallWidth": 2.0, + "overallHeight": 3.0 } }