-
Notifications
You must be signed in to change notification settings - Fork 179
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(shared-data, api): add absorbance plate reader definition and mo…
…dule control (#15167) <!-- Thanks for taking the time to open a pull request! Please make sure you've read the "Opening Pull Requests" section of our Contributing Guide: https://github.com/Opentrons/opentrons/blob/edge/CONTRIBUTING.md#opening-pull-requests To ensure your code is reviewed quickly and thoroughly, please fill out the sections below to the best of your ability! --> # Overview This PR adds support for the Absorbance Plate Reader in the hardware controller API. We can now see the plate reader as an attached module on the backend and use the AbsorbanceReaderDriver to communicate with the plate reader like we would with the other modules. # Caveat The driver is not fully finished - there are some functions that still need to be implemented and I expect the format of the response will change depending on how we implement the engine commands. We still need to add some changes on oe-core side to make sure this runs on all robot - however i'm happy to show you the plate reader in action at this time.
- Loading branch information
1 parent
bda3836
commit 643a9dd
Showing
27 changed files
with
5,336 additions
and
810 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from .abstract import AbstractAbsorbanceReaderDriver | ||
from .driver import AbsorbanceReaderDriver | ||
from .simulator import SimulatingDriver | ||
from .hid_protocol import AbsorbanceHidInterface | ||
|
||
__all__ = [ | ||
"AbstractAbsorbanceReaderDriver", | ||
"AbsorbanceReaderDriver", | ||
"SimulatingDriver", | ||
"AbsorbanceHidInterface", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Dict, List | ||
from opentrons.drivers.types import AbsorbanceReaderLidStatus | ||
|
||
|
||
class AbstractAbsorbanceReaderDriver(ABC): | ||
@abstractmethod | ||
async def connect(self) -> None: | ||
"""Connect to absorbance reader""" | ||
... | ||
|
||
@abstractmethod | ||
async def disconnect(self) -> None: | ||
"""Disconnect from absorbance reader""" | ||
... | ||
|
||
@abstractmethod | ||
async def is_connected(self) -> bool: | ||
"""Check connection to absorbance reader""" | ||
... | ||
|
||
@abstractmethod | ||
async def get_lid_status(self) -> AbsorbanceReaderLidStatus: | ||
... | ||
|
||
@abstractmethod | ||
async def get_available_wavelengths(self) -> List[int]: | ||
... | ||
|
||
@abstractmethod | ||
async def get_single_measurement(self, wavelength: int) -> List[float]: | ||
... | ||
|
||
@abstractmethod | ||
async def initialize_measurement(self, wavelength: int) -> None: | ||
... | ||
|
||
@abstractmethod | ||
async def get_status(self) -> None: | ||
... | ||
|
||
@abstractmethod | ||
async def get_device_info(self) -> Dict[str, str]: | ||
"""Get device info""" | ||
... |
272 changes: 272 additions & 0 deletions
272
api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,272 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
import re | ||
from concurrent.futures.thread import ThreadPoolExecutor | ||
from functools import partial | ||
from typing import Optional, List, Dict | ||
import usb.core as usb_core # type: ignore[import-untyped] | ||
|
||
|
||
from .hid_protocol import AbsorbanceHidInterface as AbsProtocol, ErrorCodeNames | ||
from opentrons.drivers.types import ( | ||
AbsorbanceReaderLidStatus, | ||
AbsorbanceReaderPlatePresence, | ||
) | ||
from opentrons.drivers.rpi_drivers.types import USBPort | ||
|
||
|
||
SN_PARSER = re.compile(r'ATTRS{serial}=="(?P<serial>.+?)"') | ||
|
||
|
||
class AsyncByonoy: | ||
"""Async wrapper around Byonoy Device Library.""" | ||
|
||
@staticmethod | ||
def match_device_with_sn( | ||
sn: str, devices: List[AbsProtocol.Device] | ||
) -> AbsProtocol.Device: | ||
for device in devices: | ||
if device.sn == sn: | ||
return device | ||
raise RuntimeError(f"Unavailble module with serial number: {sn}") | ||
|
||
@staticmethod | ||
def serial_number_from_port(name: str) -> str: | ||
""" | ||
Get the serial number from a port using pyusb. | ||
""" | ||
port_numbers = tuple(int(s) for s in name.split("-")[1].split(".")) | ||
device = usb_core.find(port_numbers=port_numbers) | ||
if device: | ||
return str(device.serial_number) | ||
raise RuntimeError(f"Could not find serial number for port: {name}") | ||
|
||
@classmethod | ||
async def create( | ||
cls, | ||
port: str, | ||
usb_port: USBPort, | ||
loop: Optional[asyncio.AbstractEventLoop] = None, | ||
) -> AsyncByonoy: | ||
""" | ||
Create an AsyncByonoy instance. | ||
Args: | ||
port: url or port name | ||
baud_rate: the baud rate | ||
timeout: optional timeout in seconds | ||
write_timeout: optional write timeout in seconds | ||
loop: optional event loop. if None get_running_loop will be used | ||
reset_buffer_before_write: reset the serial input buffer before | ||
writing to it | ||
""" | ||
loop = loop or asyncio.get_running_loop() | ||
executor = ThreadPoolExecutor(max_workers=1) | ||
|
||
import pybyonoy_device_library as byonoy # type: ignore[import-not-found] | ||
|
||
interface: AbsProtocol = byonoy | ||
|
||
device_sn = cls.serial_number_from_port(usb_port.name) | ||
found: List[AbsProtocol.Device] = await loop.run_in_executor( | ||
executor=executor, func=byonoy.byonoy_available_devices | ||
) | ||
device = cls.match_device_with_sn(device_sn, found) | ||
|
||
return cls( | ||
interface=interface, | ||
device=device, | ||
executor=executor, | ||
loop=loop, | ||
) | ||
|
||
def __init__( | ||
self, | ||
interface: AbsProtocol, | ||
device: AbsProtocol.Device, | ||
executor: ThreadPoolExecutor, | ||
loop: asyncio.AbstractEventLoop, | ||
) -> None: | ||
""" | ||
Constructor | ||
Args: | ||
serial: connected Serial object | ||
executor: a thread pool executor | ||
loop: event loop | ||
""" | ||
self._interface = interface | ||
self._device = device | ||
self._executor = executor | ||
self._loop = loop | ||
self._supported_wavelengths: Optional[list[int]] = None | ||
self._device_handle: Optional[int] = None | ||
self._current_config: Optional[AbsProtocol.MeasurementConfig] = None | ||
|
||
def _cleanup(self) -> None: | ||
self._device_handle = None | ||
|
||
def _open(self) -> None: | ||
err, device_handle = self._interface.byonoy_open_device(self._device) | ||
if err.name != "BYONOY_ERROR_NO_ERROR": | ||
raise RuntimeError(f"Error opening device: {err}") | ||
self._device_handle = device_handle | ||
|
||
def _free(self) -> None: | ||
if self._device_handle: | ||
self._interface.byonoy_free_device(self._device_handle) | ||
self._cleanup() | ||
|
||
def verify_device_handle(self) -> int: | ||
assert self._device_handle is not None, RuntimeError( | ||
"Device handle not set up." | ||
) | ||
return self._device_handle | ||
|
||
def _raise_if_error( | ||
self, | ||
err_name: ErrorCodeNames, | ||
msg: str = "Error occurred: ", | ||
) -> None: | ||
if err_name != "BYONOY_ERROR_NO_ERROR": | ||
raise RuntimeError(msg, err_name) | ||
|
||
def _get_device_information(self) -> AbsProtocol.DeviceInfo: | ||
handle = self.verify_device_handle() | ||
err, device_info = self._interface.byonoy_get_device_information(handle) | ||
self._raise_if_error(err.name, "Error getting device information: ") | ||
return device_info | ||
|
||
def _get_device_status(self) -> AbsProtocol.DeviceState: | ||
handle = self.verify_device_handle() | ||
err, status = self._interface.byonoy_get_device_status(handle) | ||
self._raise_if_error(err.name, "Error getting device status: ") | ||
return status | ||
|
||
def _get_slot_status(self) -> AbsProtocol.SlotState: | ||
handle = self.verify_device_handle() | ||
err, slot_status = self._interface.byonoy_get_device_slot_status(handle) | ||
self._raise_if_error(err.name, "Error getting slot status: ") | ||
return slot_status | ||
|
||
def _get_lid_status(self) -> bool: | ||
handle = self.verify_device_handle() | ||
lid_on: bool | ||
err, lid_on = self._interface.byonoy_get_device_parts_aligned(handle) | ||
self._raise_if_error(err.name, "Error getting lid status: ") | ||
return lid_on | ||
|
||
def _get_supported_wavelengths(self) -> List[int]: | ||
handle = self.verify_device_handle() | ||
wavelengths: List[int] | ||
err, wavelengths = self._interface.byonoy_abs96_get_available_wavelengths( | ||
handle | ||
) | ||
self._raise_if_error(err.name, "Error getting available wavelengths: ") | ||
self._supported_wavelengths = wavelengths | ||
return wavelengths | ||
|
||
def _initialize_measurement(self, conf: AbsProtocol.MeasurementConfig) -> None: | ||
handle = self.verify_device_handle() | ||
err = self._interface.byonoy_abs96_initialize_single_measurement(handle, conf) | ||
self._raise_if_error(err.name, "Error initializing measurement: ") | ||
self._current_config = conf | ||
|
||
def _single_measurement(self, conf: AbsProtocol.MeasurementConfig) -> List[float]: | ||
handle = self.verify_device_handle() | ||
measurements: List[float] | ||
err, measurements = self._interface.byonoy_abs96_single_measure(handle, conf) | ||
self._raise_if_error(err.name, "Error getting single measurement: ") | ||
return measurements | ||
|
||
def _set_sample_wavelength(self, wavelength: int) -> AbsProtocol.MeasurementConfig: | ||
if not self._supported_wavelengths: | ||
self._get_supported_wavelengths() | ||
assert self._supported_wavelengths | ||
if wavelength in self._supported_wavelengths: | ||
conf = self._interface.ByonoyAbs96SingleMeasurementConfig() | ||
conf.sample_wavelength = wavelength | ||
return conf | ||
else: | ||
raise ValueError( | ||
f"Unsupported wavelength: {wavelength}, expected: {self._supported_wavelengths}" | ||
) | ||
|
||
def _initialize(self, wavelength: int) -> None: | ||
conf = self._set_sample_wavelength(wavelength) | ||
self._initialize_measurement(conf) | ||
|
||
def _get_single_measurement(self, wavelength: int) -> List[float]: | ||
initialized = self._current_config | ||
assert initialized and initialized.sample_wavelength == wavelength | ||
return self._single_measurement(initialized) | ||
|
||
async def open(self) -> None: | ||
""" | ||
Open the connection. | ||
Returns: None | ||
""" | ||
return await self._loop.run_in_executor( | ||
executor=self._executor, func=self._open | ||
) | ||
|
||
async def close(self) -> None: | ||
""" | ||
Close the connection | ||
Returns: None | ||
""" | ||
await self._loop.run_in_executor(executor=self._executor, func=self._free) | ||
|
||
async def is_open(self) -> bool: | ||
""" | ||
Check if connection is open. | ||
Returns: boolean | ||
""" | ||
return self._device_handle is not None | ||
|
||
async def get_device_static_info(self) -> Dict[str, str]: | ||
return { | ||
"serial": self._device.sn, | ||
"model": "ABS96", | ||
} | ||
|
||
async def get_device_information(self) -> Dict[str, str]: | ||
device_info = await self._loop.run_in_executor( | ||
executor=self._executor, func=self._get_device_information | ||
) | ||
return { | ||
"serial_number": device_info.sn, | ||
"reference_number": device_info.ref_no, | ||
"version": device_info.version, | ||
} | ||
|
||
async def get_lid_status(self) -> AbsorbanceReaderLidStatus: | ||
lid_info = await self._loop.run_in_executor( | ||
executor=self._executor, func=self._get_lid_status | ||
) | ||
return ( | ||
AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF | ||
) | ||
|
||
async def get_supported_wavelengths(self) -> list[int]: | ||
return await self._loop.run_in_executor( | ||
executor=self._executor, func=self._get_supported_wavelengths | ||
) | ||
|
||
async def initialize(self, wavelength: int) -> None: | ||
return await self._loop.run_in_executor( | ||
executor=self._executor, func=partial(self._initialize, wavelength) | ||
) | ||
|
||
async def get_single_measurement(self, wavelength: int) -> List[float]: | ||
return await self._loop.run_in_executor( | ||
executor=self._executor, | ||
func=partial(self._get_single_measurement, wavelength), | ||
) | ||
|
||
async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence: | ||
return AbsorbanceReaderPlatePresence.UNKNOWN |
Oops, something went wrong.