Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): driver and simulator for FLEX Stacker #17120

Open
wants to merge 4 commits into
base: edge
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ async def create(
alarm_keyword: Optional[str] = None,
reset_buffer_before_write: bool = False,
async_error_ack: Optional[str] = None,
number_of_retries: int = 0,
) -> AsyncResponseSerialConnection:
"""
Create a connection.
Expand Down Expand Up @@ -340,6 +341,7 @@ async def create(
error_keyword=error_keyword or "err",
alarm_keyword=alarm_keyword or "alarm",
async_error_ack=async_error_ack or "async",
number_of_retries=number_of_retries,
)

def __init__(
Expand All @@ -352,6 +354,7 @@ def __init__(
error_keyword: str,
alarm_keyword: str,
async_error_ack: str,
number_of_retries: int = 0,
) -> None:
"""
Constructor
Expand Down Expand Up @@ -383,6 +386,7 @@ def __init__(
self._name = name
self._ack = ack.encode()
self._retry_wait_time_seconds = retry_wait_time_seconds
self._number_of_retries = number_of_retries
self._error_keyword = error_keyword.lower()
self._alarm_keyword = alarm_keyword.lower()
self._async_error_ack = async_error_ack.lower()
Expand All @@ -403,7 +407,9 @@ async def send_command(
Raises: SerialException
"""
return await self.send_data(
data=command.build(), retries=retries, timeout=timeout
data=command.build(),
retries=retries or self._number_of_retries,
timeout=timeout,
)

async def send_data(
Expand All @@ -424,7 +430,9 @@ async def send_data(
async with super().send_data_lock, self._serial.timeout_override(
"timeout", timeout
):
return await self._send_data(data=data, retries=retries)
return await self._send_data(
data=data, retries=retries or self._number_of_retries
)

async def _send_data(self, data: str, retries: int = 0) -> str:
"""
Expand All @@ -439,6 +447,7 @@ async def _send_data(self, data: str, retries: int = 0) -> str:
Raises: SerialException
"""
data_encode = data.encode()
retries = retries or self._number_of_retries

for retry in range(retries + 1):
log.debug(f"{self._name}: Write -> {data_encode!r}")
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/drivers/command_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class CommandBuilder:
"""Class used to build GCODE commands."""

def __init__(self, terminator: str) -> None:
def __init__(self, terminator: str = "\n") -> None:
"""
Construct a command builder.

Expand Down
9 changes: 9 additions & 0 deletions api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .abstract import StackerDriver
from .driver import FlexStackerDriver
from .simulator import SimulatingDriver

__all__ = [
"StackerDriver",
"FlexStackerDriver",
"SimulatingDriver",
]
72 changes: 72 additions & 0 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import Protocol

from .types import StackerAxis, PlatformStatus, Direction, MoveParams, StackerInfo


class StackerDriver(Protocol):
"""Protocol for the Stacker driver."""

async def connect(self) -> None:
"""Connect to stacker."""
...

async def disconnect(self) -> None:
"""Disconnect from stacker."""
...

async def is_connected(self) -> bool:
"""Check connection to stacker."""
...

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
...

async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
...

async def set_serial_number(self, sn: str) -> None:
"""Set Serial Number."""
...

async def stop_motor(self) -> None:
"""Stop motor movement."""
...

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.

:return: True if limit switch is triggered, False otherwise
"""
...

async def get_platform_sensor(self, direction: Direction) -> bool:
"""Get platform sensor status.

:return: True if platform is present, False otherwise
"""
...

async def get_platform_status(self) -> PlatformStatus:
"""Get platform status."""
...

async def get_hopper_door_closed(self) -> bool:
"""Get whether or not door is closed.

:return: True if door is closed, False otherwise
"""
...

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> None:
"""Move axis."""
...

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> None:
"""Move until limit switch is triggered."""
...
203 changes: 203 additions & 0 deletions api/src/opentrons/drivers/flex_stacker/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import asyncio
import re
from typing import Optional

from opentrons.drivers.command_builder import CommandBuilder
from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection

from .abstract import StackerDriver
from .types import (
GCODE,
StackerAxis,
PlatformStatus,
Direction,
StackerInfo,
HardwareRevision,
MoveParams,
LimitSwitchStatus,
)


FS_BAUDRATE = 115200
DEFAULT_FS_TIMEOUT = 40
FS_ACK = "OK\n"
FS_ERROR_KEYWORD = "err"
FS_ASYNC_ERROR_ACK = "async"
DEFAULT_COMMAND_RETRIES = 0
GCODE_ROUNDING_PRECISION = 2


class FlexStackerDriver(StackerDriver):
"""FLEX Stacker driver."""

@classmethod
def parse_device_info(cls, response: str) -> StackerInfo:
"""Parse stacker info."""
_RE = re.compile(
f"^{GCODE.DEVICE_INFO} FW:(?P<fw>.+) HW:Opentrons-flex-stacker-(?P<hw>.+) SerialNo:(?P<sn>.+)"
)
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for device info: {response}")
return StackerInfo(
m.group("fw"), HardwareRevision(m.group("hw")), m.group("sn")
)

@classmethod
def parse_limit_switch_status(cls, response: str) -> LimitSwitchStatus:
"""Parse limit switch statuses."""
field_names = LimitSwitchStatus.get_fields()
pattern = r"/s".join([rf"{name}:(?P<{name}>\d)" for name in field_names])
_RE = re.compile(f"^{GCODE.GET_LIMIT_SWITCH} {pattern}")
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for limit siwtch status: {response}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError(f"Incorrect Response for limit siwtch status: {response}")
raise ValueError(f"Incorrect Response for limit switch status: {response}")

return LimitSwitchStatus(*(bool(int(m.group(name))) for name in field_names))

@classmethod
def parse_platform_sensor_status(cls, response: str) -> PlatformStatus:
"""Parse platform statuses."""
field_names = PlatformStatus.get_fields()
pattern = r"/s".join([rf"{name}:(?P<{name}>\d)" for name in field_names])
_RE = re.compile(f"^{GCODE.GET_PLATFORM_SENSOR} {pattern}")
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for platform status: {response}")
return PlatformStatus(*(bool(int(m.group(name))) for name in field_names))

@classmethod
def parse_door_closed(cls, response: str) -> bool:
"""Parse door closed."""
_RE = re.compile(r"^M122 (\d)")
match = _RE.match(response)
if not match:
raise ValueError(f"Incorrect Response for door closed: {response}")
return bool(int(match.group(1)))

@classmethod
def append_move_params(
cls, command: CommandBuilder, params: MoveParams | None
) -> CommandBuilder:
"""Append move params."""
if params is not None:
if params.max_speed is not None:
command.add_float("V", params.max_speed, GCODE_ROUNDING_PRECISION)
if params.acceleration is not None:
command.add_float("A", params.acceleration, GCODE_ROUNDING_PRECISION)
if params.max_speed_discont is not None:
command.add_float(
"D", params.max_speed_discont, GCODE_ROUNDING_PRECISION
)
return command

@classmethod
async def create(
cls, port: str, loop: Optional[asyncio.AbstractEventLoop]
) -> "FlexStackerDriver":
"""Create a FLEX Stacker driver."""
connection = await AsyncResponseSerialConnection.create(
port=port,
baud_rate=FS_BAUDRATE,
timeout=DEFAULT_FS_TIMEOUT,
number_of_retries=DEFAULT_COMMAND_RETRIES,
ack=FS_ACK,
loop=loop,
error_keyword=FS_ERROR_KEYWORD,
async_error_ack=FS_ASYNC_ERROR_ACK,
)
return cls(connection)

def __init__(self, connection: AsyncResponseSerialConnection) -> None:
"""
Constructor

Args:
connection: Connection to the FLEX Stacker
"""
self._connection = connection

async def connect(self) -> None:
"""Connect to stacker."""
await self._connection.open()

async def disconnect(self) -> None:
"""Disconnect from stacker."""
await self._connection.close()

async def is_connected(self) -> bool:
"""Check connection to stacker."""
return await self._connection.is_open()

async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
response = await self._connection.send_command(
GCODE.DEVICE_INFO.build_command()
)
return self.parse_device_info(response)

async def set_serial_number(self, sn: str) -> None:
"""Set Serial Number."""
await self._connection.send_command(
GCODE.SET_SERIAL_NUMBER.build_command().add_element(sn)
)

async def stop_motor(self) -> None:
"""Stop motor movement."""
await self._connection.send_command(GCODE.STOP_MOTOR.build_command())

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.

:return: True if limit switch is triggered, False otherwise
"""
response = await self.get_limit_switches_status()
return response.get(axis, direction)

async def get_limit_switches_status(self) -> LimitSwitchStatus:
"""Get limit switch statuses for all axes."""
response = await self._connection.send_command(
GCODE.GET_LIMIT_SWITCH.build_command()
)
return self.parse_limit_switch_status(response)

async def get_platform_sensor_status(self) -> PlatformStatus:
"""Get platform sensor status.

:return: True if platform is detected, False otherwise
"""
response = await self._connection.send_command(
GCODE.GET_PLATFORM_SENSOR.build_command()
)
return self.parse_platform_sensor_status(response)

async def get_hopper_door_closed(self) -> bool:
"""Get whether or not door is closed.

:return: True if door is closed, False otherwise
"""
response = await self._connection.send_command(
GCODE.GET_DOOR_SWITCH.build_command()
)
return self.parse_door_closed(response)

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> None:
"""Move axis."""
command = self.append_move_params(
GCODE.MOVE_TO.build_command().add_float(
axis.name, distance, GCODE_ROUNDING_PRECISION
),
params,
)
await self._connection.send_command(command)

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> None:
"""Move until limit switch is triggered."""
command = self.append_move_params(
GCODE.MOVE_TO_SWITCH.build_command().add_int(axis.name, direction.value),
params,
)
await self._connection.send_command(command)
Loading
Loading