Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Heater Shaker Firmware Level Emulator #11121

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/src/opentrons/drivers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def parse_rpm_response(rpm_string: str) -> RPM:
"""Example input: T:1233 C:212"""
data = parse_key_values(rpm_string)
try:
# target is listed as Optional for below assignment,
# but None will be represented as 0 in G-code
target: Optional[int] = int(parse_number(data["T"], 0))
if target == 0:
target = None
Expand Down
146 changes: 146 additions & 0 deletions api/src/opentrons/hardware_control/emulation/heater_shaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""An emulation of the opentrons heater shaker module.

The purpose is to provide a fake backend that responds to GCODE commands.
"""
import logging
from time import sleep
from typing import (
Optional,
)

from opentrons.drivers.heater_shaker.driver import (
GCODE,
HS_ACK,
)
from opentrons.hardware_control.emulation.parser import Parser, Command
from opentrons.hardware_control.emulation.settings import HeaterShakerSettings
from . import util

from .abstract_emulator import AbstractEmulator
from .simulations import (
Temperature,
RPM,
)
from .util import TEMPERATURE_ROOM
from ...drivers.types import HeaterShakerLabwareLatchStatus

logger = logging.getLogger(__name__)


class HeaterShakerEmulator(AbstractEmulator):
"""Heater Shaker emulator"""

_temperature: Temperature
_rpm: RPM
_latch_status: HeaterShakerLabwareLatchStatus

def __init__(self, parser: Parser, settings: HeaterShakerSettings) -> None:
self._parser = parser
self._settings = settings
self._gcode_to_function_mapping = {
GCODE.SET_RPM.value: self._set_rpm,
GCODE.GET_RPM.value: self._get_rpm,
GCODE.SET_TEMPERATURE.value: self._set_temp,
GCODE.GET_TEMPERATURE.value: self._get_temp,
GCODE.HOME.value: self._home,
GCODE.ENTER_BOOTLOADER.value: self._enter_bootloader,
GCODE.GET_VERSION.value: self._get_version,
GCODE.OPEN_LABWARE_LATCH.value: self._open_labware_latch,
GCODE.CLOSE_LABWARE_LATCH.value: self._close_labware_latch,
GCODE.GET_LABWARE_LATCH_STATE.value: self._get_labware_latch_state,
GCODE.DEACTIVATE_HEATER.value: self._deactivate_heater,
}
self.reset()

def handle(self, line: str) -> Optional[str]:
"""Handle a line"""
results = (self._handle(c) for c in self._parser.parse(line))
joined = " ".join(r for r in results if r)
return None if not joined else joined

def reset(self) -> None:

self._temperature = Temperature(
per_tick=self._settings.temperature.degrees_per_tick,
current=self._settings.temperature.starting,
)
self._rpm = RPM(
per_tick=self._settings.rpm.rpm_per_tick,
current=self._settings.rpm.starting,
)
self._rpm.set_target(0.0)
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN

def _handle(self, command: Command) -> Optional[str]:
"""
Handle a command.

TODO: AL 20210218 create dispatch map and remove 'noqa(C901)'
"""
logger.info(f"Got command {command}")
func_to_run = self._gcode_to_function_mapping.get(command.gcode)
res = None if func_to_run is None else func_to_run(command)
return None if not isinstance(res, str) else f"{res} {HS_ACK}"

def _set_rpm(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._rpm.set_target(value)
return "M3"

def _get_rpm(self, command: Command) -> str:
res = (
f"M123 C:{self._rpm.current} "
f"T:{self._rpm.target if self._rpm.target is not None else 0}"
)
self._rpm.tick()
return res

def _set_temp(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._temperature.set_target(value)
return "M104"

def _get_temp(self, command: Command) -> str:
res = (
f"M105 C:{self._temperature.current} "
f"T:{util.OptionalValue(self._temperature.target)}"
)
self._temperature.tick()
return res

def _home(self, command: Command) -> str:
sleep(self._settings.home_delay_time)
self._rpm.deactivate(0.0)
self._rpm.set_target(0.0)
return "G28"

def _enter_bootloader(self, command: Command) -> None:
pass

def _get_version(self, command: Command) -> str:
return (
f"FW:{self._settings.version} "
f"HW:{self._settings.model} "
f"SerialNo:{self._settings.serial_number}"
)

def _open_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN
return "M242"

def _close_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_CLOSED
return "M243"

def _get_labware_latch_state(self, command: Command) -> str:
return f"M241 STATUS:{self._latch_status.value.upper()}"

def _deactivate_heater(self, command: Command) -> str:
self._temperature.deactivate(TEMPERATURE_ROOM)
return "M106"

@staticmethod
def get_terminator() -> bytes:
return b"\n"
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing_extensions import Final

from opentrons.hardware_control.emulation.abstract_emulator import AbstractEmulator
from opentrons.hardware_control.emulation.heater_shaker import HeaterShakerEmulator
from opentrons.hardware_control.emulation.types import ModuleType
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator
from opentrons.hardware_control.emulation.parser import Parser
Expand All @@ -22,12 +23,16 @@
ModuleType.Thermocycler.value: lambda s: ThermocyclerEmulator(
Parser(), s.thermocycler
),
ModuleType.Heatershaker.value: lambda s: HeaterShakerEmulator(
Parser(), s.heatershaker
),
}

emulator_port: Final[Dict[str, Callable[[Settings], ProxySettings]]] = {
ModuleType.Magnetic.value: lambda s: s.magdeck_proxy,
ModuleType.Temperature.value: lambda s: s.temperature_proxy,
ModuleType.Thermocycler.value: lambda s: s.thermocycler_proxy,
ModuleType.Heatershaker.value: lambda s: s.heatershaker_proxy,
}


Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/hardware_control/emulation/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class TemperatureModelSettings(BaseModel):
starting: float = float(TEMPERATURE_ROOM)


class RPMModelSettings(BaseModel):
rpm_per_tick: float = 100.0
starting: float = 0.0


class MagDeckSettings(BaseModuleSettings):
pass

Expand All @@ -42,6 +47,12 @@ class ThermocyclerSettings(BaseModuleSettings):
plate_temperature: TemperatureModelSettings


class HeaterShakerSettings(BaseModuleSettings):
temperature: TemperatureModelSettings
rpm: RPMModelSettings
home_delay_time: int = 0


class ProxySettings(BaseModel):
"""Settings for a proxy."""

Expand Down Expand Up @@ -75,6 +86,14 @@ class Settings(BaseSettings):
lid_temperature=TemperatureModelSettings(),
plate_temperature=TemperatureModelSettings(),
)
heatershaker: HeaterShakerSettings = HeaterShakerSettings(
serial_number="heater_shaker_emulator",
model="v01",
version="v0.0.1",
temperature=TemperatureModelSettings(),
rpm=RPMModelSettings(),
home_delay_time=0,
)

heatershaker_proxy: ProxySettings = ProxySettings(
emulator_port=9000, driver_port=9995
Expand Down
51 changes: 50 additions & 1 deletion api/src/opentrons/hardware_control/emulation/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,56 @@ def deactivate(self, temperature: float) -> None:
self._target = None
self._current = temperature

def set_target(self, target: float) -> None:
def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
def current(self) -> float:
return self._current

@property
def target(self) -> Optional[float]:
return self._target


class RPM(Simulation):
"""A model with a current and target rpm. The current rpm is
always moving towards the target.
"""

def __init__(self, per_tick: float, current: float) -> None:
"""Construct a rpm simulation.

Args:
per_tick: amount to move per tick,
current: the starting rpm
"""
self._per_tick = per_tick
self._current = current
self._target: Optional[float] = None

def tick(self) -> None:

if self._target is None:
target = 0.0
else:
target = self._target

diff = target - self._current

if abs(diff) < self._per_tick:
self._current = target
elif diff > 0:
self._current += self._per_tick
else:
self._current -= self._per_tick

def deactivate(self, rpm: float) -> None:
"""Deactivate and reset to rpm"""
self._target = None
self._current = rpm

def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
Expand Down
7 changes: 6 additions & 1 deletion api/tests/opentrons/hardware_control/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ def emulator_settings() -> Settings:
@pytest.fixture(scope="session")
def emulation_app(emulator_settings: Settings) -> Iterator[None]:
"""Run the emulators"""
modules = [ModuleType.Magnetic, ModuleType.Temperature, ModuleType.Thermocycler]
modules = [
ModuleType.Magnetic,
ModuleType.Temperature,
ModuleType.Thermocycler,
ModuleType.Heatershaker,
]

def _run_app() -> None:
async def _async_run() -> None:
Expand Down
102 changes: 102 additions & 0 deletions api/tests/opentrons/hardware_control/integration/test_heatershaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import asyncio
from typing import AsyncIterator, Iterator

import pytest
from mock import AsyncMock
from opentrons.drivers.rpi_drivers.types import USBPort
from opentrons.hardware_control.emulation.settings import Settings
from opentrons.hardware_control.emulation.util import TEMPERATURE_ROOM

from opentrons.hardware_control.modules import HeaterShaker

TEMP_ROOM_LOW = TEMPERATURE_ROOM - 0.7
TEMP_ROOM_HIGH = TEMPERATURE_ROOM + 0.7


@pytest.fixture
async def heatershaker(
emulation_app: Iterator[None],
emulator_settings: Settings,
) -> AsyncIterator[HeaterShaker]:
module = await HeaterShaker.build(
port=f"socket://127.0.0.1:{emulator_settings.heatershaker_proxy.driver_port}",
execution_manager=AsyncMock(),
usb_port=USBPort(name="", port_number=1, device_path="", hub=1),
loop=asyncio.get_running_loop(),
polling_period=0.5,
)
yield module
await module.cleanup()


def test_device_info(heatershaker: HeaterShaker):
"""Confirm device_info returns correct values."""
assert heatershaker.device_info == {
"model": "v01",
"version": "v0.0.1",
"serial": "heater_shaker_emulator",
}


async def test_latch_status(heatershaker: HeaterShaker) -> None:
"""It should run open and close latch."""
await heatershaker.wait_next_poll()
DerekMaggio marked this conversation as resolved.
Show resolved Hide resolved
assert heatershaker.labware_latch_status.value == "idle_open"

await heatershaker.close_labware_latch()
assert heatershaker.labware_latch_status.value == "idle_closed"

await heatershaker.open_labware_latch()
assert heatershaker.labware_latch_status.value == "idle_open"


async def test_speed(heatershaker: HeaterShaker) -> None:
"""It should speed up, then slow down."""

await heatershaker.wait_next_poll()
await heatershaker.set_speed(550)
assert heatershaker.target_speed == 550

# The acceptable delta for actual speed is 100
assert 450 <= heatershaker.speed <= 650


async def test_deactivate_shaker(heatershaker: HeaterShaker) -> None:
"""It should speed up, then slow down."""

await heatershaker.wait_next_poll()
await heatershaker.set_speed(150)
assert heatershaker.target_speed == 150

await heatershaker.deactivate_shaker()

assert heatershaker.speed == 0
assert heatershaker.target_speed is None


async def test_deactivate_heater(heatershaker: HeaterShaker) -> None:
await heatershaker.wait_next_poll()
await heatershaker.start_set_temperature(30.0)
await heatershaker.await_temperature(30.0)
assert heatershaker.target_temperature == 30.0
assert 29.3 <= heatershaker.temperature <= 30.7

await heatershaker.deactivate_heater()
assert heatershaker.target_temperature is None
assert TEMP_ROOM_LOW <= heatershaker.temperature <= TEMP_ROOM_HIGH


async def test_temp(heatershaker: HeaterShaker) -> None:
"""Test setting temp"""

# Have to wait for next poll because target temp will not update until then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the intended API contract of the Heater-Shaker hardware controller? If I do hs.start_set_temperature(...) and then do t = hs.target_temperature, I'm not guaranteed to get back what I just set?

If yes: I think this is surprising, and we need to document it in the HeaterShaker class. Doesn't have to happen in this PR, but I'd like confirmation/consensus.

If no: this indicates a bug, I guess?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know, it makes sense that target temp should be set and it would be just current temp that hasn't update yet.

Looking into it now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left out a line. Fixing it in a followup commit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await heatershaker.wait_next_poll()
await heatershaker.start_set_temperature(50.0)
assert heatershaker.target_temperature == 50.0
assert heatershaker.temperature != 50.0

await heatershaker.await_temperature(50.0)
assert heatershaker.target_temperature == 50.0

# Acceptable delta is 0.7 degrees
assert 49.3 <= heatershaker.temperature <= 50.7
Loading