Skip to content

Commit

Permalink
fix(api): Allow right mount Z to home/retract when a 96-chan pipette …
Browse files Browse the repository at this point in the history
…is attached (#13632)

* temporarily update Z_R run current to home/retract
  • Loading branch information
ahiuchingau authored Sep 25, 2023
1 parent 636c352 commit 0057362
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 20 deletions.
28 changes: 25 additions & 3 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,13 @@ async def update_firmware(
async for update in self._subsystem_manager.update_firmware(subsystems, force):
yield update

def get_current_settings(
self, gantry_load: GantryLoad
) -> OT3AxisMap[CurrentConfig]:
return get_current_settings(self._configuration.current_settings, gantry_load)

async def update_to_default_current_settings(self, gantry_load: GantryLoad) -> None:
self._current_settings = get_current_settings(
self._configuration.current_settings, gantry_load
)
self._current_settings = self.get_current_settings(gantry_load)
await self.set_default_currents()

async def update_motor_status(self) -> None:
Expand Down Expand Up @@ -938,6 +941,25 @@ async def restore_current(self) -> AsyncIterator[None]:
self._current_settings = old_current_settings
await self.set_default_currents()

@asynccontextmanager
async def restore_z_r_run_current(self) -> AsyncIterator[None]:
"""
Temporarily restore the active current ONLY when homing or
retracting the Z_R axis while the 96-channel is attached.
"""
assert self._current_settings
high_throughput_settings = deepcopy(self._current_settings)
conf = self.get_current_settings(GantryLoad.LOW_THROUGHPUT)[Axis.Z_R]
# outside of homing and retracting, Z_R run current should
# be reduced to its hold current
await self.set_active_current({Axis.Z_R: conf.run_current})
try:
yield
finally:
await self.set_active_current(
{Axis.Z_R: high_throughput_settings[Axis.Z_R].run_current}
)

@staticmethod
def _build_event_watcher() -> aionotify.Watcher:
watcher = aionotify.Watcher()
Expand Down
8 changes: 8 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ async def restore_current(self) -> AsyncIterator[None]:
"""Save the current."""
yield

@asynccontextmanager
async def restore_z_r_run_current(self) -> AsyncIterator[None]:
"""
Temporarily restore the active current ONLY when homing or
retracting the Z_R axis while the 96-channel is attached.
"""
yield

@ensure_yield
async def watch(self, loop: asyncio.AbstractEventLoop) -> None:
new_mods_at_ports = [
Expand Down
39 changes: 22 additions & 17 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from concurrent.futures import Future
import contextlib
from functools import partial, lru_cache
from functools import partial, lru_cache, wraps
from dataclasses import replace
import logging
from copy import deepcopy
Expand All @@ -20,6 +20,7 @@
TypeVar,
Tuple,
Mapping,
Awaitable,
)
from opentrons.hardware_control.modules.module_calibration import (
ModuleCalibrationOffset,
Expand Down Expand Up @@ -161,6 +162,24 @@
Axis.Q,
)

Wrapped = TypeVar("Wrapped", bound=Callable[..., Awaitable[Any]])


def _adjust_high_throughput_z_current(func: Wrapped) -> Wrapped:
"""
A decorator that temproarily and conditionally changes the active current (based on the axis input)
before a function is executed and the cleans up afterwards
"""
# only home and retract should be wrappeed by this decorator
@wraps(func)
async def wrapper(self: Any, axis: Axis, *args: Any, **kwargs: Any) -> Any:
async with contextlib.AsyncExitStack() as stack:
if axis == Axis.Z_R and self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
await stack.enter_async_context(self._backend.restore_z_r_run_current())
return await func(self, axis, *args, **kwargs)

return cast(Wrapped, wrapper)


class OT3API(
ExecutionManagerProvider,
Expand Down Expand Up @@ -1335,6 +1354,7 @@ async def _retrieve_home_position(
target_pos.update({axis: self._backend.home_position()[axis]})
return origin, target_pos

@_adjust_high_throughput_z_current
async def _home_axis(self, axis: Axis) -> None:
"""
Perform home; base on axis motor/encoder statuses, shorten homing time
Expand Down Expand Up @@ -1440,16 +1460,6 @@ async def home(
checked_axes = [ax for ax in Axis if ax != Axis.Q]
if self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
checked_axes.append(Axis.Q)
# NOTE: Z_R current is dropped very low when 96CH attached,
# so trying to home it would cause timeout error
if not axes:
checked_axes.remove(Axis.Z_R)
elif Axis.Z_R in axes:
raise RuntimeError(
f"unable to home {Axis.Z_R.name} axis"
f"with {self.gantry_load.name} gantry load"
)

if skip:
checked_axes = [ax for ax in checked_axes if ax not in skip]
self._log.info(f"Homing {axes}")
Expand Down Expand Up @@ -1491,6 +1501,7 @@ async def retract(
await self.retract_axis(Axis.by_mount(mount))

@ExecutionManagerProvider.wait_for_running
@_adjust_high_throughput_z_current
async def retract_axis(self, axis: Axis) -> None:
"""
Move an axis to its home position, without engaging the limit switch,
Expand All @@ -1500,12 +1511,6 @@ async def retract_axis(self, axis: Axis) -> None:
the behaviors between the two robots similar, retract_axis on the FLEX
will call home if the stepper position is inaccurate.
"""
if self.gantry_load == GantryLoad.HIGH_THROUGHPUT and axis == Axis.Z_R:
raise RuntimeError(
f"unable to retract {Axis.Z_R.name} axis"
f"with {self.gantry_load.name} gantry load"
)

motor_ok = self._backend.check_motor_status([axis])
encoder_ok = self._backend.check_encoder_status([axis])

Expand Down

0 comments on commit 0057362

Please sign in to comment.