Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Heater Shaker Firmware Level Emulator #11121

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/src/opentrons/drivers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def parse_rpm_response(rpm_string: str) -> RPM:
"""Example input: T:1233 C:212"""
data = parse_key_values(rpm_string)
try:
# target is listed as Optional for below assignment,
# but None will be represented as 0 in G-code
target: Optional[int] = int(parse_number(data["T"], 0))
if target == 0:
target = None
Expand Down
146 changes: 146 additions & 0 deletions api/src/opentrons/hardware_control/emulation/heater_shaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""An emulation of the opentrons heater shaker module.

The purpose is to provide a fake backend that responds to GCODE commands.
"""
import logging
from time import sleep
from typing import (
Optional,
)

from opentrons.drivers.heater_shaker.driver import (
GCODE,
HS_ACK,
)
from opentrons.hardware_control.emulation.parser import Parser, Command
from opentrons.hardware_control.emulation.settings import HeaterShakerSettings
from . import util

from .abstract_emulator import AbstractEmulator
from .simulations import (
Temperature,
RPM,
)
from .util import TEMPERATURE_ROOM
from ...drivers.types import HeaterShakerLabwareLatchStatus

logger = logging.getLogger(__name__)


class HeaterShakerEmulator(AbstractEmulator):
"""Heater Shaker emulator"""

_temperature: Temperature
_rpm: RPM
_latch_status: HeaterShakerLabwareLatchStatus

def __init__(self, parser: Parser, settings: HeaterShakerSettings) -> None:
self._parser = parser
self._settings = settings
self._gcode_to_function_mapping = {
GCODE.SET_RPM.value: self._set_rpm,
GCODE.GET_RPM.value: self._get_rpm,
GCODE.SET_TEMPERATURE.value: self._set_temp,
GCODE.GET_TEMPERATURE.value: self._get_temp,
GCODE.HOME.value: self._home,
GCODE.ENTER_BOOTLOADER.value: self._enter_bootloader,
GCODE.GET_VERSION.value: self._get_version,
GCODE.OPEN_LABWARE_LATCH.value: self._open_labware_latch,
GCODE.CLOSE_LABWARE_LATCH.value: self._close_labware_latch,
GCODE.GET_LABWARE_LATCH_STATE.value: self._get_labware_latch_state,
GCODE.DEACTIVATE_HEATER.value: self._deactivate_heater,
}
self.reset()

def handle(self, line: str) -> Optional[str]:
"""Handle a line"""
results = (self._handle(c) for c in self._parser.parse(line))
joined = " ".join(r for r in results if r)
return None if not joined else joined

def reset(self) -> None:

self._temperature = Temperature(
per_tick=self._settings.temperature.degrees_per_tick,
current=self._settings.temperature.starting,
)
self._rpm = RPM(
per_tick=self._settings.rpm.rpm_per_tick,
current=self._settings.rpm.starting,
)
self._rpm.set_target(0.0)
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN

def _handle(self, command: Command) -> Optional[str]:
"""
Handle a command.

TODO: AL 20210218 create dispatch map and remove 'noqa(C901)'
"""
logger.info(f"Got command {command}")
func_to_run = self._gcode_to_function_mapping.get(command.gcode)
res = None if func_to_run is None else func_to_run(command)
return None if not isinstance(res, str) else f"{res} {HS_ACK}"

def _set_rpm(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._rpm.set_target(value)
return "M3"

def _get_rpm(self, command: Command) -> str:
res = (
f"M123 C:{self._rpm.current} "
f"T:{self._rpm.target if self._rpm.target is not None else 0}"
)
self._rpm.tick()
return res

def _set_temp(self, command: Command) -> str:
value = command.params["S"]
assert isinstance(value, float), f"invalid value '{value}'"
self._temperature.set_target(value)
return "M104"

def _get_temp(self, command: Command) -> str:
res = (
f"M105 C:{self._temperature.current} "
f"T:{util.OptionalValue(self._temperature.target)}"
)
self._temperature.tick()
return res

def _home(self, command: Command) -> str:
sleep(2)
self._rpm.deactivate(0.0)
self._rpm.set_target(0.0)
return "G28"

def _enter_bootloader(self, command: Command) -> None:
pass

def _get_version(self, command: Command) -> str:
return (
f"FW:{self._settings.version} "
f"HW:{self._settings.model} "
f"SerialNo:{self._settings.serial_number}"
)

def _open_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_OPEN
return "M242"

def _close_labware_latch(self, command: Command) -> str:
self._latch_status = HeaterShakerLabwareLatchStatus.IDLE_CLOSED
return "M243"

def _get_labware_latch_state(self, command: Command) -> str:
return f"M241 STATUS:{self._latch_status.value.upper()}"

def _deactivate_heater(self, command: Command) -> str:
self._temperature.set_target(TEMPERATURE_ROOM)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the temperature status respond with a target of room temperature value. We want the target to return as None. It looks like the thermocycler & tempdeck's deactivate emulator simply sets the target to None and current temp to room temperature. So, I'm fine with implementing the same thing here for now. We can revisit them all later.

return "M106"

@staticmethod
def get_terminator() -> bytes:
return b"\n"
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing_extensions import Final

from opentrons.hardware_control.emulation.abstract_emulator import AbstractEmulator
from opentrons.hardware_control.emulation.heater_shaker import HeaterShakerEmulator
from opentrons.hardware_control.emulation.types import ModuleType
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator
from opentrons.hardware_control.emulation.parser import Parser
Expand All @@ -22,12 +23,16 @@
ModuleType.Thermocycler.value: lambda s: ThermocyclerEmulator(
Parser(), s.thermocycler
),
ModuleType.Heatershaker.value: lambda s: HeaterShakerEmulator(
Parser(), s.heatershaker
),
}

emulator_port: Final[Dict[str, Callable[[Settings], ProxySettings]]] = {
ModuleType.Magnetic.value: lambda s: s.magdeck_proxy,
ModuleType.Temperature.value: lambda s: s.temperature_proxy,
ModuleType.Thermocycler.value: lambda s: s.thermocycler_proxy,
ModuleType.Heatershaker.value: lambda s: s.heatershaker_proxy,
}


Expand Down
17 changes: 17 additions & 0 deletions api/src/opentrons/hardware_control/emulation/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class TemperatureModelSettings(BaseModel):
starting: float = float(TEMPERATURE_ROOM)


class RPMModelSettings(BaseModel):
rpm_per_tick: float = 100.0
starting: float = 0.0


class MagDeckSettings(BaseModuleSettings):
pass

Expand All @@ -42,6 +47,11 @@ class ThermocyclerSettings(BaseModuleSettings):
plate_temperature: TemperatureModelSettings


class HeaterShakerSettings(BaseModuleSettings):
temperature: TemperatureModelSettings
rpm: RPMModelSettings


class ProxySettings(BaseModel):
"""Settings for a proxy."""

Expand Down Expand Up @@ -75,6 +85,13 @@ class Settings(BaseSettings):
lid_temperature=TemperatureModelSettings(),
plate_temperature=TemperatureModelSettings(),
)
heatershaker: HeaterShakerSettings = HeaterShakerSettings(
serial_number="heater_shaker_emulator",
model="v01",
version="v0.0.1",
temperature=TemperatureModelSettings(),
rpm=RPMModelSettings(),
)

heatershaker_proxy: ProxySettings = ProxySettings(
emulator_port=9000, driver_port=9995
Expand Down
51 changes: 50 additions & 1 deletion api/src/opentrons/hardware_control/emulation/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,56 @@ def deactivate(self, temperature: float) -> None:
self._target = None
self._current = temperature

def set_target(self, target: float) -> None:
def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
def current(self) -> float:
return self._current

@property
def target(self) -> Optional[float]:
return self._target


class RPM(Simulation):
"""A model with a current and target rpm. The current rpm is
always moving towards the target.
"""

def __init__(self, per_tick: float, current: float) -> None:
"""Construct a rpm simulation.

Args:
per_tick: amount to move per tick,
current: the starting rpm
"""
self._per_tick = per_tick
self._current = current
self._target: Optional[float] = None

def tick(self) -> None:

if self._target is None:
target = 0.0
else:
target = self._target

diff = target - self._current

if abs(diff) < self._per_tick:
self._current = target
elif diff > 0:
self._current += self._per_tick
else:
self._current -= self._per_tick

def deactivate(self, rpm: float) -> None:
"""Deactivate and reset to rpm"""
self._target = None
self._current = rpm

def set_target(self, target: Optional[float]) -> None:
self._target = target

@property
Expand Down
7 changes: 6 additions & 1 deletion api/tests/opentrons/hardware_control/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ def emulator_settings() -> Settings:
@pytest.fixture(scope="session")
def emulation_app(emulator_settings: Settings) -> Iterator[None]:
"""Run the emulators"""
modules = [ModuleType.Magnetic, ModuleType.Temperature, ModuleType.Thermocycler]
modules = [
ModuleType.Magnetic,
ModuleType.Temperature,
ModuleType.Thermocycler,
ModuleType.Heatershaker,
]

def _run_app() -> None:
async def _async_run() -> None:
Expand Down
Loading