Skip to content

Commit

Permalink
feat(api): Update absorbance reader driver namespace for byonoy_devic…
Browse files Browse the repository at this point in the history
…es library version 2024.9.0 (#16289)
  • Loading branch information
vegano1 committed Sep 23, 2024
1 parent cd0447e commit f55ec63
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 112 deletions.
58 changes: 24 additions & 34 deletions api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ async def create(
loop = loop or asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=1)

import pybyonoy_device_library as byonoy # type: ignore[import-not-found]
import byonoy_devices as byonoy # type: ignore[import-not-found]

interface: AbsProtocol = byonoy

device_sn = cls.serial_number_from_port(usb_port.name)
found: List[AbsProtocol.Device] = await loop.run_in_executor(
executor=executor, func=byonoy.byonoy_available_devices
executor=executor, func=byonoy.available_devices
)
device = cls.match_device_with_sn(device_sn, found)

Expand Down Expand Up @@ -123,7 +123,7 @@ async def open(self) -> bool:

err, device_handle = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_open_device, self._device),
func=partial(self._interface.open_device, self._device),
)
self._raise_if_error(err.name, f"Error opening device: {err}")
self._device_handle = device_handle
Expand All @@ -134,7 +134,7 @@ async def close(self) -> None:
handle = self._verify_device_handle()
await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_free_device, handle),
func=partial(self._interface.free_device, handle),
)
self._device_handle = None

Expand All @@ -145,15 +145,15 @@ async def is_open(self) -> bool:
handle = self._verify_device_handle()
return await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_device_open, handle),
func=partial(self._interface.device_open, handle),
)

async def get_device_information(self) -> Dict[str, str]:
"""Get serial number and version info."""
handle = self._verify_device_handle()
err, device_info = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_information, handle),
func=partial(self._interface.get_device_information, handle),
)
self._raise_if_error(err.name, f"Error getting device information: {err}")
serial_match = SERIAL_PARSER.match(device_info.sn)
Expand All @@ -172,7 +172,7 @@ async def get_device_status(self) -> AbsorbanceReaderDeviceState:
handle = self._verify_device_handle()
err, status = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_status, handle),
func=partial(self._interface.get_device_status, handle),
)
self._raise_if_error(err.name, f"Error getting device status: {err}")
return self.convert_device_state(status.name)
Expand All @@ -184,11 +184,9 @@ async def update_firmware(self, firmware_file_path: str) -> Tuple[bool, str]:
return False, f"Firmware file not found: {firmware_file_path}"
err = await self._loop.run_in_executor(
executor=self._executor,
func=partial(
self._interface.byonoy_update_device, handle, firmware_file_path
),
func=partial(self._interface.update_device, handle, firmware_file_path),
)
if err.name != "BYONOY_ERROR_NO_ERROR":
if err.name != "NO_ERROR":
return False, f"Byonoy update failed with error: {err}"
return True, ""

Expand All @@ -197,7 +195,7 @@ async def get_device_uptime(self) -> int:
handle = self._verify_device_handle()
err, uptime = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_uptime, handle),
func=partial(self._interface.get_device_uptime, handle),
)
self._raise_if_error(err.name, "Error getting device uptime: ")
return uptime
Expand All @@ -207,7 +205,7 @@ async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
handle = self._verify_device_handle()
err, lid_info = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_parts_aligned, handle),
func=partial(self._interface.get_device_parts_aligned, handle),
)
self._raise_if_error(err.name, f"Error getting lid status: {err}")
return (
Expand All @@ -219,9 +217,7 @@ async def get_supported_wavelengths(self) -> list[int]:
handle = self._verify_device_handle()
err, wavelengths = await self._loop.run_in_executor(
executor=self._executor,
func=partial(
self._interface.byonoy_abs96_get_available_wavelengths, handle
),
func=partial(self._interface.abs96_get_available_wavelengths, handle),
)
self._raise_if_error(err.name, "Error getting available wavelengths: ")
self._supported_wavelengths = wavelengths
Expand All @@ -233,9 +229,9 @@ async def get_measurement(self) -> List[List[float]]:
assert (
self._current_config is not None
), "Cannot get measurement without initializing."
measure_func: Any = self._interface.byonoy_abs96_single_measure
measure_func: Any = self._interface.abs96_single_measure
if isinstance(self._current_config, AbsProtocol.MultiMeasurementConfig):
measure_func = self._interface.byonoy_abs96_multiple_measure
measure_func = self._interface.abs96_multiple_measure
err, measurements = await self._loop.run_in_executor(
executor=self._executor,
func=partial(
Expand All @@ -252,31 +248,25 @@ async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence:
handle = self._verify_device_handle()
err, presence = await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._interface.byonoy_get_device_slot_status, handle),
func=partial(self._interface.get_device_slot_status, handle),
)
self._raise_if_error(err.name, f"Error getting slot status: {err}")
return self.convert_plate_presence(presence.name)

def _get_supported_wavelengths(self) -> List[int]:
handle = self._verify_device_handle()
wavelengths: List[int]
err, wavelengths = self._interface.byonoy_abs96_get_available_wavelengths(
handle
)
err, wavelengths = self._interface.abs96_get_available_wavelengths(handle)
self._raise_if_error(err.name, f"Error getting available wavelengths: {err}")
self._supported_wavelengths = wavelengths
return wavelengths

def _initialize_measurement(self, conf: MeasurementConfig) -> None:
handle = self._verify_device_handle()
if isinstance(conf, AbsProtocol.SingleMeasurementConfig):
err = self._interface.byonoy_abs96_initialize_single_measurement(
handle, conf
)
err = self._interface.abs96_initialize_single_measurement(handle, conf)
else:
err = self._interface.byonoy_abs96_initialize_multiple_measurement(
handle, conf
)
err = self._interface.abs96_initialize_multiple_measurement(handle, conf)
self._raise_if_error(err.name, f"Error initializing measurement: {err}")
self._current_config = conf

Expand All @@ -292,11 +282,11 @@ def _initialize(
conf: MeasurementConfig
if set(wavelengths).issubset(self._supported_wavelengths):
if mode == ABSMeasurementMode.SINGLE:
conf = self._interface.ByonoyAbs96SingleMeasurementConfig()
conf = self._interface.Abs96SingleMeasurementConfig()
conf.sample_wavelength = wavelengths[0] or 0
conf.reference_wavelength = reference_wavelength or 0
else:
conf = self._interface.ByonoyAbs96MultipleMeasurementConfig()
conf = self._interface.Abs96MultipleMeasurementConfig()
conf.sample_wavelengths = wavelengths
else:
raise ValueError(
Expand Down Expand Up @@ -328,12 +318,12 @@ def _raise_if_error(
msg: str = "Error occurred: ",
) -> None:
if err_name in [
"BYONOY_ERROR_DEVICE_CLOSED",
"BYONOY_ERROR_DEVICE_COMMUNICATION_FAILURE",
"BYONOY_ERROR_UNSUPPORTED_OPERATION",
"DEVICE_CLOSED",
"DEVICE_COMMUNICATION_FAILURE",
"UNSUPPORTED_OPERATION",
]:
raise AbsorbanceReaderDisconnectedError(self._device.sn)
if err_name != "BYONOY_ERROR_NO_ERROR":
if err_name != "NO_ERROR":
raise RuntimeError(msg, err_name)

@staticmethod
Expand Down
86 changes: 39 additions & 47 deletions api/src/opentrons/drivers/absorbance_reader/hid_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
Response = TypeVar("Response")

ErrorCodeNames = Literal[
"BYONOY_ERROR_NO_ERROR",
"BYONOY_ERROR_UNKNOWN_ERROR",
"BYONOY_ERROR_DEVICE_CLOSED",
"BYONOY_ERROR_INVALID_ARGUMENT",
"BYONOY_ERROR_NO_MEMORY",
"BYONOY_ERROR_UNSUPPORTED_OPERATION",
"BYONOY_ERROR_DEVICE_COMMUNICATION_FAILURE",
"BYONOY_ERROR_DEVICE_OPERATION_FAILED",
"BYONOY_ERROR_DEVICE_OPEN_PREFIX",
"BYONOY_ERROR_DEVICE_NOT_FOUND",
"BYONOY_ERROR_DEVICE_TOO_NEW",
"BYONOY_ERROR_DEVICE_ALREADY_OPEN",
"BYONOY_ERROR_FIRMWARE_UPDATE_ERROR_PREFIX",
"BYONOY_ERROR_FIRMWARE_UPDATE_FILE_NOT_FOUND",
"BYONOY_ERROR_FIRMWARE_UPDATE_FILE_NOT_VALID",
"BYONOY_ERROR_FIRMWARE_UPDATE_FAILED",
"BYONOY_ERROR_FILE_ERROR_PREFIX",
"BYONOY_ERROR_FILE_WRITE_ERROR",
"BYONOY_ERROR_MEASUTEMNT_ERROR_PREFIX",
"BYONOY_ERROR_MEASUTEMNT_SLOT_NOT_EMPTY",
"BYONOY_ERROR_NOT_INITIALIZED",
"BYONOY_ERROR_INTERNAL",
"NO_ERROR",
"UNKNOWN_ERROR",
"DEVICE_CLOSED",
"INVALID_ARGUMENT",
"NO_MEMORY",
"UNSUPPORTED_OPERATION",
"DEVICE_COMMUNICATION_FAILURE",
"DEVICE_OPERATION_FAILED",
"DEVICE_OPEN_PREFIX",
"DEVICE_NOT_FOUND",
"DEVICE_TOO_NEW",
"DEVICE_ALREADY_OPEN",
"FIRMWARE_UPDATE_ERROR_PREFIX",
"FIRMWARE_UPDATE_FILE_NOT_FOUND",
"FIRMWARE_UPDATE_FILE_NOT_VALID",
"FIRMWARE_UPDATE_FAILED",
"FILE_ERROR_PREFIX",
"FILE_WRITE_ERROR",
"MEASUTEMNT_ERROR_PREFIX",
"MEASUTEMNT_SLOT_NOT_EMPTY",
"NOT_INITIALIZED",
"INTERNAL",
]

SlotStateNames = Literal[
Expand Down Expand Up @@ -91,75 +91,67 @@ class DeviceState(Protocol):
name: DeviceStateNames
value: int

def ByonoyAbs96SingleMeasurementConfig(self) -> SingleMeasurementConfig:
def Abs96SingleMeasurementConfig(self) -> SingleMeasurementConfig:
...

def ByonoyAbs96MultipleMeasurementConfig(self) -> MultiMeasurementConfig:
def Abs96MultipleMeasurementConfig(self) -> MultiMeasurementConfig:
...

def byonoy_open_device(self, device: Device) -> Tuple[ErrorCode, int]:
def open_device(self, device: Device) -> Tuple[ErrorCode, int]:
...

def byonoy_free_device(self, device_handle: int) -> Tuple[ErrorCode, bool]:
def free_device(self, device_handle: int) -> Tuple[ErrorCode, bool]:
...

def byonoy_device_open(self, device_handle: int) -> bool:
def device_open(self, device_handle: int) -> bool:
...

def byonoy_get_device_information(
def get_device_information(
self, device_handle: int
) -> Tuple[ErrorCode, DeviceInfo]:
...

def byonoy_update_device(
self, device_handle: int, firmware_file_path: str
) -> ErrorCode:
def update_device(self, device_handle: int, firmware_file_path: str) -> ErrorCode:
...

def byonoy_get_device_status(
self, device_handle: int
) -> Tuple[ErrorCode, DeviceState]:
def get_device_status(self, device_handle: int) -> Tuple[ErrorCode, DeviceState]:
...

def byonoy_get_device_uptime(self, device_handle: int) -> Tuple[ErrorCode, int]:
def get_device_uptime(self, device_handle: int) -> Tuple[ErrorCode, int]:
...

def byonoy_get_device_slot_status(
self, device_handle: int
) -> Tuple[ErrorCode, SlotState]:
def get_device_slot_status(self, device_handle: int) -> Tuple[ErrorCode, SlotState]:
...

def byonoy_get_device_parts_aligned(
self, device_handle: int
) -> Tuple[ErrorCode, bool]:
def get_device_parts_aligned(self, device_handle: int) -> Tuple[ErrorCode, bool]:
...

def byonoy_abs96_get_available_wavelengths(
def abs96_get_available_wavelengths(
self, device_handle: int
) -> Tuple[ErrorCode, List[int]]:
...

def byonoy_abs96_initialize_single_measurement(
def abs96_initialize_single_measurement(
self, device_handle: int, conf: SingleMeasurementConfig
) -> ErrorCode:
...

def byonoy_abs96_initialize_multiple_measurement(
def abs96_initialize_multiple_measurement(
self, device_handle: int, conf: MultiMeasurementConfig
) -> ErrorCode:
...

def byonoy_abs96_single_measure(
def abs96_single_measure(
self, device_handle: int, conf: SingleMeasurementConfig
) -> Tuple[ErrorCode, List[float]]:
...

def byonoy_abs96_multiple_measure(
def abs96_multiple_measure(
self, device_handle: int, conf: MultiMeasurementConfig
) -> Tuple[ErrorCode, List[List[float]]]:
...

def byonoy_available_devices(self) -> List[Device]:
def available_devices(self) -> List[Device]:
...


Expand Down
Loading

0 comments on commit f55ec63

Please sign in to comment.