Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): refresh gripper jaw state from firmware #13506

Merged
merged 11 commits into from
Sep 11, 2023
9 changes: 9 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
LIMIT_SWITCH_OVERTRAVEL_DISTANCE,
map_pipette_type_to_sensor_id,
moving_axes_in_move_group,
gripper_jaw_state_from_fw,
)

try:
Expand Down Expand Up @@ -127,6 +128,7 @@
TipStateType,
FailedTipStateCheck,
EstopState,
GripperJawState,
)
from opentrons.hardware_control.errors import (
InvalidPipetteName,
Expand All @@ -152,6 +154,9 @@
set_deck_light,
get_deck_light_state,
)
from opentrons_hardware.hardware_control.gripper_settings import (
get_gripper_jaw_state,
)

from opentrons_hardware.drivers.gpio import OT3GPIO, RemoteOT3GPIO
from opentrons_shared_data.pipette.dev_types import PipetteName
Expand Down Expand Up @@ -732,6 +737,10 @@ async def gripper_home_jaw(self, duty_cycle: float) -> None:
positions = await runner.run(can_messenger=self._messenger)
self._handle_motor_status_response(positions)

async def get_jaw_state(self) -> GripperJawState:
res = await get_gripper_jaw_state(self._messenger)
return gripper_jaw_state_from_fw(res)

@staticmethod
def _lookup_serial_key(pipette_name: FirmwarePipetteName) -> str:
lookup_name = {
Expand Down
5 changes: 5 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
SubSystem,
SubSystemState,
TipStateType,
GripperJawState,
)
from opentrons_hardware.hardware_control.motion import MoveStopCondition
from opentrons_hardware.hardware_control import status_bar
Expand Down Expand Up @@ -391,6 +392,10 @@ async def get_tip_present_state(self, mount: OT3Mount) -> int:
"""Get the state of the tip ejector flag for a given mount."""
pass

async def get_jaw_state(self) -> GripperJawState:
"""Get the state of the gripper jaw."""
pass

async def tip_action(
self,
moves: Optional[List[Move[Axis]]] = None,
Expand Down
14 changes: 14 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
PipetteSubType,
UpdateState,
UpdateStatus,
GripperJawState,
)
import numpy as np

Expand All @@ -25,6 +26,7 @@
SensorId,
PipetteTipActionType,
USBTarget,
GripperJawState as FirmwareGripperjawState,
)
from opentrons_hardware.firmware_update.types import FirmwareUpdateStatus, StatusElement
from opentrons_hardware.hardware_control import network
Expand Down Expand Up @@ -631,3 +633,15 @@ def update(
progress = int(progress * 100)
self._tracker[target] = UpdateStatus(subsystem, state, progress)
return set(self._tracker.values())


_gripper_jaw_state_lookup: Dict[FirmwareGripperjawState, GripperJawState] = {
FirmwareGripperjawState.unhomed: GripperJawState.UNHOMED,
FirmwareGripperjawState.force_controlling_home: GripperJawState.HOMED_READY,
FirmwareGripperjawState.force_controlling: GripperJawState.GRIPPING,
FirmwareGripperjawState.position_controlling: GripperJawState.HOLDING,
}


def gripper_jaw_state_from_fw(state: FirmwareGripperjawState) -> GripperJawState:
return _gripper_jaw_state_lookup[state]
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional
import logging
import math

from opentrons.types import Point
from .instrument_calibration import (
Expand Down Expand Up @@ -137,6 +138,14 @@ def check_ready_for_jaw_move(self) -> None:
if gripper.state == GripperJawState.UNHOMED:
raise GripError("Gripper jaw must be homed before moving")

def check_ready_for_jaw_home(self) -> None:
"""Raise an exception if it is not currently valid to home the jaw."""
gripper = self.get_gripper()
if gripper.state == GripperJawState.GRIPPING and not math.isclose(
gripper.jaw_width, gripper.geometry.jaw_width["min"], abs_tol=5.0
):
raise GripError("Gripper cannot home while it is gripping a labware")

def set_jaw_state(self, state: GripperJawState) -> None:
self.get_gripper().state = state

Expand Down
30 changes: 24 additions & 6 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
TipMotorPickUpTipSpec,
)
from .instruments.ot3.instrument_calibration import load_pipette_offset
from .instruments.ot3.gripper_handler import GripperHandler
from .instruments.ot3.gripper_handler import GripperHandler, GripError
from .instruments.ot3.instrument_calibration import (
load_gripper_calibration_offset,
)
Expand Down Expand Up @@ -746,7 +746,7 @@ async def halt(self, disengage_before_stopping: bool = False) -> None:
"""Immediately disengage all present motors and clear motor and module tasks."""
if disengage_before_stopping:
await self.disengage_axes(
[ax for ax in Axis if self._backend.axis_is_present(ax)]
[ax for ax in Axis if self._backend.axis_is_present(ax) if ax != Axis.G]
)
await self._stop_motors()

Expand Down Expand Up @@ -790,13 +790,17 @@ async def home_gripper_jaw(self) -> None:
try:
gripper = self._gripper_handler.get_gripper()
self._log.info("Homing gripper jaw.")
self._gripper_handler.check_ready_for_jaw_home()

dc = self._gripper_handler.get_duty_cycle_by_grip_force(
gripper.default_home_force
)
await self._ungrip(duty_cycle=dc)
gripper.state = GripperJawState.HOMED_READY
except GripperNotAttachedError:
pass
except GripError as e:
self._log.error("Could not home when gripper is actively gripping.")
raise e

async def home_plunger(self, mount: Union[top_types.Mount, OT3Mount]) -> None:
"""
Expand Down Expand Up @@ -860,6 +864,14 @@ async def refresh_positions(self) -> None:
await self._backend.update_motor_status()
await self._cache_current_position()
await self._cache_encoder_position()
await self._refresh_jaw_state()

async def _refresh_jaw_state(self) -> None:
try:
gripper = self._gripper_handler.get_gripper()
gripper.state = await self._backend.get_jaw_state()
except GripperNotAttachedError:
pass

async def _cache_current_position(self) -> Dict[Axis, float]:
"""Cache current position from backend and return in absolute deck coords."""
Expand Down Expand Up @@ -1449,6 +1461,7 @@ async def _grip(self, duty_cycle: float, stay_engaged: bool = True) -> None:
duty_cycle=duty_cycle, stay_engaged=stay_engaged
)
await self._cache_encoder_position()
self._gripper_handler.set_jaw_state(await self._backend.get_jaw_state())
except Exception:
self._log.exception(
f"Gripper grip failed, encoder pos: {self._encoder_position[Axis.G]}"
Expand All @@ -1461,6 +1474,7 @@ async def _ungrip(self, duty_cycle: float) -> None:
try:
await self._backend.gripper_home_jaw(duty_cycle=duty_cycle)
await self._cache_encoder_position()
self._gripper_handler.set_jaw_state(await self._backend.get_jaw_state())
except Exception:
self._log.exception("Gripper home failed")
raise
Expand All @@ -1476,6 +1490,7 @@ async def _hold_jaw_width(self, jaw_width_mm: float) -> None:
jaw_displacement_mm = (width_max - jaw_width_mm) / 2.0
await self._backend.gripper_hold_jaw(int(1000 * jaw_displacement_mm))
await self._cache_encoder_position()
self._gripper_handler.set_jaw_state(await self._backend.get_jaw_state())
except Exception:
self._log.exception("Gripper set width failed")
raise
Expand All @@ -1488,21 +1503,24 @@ async def grip(
force_newtons or self._gripper_handler.get_gripper().default_grip_force
)
await self._grip(duty_cycle=dc, stay_engaged=stay_engaged)
self._gripper_handler.set_jaw_state(GripperJawState.GRIPPING)

async def ungrip(self, force_newtons: Optional[float] = None) -> None:
"""
Release gripped object.

To simply open the jaw, use `home_gripper_jaw` instead.
"""
# get default grip force for release if not provided
self._gripper_handler.check_ready_for_jaw_move()
# TODO: check jaw width to make sure it is actually gripping something
dc = self._gripper_handler.get_duty_cycle_by_grip_force(
force_newtons or self._gripper_handler.get_gripper().default_home_force
)
await self._ungrip(duty_cycle=dc)
self._gripper_handler.set_jaw_state(GripperJawState.HOMED_READY)

async def hold_jaw_width(self, jaw_width_mm: int) -> None:
self._gripper_handler.check_ready_for_jaw_move()
await self._hold_jaw_width(jaw_width_mm)
self._gripper_handler.set_jaw_state(GripperJawState.HOLDING_CLOSED)

async def _move_to_plunger_bottom(
self, mount: OT3Mount, rate: float, acquire_lock: bool = True
Expand Down
7 changes: 2 additions & 5 deletions api/src/opentrons/hardware_control/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,8 @@ class GripperJawState(enum.Enum):
#: the gripper has been homed and is at its fully-open homed position
GRIPPING = enum.auto()
#: the gripper is actively force-control gripping something
HOLDING_CLOSED = enum.auto()
#: the gripper is in position-control mode somewhere other than its
#: open position and probably should be opened before gripping something
HOLDING_OPENED = enum.auto()
#: the gripper is holding itself open but not quite at its homed position
HOLDING = enum.auto()
#: the gripper is in position-control mode


class InstrumentProbeType(enum.Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
defs.TipStatusQueryRequest,
defs.GetMotorUsageRequest,
defs.GetMotorUsageResponse,
defs.GripperJawStateRequest,
defs.GripperJawStateResponse,
]


Expand Down
41 changes: 40 additions & 1 deletion hardware/opentrons_hardware/hardware_control/gripper_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId

from opentrons_hardware.firmware_bindings.messages import payloads
from opentrons_hardware.firmware_bindings.messages.messages import MessageDefinition
from opentrons_hardware.firmware_bindings.messages.message_definitions import (
SetBrushedMotorVrefRequest,
SetBrushedMotorPwmRequest,
Expand All @@ -18,13 +19,20 @@
AddBrushedLinearMoveRequest,
BrushedMotorConfRequest,
BrushedMotorConfResponse,
GripperJawStateRequest,
GripperJawStateResponse,
)
from opentrons_hardware.firmware_bindings.utils import (
UInt8Field,
UInt32Field,
Int32Field,
)
from opentrons_hardware.firmware_bindings.constants import NodeId, ErrorCode
from opentrons_hardware.firmware_bindings.constants import (
MessageId,
NodeId,
ErrorCode,
GripperJawState,
)
from .constants import brushed_motor_interrupts_per_sec

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -192,3 +200,34 @@ async def move(
)
),
)


async def get_gripper_jaw_state(
can_messenger: CanMessenger,
) -> GripperJawState:
"""Get gripper jaw state."""

jaw_state = GripperJawState.unhomed

event = asyncio.Event()

def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None:
nonlocal jaw_state
if isinstance(message, GripperJawStateResponse):
event.set()
jaw_state = GripperJawState(message.payload.state.value)

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.gripper_g) and (
MessageId(arb_id.parts.message_id) == MessageId.gripper_jaw_state_response
)

can_messenger.add_listener(_listener, _filter)
await can_messenger.send(node_id=NodeId.gripper_g, message=GripperJawStateRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
except asyncio.TimeoutError:
log.warning("gripper jaw state request timed out")
finally:
can_messenger.remove_listener(_listener)
return jaw_state
Loading