Skip to content

Commit

Permalink
refactor(api): refactor aspirate/dispense
Browse files Browse the repository at this point in the history
Closes #2235
  • Loading branch information
sanni-t committed Oct 15, 2018
1 parent 63b2bae commit 5ee474f
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import json
from collections import namedtuple
from typing import List
from opentrons import __file__ as root_file


Expand Down Expand Up @@ -114,3 +115,24 @@ def load(pipette_model: str) -> pipette_config:
assert res.model_offset[2] == Z_OFFSET_P50

return res


def piecewise_volume_conversion(
ul: float, sequence: List[List[float]]) -> float:
"""
Takes a volume in microliters and a sequence representing a piecewise
function for the slope and y-intercept of a ul/mm function, where each
sub-list in the sequence contains:
- the max volume for the piece of the function (minimum implied from the
max of the previous item or 0
- the slope of the segment
- the y-intercept of the segment
:return: the ul/mm value for the specified volume
"""
# pick the first item from the seq for which the target is less than
# the bracketing element
i = list(filter(lambda x: ul <= x[0], sequence))[0]
# use that element to calculate the movement distance in mm
return i[1]*ul + i[2]
2 changes: 1 addition & 1 deletion api/src/opentrons/deck_calibration/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from aiohttp import web
from uuid import uuid1
from opentrons.legacy_api.instruments import pipette_config
from opentrons.config import pipette_config
from opentrons import instruments, robot
from opentrons.legacy_api.robot import robot_configs
from opentrons.deck_calibration import jog, position, dots_set, z_pos
Expand Down
141 changes: 122 additions & 19 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
import functools
import logging
import enum
from typing import Any, Dict, Union, List, Optional, Tuple
from typing import Any, Dict, Union, List, Tuple
from opentrons import types
from .simulator import Simulator
from opentrons.config import pipette_config
try:
from .controller import Controller
except ModuleNotFoundError:
Expand All @@ -41,16 +42,25 @@ class _Axis(enum.Enum):
Y = enum.auto()
Z = enum.auto()
A = enum.auto()
B = enum.auto()
C = enum.auto()

@classmethod
def by_mount(cls, mount):
def pipette_by_mount(cls, mount):
bm = {types.Mount.LEFT: cls.Z, types.Mount.RIGHT: cls.A}
return bm[mount]

@classmethod
def plunger_by_mount(cls, mount):
pm = {types.Mount.LEFT: cls.B, types.Mount.RIGHT: cls.C}
return pm[mount]


class MustHomeError(RuntimeError):
pass

class PipetteNotAttachedError(KeyError):
pass

_Backend = Union[Controller, Simulator]

Expand Down Expand Up @@ -87,10 +97,12 @@ def __init__(self,
# {'X': 0.0, 'Y': 0.0, 'Z': 0.0, 'A': 0.0, 'B': 0.0, 'C': 0.0}
self._current_position: Dict[str, float] = {}

self._attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
self._attached_instruments: \
Dict[types.Mount, Dict[str, Any]] = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}
self._attached_modules: Dict[str, Any] = {}


@classmethod
def build_hardware_controller(
cls, config: dict = None,
Expand All @@ -110,7 +122,7 @@ def build_hardware_controller(
@classmethod
def build_hardware_simulator(
cls,
attached_instruments: Dict[types.Mount, Optional[str]] = None,
attached_instruments: Dict[types.Mount, Dict[str, Any]] = None,
attached_modules: List[str] = None,
config: dict = None,
loop: asyncio.AbstractEventLoop = None) -> 'API':
Expand All @@ -120,8 +132,9 @@ def build_hardware_simulator(
Multiple simulating hardware controllers may be active at one time.
"""
if None is attached_instruments:
attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
attached_instruments = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}

if None is attached_modules:
attached_modules = []
return cls(Simulator(attached_instruments,
Expand Down Expand Up @@ -158,11 +171,17 @@ async def identify(self, seconds):
pass

@_log_call
async def cache_instrument_models(self):
async def cache_instruments(self):
"""
- Get the attached instrument on each mount and
- Cache their pipette configs from pipette-config.json
"""
self._log.info("Updating instrument model cache")
for mount in types.Mount:
self._attached_instruments[mount] = \
self._backend.get_attached_instruments(mount)
instrument_model = self._backend.get_attached_instrument(mount)
if instrument_model:
configs = pipette_config.load(instrument_model)
self._attached_instruments[mount].update(configs._asdict())

@_log_call
async def update_smoothie_firmware(self, firmware_file):
Expand All @@ -181,6 +200,10 @@ async def resume(self):
async def halt(self):
pass

@_log_call
async def reset(self):
pass

# Gantry/frame (i.e. not pipette) action API
@_log_call
async def home(self, *args, **kwargs):
Expand All @@ -192,11 +215,13 @@ async def home_z(self):
pass

@_log_call
async def move_to(
self, mount: types.Mount, abs_position: types.Point):
async def move_to(self, mount: types.Mount, abs_position: types.Point):
"""
Move to absolute position (X, Y, Z)
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = _Axis.pipette_by_mount(mount)
try:
target_position = {_Axis.X.name: abs_position.x,
_Axis.Y.name: abs_position.y,
Expand All @@ -207,9 +232,12 @@ async def move_to(

@_log_call
async def move_rel(self, mount: types.Mount, delta: types.Point):
"""
Move to relative position (dX, dY, dZ)
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = _Axis.pipette_by_mount(mount)
try:
target_position = \
{_Axis.X.name: self._current_position[_Axis.X.name] + delta.x,
Expand All @@ -236,12 +264,83 @@ async def head_speed(self, combined_speed=None,

# Pipette action API
@_log_call
async def aspirate(self, mount, volume=None, rate=None):
pass
async def aspirate(self, mount: types.Mount, volume: float,
rate: float = 1.0):
"""
'volume' is the exact amount to be aspirated. Calculations expected to
be done by protocol API
"""
# TODO: is this useful?
assert volume <= self._attached_instruments[mount]['max_volume'], \
"Cannot aspirate more than pipette max volume"
if volume == 0:
return
try:
saved_speed = self._attached_instruments[mount]['aspirate_flow_rate']
speed = saved_speed * rate
self._backend.set_pipette_speed(speed)
self._backend.set_active_current(
_Axis.plunger_by_mount(mount),
self._attached_instruments[mount]['plunger_current'])
target_position = {
_Axis.plunger_by_mount(mount).name:
self._plunger_displacement(mount, volume, 'aspirate')}
await self._move(target_position)
self._backend.set_pipette_speed(saved_speed)
except KeyError:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))

@_log_call
async def dispense(self, mount, volume=None, rate=None):
pass
async def dispense(self, mount: types.Mount, volume: float,
rate: float = 1.0):
"""
'volume' is the exact amount to be aspirated. Calculations expected
to be done by protocol API
"""
if volume == 0:
return
try:
saved_speed = self._attached_instruments[
mount]['dispense_flow_rate']
speed = saved_speed * rate
self._backend.set_pipette_speed(speed)
self._backend.set_active_current(
_Axis.plunger_by_mount(mount),
self._attached_instruments[mount]['plunger_current'])
target_position = {
_Axis.plunger_by_mount(mount).name:
self._plunger_displacement(mount, volume, 'dispense')}
await self._move(target_position)
self._backend.set_pipette_speed(saved_speed)
except KeyError:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))

def _plunger_displacement(self, mount: types.Mount, ul: float,
action: str):
mm = ul / self._ul_per_mm(mount, ul, action)
displacement = mm + self._attached_instruments[
mount]['plunger_positions']['bottom']
return round(displacement, 6)

def _ul_per_mm(self, mount: types.Mount, ul: float, action: str) -> float:
sequence = self._attached_instruments[mount]['ul_per_mm'][action]
return pipette_config.piecewise_volume_conversion(ul, sequence)

@property
def attached_instruments(self):
configs = ['model', 'min_volume', 'max_volume',
'aspirate_flow_rate', 'dispense_flow_rate']
instruments = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}
for mount in types.Mount:
if not self._attached_instruments[mount]['model']:
continue
for key in configs:
instruments[mount][key] = \
self._attached_instruments[mount][key]
return instruments

@_log_call
async def blow_out(self, mount):
Expand All @@ -267,9 +366,13 @@ async def calibrate_plunger(

@_log_call
async def set_flow_rate(self, mount, aspirate=None, dispense=None):
pass
if aspirate:
self._attached_instruments[mount]['aspirate_flow_rate'] = aspirate
if dispense:
self._attached_instruments[mount]['dispense_flow_rate'] = dispense

@_log_call
# Used by pick_up_tip
async def set_pick_up_current(self, mount, amperes):
pass

Expand Down
11 changes: 9 additions & 2 deletions api/src/opentrons/hardware_control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,24 @@ def __init__(self, config, loop):
self._smoothie_driver = driver_3_0.SmoothieDriver_3_0_0(
config=self.config)
self._attached_modules = {}
self._smoothie_driver.connect()

def move(self, target_position: Dict[str, float], home_flagged_axes=True):
self._smoothie_driver.move(
target_position, home_flagged_axes=home_flagged_axes)

def home(self):
def home(self) -> Dict[str, float]:
return self._smoothie_driver.home()

def get_attached_instruments(self, mount):
def get_attached_instrument(self, mount):
return self._smoothie_driver.read_pipette_model(mount.name.lower())

def set_active_current(self, axis, amp):
self._smoothie_driver.set_active_current({axis.name: amp})

def set_pipette_speed(self, val: float):
self._smoothie_driver.set_speed(val)

def get_attached_modules(self) -> List[Tuple[str, str]]:
return modules.discover()

Expand Down
14 changes: 10 additions & 4 deletions api/src/opentrons/hardware_control/simulator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Dict, Optional, List, Tuple
from typing import Dict, Optional, List, Tuple, Any

from opentrons import types
from . import modules
Expand All @@ -11,7 +11,7 @@ class Simulator:
a robot with no smoothie connected.
"""
def __init__(self,
attached_instruments: Dict[types.Mount, Optional[str]],
attached_instruments: Dict[types.Mount, Dict[str, Any]],
attached_modules: List[str],
config, loop) -> None:
self._config = config
Expand All @@ -28,8 +28,14 @@ def home(self):
# driver_3_0-> HOMED_POSITION
return {'X': 418, 'Y': 353, 'Z': 218, 'A': 218, 'B': 19, 'C': 19}

def get_attached_instruments(self, mount):
return self._attached_instruments[mount]
def get_attached_instrument(self, mount):
return self._attached_instruments.get(mount).get('name')

def set_active_current(self, axis, amp):
pass

def set_pipette_speed(self, speed):
pass

def get_attached_modules(self) -> List[Tuple[str, str]]:
return self._attached_modules
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/legacy_api/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from . import robot, instruments as inst, containers as cnt, modules
from .instruments import pipette_config
from opentrons.config import pipette_config

# Ignore the type here because well, this is exactly why this is the legacy_api
robot = robot.Robot() # type: ignore
Expand Down
26 changes: 2 additions & 24 deletions api/src/opentrons/legacy_api/instruments/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import warnings
import logging
import time
from typing import List

from opentrons import commands
from ..containers import unpack_location
from ..containers.placeable import (
Container, Placeable, WellSeries
)
from opentrons.helpers import helpers
from opentrons.trackers import pose_tracker
from opentrons.config import pipette_config

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -1427,7 +1426,7 @@ def _ul_per_mm(self, ul: float, func: str) -> float:
:return: microliters/mm as a float
"""
sequence = self.ul_per_mm[func]
return piecewise_volume_conversion(ul, sequence)
return pipette_config.piecewise_volume_conversion(ul, sequence)

def _volume_percentage(self, volume):
"""Returns the plunger percentage for a given volume.
Expand Down Expand Up @@ -1774,24 +1773,3 @@ def _max_deck_height(self):
@property
def type(self):
return 'single' if self.channels == 1 else 'multi'


def piecewise_volume_conversion(
ul: float, sequence: List[List[float]]) -> float:
"""
Takes a volume in microliters and a sequence representing a piecewise
function for the slope and y-intercept of a ul/mm function, where each
sub-list in the sequence contains:
- the max volume for the piece of the function (minimum implied from the
max of the previous item or 0
- the slope of the segment
- the y-intercept of the segment
:return: the ul/mm value for the specified volume
"""
# pick the first item from the seq for which the target is less than
# the bracketing element
i = list(filter(lambda x: ul <= x[0], sequence))[0]
# use that element to calculate the movement distance in mm
return i[1]*ul + i[2]
Loading

0 comments on commit 5ee474f

Please sign in to comment.