Skip to content

Commit

Permalink
feat(hardware-testing): 96ch working on scale (#13019)
Browse files Browse the repository at this point in the history
  • Loading branch information
andySigler authored Jun 30, 2023
1 parent c9e15b7 commit d02dc6f
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 81 deletions.
10 changes: 7 additions & 3 deletions hardware-testing/hardware_testing/examples/jog_ot3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test Jogging."""
import argparse
import asyncio
from typing import Optional

from opentrons.hardware_control.ot3api import OT3API

Expand Down Expand Up @@ -60,11 +61,13 @@ async def _exercise_gripper(api: OT3API) -> None:
print(f"unexpected input: {inp}")


async def _main(is_simulating: bool, mount: types.OT3Mount) -> None:
async def _main(
is_simulating: bool, mount: types.OT3Mount, speed: Optional[float]
) -> None:
api = await helpers_ot3.build_async_ot3_hardware_api(is_simulating=is_simulating)
await api.home()
while True:
await helpers_ot3.jog_mount_ot3(api, mount)
await helpers_ot3.jog_mount_ot3(api, mount, speed=speed)
if mount == types.OT3Mount.GRIPPER:
await _exercise_gripper(api)
else:
Expand All @@ -82,6 +85,7 @@ async def _main(is_simulating: bool, mount: types.OT3Mount) -> None:
parser.add_argument(
"--mount", type=str, choices=list(mount_options.keys()), default="left"
)
parser.add_argument("--speed", type=float)
args = parser.parse_args()
_mount = mount_options[args.mount]
asyncio.run(_main(args.simulate, _mount))
asyncio.run(_main(args.simulate, _mount, args.speed))
1 change: 1 addition & 0 deletions hardware-testing/hardware_testing/gravimetric/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class PhotometricConfig:

NUM_BLANK_TRIALS: Final = 3
NUM_MIXES_BEFORE_ASPIRATE = 5
SCALE_SECONDS_TO_TRUE_STABILIZE = 30

LOW_VOLUME_UPPER_LIMIT_UL: Final = 2.0

Expand Down
151 changes: 96 additions & 55 deletions hardware-testing/hardware_testing/gravimetric/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from hardware_testing.data import create_run_id_and_start_time, ui, get_git_description
from hardware_testing.data.csv_report import CSVReport
from hardware_testing.opentrons_api.types import OT3Mount, Point, OT3Axis
from hardware_testing.opentrons_api.types import OT3Mount, Point
from hardware_testing.opentrons_api.helpers_ot3 import clear_pipette_ul_per_mm

from . import report
Expand Down Expand Up @@ -160,7 +160,10 @@ def _load_pipette(
if cfg.pipette_channels not in load_str_channels:
raise ValueError(f"unexpected number of channels: {cfg.pipette_channels}")
chnl_str = load_str_channels[cfg.pipette_channels]
pip_name = f"p{cfg.pipette_volume}_{chnl_str}_gen3"
if cfg.pipette_channels == 96:
pip_name = "p1000_96"
else:
pip_name = f"p{cfg.pipette_volume}_{chnl_str}_gen3"
print(f'pipette "{pip_name}" on mount "{cfg.pipette_mount}"')
pipette = ctx.load_instrument(pip_name, cfg.pipette_mount)
assert pipette.channels == cfg.pipette_channels, (
Expand Down Expand Up @@ -210,16 +213,26 @@ def _load_labware(
labware_on_scale = ctx.load_labware(
cfg.labware_on_scale, location=cfg.slot_scale
)
tiprack_load_settings: List[Tuple[int, str]] = [
if cfg.pipette_channels == 96:
tiprack_namespace = "custom_beta"
tiprack_loadname = f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul_adp"
else:
tiprack_namespace = "opentrons"
tiprack_loadname = f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul"
tiprack_load_settings: List[Tuple[int, str, str]] = [
(
slot,
f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul",
tiprack_loadname,
tiprack_namespace,
)
for slot in cfg.slots_tiprack
]
for ls in tiprack_load_settings:
print(f'Loading tiprack "{ls[1]}" in slot #{ls[0]}')
tipracks = [ctx.load_labware(ls[1], location=ls[0]) for ls in tiprack_load_settings]
print(f'Loading tiprack "{ls[1]}" in slot #{ls[0]} with namespace "{ls[2]}"')
tipracks = [
ctx.load_labware(ls[1], location=ls[0], namespace=ls[2])
for ls in tiprack_load_settings
]
_apply_labware_offsets(cfg, tipracks, labware_on_scale)
return labware_on_scale, tipracks

Expand Down Expand Up @@ -442,10 +455,8 @@ def _pick_up_tip(
)


def _drop_tip(
ctx: ProtocolContext, pipette: InstrumentContext, cfg: config.GravimetricConfig
) -> None:
if cfg.return_tip:
def _drop_tip(pipette: InstrumentContext, return_tip: bool) -> None:
if return_tip:
pipette.return_tip(home_after=False)
else:
pipette.drop_tip(home_after=False)
Expand All @@ -466,6 +477,20 @@ def _get_channel_divider(cfg: config.GravimetricConfig) -> float:
return float(cfg.pipette_channels)


def _get_tag_from_pipette(
pipette: InstrumentContext, cfg: config.GravimetricConfig
) -> str:
pipette_tag = get_pipette_unique_name(pipette)
print(f'found pipette "{pipette_tag}"')
if cfg.increment:
pipette_tag += "-increment"
elif cfg.user_volumes:
pipette_tag += "-user-volume"
else:
pipette_tag += "-qc"
return pipette_tag


def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
"""Run."""
run_id, start_time = create_run_id_and_start_time()
Expand All @@ -477,20 +502,14 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:

ui.print_header("LOAD PIPETTE")
pipette = _load_pipette(ctx, cfg)
pipette_tag = get_pipette_unique_name(pipette)
print(f'found pipette "{pipette_tag}"')
if cfg.increment:
pipette_tag += "-increment"
elif cfg.user_volumes:
pipette_tag += "-user-volume"
else:
pipette_tag += "-qc"
pipette_tag = _get_tag_from_pipette(pipette, cfg)

ui.print_header("GET PARAMETERS")
test_volumes = _get_volumes(ctx, cfg)
for v in test_volumes:
print(f"\t{v} uL")
tips = get_tips(ctx, pipette, all_channels=cfg.increment)
all_channels_same_time = cfg.increment or cfg.pipette_channels == 96
tips = get_tips(ctx, pipette, all_channels=all_channels_same_time)
total_tips = len([tip for chnl_tips in tips.values() for tip in chnl_tips])
channels_to_test = _get_test_channels(cfg)
if len(channels_to_test) > 1:
Expand All @@ -502,6 +521,9 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
trial_total <= total_tips
), f"more trials ({trial_total}) than tips ({total_tips})"

def _next_tip_for_channel(channel: int) -> Well:
return tips[channel].pop(0)

ui.print_header("LOAD SCALE")
print(
"Some Radwag settings cannot be controlled remotely.\n"
Expand Down Expand Up @@ -545,45 +567,50 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
liquid="None",
)

ui.print_title("FIND LIQUID HEIGHT")
print("homing...")
ctx.home()
# get the first channel's first-used tip
# NOTE: note using list.pop(), b/c tip will be re-filled by operator,
# and so we can use pick-up-tip from there again
setup_tip = tips[0][0]
setup_channel_offset = _get_channel_offset(cfg, channel=0)
setup_tip_location = setup_tip.top().move(setup_channel_offset)
_pick_up_tip(ctx, pipette, cfg, location=setup_tip_location)
print("moving to vial")
well = labware_on_scale["A1"]
pipette.move_to(well.top())
_liquid_height = _jog_to_find_liquid_height(ctx, pipette, well)
height_below_top = well.depth - _liquid_height
print(f"liquid is {height_below_top} mm below top of vial")
liquid_tracker.set_start_volume_from_liquid_height(
labware_on_scale["A1"], _liquid_height, name="Water"
)
vial_volume = liquid_tracker.get_volume(well)
print(f"software thinks there is {vial_volume} uL of liquid in the vial")
print("dropping tip")
_drop_tip(ctx, pipette, cfg)
if not ctx.is_simulating():
ui.get_user_ready("REPLACE first Tip with NEW Tip")

calibration_tip_in_use = True
try:
ui.print_title("FIND LIQUID HEIGHT")
print("homing...")
ctx.home()
pipette.home_plunger()
first_tip = tips[0][0]
setup_channel_offset = _get_channel_offset(cfg, channel=0)
first_tip_location = first_tip.top().move(setup_channel_offset)
_pick_up_tip(ctx, pipette, cfg, location=first_tip_location)
pipette.home()
if not ctx.is_simulating():
ui.get_user_ready("REPLACE first tip with NEW TIP")
ui.get_user_ready("CLOSE the door, and MOVE AWAY from machine")
print("moving to scale")
well = labware_on_scale["A1"]
pipette.move_to(well.top())
_liquid_height = _jog_to_find_liquid_height(ctx, pipette, well)
height_below_top = well.depth - _liquid_height
print(f"liquid is {height_below_top} mm below top of vial")
liquid_tracker.set_start_volume_from_liquid_height(
labware_on_scale["A1"], _liquid_height, name="Water"
)
vial_volume = liquid_tracker.get_volume(well)
print(f"software thinks there is {vial_volume} uL of liquid in the vial")

if not cfg.blank or cfg.inspect:
average_aspirate_evaporation_ul = 0.0
average_dispense_evaporation_ul = 0.0
else:
ui.print_title("MEASURE EVAPORATION")
print(f"running {config.NUM_BLANK_TRIALS}x blank measurements")
hover_pos = labware_on_scale["A1"].top().move(Point(z=50))
pipette.move_to(hover_pos)
for i in range(config.SCALE_SECONDS_TO_TRUE_STABILIZE):
print(
f"wait {i + 1}/{config.SCALE_SECONDS_TO_TRUE_STABILIZE} seconds before"
f" measuring evaporation"
)
actual_asp_list_evap: List[float] = []
actual_disp_list_evap: List[float] = []
for trial in range(config.NUM_BLANK_TRIALS):
ui.print_header(f"BLANK {trial + 1}/{config.NUM_BLANK_TRIALS}")
hover_above_setup_tip = setup_tip_location.move(Point(z=20))
_pick_up_tip(ctx, pipette, cfg, location=hover_above_setup_tip)
pipette.move_to(hover_pos)
evap_aspirate, _, evap_dispense, _ = _run_trial(
ctx=ctx,
pipette=pipette,
Expand All @@ -610,8 +637,6 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
)
actual_asp_list_evap.append(evap_aspirate)
actual_disp_list_evap.append(evap_dispense)
print("dropping tip")
_drop_tip(ctx, pipette, cfg)
ui.print_header("EVAPORATION AVERAGE")
average_aspirate_evaporation_ul = _calculate_average(actual_asp_list_evap)
average_dispense_evaporation_ul = _calculate_average(actual_disp_list_evap)
Expand All @@ -625,6 +650,9 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
average_aspirate_evaporation_ul,
average_dispense_evaporation_ul,
)
print("dropping tip")
_drop_tip(pipette, return_tip=False) # always trash calibration tips
calibration_tip_in_use = False
trial_count = 0
for volume in test_volumes:
actual_asp_list_all = []
Expand All @@ -649,8 +677,9 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
f"{volume} uL channel {channel + 1} ({trial + 1}/{cfg.trials})"
)
print(f"trial total {trial_count}/{trial_total}")
# remove it so it's not used again
next_tip: Well = tips[channel].pop(0)
# NOTE: always pick-up new tip for each trial
# b/c it seems tips heatup
next_tip: Well = _next_tip_for_channel(channel)
next_tip_location = next_tip.top().move(channel_offset)
_pick_up_tip(ctx, pipette, cfg, location=next_tip_location)
(
Expand Down Expand Up @@ -711,7 +740,7 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
disp_with_evap,
)
print("dropping tip")
_drop_tip(ctx, pipette, cfg)
_drop_tip(pipette, cfg.return_tip)

ui.print_header(f"{volume} uL channel {channel + 1} CALCULATIONS")
aspirate_average, aspirate_cv, aspirate_d = _calculate_stats(
Expand Down Expand Up @@ -825,9 +854,21 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None:
print("ending recording")
recorder.stop()
recorder.deactivate()
# FIXME: instead keep motors engaged, and move to an ATTACH position
hw_api = ctx._core.get_hardware()
hw_api.disengage_axes([OT3Axis.X, OT3Axis.Y]) # disengage xy axis
ui.print_title("CHANGE PIPETTES")
if pipette.has_tip:
if pipette.current_volume > 0:
print("dispensing liquid to trash")
trash = pipette.trash_container.wells()[0]
# FIXME: this should be a blow_out() at max volume,
# but that is not available through PyAPI yet
# so instead just dispensing.
pipette.dispense(pipette.current_volume, trash.top())
pipette.aspirate(10) # to pull any droplets back up
print("dropping tip")
_return_tip = False if calibration_tip_in_use else cfg.return_tip
_drop_tip(pipette, _return_tip)
print("moving to attach position")
pipette.move_to(ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150)))
ui.print_title("RESULTS")
_print_final_results(
volumes=test_volumes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,14 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None:
test_volumes
)

def _next_tip() -> Well:
nonlocal tips
if not len(tips[0]):
if not ctx.is_simulating():
ui.get_user_ready(f"replace TIPRACKS in slots {cfg.slots_tiprack}")
tips = get_tips(ctx, pipette)
return tips[0].pop(0)

assert (
trial_total <= total_tips
), f"more trials ({trial_total}) than tips ({total_tips})"
Expand Down Expand Up @@ -490,12 +498,12 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None:

print("homing...")
ctx.home()
pipette.home_plunger()
# get the first channel's first-used tip
# NOTE: note using list.pop(), b/c tip will be re-filled by operator,
# and so we can use pick-up-tip from there again
try:
trial_count = 0
tip_iter = 0
for volume in test_volumes:
ui.print_title(f"{volume} uL")
do_jog = True
Expand All @@ -505,7 +513,7 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None:
print(f"trial total {trial_count}/{trial_total}")
if not ctx.is_simulating():
ui.get_user_ready(f"put PLATE #{trial + 1} and remove SEAL")
next_tip: Well = tips[0][tip_iter]
next_tip: Well = _next_tip()
next_tip_location = next_tip.top()
_pick_up_tip(ctx, pipette, cfg, location=next_tip_location)

Expand All @@ -527,29 +535,21 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None:
mix=cfg.mix,
stable=True,
)
tip_iter += 1
if tip_iter >= len(tips[0]) and not (
(trial + 1) == cfg.trials and volume == test_volumes[-1]
):
if not ctx.is_simulating():
ui.get_user_ready(
f"replace TIPRACKS in slots {cfg.slots_tiprack}"
)
tip_iter = 0
if volume < 250:
do_jog = False

finally:
ui.print_title("CHANGE PIPETTES")
if pipette.has_tip:
if pipette.current_volume > 0:
print("dispensing liquid to trash")
trash = pipette.trash_container.wells()[0]
# FIXME: this should be a blow_out() at max volume,
# but that is not available through PyAPI yet
# so instead just dispensing.
pipette.dispense(pipette.current_volume, trash.top())
pipette.aspirate(10) # to pull any droplets back up
print("dropping tip")
_drop_tip(ctx, pipette, cfg)
ctx.home()
# move to attach point
print("moving to attach position")
pipette.move_to(ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150)))
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,8 @@ def get_test_volumes(pipette: int, channels: int, tip: int) -> List[float]:
return [5.0]
elif tip == 200:
return [200.0]
elif tip == 1000:
return [1000.0]
else:
raise ValueError(f"no volumes to test for tip size: {tip} uL")
else:
Expand Down
Loading

0 comments on commit d02dc6f

Please sign in to comment.