diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 6b587036f48..34292ce2bd5 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -50,6 +50,8 @@ except (OSError, ModuleNotFoundError): aionotify = None + +from opentrons_hardware.drivers import SystemDrivers from opentrons_hardware.drivers.can_bus import CanMessenger, DriverSettings from opentrons_hardware.drivers.can_bus.abstract_driver import AbstractCanDriver from opentrons_hardware.drivers.can_bus.build import build_driver @@ -58,6 +60,7 @@ SerialUsbDriver, build_rear_panel_driver, ) +from opentrons_hardware.drivers.eeprom import EEPROMDriver, EEPROMData from opentrons_hardware.hardware_control.move_group_runner import MoveGroupRunner from opentrons_hardware.hardware_control.motion_planning import ( Move, @@ -220,6 +223,7 @@ def __init__( config: OT3Config, driver: AbstractCanDriver, usb_driver: Optional[SerialUsbDriver] = None, + eeprom_driver: Optional[EEPROMDriver] = None, check_updates: bool = True, ) -> None: """Construct. @@ -232,7 +236,11 @@ def __init__( self._module_controls: Optional[AttachedModulesControl] = None self._messenger = CanMessenger(driver=driver) self._messenger.start() - self._gpio_dev, self._usb_messenger = self._build_system_hardware(usb_driver) + self._drivers = self._build_system_hardware( + self._messenger, usb_driver, eeprom_driver + ) + self._usb_messenger = self._drivers.usb_messenger + self._gpio_dev = self._drivers.gpio_dev self._subsystem_manager = SubsystemManager( self._messenger, self._usb_messenger, @@ -276,20 +284,41 @@ def fw_version(self) -> Dict[SubSystem, int]: for subsystem, info in self.subsystems.items() } + @property + def eeprom_driver(self) -> EEPROMDriver: + """The eeprom driver interface.""" + return self._drivers.eeprom + + @property + def eeprom_data(self) -> EEPROMData: + """Get the data on the eeprom.""" + return self._drivers.eeprom.data + @property def update_required(self) -> bool: return self._subsystem_manager.update_required and self._check_updates @staticmethod def _build_system_hardware( + can_messenger: CanMessenger, usb_driver: Optional[SerialUsbDriver], - ) -> Tuple[Union[OT3GPIO, RemoteOT3GPIO], Optional[BinaryMessenger]]: - if usb_driver is None: - return OT3GPIO("hardware_control"), None - else: + eeprom_driver: Optional[EEPROMDriver], + ) -> SystemDrivers: + gpio = OT3GPIO("hardware_control") + eeprom_driver = eeprom_driver or EEPROMDriver(gpio) + eeprom_driver.setup() + gpio_dev: Union[OT3GPIO, RemoteOT3GPIO] = gpio + usb_messenger: Optional[BinaryMessenger] = None + if usb_driver: usb_messenger = BinaryMessenger(usb_driver) usb_messenger.start() - return RemoteOT3GPIO(usb_messenger), usb_messenger + gpio_dev = RemoteOT3GPIO(usb_messenger) + return SystemDrivers( + can_messenger, + gpio_dev, + eeprom_driver, + usb_messenger=usb_messenger, + ) def _motor_nodes(self) -> Set[NodeId]: """Get a list of the motor controller nodes of all attached and ok devices.""" diff --git a/api/src/opentrons/hardware_control/backends/ot3simulator.py b/api/src/opentrons/hardware_control/backends/ot3simulator.py index 84e8378d495..b2074bd1b38 100644 --- a/api/src/opentrons/hardware_control/backends/ot3simulator.py +++ b/api/src/opentrons/hardware_control/backends/ot3simulator.py @@ -45,7 +45,7 @@ Move, Coordinates, ) - +from opentrons_hardware.drivers.eeprom import EEPROMData from opentrons.hardware_control.module_control import AttachedModulesControl from opentrons.hardware_control import modules from opentrons.hardware_control.types import ( @@ -197,6 +197,10 @@ def initialized(self) -> bool: def initialized(self, value: bool) -> None: self._initialized = value + @property + def eeprom_data(self) -> EEPROMData: + return EEPROMData() + @property def board_revision(self) -> BoardRevision: """Get the board revision""" diff --git a/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py b/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py index 47e008a02f0..ef2ce55ef7b 100644 --- a/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py +++ b/api/tests/opentrons/hardware_control/backends/test_ot3_controller.py @@ -25,6 +25,7 @@ target_to_subsystem, ) from opentrons.hardware_control.backends.subsystem_manager import SubsystemManager +from opentrons_hardware.drivers.eeprom import EEPROMDriver from opentrons_hardware.drivers.can_bus.can_messenger import ( MessageListenerCallback, MessageListenerCallbackFilter, @@ -128,12 +129,22 @@ def mock_usb_driver() -> SerialUsbDriver: return mock.AsyncMock(spec=SerialUsbDriver) +@pytest.fixture +def mock_eeprom_driver() -> EEPROMDriver: + """Mock eeprom driver.""" + return mock.Mock(spec=EEPROMDriver) + + @pytest.fixture def controller( - mock_config: OT3Config, mock_can_driver: AbstractCanDriver + mock_config: OT3Config, + mock_can_driver: AbstractCanDriver, + mock_eeprom_driver: EEPROMDriver, ) -> Iterator[OT3Controller]: - with mock.patch("opentrons.hardware_control.backends.ot3controller.OT3GPIO"): - yield OT3Controller(mock_config, mock_can_driver) + with (mock.patch("opentrons.hardware_control.backends.ot3controller.OT3GPIO")): + yield OT3Controller( + mock_config, mock_can_driver, eeprom_driver=mock_eeprom_driver + ) @pytest.fixture diff --git a/hardware/opentrons_hardware/drivers/__init__.py b/hardware/opentrons_hardware/drivers/__init__.py index 2be19755a1d..dadcfca4c77 100644 --- a/hardware/opentrons_hardware/drivers/__init__.py +++ b/hardware/opentrons_hardware/drivers/__init__.py @@ -1 +1,35 @@ """Drivers package.""" + +from typing import Optional, Union + +from .can_bus import CanMessenger +from .binary_usb import BinaryMessenger +from .gpio import OT3GPIO, RemoteOT3GPIO +from .eeprom import EEPROMDriver + + +class SystemDrivers: + """Holder class for hardware drivers.""" + + def __init__( + self, + can_messenger: CanMessenger, + gpio_dev: Union[OT3GPIO, RemoteOT3GPIO], + eeprom: EEPROMDriver, + usb_messenger: Optional[BinaryMessenger] = None, + ) -> None: + """Constructor""" + self.can_messenger: CanMessenger = can_messenger + self.usb_messenger: Optional[BinaryMessenger] = usb_messenger + self.gpio_dev: Union[OT3GPIO, RemoteOT3GPIO] = gpio_dev + self.eeprom: EEPROMDriver = eeprom + + +__all__ = [ + "SystemDrivers", + "CanMessenger", + "BinaryMessenger", + "EEPROMDriver", + "OT3GPIO", + "RemoteOT3GPIO", +] diff --git a/hardware/opentrons_hardware/drivers/eeprom/__init__.py b/hardware/opentrons_hardware/drivers/eeprom/__init__.py new file mode 100644 index 00000000000..dc6fedfd681 --- /dev/null +++ b/hardware/opentrons_hardware/drivers/eeprom/__init__.py @@ -0,0 +1,29 @@ +"""The eeprom interface package.""" + +from .eeprom import ( + EEPROMDriver, + DEFAULT_BUS, + DEFAULT_ADDRESS, + DEFAULT_READ_SIZE, +) + +from .types import ( + PropId, + PropType, + Property, + EEPROMData, + FORMAT_VERSION, +) + + +__all__ = [ + "PropId", + "PropType", + "Property", + "EEPROMData", + "EEPROMDriver", + "DEFAULT_BUS", + "DEFAULT_ADDRESS", + "DEFAULT_READ_SIZE", + "FORMAT_VERSION", +] diff --git a/hardware/opentrons_hardware/drivers/eeprom/build.py b/hardware/opentrons_hardware/drivers/eeprom/build.py new file mode 100644 index 00000000000..7f68348ba52 --- /dev/null +++ b/hardware/opentrons_hardware/drivers/eeprom/build.py @@ -0,0 +1,34 @@ +"""Factory for building the eeprom driver.""" + +from typing import Optional, Iterator +from pathlib import Path +from contextlib import contextmanager + +from ..gpio import OT3GPIO +from .eeprom import ( + EEPROMDriver, +) + + +DEFAULT_EEPROM_PATH = Path("/sys/bus/i2c/devices/3-0050/eeprom") + + +def build_eeprom_driver( + gpio: Optional[OT3GPIO] = None, eeprom_path: Optional[Path] = None +) -> EEPROMDriver: + """Create an instance of the eeprom driver""" + gpio = gpio or OT3GPIO("eeprom_hardware_controller") + eeprom_path = eeprom_path or DEFAULT_EEPROM_PATH + eeprom_driver = EEPROMDriver(gpio, eeprom_path=eeprom_path) + eeprom_driver.setup() + return eeprom_driver + + +@contextmanager +def eeprom_driver() -> Iterator[EEPROMDriver]: + """Context manager creating an eeprom driver.""" + d = build_eeprom_driver() + try: + yield d + finally: + d.__exit__() diff --git a/hardware/opentrons_hardware/drivers/eeprom/eeprom.py b/hardware/opentrons_hardware/drivers/eeprom/eeprom.py new file mode 100644 index 00000000000..4437158c50a --- /dev/null +++ b/hardware/opentrons_hardware/drivers/eeprom/eeprom.py @@ -0,0 +1,252 @@ +"""Module to read/write to the eeprom on the Flex SOM.""" + +import re +import os +import logging +from datetime import datetime +from pathlib import Path +from typing import Optional, Set, Type, Tuple, Any +from types import TracebackType + +from ..gpio import OT3GPIO +from .types import ( + PropId, + Property, + EEPROMData, +) +from .utils import ( + parse_data, + generate_packet, +) + +logger = logging.getLogger(__name__) + +# The default bus line and address of the i2c eeprom device +DEFAULT_BUS = 3 +DEFAULT_ADDRESS = "0050" +DEFAULT_READ_SIZE = 64 + + +class EEPROMDriver: + """This class lets you read/write to the eeprom using a sysfs file.""" + + def __init__( + self, + gpio: OT3GPIO, + bus: Optional[int] = DEFAULT_BUS, + address: Optional[str] = DEFAULT_ADDRESS, + eeprom_path: Optional[Path] = None, + ) -> None: + """Contructor + + Args: + gpio: An instance of the gpio class so we can toggle lines on the SOM + bus: The i2c bus this device is on + address: The unique address for this device + eeprom_path: The path of the eeprom device, for testing. + """ + self._gpio = gpio + self._bus = bus + self._address = address + self._eeprom_path = eeprom_path or Path( + f"/sys/bus/i2c/devices/{bus}-{address}/eeprom" + ) + self._size = 0 + self._name = "" + self._eeprom_fd = -1 + self._eeprom_data: EEPROMData = EEPROMData() + self._properties: Set[Property] = set() + + @property + def name(self) -> str: + """The name of this eeprom device.""" + return self._name + + @property + def address(self) -> str: + """The address of the i2c device.""" + return f"{self._bus}-{self._address}" + + @property + def size(self) -> int: + """The size in bytes of the eeprom.""" + return self._size + + @property + def data(self) -> EEPROMData: + """Object representing the serialized data stored in the eeprom.""" + return self._eeprom_data + + @property + def properties(self) -> Set[Property]: + """Returns a set of Property objects that are on the eeprom.""" + return self._properties + + def __enter__(self) -> "EEPROMDriver": + """Enter runtime context.""" + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> bool: + """Exit runtime context and close the file descriptor.""" + self._gpio.deactivate_eeprom_wp() + return self.close() + + def __del__(self) -> None: + """Destructor to close the file descriptor.""" + self.close() + + def setup(self) -> None: + """Setup the class and serialize the eeprom data.""" + # Open a file descriptor for the eeprom + self._eeprom_fd = self.open() + # Get the eeeprom metadata + self._name, self._size = self._get_eeprom_info() + # Read and serialize eeprom data + self.property_read() + + def open(self) -> int: + """Opens up the eeprom file and returns the file descriptor.""" + if self._eeprom_fd > 0: + logger.warning("File descriptor already opened for eeprom") + return self._eeprom_fd + + try: + self._eeprom_fd = os.open(self._eeprom_path, os.O_RDWR) + except OSError: + logger.error(f"Could not open eeprom file - {self._eeprom_path}") + self._eeprom_fd = -1 + return self._eeprom_fd + + def close(self) -> bool: + """Close the file descriptor""" + if self._eeprom_fd != -1: + logger.debug("Closing eeprom file descriptor") + os.close(self._eeprom_fd) + self._eeprom_fd = -1 + return True + + def property_read(self, prop_ids: Optional[Set[PropId]] = None) -> Set[Property]: + """Returns a set of properties read from the eeprom.""" + properties: Set[Property] = set() + address = 0 + old_overflow = overflow = b"" + while True: + page = address // DEFAULT_READ_SIZE + 1 + logger.debug(f"Reading eeprom page {page}") + # read data in n byte chunks and prepend any leftover data from previous read + data = overflow + self._read(size=DEFAULT_READ_SIZE, address=address) + props, overflow = parse_data(data, prop_ids=prop_ids) + if props: + properties.update(props) + elif not props and not overflow: + # we dont have any more valid data to read so break out. + break + elif old_overflow == overflow: + # we have stale data + break + + # read the next page + old_overflow = overflow + address += DEFAULT_READ_SIZE + + # sort by PropId value to keep things in order + properties = set(sorted(properties, key=lambda prop: prop.id.value)) + + # update internal states + if properties: + self._properties = properties + self._populate_data() + return properties + + def property_write(self, properties: Set[Tuple[PropId, Any]]) -> Set[PropId]: + """Write the given properties to the eeprom, returning a set of the successful ones.""" + written_props: Set[PropId] = set() + # sort the properties so they are written in ascending order + properties = set(sorted(properties, key=lambda prop: prop[0].value)) + data: bytes = b"" + for prop_id, value in properties: + packet = generate_packet(prop_id, value) + if packet: + written_props.add(prop_id) + data += packet + if data: + try: + self._gpio.activate_eeprom_wp() + self._write(data) + except RuntimeError: + # something went wrong, clear written props + written_props = set() + finally: + self._gpio.deactivate_eeprom_wp() + return written_props + + def _read(self, size: int = DEFAULT_READ_SIZE, address: int = 0) -> bytes: + """Reads a number of bytes from the eeprom.""" + if self._eeprom_fd == -1: + raise RuntimeError(f"eeprom file descriptor is not opened {self.name}.") + + logger.debug( + f"Reading {size} bytes from address {hex(address)} for device {self.name}" + ) + data = b"" + try: + os.lseek(self._eeprom_fd, address, os.SEEK_SET) + data = os.read(self._eeprom_fd, size) + except Exception as e: + logger.error(f"Could not read from eeprom {self.name} - {e}") + return data + + def _write(self, data: bytes, address: int = 0) -> int: + """Write data to the eeprom at the given address.""" + if self._eeprom_fd == -1: + raise RuntimeError( + "Could not read from eeprom, file descriptor is unavailable" + ) + + try: + logger.debug( + f"Writing {len(data)} bytes to address {hex(address)} for device {self.name} - {data.hex()}" + ) + os.lseek(self._eeprom_fd, address, os.SEEK_SET) + return os.write(self._eeprom_fd, data) + except TimeoutError: + logging.error( + f"Could not write data to eeprom {self.address}, make sure the write bit is low." + ) + raise + + def _get_eeprom_info(self) -> Tuple[str, int]: + """This will get the name and size in bytes of the eeprom.""" + name = "" + size = 0 + eeprom_name = self._eeprom_path.parent / "name" + if os.path.exists(eeprom_name): + with open(eeprom_name) as fh: + name = fh.read().strip() + match = re.match(r"24c([\d]+)", name) + if match: + # The eeprom size is in kbytes so we need to + # multiply by 128 to get the bytes + size = int(match[1]) * 128 + return name, size + + def _populate_data(self) -> EEPROMData: + """This will create and populate the EEPROMData object.""" + for prop in self._properties: + if prop.id == PropId.FORMAT_VERSION: + self._eeprom_data.format_version = prop.value + elif prop.id == PropId.SERIAL_NUMBER and len(prop.value) >= 17: + self._eeprom_data.serial_number = prop.value + self._eeprom_data.machine_type = prop.value[:3] + self._eeprom_data.machine_version = prop.value[3:6] + date_string = prop.value[6:14] + self._eeprom_data.programmed_date = datetime.strptime( + date_string, "%Y%m%d" + ) + self._eeprom_data.unit_number = int(prop.value[14:17]) + return self._eeprom_data diff --git a/hardware/opentrons_hardware/drivers/eeprom/types.py b/hardware/opentrons_hardware/drivers/eeprom/types.py new file mode 100644 index 00000000000..64704cacaf7 --- /dev/null +++ b/hardware/opentrons_hardware/drivers/eeprom/types.py @@ -0,0 +1,71 @@ +"""Type definitions used for the eeprom interface module.""" + +from datetime import datetime +from dataclasses import dataclass +from enum import Enum +from typing import Any, Optional + + +# The version of the properties format +FORMAT_VERSION = 1 + + +# NOTE: a serialized property can be up-to 255 (0xff) bytes long, +# this includes the property id (1b) + data length (1b) + data (1-253b) +MAX_DATA_LEN = 253 + + +class PropId(Enum): + """The unique property id for a property.""" + + INVALID = 0xFF + FORMAT_VERSION = 1 + SERIAL_NUMBER = 2 + + +class PropType(Enum): + """The types of properties that can be saved/loaded from the eeprom.""" + + BYTE = 1 + CHAR = 2 + SHORT = 3 + INT = 4 + STR = 5 + BIN = 6 + + +PROP_ID_TYPES = { + PropId.FORMAT_VERSION: PropType.BYTE, + PropId.SERIAL_NUMBER: PropType.STR, +} + +PROP_TYPE_SIZE = { + PropType.BYTE: 1, + PropType.CHAR: 1, + PropType.SHORT: 2, + PropType.INT: 4, + PropType.STR: MAX_DATA_LEN, + PropType.BIN: MAX_DATA_LEN, +} + + +@dataclass(frozen=True) +class Property: + """A single piece of data to read/write to eeprom.""" + + id: PropId + type: PropType + max_size: int + value: Any + + +@dataclass +class EEPROMData: + """Dataclass that represents the serialized data from the eeprom.""" + + format_version: int = FORMAT_VERSION + serial_number: Optional[str] = None + machine_type: Optional[str] = None + machine_version: Optional[str] = None + programmed_date: Optional[datetime] = None + unit_number: Optional[int] = None diff --git a/hardware/opentrons_hardware/drivers/eeprom/utils.py b/hardware/opentrons_hardware/drivers/eeprom/utils.py new file mode 100644 index 00000000000..a721c1036bf --- /dev/null +++ b/hardware/opentrons_hardware/drivers/eeprom/utils.py @@ -0,0 +1,106 @@ +"""Utilities to parse and format bynary data into Property objects.""" + +import struct +from typing import Any, Optional, Set, Tuple + +from .types import ( + PropId, + Property, + PropType, + PROP_ID_TYPES, + PROP_TYPE_SIZE, + MAX_DATA_LEN, +) + + +ParsedData = Tuple[Set[Property], bytes] + + +def parse_data(data: bytes, prop_ids: Optional[Set[PropId]] = None) -> ParsedData: + """This function will parse bytes and return a list of valid Property objects. + + Any data that is unparsed or incomplete will be returned to the caller, + this way it can be combined with new data and reparsed. + """ + prop_ids = prop_ids or set(PropId.__members__.values()) + properties: Set[Property] = set() + packet = b"" + start_idx = end_idx = 0 + data_len = len(data) + while start_idx < data_len: + prop_id = data[start_idx] + # break out if we have an invalid prop id (0xff) + if prop_id == PropId.INVALID.value: + break + + # this will determine if the data is going to overflow + if start_idx + 1 >= data_len: + packet = data[start_idx:] + break + prop_len = data[start_idx + 1] + if prop_len > MAX_DATA_LEN: + break + end_idx = start_idx + 2 + prop_len + if end_idx > data_len: + packet = data[start_idx:] + break + + prop_data = data[start_idx + 2 : end_idx] + start_idx += prop_len + 2 # prop_id (1b) + prop_len (1b) + + # decode the data for the given property + prop = _parse_prop(prop_id, prop_len, prop_data) + if prop and prop.id in prop_ids: + properties.add(prop) + return properties, packet + + +def _parse_prop(prop_id: int, prop_len: int, data: bytes) -> Optional[Property]: + try: + prop = PropId(prop_id) + data_type = PROP_ID_TYPES[prop] + data_size = PROP_TYPE_SIZE[data_type] + decoded_data: Any = data + if data_type == PropType.BYTE: + decoded_data = data[0] + elif data_type == PropType.CHAR: + decoded_data = chr(data[0]) + elif data_type in [PropType.SHORT, PropType.INT]: + decoded_data = int.from_bytes(data, "big") + elif data_type == PropType.STR: + decoded_data = data.decode("utf-8") + return Property(id=prop, type=data_type, max_size=data_size, value=decoded_data) + except ValueError: + return None + + +def generate_packet(prop_id: PropId, value: Any) -> Optional[bytes]: + """This function will turn prop_ids and their data into a bytes for writting to eeprom.""" + data = _encode_data(prop_id, value) + if data and len(data) <= MAX_DATA_LEN: + return struct.pack("!BB", prop_id.value, len(data)) + data + return None + + +def _encode_data(prop_id: PropId, value: Any) -> Optional[bytes]: # noqa: C901 + if prop_id == PropId.INVALID: + return None + encoded_data: bytes = b"" + try: + prop_id = PropId(prop_id) + data_type = PROP_ID_TYPES[prop_id] + if data_type == PropType.BYTE: + encoded_data = struct.pack("!B", value) + elif data_type == PropType.CHAR: + encoded_data = struct.pack("!B", ord(value)) + elif data_type == PropType.SHORT: + encoded_data = struct.pack("!h", value) + elif data_type == PropType.INT: + encoded_data = struct.pack("!i", value) + elif data_type == PropType.STR: + encoded_data = f"{value}".encode("utf-8") + elif data_type == PropType.BIN: + encoded_data = bytes(value) + return encoded_data + except (ValueError, TypeError, struct.error): + return None diff --git a/hardware/opentrons_hardware/drivers/gpio/__init__.py b/hardware/opentrons_hardware/drivers/gpio/__init__.py index b9617334db8..bd02bec3adc 100644 --- a/hardware/opentrons_hardware/drivers/gpio/__init__.py +++ b/hardware/opentrons_hardware/drivers/gpio/__init__.py @@ -15,6 +15,7 @@ CONSUMER_NAME_DEFAULT: Final[str] = "opentrons" ESTOP_OUT_GPIO_NAME: Final[str] = "SODIMM_210" NSYNC_OUT_GPIO_NAME: Final[str] = "SODIMM_206" +EEPROM_WP_OUT_GPIO_NAME: Final[str] = "SODIMM_222" LOG = getLogger(__name__) @@ -53,8 +54,13 @@ def __init__(self, consumer_name: Optional[str] = None) -> None: self._nsync_out_line.request( self._consumer_name, type=self._gpiod.LINE_REQ_DIR_OUT ) + self._eeprom_wp_out_line = self._gpiod.find_line(EEPROM_WP_OUT_GPIO_NAME) + self._eeprom_wp_out_line.request( + self._consumer_name, type=self._gpiod.LINE_REQ_DIR_OUT + ) self.deactivate_estop() self.deactivate_nsync_out() + self.deactivate_eeprom_wp() sleep(1) def activate_estop(self) -> None: @@ -77,6 +83,14 @@ def deactivate_nsync_out(self) -> None: """Stop asserting the nsync out line.""" self._nsync_out_line.set_value(1) + def activate_eeprom_wp(self) -> None: + """Assert the eeprom write protect, which will enable writes to the eeprom.""" + self._eeprom_wp_out_line.set_value(0) + + def deactivate_eeprom_wp(self) -> None: + """Stop asserting the eeprom wp line.""" + self._eeprom_wp_out_line.set_value(1) + class RemoteOT3GPIO: """Driver class for OT3 gpio lines that are controlled remotely.""" diff --git a/hardware/tests/opentrons_hardware/drivers/eeprom/__init__.py b/hardware/tests/opentrons_hardware/drivers/eeprom/__init__.py new file mode 100644 index 00000000000..46d2cd406b7 --- /dev/null +++ b/hardware/tests/opentrons_hardware/drivers/eeprom/__init__.py @@ -0,0 +1 @@ +"""eeprom driver tests""" diff --git a/hardware/tests/opentrons_hardware/drivers/eeprom/test_driver.py b/hardware/tests/opentrons_hardware/drivers/eeprom/test_driver.py new file mode 100644 index 00000000000..ccb6485b8a9 --- /dev/null +++ b/hardware/tests/opentrons_hardware/drivers/eeprom/test_driver.py @@ -0,0 +1,448 @@ +"""Tests for the eeprom module.""" + +import mock +import pytest +import tempfile + +from enum import Enum +from pathlib import Path +from datetime import datetime +from typing import Generator + + +from opentrons_hardware.drivers import OT3GPIO +from opentrons_hardware.drivers.eeprom import ( + EEPROMDriver, + PropId, +) + + +@pytest.fixture +def eeprom_api() -> Generator[EEPROMDriver, None, None]: + """Mock out OT3GPIO and create a temp /eeprom and /name files.""" + with tempfile.TemporaryDirectory() as eeprom_dir: + # create eeprom and name files + eeprom_path = Path(eeprom_dir) / "eeprom" + eeprom_name_path = eeprom_path.parent / "name" + with open(eeprom_path, "wb"), open(eeprom_name_path, "w") as fh: + # write we can get the name and size of the eeprom + fh.write("24c128") + gpio = mock.Mock(spec=OT3GPIO) + yield EEPROMDriver(gpio, eeprom_path=eeprom_path) + + +def test_eeprom_setup(eeprom_api: EEPROMDriver) -> None: + """Test that the eeprom is setup successfully.""" + # write some data to load from + with open(eeprom_api._eeprom_path, "wb") as fh: + fh.write(b"\x02\x11FLXA1020230602001") + + # Make sure we dont have any data loaded yet + assert eeprom_api._name == "" + assert eeprom_api._size == 0 + assert eeprom_api._eeprom_fd == -1 + assert len(eeprom_api._properties) == 0 + assert eeprom_api.data.serial_number is None + assert eeprom_api.data.machine_type is None + assert eeprom_api.data.machine_version is None + assert eeprom_api.data.programmed_date is None + assert eeprom_api.data.unit_number is None + + # call the setup function + eeprom_api.setup() + + # We now have the name and size of the eeprom + assert eeprom_api.name == "24c128" + assert eeprom_api.size == 16384 + + # We now have a file descriptor pointing to the eeprom + assert eeprom_api._eeprom_fd != -1 + # As well as some properties the setup function deserialized + assert len(eeprom_api._properties) == 1 + assert {prop.id == PropId.SERIAL_NUMBER for prop in eeprom_api._properties} + assert eeprom_api.data.serial_number == "FLXA1020230602001" + assert eeprom_api.data.machine_type == "FLX" + assert eeprom_api.data.machine_version == "A10" + assert eeprom_api.data.programmed_date == datetime(2023, 6, 2) + assert eeprom_api.data.unit_number == 1 + + +def test_eeprom_setup_no_data(eeprom_api: EEPROMDriver) -> None: + """Test that we have default values if there is no data to load during setup.""" + # Make sure we dont have any data loaded yet + assert eeprom_api._eeprom_fd == -1 + assert len(eeprom_api._properties) == 0 + assert eeprom_api.data.serial_number is None + assert eeprom_api.data.machine_type is None + assert eeprom_api.data.machine_version is None + assert eeprom_api.data.programmed_date is None + assert eeprom_api.data.unit_number is None + + # call the setup function + eeprom_api.setup() + + # We know have a file descriptor pointing to the eeprom + assert eeprom_api._eeprom_fd != -1 + + # But we dont have any new data loaded in + assert len(eeprom_api._properties) == 0 + assert eeprom_api.data.serial_number is None + assert eeprom_api.data.machine_type is None + assert eeprom_api.data.machine_version is None + assert eeprom_api.data.programmed_date is None + assert eeprom_api.data.unit_number is None + + +def test_eeprom_open(eeprom_api: EEPROMDriver) -> None: + """Test that eeprom fd is opened before anything else.""" + # write some data to load from + with open(eeprom_api._eeprom_path, "wb") as fh: + fh.write(b"\x02\x11FLXA1020230602001") + + # Make sure we dont have any data loaded yet + assert eeprom_api._eeprom_fd == -1 + assert len(eeprom_api._properties) == 0 + assert eeprom_api.data.serial_number is None + assert eeprom_api.data.machine_type is None + assert eeprom_api.data.machine_version is None + assert eeprom_api.data.programmed_date is None + + # call read function, which will raise a RuntimeError exception + with pytest.raises(RuntimeError): + eeprom_api.property_read({PropId.SERIAL_NUMBER}) + + # now if we open the eeprom fd for reading we should succeed + eeprom_api.open() + assert eeprom_api._eeprom_fd != 1 + + # now that we have a file descriptor, read some data from it + result = eeprom_api.property_read({PropId.SERIAL_NUMBER}) + assert len(result) == 1 + assert {prop.id == PropId.SERIAL_NUMBER for prop in result} + + +def test_eeprom_open_context(eeprom_api: EEPROMDriver) -> None: + """Test that we can use the EEPROMDriver class as a context manager.""" + with open(eeprom_api._eeprom_path, "wb") as fh: + fh.write(b"\x02\x11FLXA1020230602001") + + # Make sure we dont have any data loaded yet + assert eeprom_api._eeprom_fd == -1 + assert len(eeprom_api._properties) == 0 + assert eeprom_api.data.serial_number is None + assert eeprom_api.data.machine_type is None + assert eeprom_api.data.machine_version is None + assert eeprom_api.data.programmed_date is None + + # make sure we can read within context + with eeprom_api as eeprom: + assert eeprom._eeprom_fd != -1 + result = eeprom.property_read() + assert len(result) == 1 + + # now we have left context the eeprom fd should be closed + assert eeprom_api._eeprom_fd == -1 + + +def test_eeprom_open_more_than_once(eeprom_api: EEPROMDriver) -> None: + """Test that we can only have one fd open at once.""" + # the file descriptor is closed by default + assert eeprom_api._eeprom_fd == -1 + # once we call open we get a non -1 fd + old_fd = eeprom_api.open() + assert eeprom_api._eeprom_fd != -1 + + # however if we can open again we dont create a new fd, but return the old one + new_fd = eeprom_api.open() + assert new_fd == old_fd + + +def test_eeprom_close(eeprom_api: EEPROMDriver) -> None: + """Test closing of the file descriptor.""" + # nothing happens if the fd is already closed + assert eeprom_api._eeprom_fd == -1 + assert eeprom_api.close() + + # if the fd is open then we close it, we clear the fd variable + old_fd = eeprom_api.open() + assert old_fd != -1 + + assert eeprom_api.close() + assert eeprom_api._eeprom_fd != old_fd + assert eeprom_api._eeprom_fd == -1 + + +def test_property_read_single(eeprom_api: EEPROMDriver) -> None: + """Test that we can read one property from the eeprom.""" + # register the file descriptor + eeprom_api.open() + + # make sure we have no data on the fake eeprom + assert eeprom_api.properties == set() + with open(eeprom_api._eeprom_path, "rb") as fh: + assert fh.read() == b"" + + # write multiple properties to the fake eeprom so we can test only + # reading the one we want + prop_ids = {PropId.FORMAT_VERSION, PropId.SERIAL_NUMBER} + result = eeprom_api.property_write( + {(PropId.FORMAT_VERSION, 1), (PropId.SERIAL_NUMBER, "FLXA1020230602001")} + ) + + # make sure the return value is a set of the PropIds that were written + assert result == prop_ids + + # now we should have data in the fake eeprom + with open(eeprom_api._eeprom_path, "rb") as fh: + assert fh.read() != b"" + + # read a single property from it and validate the data + props = eeprom_api.property_read({PropId.FORMAT_VERSION}) + assert len(props) == 1 + for prop in props: + assert prop.id == PropId.FORMAT_VERSION + assert prop.value == 1 + assert eeprom_api.data.format_version == 1 + + # make sure the internal state is updated + assert len(eeprom_api.properties) == 1 + for prop in eeprom_api.properties: + assert prop.id in prop_ids + + +def test_property_read_multi(eeprom_api: EEPROMDriver) -> None: + """Test that we can read multiple properties from the eeprom.""" + # register the file descriptor + eeprom_api.open() + + # make sure we have no data on the fake eeprom + with open(eeprom_api._eeprom_path, "rb") as fh: + assert fh.read() == b"" + + # make sure we dont have any serialized data + assert len(eeprom_api.properties) == 0 + assert eeprom_api.data.format_version == 1 + assert eeprom_api.data.machine_type is None + assert eeprom_api.data.machine_version is None + assert eeprom_api.data.programmed_date is None + assert eeprom_api.data.unit_number is None + + # write multiple properties to the fake eeprom so we can test only + prop_ids = {PropId.FORMAT_VERSION, PropId.SERIAL_NUMBER} + result = eeprom_api.property_write( + {(PropId.FORMAT_VERSION, 2), (PropId.SERIAL_NUMBER, "FLXA1020230604004")} + ) + + # make sure the return value is a set of the PropIds that were written + assert result == prop_ids + + # now we should have data in the fake eeprom + with open(eeprom_api._eeprom_path, "rb") as fh: + data = fh.read() + assert data != b"" + + # read multiple properties and validate the value + props = list(eeprom_api.property_read(prop_ids)) + assert len(props) == 2 + for prop in props: + # check that the prop we read is the one we asked to read + assert prop.id in prop_ids + + # validate the value we read + if prop.id == PropId.FORMAT_VERSION: + assert prop.value == 2 + # make sure we update our internal cache + assert eeprom_api.data.format_version == 2 + + if prop.id == PropId.SERIAL_NUMBER: + assert prop.value == "FLXA1020230604004" + # make sure we update our internal cache + assert eeprom_api.data.serial_number == "FLXA1020230604004" + assert eeprom_api.data.machine_type == "FLX" + assert eeprom_api.data.machine_version == "A10" + assert eeprom_api.data.programmed_date == datetime(2023, 6, 4) + assert eeprom_api.data.unit_number == 4 + + # make sure that the internal properties variable is updated as well + for prop in eeprom_api.properties: + assert prop.id in result + + +def test_property_read_overflow(eeprom_api: EEPROMDriver) -> None: + """Test that data overflow is handled properly.""" + # while we dont have many properties right now, eventually we might + # which means we could get into a situation where a serialized property + # might span more than one page (64 bytes by default). In this case + # we want to make sure data that has overflown is combined with new + # data and re-parsed. + + # Lets bring down the default read size by patching DEFAULT_READ_SIZE, + # this way we can run into the overflow issue with less data. + mock.patch("opentrons_hardware.drivers.eeprom.eeprom.DEFAULT_READ_SIZE", 10) + + # write some test data greater than DEFAULT_READ_SIZE (10 in this case) + eeprom_api.open() + w_size = eeprom_api._write(b"\x02\x10123456789ABCDEFG") + + # verify that the length of the data we wrote is whats actually written + r_size = len(eeprom_api._read(size=40)) + assert w_size == r_size + + # now lets do a property read with the default read size of 5, so it would take + # at least 2 read cycles + the overflow data to reach a valid property. + with mock.patch("opentrons_hardware.drivers.eeprom.eeprom.DEFAULT_READ_SIZE", 5): + props = list(eeprom_api.property_read({PropId.SERIAL_NUMBER})) + assert len(props) == 1 + assert props[0].id == PropId.SERIAL_NUMBER + assert props[0].value == "123456789ABCDEFG" + + +def test_property_read_invalid_data_prop_id(eeprom_api: EEPROMDriver) -> None: + """Test that we can handle invalid eeprom data.""" + # Invalid data refers to any data whose + # 1. PropId (byte 0) is invalid + # 2. Property length (byte 1) does not match the data length + # 3. Property length (byte 1) goes over the MAX_DATA_LEN = 253b + eeprom_api.open() + + # lets write some junk data + w_size = eeprom_api._write( + b"\xab\x11123456q3dasda2BCDEFG\xff" + + b"\xff\xff\xff\xff\xff\xff\xff\xff" + + b"\xff\xff\xff\xff\xff\xff\xff\xff" + ) + + # while we can read this data, it cant be parsed + r_size = len(eeprom_api._read()) + assert r_size == w_size + + # if we try and parse this we get no properties + props = eeprom_api.property_read() + assert len(props) == 0 + + +def test_property_read_invalid_data_blank(eeprom_api: EEPROMDriver) -> None: + """Test reading from eeprom with default data (0xff).""" + eeprom_api.open() + + # by default the eeprom is written with all 0xff + # since 0xff (255) is not a valid PropId we should ignore this data + eeprom_api._write( + b"\xff\xff\xff\xff\xff\xff\xff\xff" + + b"\xff\xff\xff\xff\xff\xff\xff\xff" + + b"\xff\xff\xff\xff\xff\xff\xff\xff" + ) + + assert len(eeprom_api.property_read()) == 0 + + +def test_property_write_single(eeprom_api: EEPROMDriver) -> None: + """Test that we can write single properties to the eeprom.""" + eeprom_api.open() + + # make sure we have no data on the fake eeprom + assert eeprom_api.properties == set() + with open(eeprom_api._eeprom_path, "rb") as fh: + assert fh.read() == b"" + + # lets write on property + prop_ids = {PropId.FORMAT_VERSION} + result = eeprom_api.property_write({(PropId.FORMAT_VERSION, 3)}) + # make sure we wrote the property + assert result == prop_ids + + # now lets read it back to confirm + props = list(eeprom_api.property_read()) + assert len(props) == 1 + assert props[0].id == PropId.FORMAT_VERSION + assert props[0].value == 3 + + +def test_property_write_multi(eeprom_api: EEPROMDriver) -> None: + """Test that we can write multiple properties to the eeprom.""" + eeprom_api.open() + + # make sure we have no data on the fake eeprom + assert eeprom_api.properties == set() + with open(eeprom_api._eeprom_path, "rb") as fh: + assert fh.read() == b"" + + # we can write multiple properties at once + prop_ids = {PropId.FORMAT_VERSION, PropId.SERIAL_NUMBER} + result = eeprom_api.property_write( + {(PropId.FORMAT_VERSION, 4), (PropId.SERIAL_NUMBER, "FLXA1020230604004")} + ) + + # make sure we wrote the properties + assert result == prop_ids + + # now read them back and make sure we have the same properties + props = eeprom_api.property_read() + assert len(props) == len(prop_ids) + for prop in props: + # make sure reading this updated our internal states + assert prop in eeprom_api.properties + assert prop.id in prop_ids + if prop.id == PropId.FORMAT_VERSION: + assert prop.value == 4 + assert eeprom_api.data.format_version == 4 + elif prop.id == PropId.SERIAL_NUMBER: + assert prop.value == "FLXA1020230604004" + assert eeprom_api.data.machine_type == "FLX" + assert eeprom_api.data.machine_version == "A10" + assert eeprom_api.data.programmed_date == datetime(2023, 6, 4) + assert eeprom_api.data.unit_number == 4 + + +def test_property_write_invalid_property(eeprom_api: EEPROMDriver) -> None: + """Test that invalid properties are not written to the eeprom.""" + eeprom_api.open() + + # dont write unknown PropIds + class FakePropId(Enum): + FAKE = 99 + + # attempt to write properties + result = eeprom_api.property_write( + { + (FakePropId.FAKE, "some data"), # type: ignore + (PropId.SERIAL_NUMBER, "FLXA1020230604004"), + } + ) + + # only PropId.SERIAL_NUMBER should be written + assert len(result) == 1 + assert PropId.SERIAL_NUMBER in result + + # verify by reading the data back + props = list(eeprom_api.property_read()) + assert len(props) == len(result) == 1 + assert props[0].id == PropId.SERIAL_NUMBER + assert props[0].value == "FLXA1020230604004" + + +def test_property_write_invalid_property_data(eeprom_api: EEPROMDriver) -> None: + """Test that invalida property data is not written to the eeprom.""" + eeprom_api.open() + + # although a PropId might be valid, we want to make sure the correct data is passed + # in for the property type and that we arent writting invalid data. + result = eeprom_api.property_write( + { + (PropId.FORMAT_VERSION, "not an int"), + (PropId.SERIAL_NUMBER, "FLXA1020230604004"), + } + ) + + # since PropId.FORMAT_VERSION expects an int it should not be written + # only PropId.SERIAL_NUMBER which is a string should be on the fake eeprom + assert len(result) == 1 + + # validate by reading back the data + props = list(eeprom_api.property_read()) + + # we should have the same number of properties read as written + assert len(props) == len(result) == 1 + + assert props[0].id == PropId.SERIAL_NUMBER + assert props[0].value == "FLXA1020230604004" diff --git a/hardware/tests/opentrons_hardware/drivers/eeprom/test_utils.py b/hardware/tests/opentrons_hardware/drivers/eeprom/test_utils.py new file mode 100644 index 00000000000..8dd59b9f3cb --- /dev/null +++ b/hardware/tests/opentrons_hardware/drivers/eeprom/test_utils.py @@ -0,0 +1 @@ +"""Test the parser util functions."""