Skip to content

Commit

Permalink
initial refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj committed Aug 6, 2024
1 parent 343b8b6 commit 964f681
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,136 @@ async def liquid_probe(
raise error
return height

async def liquid_probe_refactored(
self,
mount: Union[top_types.Mount, OT3Mount],
max_z_dist: float,
probe_settings: Optional[LiquidProbeSettings] = None,
probe: Optional[InstrumentProbeType] = None,
force_both_sensors: bool = False,
) -> float:
"""Search for and return liquid level height.
This function begins by moving the mount 2 mm upward to protect against a case where the tip starts right at a
liquid meniscus.
After this, the mount and plunger motors will move simultaneously while
reading from the pressure sensor.
If the move is completed without the specified threshold being triggered, a
PipetteLiquidNotFoundError error will be thrown.
Otherwise, the function will stop moving once the threshold is triggered,
and return the position of the
z axis in deck coordinates, as well as the encoder position, where
the liquid was found.
"""

checked_mount = OT3Mount.from_mount(mount)
instrument = self._pipette_handler.get_pipette(checked_mount)
self._pipette_handler.ready_for_tip_action(
instrument, HardwareAction.LIQUID_PROBE, checked_mount
)

# We need to significantly slow down the 96 channel liquid probe
if self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
max_plunger_speed = self.config.motion_settings.max_speed_discontinuity[
GantryLoad.HIGH_THROUGHPUT
][OT3AxisKind.P]
probe_settings.plunger_speed = min(
max_plunger_speed, probe_settings.plunger_speed
)

starting_position = await self.gantry_position(checked_mount, refresh=True)
if not probe_settings:
probe_settings = self.config.liquid_sense

sensor_baseline_plunger_move_mm = (
probe_settings.plunger_impulse_time * probe_settings.plunger_speed
)
total_plunger_axis_mm = (
instrument.plunger_positions.bottom - instrument.plunger_positions.top
)
max_allowed_plunger_distance_mm = total_plunger_axis_mm - (
instrument.backlash_distance + sensor_baseline_plunger_move_mm
)
# height where probe action will begin
# TODO: (sigler) add this to pipette's liquid def (per tip)
z_overlap_between_passes_mm = 0.1
sensor_baseline_z_move_mm = OT3API.liquid_probe_non_responsive_z_distance(
probe_settings.mount_speed
)
z_offset_per_pass = sensor_baseline_z_move_mm + z_overlap_between_passes_mm

# height that is considered safe to reset the plunger without disturbing liquid
# this usually needs to at least 1-2mm from liquid, to avoid splashes from air
# TODO: (sigler) add this to pipette's liquid def (per tip)
z_offset_for_plunger_prep = max(2.0, z_offset_per_pass)

async def prep_plunger_for_probe_move() -> None:
# safe distance so we don't accidentally aspirate liquid if we're already close to liquid
mount_pos_for_plunger_prep = top_types.Point(
current_position.x,
current_position.y,
current_position.z + z_offset_for_plunger_prep,
)
# Prep the plunger
await self.move_to(checked_mount, mount_pos_for_plunger_prep)
if probe_settings.aspirate_while_sensing:
# TODO(cm, 7/8/24): remove p_prep_speed from the rate at some point
await self._move_to_plunger_bottom(checked_mount, rate=1)
else:
await self._move_to_plunger_top_for_liquid_probe(checked_mount, rate=1)

error: Optional[PipetteLiquidNotFoundError] = None
current_position = await self.gantry_position(checked_mount, refresh=True)
# starting_position.z + z_distance of pass - pos.z should be < max_z_dist
# due to rounding errors this can get caught in an infinite loop when the distance is almost equal
# so we check to see if they're within 0.01 which is 1/5th the minimum movement distance from move_utils.py
while (starting_position.z - current_position.z) < (max_z_dist + 0.01):
await prep_plunger_for_probe_move()

# overlap amount we want to use between passes
pass_start_pos = top_types.Point(
current_position.x,
current_position.y,
current_position.z + z_offset_per_pass,
)

total_remaining_z_dist = pass_start_pos.z - (
starting_position.z - max_z_dist
)
finish_probe_move_duration = (
total_remaining_z_dist / probe_settings.mount_speed
)
finish_probe_plunger_distance_mm = (
finish_probe_move_duration * probe_settings.plunger_speed
)
plunger_travel_mm = min(
finish_probe_plunger_distance_mm, max_allowed_plunger_distance_mm
)
try:
# move to where we want to start a pass and run a pass
await self.move_to(checked_mount, pass_start_pos)
height = await self._liquid_probe_pass(
checked_mount,
probe_settings,
probe if probe else InstrumentProbeType.PRIMARY,
plunger_travel_mm + sensor_baseline_plunger_move_mm,
)
# if we made it here without an error we found the liquid
error = None
break
except PipetteLiquidNotFoundError as lnfe:
error = lnfe
current_position = await self.gantry_position(checked_mount, refresh=True)
await self.move_to(checked_mount, starting_position + top_types.Point(z=2))
await self.prepare_for_aspirate(checked_mount)
await self.move_to(checked_mount, starting_position)
if error is not None:
# if we never found liquid raise an error
raise error
return height

async def capacitive_probe(
self,
mount: OT3Mount,
Expand Down

0 comments on commit 964f681

Please sign in to comment.