Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): add explicit Flex hardware protocol #14091

Merged
merged 15 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,29 @@
from .api import API
from .pause_manager import PauseManager
from .backends import Controller, Simulator
from .types import CriticalPoint, ExecutionState
from .types import CriticalPoint, ExecutionState, OT3Mount
from .constants import DROP_TIP_RELEASE_DISTANCE
from .thread_manager import ThreadManager
from .execution_manager import ExecutionManager
from .threaded_async_lock import ThreadedAsyncLock, ThreadedAsyncForbidden
from .protocols import HardwareControlInterface
from .protocols import HardwareControlInterface, FlexHardwareControlInterface
from .instruments import AbstractInstrument, Gripper
from typing import Union
from .ot3_calibration import OT3Transforms
from .robot_calibration import RobotCalibration
from opentrons.config.types import RobotConfig, OT3Config

from opentrons.types import Mount

# TODO (lc 12-05-2022) We should 1. figure out if we need
# to globally export a class that is strictly used in the hardware controller
# and 2. how to properly export an ot2 and ot3 pipette.
from .instruments.ot2.pipette import Pipette

OT2HardwareControlAPI = HardwareControlInterface[RobotCalibration]
OT3HardwareControlAPI = HardwareControlInterface[OT3Transforms]
OT2HardwareControlAPI = HardwareControlInterface[RobotCalibration, Mount, RobotConfig]
OT3HardwareControlAPI = FlexHardwareControlInterface[
OT3Transforms, Union[Mount, OT3Mount], OT3Config
]
HardwareControlAPI = Union[OT2HardwareControlAPI, OT3HardwareControlAPI]

ThreadManagedHardware = ThreadManager[HardwareControlAPI]
Expand All @@ -55,4 +60,6 @@
"ThreadedAsyncForbidden",
"ThreadManagedHardware",
"SyncHardwareAPI",
"OT2HardwareControlAPI",
"OT3HardwareControlAPI",
]
5 changes: 4 additions & 1 deletion api/src/opentrons/hardware_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class API(
# of methods that are present in the protocol will call the (empty,
# do-nothing) methods in the protocol. This will happily make all the
# tests fail.
HardwareControlInterface[RobotCalibration],
HardwareControlInterface[RobotCalibration, top_types.Mount, RobotConfig],
):
"""This API is the primary interface to the hardware controller.

Expand Down Expand Up @@ -426,6 +426,9 @@ async def update_firmware(
firmware_file, checked_loop, explicit_modeset
)

def has_gripper(self) -> bool:
return False

async def cache_instruments(
self, require: Optional[Dict[top_types.Mount, PipetteName]] = None
) -> None:
Expand Down
40 changes: 20 additions & 20 deletions api/src/opentrons/hardware_control/ot3_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from .util import DeckTransformState

if TYPE_CHECKING:
sfoster1 marked this conversation as resolved.
Show resolved Hide resolved
from .ot3api import OT3API
from opentrons.hardware_control import OT3HardwareControlAPI

LOG = getLogger(__name__)

Expand Down Expand Up @@ -123,7 +123,7 @@ def _verify_height(


async def _verify_edge_pos(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
search_axis: Union[Literal[Axis.X, Axis.Y]],
found_edge: Point,
Expand Down Expand Up @@ -177,7 +177,7 @@ def critical_edge_offset(


async def find_edge_binary(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot_edge_nominal: Point,
search_axis: Union[Literal[Axis.X, Axis.Y]],
Expand Down Expand Up @@ -272,7 +272,7 @@ async def find_edge_binary(


async def find_slot_center_binary(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
estimated_center: Point,
raise_verify_error: bool = True,
Expand Down Expand Up @@ -337,7 +337,7 @@ async def find_slot_center_binary(


async def find_calibration_structure_height(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
nominal_center: Point,
probe: InstrumentProbeType = InstrumentProbeType.PRIMARY,
Expand Down Expand Up @@ -365,7 +365,7 @@ async def find_calibration_structure_height(


async def _probe_deck_at(
api: OT3API,
api: OT3HardwareControlAPI,
mount: OT3Mount,
target: Point,
settings: CapacitivePassSettings,
Expand All @@ -390,7 +390,7 @@ async def _probe_deck_at(


async def find_axis_center(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
minus_edge_nominal: Point,
plus_edge_nominal: Point,
Expand Down Expand Up @@ -530,7 +530,7 @@ def _edges_from_data(


async def find_slot_center_noncontact(
hcapi: OT3API, mount: OT3Mount, estimated_center: Point
hcapi: OT3HardwareControlAPI, mount: OT3Mount, estimated_center: Point
) -> Point:
NONCONTACT_INTERVAL_MM: float = 0.1
travel_center = estimated_center + Point(0, 0, NONCONTACT_INTERVAL_MM)
Expand All @@ -552,7 +552,7 @@ async def find_slot_center_noncontact(


async def find_calibration_structure_center(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
nominal_center: Point,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand All @@ -574,7 +574,7 @@ async def find_calibration_structure_center(


async def _calibrate_mount(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot: int = SLOT_CENTER,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -641,7 +641,7 @@ async def _calibrate_mount(


async def find_calibration_structure_position(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
nominal_center: Point,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -673,7 +673,7 @@ async def find_calibration_structure_position(


async def find_slot_center_binary_from_nominal_center(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot: int,
) -> Tuple[Point, Point]:
Expand All @@ -698,7 +698,7 @@ async def find_slot_center_binary_from_nominal_center(


async def _determine_transform_matrix(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
) -> Tuple[types.AttitudeMatrix, Dict[str, Any]]:
"""
Expand Down Expand Up @@ -750,7 +750,7 @@ def gripper_pin_offsets_mean(front: Point, rear: Point) -> Point:


async def calibrate_gripper_jaw(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
probe: GripperProbe,
slot: int = 5,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -788,7 +788,7 @@ async def calibrate_gripper_jaw(


async def calibrate_gripper(
hcapi: OT3API, offset_front: Point, offset_rear: Point
hcapi: OT3HardwareControlAPI, offset_front: Point, offset_rear: Point
) -> Point:
"""Calibrate gripper."""
offset = gripper_pin_offsets_mean(front=offset_front, rear=offset_rear)
Expand All @@ -798,7 +798,7 @@ async def calibrate_gripper(


async def find_pipette_offset(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: Literal[OT3Mount.LEFT, OT3Mount.RIGHT],
slot: int = 5,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand Down Expand Up @@ -829,7 +829,7 @@ async def find_pipette_offset(


async def calibrate_pipette(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: Literal[OT3Mount.LEFT, OT3Mount.RIGHT],
slot: int = 5,
method: CalibrationMethod = CalibrationMethod.BINARY_SEARCH,
Expand All @@ -852,7 +852,7 @@ async def calibrate_pipette(


async def calibrate_module(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
slot: str,
module_id: str,
Expand Down Expand Up @@ -907,7 +907,7 @@ async def calibrate_module(


async def calibrate_belts(
hcapi: OT3API,
hcapi: OT3HardwareControlAPI,
mount: OT3Mount,
pipette_id: str,
) -> Tuple[types.AttitudeMatrix, Dict[str, Any]]:
Expand Down Expand Up @@ -1005,7 +1005,7 @@ def validate_attitude_deck_calibration(
return DeckTransformState.OK


def delete_belt_calibration_data(hcapi: OT3API) -> None:
def delete_belt_calibration_data(hcapi: OT3HardwareControlAPI) -> None:
delete_robot_belt_attitude()
hcapi.reset_deck_calibration()

Expand Down
40 changes: 12 additions & 28 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
from . import modules
from .ot3_calibration import OT3Transforms, OT3RobotCalibrationProvider

from .protocols import HardwareControlInterface
from .protocols import FlexHardwareControlInterface

# TODO (lc 09/15/2022) We should update our pipette handler to reflect OT-3 properties
# in a follow-up PR.
Expand Down Expand Up @@ -197,7 +197,9 @@ class OT3API(
# of methods that are present in the protocol will call the (empty,
# do-nothing) methods in the protocol. This will happily make all the
# tests fail.
HardwareControlInterface[OT3Transforms],
FlexHardwareControlInterface[
OT3Transforms, Union[top_types.Mount, OT3Mount], OT3Config
],
):
"""This API is the primary interface to the hardware controller.

Expand Down Expand Up @@ -1312,6 +1314,9 @@ async def idle_gripper(self) -> None:
except GripperNotPresentError:
pass

def gripper_jaw_can_home(self) -> bool:
return self._gripper_handler.is_ready_for_jaw_home()

def _build_moves(
self,
origin: Dict[Axis, float],
Expand Down Expand Up @@ -2254,7 +2259,7 @@ def reset_instrument(
self._pipette_handler.reset_instrument(checked_mount)

def get_instrument_offset(
self, mount: OT3Mount
self, mount: Union[top_types.Mount, OT3Mount]
) -> Union[GripperCalibrationOffset, PipetteOffsetSummary, None]:
"""Get instrument calibration data."""
# TODO (spp, 2023-04-19): We haven't introduced a 'calibration_offset' key in
Expand All @@ -2263,11 +2268,13 @@ def get_instrument_offset(
# to be a part of the dict, this getter can be updated to fetch pipette offset
# from the dict, or just remove this getter entirely.

if mount == OT3Mount.GRIPPER:
ot3_mount = OT3Mount.from_mount(mount)

if ot3_mount == OT3Mount.GRIPPER:
gripper_dict = self._gripper_handler.get_gripper_dict()
return gripper_dict["calibration_offset"] if gripper_dict else None
else:
return self._pipette_handler.get_instrument_offset(mount=mount)
return self._pipette_handler.get_instrument_offset(mount=ot3_mount)

async def reset_instrument_offset(
self, mount: Union[top_types.Mount, OT3Mount], to_default: bool = True
Expand Down Expand Up @@ -2532,29 +2539,6 @@ async def capacitive_probe(
retract_after: bool = True,
probe: Optional[InstrumentProbeType] = None,
) -> Tuple[float, bool]:
"""Determine the position of something using the capacitive sensor.

This function orchestrates detecting the position of a collision between the
capacitive probe on the tool on the specified mount, and some fixed element
of the robot.

When calling this function, the mount's probe critical point should already
be aligned in the probe axis with the item to be probed.

It will move the mount's probe critical point to a small distance behind
the expected position of the element (which is target_pos, in deck coordinates,
in the axis to be probed) while running the tool's capacitive sensor. When the
sensor senses contact, the mount stops.

This function moves away and returns the sensed position.

This sensed position can be used in several ways, including
- To get an absolute position in deck coordinates of whatever was
targeted, if something was guaranteed to be physically present.
- To detect whether a collision occured at all. If this function
returns a value far enough past the anticipated position, then it indicates
there was no material there.
"""
if moving_axis not in [
Axis.X,
Axis.Y,
Expand Down
Loading