Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Use deck-absolute coords in hardware_control #2502

Merged
merged 3 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ def _get_data_files(self):
to_include = get_shared_data_files()
destination = os.path.join(build_base, DEST_BASE_PATH)
# And finally, tell the system about our files
print("FILES BEFORE {}".format(files))
files.append(('opentrons', SHARED_DATA_PATH,
destination, to_include))
print("FILES AFTER {}".format(files))
return files


Expand Down
6 changes: 3 additions & 3 deletions api/src/opentrons/deck_calibration/dc_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from typing import Tuple
from numpy.linalg import inv
from numpy import dot, array
from opentrons.legacy_api.robot import robot_configs
from opentrons import robot, instruments
from opentrons.config import robot_configs
from opentrons.util.calibration_functions import probe_instrument
from opentrons.deck_calibration.linal import solve, add_z, apply_transform
from opentrons.util.linal import solve, add_z, apply_transform
from opentrons.deck_calibration import *

# TODO: add tests for methods, split out current point behavior per comment
Expand Down Expand Up @@ -329,7 +329,7 @@ def _update_text_box(self, msg):
points,
# 'Smoothie: {}'.format(self.current_position),
'World: {}'.format(apply_transform(
self._calibration_matrix, self.current_position)),
inv(self._calibration_matrix), self.current_position)),
'Step: {}'.format(self.current_step()),
'Message: {}'.format(msg)
])
Expand Down
6 changes: 3 additions & 3 deletions api/src/opentrons/deck_calibration/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from uuid import uuid1
from opentrons.legacy_api.instruments import pipette_config
from opentrons import instruments, robot
from opentrons.legacy_api.robot import robot_configs
from opentrons.deck_calibration import jog, position, dots_set, z_pos
from opentrons.deck_calibration.linal import add_z, solve
from opentrons.config import robot_configs
from . import jog, position, dots_set, z_pos
from opentrons.util.linal import add_z, solve
from typing import Dict, Tuple

import logging
Expand Down
228 changes: 182 additions & 46 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@
"""

import asyncio
from collections import OrderedDict
import functools
import logging
import enum
from typing import Any, Dict, Union, List, Optional, Tuple
from opentrons import types

from opentrons import types as top_types
from opentrons.util import linal
from .simulator import Simulator
from opentrons.config import robot_configs

try:
from .controller import Controller
except ModuleNotFoundError:
# implies windows
Controller = None # type: ignore
from . import modules
from .types import Axis


mod_log = logging.getLogger(__name__)
Expand All @@ -36,18 +41,6 @@ def _log_call_inner(*args, **kwargs):
return _log_call_inner


class _Axis(enum.Enum):
X = enum.auto()
Y = enum.auto()
Z = enum.auto()
A = enum.auto()

@classmethod
def by_mount(cls, mount):
bm = {types.Mount.LEFT: cls.Z, types.Mount.RIGHT: cls.A}
return bm[mount]


class MustHomeError(RuntimeError):
pass

Expand All @@ -70,7 +63,7 @@ class API:

def __init__(self,
backend: _Backend,
config: dict = None,
config: robot_configs.robot_config = None,
loop: asyncio.AbstractEventLoop = None) -> None:
""" Initialize an API instance.

Expand All @@ -79,21 +72,22 @@ def __init__(self,
build_hardware_simulator should be used.
"""
self._log = self.CLS_LOG.getChild(str(id(self)))
self._config = config or robot_configs.load()
self._backend = backend
if None is loop:
self._loop = asyncio.get_event_loop()
else:
self._loop = loop
# {'X': 0.0, 'Y': 0.0, 'Z': 0.0, 'A': 0.0, 'B': 0.0, 'C': 0.0}
self._current_position: Dict[str, float] = {}
self._current_position: Dict[Axis, float] = {}

self._attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
self._attached_instruments = {top_types.Mount.LEFT: None,
top_types.Mount.RIGHT: None}
self._attached_modules: Dict[str, Any] = {}

@classmethod
def build_hardware_controller(
cls, config: dict = None,
cls, config: robot_configs.robot_config = None,
loop: asyncio.AbstractEventLoop = None) -> 'API':
""" Build a hardware controller that will actually talk to hardware.

Expand All @@ -110,18 +104,18 @@ def build_hardware_controller(
@classmethod
def build_hardware_simulator(
cls,
attached_instruments: Dict[types.Mount, Optional[str]] = None,
attached_instruments: Dict[top_types.Mount, Optional[str]] = None,
attached_modules: List[str] = None,
config: dict = None,
config: robot_configs.robot_config = None,
loop: asyncio.AbstractEventLoop = None) -> 'API':
""" Build a simulating hardware controller.

This method may be used both on a real robot and on dev machines.
Multiple simulating hardware controllers may be active at one time.
"""
if None is attached_instruments:
attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
attached_instruments = {top_types.Mount.LEFT: None,
top_types.Mount.RIGHT: None}
if None is attached_modules:
attached_modules = []
return cls(Simulator(attached_instruments,
Expand Down Expand Up @@ -160,7 +154,7 @@ async def identify(self, seconds):
@_log_call
async def cache_instrument_models(self):
self._log.info("Updating instrument model cache")
for mount in types.Mount:
for mount in top_types.Mount:
self._attached_instruments[mount] = \
self._backend.get_attached_instruments(mount)

Expand All @@ -183,52 +177,194 @@ async def halt(self):

# Gantry/frame (i.e. not pipette) action API
@_log_call
async def home(self, *args, **kwargs):
# Initialize/update current_position
self._current_position = self._backend.home()
async def home_z(self, mount: top_types.Mount):
""" Home one mount's Z-axis """
backend_pos = self._backend.home(Axis.by_mount(mount))
self._current_position = self._deck_from_smoothie(backend_pos)

@_log_call
async def home_z(self):
pass
async def home(self):
""" Home the entire robot and initialize current position.
"""
# Initialize/update current_position
smoothie_pos = self._backend.home()
self._current_position = self._deck_from_smoothie(smoothie_pos)

def _deck_from_smoothie(
self, smoothie_pos: Dict[str, float]) -> Dict[Axis, float]:
""" Build a deck-abs position store from the smoothie's position

This should take the smoothie style position {'X': float, etc}
and turn it into the position dict used here {Axis.X: float} in
deck-absolute coordinates. It runs the reverse deck transformation
for the axes that require it.

One piece of complexity is that if the gantry transformation includes
a transition between non parallel planes, the z position of the left
mount would depend on its actual position in deck frame, so we have
to apply the mount offset.

TODO: Figure out which frame the mount offset is measured in, because
if it's measured in the deck frame (e.g. by touching off points
on the deck) it has to go through the reverse transform to be
added to the smoothie coordinates here.
"""
with_enum = {Axis[k]: v for k, v in smoothie_pos.items()}
plunger_axes = {k: v for k, v in with_enum.items()
if k not in Axis.gantry_axes()}
right = (with_enum[Axis.X], with_enum[Axis.Y],
with_enum[Axis.by_mount(top_types.Mount.RIGHT)])
# Tell apply_transform to just do the change of base part of the
# transform rather than the full affine transform, because this is
# an offset
left = (with_enum[Axis.X],
with_enum[Axis.Y],
with_enum[Axis.by_mount(top_types.Mount.LEFT)])
right_deck = linal.apply_reverse(self.config.gantry_calibration,
right)
left_deck = linal.apply_reverse(self.config.gantry_calibration,
left)
deck_pos = {Axis.X: right_deck[0],
Axis.Y: right_deck[1],
Axis.by_mount(top_types.Mount.RIGHT): right_deck[2],
Axis.by_mount(top_types.Mount.LEFT): left_deck[2]}
deck_pos.update(plunger_axes)
return deck_pos

def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]:
""" Return the postion (in deck coords) of the critical point of the
specified mount.

This returns cached position to avoid hitting the smoothie driver
unless ``refresh`` is ``True``.
"""
if mount == mount.RIGHT:
offset = top_types.Point(0, 0, 0)
else:
offset = top_types.Point(*self.config.mount_offset)
z_ax = Axis.by_mount(mount)
return {
Axis.X: self._current_position[Axis.X] + offset[0],
Axis.Y: self._current_position[Axis.Y] + offset[1],
z_ax: self._current_position[z_ax] + offset[2]
}

@_log_call
async def move_to(
self, mount: types.Mount, abs_position: types.Point):
self, mount: top_types.Mount, abs_position: top_types.Point):
""" Move the critical point of the specified mount to a location
relative to the deck.

The critical point of the mount depends on the current status of
the mount:
- If the mount does not have anything attached, its critical point is
the bottom of the mount attach bracket.
- If the mount has a pipette attached and it is not known to have a
pipette tip, the critical point is the end of the nozzle of a single
pipette or the end of the backmost nozzle of a multipipette
- If the mount has a pipette attached and it is known to have a
pipette tip, the critical point is the end of the pipette tip for
a single pipette or the end of the tip of the backmost nozzle of a
multipipette
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
try:
target_position = {_Axis.X.name: abs_position.x,
_Axis.Y.name: abs_position.y,
z_axis.name: abs_position.z}
except KeyError:
raise MustHomeError
z_axis = Axis.by_mount(mount)
if mount == top_types.Mount.LEFT:
offset = top_types.Point(*self.config.mount_offset)
else:
offset = top_types.Point(0, 0, 0)
target_position = OrderedDict(
((Axis.X, abs_position.x - offset.x),
(Axis.Y, abs_position.y - offset.y),
(z_axis, abs_position.z - offset.z))
)
await self._move(target_position)

@_log_call
async def move_rel(self, mount: types.Mount, delta: types.Point):
async def move_rel(self, mount: top_types.Mount, delta: top_types.Point):
""" Move the critical point of the specified mount by a specified
displacement in a specified direction.
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = Axis.by_mount(mount)
try:
target_position = \
{_Axis.X.name: self._current_position[_Axis.X.name] + delta.x,
_Axis.Y.name: self._current_position[_Axis.Y.name] + delta.y,
z_axis.name: self._current_position[z_axis.name] + delta.z}
target_position = OrderedDict(
((Axis.X,
self._current_position[Axis.X] + delta.x),
(Axis.Y,
self._current_position[Axis.Y] + delta.y),
(z_axis,
self._current_position[z_axis] + delta.z))
)
except KeyError:
raise MustHomeError
await self._move(target_position)

async def _move(self, target_position: Dict[str, float]):
self._current_position.update(target_position)
async def _move(self, target_position: 'OrderedDict[Axis, float]'):
""" Worker function to apply robot motion.

Robot motion means the kind of motions that are relevant to the robot,
i.e. only one pipette plunger and mount move at the same time, and an
XYZ move in the coordinate frame of one of the pipettes.

``target_position`` should be an ordered dict (ordered by XYZABC)
containing any specified XY motion and at most one of a ZA or BC
components. The frame in which to move is identified by the presence of
(ZA) or (BC).
"""
# Transform only the x, y, and (z or a) axes specified since this could
# get the b or c axes as well
to_transform = tuple((tp
for ax, tp in target_position.items()
if ax in Axis.gantry_axes()))

# Pre-fill the dict we’ll send to the backend with the axes we don’t
# need to transform
smoothie_pos = {ax.name: pos for ax, pos in target_position.items()
if ax not in Axis.gantry_axes()}

# We’d better have all of (x, y, (z or a)) or none of them since the
# gantry transform requires them all
if len(to_transform) != 3:
self._log.error("Move derived {} axes to transform from {}"
.format(len(to_transform), target_position))
raise ValueError("Moves must specify either exactly an x, y, and "
"(z or a) or none of them")

# Type ignored below because linal.apply_transform (rightly) specifies
# Tuple[float, float, float] and the implied type from
# target_position.items() is (rightly) Tuple[float, ...] with unbounded
# size; unfortunately, mypy can’t quite figure out the length check
# above that makes this OK
transformed = linal.apply_transform( # type: ignore
self.config.gantry_calibration, to_transform)

# Since target_position is an OrderedDict with the axes ordered by
# (x, y, z, a, b, c), and we’ll only have one of a or z (as checked
# by the len(to_transform) check above) we can use an enumerate to
# fuse the specified axes and the transformed values back together
for idx, ax in enumerate(target_position.keys()):
if ax in Axis.gantry_axes():
smoothie_pos[ax.name] = transformed[idx]
try:
self._backend.move(target_position)
self._backend.move(smoothie_pos)
except Exception:
self._log.exception('Move failed')
self._current_position.clear()
raise
else:
self._current_position.update(target_position)

# Gantry/frame (i.e. not pipette) config API
@property
def config(self) -> robot_configs.robot_config:
return self._config

async def update_deck_calibration(self, new_transform):
pass

@_log_call
async def head_speed(self, combined_speed=None,
x=None, y=None, z=None, a=None, b=None, c=None):
Expand Down
Loading