Skip to content

Commit

Permalink
refactor(api): refactor aspirate/dispense
Browse files Browse the repository at this point in the history
Closes #2235
  • Loading branch information
sanni-t committed Oct 17, 2018
1 parent 63b2bae commit 6fbac60
Show file tree
Hide file tree
Showing 19 changed files with 347 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import json
from collections import namedtuple
from typing import List
from opentrons import __file__ as root_file


Expand Down Expand Up @@ -114,3 +115,24 @@ def load(pipette_model: str) -> pipette_config:
assert res.model_offset[2] == Z_OFFSET_P50

return res


def piecewise_volume_conversion(
ul: float, sequence: List[List[float]]) -> float:
"""
Takes a volume in microliters and a sequence representing a piecewise
function for the slope and y-intercept of a ul/mm function, where each
sub-list in the sequence contains:
- the max volume for the piece of the function (minimum implied from the
max of the previous item or 0
- the slope of the segment
- the y-intercept of the segment
:return: the ul/mm value for the specified volume
"""
# pick the first item from the seq for which the target is less than
# the bracketing element
i = list(filter(lambda x: ul <= x[0], sequence))[0]
# use that element to calculate the movement distance in mm
return i[1]*ul + i[2]
2 changes: 1 addition & 1 deletion api/src/opentrons/deck_calibration/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from aiohttp import web
from uuid import uuid1
from opentrons.legacy_api.instruments import pipette_config
from opentrons.config import pipette_config
from opentrons import instruments, robot
from opentrons.legacy_api.robot import robot_configs
from opentrons.deck_calibration import jog, position, dots_set, z_pos
Expand Down
203 changes: 182 additions & 21 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import functools
import logging
import enum
from typing import Any, Dict, Union, List, Optional, Tuple
from contextlib import contextmanager
from typing import Any, Dict, Union, List, Tuple
from opentrons import types
from .simulator import Simulator
from opentrons.config import pipette_config
try:
from .controller import Controller
except ModuleNotFoundError:
Expand All @@ -41,17 +43,28 @@ class _Axis(enum.Enum):
Y = enum.auto()
Z = enum.auto()
A = enum.auto()
B = enum.auto()
C = enum.auto()

@classmethod
def by_mount(cls, mount):
def pipette_by_mount(cls, mount):
bm = {types.Mount.LEFT: cls.Z, types.Mount.RIGHT: cls.A}
return bm[mount]

@classmethod
def plunger_by_mount(cls, mount):
pm = {types.Mount.LEFT: cls.B, types.Mount.RIGHT: cls.C}
return pm[mount]


class MustHomeError(RuntimeError):
pass


class PipetteNotAttachedError(KeyError):
pass


_Backend = Union[Controller, Simulator]


Expand Down Expand Up @@ -87,8 +100,12 @@ def __init__(self,
# {'X': 0.0, 'Y': 0.0, 'Z': 0.0, 'A': 0.0, 'B': 0.0, 'C': 0.0}
self._current_position: Dict[str, float] = {}

self._attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
self._attached_instruments: \
Dict[types.Mount, Dict[str, Any]] = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}
self._current_volume: \
Dict[types.Mount, float] = {types.Mount.LEFT: 0,
types.Mount.RIGHT: 0}
self._attached_modules: Dict[str, Any] = {}

@classmethod
Expand All @@ -104,13 +121,14 @@ def build_hardware_controller(
if None is Controller:
raise RuntimeError(
'The hardware controller may only be instantiated on a robot')
return cls(Controller(config, loop),
config=config, loop=loop)
backend = Controller(config, loop)
backend._connect()
return cls(backend, config=config, loop=loop)

@classmethod
def build_hardware_simulator(
cls,
attached_instruments: Dict[types.Mount, Optional[str]] = None,
attached_instruments: Dict[types.Mount, Dict[str, Any]] = None,
attached_modules: List[str] = None,
config: dict = None,
loop: asyncio.AbstractEventLoop = None) -> 'API':
Expand All @@ -120,8 +138,9 @@ def build_hardware_simulator(
Multiple simulating hardware controllers may be active at one time.
"""
if None is attached_instruments:
attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
attached_instruments = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}

if None is attached_modules:
attached_modules = []
return cls(Simulator(attached_instruments,
Expand Down Expand Up @@ -158,11 +177,32 @@ async def identify(self, seconds):
pass

@_log_call
async def cache_instrument_models(self):
async def cache_instruments(self):
"""
- Get the attached instrument on each mount and
- Cache their pipette configs from pipette-config.json
"""
self._log.info("Updating instrument model cache")
for mount in types.Mount:
self._attached_instruments[mount] = \
self._backend.get_attached_instruments(mount)
instrument_model = self._backend.get_attached_instrument(mount)
if instrument_model:
configs = pipette_config.load(instrument_model)
self._attached_instruments[mount].update(configs._asdict())
mod_log.info("Instruments found:{}".format(self._attached_instruments))

@property
def attached_instruments(self):
configs = ['model', 'min_volume', 'max_volume',
'aspirate_flow_rate', 'dispense_flow_rate']
instruments = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}
for mount in types.Mount:
if not self._attached_instruments[mount].get('model'):
continue
for key in configs:
instruments[mount][key] = \
self._attached_instruments[mount][key]
return instruments

@_log_call
async def update_smoothie_firmware(self, firmware_file):
Expand All @@ -181,6 +221,10 @@ async def resume(self):
async def halt(self):
pass

@_log_call
async def reset(self):
pass

# Gantry/frame (i.e. not pipette) action API
@_log_call
async def home(self, *args, **kwargs):
Expand All @@ -192,11 +236,13 @@ async def home_z(self):
pass

@_log_call
async def move_to(
self, mount: types.Mount, abs_position: types.Point):
async def move_to(self, mount: types.Mount, abs_position: types.Point):
"""
Move to absolute position (X, Y, Z)
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = _Axis.pipette_by_mount(mount)
try:
target_position = {_Axis.X.name: abs_position.x,
_Axis.Y.name: abs_position.y,
Expand All @@ -207,9 +253,12 @@ async def move_to(

@_log_call
async def move_rel(self, mount: types.Mount, delta: types.Point):
"""
Move to relative position (dX, dY, dZ)
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = _Axis.pipette_by_mount(mount)
try:
target_position = \
{_Axis.X.name: self._current_position[_Axis.X.name] + delta.x,
Expand All @@ -236,12 +285,120 @@ async def head_speed(self, combined_speed=None,

# Pipette action API
@_log_call
async def aspirate(self, mount, volume=None, rate=None):
pass
async def aspirate(self, mount: types.Mount, volume: float = None,
rate: float = 1.0):
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette
from the current location
Notes
-----
If no volume is passed, `aspirate` will default to max available volume
Parameters
----------
mount : Mount.LEFT or Mount.RIGHT
volume : [float] The number of microliters to aspirate
rate : [float] Set plunger speed for this aspirate, where
speed = rate * aspirate_speed
"""
try:
if volume is None:
checked_vol = self._attached_instruments[mount]['max_volume'] \
- self._current_volume[mount]
mod_log.debug(
"No aspirate volume defined. Aspirating up to pipette "
"max_volume ({}uL)".format
(self._attached_instruments[mount]['max_volume']))
else:
checked_vol = volume
assert self._current_volume[mount] + checked_vol \
<= self._attached_instruments[mount]['max_volume'], \
"Cannot aspirate more than pipette max volume"
if checked_vol == 0:
return
with self._set_temp_pipette_speed(mount, rate):
self._backend.set_active_current(
_Axis.plunger_by_mount(mount),
self._attached_instruments[mount]['plunger_current'])
target_position = {
_Axis.plunger_by_mount(mount).name:
self._plunger_position(
mount,
self._current_volume[mount] + checked_vol,
'aspirate')}
await self._move(target_position)
self._current_volume[mount] += checked_vol
except KeyError:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))

@_log_call
async def dispense(self, mount, volume=None, rate=None):
pass
async def dispense(self, mount: types.Mount, volume: float = None,
rate: float = 1.0):
"""
Dispense a volume of liquid (in microliters/uL) using this pipette
at the current location
Notes
-----
If no volume is passed, `dispense` will default to current volume in
pipette
Parameters
----------
mount : Mount.LEFT or Mount.RIGHT
volume : [float] The number of microliters to dispense
rate : [float] Set plunger speed for this dispense, where
speed = rate * dispense_speed
"""
if volume is None:
checked_vol = self._current_volume[mount]
mod_log.debug("No dispense volume specified. Dispensing all "
"remaining liquid ({}uL) from pipette".format
(checked_vol))
else:
checked_vol = volume
# Ensure we don't dispense more than the current volume
checked_vol = min(self._current_volume[mount], checked_vol)
try:
if checked_vol == 0:
return
with self._set_temp_pipette_speed(mount, rate):
self._backend.set_active_current(
_Axis.plunger_by_mount(mount),
self._attached_instruments[mount]['plunger_current'])
target_position = {
_Axis.plunger_by_mount(mount).name:
self._plunger_position(
mount,
self._current_volume[mount] - checked_vol,
'dispense')}
await self._move(target_position)
self._current_volume[mount] -= checked_vol
except KeyError:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))

def _plunger_position(self, mount: types.Mount, ul: float,
action: str) -> float:
mm = ul / self._ul_per_mm(mount, ul, action)
position = mm + self._attached_instruments[
mount]['plunger_positions']['bottom']
return round(position, 6)

def _ul_per_mm(self, mount: types.Mount, ul: float, action: str) -> float:
sequence = self._attached_instruments[mount]['ul_per_mm'][action]
return pipette_config.piecewise_volume_conversion(ul, sequence)

@contextmanager
def _set_temp_pipette_speed(self, mount, rate):
saved_speed = self._attached_instruments[mount]['dispense_flow_rate']
self._backend.set_pipette_speed(saved_speed * rate)
try:
yield
finally:
self._backend.set_pipette_speed(saved_speed)

@_log_call
async def blow_out(self, mount):
Expand All @@ -267,9 +424,13 @@ async def calibrate_plunger(

@_log_call
async def set_flow_rate(self, mount, aspirate=None, dispense=None):
pass
if aspirate:
self._attached_instruments[mount]['aspirate_flow_rate'] = aspirate
if dispense:
self._attached_instruments[mount]['dispense_flow_rate'] = dispense

@_log_call
# Used by pick_up_tip
async def set_pick_up_current(self, mount, amperes):
pass

Expand Down
13 changes: 11 additions & 2 deletions api/src/opentrons/hardware_control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,18 @@ def move(self, target_position: Dict[str, float], home_flagged_axes=True):
self._smoothie_driver.move(
target_position, home_flagged_axes=home_flagged_axes)

def home(self):
def home(self) -> Dict[str, float]:
return self._smoothie_driver.home()

def get_attached_instruments(self, mount):
def get_attached_instrument(self, mount):
return self._smoothie_driver.read_pipette_model(mount.name.lower())

def set_active_current(self, axis, amp):
self._smoothie_driver.set_active_current({axis.name: amp})

def set_pipette_speed(self, val: float):
self._smoothie_driver.set_speed(val)

def get_attached_modules(self) -> List[Tuple[str, str]]:
return modules.discover()

Expand All @@ -94,3 +100,6 @@ async def update_module(
-> modules.AbstractModule:
return await modules.update_firmware(
module, firmware_file, loop)

def _connect(self):
self._smoothie_driver.connect()
Loading

0 comments on commit 6fbac60

Please sign in to comment.