Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(api): pipette critical points #2526

Merged
merged 2 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/src/opentrons/config/pipette_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
'aspirate_flow_rate',
'dispense_flow_rate',
'channels',
'name',
'model_offset',
'plunger_current',
'drop_tip_current',
'min_volume',
'max_volume',
'ul_per_mm',
'quirks',
'tip_length' # TODO (andy): remove from pipette, move to tip-rack
'tip_length', # TODO (andy): remove from pipette, move to tip-rack
'display_name'
]
)

Expand Down Expand Up @@ -86,15 +86,15 @@ def load(pipette_model: str) -> pipette_config:
aspirate_flow_rate=cfg.get('aspirateFlowRate'),
dispense_flow_rate=cfg.get('dispenseFlowRate'),
channels=cfg.get('channels'),
name=pipette_model,
model_offset=cfg.get('modelOffset'),
plunger_current=cfg.get('plungerCurrent'),
drop_tip_current=cfg.get('dropTipCurrent'),
min_volume=cfg.get('minVolume'),
max_volume=cfg.get('maxVolume'),
ul_per_mm=cfg.get('ulPerMm'),
quirks=cfg.get('quirks'),
tip_length=cfg.get('tipLength')
tip_length=cfg.get('tipLength'),
display_name=cfg.get('displayName')
)

# Verify that stored values agree with calculations
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/deck_calibration/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ def set_current_mount(attached_pipettes):
if left['model'] in pipette_config.configs:
pip_config = pipette_config.load(left['model'])
left_pipette = instruments._create_pipette_from_config(
mount='left', config=pip_config)
mount='left', config=pip_config, name=left['model'])

if right['model'] in pipette_config.configs:
pip_config = pipette_config.load(right['model'])
right_pipette = instruments._create_pipette_from_config(
mount='right', config=pip_config)
mount='right', config=pip_config, name=right['model'])

if right_pipette and right_pipette.channels == 1:
session.current_mount = 'A'
Expand Down
151 changes: 88 additions & 63 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
from collections import OrderedDict
import functools
import logging
from typing import Any, Dict, Union, List, Tuple
from typing import Any, Dict, Union, List, Optional, Tuple
from opentrons import types as top_types
from opentrons.util import linal
from .simulator import Simulator
from opentrons.config import robot_configs
from contextlib import contextmanager
from opentrons.config import pipette_config
from .pipette import Pipette
try:
from .controller import Controller
except ModuleNotFoundError:
Expand Down Expand Up @@ -50,6 +50,7 @@ class PipetteNotAttachedError(KeyError):


_Backend = Union[Controller, Simulator]
Instruments = Dict[top_types.Mount, Optional[Pipette]]


class API:
Expand Down Expand Up @@ -85,12 +86,10 @@ def __init__(self,
# {'X': 0.0, 'Y': 0.0, 'Z': 0.0, 'A': 0.0, 'B': 0.0, 'C': 0.0}
self._current_position: Dict[Axis, float] = {}

self._attached_instruments: \
Dict[top_types.Mount, Dict[str, Any]] = {top_types.Mount.LEFT: {},
top_types.Mount.RIGHT: {}}
self._current_volume: \
Dict[top_types.Mount, float] = {top_types.Mount.LEFT: 0,
top_types.Mount.RIGHT: 0}
self._attached_instruments: Instruments = {
top_types.Mount.LEFT: None,
top_types.Mount.RIGHT: None
}
self._attached_modules: Dict[str, Any] = {}

@classmethod
Expand All @@ -113,7 +112,7 @@ def build_hardware_controller(
@classmethod
def build_hardware_simulator(
cls,
attached_instruments: Dict[top_types.Mount, Dict[str, Any]] = None,
attached_instruments: Dict[top_types.Mount, str] = None,
attached_modules: List[str] = None,
config: robot_configs.robot_config = None,
loop: asyncio.AbstractEventLoop = None) -> 'API':
Expand All @@ -123,8 +122,7 @@ def build_hardware_simulator(
Multiple simulating hardware controllers may be active at one time.
"""
if None is attached_instruments:
attached_instruments = {top_types.Mount.LEFT: {},
top_types.Mount.RIGHT: {}}
attached_instruments = {}

if None is attached_modules:
attached_modules = []
Expand Down Expand Up @@ -171,8 +169,7 @@ async def cache_instruments(self):
for mount in top_types.Mount:
instrument_model = self._backend.get_attached_instrument(mount)
if instrument_model:
configs = pipette_config.load(instrument_model)
self._attached_instruments[mount].update(configs._asdict())
self._attached_instruments[mount] = Pipette(instrument_model)
mod_log.info("Instruments found:{}".format(self._attached_instruments))

@property
Expand All @@ -182,11 +179,12 @@ def attached_instruments(self):
instruments = {top_types.Mount.LEFT: {},
top_types.Mount.RIGHT: {}}
for mount in top_types.Mount:
if not self._attached_instruments[mount].get('name'):
instr = self._attached_instruments[mount]
if not instr:
continue
instr_dict = instr.as_dict()
for key in configs:
instruments[mount][key] = \
self._attached_instruments[mount][key]
instruments[mount][key] = instr_dict[key]
return instruments

@_log_call
Expand Down Expand Up @@ -279,10 +277,11 @@ def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]:
offset = top_types.Point(*self.config.mount_offset)
z_ax = Axis.by_mount(mount)
plunger_ax = Axis.of_plunger(mount)
cp = self._critical_point_for(mount)
return {
Axis.X: self._current_position[Axis.X] + offset[0],
Axis.Y: self._current_position[Axis.Y] + offset[1],
z_ax: self._current_position[z_ax] + offset[2],
Axis.X: self._current_position[Axis.X] + offset[0] + cp.x,
Axis.Y: self._current_position[Axis.Y] + offset[1] + cp.y,
z_ax: self._current_position[z_ax] + offset[2] + cp.z,
plunger_ax: self._current_position[plunger_ax]
}

Expand Down Expand Up @@ -311,10 +310,11 @@ async def move_to(
offset = top_types.Point(*self.config.mount_offset)
else:
offset = top_types.Point(0, 0, 0)
cp = self._critical_point_for(mount)
target_position = OrderedDict(
((Axis.X, abs_position.x - offset.x),
(Axis.Y, abs_position.y - offset.y),
(z_axis, abs_position.z - offset.z))
((Axis.X, abs_position.x - offset.x - cp.x),
(Axis.Y, abs_position.y - offset.y - cp.y),
(z_axis, abs_position.z - offset.z - cp.z))
)
await self._move(target_position)

Expand Down Expand Up @@ -394,6 +394,24 @@ async def _move(self, target_position: 'OrderedDict[Axis, float]'):
else:
self._current_position.update(target_position)

def _critical_point_for(self, mount: top_types.Mount) -> top_types.Point:
""" Return the current critical point of the specified mount.

The mount's critical point is the position of the mount itself, if no
pipette is attached, or the pipette's critical point (which depends on
tip status).
"""
pip = self._attached_instruments[mount]
if pip is not None:
return pip.critical_point
else:
# TODO: The smoothie’s z/a home position is calculated to provide
# the offset for a P300 single. Here we should decide whether we
# implicitly accept this as correct (by returning a null offset)
# or not (by returning an offset calculated to move back up the
# length of the P300 single).
return top_types.Point(0, 0, 0)

# Gantry/frame (i.e. not pipette) config API
@property
def config(self) -> robot_configs.robot_config:
Expand Down Expand Up @@ -423,42 +441,41 @@ async def aspirate(self, mount: top_types.Mount, volume: float = None,
speed = rate * aspirate_speed
"""
this_pipette = self._attached_instruments[mount]
if 'name' not in this_pipette.keys():
if not this_pipette:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))

if volume is None:
asp_vol = this_pipette['max_volume'] - self._current_volume[mount]
asp_vol = this_pipette.available_volume
mod_log.debug(
"No aspirate volume defined. Aspirating up to pipette "
"max_volume ({}uL)".format(this_pipette['max_volume']))
"max_volume ({}uL)".format(this_pipette.config.max_volume))
else:
asp_vol = volume

assert self._current_volume[mount] + asp_vol \
<= this_pipette['max_volume'], \
assert this_pipette.ok_to_add_volume(asp_vol), \
"Cannot aspirate more than pipette max volume"
if asp_vol == 0:
return
# using a context generator to temporarily change pipette speed to a
# user specified rate, then switch back to default
with self._set_temp_pipette_speed(mount, 'aspirate', rate):
with self._set_temp_pipette_speed(this_pipette, 'aspirate', rate):
self._backend.set_active_current(
Axis.of_plunger(mount), this_pipette['plunger_current'])
target_position = {Axis.of_plunger(mount): self._plunger_position(
mount,
self._current_volume[mount] + asp_vol,
'aspirate')}
Axis.of_plunger(mount), this_pipette.config.plunger_current)
target_position = {
Axis.of_plunger(mount): self._plunger_position(
this_pipette,
this_pipette.current_volume + asp_vol,
'aspirate')}
try:
self._backend.move({ax.name: pos
for ax, pos in target_position.items()})
except Exception:
self._log.exception('Aspirate failed')
self._current_volume.clear()
this_pipette.set_current_volume(0)
raise
else:
self._current_position.update(target_position)
self._current_volume[mount] += asp_vol
this_pipette.add_current_volume(asp_vol)

@_log_call
async def dispense(self, mount: top_types.Mount, volume: float = None,
Expand All @@ -474,57 +491,55 @@ async def dispense(self, mount: top_types.Mount, volume: float = None,
speed = rate * dispense_speed
"""
this_pipette = self._attached_instruments[mount]
if 'name' not in this_pipette.keys():
if not this_pipette:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))
if volume is None:
disp_vol = self._current_volume[mount]
disp_vol = this_pipette.current_volume
mod_log.debug("No dispense volume specified. Dispensing all "
"remaining liquid ({}uL) from pipette".format
(disp_vol))
else:
disp_vol = volume
# Ensure we don't dispense more than the current volume
disp_vol = min(self._current_volume[mount], disp_vol)
disp_vol = min(this_pipette.current_volume, disp_vol)

if disp_vol == 0:
return
# using a context generator to temporarily change pipette speed to a
# user specified rate, then switch back to default
with self._set_temp_pipette_speed(mount, 'dispense', rate):
with self._set_temp_pipette_speed(this_pipette, 'dispense', rate):
self._backend.set_active_current(
Axis.of_plunger(mount), this_pipette['plunger_current'])
target_position = {Axis.of_plunger(mount): self._plunger_position(
mount,
self._current_volume[mount] - disp_vol,
'dispense')}
Axis.of_plunger(mount), this_pipette.config.plunger_current)
target_position = {
Axis.of_plunger(mount): self._plunger_position(
this_pipette,
this_pipette.current_volume - disp_vol,
'dispense')}
try:
self._backend.move({ax.name: pos
for ax, pos in target_position.items()})
except Exception:
self._log.exception('Dispense failed')
self._current_volume.clear()
this_pipette.set_current_volume(0)
raise
else:
self._current_position.update(target_position)
self._current_volume[mount] -= disp_vol
this_pipette.remove_current_volume(disp_vol)

def _plunger_position(self, mount: top_types.Mount, ul: float,
def _plunger_position(self, instr: Pipette, ul: float,
action: str) -> float:
mm = ul / self._ul_per_mm(mount, ul, action)
position = mm + self._attached_instruments[
mount]['plunger_positions']['bottom']
mm = ul / instr.ul_per_mm(ul, action)
position = mm + instr.config.plunger_positions['bottom']
return round(position, 6)

def _ul_per_mm(self, mount: top_types.Mount,
ul: float, action: str) -> float:
sequence = self._attached_instruments[mount]['ul_per_mm'][action]
return pipette_config.piecewise_volume_conversion(ul, sequence)

@contextmanager
def _set_temp_pipette_speed(self, mount, action, rate):
def _set_temp_pipette_speed(self,
instr: Pipette,
action: str,
rate: float):
action_str = '{}_flow_rate'.format(action)
saved_speed = self._attached_instruments[mount][action_str]
saved_speed = getattr(instr.config, action_str)
self._backend.set_pipette_speed(saved_speed * rate)
try:
yield
Expand All @@ -540,12 +555,18 @@ async def air_gap(self, mount, volume=None):
pass

@_log_call
async def pick_up_tip(self, mount, tip_length):
pass
async def pick_up_tip(self, mount):
instr = self._attached_instruments[mount]
assert instr
# TODO: Move commands to pick up tip(s)
instr.add_tip()

@_log_call
async def drop_tip(self, mount):
pass
instr = self._attached_instruments[mount]
assert instr
# TODO: Move commands to drop tip(s)
instr.remove_tip()

# Pipette config api
@_log_call
Expand All @@ -555,10 +576,14 @@ async def calibrate_plunger(

@_log_call
async def set_flow_rate(self, mount, aspirate=None, dispense=None):
this_pipette = self._attached_instruments[mount]
if not this_pipette:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount))
if aspirate:
self._attached_instruments[mount]['aspirate_flow_rate'] = aspirate
this_pipette.update_config_item('aspirate_flow_rate', aspirate)
if dispense:
self._attached_instruments[mount]['dispense_flow_rate'] = dispense
this_pipette.update_config_item('dispense_float_rate', dispense)

@_log_call
# Used by pick_up_tip
Expand Down
Loading