diff --git a/api/src/opentrons/drivers/asyncio/communication/serial_connection.py b/api/src/opentrons/drivers/asyncio/communication/serial_connection.py index 294e5779a7b..f925cfe8680 100644 --- a/api/src/opentrons/drivers/asyncio/communication/serial_connection.py +++ b/api/src/opentrons/drivers/asyncio/communication/serial_connection.py @@ -298,6 +298,7 @@ async def create( alarm_keyword: Optional[str] = None, reset_buffer_before_write: bool = False, async_error_ack: Optional[str] = None, + number_of_retries: int = 0, ) -> AsyncResponseSerialConnection: """ Create a connection. @@ -340,6 +341,7 @@ async def create( error_keyword=error_keyword or "err", alarm_keyword=alarm_keyword or "alarm", async_error_ack=async_error_ack or "async", + number_of_retries=number_of_retries, ) def __init__( @@ -352,6 +354,7 @@ def __init__( error_keyword: str, alarm_keyword: str, async_error_ack: str, + number_of_retries: int = 0, ) -> None: """ Constructor @@ -383,6 +386,7 @@ def __init__( self._name = name self._ack = ack.encode() self._retry_wait_time_seconds = retry_wait_time_seconds + self._number_of_retries = number_of_retries self._error_keyword = error_keyword.lower() self._alarm_keyword = alarm_keyword.lower() self._async_error_ack = async_error_ack.lower() @@ -403,7 +407,9 @@ async def send_command( Raises: SerialException """ return await self.send_data( - data=command.build(), retries=retries, timeout=timeout + data=command.build(), + retries=retries or self._number_of_retries, + timeout=timeout, ) async def send_data( @@ -424,7 +430,9 @@ async def send_data( async with super().send_data_lock, self._serial.timeout_override( "timeout", timeout ): - return await self._send_data(data=data, retries=retries) + return await self._send_data( + data=data, retries=retries or self._number_of_retries + ) async def _send_data(self, data: str, retries: int = 0) -> str: """ @@ -439,6 +447,7 @@ async def _send_data(self, data: str, retries: int = 0) -> str: Raises: SerialException """ data_encode = data.encode() + retries = retries or self._number_of_retries for retry in range(retries + 1): log.debug(f"{self._name}: Write -> {data_encode!r}") diff --git a/api/src/opentrons/drivers/command_builder.py b/api/src/opentrons/drivers/command_builder.py index 99ac5c7890c..ea90a12b946 100644 --- a/api/src/opentrons/drivers/command_builder.py +++ b/api/src/opentrons/drivers/command_builder.py @@ -6,7 +6,7 @@ class CommandBuilder: """Class used to build GCODE commands.""" - def __init__(self, terminator: str) -> None: + def __init__(self, terminator: str = "\n") -> None: """ Construct a command builder. @@ -17,7 +17,7 @@ def __init__(self, terminator: str) -> None: self._elements: List[str] = [] def add_float( - self, prefix: str, value: float, precision: Optional[int] + self, prefix: str, value: float, precision: Optional[int] = None ) -> CommandBuilder: """ Add a float value. diff --git a/api/src/opentrons/drivers/flex_stacker/__init__.py b/api/src/opentrons/drivers/flex_stacker/__init__.py new file mode 100644 index 00000000000..cd4866c179a --- /dev/null +++ b/api/src/opentrons/drivers/flex_stacker/__init__.py @@ -0,0 +1,9 @@ +from .abstract import AbstractStackerDriver +from .driver import FlexStackerDriver +from .simulator import SimulatingDriver + +__all__ = [ + "AbstractStackerDriver", + "FlexStackerDriver", + "SimulatingDriver", +] diff --git a/api/src/opentrons/drivers/flex_stacker/abstract.py b/api/src/opentrons/drivers/flex_stacker/abstract.py new file mode 100644 index 00000000000..5ba3cdcb026 --- /dev/null +++ b/api/src/opentrons/drivers/flex_stacker/abstract.py @@ -0,0 +1,89 @@ +from typing import Protocol + +from .types import ( + StackerAxis, + PlatformStatus, + Direction, + MoveParams, + StackerInfo, + LEDColor, +) + + +class AbstractStackerDriver(Protocol): + """Protocol for the Stacker driver.""" + + async def connect(self) -> None: + """Connect to stacker.""" + ... + + async def disconnect(self) -> None: + """Disconnect from stacker.""" + ... + + async def is_connected(self) -> bool: + """Check connection to stacker.""" + ... + + async def update_firmware(self, firmware_file_path: str) -> None: + """Updates the firmware on the device.""" + ... + + async def get_device_info(self) -> StackerInfo: + """Get Device Info.""" + ... + + async def set_serial_number(self, sn: str) -> bool: + """Set Serial Number.""" + ... + + async def stop_motors(self) -> bool: + """Stop all motor movement.""" + ... + + async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool: + """Get limit switch status. + + :return: True if limit switch is triggered, False otherwise + """ + ... + + async def get_platform_sensor(self, direction: Direction) -> bool: + """Get platform sensor status. + + :return: True if platform is present, False otherwise + """ + ... + + async def get_platform_status(self) -> PlatformStatus: + """Get platform status.""" + ... + + async def get_hopper_door_closed(self) -> bool: + """Get whether or not door is closed. + + :return: True if door is closed, False otherwise + """ + ... + + async def move_in_mm( + self, axis: StackerAxis, distance: float, params: MoveParams | None = None + ) -> bool: + """Move axis.""" + ... + + async def move_to_limit_switch( + self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None + ) -> bool: + """Move until limit switch is triggered.""" + ... + + async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool: + """Home axis.""" + ... + + async def set_led( + self, power: float, color: LEDColor | None = None, external: bool | None = None + ) -> bool: + """Set LED color of status bar.""" + ... diff --git a/api/src/opentrons/drivers/flex_stacker/driver.py b/api/src/opentrons/drivers/flex_stacker/driver.py new file mode 100644 index 00000000000..83671023772 --- /dev/null +++ b/api/src/opentrons/drivers/flex_stacker/driver.py @@ -0,0 +1,260 @@ +import asyncio +import re +from typing import Optional + +from opentrons.drivers.command_builder import CommandBuilder +from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection + +from .abstract import AbstractStackerDriver +from .types import ( + GCODE, + StackerAxis, + PlatformStatus, + Direction, + StackerInfo, + HardwareRevision, + MoveParams, + LimitSwitchStatus, + LEDColor, +) + + +FS_BAUDRATE = 115200 +DEFAULT_FS_TIMEOUT = 40 +FS_ACK = "OK\n" +FS_ERROR_KEYWORD = "err" +FS_ASYNC_ERROR_ACK = "async" +DEFAULT_COMMAND_RETRIES = 0 +GCODE_ROUNDING_PRECISION = 2 + + +class FlexStackerDriver(AbstractStackerDriver): + """FLEX Stacker driver.""" + + @classmethod + def parse_device_info(cls, response: str) -> StackerInfo: + """Parse stacker info.""" + # TODO: Validate serial number format once established + _RE = re.compile( + f"^{GCODE.DEVICE_INFO} FW:(?P\\S+) HW:Opentrons-flex-stacker-(?P\\S+) SerialNo:(?P\\S+)$" + ) + m = _RE.match(response) + if not m: + raise ValueError(f"Incorrect Response for device info: {response}") + return StackerInfo( + m.group("fw"), HardwareRevision(m.group("hw")), m.group("sn") + ) + + @classmethod + def parse_limit_switch_status(cls, response: str) -> LimitSwitchStatus: + """Parse limit switch statuses.""" + field_names = LimitSwitchStatus.get_fields() + pattern = r"\s".join([rf"{name}:(?P<{name}>\d)" for name in field_names]) + _RE = re.compile(f"^{GCODE.GET_LIMIT_SWITCH} {pattern}$") + m = _RE.match(response) + if not m: + raise ValueError(f"Incorrect Response for limit switch status: {response}") + return LimitSwitchStatus(*(bool(int(m.group(name))) for name in field_names)) + + @classmethod + def parse_platform_sensor_status(cls, response: str) -> PlatformStatus: + """Parse platform statuses.""" + field_names = PlatformStatus.get_fields() + pattern = r"\s".join([rf"{name}:(?P<{name}>\d)" for name in field_names]) + _RE = re.compile(f"^{GCODE.GET_PLATFORM_SENSOR} {pattern}$") + m = _RE.match(response) + if not m: + raise ValueError(f"Incorrect Response for platform status: {response}") + return PlatformStatus(*(bool(int(m.group(name))) for name in field_names)) + + @classmethod + def parse_door_closed(cls, response: str) -> bool: + """Parse door closed.""" + _RE = re.compile(r"^M122 D:(\d)$") + match = _RE.match(response) + if not match: + raise ValueError(f"Incorrect Response for door closed: {response}") + return bool(int(match.group(1))) + + @classmethod + def append_move_params( + cls, command: CommandBuilder, params: MoveParams | None + ) -> CommandBuilder: + """Append move params.""" + if params is not None: + if params.max_speed is not None: + command.add_float("V", params.max_speed, GCODE_ROUNDING_PRECISION) + if params.acceleration is not None: + command.add_float("A", params.acceleration, GCODE_ROUNDING_PRECISION) + if params.max_speed_discont is not None: + command.add_float( + "D", params.max_speed_discont, GCODE_ROUNDING_PRECISION + ) + return command + + @classmethod + async def create( + cls, port: str, loop: Optional[asyncio.AbstractEventLoop] + ) -> "FlexStackerDriver": + """Create a FLEX Stacker driver.""" + connection = await AsyncResponseSerialConnection.create( + port=port, + baud_rate=FS_BAUDRATE, + timeout=DEFAULT_FS_TIMEOUT, + number_of_retries=DEFAULT_COMMAND_RETRIES, + ack=FS_ACK, + loop=loop, + error_keyword=FS_ERROR_KEYWORD, + async_error_ack=FS_ASYNC_ERROR_ACK, + ) + return cls(connection) + + def __init__(self, connection: AsyncResponseSerialConnection) -> None: + """ + Constructor + + Args: + connection: Connection to the FLEX Stacker + """ + self._connection = connection + + async def connect(self) -> None: + """Connect to stacker.""" + await self._connection.open() + + async def disconnect(self) -> None: + """Disconnect from stacker.""" + await self._connection.close() + + async def is_connected(self) -> bool: + """Check connection to stacker.""" + return await self._connection.is_open() + + async def get_device_info(self) -> StackerInfo: + """Get Device Info.""" + response = await self._connection.send_command( + GCODE.DEVICE_INFO.build_command() + ) + await self._connection.send_command(GCODE.GET_RESET_REASON.build_command()) + return self.parse_device_info(response) + + async def set_serial_number(self, sn: str) -> bool: + """Set Serial Number.""" + # TODO: validate the serial number format + resp = await self._connection.send_command( + GCODE.SET_SERIAL_NUMBER.build_command().add_element(sn) + ) + if not re.match(rf"^{GCODE.SET_SERIAL_NUMBER}$", resp): + raise ValueError(f"Incorrect Response for set serial number: {resp}") + return True + + async def stop_motors(self) -> bool: + """Stop all motor movement.""" + resp = await self._connection.send_command(GCODE.STOP_MOTORS.build_command()) + if not re.match(rf"^{GCODE.STOP_MOTORS}$", resp): + raise ValueError(f"Incorrect Response for stop motors: {resp}") + return True + + async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool: + """Get limit switch status. + + :return: True if limit switch is triggered, False otherwise + """ + response = await self.get_limit_switches_status() + return response.get(axis, direction) + + async def get_limit_switches_status(self) -> LimitSwitchStatus: + """Get limit switch statuses for all axes.""" + response = await self._connection.send_command( + GCODE.GET_LIMIT_SWITCH.build_command() + ) + return self.parse_limit_switch_status(response) + + async def get_platform_sensor(self, direction: Direction) -> bool: + """Get platform sensor at one direction.""" + response = await self.get_platform_status() + return response.get(direction) + + async def get_platform_status(self) -> PlatformStatus: + """Get platform sensor status. + + :return: True if platform is detected, False otherwise + """ + response = await self._connection.send_command( + GCODE.GET_PLATFORM_SENSOR.build_command() + ) + return self.parse_platform_sensor_status(response) + + async def get_hopper_door_closed(self) -> bool: + """Get whether or not door is closed. + + :return: True if door is closed, False otherwise + """ + response = await self._connection.send_command( + GCODE.GET_DOOR_SWITCH.build_command() + ) + return self.parse_door_closed(response) + + async def move_in_mm( + self, axis: StackerAxis, distance: float, params: MoveParams | None = None + ) -> bool: + """Move axis.""" + command = self.append_move_params( + GCODE.MOVE_TO.build_command().add_float( + axis.name, distance, GCODE_ROUNDING_PRECISION + ), + params, + ) + resp = await self._connection.send_command(command) + if not re.match(rf"^{GCODE.MOVE_TO}$", resp): + raise ValueError(f"Incorrect Response for move to: {resp}") + return True + + async def move_to_limit_switch( + self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None + ) -> bool: + """Move until limit switch is triggered.""" + command = self.append_move_params( + GCODE.MOVE_TO_SWITCH.build_command().add_int(axis.name, direction.value), + params, + ) + resp = await self._connection.send_command(command) + if not re.match(rf"^{GCODE.MOVE_TO_SWITCH}$", resp): + raise ValueError(f"Incorrect Response for move to switch: {resp}") + return True + + async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool: + """Home axis.""" + resp = await self._connection.send_command( + GCODE.HOME_AXIS.build_command().add_int(axis.name, direction.value) + ) + if not re.match(rf"^{GCODE.HOME_AXIS}$", resp): + raise ValueError(f"Incorrect Response for home axis: {resp}") + return True + + async def set_led( + self, power: float, color: LEDColor | None = None, external: bool | None = None + ) -> bool: + """Set LED color. + + :param power: Power of the LED (0-1.0), 0 is off, 1 is full power + :param color: Color of the LED + :param external: True if external LED, False if internal LED + """ + power = max(0, min(power, 1.0)) + command = GCODE.SET_LED.build_command().add_float( + "P", power, GCODE_ROUNDING_PRECISION + ) + if color is not None: + command.add_int("C", color.value) + if external is not None: + command.add_int("E", external) + resp = await self._connection.send_command(command) + if not re.match(rf"^{GCODE.SET_LED}$", resp): + raise ValueError(f"Incorrect Response for set led: {resp}") + return True + + async def update_firmware(self, firmware_file_path: str) -> None: + """Updates the firmware on the device.""" + # TODO: Implement firmware update + pass diff --git a/api/src/opentrons/drivers/flex_stacker/simulator.py b/api/src/opentrons/drivers/flex_stacker/simulator.py new file mode 100644 index 00000000000..1e0b59b19de --- /dev/null +++ b/api/src/opentrons/drivers/flex_stacker/simulator.py @@ -0,0 +1,109 @@ +from typing import Optional + +from opentrons.util.async_helpers import ensure_yield + +from .abstract import AbstractStackerDriver +from .types import ( + StackerAxis, + PlatformStatus, + Direction, + StackerInfo, + HardwareRevision, + MoveParams, + LimitSwitchStatus, +) + + +class SimulatingDriver(AbstractStackerDriver): + """FLEX Stacker driver simulator.""" + + def __init__(self, serial_number: Optional[str] = None) -> None: + self._sn = serial_number or "dummySerialFS" + self._limit_switch_status = LimitSwitchStatus(False, False, False, False, False) + self._platform_sensor_status = PlatformStatus(False, False) + self._door_closed = True + + def set_limit_switch(self, status: LimitSwitchStatus) -> bool: + self._limit_switch_status = status + return True + + def set_platform_sensor(self, status: PlatformStatus) -> bool: + self._platform_sensor_status = status + return True + + def set_door_closed(self, door_closed: bool) -> bool: + self._door_closed = door_closed + return True + + @ensure_yield + async def connect(self) -> None: + """Connect to stacker.""" + pass + + @ensure_yield + async def disconnect(self) -> None: + """Disconnect from stacker.""" + pass + + @ensure_yield + async def is_connected(self) -> bool: + """Check connection to stacker.""" + return True + + @ensure_yield + async def get_device_info(self) -> StackerInfo: + """Get Device Info.""" + return StackerInfo(fw="stacker-fw", hw=HardwareRevision.EVT, sn=self._sn) + + @ensure_yield + async def set_serial_number(self, sn: str) -> bool: + """Set Serial Number.""" + return True + + @ensure_yield + async def stop_motor(self) -> bool: + """Stop motor movement.""" + return True + + @ensure_yield + async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool: + """Get limit switch status. + + :return: True if limit switch is triggered, False otherwise + """ + return self._limit_switch_status.get(axis, direction) + + @ensure_yield + async def get_limit_switches_status(self) -> LimitSwitchStatus: + """Get limit switch statuses for all axes.""" + return self._limit_switch_status + + @ensure_yield + async def get_platform_sensor_status(self) -> PlatformStatus: + """Get platform sensor status. + + :return: True if platform is detected, False otherwise + """ + return self._platform_sensor_status + + @ensure_yield + async def get_hopper_door_closed(self) -> bool: + """Get whether or not door is closed. + + :return: True if door is closed, False otherwise + """ + return self._door_closed + + @ensure_yield + async def move_in_mm( + self, axis: StackerAxis, distance: float, params: MoveParams | None = None + ) -> bool: + """Move axis.""" + return True + + @ensure_yield + async def move_to_limit_switch( + self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None + ) -> bool: + """Move until limit switch is triggered.""" + return True diff --git a/api/src/opentrons/drivers/flex_stacker/types.py b/api/src/opentrons/drivers/flex_stacker/types.py new file mode 100644 index 00000000000..4035aaaa755 --- /dev/null +++ b/api/src/opentrons/drivers/flex_stacker/types.py @@ -0,0 +1,138 @@ +from enum import Enum +from dataclasses import dataclass, fields +from typing import List + +from opentrons.drivers.command_builder import CommandBuilder + + +class GCODE(str, Enum): + + MOVE_TO = "G0" + MOVE_TO_SWITCH = "G5" + HOME_AXIS = "G28" + STOP_MOTORS = "M0" + GET_RESET_REASON = "M114" + DEVICE_INFO = "M115" + GET_LIMIT_SWITCH = "M119" + SET_LED = "M200" + GET_PLATFORM_SENSOR = "M121" + GET_DOOR_SWITCH = "M122" + SET_SERIAL_NUMBER = "M996" + ENTER_BOOTLOADER = "dfu" + + def build_command(self) -> CommandBuilder: + """Build command.""" + return CommandBuilder().add_gcode(self) + + +STACKER_VID = 0x483 +STACKER_PID = 0xEF24 +STACKER_FREQ = 115200 + + +class HardwareRevision(Enum): + """Hardware Revision.""" + + NFF = "nff" + EVT = "a1" + + +@dataclass +class StackerInfo: + """Stacker Info.""" + + fw: str + hw: HardwareRevision + sn: str + + +class StackerAxis(Enum): + """Stacker Axis.""" + + X = "X" + Z = "Z" + L = "L" + + def __str__(self) -> str: + """Name.""" + return self.name + + +class LEDColor(Enum): + """Stacker LED Color.""" + + WHITE = 0 + RED = 1 + GREEN = 2 + BLUE = 3 + + +class Direction(Enum): + """Direction.""" + + RETRACT = 0 # negative + EXTENT = 1 # positive + + def __str__(self) -> str: + """Convert to tag for clear logging.""" + return "negative" if self == Direction.RETRACT else "positive" + + def opposite(self) -> "Direction": + """Get opposite direction.""" + return Direction.EXTENT if self == Direction.RETRACT else Direction.RETRACT + + def distance(self, distance: float) -> float: + """Get signed distance, where retract direction is negative.""" + return distance * -1 if self == Direction.RETRACT else distance + + +@dataclass +class LimitSwitchStatus: + """Stacker Limit Switch Statuses.""" + + XE: bool + XR: bool + ZE: bool + ZR: bool + LR: bool + + @classmethod + def get_fields(cls) -> List[str]: + """Get fields.""" + return [f.name for f in fields(cls)] + + def get(self, axis: StackerAxis, direction: Direction) -> bool: + """Get limit switch status.""" + if axis == StackerAxis.X: + return self.XE if direction == Direction.EXTENT else self.XR + if axis == StackerAxis.Z: + return self.ZE if direction == Direction.EXTENT else self.ZR + if direction == Direction.EXTENT: + raise ValueError("Latch does not have extent limit switch") + return self.LR + + +@dataclass +class PlatformStatus: + """Stacker Platform Statuses.""" + + E: bool + R: bool + + @classmethod + def get_fields(cls) -> List[str]: + """Get fields.""" + return [f.name for f in fields(cls)] + + def get(self, direction: Direction) -> bool: + """Get platform status.""" + return self.E if direction == Direction.EXTENT else self.R + + +@dataclass +class MoveParams: + """Move Parameters.""" + + max_speed: float | None = None + acceleration: float | None = None + max_speed_discont: float | None = None diff --git a/api/tests/opentrons/drivers/flex_stacker/__init__.py b/api/tests/opentrons/drivers/flex_stacker/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/api/tests/opentrons/drivers/flex_stacker/test_driver.py b/api/tests/opentrons/drivers/flex_stacker/test_driver.py new file mode 100644 index 00000000000..aea2492cf9e --- /dev/null +++ b/api/tests/opentrons/drivers/flex_stacker/test_driver.py @@ -0,0 +1,257 @@ +import pytest +from mock import AsyncMock +from opentrons.drivers.asyncio.communication.serial_connection import ( + AsyncResponseSerialConnection, +) +from opentrons.drivers.flex_stacker.driver import FlexStackerDriver +from opentrons.drivers.flex_stacker import types + + +@pytest.fixture +def connection() -> AsyncMock: + return AsyncMock(spec=AsyncResponseSerialConnection) + + +@pytest.fixture +def subject(connection: AsyncMock) -> FlexStackerDriver: + connection.send_command.return_value = "" + return FlexStackerDriver(connection) + + +async def test_get_device_info( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get device info command""" + connection.send_command.return_value = ( + "M115 FW:0.0.1 HW:Opentrons-flex-stacker-a1 SerialNo:STCA120230605001" + ) + response = await subject.get_device_info() + assert response == types.StackerInfo( + fw="0.0.1", + hw=types.HardwareRevision.EVT, + sn="STCA120230605001", + ) + + device_info = types.GCODE.DEVICE_INFO.build_command() + reset_reason = types.GCODE.GET_RESET_REASON.build_command() + connection.send_command.assert_any_call(device_info) + connection.send_command.assert_called_with(reset_reason) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M115 FW:0.0.1 SerialNo:STCA120230605001" + + # This should raise ValueError + with pytest.raises(ValueError): + response = await subject.get_device_info() + + device_info = types.GCODE.DEVICE_INFO.build_command() + reset_reason = types.GCODE.GET_RESET_REASON.build_command() + connection.send_command.assert_any_call(device_info) + connection.send_command.assert_called_with(reset_reason) + + +async def test_stop_motors(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a stop motors command""" + connection.send_command.return_value = "M0" + response = await subject.stop_motors() + assert response + + stop_motors = types.GCODE.STOP_MOTORS.build_command() + connection.send_command.assert_any_call(stop_motors) + connection.reset_mock() + + # This should raise ValueError + with pytest.raises(ValueError): + await subject.get_device_info() + + +async def test_set_serial_number( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a set serial number command""" + connection.send_command.return_value = "M996" + + serial_number = "Something" + response = await subject.set_serial_number(serial_number) + assert response + + set_serial_number = types.GCODE.SET_SERIAL_NUMBER.build_command().add_element( + serial_number + ) + connection.send_command.assert_any_call(set_serial_number) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M9nn" + with pytest.raises(ValueError): + response = await subject.set_serial_number(serial_number) + + set_serial_number = types.GCODE.SET_SERIAL_NUMBER.build_command().add_element( + serial_number + ) + connection.send_command.assert_any_call(set_serial_number) + connection.reset_mock() + + +async def test_get_limit_switch( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get limit switch command and return the boolean of one.""" + connection.send_command.return_value = "M119 XE:1 XR:0 ZE:0 ZR:1 LR:1" + response = await subject.get_limit_switch( + types.StackerAxis.X, types.Direction.EXTENT + ) + assert response + + limit_switch_status = types.GCODE.GET_LIMIT_SWITCH.build_command() + connection.send_command.assert_any_call(limit_switch_status) + connection.reset_mock() + + +async def test_get_limit_switches_status( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get limit switch status and return LimitSwitchStatus.""" + connection.send_command.return_value = "M119 XE:1 XR:0 ZE:0 ZR:1 LR:1" + response = await subject.get_limit_switches_status() + assert response == types.LimitSwitchStatus( + XE=True, + XR=False, + ZE=False, + ZR=True, + LR=True, + ) + + limit_switch_status = types.GCODE.GET_LIMIT_SWITCH.build_command() + connection.send_command.assert_any_call(limit_switch_status) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M119 XE:b XR:0 ZE:a ZR:1 LR:n" + with pytest.raises(ValueError): + response = await subject.get_limit_switches_status() + + limit_switch_status = types.GCODE.GET_LIMIT_SWITCH.build_command() + connection.send_command.assert_any_call(limit_switch_status) + + +async def test_get_platform_sensor( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get platform sensor command return status of specified sensor.""" + connection.send_command.return_value = "M121 E:1 R:1" + response = await subject.get_platform_sensor(types.Direction.EXTENT) + assert response + + platform_sensor = types.GCODE.GET_PLATFORM_SENSOR.build_command() + connection.send_command.assert_any_call(platform_sensor) + connection.reset_mock() + + +async def test_get_platform_status( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """it should send a get platform sensors status.""" + connection.send_command.return_value = "M121 E:0 R:1" + response = await subject.get_platform_status() + assert response == types.PlatformStatus( + E=False, + R=True, + ) + + platform_status = types.GCODE.GET_PLATFORM_SENSOR.build_command() + connection.send_command.assert_any_call(platform_status) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M121 E:0 R:1 something" + with pytest.raises(ValueError): + response = await subject.get_platform_status() + + platform_status = types.GCODE.GET_PLATFORM_SENSOR.build_command() + connection.send_command.assert_any_call(platform_status) + + +async def test_get_hopper_door_closed( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get door closed command.""" + connection.send_command.return_value = "M122 D:1" + response = await subject.get_hopper_door_closed() + assert response + + door_closed = types.GCODE.GET_DOOR_SWITCH.build_command() + connection.send_command.assert_any_call(door_closed) + connection.reset_mock() + + # Test door open + connection.send_command.return_value = "M122 D:0" + response = await subject.get_hopper_door_closed() + assert not response + + door_closed = types.GCODE.GET_DOOR_SWITCH.build_command() + connection.send_command.assert_any_call(door_closed) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M122 78gybhjk" + + with pytest.raises(ValueError): + response = await subject.get_hopper_door_closed() + + door_closed = types.GCODE.GET_DOOR_SWITCH.build_command() + connection.send_command.assert_any_call(door_closed) + connection.reset_mock() + + +async def test_move_in_mm(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a move to command""" + connection.send_command.return_value = "G0" + response = await subject.move_in_mm(types.StackerAxis.X, 10) + assert response + + move_to = types.GCODE.MOVE_TO.build_command().add_float("X", 10) + connection.send_command.assert_any_call(move_to) + connection.reset_mock() + + +async def test_move_to_switch( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a move to switch command""" + connection.send_command.return_value = "G5" + axis = types.StackerAxis.X + direction = types.Direction.EXTENT + response = await subject.move_to_limit_switch(axis, direction) + assert response + + move_to = types.GCODE.MOVE_TO_SWITCH.build_command().add_int( + axis.name, direction.value + ) + connection.send_command.assert_any_call(move_to) + connection.reset_mock() + + +async def test_home_axis(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a home axis command""" + connection.send_command.return_value = "G28" + axis = types.StackerAxis.X + direction = types.Direction.EXTENT + response = await subject.home_axis(axis, direction) + assert response + + move_to = types.GCODE.HOME_AXIS.build_command().add_int(axis.name, direction.value) + connection.send_command.assert_any_call(move_to) + connection.reset_mock() + + +async def test_set_led(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a set led command""" + connection.send_command.return_value = "M200" + response = await subject.set_led(1, types.LEDColor.RED) + assert response + + set_led = types.GCODE.SET_LED.build_command().add_float("P", 1).add_int("C", 1) + connection.send_command.assert_any_call(set_led) + connection.reset_mock()