diff --git a/api/mypy.ini b/api/mypy.ini index 56b8435855c..4e02621d6a5 100644 --- a/api/mypy.ini +++ b/api/mypy.ini @@ -4,7 +4,7 @@ show_error_codes = True warn_unused_configs = True strict = True # TODO(mc, 2021-09-12): work through and remove these exclusions -exclude = tests/opentrons/(hardware_control/test_.*py|hardware_control/integration/|hardware_control/emulation/|hardware_control/modules/|protocols/advanced_control/|protocols/api_support/|protocols/duration/|protocols/execution/|protocols/fixtures/|protocols/geometry/) +exclude = tests/opentrons/(hardware_control/test_(?!ot3).*py|hardware_control/integration/|hardware_control/emulation/|hardware_control/modules/|protocols/advanced_control/|protocols/api_support/|protocols/duration/|protocols/execution/|protocols/fixtures/|protocols/geometry/) [pydantic-mypy] init_forbid_extra = True diff --git a/api/src/opentrons/hardware_control/estop_state.py b/api/src/opentrons/hardware_control/backends/estop_state.py similarity index 96% rename from api/src/opentrons/hardware_control/estop_state.py rename to api/src/opentrons/hardware_control/backends/estop_state.py index 2c8884dcb26..d421af6a77a 100644 --- a/api/src/opentrons/hardware_control/estop_state.py +++ b/api/src/opentrons/hardware_control/backends/estop_state.py @@ -13,6 +13,7 @@ EstopAttachLocation, EstopStateNotification, HardwareEventHandler, + HardwareEventUnsubscriber, ) @@ -51,10 +52,12 @@ def __del__(self) -> None: if self._detector is not None: self._detector.remove_listener(self.detector_listener) - def add_listener(self, listener: HardwareEventHandler) -> None: + def add_listener(self, listener: HardwareEventHandler) -> HardwareEventUnsubscriber: """Add a hardware event listener for estop event changes.""" if listener not in self._listeners: self._listeners.append(listener) + return lambda: self.remove_listener(listener) + return lambda: None def remove_listener(self, listener: HardwareEventHandler) -> None: """Remove an existing hardware event listener for estop detector changes.""" diff --git a/api/src/opentrons/hardware_control/backends/flex_protocol.py b/api/src/opentrons/hardware_control/backends/flex_protocol.py new file mode 100644 index 00000000000..05c416f20ba --- /dev/null +++ b/api/src/opentrons/hardware_control/backends/flex_protocol.py @@ -0,0 +1,398 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import ( + Protocol, + Dict, + Optional, + List, + Mapping, + AsyncIterator, + Sequence, + Tuple, + Set, + TypeVar, +) +from opentrons_shared_data.pipette.dev_types import ( + PipetteName, +) +from opentrons.config.types import GantryLoad +from opentrons.hardware_control.types import ( + BoardRevision, + Axis, + OT3Mount, + OT3AxisMap, + InstrumentProbeType, + MotorStatus, + UpdateStatus, + SubSystem, + SubSystemState, + TipStateType, + GripperJawState, + HardwareFeatureFlags, + EstopOverallStatus, + EstopState, + HardwareEventHandler, + HardwareEventUnsubscriber, +) +from opentrons.hardware_control.module_control import AttachedModulesControl +from ..dev_types import OT3AttachedInstruments +from ..types import StatusBarState +from .types import HWStopCondition + +Cls = TypeVar("Cls") + + +class FlexBackend(Protocol): + """Flex backend mypy protocol.""" + + async def get_serial_number(self) -> Optional[str]: + ... + + @asynccontextmanager + def restore_system_constraints(self) -> AsyncIterator[None]: + ... + + def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None: + ... + + def update_constraints_for_calibration_with_gantry_load( + self, + gantry_load: GantryLoad, + ) -> None: + ... + + def update_constraints_for_plunger_acceleration( + self, mount: OT3Mount, acceleration: float, gantry_load: GantryLoad + ) -> None: + ... + + @property + def initialized(self) -> bool: + """True when the hardware controller has initialized and is ready.""" + ... + + @initialized.setter + def initialized(self, value: bool) -> None: + ... + + @property + def gear_motor_position(self) -> Optional[float]: + ... + + @property + def board_revision(self) -> BoardRevision: + """Get the board revision""" + ... + + @property + def module_controls(self) -> AttachedModulesControl: + """Get the module controls.""" + ... + + @module_controls.setter + def module_controls(self, module_controls: AttachedModulesControl) -> None: + """Set the module controls""" + ... + + async def update_to_default_current_settings(self, gantry_load: GantryLoad) -> None: + ... + + def update_feature_flags(self, feature_flags: HardwareFeatureFlags) -> None: + """Update the hardware feature flags used by the hardware controller.""" + ... + + async def update_motor_status(self) -> None: + """Retreieve motor and encoder status and position from all present devices""" + ... + + async def update_motor_estimation(self, axes: Sequence[Axis]) -> None: + """Update motor position estimation for commanded axes, and update cache of data.""" + # Simulate conditions as if there are no stalls, aka do nothing + ... + + def _get_motor_status( + self, axes: Sequence[Axis] + ) -> Dict[Axis, Optional[MotorStatus]]: + ... + + def get_invalid_motor_axes(self, axes: Sequence[Axis]) -> List[Axis]: + """Get axes that currently do not have the motor-ok flag.""" + ... + + def get_invalid_encoder_axes(self, axes: Sequence[Axis]) -> List[Axis]: + """Get axes that currently do not have the encoder-ok flag.""" + ... + + def check_motor_status(self, axes: Sequence[Axis]) -> bool: + ... + + def check_encoder_status(self, axes: Sequence[Axis]) -> bool: + ... + + async def update_position(self) -> OT3AxisMap[float]: + """Get the current position.""" + ... + + async def update_encoder_position(self) -> OT3AxisMap[float]: + """Get the encoder current position.""" + ... + + async def liquid_probe( + self, + mount: OT3Mount, + max_z_distance: float, + mount_speed: float, + plunger_speed: float, + threshold_pascals: float, + log_pressure: bool = True, + auto_zero_sensor: bool = True, + num_baseline_reads: int = 10, + probe: InstrumentProbeType = InstrumentProbeType.PRIMARY, + ) -> float: + ... + + async def move( + self, + origin: Dict[Axis, float], + target: Dict[Axis, float], + speed: float, + stop_condition: HWStopCondition = HWStopCondition.none, + nodes_in_moves_only: bool = True, + ) -> None: + """Move to a position. + + Args: + target_position: Map of axis to position. + home_flagged_axes: Whether to home afterwords. + speed: Optional speed + axis_max_speeds: Optional map of axis to speed. + + Returns: + None + """ + ... + + async def home( + self, axes: Sequence[Axis], gantry_load: GantryLoad + ) -> OT3AxisMap[float]: + """Home axes. + + Args: + axes: Optional list of axes. + + Returns: + Homed position. + """ + ... + + async def gripper_grip_jaw( + self, + duty_cycle: float, + stop_condition: HWStopCondition = HWStopCondition.none, + stay_engaged: bool = True, + ) -> None: + """Move gripper inward.""" + ... + + async def gripper_home_jaw(self, duty_cycle: float) -> None: + """Move gripper outward.""" + ... + + async def gripper_hold_jaw( + self, + encoder_position_um: int, + ) -> None: + ... + + async def get_jaw_state(self) -> GripperJawState: + """Get the state of the gripper jaw.""" + ... + + async def tip_action( + self, origin: Dict[Axis, float], targets: List[Tuple[Dict[Axis, float], float]] + ) -> None: + ... + + async def home_tip_motors( + self, + distance: float, + velocity: float, + back_off: bool = True, + ) -> None: + ... + + async def get_attached_instruments( + self, expected: Mapping[OT3Mount, PipetteName] + ) -> Mapping[OT3Mount, OT3AttachedInstruments]: + """Get attached instruments. + + Args: + expected: Which mounts are expected. + + Returns: + A map of mount to pipette name. + """ + ... + + async def get_limit_switches(self) -> OT3AxisMap[bool]: + """Get the state of the gantry's limit switches on each axis.""" + ... + + async def set_active_current(self, axis_currents: OT3AxisMap[float]) -> None: + """Set the active current. + + Args: + axis_currents: Axes' currents + + Returns: + None + """ + ... + + @asynccontextmanager + def motor_current( + self, + run_currents: Optional[OT3AxisMap[float]] = None, + hold_currents: Optional[OT3AxisMap[float]] = None, + ) -> AsyncIterator[None]: + """Save the current.""" + ... + + @asynccontextmanager + def restore_z_r_run_current(self) -> AsyncIterator[None]: + """ + Temporarily restore the active current ONLY when homing or + retracting the Z_R axis while the 96-channel is attached. + """ + ... + + async def watch(self, loop: asyncio.AbstractEventLoop) -> None: + ... + + @property + def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]: + """Get the axis bounds.""" + ... + + @property + def fw_version(self) -> Dict[SubSystem, int]: + """Get the firmware version.""" + ... + + def axis_is_present(self, axis: Axis) -> bool: + ... + + @property + def update_required(self) -> bool: + ... + + def update_firmware( + self, + subsystems: Set[SubSystem], + force: bool = False, + ) -> AsyncIterator[UpdateStatus]: + """Updates the firmware on the OT3.""" + ... + + def engaged_axes(self) -> OT3AxisMap[bool]: + """Get engaged axes.""" + ... + + async def disengage_axes(self, axes: List[Axis]) -> None: + """Disengage axes.""" + ... + + async def engage_axes(self, axes: List[Axis]) -> None: + """Engage axes.""" + ... + + async def set_lights(self, button: Optional[bool], rails: Optional[bool]) -> None: + """Set the light states.""" + ... + + async def get_lights(self) -> Dict[str, bool]: + """Get the light state.""" + ... + + def pause(self) -> None: + """Pause the controller activity.""" + ... + + def resume(self) -> None: + """Resume the controller activity.""" + ... + + async def halt(self) -> None: + """Halt the motors.""" + ... + + async def probe(self, axis: Axis, distance: float) -> OT3AxisMap[float]: + """Probe.""" + ... + + async def clean_up(self) -> None: + """Clean up.""" + ... + + @staticmethod + def home_position() -> OT3AxisMap[float]: + ... + + async def capacitive_probe( + self, + mount: OT3Mount, + moving: Axis, + distance_mm: float, + speed_mm_per_s: float, + sensor_threshold_pf: float, + probe: InstrumentProbeType, + ) -> bool: + ... + + async def capacitive_pass( + self, + mount: OT3Mount, + moving: Axis, + distance_mm: float, + speed_mm_per_s: float, + probe: InstrumentProbeType, + ) -> List[float]: + ... + + @property + def subsystems(self) -> Dict[SubSystem, SubSystemState]: + ... + + async def get_tip_status(self, mount: OT3Mount) -> TipStateType: + ... + + def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: + ... + + async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None: + ... + + async def teardown_tip_detector(self, mount: OT3Mount) -> None: + ... + + async def set_status_bar_state(self, state: StatusBarState) -> None: + ... + + async def set_status_bar_enabled(self, enabled: bool) -> None: + ... + + def get_status_bar_state(self) -> StatusBarState: + ... + + @property + def estop_status(self) -> EstopOverallStatus: + ... + + def estop_acknowledge_and_clear(self) -> EstopOverallStatus: + ... + + def get_estop_state(self) -> EstopState: + ... + + def add_estop_callback(self, cb: HardwareEventHandler) -> HardwareEventUnsubscriber: + ... diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 8d96014483e..2e49989b7e9 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -14,7 +14,6 @@ List, Optional, Tuple, - TYPE_CHECKING, Sequence, AsyncIterator, cast, @@ -23,6 +22,7 @@ Iterator, KeysView, Union, + Mapping, ) from opentrons.config.types import OT3Config, GantryLoad from opentrons.config import gripper_config @@ -46,6 +46,9 @@ map_pipette_type_to_sensor_id, moving_axes_in_move_group, gripper_jaw_state_from_fw, + get_system_constraints, + get_system_constraints_for_calibration, + get_system_constraints_for_plunger_acceleration, ) from .tip_presence_manager import TipPresenceManager @@ -67,14 +70,15 @@ from opentrons_hardware.drivers.eeprom import EEPROMDriver, EEPROMData from opentrons_hardware.hardware_control.move_group_runner import MoveGroupRunner from opentrons_hardware.hardware_control.motion_planning import ( - Move, - Coordinates, + MoveManager, + MoveTarget, + ZeroLengthMoveError, ) from opentrons_hardware.hardware_control.estop.detector import ( EstopDetector, ) -from opentrons.hardware_control.estop_state import EstopStateMachine +from opentrons.hardware_control.backends.estop_state import EstopStateMachine from opentrons_hardware.hardware_control.motor_enable_disable import ( set_enable_motor, @@ -127,9 +131,13 @@ SubSystemState, SubSystem, TipStateType, - EstopState, GripperJawState, HardwareFeatureFlags, + EstopOverallStatus, + EstopAttachLocation, + EstopState, + HardwareEventHandler, + HardwareEventUnsubscriber, ) from opentrons.hardware_control.errors import ( InvalidPipetteName, @@ -178,12 +186,16 @@ from .subsystem_manager import SubsystemManager -if TYPE_CHECKING: - from ..dev_types import ( - AttachedPipette, - AttachedGripper, - OT3AttachedInstruments, - ) +from ..dev_types import ( + AttachedPipette, + AttachedGripper, + OT3AttachedInstruments, +) +from ..types import StatusBarState + +from .types import HWStopCondition +from .flex_protocol import FlexBackend +from .status_bar_state import StatusBarStateController log = logging.getLogger(__name__) @@ -229,7 +241,7 @@ async def wrapper(self: OT3Controller, *args: Any, **kwargs: Any) -> Any: return cast(Wrapped, wrapper) -class OT3Controller: +class OT3Controller(FlexBackend): """OT3 Hardware Controller Backend.""" _initialized: bool @@ -317,6 +329,8 @@ def __init__( self._check_updates = check_updates self._initialized = False self._status_bar = status_bar.StatusBar(messenger=self._usb_messenger) + self._status_bar_controller = StatusBarStateController(self._status_bar) + try: self._event_watcher = self._build_event_watcher() except AttributeError: @@ -326,6 +340,46 @@ def __init__( ) self._current_settings: Optional[OT3AxisMap[CurrentConfig]] = None self._tip_presence_manager = TipPresenceManager(self._messenger) + self._move_manager = MoveManager( + constraints=get_system_constraints( + self._configuration.motion_settings, GantryLoad.LOW_THROUGHPUT + ) + ) + + @asynccontextmanager + async def restore_system_constraints(self) -> AsyncIterator[None]: + old_system_constraints = deepcopy(self._move_manager.get_constraints()) + try: + yield + finally: + self._move_manager.update_constraints(old_system_constraints) + log.debug(f"Restore previous system constraints: {old_system_constraints}") + + def update_constraints_for_calibration_with_gantry_load( + self, + gantry_load: GantryLoad, + ) -> None: + self._move_manager.update_constraints( + get_system_constraints_for_calibration( + self._configuration.motion_settings, gantry_load + ) + ) + log.debug( + f"Set system constraints for calibration: {self._move_manager.get_constraints()}" + ) + + def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None: + self._move_manager.update_constraints( + get_system_constraints(self._configuration.motion_settings, gantry_load) + ) + + def update_constraints_for_plunger_acceleration( + self, mount: OT3Mount, acceleration: float, gantry_load: GantryLoad + ) -> None: + new_constraints = get_system_constraints_for_plunger_acceleration( + self._configuration.motion_settings, gantry_load, mount, acceleration + ) + self._move_manager.update_constraints(new_constraints) async def get_serial_number(self) -> Optional[str]: if not self.initialized: @@ -394,8 +448,8 @@ def _build_system_hardware( ) @property - def gear_motor_position(self) -> Dict[NodeId, float]: - return self._gear_motor_position + def gear_motor_position(self) -> Optional[float]: + return self._gear_motor_position.get(NodeId.pipette_left, None) def _motor_nodes(self) -> Set[NodeId]: """Get a list of the motor controller nodes of all attached and ok devices.""" @@ -547,9 +601,10 @@ def _handle_motor_status_response( @requires_estop async def move( self, - origin: Coordinates[Axis, float], - moves: List[Move[Axis]], - stop_condition: MoveStopCondition = MoveStopCondition.none, + origin: Dict[Axis, float], + target: Dict[Axis, float], + speed: float, + stop_condition: HWStopCondition = HWStopCondition.none, nodes_in_moves_only: bool = True, ) -> None: """Move to a position. @@ -568,6 +623,17 @@ async def move( Returns: None """ + move_target = MoveTarget.build(position=target, max_speed=speed) + try: + _, movelist = self._move_manager.plan_motion( + origin=origin, target_list=[move_target] + ) + except ZeroLengthMoveError as zme: + log.warning(f"Not moving because move was zero length {str(zme)}") + return + moves = movelist[0] + log.info(f"move: machine {target} from {origin} requires {moves}") + ordered_nodes = self._motor_nodes() if nodes_in_moves_only: moving_axes = { @@ -575,7 +641,9 @@ async def move( } ordered_nodes = ordered_nodes.intersection(moving_axes) - group = create_move_group(origin, moves, ordered_nodes, stop_condition) + group = create_move_group( + origin, moves, ordered_nodes, MoveStopCondition[stop_condition.name] + ) move_group, _ = group runner = MoveGroupRunner( move_groups=[move_group], @@ -750,10 +818,15 @@ async def home_tip_motors( raise e async def tip_action( - self, - moves: List[Move[Axis]], + self, origin: Dict[Axis, float], targets: List[Tuple[Dict[Axis, float], float]] ) -> None: - move_group = create_tip_action_group(moves, [NodeId.pipette_left], "clamp") + move_targets = [ + MoveTarget.build(target_pos, speed) for target_pos, speed in targets + ] + _, moves = self._move_manager.plan_motion( + origin=origin, target_list=move_targets + ) + move_group = create_tip_action_group(moves[0], [NodeId.pipette_left], "clamp") runner = MoveGroupRunner( move_groups=[move_group], @@ -780,11 +853,11 @@ async def tip_action( async def gripper_grip_jaw( self, duty_cycle: float, - stop_condition: MoveStopCondition = MoveStopCondition.none, + stop_condition: HWStopCondition = HWStopCondition.none, stay_engaged: bool = True, ) -> None: move_group = create_gripper_jaw_grip_group( - duty_cycle, stop_condition, stay_engaged + duty_cycle, MoveStopCondition[stop_condition.name], stay_engaged ) runner = MoveGroupRunner(move_groups=[move_group]) positions = await runner.run(can_messenger=self._messenger) @@ -892,7 +965,7 @@ def _generate_attached_instrs( ) async def get_attached_instruments( - self, expected: Dict[OT3Mount, PipetteName] + self, expected: Mapping[OT3Mount, PipetteName] ) -> Dict[OT3Mount, OT3AttachedInstruments]: """Get attached instruments. @@ -1215,7 +1288,7 @@ async def liquid_probe( auto_zero_sensor: bool = True, num_baseline_reads: int = 10, probe: InstrumentProbeType = InstrumentProbeType.PRIMARY, - ) -> Dict[NodeId, float]: + ) -> float: head_node = axis_to_node(Axis.by_mount(mount)) tool = sensor_node_for_pipette(OT3Mount(mount.value)) positions = await liquid_probe( @@ -1234,7 +1307,7 @@ async def liquid_probe( for node, point in positions.items(): self._position.update({node: point.motor_position}) self._encoder_position.update({node: point.encoder_position}) - return self._position + return self._position[axis_to_node(Axis.by_mount(mount))] async def capacitive_probe( self, @@ -1334,9 +1407,6 @@ def _door_listener(msg: BinaryMessageDefinition) -> None: ), ) - def status_bar_interface(self) -> status_bar.StatusBar: - return self._status_bar - async def build_estop_detector(self) -> bool: """Must be called to set up the estop detector & state machine.""" if self._drivers.usb_messenger is None: @@ -1347,11 +1417,6 @@ async def build_estop_detector(self) -> bool: self._estop_state_machine.subscribe_to_detector(self._estop_detector) return True - @property - def estop_state_machine(self) -> EstopStateMachine: - """Accessor for the API to get the state machine, if it exists.""" - return self._estop_state_machine - @property def tip_presence_manager(self) -> TipPresenceManager: return self._tip_presence_manager @@ -1369,3 +1434,37 @@ async def get_tip_status(self, mount: OT3Mount) -> TipStateType: def current_tip_state(self, mount: OT3Mount) -> Optional[bool]: return self.tip_presence_manager.current_tip_state(mount) + + async def set_status_bar_state(self, state: StatusBarState) -> None: + await self._status_bar_controller.set_status_bar_state(state) + + async def set_status_bar_enabled(self, enabled: bool) -> None: + await self._status_bar_controller.set_enabled(enabled) + + def get_status_bar_state(self) -> StatusBarState: + return self._status_bar_controller.get_current_state() + + @property + def estop_status(self) -> EstopOverallStatus: + return EstopOverallStatus( + state=self._estop_state_machine.state, + left_physical_state=self._estop_state_machine.get_physical_status( + EstopAttachLocation.LEFT + ), + right_physical_state=self._estop_state_machine.get_physical_status( + EstopAttachLocation.RIGHT + ), + ) + + def estop_acknowledge_and_clear(self) -> EstopOverallStatus: + """Attempt to acknowledge an Estop event and clear the status. + + Returns the estop status after clearing the status.""" + self._estop_state_machine.acknowledge_and_clear() + return self.estop_status + + def get_estop_state(self) -> EstopState: + return self._estop_state_machine.state + + def add_estop_callback(self, cb: HardwareEventHandler) -> HardwareEventUnsubscriber: + return self._estop_state_machine.add_listener(cb) diff --git a/api/src/opentrons/hardware_control/backends/ot3simulator.py b/api/src/opentrons/hardware_control/backends/ot3simulator.py index 2cd8c1a37ec..911bc5ddf8e 100644 --- a/api/src/opentrons/hardware_control/backends/ot3simulator.py +++ b/api/src/opentrons/hardware_control/backends/ot3simulator.py @@ -19,31 +19,7 @@ from opentrons.config.types import OT3Config, GantryLoad from opentrons.config import gripper_config -from .ot3utils import ( - axis_convert, - create_move_group, - get_current_settings, - node_to_axis, - axis_to_node, - create_gripper_jaw_hold_group, - create_gripper_jaw_grip_group, - create_gripper_jaw_home_group, - NODEID_SUBSYSTEM, - motor_nodes, - target_to_subsystem, -) -from opentrons_hardware.firmware_bindings.constants import ( - NodeId, - SensorId, - FirmwareTarget, -) -from opentrons_hardware.hardware_control.motion_planning import ( - Move, - Coordinates, -) -from opentrons.hardware_control.estop_state import EstopStateMachine -from opentrons_hardware.drivers.eeprom import EEPROMData from opentrons.hardware_control.module_control import AttachedModulesControl from opentrons.hardware_control import modules from opentrons.hardware_control.types import ( @@ -61,9 +37,13 @@ TipStateType, GripperJawState, HardwareFeatureFlags, + StatusBarState, + EstopOverallStatus, + EstopState, + EstopPhysicalStatus, + HardwareEventHandler, + HardwareEventUnsubscriber, ) -from opentrons_hardware.hardware_control.motion import MoveStopCondition -from opentrons_hardware.hardware_control import status_bar from opentrons_shared_data.pipette.dev_types import PipetteName, PipetteModel from opentrons_shared_data.pipette import ( @@ -72,7 +52,6 @@ ) from opentrons_shared_data.gripper.gripper_definition import GripperModel from opentrons.hardware_control.dev_types import ( - InstrumentHardwareConfigs, PipetteSpec, GripperSpec, AttachedPipette, @@ -80,16 +59,42 @@ OT3AttachedInstruments, ) from opentrons.util.async_helpers import ensure_yield +from .types import HWStopCondition +from .flex_protocol import FlexBackend log = logging.getLogger(__name__) +AXIS_TO_SUBSYSTEM = { + Axis.X: SubSystem.gantry_x, + Axis.Y: SubSystem.gantry_y, + Axis.Z_L: SubSystem.head, + Axis.Z_R: SubSystem.head, + Axis.Z_G: SubSystem.gripper, + Axis.G: SubSystem.gripper, + Axis.P_L: SubSystem.pipette_left, + Axis.P_R: SubSystem.pipette_right, +} + + +def coalesce_move_segments( + origin: Dict[Axis, float], targets: List[Dict[Axis, float]] +) -> Dict[Axis, float]: + for target in targets: + for axis, increment in target.items(): + origin[axis] += increment + return origin + + +def axis_pad(positions: Dict[Axis, float], default_value: float) -> Dict[Axis, float]: + return {ax: positions.get(ax, default_value) for ax in Axis.node_axes()} -class OT3Simulator: + +class OT3Simulator(FlexBackend): """OT3 Hardware Controller Backend.""" - _position: Dict[NodeId, float] - _encoder_position: Dict[NodeId, float] - _motor_status: Dict[NodeId, MotorStatus] + _position: Dict[Axis, float] + _encoder_position: Dict[Axis, float] + _motor_status: Dict[Axis, MotorStatus] @classmethod async def build( @@ -140,8 +145,7 @@ def __init__( self._update_required = False self._initialized = False self._lights = {"button": False, "rails": False} - self._estop_state_machine = EstopStateMachine(detector=None) - self._gear_motor_position: Dict[NodeId, float] = {} + self._gear_motor_position: Dict[Axis, float] = {} self._feature_flags = feature_flags or HardwareFeatureFlags() def _sanitize_attached_instrument( @@ -182,26 +186,53 @@ def _sanitize_attached_instrument( self._position = self._get_home_position() self._encoder_position = self._get_home_position() self._motor_status = {} - nodes = set((NodeId.head_l, NodeId.head_r, NodeId.gantry_x, NodeId.gantry_y)) + axes = set((Axis.Z_L, Axis.Z_R, Axis.X, Axis.Y)) if self._attached_instruments[OT3Mount.LEFT].get("model", None): - nodes.add(NodeId.pipette_left) + axes.add(Axis.P_L) if self._attached_instruments[OT3Mount.RIGHT].get("model", None): - nodes.add(NodeId.pipette_right) + axes.add(Axis.P_L) if self._attached_instruments.get( OT3Mount.GRIPPER ) and self._attached_instruments[OT3Mount.GRIPPER].get("model", None): - nodes.add(NodeId.gripper) - self._present_nodes = nodes + axes.update((Axis.G, Axis.Z_G)) + self._present_axes = axes self._current_settings: Optional[OT3AxisMap[CurrentConfig]] = None self._sim_jaw_state = GripperJawState.HOMED_READY self._sim_tip_state: Dict[OT3Mount, Optional[bool]] = { mount: False if self._attached_instruments[mount] else None for mount in [OT3Mount.LEFT, OT3Mount.RIGHT] } + self._sim_gantry_load = GantryLoad.LOW_THROUGHPUT + self._sim_status_bar_state = StatusBarState.IDLE + self._sim_estop_state = EstopState.DISENGAGED + self._sim_estop_left_state = EstopPhysicalStatus.DISENGAGED + self._sim_estop_right_state = EstopPhysicalStatus.DISENGAGED async def get_serial_number(self) -> Optional[str]: return "simulator" + @asynccontextmanager + async def restore_system_constraints(self) -> AsyncIterator[None]: + log.debug("Simulating saving system constraints") + try: + yield + finally: + log.debug("Simulating restoring system constraints") + + def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None: + self._sim_gantry_load = gantry_load + + def update_constraints_for_calibration_with_gantry_load( + self, + gantry_load: GantryLoad, + ) -> None: + self._sim_gantry_load = gantry_load + + def update_constraints_for_plunger_acceleration( + self, mount: OT3Mount, acceleration: float, gantry_load: GantryLoad + ) -> None: + self._sim_gantry_load = gantry_load + @property def initialized(self) -> bool: """True when the hardware controller has initialized and is ready.""" @@ -212,12 +243,8 @@ def initialized(self, value: bool) -> None: self._initialized = value @property - def eeprom_data(self) -> EEPROMData: - return EEPROMData() - - @property - def gear_motor_position(self) -> Dict[NodeId, float]: - return self._gear_motor_position + def gear_motor_position(self) -> Optional[float]: + return self._gear_motor_position.get(Axis.Q, None) @property def board_revision(self) -> BoardRevision: @@ -238,15 +265,13 @@ def module_controls(self, module_controls: AttachedModulesControl) -> None: @ensure_yield async def update_to_default_current_settings(self, gantry_load: GantryLoad) -> None: - self._current_settings = get_current_settings( - self._configuration.current_settings, gantry_load - ) + self._gantry_load = gantry_load def update_feature_flags(self, feature_flags: HardwareFeatureFlags) -> None: """Update the hardware feature flags used by the hardware controller.""" self._feature_flags = feature_flags - def _handle_motor_status_update(self, response: Dict[NodeId, float]) -> None: + def _handle_motor_status_update(self, response: Dict[Axis, float]) -> None: self._position.update(response) self._encoder_position.update(response) self._motor_status.update( @@ -255,27 +280,27 @@ def _handle_motor_status_update(self, response: Dict[NodeId, float]) -> None: @ensure_yield async def update_motor_status(self) -> None: - """Retreieve motor and encoder status and position from all present nodes""" + """Retreieve motor and encoder status and position from all present devices""" if not self._motor_status: # Simulate condition at boot, status would not be ok self._motor_status.update( - (node, MotorStatus(False, False)) for node in self._present_nodes + (axis, MotorStatus(False, False)) for axis in self._present_axes ) else: self._motor_status.update( - (node, MotorStatus(True, True)) for node in self._present_nodes + (axis, MotorStatus(True, True)) for axis in self._present_axes ) @ensure_yield async def update_motor_estimation(self, axes: Sequence[Axis]) -> None: - """Update motor position estimation for commanded nodes, and update cache of data.""" + """Update motor position estimation for commanded axes, and update cache of data.""" # Simulate conditions as if there are no stalls, aka do nothing return None def _get_motor_status( self, axes: Sequence[Axis] ) -> Dict[Axis, Optional[MotorStatus]]: - return {ax: self._motor_status.get(axis_to_node(ax)) for ax in axes} + return {ax: self._motor_status.get(ax) for ax in axes} def get_invalid_motor_axes(self, axes: Sequence[Axis]) -> List[Axis]: """Get axes that currently do not have the motor-ok flag.""" @@ -301,17 +326,11 @@ def check_encoder_status(self, axes: Sequence[Axis]) -> bool: async def update_position(self) -> OT3AxisMap[float]: """Get the current position.""" - return axis_convert(self._position, 0.0) + return axis_pad(self._position, 0.0) async def update_encoder_position(self) -> OT3AxisMap[float]: """Get the encoder current position.""" - return axis_convert(self._encoder_position, 0.0) - - @asynccontextmanager - async def monitor_overpressure( - self, mount: OT3Mount, sensor_id: SensorId = SensorId.S0 - ) -> AsyncIterator[None]: - yield + return axis_pad(self._encoder_position, 0.0) @ensure_yield async def liquid_probe( @@ -325,21 +344,22 @@ async def liquid_probe( auto_zero_sensor: bool = True, num_baseline_reads: int = 10, probe: InstrumentProbeType = InstrumentProbeType.PRIMARY, - ) -> Dict[NodeId, float]: - - head_node = axis_to_node(Axis.by_mount(mount)) + ) -> float: + z_axis = Axis.by_mount(mount) pos = self._position - pos[head_node] += max_z_distance + pos[z_axis] += max_z_distance self._position.update(pos) self._encoder_position.update(pos) - return self._position + return self._position[z_axis] @ensure_yield async def move( self, - origin: Coordinates[Axis, float], - moves: List[Move[Axis]], - stop_condition: MoveStopCondition = MoveStopCondition.none, + origin: Dict[Axis, float], + target: Dict[Axis, float], + speed: Optional[float] = None, + stop_condition: HWStopCondition = HWStopCondition.none, + nodes_in_moves_only: bool = True, ) -> None: """Move to a position. @@ -352,9 +372,8 @@ async def move( Returns: None """ - _, final_positions = create_move_group(origin, moves, self._present_nodes) - self._position.update(final_positions) - self._encoder_position.update(final_positions) + self._position.update(target) + self._encoder_position.update(target) @ensure_yield async def home( @@ -369,30 +388,28 @@ async def home( Homed position. """ if axes: - homed = [axis_to_node(a) for a in axes] + homed = axes else: - homed = list(self._position.keys()) + homed = list(iter(self._position.keys())) for h in homed: self._position[h] = self._get_home_position()[h] self._motor_status[h] = MotorStatus(True, True) - return axis_convert(self._position, 0.0) + return axis_pad(self._position, 0.0) @ensure_yield async def gripper_grip_jaw( self, duty_cycle: float, - stop_condition: MoveStopCondition = MoveStopCondition.none, + stop_condition: HWStopCondition = HWStopCondition.none, stay_engaged: bool = True, ) -> None: """Move gripper inward.""" - _ = create_gripper_jaw_grip_group(duty_cycle, stop_condition, stay_engaged) self._sim_jaw_state = GripperJawState.GRIPPING @ensure_yield async def gripper_home_jaw(self, duty_cycle: float) -> None: """Move gripper outward.""" - _ = create_gripper_jaw_home_group(duty_cycle) - self._motor_status[NodeId.gripper_g] = MotorStatus(True, True) + self._motor_status[Axis.G] = MotorStatus(True, True) self._sim_jaw_state = GripperJawState.HOMED_READY @ensure_yield @@ -400,8 +417,7 @@ async def gripper_hold_jaw( self, encoder_position_um: int, ) -> None: - _ = create_gripper_jaw_hold_group(encoder_position_um) - self._encoder_position[NodeId.gripper_g] = encoder_position_um / 1000.0 + self._encoder_position[Axis.G] = encoder_position_um / 1000.0 self._sim_jaw_state = GripperJawState.HOLDING async def get_jaw_state(self) -> GripperJawState: @@ -409,10 +425,12 @@ async def get_jaw_state(self) -> GripperJawState: return self._sim_jaw_state async def tip_action( - self, - moves: List[Move[Axis]], + self, origin: Dict[Axis, float], targets: List[Tuple[Dict[Axis, float], float]] ) -> None: - pass + self._gear_motor_position.update( + coalesce_move_segments(origin, [target[0] for target in targets]) + ) + await asyncio.sleep(0) async def home_tip_motors( self, @@ -584,18 +602,10 @@ def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]: @property def fw_version(self) -> Dict[SubSystem, int]: """Get the firmware version.""" - return { - NODEID_SUBSYSTEM[node.application_for()]: 0 for node in self._present_nodes - } + return {AXIS_TO_SUBSYSTEM[axis]: 0 for axis in self._present_axes} def axis_is_present(self, axis: Axis) -> bool: - try: - return axis_to_node(axis) in motor_nodes( - cast(Set[FirmwareTarget], self._present_nodes) - ) - except KeyError: - # Currently unhandled axis - return False + return axis in self._present_axes @property def update_required(self) -> bool: @@ -667,31 +677,22 @@ async def clean_up(self) -> None: """Clean up.""" pass - @ensure_yield - async def configure_mount( - self, mount: OT3Mount, config: InstrumentHardwareConfigs - ) -> None: - """Configure a mount.""" - return None - @staticmethod - def _get_home_position() -> Dict[NodeId, float]: + def _get_home_position() -> Dict[Axis, float]: return { - NodeId.head_l: 0, - NodeId.head_r: 0, - NodeId.gantry_x: 0, - NodeId.gantry_y: 0, - NodeId.pipette_left: 0, - NodeId.pipette_right: 0, - NodeId.gripper_z: 0, - NodeId.gripper_g: 0, + Axis.Z_L: 0, + Axis.Z_R: 0, + Axis.X: 0, + Axis.Y: 0, + Axis.P_L: 0, + Axis.P_R: 0, + Axis.Z_G: 0, + Axis.G: 0, } @staticmethod def home_position() -> OT3AxisMap[float]: - return { - node_to_axis(k): v for k, v in OT3Simulator._get_home_position().items() - } + return OT3Simulator._get_home_position() @ensure_yield async def capacitive_probe( @@ -703,7 +704,7 @@ async def capacitive_probe( sensor_threshold_pf: float, probe: InstrumentProbeType, ) -> bool: - self._position[axis_to_node(moving)] += distance_mm + self._position[moving] += distance_mm return True @ensure_yield @@ -715,21 +716,13 @@ async def capacitive_pass( speed_mm_per_s: float, probe: InstrumentProbeType, ) -> List[float]: - self._position[axis_to_node(moving)] += distance_mm + self._position[moving] += distance_mm return [] - @ensure_yield - async def connect_usb_to_rear_panel(self) -> None: - """Connect to rear panel over usb.""" - return None - - def status_bar_interface(self) -> status_bar.StatusBar: - return status_bar.StatusBar(None) - @property def subsystems(self) -> Dict[SubSystem, SubSystemState]: return { - target_to_subsystem(target): SubSystemState( + AXIS_TO_SUBSYSTEM[axis]: SubSystemState( ok=True, current_fw_version=1, next_fw_version=1, @@ -738,14 +731,9 @@ def subsystems(self) -> Dict[SubSystem, SubSystemState]: pcba_revision="A1", update_state=None, ) - for target in self._present_nodes + for axis in self._present_axes } - @property - def estop_state_machine(self) -> EstopStateMachine: - """Return an estop state machine locked in the "disengaged" state.""" - return self._estop_state_machine - async def get_tip_status(self, mount: OT3Mount) -> TipStateType: return TipStateType(self._sim_tip_state[mount]) @@ -757,3 +745,36 @@ async def update_tip_detector(self, mount: OT3Mount, sensor_count: int) -> None: async def teardown_tip_detector(self, mount: OT3Mount) -> None: pass + + async def set_status_bar_state(self, state: StatusBarState) -> None: + self._sim_status_bar_state = state + await asyncio.sleep(0) + + async def set_status_bar_enabled(self, enabled: bool) -> None: + await asyncio.sleep(0) + + def get_status_bar_state(self) -> StatusBarState: + return self._sim_status_bar_state + + @property + def estop_status(self) -> EstopOverallStatus: + return EstopOverallStatus( + state=self._sim_estop_state, + left_physical_state=self._sim_estop_left_state, + right_physical_state=self._sim_estop_right_state, + ) + + def estop_acknowledge_and_clear(self) -> EstopOverallStatus: + """Attempt to acknowledge an Estop event and clear the status. + + Returns the estop status after clearing the status.""" + self._sim_estop_state = EstopState.DISENGAGED + self._sim_estop_left_state = EstopPhysicalStatus.DISENGAGED + self._sim_estop_right_state = EstopPhysicalStatus.DISENGAGED + return self.estop_status + + def get_estop_state(self) -> EstopState: + return self._sim_estop_state + + def add_estop_callback(self, cb: HardwareEventHandler) -> HardwareEventUnsubscriber: + return lambda: None diff --git a/api/src/opentrons/hardware_control/backends/ot3utils.py b/api/src/opentrons/hardware_control/backends/ot3utils.py index 2b1d50f5ade..967d4640a95 100644 --- a/api/src/opentrons/hardware_control/backends/ot3utils.py +++ b/api/src/opentrons/hardware_control/backends/ot3utils.py @@ -105,16 +105,7 @@ def axis_nodes() -> List["NodeId"]: def node_axes() -> List[Axis]: - return [ - Axis.X, - Axis.Y, - Axis.Z_L, - Axis.Z_R, - Axis.P_L, - Axis.P_R, - Axis.Z_G, - Axis.G, - ] + return Axis.node_axes() def home_axes() -> List[Axis]: diff --git a/api/src/opentrons/hardware_control/status_bar_state.py b/api/src/opentrons/hardware_control/backends/status_bar_state.py similarity index 99% rename from api/src/opentrons/hardware_control/status_bar_state.py rename to api/src/opentrons/hardware_control/backends/status_bar_state.py index b38a709be86..616fec2ff3a 100644 --- a/api/src/opentrons/hardware_control/status_bar_state.py +++ b/api/src/opentrons/hardware_control/backends/status_bar_state.py @@ -1,4 +1,4 @@ -from .types import StatusBarState +from opentrons.hardware_control.types import StatusBarState from opentrons_hardware.hardware_control import status_bar from opentrons_hardware.firmware_bindings.binary_constants import ( LightAnimationType, diff --git a/api/src/opentrons/hardware_control/backends/types.py b/api/src/opentrons/hardware_control/backends/types.py new file mode 100644 index 00000000000..e29001abee9 --- /dev/null +++ b/api/src/opentrons/hardware_control/backends/types.py @@ -0,0 +1,14 @@ +"""backends.types - wrapper types for api/backend interaction""" + +from enum import Enum, auto + + +class HWStopCondition(Enum): + none = auto() + limit_switch = auto() + sync_line = auto() + encoder_position = auto() + gripper_force = auto() + stall = auto() + ignore_stalls = auto() + limit_switch_backoff = auto() diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index b31cac026f8..f97fbf38136 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -4,7 +4,6 @@ from functools import partial, lru_cache, wraps from dataclasses import replace import logging -from copy import deepcopy from collections import OrderedDict from typing import ( AsyncIterator, @@ -49,14 +48,7 @@ LiquidProbeSettings, ) from opentrons.drivers.rpi_drivers.types import USBPort, PortGroup -from opentrons_hardware.hardware_control.motion_planning import ( - Move, - MoveManager, - MoveTarget, - ZeroLengthMoveError, -) from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType -from opentrons_hardware.hardware_control.motion import MoveStopCondition from opentrons_shared_data.errors.exceptions import ( EnumeratedError, PythonException, @@ -76,15 +68,7 @@ GripperCalibrationOffset, PipetteOffsetSummary, ) -from .backends.ot3controller import OT3Controller -from .backends.ot3simulator import OT3Simulator -from .backends.ot3utils import ( - axis_convert, - get_system_constraints, - get_system_constraints_for_calibration, - get_system_constraints_for_plunger_acceleration, -) -from .backends.errors import SubsystemUpdating + from .execution_manager import ExecutionManagerProvider from .pause_manager import PauseManager from .module_control import AttachedModulesControl @@ -110,7 +94,6 @@ SubSystemState, TipStateType, EstopOverallStatus, - EstopAttachLocation, EstopStateNotification, EstopState, HardwareFeatureFlags, @@ -156,10 +139,12 @@ InstrumentDict, GripperDict, ) +from .backends.types import HWStopCondition +from .backends.flex_protocol import FlexBackend +from .backends.ot3simulator import OT3Simulator +from .backends.errors import SubsystemUpdating -from .status_bar_state import StatusBarStateController - mod_log = logging.getLogger(__name__) AXES_IN_HOMING_ORDER: Tuple[Axis, Axis, Axis, Axis, Axis, Axis, Axis, Axis, Axis] = ( @@ -221,7 +206,7 @@ class OT3API( def __init__( self, - backend: Union[OT3Simulator, OT3Controller], + backend: FlexBackend, loop: asyncio.AbstractEventLoop, config: OT3Config, feature_flags: HardwareFeatureFlags, @@ -241,7 +226,7 @@ def estop_cb(event: HardwareEvent) -> None: self._update_estop_state(event) self._feature_flags = feature_flags - backend.estop_state_machine.add_listener(estop_cb) + backend.add_estop_callback(estop_cb) self._callbacks: Set[HardwareEventHandler] = set() # {'X': 0.0, 'Y': 0.0, 'Z': 0.0, 'A': 0.0, 'B': 0.0, 'C': 0.0} @@ -257,18 +242,9 @@ def estop_cb(event: HardwareEvent) -> None: self._motion_lock = asyncio.Lock() self._door_state = DoorState.CLOSED self._pause_manager = PauseManager() - self._gantry_load = GantryLoad.LOW_THROUGHPUT - self._move_manager = MoveManager( - constraints=get_system_constraints( - self._config.motion_settings, self._gantry_load - ) - ) - self._status_bar_controller = StatusBarStateController( - self._backend.status_bar_interface() - ) - self._pipette_handler = OT3PipetteHandler({m: None for m in OT3Mount}) self._gripper_handler = GripperHandler(gripper=None) + self._gantry_load = GantryLoad.LOW_THROUGHPUT OT3RobotCalibrationProvider.__init__(self, self._config) ExecutionManagerProvider.__init__(self, isinstance(backend, OT3Simulator)) @@ -287,42 +263,28 @@ def gantry_load(self) -> GantryLoad: async def set_gantry_load(self, gantry_load: GantryLoad) -> None: mod_log.info(f"Setting gantry load to {gantry_load}") self._gantry_load = gantry_load - self._move_manager.update_constraints( - get_system_constraints(self._config.motion_settings, gantry_load) - ) + self._backend.update_constraints_for_gantry_load(gantry_load) await self._backend.update_to_default_current_settings(gantry_load) async def get_serial_number(self) -> Optional[str]: return await self._backend.get_serial_number() async def set_system_constraints_for_calibration(self) -> None: - self._move_manager.update_constraints( - get_system_constraints_for_calibration( - self._config.motion_settings, self._gantry_load - ) - ) - mod_log.debug( - f"Set system constraints for calibration: {self._move_manager.get_constraints()}" + self._backend.update_constraints_for_calibration_with_gantry_load( + self._gantry_load ) async def set_system_constraints_for_plunger_acceleration( self, mount: OT3Mount, acceleration: float ) -> None: - new_constraints = get_system_constraints_for_plunger_acceleration( - self._config.motion_settings, self._gantry_load, mount, acceleration + self._backend.update_constraints_for_plunger_acceleration( + mount, acceleration, self._gantry_load ) - self._move_manager.update_constraints(new_constraints) @contextlib.asynccontextmanager async def restore_system_constrants(self) -> AsyncIterator[None]: - old_system_constraints = deepcopy(self._move_manager.get_constraints()) - try: + async with self._backend.restore_system_constraints(): yield - finally: - self._move_manager.update_constraints(old_system_constraints) - mod_log.debug( - f"Restore previous system constraints: {old_system_constraints}" - ) def _update_door_state(self, door_state: DoorState) -> None: mod_log.info(f"Updating the window switch status: {door_state}") @@ -395,6 +357,8 @@ async def build_hardware_controller( checked_config = robot_configs.load_ot3() else: checked_config = config + from .backends.ot3controller import OT3Controller + backend = await OT3Controller.build( checked_config, use_usb_bus, @@ -452,6 +416,7 @@ async def build_hardware_simulator( checked_config = robot_configs.load_ot3() else: checked_config = config + backend = await OT3Simulator.build( {OT3Mount.from_mount(k): v for k, v in attached_instruments.items()} if attached_instruments @@ -565,13 +530,13 @@ async def identify(self, duration_s: int = 5) -> None: await self.set_lights(button=True) async def set_status_bar_state(self, state: StatusBarState) -> None: - await self._status_bar_controller.set_status_bar_state(state) + await self._backend.set_status_bar_state(state) async def set_status_bar_enabled(self, enabled: bool) -> None: - await self._status_bar_controller.set_enabled(enabled) + await self._backend.set_status_bar_enabled(enabled) def get_status_bar_state(self) -> StatusBarState: - return self._status_bar_controller.get_current_state() + return self._backend.get_status_bar_state() @ExecutionManagerProvider.wait_for_running async def delay(self, duration_s: float) -> None: @@ -910,16 +875,14 @@ async def home_gear_motors(self) -> None: max_distance = self._backend.axis_bounds[Axis.Q][1] # if position is not known, move toward limit switch at a constant velocity - if len(self._backend.gear_motor_position.keys()) == 0: + if self._backend.gear_motor_position is None: await self._backend.home_tip_motors( distance=max_distance, velocity=homing_velocity, ) return - current_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[ - Axis.P_L - ] + current_pos_float = self._backend.gear_motor_position or 0.0 # We filter out a distance more than `max_distance` because, if the tip motor was stopped during # a slow-home motion, the position may be stuck at an enormous large value. @@ -928,16 +891,14 @@ async def home_gear_motors(self) -> None: and current_pos_float < max_distance ): - fast_home_moves = self._build_moves( - {Axis.Q: current_pos_float}, {Axis.Q: self._config.safe_home_distance} - ) # move toward home until a safe distance - await self._backend.tip_action(moves=fast_home_moves[0]) + await self._backend.tip_action( + origin={Axis.Q: current_pos_float}, + targets=[({Axis.Q: self._config.safe_home_distance}, 400)], + ) # update current position - current_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[ - Axis.P_L - ] + current_pos_float = self._backend.gear_motor_position or 0.0 # move until the limit switch is triggered, with no acceleration await self._backend.home_tip_motors( @@ -1106,9 +1067,7 @@ def _effector_pos_from_carriage_pos( plunger_ax: carriage_position[plunger_ax], } if self._gantry_load == GantryLoad.HIGH_THROUGHPUT: - effector_pos[Axis.Q] = axis_convert(self._backend.gear_motor_position, 0.0)[ - Axis.P_L - ] + effector_pos[Axis.Q] = self._backend.gear_motor_position or 0.0 return effector_pos @@ -1345,21 +1304,6 @@ def raise_error_if_gripper_pickup_failed(self, labware_width: float) -> None: def gripper_jaw_can_home(self) -> bool: return self._gripper_handler.is_ready_for_jaw_home() - def _build_moves( - self, - origin: Dict[Axis, float], - target: Dict[Axis, float], - speed: Optional[float] = None, - ) -> List[List[Move[Axis]]]: - """Build move with Move Manager with machine positions.""" - # TODO: (2022-02-10) Use actual max speed for MoveTarget - checked_speed = speed or 400 - move_target = MoveTarget.build(position=target, max_speed=checked_speed) - _, moves = self._move_manager.plan_motion( - origin=origin, target_list=[move_target] - ) - return moves - @ExecutionManagerProvider.wait_for_running async def _move( self, @@ -1385,27 +1329,17 @@ async def _move( if ax in Axis.gantry_axes() } check_motion_bounds(to_check, target_position, bounds, check_bounds) - + self._log.info(f"Move: deck {target_position} becomes machine {machine_pos}") origin = await self._backend.update_position() - try: - moves = self._build_moves(origin, machine_pos, speed) - except ZeroLengthMoveError as zero_length_error: - self._log.info(f"{str(zero_length_error)}, ignoring") - return - self._log.info( - f"move: deck {target_position} becomes machine {machine_pos} from {origin} " - f"requiring {moves}" - ) async with contextlib.AsyncExitStack() as stack: if acquire_lock: await stack.enter_async_context(self._motion_lock) try: await self._backend.move( origin, - moves[0], - MoveStopCondition.stall - if expect_stalls - else MoveStopCondition.none, + machine_pos, + speed or 400.0, + HWStopCondition.stall if expect_stalls else HWStopCondition.none, ) except Exception: self._log.exception("Move failed") @@ -1432,9 +1366,6 @@ async def _set_plunger_current_and_home( if encoder_ok and motor_ok: if origin[axis] - target_pos[axis] > self._config.safe_home_distance: target_pos[axis] += self._config.safe_home_distance - moves = self._build_moves( - origin, target_pos, instr.config.plunger_homing_configurations.speed - ) async with self._backend.motor_current( run_currents={ axis: instr.config.plunger_homing_configurations.current @@ -1442,8 +1373,9 @@ async def _set_plunger_current_and_home( ): await self._backend.move( origin, - moves[0], - MoveStopCondition.none, + target_pos, + instr.config.plunger_homing_configurations.speed, + HWStopCondition.none, ) await self._backend.home([axis], self.gantry_load) else: @@ -1508,12 +1440,12 @@ async def _home_axis(self, axis: Axis) -> None: axis_home_dist = 20.0 if origin[axis] - target_pos[axis] > axis_home_dist: target_pos[axis] += axis_home_dist - moves = self._build_moves(origin, target_pos) try: await self._backend.move( origin, - moves[0], - MoveStopCondition.none, + target_pos, + speed=400, + stop_condition=HWStopCondition.none, ) except StallOrCollisionDetectedError: self._log.warning( @@ -1537,9 +1469,6 @@ async def _home(self, axes: Sequence[Axis]) -> None: await self._backend.home([axis], self.gantry_load) else: await self._home_axis(axis) - except ZeroLengthMoveError: - self._log.info(f"{axis} already at home position, skip homing") - continue except BaseException as e: self._log.exception(f"Homing failed: {e}") self._current_position.clear() @@ -1625,11 +1554,7 @@ async def retract_axis(self, axis: Axis) -> None: # we can move to the home position without checking the limit switch origin = await self._backend.update_position() target_pos = {axis: self._backend.home_position()[axis]} - try: - moves = self._build_moves(origin, target_pos) - await self._backend.move(origin, moves[0], MoveStopCondition.none) - except ZeroLengthMoveError: - self._log.info(f"{axis} already at home position, skip retract") + await self._backend.move(origin, target_pos, 400, HWStopCondition.none) else: # home the axis await self._home_axis(axis) @@ -2010,16 +1935,14 @@ async def _high_throughput_check_tip(self) -> AsyncIterator[None]: if self._backend.gear_motor_position is None: await self.home_gear_motors() - tip_motor_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[ - Axis.of_main_tool_actuator(OT3Mount.LEFT) - ] + tip_motor_pos_float = self._backend.gear_motor_position or 0.0 # only move tip motors if they are not already below the sensor if tip_motor_pos_float < tip_presence_check_target: - clamp_moves = self._build_moves( - {Axis.Q: tip_motor_pos_float}, {Axis.Q: tip_presence_check_target} + await self._backend.tip_action( + origin={Axis.Q: tip_motor_pos_float}, + targets=[({Axis.Q: tip_presence_check_target}, 400)], ) - await self._backend.tip_action(moves=clamp_moves[0]) try: yield finally: @@ -2074,27 +1997,18 @@ async def _tip_motor_action( currents = pipette_spec[0].currents # Move to pickup position async with self._backend.motor_current(run_currents=currents): - if not any(self._backend.gear_motor_position): + if self._backend.gear_motor_position is None: # home gear motor if position not known await self.home_gear_motors() - pipette_axis = Axis.of_main_tool_actuator(mount) - gear_origin_float = axis_convert(self._backend.gear_motor_position, 0.0)[ - pipette_axis - ] + gear_origin_float = self._backend.gear_motor_position or 0.0 move_targets = [ - MoveTarget.build( - position={Axis.Q: move_segment.distance}, - max_speed=move_segment.speed or 400, - ) + ({Axis.Q: move_segment.distance}, move_segment.speed or 400) for move_segment in pipette_spec ] - - _, moves = self._move_manager.plan_motion( - origin={Axis.Q: gear_origin_float}, target_list=move_targets + await self._backend.tip_action( + origin={Axis.Q: gear_origin_float}, targets=move_targets ) - await self._backend.tip_action(moves=moves[0]) - await self.home_gear_motors() async def pick_up_tip( @@ -2673,22 +2587,14 @@ def attached_subsystems(self) -> Dict[SubSystem, SubSystemState]: @property def estop_status(self) -> EstopOverallStatus: - return EstopOverallStatus( - state=self._backend.estop_state_machine.state, - left_physical_state=self._backend.estop_state_machine.get_physical_status( - EstopAttachLocation.LEFT - ), - right_physical_state=self._backend.estop_state_machine.get_physical_status( - EstopAttachLocation.RIGHT - ), - ) + return self._backend.estop_status def estop_acknowledge_and_clear(self) -> EstopOverallStatus: """Attempt to acknowledge an Estop event and clear the status. Returns the estop status after clearing the status.""" - self._backend.estop_state_machine.acknowledge_and_clear() + self._backend.estop_acknowledge_and_clear() return self.estop_status def get_estop_state(self) -> EstopState: - return self._backend.estop_state_machine.state + return self._backend.get_estop_state() diff --git a/api/src/opentrons/hardware_control/thread_manager.py b/api/src/opentrons/hardware_control/thread_manager.py index 4a8a7ae5936..3e189c57a49 100644 --- a/api/src/opentrons/hardware_control/thread_manager.py +++ b/api/src/opentrons/hardware_control/thread_manager.py @@ -295,7 +295,7 @@ def sync(self) -> SynchronousAdapter[WrappedObj]: def __repr__(self) -> str: return "" - def clean_up(self) -> None: + def clean_up_tm(self) -> None: try: loop = object.__getattribute__(self, "_loop") loop.call_soon_threadsafe(loop.stop) @@ -348,7 +348,7 @@ def __getattribute__(self, attr_name: str) -> Any: wrapped_cleanup = getattr( object.__getattribute__(self, "bridged_obj"), "clean_up" ) - our_cleanup = object.__getattribute__(self, "clean_up") + our_cleanup = object.__getattribute__(self, "clean_up_tm") def call_both() -> None: # the wrapped cleanup wants to happen in the managed thread, diff --git a/api/src/opentrons/hardware_control/types.py b/api/src/opentrons/hardware_control/types.py index 9cd8c6b758e..aaf05a30796 100644 --- a/api/src/opentrons/hardware_control/types.py +++ b/api/src/opentrons/hardware_control/types.py @@ -232,6 +232,13 @@ def of_plunger(cls, mount: top_types.Mount) -> "Axis": """ return cls.of_main_tool_actuator(mount) + @classmethod + def node_axes(cls) -> List["Axis"]: + """ + Get a list of axes that are backed by flex canbus nodes. + """ + return [cls.X, cls.Y, cls.Z_L, cls.Z_R, cls.P_L, cls.P_R, cls.Z_G, cls.G] + class SubSystem(enum.Enum): """An enumeration of ot3 components. @@ -412,6 +419,7 @@ class ErrorMessageNotification: ] HardwareEventHandler = Callable[[HardwareEvent], None] +HardwareEventUnsubscriber = Callable[[], None] RevisionLiteral = Literal["2.1", "A", "B", "C", "UNKNOWN"] diff --git a/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py b/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py index 79baa8d868a..b41df9ec36d 100644 --- a/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py @@ -77,7 +77,7 @@ GripperInformation, ) -from opentrons.hardware_control.estop_state import EstopStateMachine +from opentrons.hardware_control.backends.estop_state import EstopStateMachine from opentrons_shared_data.errors.exceptions import ( EStopActivatedError, diff --git a/api/tests/opentrons/hardware_control/test_ot3_estop_state.py b/api/tests/opentrons/hardware_control/backends/test_ot3_estop_state.py similarity index 96% rename from api/tests/opentrons/hardware_control/test_ot3_estop_state.py rename to api/tests/opentrons/hardware_control/backends/test_ot3_estop_state.py index af660606b9a..e179f29417c 100644 --- a/api/tests/opentrons/hardware_control/test_ot3_estop_state.py +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_estop_state.py @@ -2,7 +2,7 @@ from decoy import Decoy from typing import List, Tuple, Optional -from opentrons.hardware_control.estop_state import EstopStateMachine +from opentrons.hardware_control.backends.estop_state import EstopStateMachine from opentrons_hardware.hardware_control.estop.detector import ( EstopSummary, EstopDetector, @@ -73,11 +73,9 @@ async def test_estop_state_no_detector( subject.subscribe_to_detector(detector=mock_estop_detector) decoy.verify( - [ - mock_estop_detector.add_listener(subject.detector_listener), - mock_estop_detector.remove_listener(subject.detector_listener), - mock_estop_detector.add_listener(subject.detector_listener), - ] + mock_estop_detector.add_listener(subject.detector_listener), + mock_estop_detector.remove_listener(subject.detector_listener), + mock_estop_detector.add_listener(subject.detector_listener), ) diff --git a/api/tests/opentrons/hardware_control/backends/test_ot3_status_bar.py b/api/tests/opentrons/hardware_control/backends/test_ot3_status_bar.py new file mode 100644 index 00000000000..9e23d545961 --- /dev/null +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_status_bar.py @@ -0,0 +1,43 @@ +import pytest +from decoy import Decoy +from opentrons.hardware_control.types import StatusBarState +from opentrons.hardware_control.backends.status_bar_state import ( + StatusBarStateController, +) +from opentrons_hardware.hardware_control.status_bar import StatusBar + + +@pytest.fixture +def mock_status_bar_controller(decoy: Decoy) -> StatusBar: + return decoy.mock(cls=StatusBar) + + +@pytest.fixture +def subject(mock_status_bar_controller: StatusBar) -> StatusBarStateController: + return StatusBarStateController(mock_status_bar_controller) + + +@pytest.mark.parametrize(argnames=["enabled"], argvalues=[[True], [False]]) +async def test_status_bar_interface( + subject: StatusBarStateController, enabled: bool +) -> None: + """Test setting status bar statuses and make sure the cached status is correct.""" + await subject.set_enabled(enabled) + + settings = { + StatusBarState.IDLE: StatusBarState.IDLE, + StatusBarState.RUNNING: StatusBarState.RUNNING, + StatusBarState.PAUSED: StatusBarState.PAUSED, + StatusBarState.HARDWARE_ERROR: StatusBarState.HARDWARE_ERROR, + StatusBarState.SOFTWARE_ERROR: StatusBarState.SOFTWARE_ERROR, + StatusBarState.CONFIRMATION: StatusBarState.IDLE, + StatusBarState.RUN_COMPLETED: StatusBarState.RUN_COMPLETED, + StatusBarState.UPDATING: StatusBarState.UPDATING, + StatusBarState.ACTIVATION: StatusBarState.IDLE, + StatusBarState.DISCO: StatusBarState.IDLE, + StatusBarState.OFF: StatusBarState.OFF, + } + + for setting, response in settings.items(): + await subject.set_status_bar_state(state=setting) + assert subject.get_current_state() == response diff --git a/api/tests/opentrons/hardware_control/test_importability.py b/api/tests/opentrons/hardware_control/test_importability.py new file mode 100644 index 00000000000..c3c62424309 --- /dev/null +++ b/api/tests/opentrons/hardware_control/test_importability.py @@ -0,0 +1,8 @@ +import pytest +from opentrons.hardware_control.ot3api import OT3API + + +@pytest.mark.ot2_only +async def test_flex_simulator_always_importable() -> None: + api = await OT3API.build_hardware_simulator() + assert isinstance(api, OT3API) diff --git a/api/tests/opentrons/hardware_control/test_ot3_api.py b/api/tests/opentrons/hardware_control/test_ot3_api.py index 0fae2a16f90..cca953dc183 100644 --- a/api/tests/opentrons/hardware_control/test_ot3_api.py +++ b/api/tests/opentrons/hardware_control/test_ot3_api.py @@ -1,6 +1,18 @@ """ Tests for behaviors specific to the OT3 hardware controller. """ -from typing import Iterator, Union, Dict, Tuple, List, Any, OrderedDict, Optional +from typing import ( + AsyncIterator, + Iterator, + Union, + Dict, + Tuple, + List, + Any, + OrderedDict, + Optional, + cast, + TypedDict, +) from typing_extensions import Literal from math import copysign import pytest @@ -19,6 +31,7 @@ AttachedGripper, AttachedPipette, GripperDict, + GripperSpec, ) from opentrons.hardware_control.motion_utilities import target_position_from_plunger from opentrons.hardware_control.instruments.ot3.gripper_handler import GripperHandler @@ -41,7 +54,6 @@ InstrumentProbeType, SubSystem, GripperJawState, - StatusBarState, EstopState, EstopStateNotification, TipStateType, @@ -50,13 +62,11 @@ from opentrons.hardware_control.errors import InvalidCriticalPoint from opentrons.hardware_control.ot3api import OT3API from opentrons.hardware_control import ThreadManager -from opentrons.hardware_control.backends.ot3utils import ( - axis_to_node, -) + +from opentrons.hardware_control.backends.ot3simulator import OT3Simulator from opentrons_hardware.firmware_bindings.constants import NodeId from opentrons.types import Point, Mount -from opentrons_hardware.hardware_control.motion import MoveStopCondition from opentrons_hardware.hardware_control.motion_planning.types import Move from opentrons.config import gripper_config as gc @@ -76,6 +86,7 @@ from opentrons_shared_data.pipette import ( load_data as load_pipette_data, ) +from opentrons_shared_data.pipette.dev_types import PipetteModel from opentrons.hardware_control.modules import ( Thermocycler, TempDeck, @@ -84,6 +95,7 @@ SpeedStatus, ) from opentrons.hardware_control.module_control import AttachedModulesControl +from opentrons.hardware_control.backends.types import HWStopCondition # TODO (spp, 2023-08-22): write tests for ot3api.stop & ot3api.halt @@ -118,38 +130,45 @@ def fake_liquid_settings() -> LiquidProbeSettings: @pytest.fixture -def mock_move_to(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def managed_obj(ot3_hardware: ThreadManager[OT3API]) -> OT3API: + managed = ot3_hardware.managed_obj + assert managed + return managed + + +@pytest.fixture +def mock_move_to(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "move_to", AsyncMock( - spec=ot3_hardware.managed_obj.move_to, - wraps=ot3_hardware.managed_obj.move_to, + spec=managed_obj.move_to, + wraps=managed_obj.move_to, ), ) as mock_move: yield mock_move @pytest.fixture -def mock_home(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_home(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "home", AsyncMock( - spec=ot3_hardware.managed_obj.home, - wraps=ot3_hardware.managed_obj.home, + spec=managed_obj.home, + wraps=managed_obj.home, ), ) as mock_move: yield mock_move @pytest.fixture -def mock_home_plunger(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_home_plunger(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "home_plunger", AsyncMock( - spec=ot3_hardware.managed_obj.home_plunger, + spec=managed_obj.home_plunger, ), ) as mock_move: yield mock_move @@ -157,142 +176,142 @@ def mock_home_plunger(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock @pytest.fixture def mock_move_to_plunger_bottom( - ot3_hardware: ThreadManager[OT3API], + managed_obj: OT3API, ) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "_move_to_plunger_bottom", AsyncMock( - spec=ot3_hardware.managed_obj._move_to_plunger_bottom, + spec=managed_obj._move_to_plunger_bottom, ), ) as mock_move: yield mock_move @pytest.fixture -def mock_move(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_move(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "_move", AsyncMock( - spec=ot3_hardware.managed_obj._move, + spec=managed_obj._move, ), ) as mock_move: yield mock_move @pytest.fixture -def mock_gantry_position(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_gantry_position(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "gantry_position", AsyncMock( - spec=ot3_hardware.managed_obj.gantry_position, - wraps=ot3_hardware.managed_obj.gantry_position, + spec=managed_obj.gantry_position, + wraps=managed_obj.gantry_position, ), ) as mock_gantry_pos: yield mock_gantry_pos @pytest.fixture -def mock_grip(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_grip(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "_grip", AsyncMock( - spec=ot3_hardware.managed_obj._grip, - wraps=ot3_hardware.managed_obj._grip, + spec=managed_obj._grip, + wraps=managed_obj._grip, ), ) as mock_move: yield mock_move @pytest.fixture -def mock_ungrip(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_ungrip(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "_ungrip", AsyncMock( - spec=ot3_hardware.managed_obj._ungrip, - wraps=ot3_hardware.managed_obj._ungrip, + spec=managed_obj._ungrip, + wraps=managed_obj._ungrip, ), ) as mock_move: yield mock_move @pytest.fixture -def mock_home_gear_motors(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_home_gear_motors(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "home_gear_motors", AsyncMock( - spec=ot3_hardware.managed_obj.home_gear_motors, - wraps=ot3_hardware.managed_obj.home_gear_motors, + spec=managed_obj.home_gear_motors, + wraps=managed_obj.home_gear_motors, ), ) as mock_home_gear: yield mock_home_gear @pytest.fixture -def mock_hold_jaw_width(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_hold_jaw_width(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "_hold_jaw_width", AsyncMock( - spec=ot3_hardware.managed_obj._hold_jaw_width, - wraps=ot3_hardware.managed_obj._hold_jaw_width, + spec=managed_obj._hold_jaw_width, + wraps=managed_obj._hold_jaw_width, ), ) as mock_move: yield mock_move @pytest.fixture -async def mock_backend_move(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +async def mock_backend_move(managed_obj: OT3API) -> AsyncIterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj._backend, + managed_obj._backend, "move", - AsyncMock(spec=ot3_hardware.managed_obj._backend.move), + AsyncMock(spec=managed_obj._backend.move), ) as mock_move: yield mock_move @pytest.fixture -def mock_check_motor(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_check_motor(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj._backend, + managed_obj._backend, "check_motor_status", - Mock(spec=ot3_hardware.managed_obj._backend.check_motor_status), + Mock(spec=managed_obj._backend.check_motor_status), ) as mock_check: yield mock_check @pytest.fixture -def mock_check_encoder(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +def mock_check_encoder(managed_obj: OT3API) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj._backend, + managed_obj._backend, "check_encoder_status", - Mock(spec=ot3_hardware.managed_obj._backend.check_encoder_status), + Mock(spec=managed_obj._backend.check_encoder_status), ) as mock_check: yield mock_check @pytest.fixture -async def mock_refresh(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +async def mock_refresh(managed_obj: OT3API) -> AsyncIterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "refresh_positions", AsyncMock( - spec=ot3_hardware.managed_obj.refresh_positions, - wraps=ot3_hardware.managed_obj.refresh_positions, + spec=managed_obj.refresh_positions, + wraps=managed_obj.refresh_positions, ), ) as mock_refresh: yield mock_refresh @pytest.fixture -async def mock_reset(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: +async def mock_reset(managed_obj: OT3API) -> AsyncIterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "reset", AsyncMock(), ) as mock_reset: @@ -319,110 +338,146 @@ def mock_max_grip_error() -> Iterator[MagicMock]: @pytest.fixture async def mock_instrument_handlers( - ot3_hardware: ThreadManager[OT3API], -) -> Iterator[Tuple[MagicMock]]: + managed_obj: OT3API, +) -> AsyncIterator[Tuple[MagicMock, MagicMock]]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "_gripper_handler", MagicMock(spec=GripperHandler), ) as mock_gripper_handler, patch.object( - ot3_hardware.managed_obj, "_pipette_handler", MagicMock(spec=OT3PipetteHandler) + managed_obj, "_pipette_handler", MagicMock(spec=OT3PipetteHandler) ) as mock_pipette_handler: yield mock_gripper_handler, mock_pipette_handler @pytest.fixture -async def gripper_present(ot3_hardware: ThreadManager[OT3API]) -> None: +async def gripper_present( + managed_obj: OT3API, + ot3_hardware: ThreadManager[OT3API], + hardware_backend: OT3Simulator, +) -> None: # attach a gripper if we're testing the gripper mount gripper_config = gc.load(GripperModel.v1) instr_data = AttachedGripper(config=gripper_config, id="test") - ot3_hardware._backend._attached_instruments[OT3Mount.GRIPPER] = { - "model": GripperModel.v1, - "id": "test", - } - ot3_hardware._backend._present_nodes.add(NodeId.gripper) + hardware_backend._attached_instruments[OT3Mount.GRIPPER] = cast( + GripperSpec, + { + "model": GripperModel.v1, + "id": "test", + }, + ) + hardware_backend._present_axes.update((Axis.G, Axis.Z_G)) await ot3_hardware.cache_gripper(instr_data) +@pytest.fixture +def hardware_backend(managed_obj: OT3API) -> OT3Simulator: + assert isinstance( + managed_obj._backend, OT3Simulator + ), "Tests only work with simulator" + return managed_obj._backend + + +class PipetteLoadConfig(TypedDict): + channels: Literal[1, 8, 96] + version: Tuple[Literal[1, 2, 3], Literal[0, 1, 2, 3, 4, 5, 6]] + model: PipetteModel + + +class GripperLoadConfig(TypedDict): + model: GripperModel + id: str + + +LoadConfigs = List[ + Union[ + Tuple[Literal[OT3Mount.RIGHT], PipetteLoadConfig], + Tuple[Literal[OT3Mount.LEFT], PipetteLoadConfig], + Tuple[Literal[OT3Mount.GRIPPER], GripperLoadConfig], + ] +] + + @pytest.mark.parametrize( "load_configs,load", ( ( - { - OT3Mount.RIGHT: {"channels": 8, "version": (3, 3), "model": "p50"}, - OT3Mount.LEFT: {"channels": 1, "version": (3, 3), "model": "p1000"}, - }, + [ + (OT3Mount.RIGHT, {"channels": 8, "version": (3, 3), "model": "p50"}), + (OT3Mount.LEFT, {"channels": 1, "version": (3, 3), "model": "p1000"}), + ], GantryLoad.LOW_THROUGHPUT, ), - ({}, GantryLoad.LOW_THROUGHPUT), + ([], GantryLoad.LOW_THROUGHPUT), ( - {OT3Mount.GRIPPER: {"model": GripperModel.v1, "id": "g12345"}}, + [(OT3Mount.GRIPPER, {"model": GripperModel.v1, "id": "g12345"})], GantryLoad.LOW_THROUGHPUT, ), ( - {OT3Mount.LEFT: {"channels": 8, "version": (3, 3), "model": "p1000"}}, + [(OT3Mount.LEFT, {"channels": 8, "version": (3, 3), "model": "p1000"})], GantryLoad.LOW_THROUGHPUT, ), ( - {OT3Mount.RIGHT: {"channels": 8, "version": (3, 3), "model": "p1000"}}, + [(OT3Mount.RIGHT, {"channels": 8, "version": (3, 3), "model": "p1000"})], GantryLoad.LOW_THROUGHPUT, ), ( - {OT3Mount.LEFT: {"channels": 96, "model": "p1000", "version": (3, 3)}}, + [(OT3Mount.LEFT, {"channels": 96, "model": "p1000", "version": (3, 3)})], GantryLoad.HIGH_THROUGHPUT, ), ( - { - OT3Mount.LEFT: {"channels": 1, "version": (3, 3), "model": "p1000"}, - OT3Mount.GRIPPER: {"model": GripperModel.v1, "id": "g12345"}, - }, + [ + (OT3Mount.LEFT, {"channels": 1, "version": (3, 3), "model": "p1000"}), + (OT3Mount.GRIPPER, {"model": GripperModel.v1, "id": "g12345"}), + ], GantryLoad.LOW_THROUGHPUT, ), ( - { - OT3Mount.RIGHT: {"channels": 8, "version": (3, 3), "model": "p1000"}, - OT3Mount.GRIPPER: {"model": GripperModel.v1, "id": "g12345"}, - }, + [ + (OT3Mount.RIGHT, {"channels": 8, "version": (3, 3), "model": "p1000"}), + (OT3Mount.GRIPPER, {"model": GripperModel.v1, "id": "g12345"}), + ], GantryLoad.LOW_THROUGHPUT, ), ( - { - OT3Mount.LEFT: {"channels": 96, "model": "p1000", "version": (3, 3)}, - OT3Mount.GRIPPER: {"model": GripperModel.v1, "id": "g12345"}, - }, + [ + (OT3Mount.LEFT, {"channels": 96, "model": "p1000", "version": (3, 3)}), + (OT3Mount.GRIPPER, {"model": GripperModel.v1, "id": "g12345"}), + ], GantryLoad.HIGH_THROUGHPUT, ), ), ) async def test_gantry_load_transform( ot3_hardware: ThreadManager[OT3API], - load_configs: Dict[str, Union[int, str, Tuple[int, int]]], + load_configs: LoadConfigs, load: GantryLoad, ) -> None: - for mount, configs in load_configs.items(): - if mount == OT3Mount.GRIPPER: - gripper_config = gc.load(configs["model"]) - instr_data = AttachedGripper(config=gripper_config, id="2345") - await ot3_hardware.cache_gripper(instr_data) + for pair in load_configs: + if pair[0] == OT3Mount.GRIPPER: + gripper_config = gc.load(pair[1]["model"]) + gripper_data = AttachedGripper(config=gripper_config, id="2345") + await ot3_hardware.cache_gripper(gripper_data) else: pipette_config = load_pipette_data.load_definition( - PipetteModelType(configs["model"]), - PipetteChannelType(configs["channels"]), - PipetteVersionType(*configs["version"]), + PipetteModelType(pair[1]["model"]), + PipetteChannelType(pair[1]["channels"]), + PipetteVersionType(*pair[1]["version"]), ) instr_data = AttachedPipette(config=pipette_config, id="fakepip") - await ot3_hardware.cache_pipette(mount, instr_data, None) + await ot3_hardware.cache_pipette(pair[0], instr_data, None) assert ot3_hardware._gantry_load_from_instruments() == load @pytest.fixture def mock_backend_capacitive_probe( - ot3_hardware: ThreadManager[OT3API], + hardware_backend: OT3Simulator, ) -> Iterator[AsyncMock]: - backend = ot3_hardware.managed_obj._backend with patch.object( - backend, "capacitive_probe", AsyncMock(spec=backend.capacitive_probe) + hardware_backend, + "capacitive_probe", + AsyncMock(spec=hardware_backend.capacitive_probe), ) as mock_probe: def _update_position( @@ -433,7 +488,7 @@ def _update_position( threshold_pf: float, probe: InstrumentProbeType, ) -> None: - ot3_hardware._backend._position[axis_to_node(moving)] += distance_mm / 2 + hardware_backend._position[moving] += distance_mm / 2 mock_probe.side_effect = _update_position @@ -442,12 +497,12 @@ def _update_position( @pytest.fixture def mock_current_position_ot3( - ot3_hardware: ThreadManager[OT3API], + managed_obj: OT3API, ) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, + managed_obj, "current_position_ot3", - AsyncMock(spec=ot3_hardware.managed_obj.current_position_ot3), + AsyncMock(spec=managed_obj.current_position_ot3), ) as mock_position: mock_position.return_value = { Axis.X: 477.2, @@ -463,12 +518,11 @@ def mock_current_position_ot3( @pytest.fixture -def mock_backend_capacitive_pass( - ot3_hardware: ThreadManager[OT3API], -) -> Iterator[AsyncMock]: - backend = ot3_hardware.managed_obj._backend +def mock_backend_capacitive_pass(hardware_backend: OT3Simulator) -> Iterator[AsyncMock]: with patch.object( - backend, "capacitive_pass", AsyncMock(spec=backend.capacitive_pass) + hardware_backend, + "capacitive_pass", + AsyncMock(spec=hardware_backend.capacitive_pass), ) as mock_pass: async def _update_position( @@ -477,8 +531,8 @@ async def _update_position( distance_mm: float, speed_mm_per_s: float, probe: InstrumentProbeType, - ) -> None: - ot3_hardware._backend._position[axis_to_node(moving)] += distance_mm / 2 + ) -> List[float]: + hardware_backend._position[moving] += distance_mm / 2 return [1, 2, 3, 4, 5, 6, 8] mock_pass.side_effect = _update_position @@ -486,20 +540,19 @@ async def _update_position( @pytest.fixture -def mock_backend_get_tip_status( - ot3_hardware: ThreadManager[OT3API], -) -> Iterator[AsyncMock]: - backend = ot3_hardware.managed_obj._backend - with patch.object(backend, "get_tip_status", AsyncMock()) as mock_tip_status: +def mock_backend_get_tip_status(hardware_backend: OT3Simulator) -> Iterator[AsyncMock]: + with patch.object( + hardware_backend, "get_tip_status", AsyncMock() + ) as mock_tip_status: yield mock_tip_status @pytest.fixture def mock_verify_tip_presence( - ot3_hardware: ThreadManager[OT3API], + managed_obj: OT3API, ) -> Iterator[AsyncMock]: with patch.object( - ot3_hardware.managed_obj, "verify_tip_presence", AsyncMock() + managed_obj, "verify_tip_presence", AsyncMock() ) as mock_check_tip: yield mock_check_tip @@ -540,10 +593,11 @@ async def prepare_for_mock_blowout( @pytest.mark.parametrize("load_configs", load_pipette_configs) async def test_pickup_moves( ot3_hardware: ThreadManager[OT3API], - mock_instrument_handlers: Tuple[Mock], + managed_obj: OT3API, + mock_instrument_handlers: Tuple[Mock, Mock], mock_move_to_plunger_bottom: AsyncMock, mock_home_gear_motors: AsyncMock, - load_configs: List[Dict[str, Any]], + load_configs: Dict[OT3Mount, PipetteLoadConfig], ) -> None: _, pipette_handler = mock_instrument_handlers for mount, configs in load_configs.items(): @@ -578,9 +632,9 @@ async def test_pickup_moves( pipette_handler.plan_lt_pick_up_tip.return_value = move_plan_return_val with patch.object( - ot3_hardware.managed_obj, + managed_obj, "move_rel", - AsyncMock(spec=ot3_hardware.managed_obj.move_rel), + AsyncMock(spec=managed_obj.move_rel), ) as mock_move_rel: await ot3_hardware.pick_up_tip(Mount.LEFT, 40.0) move_call_list = [call.args for call in mock_move_rel.call_args_list] @@ -600,7 +654,7 @@ async def test_pickup_moves( async def test_blow_out_position( ot3_hardware: ThreadManager[OT3API], mock_backend_get_tip_status: AsyncMock, - load_configs: List[Dict[str, Any]], + load_configs: Dict[OT3Mount, PipetteLoadConfig], blowout_volume: float, ) -> None: liquid_class = LiquidClasses.default @@ -650,7 +704,7 @@ async def test_blow_out_position( async def test_blow_out_error( ot3_hardware: ThreadManager[OT3API], mock_backend_get_tip_status: AsyncMock, - load_configs: List[Dict[str, Any]], + load_configs: Dict[OT3Mount, PipetteLoadConfig], blowout_volume: float, ) -> None: liquid_class = LiquidClasses.default @@ -718,6 +772,7 @@ async def test_move_to_without_homing_first( async def test_liquid_probe( mock_move_to: AsyncMock, ot3_hardware: ThreadManager[OT3API], + hardware_backend: OT3Simulator, head_node: NodeId, pipette_node: Axis, mount: OT3Mount, @@ -728,13 +783,12 @@ async def test_liquid_probe( mock_move_to_plunger_bottom: AsyncMock, ) -> None: mock_ungrip.return_value = None - backend = ot3_hardware.managed_obj._backend await ot3_hardware.home() mock_move_to.return_value = None with patch.object( - backend, "liquid_probe", AsyncMock(spec=backend.liquid_probe) - ) as mock_position: + hardware_backend, "liquid_probe", AsyncMock(spec=hardware_backend.liquid_probe) + ) as mock_liquid_probe: return_dict = { head_node: 140, NodeId.gantry_x: 0, @@ -743,7 +797,7 @@ async def test_liquid_probe( } # make sure aspirate while sensing reverses direction - mock_position.return_value = return_dict + mock_liquid_probe.return_value = return_dict fake_settings_aspirate = LiquidProbeSettings( starting_mount_height=100, max_z_distance=15, @@ -760,7 +814,7 @@ async def test_liquid_probe( ) await ot3_hardware.liquid_probe(mount, fake_settings_aspirate) mock_move_to_plunger_bottom.assert_called_once() - backend.liquid_probe.assert_called_once_with( + mock_liquid_probe.assert_called_once_with( mount, fake_settings_aspirate.max_z_distance, fake_settings_aspirate.mount_speed, @@ -773,7 +827,7 @@ async def test_liquid_probe( ) return_dict[head_node], return_dict[pipette_node] = 142, 142 - mock_position.return_value = return_dict + mock_liquid_probe.return_value = return_dict await ot3_hardware.liquid_probe( mount, fake_liquid_settings ) # should raise no exceptions @@ -820,9 +874,6 @@ async def test_capacitive_probe( assert this_point == original -Direction = Union[Literal[0.0], Literal[1.0], Literal[-1.0]] - - @pytest.mark.parametrize( "target,origin,prep_direction,probe_direction", [ @@ -861,8 +912,8 @@ async def test_probe_direction( fake_settings: CapacitivePassSettings, target: float, origin: Point, - prep_direction: Direction, - probe_direction: Direction, + prep_direction: float, + probe_direction: float, ) -> None: mock_gantry_position.return_value = origin await ot3_hardware.capacitive_probe(OT3Mount.RIGHT, Axis.X, target, fake_settings) @@ -1153,7 +1204,7 @@ async def test_gripper_fails_for_pipette_cps( @pytest.mark.xfail -async def test_gripper_position(ot3_hardware: ThreadManager[OT3API]): +async def test_gripper_position(ot3_hardware: ThreadManager[OT3API]) -> None: gripper_config = gc.load(GripperModel.v1) instr_data = AttachedGripper(config=gripper_config, id="g12345") await ot3_hardware.cache_gripper(instr_data) @@ -1169,27 +1220,29 @@ async def test_gripper_position(ot3_hardware: ThreadManager[OT3API]): async def test_gripper_move_to( ot3_hardware: ThreadManager[OT3API], mock_backend_move: AsyncMock -): +) -> None: # Moving the gripper should, well, work gripper_config = gc.load(GripperModel.v1) instr_data = AttachedGripper(config=gripper_config, id="g12345") await ot3_hardware.cache_gripper(instr_data) await ot3_hardware.move_to(OT3Mount.GRIPPER, Point(0, 0, 0)) - _, moves, _ = mock_backend_move.call_args_list[0][0] - for move in moves: - assert list(sorted(move.unit_vector.keys(), key=lambda elem: elem.value)) == [ + origin, target, _, _ = mock_backend_move.call_args_list[0][0] + assert sorted(target.keys(), key=lambda elem: cast(int, elem.value)) == sorted( + [ Axis.X, Axis.Y, Axis.Z_G, - ] + ], + key=lambda elem: cast(int, elem.value), + ) async def test_home_plunger( ot3_hardware: ThreadManager[OT3API], mock_move_to_plunger_bottom: AsyncMock, mock_home: AsyncMock, -): +) -> None: mount = OT3Mount.LEFT instr_data = AttachedPipette( config=load_pipette_data.load_definition( @@ -1208,7 +1261,7 @@ async def test_home_plunger( async def test_prepare_for_aspirate( ot3_hardware: ThreadManager[OT3API], mock_move_to_plunger_bottom: AsyncMock, -): +) -> None: mount = OT3Mount.LEFT instr_data = AttachedPipette( config=load_pipette_data.load_definition( @@ -1241,7 +1294,7 @@ async def test_plunger_ready_to_aspirate_after_dispense( disp_vol: float, push_out: Optional[float], is_ready: bool, -): +) -> None: mount = OT3Mount.LEFT instr_data = AttachedPipette( @@ -1270,7 +1323,7 @@ async def test_plunger_ready_to_aspirate_after_dispense( async def test_move_to_plunger_bottom( ot3_hardware: ThreadManager[OT3API], mock_move: AsyncMock, -): +) -> None: mount = OT3Mount.LEFT instr_data = AttachedPipette( config=load_pipette_data.load_definition( @@ -1368,7 +1421,7 @@ async def test_move_axes( mock_check_motor: Mock, input_position: Dict[Axis, float], expected_move_pos: OrderedDict[Axis, float], -): +) -> None: await ot3_hardware.move_axes(position=input_position) mock_check_motor.return_value = True @@ -1389,11 +1442,11 @@ async def test_move_expect_stall_flag( expect_stalls: bool, ) -> None: - expected = MoveStopCondition.stall if expect_stalls else MoveStopCondition.none + expected = HWStopCondition.stall if expect_stalls else HWStopCondition.none await ot3_hardware.move_to(Mount.LEFT, Point(0, 0, 0), _expect_stalls=expect_stalls) mock_backend_move.assert_called_once() - _, _, condition = mock_backend_move.call_args_list[0][0] + _, _, _, condition = mock_backend_move.call_args_list[0][0] assert condition == expected mock_backend_move.reset_mock() @@ -1401,7 +1454,7 @@ async def test_move_expect_stall_flag( Mount.LEFT, Point(10, 0, 0), _expect_stalls=expect_stalls ) mock_backend_move.assert_called_once() - _, _, condition = mock_backend_move.call_args_list[0][0] + _, _, _, condition = mock_backend_move.call_args_list[0][0] assert condition == expected @@ -1418,7 +1471,7 @@ async def test_move_expect_stall_flag( async def test_reset_instrument_offset( ot3_hardware: ThreadManager[OT3API], mount: Union[OT3Mount, Mount], - mock_instrument_handlers: Tuple[Mock], + mock_instrument_handlers: Tuple[Mock, Mock], ) -> None: gripper_handler, pipette_handler = mock_instrument_handlers await ot3_hardware.reset_instrument_offset(mount) @@ -1432,60 +1485,66 @@ async def test_reset_instrument_offset( @pytest.mark.parametrize( - argnames=["mount", "expected_offset"], + argnames=["mount_expected_offset"], argvalues=[ [ - OT3Mount.GRIPPER, - GripperCalibrationOffset( - offset=Point(1, 2, 3), - source=SourceType.default, - status=CalibrationStatus(), - last_modified=None, + ( + OT3Mount.GRIPPER, + GripperCalibrationOffset( + offset=Point(1, 2, 3), + source=SourceType.default, + status=CalibrationStatus(), + last_modified=None, + ), ), ], [ - OT3Mount.RIGHT, - PipetteOffsetByPipetteMount( - offset=Point(10, 20, 30), - source=SourceType.default, - status=CalibrationStatus(), - last_modified=None, + ( + OT3Mount.RIGHT, + PipetteOffsetByPipetteMount( + offset=Point(10, 20, 30), + source=SourceType.default, + status=CalibrationStatus(), + last_modified=None, + ), ), ], [ - OT3Mount.LEFT, - PipetteOffsetByPipetteMount( - offset=Point(100, 200, 300), - source=SourceType.default, - status=CalibrationStatus(), - last_modified=None, + ( + OT3Mount.LEFT, + PipetteOffsetByPipetteMount( + offset=Point(100, 200, 300), + source=SourceType.default, + status=CalibrationStatus(), + last_modified=None, + ), ), ], ], ) def test_get_instrument_offset( ot3_hardware: ThreadManager[OT3API], - mount: OT3Mount, - expected_offset: Union[GripperCalibrationOffset, PipetteOffsetByPipetteMount], - mock_instrument_handlers: Tuple[Mock], + mount_expected_offset: Union[ + Tuple[Literal[OT3Mount.GRIPPER], GripperCalibrationOffset], + Tuple[Literal[OT3Mount.RIGHT], PipetteOffsetByPipetteMount], + Tuple[Literal[OT3Mount.LEFT], PipetteOffsetByPipetteMount], + ], + mock_instrument_handlers: Tuple[Mock, Mock], ) -> None: gripper_handler, pipette_handler = mock_instrument_handlers - if mount == OT3Mount.GRIPPER: + if mount_expected_offset[0] == OT3Mount.GRIPPER: gripper_handler.get_gripper_dict.return_value = GripperDict( model=GripperModel.v1, gripper_id="abc", state=GripperJawState.UNHOMED, display_name="abc", - fw_update_required=False, - fw_current_version=100, - fw_next_version=None, - calibration_offset=expected_offset, + calibration_offset=mount_expected_offset[1], ) else: - pipette_handler.get_instrument_offset.return_value = expected_offset + pipette_handler.get_instrument_offset.return_value = mount_expected_offset[1] - found_offset = ot3_hardware.get_instrument_offset(mount=mount) - assert found_offset == expected_offset + found_offset = ot3_hardware.get_instrument_offset(mount=mount_expected_offset[0]) + assert found_offset == mount_expected_offset[1] @pytest.mark.parametrize( @@ -1501,7 +1560,7 @@ def test_get_instrument_offset( async def test_save_instrument_offset( ot3_hardware: ThreadManager[OT3API], mount: Union[OT3Mount, Mount], - mock_instrument_handlers: Tuple[Mock], + mock_instrument_handlers: Tuple[Mock, Mock], ) -> None: gripper_handler, pipette_handler = mock_instrument_handlers await ot3_hardware.save_instrument_offset(mount, Point(1, 1, 1)) @@ -1517,7 +1576,8 @@ async def test_save_instrument_offset( @pytest.mark.xfail() async def test_pick_up_tip_full_tiprack( ot3_hardware: ThreadManager[OT3API], - mock_instrument_handlers: Tuple[Mock], + hardware_backend: OT3Simulator, + mock_instrument_handlers: Tuple[Mock, Mock], mock_ungrip: AsyncMock, mock_move_to_plunger_bottom: AsyncMock, mock_home_gear_motors: AsyncMock, @@ -1526,15 +1586,14 @@ async def test_pick_up_tip_full_tiprack( mock_ungrip.return_value = None await ot3_hardware.home() _, pipette_handler = mock_instrument_handlers - backend = ot3_hardware.managed_obj._backend instr_mock = AsyncMock(spec=Pipette) instr_mock.nozzle_manager.current_configruation.configuration.return_value = ( NozzleConfigurationType.FULL ) with patch.object( - backend, "tip_action", AsyncMock(spec=backend.tip_action) + hardware_backend, "tip_action", AsyncMock(spec=hardware_backend.tip_action) ) as tip_action: - backend._gear_motor_position = {NodeId: 0} + hardware_backend._gear_motor_position = {Axis.P_L: 0} pipette_handler.get_pipette.return_value = instr_mock pipette_handler.plan_ht_pick_up_tip.return_value = TipActionSpec( @@ -1556,16 +1615,16 @@ def _update_gear_motor_pos( moves: Optional[List[Move[Axis]]] = None, distance: Optional[float] = None, ) -> None: - if NodeId.pipette_left not in backend._gear_motor_position: - backend._gear_motor_position = {NodeId.pipette_left: 0.0} + if Axis.P_L not in hardware_backend._gear_motor_position: + hardware_backend._gear_motor_position = {Axis.P_L: 0.0} if moves: for move in moves: for block in move.blocks: - backend._gear_motor_position[NodeId.pipette_left] += ( + hardware_backend._gear_motor_position[Axis.P_L] += float( block.distance * move.unit_vector[Axis.Q] ) elif distance: - backend._gear_motor_position[NodeId.pipette_left] += distance + hardware_backend._gear_motor_position[Axis.P_L] += distance tip_action.side_effect = _update_gear_motor_pos await ot3_hardware.set_gantry_load(GantryLoad.HIGH_THROUGHPUT) @@ -1582,17 +1641,20 @@ def _update_gear_motor_pos( async def test_drop_tip_full_tiprack( ot3_hardware: ThreadManager[OT3API], - mock_instrument_handlers: Tuple[Mock], + hardware_backend: OT3Simulator, + mock_instrument_handlers: Tuple[Mock, Mock], + mock_backend_get_tip_status: AsyncMock, mock_home_gear_motors: AsyncMock, mock_verify_tip_presence: AsyncMock, ) -> None: _, pipette_handler = mock_instrument_handlers - backend = ot3_hardware.managed_obj._backend with patch.object( - backend, "tip_action", AsyncMock(spec=backend.tip_action) + hardware_backend, + "tip_action", + AsyncMock(spec=hardware_backend.tip_action, wraps=hardware_backend.tip_action), ) as tip_action: - backend._gear_motor_position = {NodeId.pipette_left: 0} + hardware_backend._gear_motor_position = {Axis.Q: 0} pipette_handler.plan_ht_drop_tip.return_value = TipActionSpec( tip_action_moves=[ TipActionMoveSpec( @@ -1610,35 +1672,21 @@ def set_mock_plunger_configs() -> None: mock_instr.config.plunger_homing_configurations.current = 1.0 mock_instr.plunger_positions.bottom = -18.5 - def _update_gear_motor_pos( - moves: Optional[List[Move[Axis]]] = None, - distance: Optional[float] = None, - velocity: Optional[float] = None, - tip_action: str = "home", - ) -> None: - if NodeId.pipette_left not in backend._gear_motor_position: - backend._gear_motor_position = {NodeId.pipette_left: 0.0} - if moves: - for move in moves: - for block in move.blocks: - backend._gear_motor_position[ - NodeId.pipette_left - ] += block.distance - elif distance: - backend._gear_motor_position[NodeId.pipette_left] += distance - - tip_action.side_effect = _update_gear_motor_pos set_mock_plunger_configs() await ot3_hardware.set_gantry_load(GantryLoad.HIGH_THROUGHPUT) mock_backend_get_tip_status.return_value = TipStateType.ABSENT await ot3_hardware.drop_tip(Mount.LEFT, home_after=True) pipette_handler.plan_ht_drop_tip.assert_called_once_with() + assert len(tip_action.call_args_list) == 2 # first call should be "clamp", moving down - assert tip_action.call_args_list[0][-1]["moves"][0].unit_vector == {Axis.Q: 1} + first_target = tip_action.call_args_list[0][-1]["targets"][0][0] + assert list(first_target.keys()) == [Axis.Q] + assert first_target[Axis.Q] == 10 # next call should be "clamp", moving back up - assert tip_action.call_args_list[1][-1]["moves"][0].unit_vector == {Axis.Q: -1} - assert len(tip_action.call_args_list) == 2 + second_target = tip_action.call_args_list[1][-1]["targets"][0][0] + assert list(second_target.keys()) == [Axis.Q] + assert second_target[Axis.Q] < 10 # home should be called after tip_action is done assert len(mock_home_gear_motors.call_args_list) == 1 @@ -1648,14 +1696,14 @@ def _update_gear_motor_pos( [[Axis.X], [Axis.X, Axis.Y], [Axis.X, Axis.Y, Axis.P_L], None], ) async def test_update_position_estimation( - ot3_hardware: ThreadManager[OT3API], axes: List[Axis] + ot3_hardware: ThreadManager[OT3API], + hardware_backend: OT3Simulator, + axes: List[Axis], ) -> None: - - backend = ot3_hardware.managed_obj._backend with patch.object( - backend, + hardware_backend, "update_motor_estimation", - AsyncMock(spec=backend.update_motor_estimation), + AsyncMock(spec=hardware_backend.update_motor_estimation), ) as mock_update: await ot3_hardware._update_position_estimation(axes) if axes is None: @@ -1663,24 +1711,25 @@ async def test_update_position_estimation( mock_update.assert_called_once_with(axes) -async def test_refresh_positions(ot3_hardware: ThreadManager[OT3API]) -> None: +async def test_refresh_positions( + ot3_hardware: ThreadManager[OT3API], hardware_backend: OT3Simulator +) -> None: - backend = ot3_hardware.managed_obj._backend ot3_hardware._current_position.clear() ot3_hardware._encoder_position.clear() with patch.object( - backend, + hardware_backend, "update_motor_status", - AsyncMock(spec=backend.update_motor_status), + AsyncMock(spec=hardware_backend.update_motor_status), ) as mock_update_status, patch.object( - backend, + hardware_backend, "update_position", - AsyncMock(spec=backend.update_position), + AsyncMock(spec=hardware_backend.update_position), ) as mock_pos, patch.object( - backend, + hardware_backend, "update_encoder_position", - AsyncMock(spec=backend.update_encoder_position), + AsyncMock(spec=hardware_backend.update_encoder_position), ) as mock_encoder: mock_pos.return_value = {ax: 100 for ax in Axis} @@ -1707,6 +1756,7 @@ async def test_refresh_positions(ot3_hardware: ThreadManager[OT3API]) -> None: ) async def test_home_axis( ot3_hardware: ThreadManager[OT3API], + hardware_backend: OT3Simulator, mock_check_motor: Mock, mock_check_encoder: Mock, axis: Axis, @@ -1722,37 +1772,34 @@ async def test_home_axis( instr_data = AttachedPipette(config=pipette_config, id="fakepip") await ot3_hardware.cache_pipette(Axis.to_ot3_mount(axis), instr_data, None) - backend = ot3_hardware.managed_obj._backend origin_pos = {ax: 100 for ax in Axis} origin_encoder = {ax: 99 for ax in Axis} - backend._position = {axis_to_node(ax): v for ax, v in origin_pos.items()} - backend._encoder_position = { - axis_to_node(ax): v for ax, v in origin_encoder.items() - } + hardware_backend._position = {ax: v for ax, v in origin_pos.items()} + hardware_backend._encoder_position = {ax: v for ax, v in origin_encoder.items()} mock_check_motor.return_value = stepper_ok mock_check_encoder.return_value = encoder_ok with patch.object( - backend, + hardware_backend, "move", AsyncMock( - spec=backend.move, - wraps=backend.move, + spec=hardware_backend.move, + wraps=hardware_backend.move, ), - ) as mock_backend_move, patch.object( - backend, + ) as mock_hardware_backend_move, patch.object( + hardware_backend, "home", AsyncMock( - spec=backend.home, - wraps=backend.home, + spec=hardware_backend.home, + wraps=hardware_backend.home, ), - ) as mock_backend_home, patch.object( - backend, + ) as mock_hardware_backend_home, patch.object( + hardware_backend, "update_motor_estimation", AsyncMock( - spec=backend.update_motor_estimation, - wraps=backend.update_motor_estimation, + spec=hardware_backend.update_motor_estimation, + wraps=hardware_backend.update_motor_estimation, ), ) as mock_estimate: @@ -1766,31 +1813,31 @@ async def test_home_axis( if stepper_ok and encoder_ok: """Copy encoder position to stepper pos""" - # for accurate axis, we just move to home pos: + # for accurate axis, we just move very close to home pos if axis in [Axis.Z_L, Axis.P_L]: # move is called - mock_backend_move.assert_awaited_once() - move = mock_backend_move.call_args_list[0][0][1][0] - assert move.distance == 95.0 + mock_hardware_backend_move.assert_awaited_once() + target = mock_hardware_backend_move.call_args_list[0][0][1][axis] + assert target == 5 # then home is called - mock_backend_home.assert_awaited_once() + mock_hardware_backend_home.assert_awaited_once() else: # we move to 20 mm away from home - mock_backend_move.assert_awaited_once() - move = mock_backend_move.call_args_list[0][0][1][0] - assert move.distance == 80.0 + mock_hardware_backend_move.assert_awaited_once() + target = mock_hardware_backend_move.call_args_list[0][0][1][axis] + assert target == 20.0 # then home is called - mock_backend_home.assert_awaited_once() + mock_hardware_backend_home.assert_awaited_once() else: # home axis - mock_backend_home.assert_awaited_once() + mock_hardware_backend_home.assert_awaited_once() # move not called - mock_backend_move.assert_not_awaited() + mock_hardware_backend_move.assert_not_awaited() # axis is at the home position - expected_pos = {axis_to_node(ax): v for ax, v in origin_pos.items()} - expected_pos.update({axis_to_node(axis): 0}) - assert backend._position == expected_pos + expected_pos = {ax: v for ax, v in origin_pos.items()} + expected_pos.update({axis: 0}) + assert hardware_backend._position == expected_pos @pytest.mark.parametrize("setting", [True, False]) @@ -1843,33 +1890,6 @@ def test_fw_version( assert ot3_hardware.get_fw_version() == version_str -@pytest.mark.parametrize(argnames=["enabled"], argvalues=[[True], [False]]) -async def test_status_bar_interface( - ot3_hardware: ThreadManager[OT3API], - enabled: bool, -) -> None: - """Test setting status bar statuses and make sure the cached status is correct.""" - await ot3_hardware.set_status_bar_enabled(enabled) - - settings = { - StatusBarState.IDLE: StatusBarState.IDLE, - StatusBarState.RUNNING: StatusBarState.RUNNING, - StatusBarState.PAUSED: StatusBarState.PAUSED, - StatusBarState.HARDWARE_ERROR: StatusBarState.HARDWARE_ERROR, - StatusBarState.SOFTWARE_ERROR: StatusBarState.SOFTWARE_ERROR, - StatusBarState.CONFIRMATION: StatusBarState.IDLE, - StatusBarState.RUN_COMPLETED: StatusBarState.RUN_COMPLETED, - StatusBarState.UPDATING: StatusBarState.UPDATING, - StatusBarState.ACTIVATION: StatusBarState.IDLE, - StatusBarState.DISCO: StatusBarState.IDLE, - StatusBarState.OFF: StatusBarState.OFF, - } - - for setting, response in settings.items(): - await ot3_hardware.set_status_bar_state(state=setting) - assert ot3_hardware.get_status_bar_state() == response - - @pytest.mark.parametrize( argnames=["old_state", "new_state", "should_trigger"], argvalues=[ @@ -1937,7 +1957,7 @@ async def test_stop_only_home_necessary_axes( mock_home: AsyncMock, mock_reset: AsyncMock, jaw_state: GripperJawState, -): +) -> None: gripper_config = gc.load(GripperModel.v1) instr_data = AttachedGripper(config=gripper_config, id="test") await ot3_hardware.cache_gripper(instr_data) diff --git a/api/tests/opentrons/hardware_control/test_ot3_calibration.py b/api/tests/opentrons/hardware_control/test_ot3_calibration.py index 6ecd5f360c1..f2eee2bdcca 100644 --- a/api/tests/opentrons/hardware_control/test_ot3_calibration.py +++ b/api/tests/opentrons/hardware_control/test_ot3_calibration.py @@ -4,8 +4,7 @@ import pytest import json from math import isclose -from typing import Iterator, Tuple -from typing_extensions import Literal +from typing import AsyncIterator, Iterator, Tuple, Any, Literal from mock import patch, AsyncMock, Mock, call as mock_call from opentrons.hardware_control import ThreadManager from opentrons.hardware_control.ot3api import OT3API @@ -14,7 +13,6 @@ from opentrons.hardware_control.ot3_calibration import ( find_edge_binary, find_axis_center, - EarlyCapacitiveSenseTrigger, find_calibration_structure_height, find_slot_center_binary, find_slot_center_noncontact, @@ -23,30 +21,35 @@ _edges_from_data, _probe_deck_at, _verify_edge_pos, - InaccurateNonContactSweepError, - CalibrationStructureNotFoundError, - EdgeNotFoundError, PREP_OFFSET_DEPTH, EDGES, ) from opentrons.types import Point from opentrons_shared_data.deck import get_calibration_square_position_in_slot +from opentrons_shared_data.errors.exceptions import ( + CalibrationStructureNotFoundError, + EdgeNotFoundError, + EarlyCapacitiveSenseTrigger, + InaccurateNonContactSweepError, +) @pytest.fixture(autouse=True) -def mock_save_json(): +def mock_save_json() -> Iterator[Mock]: with patch("json.dump", Mock(spec=json.dump)) as jd: yield jd @pytest.fixture def mock_move_to(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: + managed = ot3_hardware.managed_obj + assert managed with patch.object( - ot3_hardware.managed_obj, + managed, "move_to", AsyncMock( - spec=ot3_hardware.managed_obj.move_to, - wraps=ot3_hardware.managed_obj.move_to, + spec=managed.move_to, + wraps=managed.move_to, ), ) as mock_move: yield mock_move @@ -54,12 +57,14 @@ def mock_move_to(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: @pytest.fixture def mock_capacitive_probe(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: + managed = ot3_hardware.managed_obj + assert managed with patch.object( - ot3_hardware.managed_obj, + managed, "capacitive_probe", AsyncMock( - spec=ot3_hardware.managed_obj.capacitive_probe, - wraps=ot3_hardware.managed_obj.capacitive_probe, + spec=managed.capacitive_probe, + wraps=managed.capacitive_probe, ), ) as mock_probe: yield mock_probe @@ -79,12 +84,14 @@ def mock_probe_deck() -> Iterator[AsyncMock]: @pytest.fixture def mock_capacitive_sweep(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]: + managed = ot3_hardware.managed_obj + assert managed with patch.object( - ot3_hardware.managed_obj, + managed, "capacitive_sweep", AsyncMock( - spec=ot3_hardware.managed_obj.capacitive_sweep, - wraps=ot3_hardware.managed_obj.capacitive_sweep, + spec=managed.capacitive_sweep, + wraps=managed.capacitive_sweep, ), ) as mock_sweep: yield mock_sweep @@ -111,13 +118,15 @@ def mock_data_analysis() -> Iterator[Mock]: def _update_edge_sense_config( - old: OT3CalibrationSettings, **new_edge_sense_settings + old: OT3CalibrationSettings, **new_edge_sense_settings: Any ) -> OT3CalibrationSettings: return replace(old, edge_sense=replace(old.edge_sense, **new_edge_sense_settings)) @pytest.fixture -async def override_cal_config(ot3_hardware: ThreadManager[OT3API]) -> Iterator[None]: +async def override_cal_config( + ot3_hardware: ThreadManager[OT3API], +) -> AsyncIterator[None]: old_calibration = copy.deepcopy(ot3_hardware.config.calibration) await ot3_hardware.update_config( calibration=_update_edge_sense_config( @@ -148,18 +157,18 @@ def _other_axis_val(point: Tuple[float, float, float], main_axis: Axis) -> float @pytest.mark.parametrize( - "search_axis,direction_if_hit,probe_results,search_result", + "direction_if_hit,probe_results,search_result", [ # For each axis and direction, test # 1. hit-miss-miss # 2. miss-hit-hit # 3. miss-hit-miss - (Axis.X, -1, (_HIT, _MISS, _MISS), -1), - (Axis.X, -1, (_MISS, _HIT, _HIT), 1), - (Axis.X, -1, (_MISS, _HIT, _MISS), 3), - (Axis.X, 1, (_HIT, _MISS, _MISS), 1), - (Axis.X, 1, (_MISS, _HIT, _HIT), -1), - (Axis.X, 1, (_MISS, _HIT, _MISS), -3), + (-1, (_HIT, _MISS, _MISS), -1), + (-1, (_MISS, _HIT, _HIT), 1), + (-1, (_MISS, _HIT, _MISS), 3), + (1, (_HIT, _MISS, _MISS), 1), + (1, (_MISS, _HIT, _HIT), -1), + (1, (_MISS, _HIT, _MISS), -3), ], ) async def test_find_edge( @@ -168,7 +177,6 @@ async def test_find_edge( override_cal_config: None, mock_verify_edge: AsyncMock, mock_move_to: AsyncMock, - search_axis: Axis, direction_if_hit: Literal[1, -1], probe_results: Tuple[float, float, float], search_result: float, @@ -179,18 +187,18 @@ async def test_find_edge( ot3_hardware, OT3Mount.RIGHT, Point(0, 0, 0), - search_axis, + Axis.X, direction_if_hit, False, ) - assert search_axis.of_point(result) == search_result + assert Axis.X.of_point(result) == search_result # the first move is in z only to the cal height checked_calls = mock_move_to.call_args_list[1:] # all other moves should only move in the search axis for call in checked_calls: assert call[0][0] == OT3Mount.RIGHT - assert _other_axis_val(call[0][1], search_axis) == pytest.approx( - _other_axis_val(Point(0, 0, 0), search_axis) + assert _other_axis_val(call[0][1], Axis.X) == pytest.approx( + _other_axis_val(Point(0, 0, 0), Axis.X) ) @@ -206,7 +214,7 @@ async def test_edge_not_found( mock_capacitive_probe: AsyncMock, override_cal_config: None, mock_move_to: AsyncMock, - search_axis: Axis, + search_axis: Literal[Axis.X, Axis.Y], direction_if_hit: Literal[1, -1], probe_results: Tuple[float, float, float], ) -> None: @@ -295,6 +303,8 @@ async def test_method_enum( ot3_hardware: ThreadManager[OT3API], override_cal_config: None, ) -> None: + managed = ot3_hardware.managed_obj + assert managed with patch( "opentrons.hardware_control.ot3_calibration.find_slot_center_binary", AsyncMock(spec=find_slot_center_binary), @@ -308,9 +318,9 @@ async def test_method_enum( "opentrons.hardware_control.ot3_calibration.find_calibration_structure_height", AsyncMock(spec=find_calibration_structure_height), ) as find_deck, patch.object( - ot3_hardware.managed_obj, "reset_instrument_offset", AsyncMock() + managed, "reset_instrument_offset", AsyncMock() ) as reset_instrument_offset, patch.object( - ot3_hardware.managed_obj, "save_instrument_offset", AsyncMock() + managed, "save_instrument_offset", AsyncMock() ) as save_instrument_offset: find_deck.return_value = 10 calibration_target.return_value = Point(0.0, 0.0, 0.0) @@ -348,10 +358,12 @@ async def test_method_enum( async def test_calibrate_mount_errors( ot3_hardware: ThreadManager[OT3API], mock_data_analysis: Mock ) -> None: + managed = ot3_hardware.managed_obj + assert managed with patch.object( - ot3_hardware.managed_obj, "reset_instrument_offset", AsyncMock() + managed, "reset_instrument_offset", AsyncMock() ) as reset_instrument_offset, patch.object( - ot3_hardware.managed_obj, "save_instrument_offset", AsyncMock() + managed, "save_instrument_offset", AsyncMock() ) as save_instrument_offset, patch( "opentrons.hardware_control.ot3_calibration.find_calibration_structure_height", AsyncMock(spec=find_calibration_structure_height), diff --git a/api/tests/opentrons/hardware_control/test_ot3_transforms.py b/api/tests/opentrons/hardware_control/test_ot3_transforms.py index a365e1866d7..37328043e84 100644 --- a/api/tests/opentrons/hardware_control/test_ot3_transforms.py +++ b/api/tests/opentrons/hardware_control/test_ot3_transforms.py @@ -1,16 +1,18 @@ import pytest +from typing import Dict, Optional from unittest import mock from opentrons import types from opentrons.hardware_control import ot3api -from opentrons.hardware_control.types import Axis, OT3Mount +from opentrons.hardware_control.types import Axis from opentrons_shared_data.pipette import name_for_model +from opentrons_shared_data.pipette.dev_types import PipetteModel @pytest.mark.parametrize( "pipette_model", ["p1000_single_v3.3", "p1000_single_v3.3", "p50_multi_v3.3"] ) -async def test_transforms_roundtrip(pipette_model): - attached = { +async def test_transforms_roundtrip(pipette_model: PipetteModel) -> None: + attached: Dict[types.Mount, Dict[str, Optional[str]]] = { types.Mount.LEFT: { "model": pipette_model, "id": pipette_model + "_idididid_left", @@ -21,7 +23,6 @@ async def test_transforms_roundtrip(pipette_model): "id": pipette_model + "_idididid_right", "name": name_for_model(pipette_model), }, - OT3Mount.GRIPPER: None, } sim = await ot3api.OT3API.build_hardware_simulator(attached_instruments=attached) target = types.Point(20, 30, 40) @@ -32,8 +33,10 @@ async def test_transforms_roundtrip(pipette_model): @pytest.mark.parametrize( "pipette_model", ["p1000_single_v3.3", "p50_single_v3.3", "p1000_multi_v3.3"] ) -async def test_transform_values(pipette_model, enable_ot3_hardware_controller): - attached = { +async def test_transform_values( + pipette_model: PipetteModel, enable_ot3_hardware_controller: None +) -> None: + attached: Dict[types.Mount, Dict[str, Optional[str]]] = { types.Mount.LEFT: { "model": pipette_model, "id": pipette_model + "_idididid_left", @@ -48,13 +51,15 @@ async def test_transform_values(pipette_model, enable_ot3_hardware_controller): sim = await ot3api.OT3API.build_hardware_simulator(attached_instruments=attached) target = types.Point(20, 30, 40) with mock.patch.object( - sim._move_manager, - "plan_motion", - mock.MagicMock(side_effect=sim._move_manager.plan_motion), - spec=sim._move_manager.plan_motion, + sim._backend, + "move", + mock.MagicMock(side_effect=sim._backend.move), + spec=sim._backend.move, ) as mock_move: await sim.move_to(types.Mount.RIGHT, target) - right_offset = sim.hardware_instruments[types.Mount.RIGHT].critical_point() + right_pipette = sim.hardware_instruments[types.Mount.RIGHT] + assert right_pipette + right_offset = right_pipette.critical_point() point = [ (target.x - right_offset[0] - sim.config.right_mount_offset[0]) * -1 + sim.config.carriage_offset[0], @@ -63,18 +68,20 @@ async def test_transform_values(pipette_model, enable_ot3_hardware_controller): (target.z - right_offset[2] - sim.config.right_mount_offset[2]) * -1 + sim.config.carriage_offset[2], ] - assert mock_move.call_args[1]["target_list"][0].position[Axis.X] == point[0] - assert mock_move.call_args[1]["target_list"][0].position[Axis.Y] == point[1] - assert mock_move.call_args[1]["target_list"][0].position[Axis.Z_R] == point[2] + assert mock_move.call_args[0][1][Axis.X] == point[0] + assert mock_move.call_args[0][1][Axis.Y] == point[1] + assert mock_move.call_args[0][1][Axis.Z_R] == point[2] with mock.patch.object( - sim._move_manager, - "plan_motion", - mock.MagicMock(side_effect=sim._move_manager.plan_motion), - spec=sim._move_manager.plan_motion, + sim._backend, + "move", + mock.MagicMock(side_effect=sim._backend.move), + spec=sim._backend.move, ) as mock_move: await sim.move_to(types.Mount.LEFT, target) - left_offset = sim.hardware_instruments[types.Mount.LEFT].critical_point() + left_pipette = sim.hardware_instruments[types.Mount.LEFT] + assert left_pipette + left_offset = left_pipette.critical_point() point = [ (target.x - left_offset[0] - sim.config.left_mount_offset[0]) * -1 + sim.config.carriage_offset[0], @@ -83,6 +90,6 @@ async def test_transform_values(pipette_model, enable_ot3_hardware_controller): (target.z - left_offset[2] - sim.config.left_mount_offset[2]) * -1 + sim.config.carriage_offset[2], ] - assert mock_move.call_args[1]["target_list"][0].position[Axis.X] == point[0] - assert mock_move.call_args[1]["target_list"][0].position[Axis.Y] == point[1] - assert mock_move.call_args[1]["target_list"][0].position[Axis.Z_L] == point[2] + assert mock_move.call_args[0][1][Axis.X] == point[0] + assert mock_move.call_args[0][1][Axis.Y] == point[1] + assert mock_move.call_args[0][1][Axis.Z_L] == point[2] diff --git a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py index 61ba3f628f4..dfd05477043 100644 --- a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py +++ b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py @@ -6,7 +6,7 @@ from math import pi from subprocess import run, Popen from time import time -from typing import Callable, Coroutine, Dict, List, Optional, Tuple, Union +from typing import Callable, Coroutine, Dict, List, Optional, Tuple, Union, cast import atexit from opentrons_hardware.drivers.can_bus import DriverSettings, build, CanMessenger from opentrons_hardware.drivers.can_bus import settings as can_bus_settings @@ -19,9 +19,9 @@ from opentrons.config.robot_configs import build_config_ot3, load_ot3 as load_ot3_config from opentrons.config.advanced_settings import set_adv_setting from opentrons.hardware_control.types import SubSystem +from opentrons.hardware_control.backends.ot3controller import OT3Controller from opentrons.hardware_control.backends.ot3utils import ( sensor_node_for_mount, - axis_convert, ) # TODO (lc 10-27-2022) This should be changed to an ot3 pipette object once we @@ -207,10 +207,11 @@ async def reset_api(api: OT3API) -> None: """Reset OT3API.""" print(f"Firmware: v{api.fw_version}") if not api.is_simulator: - await api._backend.engage_sync() # type: ignore[union-attr] - await api._backend.release_estop() # type: ignore[union-attr] + backend = cast(OT3Controller, api._backend) + await backend.engage_sync() + await backend.release_estop() await update_firmware(api) - await api._backend.probe_network() # type: ignore[union-attr] + await backend.probe_network() await api.cache_instruments() await api.refresh_positions() @@ -609,18 +610,16 @@ async def move_tip_motor_relative_ot3( if not api.hardware_pipettes[OT3Mount.LEFT.to_mount()]: raise RuntimeError("No pipette found on LEFT mount") - current_gear_pos_float = axis_convert(api._backend.gear_motor_position, 0.0)[ - Axis.P_L - ] + current_gear_pos_float = api._backend.gear_motor_position or 0.0 current_gear_pos_dict = {Axis.Q: current_gear_pos_float} target_pos_dict = {Axis.Q: current_gear_pos_float + distance} if speed is not None and distance < 0: speed *= -1 - tip_motor_move = api._build_moves(current_gear_pos_dict, target_pos_dict) - - _move_coro = api._backend.tip_action(moves=tip_motor_move[0]) + _move_coro = api._backend.tip_action( + current_gear_pos_dict, [(target_pos_dict, speed or 400)] + ) if motor_current is None: await _move_coro else: @@ -864,7 +863,7 @@ async def get_temperature_humidity_ot3( """Get the temperature/humidity reading from the pipette.""" if api.is_simulator: return 25.0, 50.0 - messenger = api._backend._messenger # type: ignore[union-attr] + messenger = cast(OT3Controller, api._backend)._messenger return await _get_temp_humidity(messenger, mount, sensor_id) @@ -908,7 +907,10 @@ async def get_capacitance_ot3( capacitive = sensor_types.CapacitiveSensor.build(sensor_id, node_id) s_driver = sensor_driver.SensorDriver() data = await s_driver.read( - api._backend._messenger, capacitive, offset=False, timeout=2 # type: ignore[union-attr] + cast(OT3Controller, api._backend)._messenger, + capacitive, + offset=False, + timeout=2, ) if data is None: raise SensorResponseBad("no response from sensor") @@ -925,7 +927,7 @@ async def get_pressure_ot3( pressure = sensor_types.PressureSensor.build(sensor_id, node_id) s_driver = sensor_driver.SensorDriver() data = await s_driver.read( - api._backend._messenger, pressure, offset=False, timeout=2 # type: ignore[union-attr] + cast(OT3Controller, api._backend)._messenger, pressure, offset=False, timeout=2 ) if data is None: raise SensorResponseBad("no response from sensor") @@ -1103,7 +1105,7 @@ def get_robot_serial_ot3(api: OT3API) -> str: """Get robot serial number.""" if api.is_simulator: return "FLXA1000000000000" - robot_id = api._backend.eeprom_data.serial_number + robot_id = cast(OT3Controller, api._backend).eeprom_data.serial_number if not robot_id: robot_id = "" return robot_id diff --git a/hardware-testing/hardware_testing/production_qc/gripper_assembly_qc_ot3/test_width.py b/hardware-testing/hardware_testing/production_qc/gripper_assembly_qc_ot3/test_width.py index 453b038313b..4b98189cf74 100644 --- a/hardware-testing/hardware_testing/production_qc/gripper_assembly_qc_ot3/test_width.py +++ b/hardware-testing/hardware_testing/production_qc/gripper_assembly_qc_ot3/test_width.py @@ -1,8 +1,9 @@ """Test Width.""" -from typing import List, Union, Tuple, Optional +from typing import List, Union, Tuple, Optional, cast from opentrons.hardware_control.ot3api import OT3API from opentrons_hardware.firmware_bindings.constants import NodeId +from opentrons.hardware_control.backends.ot3controller import OT3Controller from hardware_testing.data import ui from hardware_testing.data.csv_report import ( @@ -64,7 +65,9 @@ async def _save_result(_width: float, _force: float, _cache_error: bool) -> floa # fake the encoder to be in the right place, during simulation if api.is_simulator: sim_enc_pox = (max_width - width) / 2.0 - api._backend._encoder_position[NodeId.gripper_g] = sim_enc_pox + cast(OT3Controller, api._backend)._encoder_position[ + NodeId.gripper_g + ] = sim_enc_pox await api.refresh_positions() _width_actual = api._gripper_handler.get_gripper().jaw_width assert _width_actual is not None diff --git a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_capacitance.py b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_capacitance.py index b781bb57447..3564bc0d364 100644 --- a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_capacitance.py +++ b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_capacitance.py @@ -1,12 +1,13 @@ """Test Capacitance.""" from asyncio import sleep -from typing import List, Union, Tuple, Optional +from typing import List, Union, Tuple, Optional, cast from opentrons_hardware.hardware_control.tool_sensors import capacitive_probe from opentrons_hardware.firmware_bindings.constants import NodeId, SensorId from opentrons.hardware_control.ot3api import OT3API from opentrons.hardware_control.backends.ot3utils import sensor_id_for_instrument +from opentrons.hardware_control.backends.ot3controller import OT3Controller from opentrons.hardware_control.types import InstrumentProbeType @@ -168,7 +169,7 @@ async def _probe(distance: float, speed: float) -> float: if api.is_simulator: return 0.0 pos = await capacitive_probe( - api._backend._messenger, # type: ignore[union-attr] + cast(OT3Controller, api._backend)._messenger, NodeId.pipette_left, NodeId.head_l, distance=distance, diff --git a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_tip_sensor.py b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_tip_sensor.py index cdada933d4d..2422143e617 100644 --- a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_tip_sensor.py +++ b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_tip_sensor.py @@ -2,7 +2,6 @@ import asyncio from typing import List, Union, cast -from opentrons_hardware.drivers.can_bus.can_messenger import CanMessenger from opentrons_hardware.firmware_bindings import ArbitrationId from opentrons_hardware.firmware_bindings.constants import MessageId from opentrons_hardware.firmware_bindings.messages import MessageDefinition @@ -13,6 +12,7 @@ from opentrons_hardware.firmware_bindings.constants import NodeId from opentrons.hardware_control.ot3api import OT3API +from opentrons.hardware_control.backends.ot3controller import OT3Controller from hardware_testing.data import ui from hardware_testing.data.csv_report import ( @@ -40,7 +40,7 @@ def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]: async def get_tip_status(api: OT3API) -> bool: """Get the tip status for the 96 channel.""" - can_messenger: CanMessenger = api._backend._messenger # type: ignore[union-attr] + can_messenger = cast(OT3Controller, api._backend)._messenger node: NodeId = NodeId.pipette_left event = asyncio.Event() value = 0 diff --git a/hardware-testing/hardware_testing/production_qc/pipette_assembly_qc_ot3/__main__.py b/hardware-testing/hardware_testing/production_qc/pipette_assembly_qc_ot3/__main__.py index f9f60173eed..0f336aa7c50 100644 --- a/hardware-testing/hardware_testing/production_qc/pipette_assembly_qc_ot3/__main__.py +++ b/hardware-testing/hardware_testing/production_qc/pipette_assembly_qc_ot3/__main__.py @@ -8,7 +8,7 @@ import os from pathlib import Path from time import time -from typing import Optional, Callable, List, Any, Tuple, Dict +from typing import Optional, Callable, List, Any, Tuple, Dict, cast from typing_extensions import Final from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId @@ -32,6 +32,7 @@ EarlyCapacitiveSenseTrigger, CalibrationStructureNotFoundError, ) +from opentrons.hardware_control.backends.ot3controller import OT3Controller from hardware_testing import data from hardware_testing.drivers.pressure_fixture import ( @@ -1493,7 +1494,7 @@ def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None: if isinstance(message, PushTipPresenceNotification): event.set() - messenger = api._backend._messenger # type: ignore[union-attr] + messenger = cast(OT3Controller, api._backend)._messenger messenger.add_listener(_listener) try: for i in range(seconds_to_wait): diff --git a/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_connectivity.py b/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_connectivity.py index 8e8beb4b9e4..66e4bb72782 100644 --- a/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_connectivity.py +++ b/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_connectivity.py @@ -1,10 +1,11 @@ """Test Connectivity.""" import asyncio from subprocess import run as run_subprocess -from typing import List, Union, Optional, Tuple +from typing import List, Union, Optional, Tuple, cast import re from opentrons.hardware_control.ot3api import OT3API +from opentrons.hardware_control.backends.ot3controller import OT3Controller from opentrons.system import nmcli from opentrons import config @@ -301,10 +302,11 @@ async def _aux_subtest( api: OT3API, ui_promt: str, pass_states: RearPinState, sync_state: int ) -> Tuple[bool, str]: ui.get_user_ready(ui_promt) - await set_sync_pin(sync_state, api._backend._usb_messenger) # type: ignore[union-attr] - result = await get_all_pin_state(api._backend._usb_messenger) # type: ignore[union-attr] + backend = cast(OT3Controller, api._backend) + await set_sync_pin(sync_state, backend._usb_messenger) + result = await get_all_pin_state(backend._usb_messenger) LOG.info(f"Aux Result: {result}") - await set_sync_pin(0, api._backend._usb_messenger) # type: ignore[union-attr] + await set_sync_pin(0, backend._usb_messenger) # format the state comparison nicely for csv output result_dict = vars(result) diff --git a/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_peripherals.py b/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_peripherals.py index b7dcdd9d3bc..b7ea527955e 100644 --- a/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_peripherals.py +++ b/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_peripherals.py @@ -210,7 +210,7 @@ def _get_user_confirmation(question: str) -> bool: state[2], # blue state[1], # green state[3], # white - api._backend._usb_messenger, # type: ignore[union-attr] + api._backend._usb_messenger, # type: ignore[attr-defined] ) result = _get_user_confirmation(f"is the STATUS-LIGHT {color}") report(section, f"status-light-{color}", [CSVResult.from_bool(result)]) diff --git a/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_signals.py b/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_signals.py index bfbddc6fe53..419a5e6350c 100644 --- a/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_signals.py +++ b/hardware-testing/hardware_testing/production_qc/robot_assembly_qc_ot3/test_signals.py @@ -81,11 +81,11 @@ async def _move_and_interrupt_with_signal(api: OT3API, sig_name: str) -> None: backend: OT3Controller = api._backend # type: ignore[assignment] messenger = backend._messenger if sig_name == "nsync": - engage = api._backend.release_sync # type: ignore[union-attr] - release = api._backend.engage_sync # type: ignore[union-attr] + engage = backend.release_sync + release = backend.engage_sync elif sig_name == "estop": - engage = api._backend.engage_estop # type: ignore[union-attr] - release = api._backend.release_estop # type: ignore[union-attr] + engage = backend.engage_estop + release = backend.release_estop async def _sleep_then_activate_stop_signal() -> None: if "external" in sig_name: diff --git a/hardware-testing/hardware_testing/production_qc/stress_test_qc_ot3.py b/hardware-testing/hardware_testing/production_qc/stress_test_qc_ot3.py index 1ca0c7625a8..05ee42e6765 100644 --- a/hardware-testing/hardware_testing/production_qc/stress_test_qc_ot3.py +++ b/hardware-testing/hardware_testing/production_qc/stress_test_qc_ot3.py @@ -5,7 +5,7 @@ import time from dataclasses import dataclass -from typing import Optional, Callable, List, Any, Tuple, Dict +from typing import Optional, Callable, List, Any, Tuple, Dict, cast from pathlib import Path from opentrons.config.defaults_ot3 import ( @@ -15,6 +15,7 @@ DEFAULT_HOLD_CURRENT, ) from opentrons.hardware_control.ot3api import OT3API +from opentrons.hardware_control.backends.ot3controller import OT3Controller from hardware_testing.opentrons_api import types from hardware_testing.opentrons_api.types import Axis, OT3Mount, Point, OT3AxisKind @@ -599,7 +600,9 @@ async def _run_z_motion( run_current=setting[z_ax].run_current, hold_current=setting[z_ax].hold_current, # NOTE: only set this for Z axes ) - LOG.info(f"Motor Current Settings: {api._backend._current_settings}") + LOG.info( + f"Motor Current Settings: {cast(OT3Controller, api._backend)._current_settings}" + ) fail_count = 0 pass_count = 0 for i in range(arguments.cycles): @@ -678,7 +681,9 @@ async def _run_xy_motion( api.gantry_load, run_current=setting[ax].run_current, ) - LOG.info(f"Motor Current Settings: {api._backend._current_settings}") + LOG.info( + f"Motor Current Settings: {cast(OT3Controller, api._backend)._current_settings}" + ) fail_count = 0 pass_count = 0 for i in range(max(int(arguments.cycles / 2), 1)): @@ -801,7 +806,7 @@ async def get_test_metadata( """Get the operator name and robot serial number.""" if arguments.no_input: _operator = args.operator if isinstance(args.operator, str) else "None" - _robot_id = api._backend.eeprom_data.serial_number + _robot_id = cast(OT3Controller, api._backend).eeprom_data.serial_number if not _robot_id: ui.print_error("no serial number saved on this robot") _robot_id = "None" @@ -810,7 +815,7 @@ async def get_test_metadata( _robot_id = "ot3-simulated-A01" _operator = "simulation" else: - _robot_id = api._backend.eeprom_data.serial_number + _robot_id = cast(OT3Controller, api._backend).eeprom_data.serial_number if not _robot_id: ui.print_error("no serial number saved on this robot") _robot_id = input("enter ROBOT SERIAL number: ").strip() @@ -896,7 +901,9 @@ async def _main(arguments: argparse.Namespace) -> None: hold_current=DEFAULT_Z_CURRENT, # NOTE: only set this for Z axes ) LOG.info(DEFAULT_Z_CURRENT) - LOG.info(f"Motor Current Settings: {api._backend._current_settings}") + LOG.info( + f"Motor Current Settings: {cast(OT3Controller, api._backend)._current_settings}" + ) qc_pass = await _run_gantry_cycles( arguments, diff --git a/hardware-testing/hardware_testing/scripts/gripper_to_adapter_tolerance.py b/hardware-testing/hardware_testing/scripts/gripper_to_adapter_tolerance.py index 04a0dc17596..1db172032ee 100644 --- a/hardware-testing/hardware_testing/scripts/gripper_to_adapter_tolerance.py +++ b/hardware-testing/hardware_testing/scripts/gripper_to_adapter_tolerance.py @@ -1,9 +1,10 @@ """Gripper-to-Adapter Tolerance.""" import argparse import asyncio -from typing import List +from typing import List, cast from opentrons_hardware.hardware_control.gripper_settings import set_error_tolerance +from opentrons.hardware_control.backends.ot3controller import OT3Controller from opentrons.hardware_control.ot3api import OT3API @@ -152,7 +153,7 @@ async def _main( # so disable gripper collision detection. if not api.is_simulator: await set_error_tolerance( - api._backend._messenger, # type: ignore[union-attr] + cast(OT3Controller, api._backend)._messenger, max_pos_error=0.1, max_unwanted_movement=50.0, # much bigger than gripper's jaw width )