Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): expose versions of connected micros #12470

Merged
merged 2 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
create_tip_action_group,
PipetteAction,
sub_system_to_node_id,
NODEID_SUBSYSTEM,
USBTARGET_SUBSYSTEM,
)

try:
Expand Down Expand Up @@ -112,6 +114,7 @@
UpdateStatus,
mount_to_subsystem,
DoorState,
OT3SubSystem,
)
from opentrons.hardware_control.errors import (
MustHomeError,
Expand Down Expand Up @@ -244,9 +247,18 @@ def initialized(self, value: bool) -> None:
self._initialized = value

@property
def fw_version(self) -> Optional[str]:
def fw_version(self) -> Dict[OT3SubSystem, int]:
"""Get the firmware version."""
return None
subsystem_map: Dict[Union[NodeId, USBTarget], OT3SubSystem] = deepcopy(
cast(Dict[Union[NodeId, USBTarget], OT3SubSystem], USBTARGET_SUBSYSTEM)
)
subsystem_map.update(
cast(Dict[Union[NodeId, USBTarget], OT3SubSystem], NODEID_SUBSYSTEM)
)
return {
subsystem_map[node.application_for()]: device.version
for node, device in self._network_info.device_info.items()
}

@property
def update_required(self) -> bool:
Expand Down
8 changes: 6 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
create_gripper_jaw_home_group,
create_tip_action_group,
PipetteAction,
NODEID_SUBSYSTEM,
)

from opentrons_hardware.firmware_bindings.constants import (
Expand All @@ -56,6 +57,7 @@
MotorStatus,
PipetteSubType,
UpdateStatus,
OT3SubSystem,
)
from opentrons_hardware.hardware_control.motion import MoveStopCondition
from opentrons_hardware.hardware_control import status_bar
Expand Down Expand Up @@ -483,9 +485,11 @@ def axis_bounds(self) -> OT3AxisMap[Tuple[float, float]]:
}

@property
def fw_version(self) -> Optional[str]:
def fw_version(self) -> Dict[OT3SubSystem, int]:
"""Get the firmware version."""
return None
return {
NODEID_SUBSYSTEM[node.application_for()]: 0 for node in self._present_nodes
}

@property
def update_required(self) -> bool:
Expand Down
20 changes: 20 additions & 0 deletions api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
PipetteType,
SensorId,
PipetteTipActionType,
USBTarget,
)
from opentrons_hardware.firmware_update.types import FirmwareUpdateStatus, StatusElement
from opentrons_hardware.hardware_control.motion_planning import (
Expand Down Expand Up @@ -68,6 +69,25 @@
OT3SubSystem.gripper: NodeId.gripper,
}

NODEID_SUBSYSTEM: Dict[NodeId, OT3SubSystem] = {
NodeId.gantry_x: OT3SubSystem.gantry_x,
NodeId.gantry_x_bootloader: OT3SubSystem.gantry_x,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not super relevant to this PR, but should we be throwing an error somewhere in the hardware controller if one of the microcontrollers is in a bootloader state? Or, at the very least, should we not mask whether a node id is a bootloader?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we should. i think we need to overhaul that at some point.

NodeId.gantry_y: OT3SubSystem.gantry_y,
NodeId.gantry_y_bootloader: OT3SubSystem.gantry_y,
NodeId.head: OT3SubSystem.head,
NodeId.head_bootloader: OT3SubSystem.head,
NodeId.pipette_left: OT3SubSystem.pipette_left,
NodeId.pipette_left_bootloader: OT3SubSystem.pipette_left,
NodeId.pipette_right: OT3SubSystem.pipette_right,
NodeId.pipette_right_bootloader: OT3SubSystem.pipette_right,
NodeId.gripper: OT3SubSystem.gripper,
NodeId.gripper_bootloader: OT3SubSystem.gripper,
}

USBTARGET_SUBSYSTEM: Dict[USBTarget, OT3SubSystem] = {
USBTarget.rear_panel: OT3SubSystem.rear_panel
}


def axis_nodes() -> List["NodeId"]:
return [
Expand Down
5 changes: 3 additions & 2 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,11 @@ def get_fw_version(self) -> str:
Return the firmware version of the connected hardware.
"""
from_backend = self._backend.fw_version
if from_backend is None:
uniques = set(version for version in from_backend.values())
if not from_backend:
return "unknown"
else:
return from_backend
return ", ".join(str(version) for version in uniques)

@property
def fw_version(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
OT3Mount,
OT3AxisMap,
MotorStatus,
OT3SubSystem,
)
from opentrons.hardware_control.errors import (
FirmwareUpdateRequired,
Expand Down Expand Up @@ -982,7 +983,7 @@ async def test_update_firmware_update_required(
async def test_update_firmware_up_to_date(
controller: OT3Controller,
fw_update_info: Dict[NodeId, str],
):
) -> None:
"""Test that updates are not started if they are not required."""
with mock.patch(
"opentrons_hardware.firmware_update.RunUpdate.run_updates"
Expand All @@ -1003,7 +1004,7 @@ async def test_update_firmware_specified_nodes(
controller: OT3Controller,
fw_node_info: Dict[NodeId, DeviceInfoCache],
fw_update_info: Dict[NodeId, str],
):
) -> None:
"""Test that updates are started if nodes are NOT out-of-date when nodes are specified."""
for node_cache in fw_node_info.values():
node_cache.shortsha = "978abcde"
Expand Down Expand Up @@ -1046,7 +1047,7 @@ async def test_update_firmware_invalid_specified_node(
controller: OT3Controller,
fw_node_info: Dict[NodeId, DeviceInfoCache],
fw_update_info: Dict[FirmwareUpdateType, UpdateInfo],
):
) -> None:
"""Test that only nodes in device_info_cache are updated when nodes are specified."""
check_fw_update_return = {
NodeId.head: (1, "/some/path/head.hex"),
Expand Down Expand Up @@ -1082,7 +1083,7 @@ async def test_update_firmware_progress(
controller: OT3Controller,
fw_node_info: Dict[NodeId, DeviceInfoCache],
fw_update_info: Dict[FirmwareUpdateType, UpdateInfo],
):
) -> None:
"""Test that the progress is reported for nodes updating."""
controller._network_info._device_info_cache = fw_node_info

Expand Down Expand Up @@ -1119,3 +1120,40 @@ async def _fake_update_progress(
assert not controller.update_required
assert controller._update_tracker is None
probe.assert_called_once()


@pytest.mark.parametrize("versions", [(1, 2, 3), (1, 1, 1), (1, 2, 2)])
def test_fw_versions(controller: OT3Controller, versions: Tuple[int, int, int]) -> None:
info = {
NodeId.head: DeviceInfoCache(
NodeId.head,
versions[0],
"12345678",
None,
PCBARevision(None),
subidentifier=0,
),
NodeId.gantry_y: DeviceInfoCache(
NodeId.gantry_y,
versions[1],
"12345678",
None,
PCBARevision(None),
subidentifier=0,
),
NodeId.pipette_right_bootloader: DeviceInfoCache(
NodeId.pipette_right_bootloader,
versions[2],
"12345678",
None,
PCBARevision(None),
subidentifier=2,
),
}

controller._network_info._device_info_cache = info
assert controller.fw_version == {
OT3SubSystem.head: versions[0],
OT3SubSystem.gantry_y: versions[1],
OT3SubSystem.pipette_right: versions[2],
}
32 changes: 31 additions & 1 deletion api/tests/opentrons/hardware_control/test_ot3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing_extensions import Literal
from math import copysign
import pytest
from mock import AsyncMock, patch, Mock, call
from mock import AsyncMock, patch, Mock, call, PropertyMock
from opentrons.config.types import (
GantryLoad,
CapacitivePassSettings,
Expand All @@ -30,6 +30,7 @@
InstrumentProbeType,
LiquidNotFound,
EarlyLiquidSenseTrigger,
OT3SubSystem,
)
from opentrons.hardware_control.errors import (
GripperNotAttachedError,
Expand Down Expand Up @@ -1210,3 +1211,32 @@ async def test_light_settings(
check = await ot3_hardware.get_lights()
assert check["rails"] != setting
assert not check["button"]


@pytest.mark.parametrize(
"versions,version_str",
[
({}, "unknown"),
({OT3SubSystem.pipette_right: 2}, "2"),
(
{
OT3SubSystem.pipette_left: 2,
OT3SubSystem.gantry_x: 2,
OT3SubSystem.gantry_y: 2,
},
"2",
),
({OT3SubSystem.gripper: 3, OT3SubSystem.head: 1}, "1, 3"),
],
)
def test_fw_version(
ot3_hardware: ThreadManager[OT3API],
versions: Dict[OT3SubSystem, int],
version_str: str,
) -> None:
with patch(
"opentrons.hardware_control.ot3api.OT3Simulator.fw_version",
new_callable=PropertyMock,
) as mock_fw_version:
mock_fw_version.return_value = versions
assert ot3_hardware.get_fw_version() == version_str