Skip to content

Commit

Permalink
feat(api): Allow pipette pairing for aspirate and dispense functions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Laura-Danielle authored Aug 28, 2020
1 parent e3906f5 commit f1cbe81
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 101 deletions.
205 changes: 105 additions & 100 deletions api/src/opentrons/hardware_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,11 +1042,10 @@ async def _move_relative_n_axes(
(Axis.Y, self._current_position[Axis.Y] + target[1])))
mounts = self._mounts(mount)
secondary_z = None
for idx, m in enumerate(mounts):
for idx, m, in enumerate(mounts):
z = Axis.by_mount(m)
target_index = idx + 2
all_axes_pos[z] =\
self._current_position[z] + target[target_index]
t_index = idx + 2
all_axes_pos[z] = self._current_position[z] + target[t_index]
if idx == 1:
secondary_z = z
await self._move(all_axes_pos, speed=speed, secondary_z=secondary_z)
Expand Down Expand Up @@ -1256,7 +1255,8 @@ async def update_deck_calibration(self, new_transform):

# Pipette action API
async def prepare_for_aspirate(
self, mount: Union[top_types.Mount, ], rate: float = 1.0):
self, mount: Union[top_types.Mount, PipettePair],
rate: float = 1.0):
"""
Prepare the pipette for aspiration.
Expand All @@ -1274,21 +1274,20 @@ async def prepare_for_aspirate(
aspiration. To make the problem more obvious, :py:meth:`aspirate` will
raise an exception if this method has not previously been called.
"""
this_pipette = self._attached_instruments[mount]
if not this_pipette:
raise top_types.PipetteNotAttachedError(
"No pipette attached to {} mount".format(mount.name))
if this_pipette.current_volume == 0:
instruments = self._instruments_for(mount)
self._ready_for_tip_action(
instruments, HardwareAction.PREPARE_ASPIRATE)

with_zero = filter(lambda i: i[0].current_volume == 0, instruments)
for instr in with_zero:
speed = self._plunger_speed(
this_pipette,
this_pipette.blow_out_flow_rate,
'aspirate')
await self._move_plunger(
mount, (this_pipette.config.bottom, ),
speed=(speed*rate))
this_pipette.ready_to_aspirate = True
instr[0],
instr[0].blow_out_flow_rate, 'aspirate')
bottom = (instr[0].config.bottom, )
await self._move_plunger(instr[1], bottom, speed=(speed*rate))
instr[0].ready_to_aspirate = True

async def aspirate(self, mount: top_types.Mount,
async def aspirate(self, mount: Union[top_types.Mount, PipettePair],
volume: float = None, rate: float = 1.0):
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette
Expand All @@ -1309,54 +1308,52 @@ async def aspirate(self, mount: top_types.Mount,
rate : [float] Set plunger speed for this aspirate, where
speed = rate * aspirate_speed
"""
this_pipette = self._attached_instruments[mount]
if not this_pipette:
raise top_types.PipetteNotAttachedError(
"No pipette attached to {} mount".format(mount.name))

if not this_pipette.has_tip:
raise NoTipAttachedError(
f'Aspirate is not allowed if there is no tip attached to the '
f'pipette. Please make sure that a tip is attached on the'
f'{mount.name.lower()} pipette before using liquid-handling '
f'commands')

if this_pipette.current_volume == 0\
and not this_pipette.ready_to_aspirate:
raise RuntimeError('Pipette not ready to aspirate')
instruments = self._instruments_for(mount)
self._ready_for_tip_action(instruments, HardwareAction.ASPIRATE)
plunger_currents = {
Axis.of_plunger(instr[1]): instr[0].config.plunger_current
for instr in instruments}

if volume is None:
asp_vol = this_pipette.available_volume
mod_log.debug(
"No aspirate volume defined. Aspirating up to pipette "
"max_volume ({}uL)".format(this_pipette.config.max_volume))
"No aspirate volume defined. Aspirating up to "
"max_volume for the pipette")
asp_vol = tuple(instr[0].available_volume for instr in instruments)
else:
asp_vol = volume
asp_vol = tuple(volume for instr in instruments)

assert this_pipette.ok_to_add_volume(asp_vol), \
"Cannot aspirate more than pipette max volume"
if asp_vol == 0:
if all([vol == 0 for vol in asp_vol]):
return

self._backend.set_active_current(
{Axis.of_plunger(mount): this_pipette.config.plunger_current})
dist = self._plunger_position(
this_pipette,
this_pipette.current_volume + asp_vol,
'aspirate')
flow_rate = this_pipette.aspirate_flow_rate * rate
speed = self._plunger_speed(this_pipette, flow_rate, 'aspirate')
elif 0 in asp_vol:
raise PairedPipetteConfigValueError(
"Cannot only aspirate from one pipette")

for instr, vol in zip(instruments, asp_vol):
assert instr[0].ok_to_add_volume(vol), \
"Cannot aspirate more than pipette max volume"

dist = tuple(self._plunger_position(
instr[0],
instr[0].current_volume + vol, 'aspirate')
for instr, vol in zip(instruments, asp_vol))
speed = min(
self._plunger_speed(
instr[0], instr[0].aspirate_flow_rate * rate, 'aspirate')
for instr in instruments)
try:
await self._move_plunger(mount, (dist, ), speed=speed)
self._backend.set_active_current(plunger_currents)
await self._move_plunger(mount, dist, speed=speed)
except Exception:
self._log.exception('Aspirate failed')
this_pipette.set_current_volume(0)
for instr in instruments:
instr[0].set_current_volume(0)
raise
else:
this_pipette.add_current_volume(asp_vol)
for instr, vol in zip(instruments, asp_vol):
instr[0].add_current_volume(vol)

async def dispense(self, mount: top_types.Mount, volume: float = None,
rate: float = 1.0):
async def dispense(self, mount: Union[top_types.Mount, PipettePair],
volume: float = None, rate: float = 1.0):
"""
Dispense a volume of liquid in microliters(uL) using this pipette
at the current location. If no volume is specified, `dispense` will
Expand All @@ -1367,47 +1364,50 @@ async def dispense(self, mount: top_types.Mount, volume: float = None,
rate : [float] Set plunger speed for this dispense, where
speed = rate * dispense_speed
"""
this_pipette = self._attached_instruments[mount]
if not this_pipette:
raise top_types.PipetteNotAttachedError(
"No pipette attached to {} mount".format(mount.name))

if not this_pipette.has_tip:
raise NoTipAttachedError(
f'Dispense is not allowed if there is no tip attached to the '
f'pipette. Please make sure that a tip is attached on the '
f'{mount.name.lower()} pipette before using liquid-handling '
f'commands')
instruments = self._instruments_for(mount)
self._ready_for_tip_action(instruments, HardwareAction.DISPENSE)

plunger_currents = {
Axis.of_plunger(instr[1]): instr[0].config.plunger_current
for instr in instruments}
if volume is None:
disp_vol = this_pipette.current_volume
disp_vol = tuple(instr[0].current_volume for instr in instruments)
mod_log.debug("No dispense volume specified. Dispensing all "
"remaining liquid ({}uL) from pipette".format
(disp_vol))
else:
disp_vol = volume
disp_vol = tuple(volume for instr in instruments)

# Ensure we don't dispense more than the current volume
disp_vol = min(this_pipette.current_volume, disp_vol)
disp_vol = tuple(
min(instr[0].current_volume, vol)
for instr, vol in zip(instruments, disp_vol))

if disp_vol == 0:
if all([vol == 0 for vol in disp_vol]):
return
elif 0 in disp_vol:
raise PairedPipetteConfigValueError(
"Cannot only dispense from one pipette")

dist = tuple(self._plunger_position(
instr[0],
instr[0].current_volume - vol, 'dispense')
for instr, vol in zip(instruments, disp_vol))
speed = min(self._plunger_speed(instr[0],
instr[0].dispense_flow_rate * rate, 'dispense')
for instr in instruments)

self._backend.set_active_current(
{Axis.of_plunger(mount): this_pipette.config.plunger_current})
dist = self._plunger_position(
this_pipette,
this_pipette.current_volume - disp_vol,
'dispense')
flow_rate = this_pipette.dispense_flow_rate * rate
speed = self._plunger_speed(this_pipette, flow_rate, 'dispense')
try:
await self._move_plunger(mount, (dist, ), speed=speed)
self._backend.set_active_current(plunger_currents)
await self._move_plunger(mount, dist, speed=speed)
except Exception:
self._log.exception('Dispense failed')
this_pipette.set_current_volume(0)
for instr in instruments:
instr[0].set_current_volume(0)
raise
else:
this_pipette.remove_current_volume(disp_vol)
for instr, vol in zip(instruments, disp_vol):
instr[0].remove_current_volume(vol)

def _plunger_position(self, instr: Pipette, ul: float,
action: 'UlPerMmAction') -> float:
Expand All @@ -1427,30 +1427,31 @@ def _plunger_flowrate(
ul_per_s = mm_per_s * instr.ul_per_mm(instr.config.max_volume, action)
return round(ul_per_s, 6)

async def blow_out(self, mount):
async def blow_out(self, mount: Union[top_types.Mount, PipettePair]):
"""
Force any remaining liquid to dispense. The liquid will be dispensed at
the current location of pipette
"""
this_pipette = self._attached_instruments[mount]
if not this_pipette:
raise top_types.PipetteNotAttachedError(
"No pipette attached to {} mount".format(mount.name))
instruments = self._instruments_for(mount)
self._ready_for_tip_action(instruments, HardwareAction.BLOWOUT)
plunger_currents = {
Axis.of_plunger(instr[1]): instr[0].config.plunger_current
for instr in instruments}
blow_out = tuple(instr[0].config.blow_out for instr in instruments)

self._backend.set_active_current({Axis.of_plunger(mount):
this_pipette.config.plunger_current})
speed = self._plunger_speed(
this_pipette, this_pipette.blow_out_flow_rate, 'dispense')
self._backend.set_active_current(plunger_currents)
speed = max(self._plunger_speed(
instr[0], instr[0].blow_out_flow_rate, 'dispense')
for instr in instruments)
try:
await self._move_plunger(
mount, (this_pipette.config.blow_out, ),
speed=speed)
await self._move_plunger(mount, blow_out, speed=speed)
except Exception:
self._log.exception('Blow out failed')
raise
finally:
this_pipette.set_current_volume(0)
this_pipette.ready_to_aspirate = False
for instr in instruments:
instr[0].set_current_volume(0)
instr[0].ready_to_aspirate = False

@overload
def _instruments_for(
Expand Down Expand Up @@ -1493,6 +1494,10 @@ def _ready_for_tip_action(
if not pipettes[0].has_tip:
raise NoTipAttachedError(
f'Cannot perform {action} without a tip attached')
if (action == HardwareAction.ASPIRATE and
pipettes[0].current_volume == 0 and
not pipettes[0].ready_to_aspirate):
raise RuntimeError('Pipette not ready to aspirate')
self._log.debug(f'{action} on {pipettes[0].name}')

async def pick_up_tip(self,
Expand Down Expand Up @@ -1543,10 +1548,10 @@ async def pick_up_tip(self,
checked_presses = presses

if not increment or increment < 0:
checked_increment = tuple(
check_incr = tuple(
instr[0].config.pick_up_increment for instr in instruments)
else:
checked_increment = (increment, )
check_incr = tuple(increment for instr in instruments)

pick_up_speed = min(
instr[0].config.pick_up_speed for instr in instruments)
Expand All @@ -1557,8 +1562,8 @@ async def pick_up_tip(self,
with self._backend.save_current():
self._backend.set_active_current(z_axis_currents)
dist = tuple(-1.0 * instr[0].config.pick_up_distance
+ -1.0 * checked_increment[idx] * i
for idx, instr in enumerate(instruments))
+ -1.0 * incrt * i
for instr, incrt in zip(instruments, check_incr))
target_pos = (0, 0, *dist)
await self._move_relative_n_axes(
mount, target_pos, pick_up_speed)
Expand All @@ -1581,8 +1586,8 @@ async def pick_up_tip(self,
# Here we add in the debounce distance for the switch as
# a safety precaution
retract_target = max(instr[0].config.pick_up_distance
+ checked_increment[idx] * checked_presses + 2
for idx, instr in enumerate(instruments))
+ incrt * checked_presses + 2
for instr, incrt in zip(instruments, check_incr))

await self.retract(mount, retract_target)

Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/hardware_control/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ def secondary(self) -> top_types.Mount:

class HardwareAction(enum.Enum):
DROPTIP = enum.auto()
ASPIRATE = enum.auto()
DISPENSE = enum.auto()
BLOWOUT = enum.auto()
PREPARE_ASPIRATE = enum.auto()

def __str__(self):
return self.name
Expand Down
Loading

0 comments on commit f1cbe81

Please sign in to comment.