diff --git a/hardware/opentrons_hardware/firmware_bindings/constants.py b/hardware/opentrons_hardware/firmware_bindings/constants.py index cd91ced91b7..4dd7c759782 100644 --- a/hardware/opentrons_hardware/firmware_bindings/constants.py +++ b/hardware/opentrons_hardware/firmware_bindings/constants.py @@ -4,6 +4,7 @@ by default. Please do not unconditionally import things outside the python standard library. """ + from enum import Enum, unique from typing import Union, Dict, List @@ -294,6 +295,7 @@ class ErrorCode(int, Enum): door_open = 0x0E reed_open = 0x0F motor_driver_error_detected = 0x10 + safety_relay_inactive = 0x11 @unique diff --git a/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py b/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py index c351495ba5b..50890187fe8 100644 --- a/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py +++ b/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py @@ -1,4 +1,5 @@ """Payloads of can bus messages.""" + # TODO (amit, 2022-01-26): Figure out why using annotations import ruins # dataclass fields interpretation. # from __future__ import annotations @@ -684,6 +685,7 @@ class GetHepaUVStatePayloadResponse(EmptyPayload): uv_light_on: utils.UInt8Field remaining_time_s: utils.UInt32Field uv_current_ma: utils.UInt16Field + safety_relay_active: utils.UInt8Field @dataclass(eq=False) diff --git a/hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py b/hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py index e51e48fd8ad..602efc53dcf 100644 --- a/hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py +++ b/hardware/opentrons_hardware/hardware_control/hepa_uv_settings.py @@ -1,4 +1,5 @@ """Utilities for controlling the hepa/uv extension module.""" + import logging import asyncio from typing import Optional @@ -46,6 +47,7 @@ class HepaUVState: uv_duration_s: int remaining_time_s: int uv_current_ma: int + safety_relay_active: bool async def set_hepa_fan_state( @@ -136,6 +138,7 @@ def _listener(message: MessageDefinition, arb_id: ArbitrationId) -> None: uv_duration_s=int(message.payload.uv_duration_s.value), remaining_time_s=int(message.payload.remaining_time_s.value), uv_current_ma=int(message.payload.uv_current_ma.value), + safety_relay_active=bool(message.payload.safety_relay_active.value), ) def _filter(arb_id: ArbitrationId) -> bool: diff --git a/hardware/tests/opentrons_hardware/hardware_control/test_hepauv_settings.py b/hardware/tests/opentrons_hardware/hardware_control/test_hepauv_settings.py index dcaf85a8653..ad188bd153c 100644 --- a/hardware/tests/opentrons_hardware/hardware_control/test_hepauv_settings.py +++ b/hardware/tests/opentrons_hardware/hardware_control/test_hepauv_settings.py @@ -55,6 +55,7 @@ def create_hepa_uv_state_response( duration: int, remaining_time: int, uv_current: int, + safety_relay_active: bool, ) -> MessageDefinition: """Create a GetHepaUVStateResponse.""" return md.GetHepaUVStateResponse( @@ -63,6 +64,7 @@ def create_hepa_uv_state_response( uv_duration_s=UInt32Field(duration), remaining_time_s=UInt32Field(remaining_time), uv_current_ma=UInt16Field(uv_current), + safety_relay_active=UInt8Field(safety_relay_active), ) ) @@ -162,12 +164,29 @@ def responder( [ ( NodeId.host, - create_hepa_uv_state_response(True, 900, 300, 3300), + create_hepa_uv_state_response(True, 900, 300, 3300, True), + NodeId.hepa_uv, + ), + ( + NodeId.host, + create_hepa_uv_state_response(True, 0, 0, 33000, True), + NodeId.hepa_uv, + ), + ( + NodeId.host, + create_hepa_uv_state_response(True, 0, 0, 33000, False), + NodeId.hepa_uv, + ), + ( + NodeId.host, + create_hepa_uv_state_response(False, 0, 0, 0, True), + NodeId.hepa_uv, + ), + ( + NodeId.host, + create_hepa_uv_state_response(False, 900, 0, 0, False), NodeId.hepa_uv, ), - (NodeId.host, create_hepa_uv_state_response(True, 0, 0, 33000), NodeId.hepa_uv), - (NodeId.host, create_hepa_uv_state_response(False, 0, 0, 0), NodeId.hepa_uv), - (NodeId.host, create_hepa_uv_state_response(False, 900, 0, 0), NodeId.hepa_uv), ], ) async def test_get_hepa_uv_state( @@ -202,6 +221,7 @@ def responder( int(payload.uv_duration_s.value), int(payload.remaining_time_s.value), int(payload.uv_current_ma.value), + bool(payload.safety_relay_active.value), ) == res )