Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hardware): add messages to fetch lifetime motor usage data #12464

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hardware/opentrons_hardware/firmware_bindings/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class MessageId(int, Enum):
pipette_info_response = 0x307
gripper_info_response = 0x308
set_serial_number = 0x30A
get_motor_usage_request = 0x30B
get_motor_usage_response = 0x30C

stop_request = 0x00

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,3 +829,25 @@ class PushTipPresenceNotification(BaseMessage):
message_id: Literal[
MessageId.tip_presence_notification
] = MessageId.tip_presence_notification


@dataclass
class GetMotorUsageRequest(EmptyPayloadMessage):
"""Prompt a motor to send it's total lifetime usage."""

message_id: Literal[
MessageId.get_motor_usage_request
] = MessageId.get_motor_usage_request


@dataclass
class GetMotorUsageResponse(BaseMessage):
"""Motor response with total lifetime usage."""

payload: payloads.GetMotorUsageResponsePayload
payload_type: Type[
payloads.GetMotorUsageResponsePayload
] = payloads.GetMotorUsageResponsePayload
message_id: Literal[
MessageId.get_motor_usage_response
] = MessageId.get_motor_usage_response
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@
defs.SetGripperErrorTolerance,
defs.PushTipPresenceNotification,
defs.TipStatusQueryRequest,
defs.GetMotorUsageRequest,
defs.GetMotorUsageResponse,
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,10 @@ class SerialNumberPayload(EmptyPayload):
"""A payload with a serial number."""

serial: SerialField


@dataclass(eq=False)
class GetMotorUsageResponsePayload(EmptyPayload):
"""A payload with motor lifetime usage."""

distance_um: utils.UInt64Field
150 changes: 150 additions & 0 deletions hardware/opentrons_hardware/scripts/dump_eeprom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""blow away and reset EEPROM file systems."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🌬️


import asyncio
import logging
import argparse
from logging.config import dictConfig
from typing import Dict, Any
from typing_extensions import Final

from opentrons_hardware.drivers.can_bus import build, CanMessenger
from opentrons_hardware.firmware_bindings import utils, ArbitrationId
from opentrons_hardware.firmware_bindings.constants import MessageId
from opentrons_hardware.firmware_bindings.messages import (
MessageDefinition,
message_definitions,
payloads,
)
from opentrons_hardware.firmware_bindings.constants import NodeId
from opentrons_hardware.scripts.can_args import add_can_args, build_settings

logger = logging.getLogger(__name__)

LOG_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}
},
"handlers": {
"stream_handler": {
"class": "logging.StreamHandler",
"formatter": "basic",
"level": logging.DEBUG,
},
},
"loggers": {
"": {
"handlers": ["stream_handler"],
"level": logging.DEBUG,
},
},
}

TARGETS: Final[Dict[str, NodeId]] = {
"head": NodeId.head,
"gantry-x": NodeId.gantry_x,
"gantry-y": NodeId.gantry_y,
"pipette-left": NodeId.pipette_left,
"pipette-right": NodeId.pipette_right,
"gripper": NodeId.gripper,
}


async def run(args: argparse.Namespace) -> None:
"""Script entrypoint."""
with open(args.file, "w") as f:

def handle_read_resp(
message: MessageDefinition, arbitration_id: ArbitrationId
) -> None:
"""Called by can messenger when a message arrives."""
if isinstance(message, message_definitions.ReadFromEEPromResponse):
f.write(
"".join("{:02x} ".format(x) for x in message.payload.data.value)
)
f.write("\n")

async with build.driver(build_settings(args)) as driver, CanMessenger(
driver
) as messenger:
messenger.add_listener(
handle_read_resp,
lambda arbitration_id: bool(
arbitration_id.parts.message_id == MessageId.read_eeprom_response
),
)

await dump_eeprom(
messenger,
TARGETS[args.target],
256 if args.old_version else 16384,
)


async def dump_eeprom(messenger: CanMessenger, node: NodeId, limit: int) -> None:
"""Wipe out all of the data used for the general purpose file system."""
start = 0
max_read = 8
while start < limit:
read_len = min(max_read, limit - start)
read_msg = message_definitions.ReadFromEEPromRequest(
payload=payloads.EEPromReadPayload(
address=utils.UInt16Field(start),
data_length=utils.UInt16Field(read_len),
)
)
start += read_len
await messenger.send(node, read_msg)


def main() -> None:
"""Entry point."""
parser = argparse.ArgumentParser(description=__doc__)

add_can_args(parser)
parser.add_argument(
"--target",
help="The FW subsystem to be cleared.",
type=str,
required=True,
choices=TARGETS.keys(),
)
parser.add_argument(
"--file",
help="file where to save the dump file.",
type=str,
default="/var/log/eeprom_dump.hex",
required=False,
)
parser.add_argument(
"--less-logs",
help="Set log level to INFO, so we see less logs.",
action="store_true",
default=False,
)
parser.add_argument(
"--old-version",
help="Enable this flag to clear eeprom on the older 256 Byte eeproms.",
action="store_true",
default=False,
)

args = parser.parse_args()

def _set_log_lvl_warn(d: Dict[str, Any]) -> None:
for k in d.keys():
if isinstance(d[k], dict):
_set_log_lvl_warn(d[k])
elif k == "level":
d[k] = logging.WARNING

if args.less_logs:
_set_log_lvl_warn(LOG_CONFIG)
dictConfig(LOG_CONFIG)
asyncio.run(run(args))


if __name__ == "__main__":
main()
127 changes: 127 additions & 0 deletions hardware/opentrons_hardware/scripts/reset_eeprom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""blow away and reset EEPROM file systems."""

import asyncio
import logging
import argparse
from logging.config import dictConfig
from typing import Dict, Any
from typing_extensions import Final

from opentrons_hardware.drivers.can_bus import build, CanMessenger
from opentrons_hardware.firmware_bindings import utils
from opentrons_hardware.firmware_bindings.messages import (
message_definitions,
payloads,
fields,
)
from opentrons_hardware.firmware_bindings.constants import NodeId
from opentrons_hardware.scripts.can_args import add_can_args, build_settings

logger = logging.getLogger(__name__)

LOG_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}
},
"handlers": {
"stream_handler": {
"class": "logging.StreamHandler",
"formatter": "basic",
"level": logging.DEBUG,
},
},
"loggers": {
"": {
"handlers": ["stream_handler"],
"level": logging.DEBUG,
},
},
}

TARGETS: Final[Dict[str, NodeId]] = {
"head": NodeId.head,
"gantry-x": NodeId.gantry_x,
"gantry-y": NodeId.gantry_y,
"pipette-left": NodeId.pipette_left,
"pipette-right": NodeId.pipette_right,
"gripper": NodeId.gripper,
}


async def run(args: argparse.Namespace) -> None:
"""Script entrypoint."""
async with build.driver(build_settings(args)) as driver, CanMessenger(
driver
) as messenger:
await clear_eeprom(
messenger,
TARGETS[args.target],
256 if args.old_version else 16384,
"0000000000000000" if args.old_version else "FFFFFFFFFFFFFFFF",
)


async def clear_eeprom(
messenger: CanMessenger, node: NodeId, limit: int, filler: str
) -> None:
"""Wipe out all of the data used for the general purpose file system."""
start = 28
max_write = 8
while start < limit:
write_len = min(max_write, limit - start)
write_msg = message_definitions.WriteToEEPromRequest(
payload=payloads.EEPromDataPayload(
address=utils.UInt16Field(start),
data_length=utils.UInt16Field(write_len),
data=fields.EepromDataField.from_string(filler),
)
)
start += write_len
await messenger.ensure_send(node, write_msg, expected_nodes=[node])


def main() -> None:
"""Entry point."""
parser = argparse.ArgumentParser(description=__doc__)

add_can_args(parser)
parser.add_argument(
"--target",
help="The FW subsystem to be cleared.",
type=str,
required=True,
choices=TARGETS.keys(),
)
parser.add_argument(
"--less-logs",
help="Set log level to INFO, so we see less logs.",
action="store_true",
default=False,
)
parser.add_argument(
"--old-version",
help="Enable this flag to clear eeprom on the older 256 Byte eeproms.",
action="store_true",
default=False,
)

args = parser.parse_args()

def _set_log_lvl_warn(d: Dict[str, Any]) -> None:
for k in d.keys():
if isinstance(d[k], dict):
_set_log_lvl_warn(d[k])
elif k == "level":
d[k] = logging.WARNING

if args.less_logs:
_set_log_lvl_warn(LOG_CONFIG)
dictConfig(LOG_CONFIG)
asyncio.run(run(args))


if __name__ == "__main__":
main()