diff --git a/hardware-testing/hardware_testing/examples/jog_ot3.py b/hardware-testing/hardware_testing/examples/jog_ot3.py index 6fef151ab60..a077a36a413 100644 --- a/hardware-testing/hardware_testing/examples/jog_ot3.py +++ b/hardware-testing/hardware_testing/examples/jog_ot3.py @@ -1,6 +1,7 @@ """Test Jogging.""" import argparse import asyncio +from typing import Optional from opentrons.hardware_control.ot3api import OT3API @@ -60,11 +61,13 @@ async def _exercise_gripper(api: OT3API) -> None: print(f"unexpected input: {inp}") -async def _main(is_simulating: bool, mount: types.OT3Mount) -> None: +async def _main( + is_simulating: bool, mount: types.OT3Mount, speed: Optional[float] +) -> None: api = await helpers_ot3.build_async_ot3_hardware_api(is_simulating=is_simulating) await api.home() while True: - await helpers_ot3.jog_mount_ot3(api, mount) + await helpers_ot3.jog_mount_ot3(api, mount, speed=speed) if mount == types.OT3Mount.GRIPPER: await _exercise_gripper(api) else: @@ -82,6 +85,7 @@ async def _main(is_simulating: bool, mount: types.OT3Mount) -> None: parser.add_argument( "--mount", type=str, choices=list(mount_options.keys()), default="left" ) + parser.add_argument("--speed", type=float) args = parser.parse_args() _mount = mount_options[args.mount] - asyncio.run(_main(args.simulate, _mount)) + asyncio.run(_main(args.simulate, _mount, args.speed)) diff --git a/hardware-testing/hardware_testing/gravimetric/config.py b/hardware-testing/hardware_testing/gravimetric/config.py index a86f2853247..26b1e4b71ab 100644 --- a/hardware-testing/hardware_testing/gravimetric/config.py +++ b/hardware-testing/hardware_testing/gravimetric/config.py @@ -57,6 +57,7 @@ class PhotometricConfig: NUM_BLANK_TRIALS: Final = 3 NUM_MIXES_BEFORE_ASPIRATE = 5 +SCALE_SECONDS_TO_TRUE_STABILIZE = 30 LOW_VOLUME_UPPER_LIMIT_UL: Final = 2.0 diff --git a/hardware-testing/hardware_testing/gravimetric/execute.py b/hardware-testing/hardware_testing/gravimetric/execute.py index 3a6c6a6b6ce..cb356b9e924 100644 --- a/hardware-testing/hardware_testing/gravimetric/execute.py +++ b/hardware-testing/hardware_testing/gravimetric/execute.py @@ -9,7 +9,7 @@ from hardware_testing.data import create_run_id_and_start_time, ui, get_git_description from hardware_testing.data.csv_report import CSVReport -from hardware_testing.opentrons_api.types import OT3Mount, Point, OT3Axis +from hardware_testing.opentrons_api.types import OT3Mount, Point from hardware_testing.opentrons_api.helpers_ot3 import clear_pipette_ul_per_mm from . import report @@ -160,7 +160,10 @@ def _load_pipette( if cfg.pipette_channels not in load_str_channels: raise ValueError(f"unexpected number of channels: {cfg.pipette_channels}") chnl_str = load_str_channels[cfg.pipette_channels] - pip_name = f"p{cfg.pipette_volume}_{chnl_str}_gen3" + if cfg.pipette_channels == 96: + pip_name = "p1000_96" + else: + pip_name = f"p{cfg.pipette_volume}_{chnl_str}_gen3" print(f'pipette "{pip_name}" on mount "{cfg.pipette_mount}"') pipette = ctx.load_instrument(pip_name, cfg.pipette_mount) assert pipette.channels == cfg.pipette_channels, ( @@ -210,16 +213,26 @@ def _load_labware( labware_on_scale = ctx.load_labware( cfg.labware_on_scale, location=cfg.slot_scale ) - tiprack_load_settings: List[Tuple[int, str]] = [ + if cfg.pipette_channels == 96: + tiprack_namespace = "custom_beta" + tiprack_loadname = f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul_adp" + else: + tiprack_namespace = "opentrons" + tiprack_loadname = f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul" + tiprack_load_settings: List[Tuple[int, str, str]] = [ ( slot, - f"opentrons_flex_96_tiprack_{cfg.tip_volume}ul", + tiprack_loadname, + tiprack_namespace, ) for slot in cfg.slots_tiprack ] for ls in tiprack_load_settings: - print(f'Loading tiprack "{ls[1]}" in slot #{ls[0]}') - tipracks = [ctx.load_labware(ls[1], location=ls[0]) for ls in tiprack_load_settings] + print(f'Loading tiprack "{ls[1]}" in slot #{ls[0]} with namespace "{ls[2]}"') + tipracks = [ + ctx.load_labware(ls[1], location=ls[0], namespace=ls[2]) + for ls in tiprack_load_settings + ] _apply_labware_offsets(cfg, tipracks, labware_on_scale) return labware_on_scale, tipracks @@ -442,10 +455,8 @@ def _pick_up_tip( ) -def _drop_tip( - ctx: ProtocolContext, pipette: InstrumentContext, cfg: config.GravimetricConfig -) -> None: - if cfg.return_tip: +def _drop_tip(pipette: InstrumentContext, return_tip: bool) -> None: + if return_tip: pipette.return_tip(home_after=False) else: pipette.drop_tip(home_after=False) @@ -466,6 +477,20 @@ def _get_channel_divider(cfg: config.GravimetricConfig) -> float: return float(cfg.pipette_channels) +def _get_tag_from_pipette( + pipette: InstrumentContext, cfg: config.GravimetricConfig +) -> str: + pipette_tag = get_pipette_unique_name(pipette) + print(f'found pipette "{pipette_tag}"') + if cfg.increment: + pipette_tag += "-increment" + elif cfg.user_volumes: + pipette_tag += "-user-volume" + else: + pipette_tag += "-qc" + return pipette_tag + + def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: """Run.""" run_id, start_time = create_run_id_and_start_time() @@ -477,20 +502,14 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: ui.print_header("LOAD PIPETTE") pipette = _load_pipette(ctx, cfg) - pipette_tag = get_pipette_unique_name(pipette) - print(f'found pipette "{pipette_tag}"') - if cfg.increment: - pipette_tag += "-increment" - elif cfg.user_volumes: - pipette_tag += "-user-volume" - else: - pipette_tag += "-qc" + pipette_tag = _get_tag_from_pipette(pipette, cfg) ui.print_header("GET PARAMETERS") test_volumes = _get_volumes(ctx, cfg) for v in test_volumes: print(f"\t{v} uL") - tips = get_tips(ctx, pipette, all_channels=cfg.increment) + all_channels_same_time = cfg.increment or cfg.pipette_channels == 96 + tips = get_tips(ctx, pipette, all_channels=all_channels_same_time) total_tips = len([tip for chnl_tips in tips.values() for tip in chnl_tips]) channels_to_test = _get_test_channels(cfg) if len(channels_to_test) > 1: @@ -502,6 +521,9 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: trial_total <= total_tips ), f"more trials ({trial_total}) than tips ({total_tips})" + def _next_tip_for_channel(channel: int) -> Well: + return tips[channel].pop(0) + ui.print_header("LOAD SCALE") print( "Some Radwag settings cannot be controlled remotely.\n" @@ -545,45 +567,50 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: liquid="None", ) - ui.print_title("FIND LIQUID HEIGHT") - print("homing...") - ctx.home() - # get the first channel's first-used tip - # NOTE: note using list.pop(), b/c tip will be re-filled by operator, - # and so we can use pick-up-tip from there again - setup_tip = tips[0][0] - setup_channel_offset = _get_channel_offset(cfg, channel=0) - setup_tip_location = setup_tip.top().move(setup_channel_offset) - _pick_up_tip(ctx, pipette, cfg, location=setup_tip_location) - print("moving to vial") - well = labware_on_scale["A1"] - pipette.move_to(well.top()) - _liquid_height = _jog_to_find_liquid_height(ctx, pipette, well) - height_below_top = well.depth - _liquid_height - print(f"liquid is {height_below_top} mm below top of vial") - liquid_tracker.set_start_volume_from_liquid_height( - labware_on_scale["A1"], _liquid_height, name="Water" - ) - vial_volume = liquid_tracker.get_volume(well) - print(f"software thinks there is {vial_volume} uL of liquid in the vial") - print("dropping tip") - _drop_tip(ctx, pipette, cfg) - if not ctx.is_simulating(): - ui.get_user_ready("REPLACE first Tip with NEW Tip") - + calibration_tip_in_use = True try: + ui.print_title("FIND LIQUID HEIGHT") + print("homing...") + ctx.home() + pipette.home_plunger() + first_tip = tips[0][0] + setup_channel_offset = _get_channel_offset(cfg, channel=0) + first_tip_location = first_tip.top().move(setup_channel_offset) + _pick_up_tip(ctx, pipette, cfg, location=first_tip_location) + pipette.home() + if not ctx.is_simulating(): + ui.get_user_ready("REPLACE first tip with NEW TIP") + ui.get_user_ready("CLOSE the door, and MOVE AWAY from machine") + print("moving to scale") + well = labware_on_scale["A1"] + pipette.move_to(well.top()) + _liquid_height = _jog_to_find_liquid_height(ctx, pipette, well) + height_below_top = well.depth - _liquid_height + print(f"liquid is {height_below_top} mm below top of vial") + liquid_tracker.set_start_volume_from_liquid_height( + labware_on_scale["A1"], _liquid_height, name="Water" + ) + vial_volume = liquid_tracker.get_volume(well) + print(f"software thinks there is {vial_volume} uL of liquid in the vial") + if not cfg.blank or cfg.inspect: average_aspirate_evaporation_ul = 0.0 average_dispense_evaporation_ul = 0.0 else: ui.print_title("MEASURE EVAPORATION") print(f"running {config.NUM_BLANK_TRIALS}x blank measurements") + hover_pos = labware_on_scale["A1"].top().move(Point(z=50)) + pipette.move_to(hover_pos) + for i in range(config.SCALE_SECONDS_TO_TRUE_STABILIZE): + print( + f"wait {i + 1}/{config.SCALE_SECONDS_TO_TRUE_STABILIZE} seconds before" + f" measuring evaporation" + ) actual_asp_list_evap: List[float] = [] actual_disp_list_evap: List[float] = [] for trial in range(config.NUM_BLANK_TRIALS): ui.print_header(f"BLANK {trial + 1}/{config.NUM_BLANK_TRIALS}") - hover_above_setup_tip = setup_tip_location.move(Point(z=20)) - _pick_up_tip(ctx, pipette, cfg, location=hover_above_setup_tip) + pipette.move_to(hover_pos) evap_aspirate, _, evap_dispense, _ = _run_trial( ctx=ctx, pipette=pipette, @@ -610,8 +637,6 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: ) actual_asp_list_evap.append(evap_aspirate) actual_disp_list_evap.append(evap_dispense) - print("dropping tip") - _drop_tip(ctx, pipette, cfg) ui.print_header("EVAPORATION AVERAGE") average_aspirate_evaporation_ul = _calculate_average(actual_asp_list_evap) average_dispense_evaporation_ul = _calculate_average(actual_disp_list_evap) @@ -625,6 +650,9 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: average_aspirate_evaporation_ul, average_dispense_evaporation_ul, ) + print("dropping tip") + _drop_tip(pipette, return_tip=False) # always trash calibration tips + calibration_tip_in_use = False trial_count = 0 for volume in test_volumes: actual_asp_list_all = [] @@ -649,8 +677,9 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: f"{volume} uL channel {channel + 1} ({trial + 1}/{cfg.trials})" ) print(f"trial total {trial_count}/{trial_total}") - # remove it so it's not used again - next_tip: Well = tips[channel].pop(0) + # NOTE: always pick-up new tip for each trial + # b/c it seems tips heatup + next_tip: Well = _next_tip_for_channel(channel) next_tip_location = next_tip.top().move(channel_offset) _pick_up_tip(ctx, pipette, cfg, location=next_tip_location) ( @@ -711,7 +740,7 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: disp_with_evap, ) print("dropping tip") - _drop_tip(ctx, pipette, cfg) + _drop_tip(pipette, cfg.return_tip) ui.print_header(f"{volume} uL channel {channel + 1} CALCULATIONS") aspirate_average, aspirate_cv, aspirate_d = _calculate_stats( @@ -825,9 +854,21 @@ def run(ctx: ProtocolContext, cfg: config.GravimetricConfig) -> None: print("ending recording") recorder.stop() recorder.deactivate() - # FIXME: instead keep motors engaged, and move to an ATTACH position - hw_api = ctx._core.get_hardware() - hw_api.disengage_axes([OT3Axis.X, OT3Axis.Y]) # disengage xy axis + ui.print_title("CHANGE PIPETTES") + if pipette.has_tip: + if pipette.current_volume > 0: + print("dispensing liquid to trash") + trash = pipette.trash_container.wells()[0] + # FIXME: this should be a blow_out() at max volume, + # but that is not available through PyAPI yet + # so instead just dispensing. + pipette.dispense(pipette.current_volume, trash.top()) + pipette.aspirate(10) # to pull any droplets back up + print("dropping tip") + _return_tip = False if calibration_tip_in_use else cfg.return_tip + _drop_tip(pipette, _return_tip) + print("moving to attach position") + pipette.move_to(ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150))) ui.print_title("RESULTS") _print_final_results( volumes=test_volumes, diff --git a/hardware-testing/hardware_testing/gravimetric/execute_photometric.py b/hardware-testing/hardware_testing/gravimetric/execute_photometric.py index 4a984ddf003..a9b50cd5af4 100644 --- a/hardware-testing/hardware_testing/gravimetric/execute_photometric.py +++ b/hardware-testing/hardware_testing/gravimetric/execute_photometric.py @@ -460,6 +460,14 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None: test_volumes ) + def _next_tip() -> Well: + nonlocal tips + if not len(tips[0]): + if not ctx.is_simulating(): + ui.get_user_ready(f"replace TIPRACKS in slots {cfg.slots_tiprack}") + tips = get_tips(ctx, pipette) + return tips[0].pop(0) + assert ( trial_total <= total_tips ), f"more trials ({trial_total}) than tips ({total_tips})" @@ -490,12 +498,12 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None: print("homing...") ctx.home() + pipette.home_plunger() # get the first channel's first-used tip # NOTE: note using list.pop(), b/c tip will be re-filled by operator, # and so we can use pick-up-tip from there again try: trial_count = 0 - tip_iter = 0 for volume in test_volumes: ui.print_title(f"{volume} uL") do_jog = True @@ -505,7 +513,7 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None: print(f"trial total {trial_count}/{trial_total}") if not ctx.is_simulating(): ui.get_user_ready(f"put PLATE #{trial + 1} and remove SEAL") - next_tip: Well = tips[0][tip_iter] + next_tip: Well = _next_tip() next_tip_location = next_tip.top() _pick_up_tip(ctx, pipette, cfg, location=next_tip_location) @@ -527,15 +535,6 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None: mix=cfg.mix, stable=True, ) - tip_iter += 1 - if tip_iter >= len(tips[0]) and not ( - (trial + 1) == cfg.trials and volume == test_volumes[-1] - ): - if not ctx.is_simulating(): - ui.get_user_ready( - f"replace TIPRACKS in slots {cfg.slots_tiprack}" - ) - tip_iter = 0 if volume < 250: do_jog = False @@ -543,13 +542,14 @@ def run(ctx: ProtocolContext, cfg: config.PhotometricConfig) -> None: ui.print_title("CHANGE PIPETTES") if pipette.has_tip: if pipette.current_volume > 0: + print("dispensing liquid to trash") trash = pipette.trash_container.wells()[0] # FIXME: this should be a blow_out() at max volume, # but that is not available through PyAPI yet # so instead just dispensing. pipette.dispense(pipette.current_volume, trash.top()) pipette.aspirate(10) # to pull any droplets back up + print("dropping tip") _drop_tip(ctx, pipette, cfg) - ctx.home() - # move to attach point + print("moving to attach position") pipette.move_to(ctx.deck.position_for(5).move(Point(x=0, y=9 * 7, z=150))) diff --git a/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py b/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py index 4724a476c6c..f7437f6b956 100644 --- a/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py +++ b/hardware-testing/hardware_testing/gravimetric/liquid_class/defaults.py @@ -733,6 +733,8 @@ def get_test_volumes(pipette: int, channels: int, tip: int) -> List[float]: return [5.0] elif tip == 200: return [200.0] + elif tip == 1000: + return [1000.0] else: raise ValueError(f"no volumes to test for tip size: {tip} uL") else: diff --git a/hardware-testing/hardware_testing/gravimetric/overrides/api.patch b/hardware-testing/hardware_testing/gravimetric/overrides/api.patch index 14bb806fa3d..4785484687c 100644 --- a/hardware-testing/hardware_testing/gravimetric/overrides/api.patch +++ b/hardware-testing/hardware_testing/gravimetric/overrides/api.patch @@ -79,23 +79,23 @@ index 3b3d96fdbf..ca9b0e045b 100644 @@ -1682,8 +1682,8 @@ class OT3API( # TODO: implement tip-detection sequence during pick-up-tip for 96ch, # but not with DVT pipettes because those can only detect drops - + - if self.gantry_load != GantryLoad.HIGH_THROUGHPUT: - await self._backend.get_tip_present(realmount, TipStateType.PRESENT) + # if self.gantry_load != GantryLoad.HIGH_THROUGHPUT: + # await self._backend.get_tip_present(realmount, TipStateType.PRESENT) - + _add_tip_to_instrs() - + @@ -1764,8 +1764,8 @@ class OT3API( ) - + # TODO: implement tip-detection sequence during drop-tip for 96ch - if self.gantry_load != GantryLoad.HIGH_THROUGHPUT: - await self._backend.get_tip_present(realmount, TipStateType.ABSENT) + # if self.gantry_load != GantryLoad.HIGH_THROUGHPUT: + # await self._backend.get_tip_present(realmount, TipStateType.ABSENT) - + # home mount axis if home_after: diff --git a/api/src/opentrons/protocol_api/core/legacy/deck.py b/api/src/opentrons/protocol_api/core/legacy/deck.py diff --git a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py index 1a220e15862..78eb545c0ad 100644 --- a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py +++ b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py @@ -548,17 +548,21 @@ def _jog_read_user_input(terminator: str, home_key: str) -> Tuple[str, float, bo async def _jog_axis_some_distance( - api: OT3API, mount: OT3Mount, axis: str, distance: float + api: OT3API, + mount: OT3Mount, + axis: str, + distance: float, + speed: Optional[float], ) -> None: if not axis or distance == 0.0: return elif axis == "G": await move_gripper_jaw_relative_ot3(api, distance) elif axis == "P": - await move_plunger_relative_ot3(api, mount, distance) + await move_plunger_relative_ot3(api, mount, distance, speed=speed) else: delta = Point(**{axis.lower(): distance}) - await api.move_rel(mount=mount, delta=delta) + await api.move_rel(mount=mount, delta=delta, speed=speed) async def _jog_print_current_position( @@ -590,6 +594,7 @@ async def _jog_do_print_then_input_then_move( distance: float, do_home: bool, display: Optional[bool] = True, + speed: Optional[float] = None, ) -> Tuple[str, float, bool]: try: if display: @@ -610,7 +615,7 @@ async def _jog_do_print_then_input_then_move( } await api.home([str_to_axes[axis]]) else: - await _jog_axis_some_distance(api, mount, axis, distance) + await _jog_axis_some_distance(api, mount, axis, distance, speed) return axis, distance, do_home @@ -619,6 +624,7 @@ async def jog_mount_ot3( mount: OT3Mount, critical_point: Optional[CriticalPoint] = None, display: Optional[bool] = True, + speed: Optional[float] = None, ) -> Dict[OT3Axis, float]: """Jog an OT3 mount's gantry XYZ and pipettes axes.""" if api.is_simulator: @@ -632,7 +638,14 @@ async def jog_mount_ot3( while True: try: axis, distance, do_home = await _jog_do_print_then_input_then_move( - api, mount, critical_point, axis, distance, do_home, display=display + api, + mount, + critical_point, + axis, + distance, + do_home, + display=display, + speed=speed, ) except ValueError as e: print(e)