Skip to content

Commit

Permalink
feat(can): replace uart script (#8450)
Browse files Browse the repository at this point in the history
* message payloads.

* identify uses payload object.

* Create can script.

* add todo.

* tests.

* add mock package.

* lint.

* memoize.

* clean up asyncio stuff.

* remove deprecated identify script. can_comm supercedes it.

* cosmetics.

* async.

* add scripts to readme

* lint
  • Loading branch information
amitlissack authored Oct 4, 2021
1 parent d7eaddc commit 63eb764
Show file tree
Hide file tree
Showing 16 changed files with 708 additions and 172 deletions.
3 changes: 2 additions & 1 deletion hardware/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ flake8 = "~=3.9.0"
flake8-annotations = "~=2.6.2"
flake8-docstrings = "~=1.6.0"
flake8-noqa = "~=1.1.0"

mock = "~=4.0.2"
types-mock = "==4.0.1"

[requires]
python_version = "3.7"
167 changes: 92 additions & 75 deletions hardware/Pipfile.lock

Large diffs are not rendered by default.

31 changes: 30 additions & 1 deletion hardware/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Hardware Project
# Opentrons-Hardware Package

## Running Tests

Expand All @@ -9,3 +9,32 @@ and running.
Tests will default to using `vcan0` as their SocketCAN network. If you wish to change
the network, then specify the `CAN_CHANNEL` environment variable with the name of
your network.

## Tools

The `opentrons-hardware` package includes some utility scripts.

### Header Generator

This will generate a C++ header file defining constants shared between firmware and the `opentrons-hardware` package.

#### Usage

```
opentrons_generate_header --target TARGET
```

Example: `opentrons_generate_header --target some_file.h`

### CAN Communication

This is a tool for sending messages to firmware over CAN bus

#### Usage

```
opentrons_can_comm --interface INTERFACE [--bitrate BITRATE]
[--channel CHANNEL]
```

Example: `opentrons_can_comm --interface socketcan --channel vcan0`
4 changes: 2 additions & 2 deletions hardware/opentrons_hardware/drivers/can_bus/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from .arbitration_id import ArbitrationId


@dataclass
@dataclass(frozen=True)
class CanMessage:
"""A can message."""

arbitration_id: ArbitrationId
data: bytearray
data: bytes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Can bus message definitions."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Defintion of CAN messages."""
from dataclasses import dataclass
from typing import Type

from typing_extensions import Literal

from opentrons_hardware.utils import BinarySerializable
from ..constants import MessageId
from . import payloads


@dataclass
class HeartbeatRequest: # noqa: D101
message_id: Literal[MessageId.heartbeat_request] = MessageId.heartbeat_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class HeartbeatResponse: # noqa: D101
message_id: Literal[MessageId.heartbeat_response] = MessageId.heartbeat_response
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class DeviceInfoRequest: # noqa: D101
message_id: Literal[MessageId.device_info_request] = MessageId.device_info_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class DeviceInfoResponse: # noqa: D101
message_id: Literal[MessageId.device_info_response] = MessageId.device_info_response
payload_type: Type[BinarySerializable] = payloads.DeviceInfoResponseBody


@dataclass
class StopRequest: # noqa: D101
message_id: Literal[MessageId.stop_request] = MessageId.stop_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class GetStatusRequest: # noqa: D101
message_id: Literal[MessageId.get_status_request] = MessageId.get_status_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class GetStatusResponse: # noqa: D101
message_id: Literal[MessageId.get_status_response] = MessageId.get_status_response
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class MoveRequest: # noqa: D101
message_id: Literal[MessageId.move_request] = MessageId.move_request
payload_type: Type[BinarySerializable] = payloads.MoveRequest


@dataclass
class SetupRequest: # noqa: D101
message_id: Literal[MessageId.setup_request] = MessageId.setup_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class GetSpeedRequest: # noqa: D101
message_id: Literal[MessageId.get_speed_request] = MessageId.get_speed_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class GetSpeedResponse: # noqa: D101
message_id: Literal[MessageId.get_speed_response] = MessageId.get_speed_response
payload_type: Type[BinarySerializable] = payloads.GetSpeedResponse


@dataclass
class WriteToEEPromRequest: # noqa: D101
message_id: Literal[MessageId.write_eeprom] = MessageId.write_eeprom
payload_type: Type[BinarySerializable] = payloads.WriteToEEPromRequest


@dataclass
class ReadFromEEPromRequest: # noqa: D101
message_id: Literal[MessageId.read_eeprom_request] = MessageId.read_eeprom_request
payload_type: Type[BinarySerializable] = payloads.EmptyMessage


@dataclass
class ReadFromEEPromResponse: # noqa: D101
message_id: Literal[MessageId.read_eeprom_response] = MessageId.read_eeprom_response
payload_type: Type[BinarySerializable] = payloads.ReadFromEEPromResponse
44 changes: 44 additions & 0 deletions hardware/opentrons_hardware/drivers/can_bus/messages/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Message types."""
from functools import lru_cache
from typing import Union, Optional

from typing_extensions import get_args

from opentrons_hardware.drivers.can_bus.messages import message_definitions as defs
from opentrons_hardware.drivers.can_bus.constants import MessageId

MessageDefinition = Union[
defs.HeartbeatRequest,
defs.HeartbeatResponse,
defs.DeviceInfoRequest,
defs.DeviceInfoResponse,
defs.StopRequest,
defs.GetStatusRequest,
defs.GetStatusResponse,
defs.MoveRequest,
defs.SetupRequest,
defs.GetSpeedRequest,
defs.GetSpeedResponse,
defs.WriteToEEPromRequest,
defs.ReadFromEEPromRequest,
defs.ReadFromEEPromResponse,
]


@lru_cache(maxsize=None)
def get_definition(message_id: MessageId) -> Optional[MessageDefinition]:
"""Get the message type for a message id.
Args:
message_id: A message id
Returns: The message definition for a type
"""
# Dumb linear search, but the result is memoized.
for i in get_args(MessageDefinition):
if i.message_id == message_id:
# get args returns Tuple[Any...]
return i # type: ignore

return None
55 changes: 55 additions & 0 deletions hardware/opentrons_hardware/drivers/can_bus/messages/payloads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Payloads of can bus messages."""
from dataclasses import dataclass

from opentrons_hardware import utils


@dataclass
class EmptyMessage(utils.BinarySerializable):
"""An empty payload."""

pass


@dataclass
class DeviceInfoResponseBody(utils.BinarySerializable):
"""Device info response."""

node_id: utils.UInt8Field
version: utils.UInt32Field


@dataclass
class GetStatusResponse(utils.BinarySerializable):
"""Get status response."""

status: utils.UInt8Field
data: utils.UInt32Field


@dataclass
class MoveRequest(utils.BinarySerializable):
"""Move request."""

steps: utils.UInt32Field


@dataclass
class GetSpeedResponse(utils.BinarySerializable):
"""Get speed response."""

mm_sec: utils.UInt32Field


@dataclass
class WriteToEEPromRequest(utils.BinarySerializable):
"""Write to eeprom request."""

serial_number: utils.UInt8Field


@dataclass
class ReadFromEEPromResponse(utils.BinarySerializable):
"""Read from ee prom response."""

serial_number: utils.UInt8Field
25 changes: 25 additions & 0 deletions hardware/opentrons_hardware/scripts/can_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""ArgumentParser setup for a can device."""
from argparse import ArgumentParser


def add_can_args(parser: ArgumentParser) -> ArgumentParser:
"""Add CAN interface arguments to ArgumentParser.
Args:
parser: ArgumentParser
Returns: ArgumentParser with added arguments for CAN interface.
"""
parser.add_argument(
"--interface",
type=str,
required=True,
help="the interface to use (ie: virtual, pcan, socketcan)",
)
parser.add_argument(
"--bitrate", type=int, default=250000, required=False, help="the bitrate"
)
parser.add_argument(
"--channel", type=str, default=None, required=False, help="optional channel"
)
return parser
Loading

0 comments on commit 63eb764

Please sign in to comment.