diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index bb3377d375a..0933c11750f 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -43,6 +43,8 @@ create_tip_action_group, PipetteAction, sub_system_to_node_id, + NODEID_SUBSYSTEM, + USBTARGET_SUBSYSTEM, ) try: @@ -112,6 +114,7 @@ UpdateStatus, mount_to_subsystem, DoorState, + OT3SubSystem, ) from opentrons.hardware_control.errors import ( MustHomeError, @@ -244,9 +247,18 @@ def initialized(self, value: bool) -> None: self._initialized = value @property - def fw_version(self) -> Optional[str]: + def fw_version(self) -> Dict[OT3SubSystem, int]: """Get the firmware version.""" - return None + subsystem_map: Dict[Union[NodeId, USBTarget], OT3SubSystem] = deepcopy( + cast(Dict[Union[NodeId, USBTarget], OT3SubSystem], USBTARGET_SUBSYSTEM) + ) + subsystem_map.update( + cast(Dict[Union[NodeId, USBTarget], OT3SubSystem], NODEID_SUBSYSTEM) + ) + return { + subsystem_map[node.application_for()]: device.version + for node, device in self._network_info.device_info.items() + } @property def update_required(self) -> bool: diff --git a/api/src/opentrons/hardware_control/backends/ot3simulator.py b/api/src/opentrons/hardware_control/backends/ot3simulator.py index 35a4f881617..40c8c953218 100644 --- a/api/src/opentrons/hardware_control/backends/ot3simulator.py +++ b/api/src/opentrons/hardware_control/backends/ot3simulator.py @@ -31,6 +31,7 @@ create_gripper_jaw_home_group, create_tip_action_group, PipetteAction, + NODEID_SUBSYSTEM, ) from opentrons_hardware.firmware_bindings.constants import ( @@ -56,6 +57,7 @@ MotorStatus, PipetteSubType, UpdateStatus, + OT3SubSystem, ) from opentrons_hardware.hardware_control.motion import MoveStopCondition from opentrons_hardware.hardware_control import status_bar @@ -483,9 +485,11 @@ def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]: } @property - def fw_version(self) -> Optional[str]: + def fw_version(self) -> Dict[OT3SubSystem, int]: """Get the firmware version.""" - return None + return { + NODEID_SUBSYSTEM[node.application_for()]: 0 for node in self._present_nodes + } @property def update_required(self) -> bool: diff --git a/api/src/opentrons/hardware_control/backends/ot3utils.py b/api/src/opentrons/hardware_control/backends/ot3utils.py index 0867ae27874..d9cc40f3065 100644 --- a/api/src/opentrons/hardware_control/backends/ot3utils.py +++ b/api/src/opentrons/hardware_control/backends/ot3utils.py @@ -22,6 +22,7 @@ PipetteType, SensorId, PipetteTipActionType, + USBTarget, ) from opentrons_hardware.firmware_update.types import FirmwareUpdateStatus, StatusElement from opentrons_hardware.hardware_control.motion_planning import ( @@ -68,6 +69,25 @@ OT3SubSystem.gripper: NodeId.gripper, } +NODEID_SUBSYSTEM: Dict[NodeId, OT3SubSystem] = { + NodeId.gantry_x: OT3SubSystem.gantry_x, + NodeId.gantry_x_bootloader: OT3SubSystem.gantry_x, + NodeId.gantry_y: OT3SubSystem.gantry_y, + NodeId.gantry_y_bootloader: OT3SubSystem.gantry_y, + NodeId.head: OT3SubSystem.head, + NodeId.head_bootloader: OT3SubSystem.head, + NodeId.pipette_left: OT3SubSystem.pipette_left, + NodeId.pipette_left_bootloader: OT3SubSystem.pipette_left, + NodeId.pipette_right: OT3SubSystem.pipette_right, + NodeId.pipette_right_bootloader: OT3SubSystem.pipette_right, + NodeId.gripper: OT3SubSystem.gripper, + NodeId.gripper_bootloader: OT3SubSystem.gripper, +} + +USBTARGET_SUBSYSTEM: Dict[USBTarget, OT3SubSystem] = { + USBTarget.rear_panel: OT3SubSystem.rear_panel +} + def axis_nodes() -> List["NodeId"]: return [ diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index 5d214c68d7f..2ef9c1c64e2 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -385,10 +385,11 @@ def get_fw_version(self) -> str: Return the firmware version of the connected hardware. """ from_backend = self._backend.fw_version - if from_backend is None: + uniques = set(version for version in from_backend.values()) + if not from_backend: return "unknown" else: - return from_backend + return ", ".join(str(version) for version in uniques) @property def fw_version(self) -> str: diff --git a/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py b/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py index beb16fea4aa..068cbd48cff 100644 --- a/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py @@ -32,6 +32,7 @@ OT3Mount, OT3AxisMap, MotorStatus, + OT3SubSystem, ) from opentrons.hardware_control.errors import ( FirmwareUpdateRequired, @@ -982,7 +983,7 @@ async def test_update_firmware_update_required( async def test_update_firmware_up_to_date( controller: OT3Controller, fw_update_info: Dict[NodeId, str], -): +) -> None: """Test that updates are not started if they are not required.""" with mock.patch( "opentrons_hardware.firmware_update.RunUpdate.run_updates" @@ -1003,7 +1004,7 @@ async def test_update_firmware_specified_nodes( controller: OT3Controller, fw_node_info: Dict[NodeId, DeviceInfoCache], fw_update_info: Dict[NodeId, str], -): +) -> None: """Test that updates are started if nodes are NOT out-of-date when nodes are specified.""" for node_cache in fw_node_info.values(): node_cache.shortsha = "978abcde" @@ -1046,7 +1047,7 @@ async def test_update_firmware_invalid_specified_node( controller: OT3Controller, fw_node_info: Dict[NodeId, DeviceInfoCache], fw_update_info: Dict[FirmwareUpdateType, UpdateInfo], -): +) -> None: """Test that only nodes in device_info_cache are updated when nodes are specified.""" check_fw_update_return = { NodeId.head: (1, "/some/path/head.hex"), @@ -1082,7 +1083,7 @@ async def test_update_firmware_progress( controller: OT3Controller, fw_node_info: Dict[NodeId, DeviceInfoCache], fw_update_info: Dict[FirmwareUpdateType, UpdateInfo], -): +) -> None: """Test that the progress is reported for nodes updating.""" controller._network_info._device_info_cache = fw_node_info @@ -1119,3 +1120,40 @@ async def _fake_update_progress( assert not controller.update_required assert controller._update_tracker is None probe.assert_called_once() + + +@pytest.mark.parametrize("versions", [(1, 2, 3), (1, 1, 1), (1, 2, 2)]) +def test_fw_versions(controller: OT3Controller, versions: Tuple[int, int, int]) -> None: + info = { + NodeId.head: DeviceInfoCache( + NodeId.head, + versions[0], + "12345678", + None, + PCBARevision(None), + subidentifier=0, + ), + NodeId.gantry_y: DeviceInfoCache( + NodeId.gantry_y, + versions[1], + "12345678", + None, + PCBARevision(None), + subidentifier=0, + ), + NodeId.pipette_right_bootloader: DeviceInfoCache( + NodeId.pipette_right_bootloader, + versions[2], + "12345678", + None, + PCBARevision(None), + subidentifier=2, + ), + } + + controller._network_info._device_info_cache = info + assert controller.fw_version == { + OT3SubSystem.head: versions[0], + OT3SubSystem.gantry_y: versions[1], + OT3SubSystem.pipette_right: versions[2], + } diff --git a/api/tests/opentrons/hardware_control/test_ot3_api.py b/api/tests/opentrons/hardware_control/test_ot3_api.py index 429149096c7..892bdd67e8e 100644 --- a/api/tests/opentrons/hardware_control/test_ot3_api.py +++ b/api/tests/opentrons/hardware_control/test_ot3_api.py @@ -4,7 +4,7 @@ from typing_extensions import Literal from math import copysign import pytest -from mock import AsyncMock, patch, Mock, call +from mock import AsyncMock, patch, Mock, call, PropertyMock from opentrons.config.types import ( GantryLoad, CapacitivePassSettings, @@ -30,6 +30,7 @@ InstrumentProbeType, LiquidNotFound, EarlyLiquidSenseTrigger, + OT3SubSystem, ) from opentrons.hardware_control.errors import ( GripperNotAttachedError, @@ -1210,3 +1211,32 @@ async def test_light_settings( check = await ot3_hardware.get_lights() assert check["rails"] != setting assert not check["button"] + + +@pytest.mark.parametrize( + "versions,version_str", + [ + ({}, "unknown"), + ({OT3SubSystem.pipette_right: 2}, "2"), + ( + { + OT3SubSystem.pipette_left: 2, + OT3SubSystem.gantry_x: 2, + OT3SubSystem.gantry_y: 2, + }, + "2", + ), + ({OT3SubSystem.gripper: 3, OT3SubSystem.head: 1}, "1, 3"), + ], +) +def test_fw_version( + ot3_hardware: ThreadManager[OT3API], + versions: Dict[OT3SubSystem, int], + version_str: str, +) -> None: + with patch( + "opentrons.hardware_control.ot3api.OT3Simulator.fw_version", + new_callable=PropertyMock, + ) as mock_fw_version: + mock_fw_version.return_value = versions + assert ot3_hardware.get_fw_version() == version_str