diff --git a/hardware/opentrons_hardware/firmware_bindings/constants.py b/hardware/opentrons_hardware/firmware_bindings/constants.py index 3b24a77ced7..99d7db0f792 100644 --- a/hardware/opentrons_hardware/firmware_bindings/constants.py +++ b/hardware/opentrons_hardware/firmware_bindings/constants.py @@ -101,6 +101,8 @@ class MessageId(int, Enum): pipette_info_response = 0x307 gripper_info_response = 0x308 set_serial_number = 0x30A + get_motor_usage_request = 0x30B + get_motor_usage_response = 0x30C stop_request = 0x00 diff --git a/hardware/opentrons_hardware/firmware_bindings/messages/message_definitions.py b/hardware/opentrons_hardware/firmware_bindings/messages/message_definitions.py index eee2e6bd096..593e03a39b7 100644 --- a/hardware/opentrons_hardware/firmware_bindings/messages/message_definitions.py +++ b/hardware/opentrons_hardware/firmware_bindings/messages/message_definitions.py @@ -829,3 +829,25 @@ class PushTipPresenceNotification(BaseMessage): message_id: Literal[ MessageId.tip_presence_notification ] = MessageId.tip_presence_notification + + +@dataclass +class GetMotorUsageRequest(EmptyPayloadMessage): + """Prompt a motor to send it's total lifetime usage.""" + + message_id: Literal[ + MessageId.get_motor_usage_request + ] = MessageId.get_motor_usage_request + + +@dataclass +class GetMotorUsageResponse(BaseMessage): + """Motor response with total lifetime usage.""" + + payload: payloads.GetMotorUsageResponsePayload + payload_type: Type[ + payloads.GetMotorUsageResponsePayload + ] = payloads.GetMotorUsageResponsePayload + message_id: Literal[ + MessageId.get_motor_usage_response + ] = MessageId.get_motor_usage_response diff --git a/hardware/opentrons_hardware/firmware_bindings/messages/messages.py b/hardware/opentrons_hardware/firmware_bindings/messages/messages.py index 4db270ee146..b8224558139 100644 --- a/hardware/opentrons_hardware/firmware_bindings/messages/messages.py +++ b/hardware/opentrons_hardware/firmware_bindings/messages/messages.py @@ -92,6 +92,8 @@ defs.SetGripperErrorTolerance, defs.PushTipPresenceNotification, defs.TipStatusQueryRequest, + defs.GetMotorUsageRequest, + defs.GetMotorUsageResponse, ] diff --git a/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py b/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py index f69fe561a0d..c34d56f3708 100644 --- a/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py +++ b/hardware/opentrons_hardware/firmware_bindings/messages/payloads.py @@ -568,3 +568,10 @@ class SerialNumberPayload(EmptyPayload): """A payload with a serial number.""" serial: SerialField + + +@dataclass(eq=False) +class GetMotorUsageResponsePayload(EmptyPayload): + """A payload with motor lifetime usage.""" + + distance_um: utils.UInt64Field diff --git a/hardware/opentrons_hardware/scripts/dump_eeprom.py b/hardware/opentrons_hardware/scripts/dump_eeprom.py new file mode 100644 index 00000000000..28d7c2aac38 --- /dev/null +++ b/hardware/opentrons_hardware/scripts/dump_eeprom.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +"""blow away and reset EEPROM file systems.""" + +import asyncio +import logging +import argparse +from logging.config import dictConfig +from typing import Dict, Any +from typing_extensions import Final + +from opentrons_hardware.drivers.can_bus import build, CanMessenger +from opentrons_hardware.firmware_bindings import utils, ArbitrationId +from opentrons_hardware.firmware_bindings.constants import MessageId +from opentrons_hardware.firmware_bindings.messages import ( + MessageDefinition, + message_definitions, + payloads, +) +from opentrons_hardware.firmware_bindings.constants import NodeId +from opentrons_hardware.scripts.can_args import add_can_args, build_settings + +logger = logging.getLogger(__name__) + +LOG_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"} + }, + "handlers": { + "stream_handler": { + "class": "logging.StreamHandler", + "formatter": "basic", + "level": logging.DEBUG, + }, + }, + "loggers": { + "": { + "handlers": ["stream_handler"], + "level": logging.DEBUG, + }, + }, +} + +TARGETS: Final[Dict[str, NodeId]] = { + "head": NodeId.head, + "gantry-x": NodeId.gantry_x, + "gantry-y": NodeId.gantry_y, + "pipette-left": NodeId.pipette_left, + "pipette-right": NodeId.pipette_right, + "gripper": NodeId.gripper, +} + + +async def run(args: argparse.Namespace) -> None: + """Script entrypoint.""" + with open(args.file, "w") as f: + + def handle_read_resp( + message: MessageDefinition, arbitration_id: ArbitrationId + ) -> None: + """Called by can messenger when a message arrives.""" + if isinstance(message, message_definitions.ReadFromEEPromResponse): + f.write( + "".join("{:02x} ".format(x) for x in message.payload.data.value) + ) + f.write("\n") + + async with build.driver(build_settings(args)) as driver, CanMessenger( + driver + ) as messenger: + messenger.add_listener( + handle_read_resp, + lambda arbitration_id: bool( + arbitration_id.parts.message_id == MessageId.read_eeprom_response + ), + ) + + await dump_eeprom( + messenger, + TARGETS[args.target], + 256 if args.old_version else 16384, + ) + + +async def dump_eeprom(messenger: CanMessenger, node: NodeId, limit: int) -> None: + """Wipe out all of the data used for the general purpose file system.""" + start = 0 + max_read = 8 + while start < limit: + read_len = min(max_read, limit - start) + read_msg = message_definitions.ReadFromEEPromRequest( + payload=payloads.EEPromReadPayload( + address=utils.UInt16Field(start), + data_length=utils.UInt16Field(read_len), + ) + ) + start += read_len + await messenger.send(node, read_msg) + + +def main() -> None: + """Entry point.""" + parser = argparse.ArgumentParser(description=__doc__) + + add_can_args(parser) + parser.add_argument( + "--target", + help="The FW subsystem to be cleared.", + type=str, + required=True, + choices=TARGETS.keys(), + ) + parser.add_argument( + "--file", + help="file where to save the dump file.", + type=str, + default="/var/log/eeprom_dump.hex", + required=False, + ) + parser.add_argument( + "--less-logs", + help="Set log level to INFO, so we see less logs.", + action="store_true", + default=False, + ) + parser.add_argument( + "--old-version", + help="Enable this flag to clear eeprom on the older 256 Byte eeproms.", + action="store_true", + default=False, + ) + + args = parser.parse_args() + + def _set_log_lvl_warn(d: Dict[str, Any]) -> None: + for k in d.keys(): + if isinstance(d[k], dict): + _set_log_lvl_warn(d[k]) + elif k == "level": + d[k] = logging.WARNING + + if args.less_logs: + _set_log_lvl_warn(LOG_CONFIG) + dictConfig(LOG_CONFIG) + asyncio.run(run(args)) + + +if __name__ == "__main__": + main() diff --git a/hardware/opentrons_hardware/scripts/reset_eeprom.py b/hardware/opentrons_hardware/scripts/reset_eeprom.py new file mode 100644 index 00000000000..fe0f3f628d5 --- /dev/null +++ b/hardware/opentrons_hardware/scripts/reset_eeprom.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +"""blow away and reset EEPROM file systems.""" + +import asyncio +import logging +import argparse +from logging.config import dictConfig +from typing import Dict, Any +from typing_extensions import Final + +from opentrons_hardware.drivers.can_bus import build, CanMessenger +from opentrons_hardware.firmware_bindings import utils +from opentrons_hardware.firmware_bindings.messages import ( + message_definitions, + payloads, + fields, +) +from opentrons_hardware.firmware_bindings.constants import NodeId +from opentrons_hardware.scripts.can_args import add_can_args, build_settings + +logger = logging.getLogger(__name__) + +LOG_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"} + }, + "handlers": { + "stream_handler": { + "class": "logging.StreamHandler", + "formatter": "basic", + "level": logging.DEBUG, + }, + }, + "loggers": { + "": { + "handlers": ["stream_handler"], + "level": logging.DEBUG, + }, + }, +} + +TARGETS: Final[Dict[str, NodeId]] = { + "head": NodeId.head, + "gantry-x": NodeId.gantry_x, + "gantry-y": NodeId.gantry_y, + "pipette-left": NodeId.pipette_left, + "pipette-right": NodeId.pipette_right, + "gripper": NodeId.gripper, +} + + +async def run(args: argparse.Namespace) -> None: + """Script entrypoint.""" + async with build.driver(build_settings(args)) as driver, CanMessenger( + driver + ) as messenger: + await clear_eeprom( + messenger, + TARGETS[args.target], + 256 if args.old_version else 16384, + "0000000000000000" if args.old_version else "FFFFFFFFFFFFFFFF", + ) + + +async def clear_eeprom( + messenger: CanMessenger, node: NodeId, limit: int, filler: str +) -> None: + """Wipe out all of the data used for the general purpose file system.""" + start = 28 + max_write = 8 + while start < limit: + write_len = min(max_write, limit - start) + write_msg = message_definitions.WriteToEEPromRequest( + payload=payloads.EEPromDataPayload( + address=utils.UInt16Field(start), + data_length=utils.UInt16Field(write_len), + data=fields.EepromDataField.from_string(filler), + ) + ) + start += write_len + await messenger.ensure_send(node, write_msg, expected_nodes=[node]) + + +def main() -> None: + """Entry point.""" + parser = argparse.ArgumentParser(description=__doc__) + + add_can_args(parser) + parser.add_argument( + "--target", + help="The FW subsystem to be cleared.", + type=str, + required=True, + choices=TARGETS.keys(), + ) + parser.add_argument( + "--less-logs", + help="Set log level to INFO, so we see less logs.", + action="store_true", + default=False, + ) + parser.add_argument( + "--old-version", + help="Enable this flag to clear eeprom on the older 256 Byte eeproms.", + action="store_true", + default=False, + ) + + args = parser.parse_args() + + def _set_log_lvl_warn(d: Dict[str, Any]) -> None: + for k in d.keys(): + if isinstance(d[k], dict): + _set_log_lvl_warn(d[k]) + elif k == "level": + d[k] = logging.WARNING + + if args.less_logs: + _set_log_lvl_warn(LOG_CONFIG) + dictConfig(LOG_CONFIG) + asyncio.run(run(args)) + + +if __name__ == "__main__": + main()