Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: consolidate motion functions and fix some small design issues #2606

Merged
merged 5 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(self,
top_types.Mount.RIGHT: None
}
self._attached_modules: Dict[str, Any] = {}
self._last_moved_mount: Optional[top_types.Mount] = None

@classmethod
def build_hardware_controller(
Expand Down Expand Up @@ -292,6 +293,8 @@ def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]:
This returns cached position to avoid hitting the smoothie driver
unless ``refresh`` is ``True``.
"""
if not self._current_position:
raise MustHomeError
if mount == mount.RIGHT:
offset = top_types.Point(0, 0, 0)
else:
Expand Down Expand Up @@ -340,6 +343,9 @@ async def move_to(
"""
if not self._current_position:
raise MustHomeError

await self._cache_and_maybe_retract_mount(mount)

z_axis = Axis.by_mount(mount)
if mount == top_types.Mount.LEFT:
offset = top_types.Point(*self.config.mount_offset)
Expand All @@ -363,6 +369,9 @@ async def move_rel(self, mount: top_types.Mount, delta: top_types.Point,
"""
if not self._current_position:
raise MustHomeError

await self._cache_and_maybe_retract_mount(mount)

z_axis = Axis.by_mount(mount)
try:
target_position = OrderedDict(
Expand All @@ -377,8 +386,21 @@ async def move_rel(self, mount: top_types.Mount, delta: top_types.Point,
raise MustHomeError
await self._move(target_position, speed=speed)

async def _cache_and_maybe_retract_mount(self, mount: top_types.Mount):
""" Retract the 'other' mount if necessary

If `mount` does not match the value in :py:attr:`_last_moved_mount`
(and :py:attr:`_last_moved_mount` exists) then retract the mount
in :py:attr:`_last_moved_mount`. Also unconditionally update
:py:attr:`_last_moved_mount` to contain `mount`.
"""
if mount != self._last_moved_mount and self._last_moved_mount:
await self.retract(self._last_moved_mount, 10)
self._last_moved_mount = mount

async def _move_plunger(self, mount: top_types.Mount, dist: float,
speed: float = None):

z_axis = Axis.by_mount(mount)
pl_axis = Axis.of_plunger(mount)
all_axes_pos = OrderedDict(
Expand Down
158 changes: 95 additions & 63 deletions api/src/opentrons/protocol_api/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,49 +197,28 @@ def update_config(self, **kwargs):
"""
self._hardware.update_config(**kwargs)

def move_to(self, mount: types.Mount,
location: types.Location):
""" Implement motions of the robot.

This should not need to be called by the user; it is called by
:py:meth:`InstrumentContext.move_to` (and thus all other
:py:class:`InstrumentContext` methods that involve moving, such as
:py:meth:`InstrumentContext.aspirate`) to move the pipettes around.

It encapsulates location caching and ensures that all moves are safe.
It does this by taking a :py:class:`.types.Location` that can have
a position attached to it, and its behavior depends on the state of
that location cache and the passed location.
"""
switching_instr = self._last_moved_instrument\
and self._last_moved_instrument != mount
if switching_instr:
# TODO: Is 10 the right number here? This is what’s used in
# robot since it’s a default to an argument that is never
# changed
self._log.debug("retract {}".format(self._last_moved_instrument))
self._hardware.retract(self._last_moved_instrument, 10)

if self._location_cache and not switching_instr:
from_loc = self._location_cache
else:
from_loc = types.Location(
point=self._hardware.gantry_position(mount),
labware=None)
moves = geometry.plan_moves(from_loc, location, self._deck_layout)
self._log.debug("planned moves for {}->{}: {}"
.format(from_loc, location, moves))
self._location_cache = location
self._last_moved_instrument = mount
for move in moves:
self._hardware.move_to(mount, move)

def home(self):
""" Homes the robot.
"""
self._log.debug("home")
self._hardware.home()

@property
def location_cache(self) -> Optional[types.Location]:
""" The cache used by the robot to determine where it last was.
"""
return self._location_cache

@location_cache.setter
def location_cache(self, loc: Optional[types.Location]):
self._location_cache = loc

@property
def deck(self) -> geometry.Deck:
""" The object holding the deck layout of the robot.
"""
return self._deck_layout

@staticmethod
def _build_hardware_adapter(
loop: asyncio.AbstractEventLoop,
Expand Down Expand Up @@ -276,10 +255,11 @@ def __init__(self,
self._last_location: Union[Labware, Well, None] = None
self._log = log_parent.getChild(repr(self))
self._log.info("attached")
self._well_bottom_clearance = 0.5

def aspirate(self,
volume: float = None,
location: types.Location = None,
location: Union[types.Location, Well] = None,
rate: float = 1.0):
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette
Expand All @@ -289,16 +269,17 @@ def aspirate(self,
from its current position. If only a location is passed,
:py:meth:`aspirate` will default to its :py:attr:`max_volume`.

If the :py:class:`.types.Location` passed in `location` has an
associated labware, that labware will be saved until another motion
is commanded. This is used to optimize motions - for instance, moving
between two wells requires much less Z-distance to avoid collisions
than moving between two pieces of labware.

:param volume: The volume to aspirate, in microliters. If not
specified, :py:attr:`max_volume`.
:type volume: int or float
:param location: Where to aspirate from. If unspecified, the
:param location: Where to aspirate from. If `location` is a
:py:class:`.Well`, the robot will aspirate from
:py:attr:`well_bottom_clearance` mm
above the bottom of the well. If `location` is a
:py:class:`.Location` (i.e. the result of
:py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the
robot will aspirate from the exact specified location.
If unspecified, the robot will aspirate from the
current position.
:param rate: The relative plunger speed for this aspirate. During
this aspirate, the speed of the plunger will be
Expand All @@ -311,14 +292,24 @@ def aspirate(self,
.format(volume,
location if location else 'current position',
rate))
if location:
if isinstance(location, Well):
point, well = location.bottom()
self.move_to(
types.Location(point + types.Point(0, 0,
self.well_bottom_clearance),
well))
elif isinstance(location, types.Location):
self.move_to(location)
elif location is not None:
raise TypeError(
'location should be a Well or Location, but it is {}'
.format(location))
self._hardware.aspirate(self._mount, volume, rate)
return self

def dispense(self,
volume: float = None,
location: types.Location = None,
location: Union[types.Location, Well] = None,
rate: float = 1.0):
"""
Dispense a volume of liquid (in microliters/uL) using this pipette
Expand All @@ -329,20 +320,21 @@ def dispense(self,
into the pipette will be dispensed (this volume is accessible through
:py:attr:`current_volume`).

The location may be a :py:class:`.Well`, or a specific position in
relation to a :py:class:`.Well`, such as :py:meth:`.Well.top`. If a
:py:class:`.Well` is specified without calling a position method
(such as :py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the liquid
will be dispensed at the bottom of the well.

:param volume: The volume of liquid to dispense, in microliters. If not
specified, defaults to :py:attr:`current_volume`.
:type volume: int or float
:param location: Where to dispense into. If unspecified, the
:param location: Where to dispense into. If `location` is a
:py:class:`.Well`, the robot will dispense into
:py:attr:`well_bottom_clearance` mm
above the bottom of the well. If `location` is a
:py:class:`.Location` (i.e. the result of
:py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the
robot will dispense into the exact specified location.
If unspecified, the robot will dispense into the
current position.
:param rate: The relative plunger speed for this aspirate. During
this aspirate, the speed of the plunger will be
`rate` * :py:attr:`aspirate_speed`. If not specified,
:param rate: The relative plunger speed for this dispense. During
this dispense, the speed of the plunger will be
`rate` * :py:attr:`dispense_speed`. If not specified,
defaults to 1.0 (speed will not be modified).
:type rate: float
:returns: This instance.
Expand All @@ -351,8 +343,18 @@ def dispense(self,
.format(volume,
location if location else 'current position',
rate))
if location:
if isinstance(location, Well):
point, well = location.bottom()
self.move_to(
types.Location(point + types.Point(0, 0,
self.well_bottom_clearance),
well))
elif isinstance(location, types.Location):
self.move_to(location)
elif location is not None:
raise TypeError(
'location should be a Well or Location, but it is {}'
.format(location))
sanni-t marked this conversation as resolved.
Show resolved Hide resolved
self._hardware.dispense(self._mount, volume, rate)
return self

Expand Down Expand Up @@ -420,13 +422,27 @@ def transfer(self,
raise NotImplementedError

def move_to(self, location: types.Location):
""" Move this pipette to a specific location on the deck.
""" Move the instrument.

:param location: Where to move to.
:raises ValueError: if an argument is incorrect.
:param location: The location to move to.
"""
self._log.debug("move to {}".format(location))
self._ctx.move_to(self._mount, location)
if self._ctx.location_cache:
from_lw = self._ctx.location_cache.labware
else:
from_lw = None
from_loc = types.Location(self._hardware.gantry_position(self._mount),
from_lw)
moves = geometry.plan_moves(from_loc, location, self._ctx.deck)
self._log.debug("move {}->{}: {}"
.format(from_loc, location, moves))
try:
for move in moves:
self._hardware.move_to(self._mount, move)
except Exception:
self._ctx.location_cache = None
raise
else:
self._ctx.location_cache = location
return self

@property
Expand Down Expand Up @@ -545,6 +561,22 @@ def hw_pipette(self) -> Optional[Dict[str, Any]]:
"""
return self._hardware.attached_instruments[self._mount]

@property
def well_bottom_clearance(self) -> float:
""" The distance above the bottom of a well to aspirate or dispense.

When :py:meth:`aspirate` or :py:meth:`dispense` is given a
:py:class:`.Well` rather than a full :py:class:`.Location`, the robot
will move this distance above the bottom of the well to aspirate or
dispense.
"""
return self._well_bottom_clearance

@well_bottom_clearance.setter
def well_bottom_clearance(self, clearance: float):
assert clearance >= 0
self._well_bottom_clearance = clearance

def __repr__(self):
return '<{}: {} in {}>'.format(self.__class__.__name__,
self.hw_pipette['name'],
Expand Down
14 changes: 13 additions & 1 deletion api/tests/opentrons/hardware_control/test_moves.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ async def test_controller_musthome(hardware_api):
async def test_home_specific_sim(hardware_api, monkeypatch):
await hardware_api.home()
await hardware_api.move_to(types.Mount.RIGHT, types.Point(-10, 10, 20))
# Avoid the autoretract when moving two difference instruments
hardware_api._last_moved_mount = None
await hardware_api.move_rel(types.Mount.LEFT, types.Point(0, 0, -20))
await hardware_api.home([Axis.Z, Axis.C])
assert hardware_api._current_position == {Axis.X: -10,
Expand Down Expand Up @@ -100,7 +102,7 @@ async def test_move(hardware_api):
target_position2 = {Axis.X: 60,
Axis.Y: 40,
Axis.Z: 228,
Axis.A: 10,
Axis.A: 218, # The other instrument is retracted
Axis.B: 19,
Axis.C: 19}
await hardware_api.move_rel(mount2, rel_position)
Expand Down Expand Up @@ -191,3 +193,13 @@ def mock_move(position, speed=None):
assert called_with['X'] == 44
assert called_with['Y'] == 20
assert called_with['Z'] == 30


async def test_other_mount_retracted(hardware_api):
await hardware_api.home()
await hardware_api.move_to(types.Mount.RIGHT, types.Point(0, 0, 0))
assert hardware_api.gantry_position(types.Mount.RIGHT)\
== types.Point(0, 0, 0)
await hardware_api.move_to(types.Mount.LEFT, types.Point(20, 20, 0))
assert hardware_api.gantry_position(types.Mount.RIGHT) \
== types.Point(54, 20, 218)
Loading