Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Heater Shaker Firmware Level Emulator #11121

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions api/src/opentrons/hardware_control/emulation/heater_shaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""An emulation of the opentrons heater shaker module.

The purpose is to provide a fake backend that responds to GCODE commands.
"""
import logging
from typing import (
Optional,
)

from opentrons.drivers.heater_shaker.driver import (
GCODE,
HS_ACK,
)
from opentrons.hardware_control.emulation.parser import Parser, Command
from opentrons.hardware_control.emulation.settings import HeaterShakerSettings
from . import util

from .abstract_emulator import AbstractEmulator
from .simulations import (
Temperature,
RPM,
)
from ...drivers.types import HeaterShakerLabwareLatchStatus

logger = logging.getLogger(__name__)


class HeaterShakerEmulator(AbstractEmulator):
"""Heater Shaker emulator"""

_temperature: Temperature
_rpm: RPM
_latch_status: HeaterShakerLabwareLatchStatus

def __init__(self, parser: Parser, settings: HeaterShakerSettings) -> None:
self._parser = parser
self._settings = settings
self._gcode_to_function_mapping = {
GCODE.SET_RPM.value: self._set_rpm,
GCODE.GET_RPM.value: self._get_rpm,
GCODE.SET_TEMPERATURE.value: self._set_temp,
GCODE.GET_TEMPERATURE.value: self._get_temp,
GCODE.HOME.value: self._home,
GCODE.ENTER_BOOTLOADER.value: self._enter_bootloader,
GCODE.GET_VERSION.value: self._get_version,
GCODE.OPEN_LABWARE_LATCH.value: self._open_labware_latch,
GCODE.CLOSE_LABWARE_LATCH.value: self._close_labware_latch,
GCODE.GET_LABWARE_LATCH_STATE.value: self._get_labware_latch_state,
GCODE.DEACTIVATE_HEATER.value: self._deactivate_heater,
}
self.reset()

def handle(self, line: str) -> Optional[str]:
"""Handle a line"""
results = (self._handle(c) for c in self._parser.parse(line))
joined = " ".join(r for r in results if r)
return None if not joined else joined

def reset(self) -> None:

self._temperature = Temperature(
per_tick=self._settings.temperature.degrees_per_tick,
current=self._settings.temperature.starting,
)
self._rpm = RPM(
per_tick=self._settings.rpm.rpm_per_tick,
current=self._settings.rpm.starting,
)
self._rpm.set_target(0.0)
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN

def _handle(self, command: Command) -> Optional[str]:
"""
Handle a command.

TODO: AL 20210218 create dispatch map and remove 'noqa(C901)'
"""
logger.info(f"Got command {command}")
func_to_run = self._gcode_to_function_mapping.get(command.gcode)
res = None if func_to_run is None else func_to_run(command)
return None if not isinstance(res, str) else f"{res} {HS_ACK}"

def _set_rpm(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._rpm.set_target(value)
return "M3"

def _get_rpm(self, command: Command) -> str:
res = f"M123 C:{self._rpm.current} T:{self._rpm.target}"
DerekMaggio marked this conversation as resolved.
Show resolved Hide resolved
self._rpm.tick()
return res

def _set_temp(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._temperature.set_target(value)
return "M104"

def _get_temp(self, command: Command) -> str:
res = (
f"M105 C:{self._temperature.current} "
f"T:{util.OptionalValue(self._temperature.target)}"
)
self._temperature.tick()
return res

def _home(self, command: Command) -> str:
self._rpm.set_target(None)
return "G28"

def _enter_bootloader(self, command: Command) -> None:
pass

def _get_version(self, command: Command) -> str:
return (
f"FW:{self._settings.version} "
f"HW:{self._settings.model} "
f"SerialNo:{self._settings.serial_number}"
)

def _open_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN
return "M242"

def _close_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_CLOSED
return "M243"

def _get_labware_latch_state(self, command: Command) -> str:
return f"M241 STATUS:{self._latch_status.value.upper()}"

def _deactivate_heater(self, command: Command) -> str:
pass

@staticmethod
def get_terminator() -> bytes:
return b"\n"
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing_extensions import Final

from opentrons.hardware_control.emulation.abstract_emulator import AbstractEmulator
from opentrons.hardware_control.emulation.heater_shaker import HeaterShakerEmulator
from opentrons.hardware_control.emulation.types import ModuleType
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator
from opentrons.hardware_control.emulation.parser import Parser
Expand All @@ -22,12 +23,16 @@
ModuleType.Thermocycler.value: lambda s: ThermocyclerEmulator(
Parser(), s.thermocycler
),
ModuleType.Heatershaker.value: lambda s: HeaterShakerEmulator(
Parser(), s.heatershaker
),
}

emulator_port: Final[Dict[str, Callable[[Settings], ProxySettings]]] = {
ModuleType.Magnetic.value: lambda s: s.magdeck_proxy,
ModuleType.Temperature.value: lambda s: s.temperature_proxy,
ModuleType.Thermocycler.value: lambda s: s.thermocycler_proxy,
ModuleType.Heatershaker.value: lambda s: s.heatershaker_proxy,
}


Expand Down
17 changes: 17 additions & 0 deletions api/src/opentrons/hardware_control/emulation/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class TemperatureModelSettings(BaseModel):
starting: float = float(TEMPERATURE_ROOM)


class RPMModelSettings(BaseModel):
rpm_per_tick: float = 100.0
starting: float = 0.0


class MagDeckSettings(BaseModuleSettings):
pass

Expand All @@ -42,6 +47,11 @@ class ThermocyclerSettings(BaseModuleSettings):
plate_temperature: TemperatureModelSettings


class HeaterShakerSettings(BaseModuleSettings):
temperature: TemperatureModelSettings
rpm: RPMModelSettings


class ProxySettings(BaseModel):
"""Settings for a proxy."""

Expand Down Expand Up @@ -75,6 +85,13 @@ class Settings(BaseSettings):
lid_temperature=TemperatureModelSettings(),
plate_temperature=TemperatureModelSettings(),
)
heatershaker: HeaterShakerSettings = HeaterShakerSettings(
serial_number="heater_shaker_emulator",
model="v01",
version="v0.0.1",
temperature=TemperatureModelSettings(),
rpm=RPMModelSettings(),
)

heatershaker_proxy: ProxySettings = ProxySettings(
emulator_port=9000, driver_port=9995
Expand Down
50 changes: 50 additions & 0 deletions api/src/opentrons/hardware_control/emulation/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,56 @@ def target(self) -> Optional[float]:
return self._target


class RPM(Simulation):
"""A model with a current and target rpm. The current rpm is
always moving towards the target.
"""

def __init__(self, per_tick: float, current: float) -> None:
"""Construct a rpm simulation.

Args:
per_tick: amount to move per tick,
current: the starting rpm
"""
self._per_tick = per_tick
self._current = current
self._target: Optional[float] = None

def tick(self) -> None:

if self._target is None:
target = 0.0
else:
target = self._target

diff = target - self._current

if abs(diff) < self._per_tick:
self._current = target
elif diff > 0:
self._current += self._per_tick
else:
self._current -= self._per_tick
self._current

def deactivate(self, rpm: float) -> None:
"""Deactivate and reset to rpm"""
self._target = None
self._current = rpm

def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
def current(self) -> float:
return self._current

@property
def target(self) -> Optional[float]:
return self._target


class TemperatureWithHold(Temperature):
"""A model with a current temperature, target temperature, and hold time.
The current temperature is always moving towards the target.
Expand Down
7 changes: 6 additions & 1 deletion api/tests/opentrons/hardware_control/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ def emulator_settings() -> Settings:
@pytest.fixture(scope="session")
def emulation_app(emulator_settings: Settings) -> Iterator[None]:
"""Run the emulators"""
modules = [ModuleType.Magnetic, ModuleType.Temperature, ModuleType.Thermocycler]
modules = [
ModuleType.Magnetic,
ModuleType.Temperature,
ModuleType.Thermocycler,
ModuleType.Heatershaker,
]

def _run_app() -> None:
async def _async_run() -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import asyncio
from typing import AsyncIterator, Iterator

import pytest
from mock import AsyncMock
from opentrons.drivers.rpi_drivers.types import USBPort
from opentrons.hardware_control.emulation.settings import Settings

from opentrons.hardware_control.modules import HeaterShaker


@pytest.fixture
async def heatershaker(
emulation_app: Iterator[None],
emulator_settings: Settings,
) -> AsyncIterator[HeaterShaker]:
module = await HeaterShaker.build(
port=f"socket://127.0.0.1:{emulator_settings.heatershaker_proxy.driver_port}",
execution_manager=AsyncMock(),
usb_port=USBPort(name="", port_number=1, device_path="", hub=1),
loop=asyncio.get_running_loop(),
polling_period=0.5,
)
yield module
await module.cleanup()


def test_device_info(heatershaker: HeaterShaker):
"""Confirm device_info returns correct values."""
assert heatershaker.device_info == {
"model": "v01",
"version": "v0.0.1",
"serial": "heater_shaker_emulator",
}


async def test_latch_status(heatershaker: HeaterShaker) -> None:
"""It should run open and close latch."""
await heatershaker.wait_next_poll()
DerekMaggio marked this conversation as resolved.
Show resolved Hide resolved
assert heatershaker.labware_latch_status.value == "idle_open"

await heatershaker.close_labware_latch()
assert heatershaker.labware_latch_status.value == "idle_closed"

await heatershaker.open_labware_latch()
assert heatershaker.labware_latch_status.value == "idle_open"


async def test_speed(heatershaker: HeaterShaker) -> None:
"""It should speed up, then slow down."""

await heatershaker.wait_next_poll()
await heatershaker.set_speed(150)
assert heatershaker.target_speed == 150

# The acceptable delta for actual speed is 100
assert 50 <= heatershaker.speed <= 250


async def test_temp(heatershaker: HeaterShaker) -> None:
"""Test setting temp"""
await heatershaker.wait_next_poll()
await heatershaker.start_set_temperature(50.0)

# Have to wait for next poll because target temp will not update until then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the intended API contract of the Heater-Shaker hardware controller? If I do hs.start_set_temperature(...) and then do t = hs.target_temperature, I'm not guaranteed to get back what I just set?

If yes: I think this is surprising, and we need to document it in the HeaterShaker class. Doesn't have to happen in this PR, but I'd like confirmation/consensus.

If no: this indicates a bug, I guess?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know, it makes sense that target temp should be set and it would be just current temp that hasn't update yet.

Looking into it now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left out a line. Fixing it in a followup commit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await heatershaker.wait_next_poll()
assert heatershaker.target_temperature == 50.0
assert heatershaker.temperature != 50.0

await heatershaker.await_temperature(50.0)
assert heatershaker.target_temperature == 50.0
assert 49.3 <= heatershaker.temperature <= 50.7
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ services:
- container-network
depends_on:
- 'emulator'
heatershaker:
build: .
command: python3 -m opentrons.hardware_control.emulation.scripts.run_module_emulator heatershaker emulator
networks:
- container-network
depends_on:
- 'emulator'

networks:
container-network: