From 964f681a0b828db55f31309d47071ce5b2d3a7aa Mon Sep 17 00:00:00 2001 From: caila-marashaj Date: Tue, 6 Aug 2024 14:36:47 -0400 Subject: [PATCH] initial refactor --- api/src/opentrons/hardware_control/ot3api.py | 130 +++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index 5f9c9840834..6e933e18548 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -2762,6 +2762,136 @@ async def liquid_probe( raise error return height + async def liquid_probe_refactored( + self, + mount: Union[top_types.Mount, OT3Mount], + max_z_dist: float, + probe_settings: Optional[LiquidProbeSettings] = None, + probe: Optional[InstrumentProbeType] = None, + force_both_sensors: bool = False, + ) -> float: + """Search for and return liquid level height. + + This function begins by moving the mount 2 mm upward to protect against a case where the tip starts right at a + liquid meniscus. + After this, the mount and plunger motors will move simultaneously while + reading from the pressure sensor. + + If the move is completed without the specified threshold being triggered, a + PipetteLiquidNotFoundError error will be thrown. + + Otherwise, the function will stop moving once the threshold is triggered, + and return the position of the + z axis in deck coordinates, as well as the encoder position, where + the liquid was found. + """ + + checked_mount = OT3Mount.from_mount(mount) + instrument = self._pipette_handler.get_pipette(checked_mount) + self._pipette_handler.ready_for_tip_action( + instrument, HardwareAction.LIQUID_PROBE, checked_mount + ) + + # We need to significantly slow down the 96 channel liquid probe + if self.gantry_load == GantryLoad.HIGH_THROUGHPUT: + max_plunger_speed = self.config.motion_settings.max_speed_discontinuity[ + GantryLoad.HIGH_THROUGHPUT + ][OT3AxisKind.P] + probe_settings.plunger_speed = min( + max_plunger_speed, probe_settings.plunger_speed + ) + + starting_position = await self.gantry_position(checked_mount, refresh=True) + if not probe_settings: + probe_settings = self.config.liquid_sense + + sensor_baseline_plunger_move_mm = ( + probe_settings.plunger_impulse_time * probe_settings.plunger_speed + ) + total_plunger_axis_mm = ( + instrument.plunger_positions.bottom - instrument.plunger_positions.top + ) + max_allowed_plunger_distance_mm = total_plunger_axis_mm - ( + instrument.backlash_distance + sensor_baseline_plunger_move_mm + ) + # height where probe action will begin + # TODO: (sigler) add this to pipette's liquid def (per tip) + z_overlap_between_passes_mm = 0.1 + sensor_baseline_z_move_mm = OT3API.liquid_probe_non_responsive_z_distance( + probe_settings.mount_speed + ) + z_offset_per_pass = sensor_baseline_z_move_mm + z_overlap_between_passes_mm + + # height that is considered safe to reset the plunger without disturbing liquid + # this usually needs to at least 1-2mm from liquid, to avoid splashes from air + # TODO: (sigler) add this to pipette's liquid def (per tip) + z_offset_for_plunger_prep = max(2.0, z_offset_per_pass) + + async def prep_plunger_for_probe_move() -> None: + # safe distance so we don't accidentally aspirate liquid if we're already close to liquid + mount_pos_for_plunger_prep = top_types.Point( + current_position.x, + current_position.y, + current_position.z + z_offset_for_plunger_prep, + ) + # Prep the plunger + await self.move_to(checked_mount, mount_pos_for_plunger_prep) + if probe_settings.aspirate_while_sensing: + # TODO(cm, 7/8/24): remove p_prep_speed from the rate at some point + await self._move_to_plunger_bottom(checked_mount, rate=1) + else: + await self._move_to_plunger_top_for_liquid_probe(checked_mount, rate=1) + + error: Optional[PipetteLiquidNotFoundError] = None + current_position = await self.gantry_position(checked_mount, refresh=True) + # starting_position.z + z_distance of pass - pos.z should be < max_z_dist + # due to rounding errors this can get caught in an infinite loop when the distance is almost equal + # so we check to see if they're within 0.01 which is 1/5th the minimum movement distance from move_utils.py + while (starting_position.z - current_position.z) < (max_z_dist + 0.01): + await prep_plunger_for_probe_move() + + # overlap amount we want to use between passes + pass_start_pos = top_types.Point( + current_position.x, + current_position.y, + current_position.z + z_offset_per_pass, + ) + + total_remaining_z_dist = pass_start_pos.z - ( + starting_position.z - max_z_dist + ) + finish_probe_move_duration = ( + total_remaining_z_dist / probe_settings.mount_speed + ) + finish_probe_plunger_distance_mm = ( + finish_probe_move_duration * probe_settings.plunger_speed + ) + plunger_travel_mm = min( + finish_probe_plunger_distance_mm, max_allowed_plunger_distance_mm + ) + try: + # move to where we want to start a pass and run a pass + await self.move_to(checked_mount, pass_start_pos) + height = await self._liquid_probe_pass( + checked_mount, + probe_settings, + probe if probe else InstrumentProbeType.PRIMARY, + plunger_travel_mm + sensor_baseline_plunger_move_mm, + ) + # if we made it here without an error we found the liquid + error = None + break + except PipetteLiquidNotFoundError as lnfe: + error = lnfe + current_position = await self.gantry_position(checked_mount, refresh=True) + await self.move_to(checked_mount, starting_position + top_types.Point(z=2)) + await self.prepare_for_aspirate(checked_mount) + await self.move_to(checked_mount, starting_position) + if error is not None: + # if we never found liquid raise an error + raise error + return height + async def capacitive_probe( self, mount: OT3Mount,