Skip to content

Commit

Permalink
refactor(api): add blow_out, pick_up_tip, drop_tip
Browse files Browse the repository at this point in the history
Closes #2483
  • Loading branch information
sanni-t committed Oct 29, 2018
1 parent a1cceee commit 52856db
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 84 deletions.
243 changes: 170 additions & 73 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from opentrons.util import linal
from .simulator import Simulator
from opentrons.config import robot_configs
from contextlib import contextmanager
from .pipette import Pipette
try:
from .controller import Controller
Expand All @@ -31,6 +30,7 @@


mod_log = logging.getLogger(__name__)
PICK_UP_SPEED = 30


def _log_call(func):
Expand All @@ -51,6 +51,9 @@ class PipetteNotAttachedError(KeyError):

_Backend = Union[Controller, Simulator]
Instruments = Dict[top_types.Mount, Optional[Pipette]]
SHAKE_OFF_TIPS_SPEED = 50
SHAKE_OFF_TIPS_DISTANCE = 2.25
DROP_TIP_RELEASE_DISTANCE = 20


class API:
Expand Down Expand Up @@ -179,8 +182,8 @@ async def cache_instruments(self,
if instrument_model else None
for mount, instrument_model
in found.items()}
mod_log.info("Instruments found: {}"
.format(self._attached_instruments))
self._log.info("Instruments found: {}"
.format(self._attached_instruments))

@property
def attached_instruments(self):
Expand Down Expand Up @@ -300,7 +303,8 @@ def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]:

@_log_call
async def move_to(
self, mount: top_types.Mount, abs_position: top_types.Point):
self, mount: top_types.Mount, abs_position: top_types.Point,
speed: float = None):
""" Move the critical point of the specified mount to a location
relative to the deck.
Expand Down Expand Up @@ -329,10 +333,11 @@ async def move_to(
(Axis.Y, abs_position.y - offset.y - cp.y),
(z_axis, abs_position.z - offset.z - cp.z))
)
await self._move(target_position)
await self._move(target_position, speed=speed)

@_log_call
async def move_rel(self, mount: top_types.Mount, delta: top_types.Point):
async def move_rel(self, mount: top_types.Mount, delta: top_types.Point,
speed: float = None):
""" Move the critical point of the specified mount by a specified
displacement in a specified direction.
"""
Expand All @@ -350,19 +355,38 @@ async def move_rel(self, mount: top_types.Mount, delta: top_types.Point):
)
except KeyError:
raise MustHomeError
await self._move(target_position)
await self._move(target_position, speed=speed)

async def _move(self, target_position: 'OrderedDict[Axis, float]'):
async def _move_plunger(self, mount: top_types.Mount, dist: float,
speed: float = None):
z_axis = Axis.by_mount(mount)
pl_axis = Axis.of_plunger(mount)
all_axes_pos = OrderedDict(
((Axis.X,
self._current_position[Axis.X]),
(Axis.Y,
self._current_position[Axis.Y]),
(z_axis,
self._current_position[z_axis]),
(pl_axis, dist))
)
try:
await self._move(all_axes_pos, speed)
except KeyError:
raise MustHomeError

async def _move(self, target_position: 'OrderedDict[Axis, float]',
speed: float = None):
""" Worker function to apply robot motion.
Robot motion means the kind of motions that are relevant to the robot,
i.e. only one pipette plunger and mount move at the same time, and an
XYZ move in the coordinate frame of one of the pipettes.
``target_position`` should be an ordered dict (ordered by XYZABC)
containing any specified XY motion and at most one of a ZA or BC
components. The frame in which to move is identified by the presence of
(ZA) or (BC).
of deck calibrated values, containing any specified XY motion and
at most one of a ZA or BC components. The frame in which to move
is identified by the presence of (ZA) or (BC).
"""
# Transform only the x, y, and (z or a) axes specified since this could
# get the b or c axes as well
Expand Down Expand Up @@ -399,7 +423,7 @@ async def _move(self, target_position: 'OrderedDict[Axis, float]'):
if ax in Axis.gantry_axes():
smoothie_pos[ax.name] = transformed[idx]
try:
self._backend.move(smoothie_pos)
self._backend.move(smoothie_pos, speed=speed)
except Exception:
self._log.exception('Move failed')
self._current_position.clear()
Expand Down Expand Up @@ -479,32 +503,28 @@ async def aspirate(self, mount: top_types.Mount, volume: float = None,
"Cannot aspirate more than pipette max volume"
if asp_vol == 0:
return
# using a context generator to temporarily change pipette speed to a
# user specified rate, then switch back to default
with self._set_temp_pipette_speed(this_pipette, 'aspirate', rate):
self._backend.set_active_current(
Axis.of_plunger(mount), this_pipette.config.plunger_current)
target_position = {
Axis.of_plunger(mount): self._plunger_position(
this_pipette,
this_pipette.current_volume + asp_vol,
'aspirate')}
try:
self._backend.move({ax.name: pos
for ax, pos in target_position.items()})
except Exception:
self._log.exception('Aspirate failed')
this_pipette.set_current_volume(0)
raise
else:
self._current_position.update(target_position)
this_pipette.add_current_volume(asp_vol)

self._backend.set_active_current(
Axis.of_plunger(mount), this_pipette.config.plunger_current)
dist = self._plunger_position(
this_pipette,
this_pipette.current_volume + asp_vol,
'aspirate')
speed = this_pipette.config.aspirate_flow_rate * rate
try:
await self._move_plunger(mount, dist, speed=speed)
except Exception:
self._log.exception('Aspirate failed')
this_pipette.set_current_volume(0)
raise
else:
this_pipette.add_current_volume(asp_vol)

@_log_call
async def dispense(self, mount: top_types.Mount, volume: float = None,
rate: float = 1.0):
"""
Dispense a volume of liquid (in microliters/uL) using this pipette
Dispense a volume of liquid in microliters(uL) using this pipette
at the current location. If no volume is specified, `dispense` will
dispense all volume currently present in pipette
Expand All @@ -529,68 +549,145 @@ async def dispense(self, mount: top_types.Mount, volume: float = None,

if disp_vol == 0:
return
# using a context generator to temporarily change pipette speed to a
# user specified rate, then switch back to default
with self._set_temp_pipette_speed(this_pipette, 'dispense', rate):
self._backend.set_active_current(
Axis.of_plunger(mount), this_pipette.config.plunger_current)
target_position = {
Axis.of_plunger(mount): self._plunger_position(
this_pipette,
this_pipette.current_volume - disp_vol,
'dispense')}
try:
self._backend.move({ax.name: pos
for ax, pos in target_position.items()})
except Exception:
self._log.exception('Dispense failed')
this_pipette.set_current_volume(0)
raise
else:
self._current_position.update(target_position)
this_pipette.remove_current_volume(disp_vol)

self._backend.set_active_current(
Axis.of_plunger(mount), this_pipette.config.plunger_current)
dist = self._plunger_position(
this_pipette,
this_pipette.current_volume - disp_vol,
'dispense')
speed = this_pipette.config.dispense_flow_rate * rate
try:
await self._move_plunger(mount, dist, speed)
except Exception:
self._log.exception('Dispense failed')
this_pipette.set_current_volume(0)
raise
else:
this_pipette.remove_current_volume(disp_vol)

def _plunger_position(self, instr: Pipette, ul: float,
action: str) -> float:
mm = ul / instr.ul_per_mm(ul, action)
position = mm + instr.config.plunger_positions['bottom']
return round(position, 6)

@contextmanager
def _set_temp_pipette_speed(self,
instr: Pipette,
action: str,
rate: float):
action_str = '{}_flow_rate'.format(action)
saved_speed = getattr(instr.config, action_str)
self._backend.set_pipette_speed(saved_speed * rate)
try:
yield
finally:
self._backend.set_pipette_speed(saved_speed)

@_log_call
async def blow_out(self, mount):
pass
"""
Force any remaining liquid to dispense. The liquid will be dispensed at
the current location of pipette
"""
this_pipette = self._attached_instruments[mount]
if not this_pipette:
raise PipetteNotAttachedError("No pipette attached to {} mount"
.format(mount.name))

@_log_call
async def air_gap(self, mount, volume=None):
pass
self._backend.set_active_current(Axis.of_plunger(mount),
this_pipette.config.plunger_current)
try:
await self._move_plunger(
mount, this_pipette.config.plunger_positions['blow_out'])
except Exception:
self._log.exception('Blow out failed')
raise

@_log_call
async def pick_up_tip(self, mount):
async def pick_up_tip(self, mount, presses: int = 3, increment: float = 1):
"""
Pick up tip from current location
"""
instr = self._attached_instruments[mount]
assert instr
# TODO: Move commands to pick up tip(s)
assert not instr.has_tip, 'Tip already attached'
instr_ax = Axis.by_mount(mount)
plunger_ax = Axis.of_plunger(mount)
self._log.info('Picking up tip on {}'.format(instr.name))
# Initialize plunger to bottom position
self._backend.set_active_current(plunger_ax,
instr.config.plunger_current)
await self._move_plunger(
mount, instr.config.plunger_positions['bottom'])

# Press the nozzle into the tip <presses> number of times,
# moving further by <increment> mm after each press
for i in range(presses):
# move nozzle down into the tip
with self._backend.save_current():
self._backend.set_active_current(instr_ax,
instr.config.pick_up_current)
dist = -1 * instr.config.pick_up_distance + -1 * increment * i
target_pos = top_types.Point(0, 0, dist)
await self.move_rel(mount, target_pos, PICK_UP_SPEED)
# move nozzle back up
backup_pos = top_types.Point(0, 0, -dist)
await self.move_rel(mount, backup_pos)
instr.add_tip()
instr.set_current_volume(0)

# neighboring tips tend to get stuck in the space between
# the volume chamber and the drop-tip sleeve on p1000.
# This extra shake ensures those tips are removed
if 'needs-pickup-shake' in instr.config.quirks:
await self._shake_off_tips(mount)
await self._shake_off_tips(mount)

await self.retract(mount, instr.config.pick_up_distance)

@_log_call
async def drop_tip(self, mount):
"""
Drop tip at the current location
"""
instr = self._attached_instruments[mount]
assert instr
# TODO: Move commands to drop tip(s)
assert instr.has_tip, 'Cannot drop tip without a tip attached'
self._log.info("Dropping tip off from {}".format(instr.name))
plunger_ax = Axis.of_plunger(mount)
self._backend.set_active_current(plunger_ax,
instr.config.plunger_current)
await self._move_plunger(mount,
instr.config.plunger_positions['bottom'])
self._backend.set_active_current(plunger_ax,
instr.config.drop_tip_current)
await self._move_plunger(mount,
instr.config.plunger_positions['drop_tip'])
await self._shake_off_tips(mount)
await self._home_plunger_after_drop_tip(mount)
instr.set_current_volume(0)
instr.remove_tip()

async def _shake_off_tips(self, mount):
# tips don't always fall off, especially if resting against
# tiprack or other tips below it. To ensure the tip has fallen
# first, shake the pipette to dislodge partially-sealed tips,
# then second, raise the pipette so loosened tips have room to fall
shake_off_dist = SHAKE_OFF_TIPS_DISTANCE
# TODO: ensure the distance is not >25% the diameter of placeable
shake_pos = top_types.Point(0, 0, -shake_off_dist) # move left
await self.move_rel(mount, shake_pos, speed=SHAKE_OFF_TIPS_SPEED)
shake_pos = top_types.Point(0, 0, 2*shake_off_dist) # move right
await self.move_rel(mount, shake_pos, speed=SHAKE_OFF_TIPS_SPEED)
shake_pos = top_types.Point(0, 0, -shake_off_dist) # original position
await self.move_rel(mount, shake_pos, speed=SHAKE_OFF_TIPS_SPEED)
# raise the pipette upwards so we are sure tip has fallen off
up_pos = top_types.Point(0, 0, DROP_TIP_RELEASE_DISTANCE)
await self.move_rel(mount, up_pos)

async def _home_plunger_after_drop_tip(self, mount):
# incase plunger motor stalled while dropping a tip, add a
# safety margin of the distance between `bottom` and `drop_tip`
instr = self._attached_instruments[mount]
b = instr.config.plunger_positions['bottom']
d = instr.config.plunger_positions['drop_tip']
safety_margin = abs(b-d)
self._backend.set_active_current(Axis.of_plunger(mount),
instr.config.plunger_current)
await self._move_plunger(mount, safety_margin)
await self.home([Axis.of_plunger(mount)])
await self._move_plunger(mount,
instr.config.plunger_positions['bottom'])

# Pipette config api
@_log_call
async def calibrate_plunger(
Expand Down
Loading

0 comments on commit 52856db

Please sign in to comment.