Skip to content

Commit

Permalink
refactor(api): refactor aspirate/dispense
Browse files Browse the repository at this point in the history
Closes #2235
  • Loading branch information
sanni-t committed Oct 15, 2018
1 parent 63b2bae commit 594aa0f
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import json
from collections import namedtuple
from typing import List
from opentrons import __file__ as root_file


Expand Down Expand Up @@ -114,3 +115,24 @@ def load(pipette_model: str) -> pipette_config:
assert res.model_offset[2] == Z_OFFSET_P50

return res


def piecewise_volume_conversion(
ul: float, sequence: List[List[float]]) -> float:
"""
Takes a volume in microliters and a sequence representing a piecewise
function for the slope and y-intercept of a ul/mm function, where each
sub-list in the sequence contains:
- the max volume for the piece of the function (minimum implied from the
max of the previous item or 0
- the slope of the segment
- the y-intercept of the segment
:return: the ul/mm value for the specified volume
"""
# pick the first item from the seq for which the target is less than
# the bracketing element
i = list(filter(lambda x: ul <= x[0], sequence))[0]
# use that element to calculate the movement distance in mm
return i[1]*ul + i[2]
2 changes: 1 addition & 1 deletion api/src/opentrons/deck_calibration/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from aiohttp import web
from uuid import uuid1
from opentrons.legacy_api.instruments import pipette_config
from opentrons.config import pipette_config
from opentrons import instruments, robot
from opentrons.legacy_api.robot import robot_configs
from opentrons.deck_calibration import jog, position, dots_set, z_pos
Expand Down
130 changes: 111 additions & 19 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
import functools
import logging
import enum
from typing import Any, Dict, Union, List, Optional, Tuple
from typing import Any, Dict, Union, List, Tuple
from opentrons import types
from .simulator import Simulator
from opentrons.config import pipette_config
try:
from .controller import Controller
except ModuleNotFoundError:
Expand All @@ -41,12 +42,19 @@ class _Axis(enum.Enum):
Y = enum.auto()
Z = enum.auto()
A = enum.auto()
B = enum.auto()
C = enum.auto()

@classmethod
def by_mount(cls, mount):
def pipette_by_mount(cls, mount):
bm = {types.Mount.LEFT: cls.Z, types.Mount.RIGHT: cls.A}
return bm[mount]

@classmethod
def plunger_by_mount(cls, mount):
pm = {types.Mount.LEFT: cls.B, types.Mount.RIGHT: cls.C}
return pm[mount]


class MustHomeError(RuntimeError):
pass
Expand Down Expand Up @@ -87,8 +95,9 @@ def __init__(self,
# {'X': 0.0, 'Y': 0.0, 'Z': 0.0, 'A': 0.0, 'B': 0.0, 'C': 0.0}
self._current_position: Dict[str, float] = {}

self._attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
self._attached_instruments: \
Dict[types.Mount, Dict[str, Any]] = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}
self._attached_modules: Dict[str, Any] = {}

@classmethod
Expand All @@ -110,7 +119,7 @@ def build_hardware_controller(
@classmethod
def build_hardware_simulator(
cls,
attached_instruments: Dict[types.Mount, Optional[str]] = None,
attached_instruments: Dict[types.Mount, Dict[str, Any]] = None,
attached_modules: List[str] = None,
config: dict = None,
loop: asyncio.AbstractEventLoop = None) -> 'API':
Expand All @@ -120,8 +129,9 @@ def build_hardware_simulator(
Multiple simulating hardware controllers may be active at one time.
"""
if None is attached_instruments:
attached_instruments = {types.Mount.LEFT: None,
types.Mount.RIGHT: None}
attached_instruments = {types.Mount.LEFT: {},
types.Mount.RIGHT: {}}

if None is attached_modules:
attached_modules = []
return cls(Simulator(attached_instruments,
Expand Down Expand Up @@ -158,11 +168,17 @@ async def identify(self, seconds):
pass

@_log_call
async def cache_instrument_models(self):
async def cache_instruments(self):
"""
- Get the attached instrument on each mount and
- Cache their pipette configs from pipette-config.json
"""
self._log.info("Updating instrument model cache")
for mount in types.Mount:
self._attached_instruments[mount] = \
self._backend.get_attached_instruments(mount)
instrument_model = self._backend.get_attached_instrument(mount)
if instrument_model:
configs = pipette_config.load(instrument_model)
self._attached_instruments[mount].update(configs._asdict())

@_log_call
async def update_smoothie_firmware(self, firmware_file):
Expand All @@ -181,6 +197,10 @@ async def resume(self):
async def halt(self):
pass

@_log_call
async def reset(self):
pass

# Gantry/frame (i.e. not pipette) action API
@_log_call
async def home(self, *args, **kwargs):
Expand All @@ -192,11 +212,13 @@ async def home_z(self):
pass

@_log_call
async def move_to(
self, mount: types.Mount, abs_position: types.Point):
async def move_to(self, mount: types.Mount, abs_position: types.Point):
"""
Move to absolute position (X, Y, Z)
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = _Axis.pipette_by_mount(mount)
try:
target_position = {_Axis.X.name: abs_position.x,
_Axis.Y.name: abs_position.y,
Expand All @@ -207,9 +229,12 @@ async def move_to(

@_log_call
async def move_rel(self, mount: types.Mount, delta: types.Point):
"""
Move to relative position (dX, dY, dZ)
"""
if not self._current_position:
raise MustHomeError
z_axis = _Axis.by_mount(mount)
z_axis = _Axis.pipette_by_mount(mount)
try:
target_position = \
{_Axis.X.name: self._current_position[_Axis.X.name] + delta.x,
Expand All @@ -236,12 +261,75 @@ async def head_speed(self, combined_speed=None,

# Pipette action API
@_log_call
async def aspirate(self, mount, volume=None, rate=None):
pass
async def aspirate(self, mount: types.Mount, volume: float,
rate: float = 1.0):
"""
'volume' is the exact amount to be aspirated. Calculations expected to
be done by protocol API
"""
assert volume <= self._attached_instruments[mount]['max_volume'], \
"Cannot aspirate more than pipette max volume"
if volume == 0:
return
saved_speed = self._attached_instruments[mount]['aspirate_flow_rate']
speed = saved_speed * rate
self._backend.set_pipette_speed(speed)
self._backend.set_active_current(
_Axis.plunger_by_mount(mount),
self._attached_instruments[mount]['plunger_current'])
target_position = {
_Axis.plunger_by_mount(mount).name:
self._plunger_displacement(mount, volume, 'aspirate')}
await self._move(target_position)
self._backend.set_pipette_speed(saved_speed)

@_log_call
async def dispense(self, mount, volume=None, rate=None):
pass
async def dispense(self, mount: types.Mount, volume: float,
rate: float = 1.0):
"""
'volume' is the exact amount to be aspirated. Calculations expected
to be done by protocol API
"""
if volume == 0:
return
saved_speed = self._attached_instruments[mount]['dispense_flow_rate']
speed = saved_speed * rate
self._backend.set_pipette_speed(speed)
self._backend.set_active_current(
_Axis.plunger_by_mount(mount),
self._attached_instruments[mount]['plunger_current'])
target_position = {
_Axis.plunger_by_mount(mount).name:
self._plunger_displacement(mount, volume, 'dispense')}
await self._move(target_position)
self._backend.set_pipette_speed(saved_speed)

def _plunger_displacement(self, mount: types.Mount, ul: float,
action: str):
mm = ul / self._ul_per_mm(mount, ul, action)
displacement = mm + self._attached_instruments[
mount]['plunger_positions']['bottom']
return round(displacement, 6)

def _ul_per_mm(self, mount: types.Mount, ul: float, action: str) -> float:
sequence = self._attached_instruments[mount]['ul_per_mm'][action]
return pipette_config.piecewise_volume_conversion(ul, sequence)

@_log_call
def pipette_min_vol(self, mount):
return self._attached_instruments[mount]['min_volume']

@_log_call
def pipette_max_vol(self, mount):
return self._attached_instruments[mount]['max_volume']

@_log_call
def aspirate_flow_rate(self, mount):
return self._attached_instruments[mount]['aspirate_flow_rate']

@_log_call
def dispense_flow_rate(self, mount):
return self._attached_instruments[mount]['dispense_flow_rate']

@_log_call
async def blow_out(self, mount):
Expand All @@ -267,9 +355,13 @@ async def calibrate_plunger(

@_log_call
async def set_flow_rate(self, mount, aspirate=None, dispense=None):
pass
if aspirate:
self._attached_instruments[mount]['aspirate_flow_rate'] = aspirate
if dispense:
self._attached_instruments[mount]['dispense_flow_rate'] = dispense

@_log_call
# Used by pick_up_tip
async def set_pick_up_current(self, mount, amperes):
pass

Expand Down
10 changes: 8 additions & 2 deletions api/src/opentrons/hardware_control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,18 @@ def move(self, target_position: Dict[str, float], home_flagged_axes=True):
self._smoothie_driver.move(
target_position, home_flagged_axes=home_flagged_axes)

def home(self):
def home(self) -> Dict[str, float]:
return self._smoothie_driver.home()

def get_attached_instruments(self, mount):
def get_attached_instrument(self, mount):
return self._smoothie_driver.read_pipette_model(mount.name.lower())

def set_active_current(self, axis, amp):
self._smoothie_driver.set_active_current({axis.name: amp})

def set_pipette_speed(self, val: float):
self._smoothie_driver.set_speed(val)

def get_attached_modules(self) -> List[Tuple[str, str]]:
return modules.discover()

Expand Down
14 changes: 10 additions & 4 deletions api/src/opentrons/hardware_control/simulator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Dict, Optional, List, Tuple
from typing import Dict, Optional, List, Tuple, Any

from opentrons import types
from . import modules
Expand All @@ -11,7 +11,7 @@ class Simulator:
a robot with no smoothie connected.
"""
def __init__(self,
attached_instruments: Dict[types.Mount, Optional[str]],
attached_instruments: Dict[types.Mount, Dict[str, Any]],
attached_modules: List[str],
config, loop) -> None:
self._config = config
Expand All @@ -28,8 +28,14 @@ def home(self):
# driver_3_0-> HOMED_POSITION
return {'X': 418, 'Y': 353, 'Z': 218, 'A': 218, 'B': 19, 'C': 19}

def get_attached_instruments(self, mount):
return self._attached_instruments[mount]
def get_attached_instrument(self, mount):
return self._attached_instruments.get(mount).get('name')

def set_active_current(self, axis, amp):
pass

def set_pipette_speed(self, speed):
pass

def get_attached_modules(self) -> List[Tuple[str, str]]:
return self._attached_modules
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/legacy_api/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from . import robot, instruments as inst, containers as cnt, modules
from .instruments import pipette_config
from opentrons.config import pipette_config

# Ignore the type here because well, this is exactly why this is the legacy_api
robot = robot.Robot() # type: ignore
Expand Down
26 changes: 2 additions & 24 deletions api/src/opentrons/legacy_api/instruments/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import warnings
import logging
import time
from typing import List

from opentrons import commands
from ..containers import unpack_location
from ..containers.placeable import (
Container, Placeable, WellSeries
)
from opentrons.helpers import helpers
from opentrons.trackers import pose_tracker
from opentrons.config import pipette_config

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -1427,7 +1426,7 @@ def _ul_per_mm(self, ul: float, func: str) -> float:
:return: microliters/mm as a float
"""
sequence = self.ul_per_mm[func]
return piecewise_volume_conversion(ul, sequence)
return pipette_config.piecewise_volume_conversion(ul, sequence)

def _volume_percentage(self, volume):
"""Returns the plunger percentage for a given volume.
Expand Down Expand Up @@ -1774,24 +1773,3 @@ def _max_deck_height(self):
@property
def type(self):
return 'single' if self.channels == 1 else 'multi'


def piecewise_volume_conversion(
ul: float, sequence: List[List[float]]) -> float:
"""
Takes a volume in microliters and a sequence representing a piecewise
function for the slope and y-intercept of a ul/mm function, where each
sub-list in the sequence contains:
- the max volume for the piece of the function (minimum implied from the
max of the previous item or 0
- the slope of the segment
- the y-intercept of the segment
:return: the ul/mm value for the specified volume
"""
# pick the first item from the seq for which the target is less than
# the bracketing element
i = list(filter(lambda x: ul <= x[0], sequence))[0]
# use that element to calculate the movement distance in mm
return i[1]*ul + i[2]
2 changes: 1 addition & 1 deletion api/src/opentrons/legacy_api/robot/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from opentrons.legacy_api.robot.mover import Mover
from opentrons.legacy_api.robot.robot_configs import load
from opentrons.legacy_api.containers import Container
from opentrons.legacy_api.instruments import pipette_config
from opentrons.config import pipette_config

log = logging.getLogger(__name__)

Expand Down
1 change: 1 addition & 0 deletions api/src/opentrons/legacy_api/robot/robot_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def _clear_file(filename):
os.remove(filename)


# TODO: move to util (write a default load, save JSON function)
def _load_json(filename) -> dict:
try:
with open(filename, 'r') as file:
Expand Down
Loading

0 comments on commit 594aa0f

Please sign in to comment.