Skip to content

Commit

Permalink
fix(api): Slow down prepare-for-aspirate following a blowout (#12412)
Browse files Browse the repository at this point in the history
  • Loading branch information
andySigler authored May 19, 2023
1 parent b6f086b commit 60fdec3
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 18 deletions.
81 changes: 63 additions & 18 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,11 +776,10 @@ async def home_plunger(self, mount: Union[top_types.Mount, OT3Mount]) -> None:
await self.home([OT3Axis.of_main_tool_actuator(checked_mount)])
instr = self._pipette_handler.hardware_instruments[checked_mount]
if instr:
target_pos = target_position_from_plunger(
checked_mount, instr.plunger_positions.bottom, self._current_position
)
self._log.info("Attempting to move the plunger to bottom.")
await self._move(target_pos, acquire_lock=False, home_flagged_axes=False)
await self._move_to_plunger_bottom(
checked_mount, rate=1.0, acquire_lock=False
)
await self.current_position_ot3(mount=checked_mount, refresh=True)

@lru_cache(1)
Expand Down Expand Up @@ -1385,31 +1384,77 @@ async def hold_jaw_width(self, jaw_width_mm: int) -> None:
await self._hold_jaw_width(jaw_width_mm)
self._gripper_handler.set_jaw_state(GripperJawState.HOLDING_CLOSED)

async def _move_to_plunger_bottom(
self, mount: OT3Mount, rate: float, acquire_lock: bool = True
) -> None:
"""
Move an instrument's plunger to its bottom position, while no liquids
are held by said instrument.
Possible events where this occurs:
1. After homing the plunger
2. Between a blow-out and an aspiration (eg: re-using tips)
Three possible physical tip states when this happens:
1. no tip on pipette
2. empty and dry (unused) tip on pipette
3. empty and wet (used) tip on pipette
With wet tips, the primary concern is leftover droplets inside the tip.
These droplets ideally only move down and out of the tip, not up into the tip.
Therefore, it is preferable to use the "blow-out" speed when moving the
plunger down, and the slower "aspirate" speed when moving the plunger up.
Assume all tips are wet, because we do not differentiate between wet/dry tips.
When no tip is attached, moving at the max speed is preferable, to save time.
"""
checked_mount = OT3Mount.from_mount(mount)
instrument = self._pipette_handler.get_pipette(checked_mount)
if instrument.current_volume > 0:
raise RuntimeError("cannot position plunger while holding liquid")
target_pos = target_position_from_plunger(
OT3Mount.from_mount(mount),
instrument.plunger_positions.bottom,
self._current_position,
)
pip_ax = OT3Axis.of_main_tool_actuator(mount)
current_pos = self._current_position[pip_ax]
if instrument.has_tip:
if current_pos > target_pos[pip_ax]:
# using slower aspirate flow-rate, to avoid pulling droplets up
speed = self._pipette_handler.plunger_speed(
instrument, instrument.aspirate_flow_rate, "aspirate"
)
else:
# use blow-out flow-rate, so we can push droplets out
speed = self._pipette_handler.plunger_speed(
instrument, instrument.blow_out_flow_rate, "dispense"
)
else:
# save time by using max speed
max_speeds = self.config.motion_settings.default_max_speed
speed = max_speeds[self.gantry_load][OT3AxisKind.P]
await self._move(
target_pos,
speed=(speed * rate),
acquire_lock=acquire_lock,
)

# Pipette action API
async def prepare_for_aspirate(
self, mount: Union[top_types.Mount, OT3Mount], rate: float = 1.0
) -> None:
"""Prepare the pipette for aspiration."""
checked_mount = OT3Mount.from_mount(mount)
instrument = self._pipette_handler.get_pipette(checked_mount)

self._pipette_handler.ready_for_tip_action(
instrument, HardwareAction.PREPARE_ASPIRATE
)

if instrument.current_volume == 0:
speed = self._pipette_handler.plunger_speed(
instrument, instrument.blow_out_flow_rate, "aspirate"
)
bottom = instrument.plunger_positions.bottom
target_pos = target_position_from_plunger(
OT3Mount.from_mount(mount), bottom, self._current_position
)
await self._move(
target_pos,
speed=(speed * rate),
home_flagged_axes=False,
)
await self._move_to_plunger_bottom(checked_mount, rate)
instrument.ready_to_aspirate = True

async def aspirate(
Expand Down
142 changes: 142 additions & 0 deletions api/tests/opentrons/hardware_control/test_ot3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
OT3AttachedPipette,
GripperDict,
)
from opentrons.hardware_control.motion_utilities import target_position_from_plunger
from opentrons.hardware_control.instruments.ot3.gripper_handler import (
GripError,
GripperHandler,
Expand All @@ -36,6 +37,7 @@
from opentrons.hardware_control.types import (
OT3Mount,
OT3Axis,
OT3AxisKind,
CriticalPoint,
GripperProbe,
InstrumentProbeType,
Expand Down Expand Up @@ -133,6 +135,32 @@ def mock_home_plunger(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock
yield mock_move


@pytest.fixture
def mock_move_to_plunger_bottom(
ot3_hardware: ThreadManager[OT3API],
) -> Iterator[AsyncMock]:
with patch.object(
ot3_hardware.managed_obj,
"_move_to_plunger_bottom",
AsyncMock(
spec=ot3_hardware.managed_obj._move_to_plunger_bottom,
),
) as mock_move:
yield mock_move


@pytest.fixture
def mock_move(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]:
with patch.object(
ot3_hardware.managed_obj,
"_move",
AsyncMock(
spec=ot3_hardware.managed_obj._move,
),
) as mock_move:
yield mock_move


@pytest.fixture
def mock_gantry_position(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]:
with patch.object(
Expand Down Expand Up @@ -1023,6 +1051,120 @@ async def test_gripper_move_to(
]


async def test_home_plunger(
ot3_hardware: ThreadManager[OT3API],
mock_move_to_plunger_bottom: AsyncMock,
mock_home: AsyncMock,
):
mount = OT3Mount.LEFT
instr_data = OT3AttachedPipette(
config=ot3_pipette_config.load_ot3_pipette(
ot3_pipette_config.PipetteModelVersionType(
PipetteModelType("p1000"),
PipetteChannelType(1),
PipetteVersionType(3, 4),
)
),
id="fakepip",
)
await ot3_hardware.cache_pipette(mount, instr_data, None)
assert ot3_hardware.hardware_pipettes[mount.to_mount()]

await ot3_hardware.home_plunger(mount)
mock_home.assert_called_once()
mock_move_to_plunger_bottom.assert_called_once_with(mount, 1.0, False)


async def test_prepare_for_aspirate(
ot3_hardware: ThreadManager[OT3API],
mock_move_to_plunger_bottom: AsyncMock,
):
mount = OT3Mount.LEFT
instr_data = OT3AttachedPipette(
config=ot3_pipette_config.load_ot3_pipette(
ot3_pipette_config.PipetteModelVersionType(
PipetteModelType("p1000"),
PipetteChannelType(1),
PipetteVersionType(3, 4),
)
),
id="fakepip",
)
await ot3_hardware.cache_pipette(mount, instr_data, None)
assert ot3_hardware.hardware_pipettes[mount.to_mount()]

await ot3_hardware.add_tip(mount, 100)
await ot3_hardware.prepare_for_aspirate(OT3Mount.LEFT)
mock_move_to_plunger_bottom.assert_called_once_with(OT3Mount.LEFT, 1.0)


async def test_move_to_plunger_bottom(
ot3_hardware: ThreadManager[OT3API],
mock_move: AsyncMock,
):
mount = OT3Mount.LEFT
instr_data = OT3AttachedPipette(
config=ot3_pipette_config.load_ot3_pipette(
ot3_pipette_config.PipetteModelVersionType(
PipetteModelType("p1000"),
PipetteChannelType(1),
PipetteVersionType(3, 4),
)
),
id="fakepip",
)
await ot3_hardware.cache_pipette(mount, instr_data, None)
pipette = ot3_hardware.hardware_pipettes[mount.to_mount()]
assert pipette

max_speeds = ot3_hardware.config.motion_settings.default_max_speed
target_pos = target_position_from_plunger(
OT3Mount.from_mount(mount),
pipette.plunger_positions.bottom,
ot3_hardware._current_position,
)

# plunger will move at different speeds, depending on if:
# - no tip attached (max speed)
# - tip attached and moving down (blowout speed)
# - tip attached and moving up (aspirate speed)
expected_speed_no_tip = max_speeds[ot3_hardware.gantry_load][OT3AxisKind.P]
expected_speed_moving_down = ot3_hardware._pipette_handler.plunger_speed(
pipette, pipette.blow_out_flow_rate, "dispense"
)
expected_speed_moving_up = ot3_hardware._pipette_handler.plunger_speed(
pipette, pipette.aspirate_flow_rate, "aspirate"
)

# no tip attached
await ot3_hardware.home()
mock_move.reset_mock()
await ot3_hardware.home_plunger(mount)
mock_move.assert_called_once_with(
target_pos, speed=expected_speed_no_tip, acquire_lock=False
)

# tip attached, moving DOWN towards "bottom" position
await ot3_hardware.home()
await ot3_hardware.add_tip(mount, 100)
mock_move.reset_mock()
await ot3_hardware.prepare_for_aspirate(mount)
mock_move.assert_called_once_with(
target_pos, speed=expected_speed_moving_down, acquire_lock=True
)

# tip attached, moving UP towards "bottom" position
# NOTE: _move() is mocked, so we need to update the OT3API's
# cached coordinates in the test
pip_ax = OT3Axis.of_main_tool_actuator(mount)
ot3_hardware._current_position[pip_ax] = target_pos[pip_ax] + 1
mock_move.reset_mock()
await ot3_hardware.prepare_for_aspirate(mount)
mock_move.assert_called_once_with(
target_pos, speed=expected_speed_moving_up, acquire_lock=True
)


async def test_move_gripper_mount_without_gripper_attached(
ot3_hardware: ThreadManager[OT3API], mock_backend_move: AsyncMock
) -> None:
Expand Down

0 comments on commit 60fdec3

Please sign in to comment.