-
Notifications
You must be signed in to change notification settings - Fork 178
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(api): add module emulation (#7353)
* magdeck * tempdeck * thermocycler * app * basic integration tests for modules. * a little clean up and file renaming. * skip thermocycler tests on windows. * add very basic smoothie * add comment about emulation fixture. * add comment about naive regular expression. * fix broken tests
- Loading branch information
amitlissack
authored
Mar 17, 2021
1 parent
3b7830d
commit 78869dc
Showing
15 changed files
with
480 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import asyncio | ||
import re | ||
import logging | ||
|
||
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator | ||
from opentrons.hardware_control.emulation.tempdeck import TempDeckEmulator | ||
from opentrons.hardware_control.emulation.thermocycler import ThermocyclerEmulator | ||
from opentrons.hardware_control.emulation.smoothie import SmoothieEmulator | ||
|
||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
SMOOTHIE_PORT = 9996 | ||
THERMOCYCLER_PORT = 9997 | ||
TEMPDECK_PORT = 9998 | ||
MAGDECK_PORT = 9999 | ||
|
||
|
||
LINE_REGEX = re.compile(r"(\S+)(.+)") | ||
"""Split the line into command and payload. | ||
TODO AL 20210222 This regex is a very naive approach and should be revisited | ||
if we are going to expand emulator support. | ||
We can use a regex like "[A-Z][0-9]+\\.*[0-9]*" to match gcodes (i think). And | ||
handle our custom directives like 'version' and 'dfu' separately. | ||
""" | ||
|
||
|
||
class ConnectionHandler: | ||
def __init__(self, command_processor: CommandProcessor, | ||
terminator: bytes = b'\r\n\r\n', | ||
ack: bytes = b'ok\r\nok\r\n'): | ||
"""Construct""" | ||
self._command_processor = command_processor | ||
self._terminator = terminator | ||
self._ack = ack | ||
|
||
async def __call__(self, reader: asyncio.StreamReader, | ||
writer: asyncio.StreamWriter) -> None: | ||
"""New connection callback.""" | ||
logger.debug("Connected.") | ||
while True: | ||
line = await reader.readuntil(self._terminator) | ||
logger.debug("Received: %s", line) | ||
|
||
m = LINE_REGEX.match(line.decode()) | ||
if m: | ||
groups = m.groups() | ||
cmd = groups[0] | ||
payload = groups[1] | ||
logger.debug("Command: %s, Payload: %s", cmd, payload) | ||
response = self._command_processor.handle(cmd, payload) | ||
if response: | ||
response = f'{response}\r\n' | ||
logger.debug("Sending: %s", response) | ||
writer.write(response.encode()) | ||
|
||
writer.write(self._ack) | ||
await writer.drain() | ||
|
||
|
||
async def run_server(host: str, port: int, handler: ConnectionHandler) -> None: | ||
"""Run a server.""" | ||
server = await asyncio.start_server(handler, host, port) | ||
|
||
async with server: | ||
await server.serve_forever() | ||
|
||
|
||
async def run() -> None: | ||
"""Run the module emulators.""" | ||
host = "127.0.0.1" | ||
|
||
await asyncio.gather( | ||
run_server(host=host, | ||
port=MAGDECK_PORT, | ||
handler=ConnectionHandler(MagDeckEmulator())), | ||
run_server(host=host, | ||
port=TEMPDECK_PORT, | ||
handler=ConnectionHandler(TempDeckEmulator())), | ||
run_server(host=host, | ||
port=THERMOCYCLER_PORT, | ||
handler=ConnectionHandler(ThermocyclerEmulator(), | ||
terminator=b'\r\n')), | ||
run_server(host=host, | ||
port=SMOOTHIE_PORT, | ||
handler=ConnectionHandler(SmoothieEmulator())), | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
h = logging.StreamHandler() | ||
h.setLevel(logging.DEBUG) | ||
logger.setLevel(logging.DEBUG) | ||
logger.addHandler(h) | ||
asyncio.run(run()) |
11 changes: 11 additions & 0 deletions
11
api/src/opentrons/hardware_control/emulation/command_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Optional | ||
|
||
|
||
class CommandProcessor(ABC): | ||
"""Interface of gcode command processor.""" | ||
|
||
@abstractmethod | ||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command and return a response.""" | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import logging | ||
from typing import Optional | ||
from opentrons.drivers.mag_deck.driver import GCODES | ||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GCODE_HOME = GCODES['HOME'] | ||
GCODE_MOVE = GCODES['MOVE'] | ||
GCODE_PROBE = GCODES['PROBE_PLATE'] | ||
GCODE_GET_PROBED_DISTANCE = GCODES['GET_PLATE_HEIGHT'] | ||
GCODE_GET_POSITION = GCODES['GET_CURRENT_POSITION'] | ||
GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] | ||
GCODE_DFU = GCODES['PROGRAMMING_MODE'] | ||
|
||
SERIAL = "fake_serial" | ||
MODEL = "magdeck_emulator" | ||
VERSION = 1 | ||
|
||
|
||
class MagDeckEmulator(CommandProcessor): | ||
"""Magdeck emulator""" | ||
|
||
def __init__(self) -> None: | ||
self.height = 0 | ||
self.position = 0 | ||
|
||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command.""" | ||
logger.info(f"Got command {cmd}") | ||
if cmd == GCODE_HOME: | ||
pass | ||
elif cmd == GCODE_MOVE: | ||
pass | ||
elif cmd == GCODE_PROBE: | ||
pass | ||
elif cmd == GCODE_GET_PROBED_DISTANCE: | ||
return f"height:{self.height}" | ||
elif cmd == GCODE_GET_POSITION: | ||
return f"Z:{self.position}" | ||
elif cmd == GCODE_DEVICE_INFO: | ||
return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" | ||
elif cmd == GCODE_DFU: | ||
pass | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import logging | ||
from typing import Optional | ||
|
||
from opentrons.drivers.smoothie_drivers.driver_3_0 import GCODES | ||
|
||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GCODE_HOME = GCODES['HOME'] | ||
GCODE_MOVE = GCODES['MOVE'] | ||
GCODE_DWELL = GCODES['DWELL'] | ||
GCODE_CURRENT_POSITION = GCODES['CURRENT_POSITION'] | ||
GCODE_LIMIT_SWITCH_STATUS = GCODES['LIMIT_SWITCH_STATUS'] | ||
GCODE_PROBE = GCODES['PROBE'] | ||
GCODE_ABSOLUTE_COORDS = GCODES['ABSOLUTE_COORDS'] | ||
GCODE_RELATIVE_COORDS = GCODES['RELATIVE_COORDS'] | ||
GCODE_RESET_FROM_ERROR = GCODES['RESET_FROM_ERROR'] | ||
GCODE_PUSH_SPEED = GCODES['PUSH_SPEED'] | ||
GCODE_POP_SPEED = GCODES['POP_SPEED'] | ||
GCODE_SET_SPEED = GCODES['SET_SPEED'] | ||
GCODE_STEPS_PER_MM = GCODES['STEPS_PER_MM'] | ||
GCODE_READ_INSTRUMENT_ID = GCODES['READ_INSTRUMENT_ID'] | ||
GCODE_WRITE_INSTRUMENT_ID = GCODES['WRITE_INSTRUMENT_ID'] | ||
GCODE_READ_INSTRUMENT_MODEL = GCODES['READ_INSTRUMENT_MODEL'] | ||
GCODE_WRITE_INSTRUMENT_MODEL = GCODES['WRITE_INSTRUMENT_MODEL'] | ||
GCODE_SET_MAX_SPEED = GCODES['SET_MAX_SPEED'] | ||
GCODE_SET_CURRENT = GCODES['SET_CURRENT'] | ||
GCODE_DISENGAGE_MOTOR = GCODES['DISENGAGE_MOTOR'] | ||
GCODE_HOMING_STATUS = GCODES['HOMING_STATUS'] | ||
GCODE_ACCELERATION = GCODES['ACCELERATION'] | ||
GCODE_WAIT = GCODES['WAIT'] | ||
GCODE_retract = 'M365.3' | ||
GCODE_debounce = 'M365.2' | ||
GCODE_max_travel = 'M365.1' | ||
GCODE_home = 'M365.0' | ||
GCODE_version = 'version' | ||
|
||
v = """Build version: EMULATOR, Build date: CURRENT, MCU: NONE, System Clock: NONE""" | ||
|
||
|
||
class SmoothieEmulator(CommandProcessor): | ||
"""Smoothie emulator""" | ||
|
||
def __init__(self) -> None: | ||
self.x = self.y = self.z = self.a = self.b = self.c = 0.00 | ||
|
||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command.""" | ||
logger.info(f"Got command {cmd}") | ||
if GCODE_HOMING_STATUS == cmd: | ||
return "X:0 Y:0 Z:0 A:0 B:0 C:0" | ||
elif GCODE_CURRENT_POSITION == cmd: | ||
return f"{cmd}\r\n\r\nok MCS: X:{self.x} Y:{self.y} " \ | ||
f"Z:{self.z} A:{self.a} B:{self.b} C:{self.c}" | ||
elif GCODE_version == cmd: | ||
return v | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import logging | ||
from typing import Optional | ||
|
||
from opentrons.drivers.temp_deck.driver import GCODES | ||
|
||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GCODE_GET_TEMP = GCODES['GET_TEMP'] | ||
GCODE_SET_TEMP = GCODES['SET_TEMP'] | ||
GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] | ||
GCODE_DISENGAGE = GCODES['DISENGAGE'] | ||
GCODE_DFU = GCODES['PROGRAMMING_MODE'] | ||
|
||
SERIAL = "fake_serial" | ||
MODEL = "temp_emulator" | ||
VERSION = 1 | ||
|
||
|
||
class TempDeckEmulator(CommandProcessor): | ||
"""TempDeck emulator""" | ||
|
||
def __init__(self) -> None: | ||
self.target_temp = 0 | ||
self.current_temp = 0 | ||
|
||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command.""" | ||
logger.info(f"Got command {cmd}") | ||
if cmd == GCODE_GET_TEMP: | ||
return f"T:{self.target_temp} C:{self.current_temp}" | ||
elif cmd == GCODE_SET_TEMP: | ||
pass | ||
elif cmd == GCODE_DISENGAGE: | ||
pass | ||
elif cmd == GCODE_DEVICE_INFO: | ||
return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" | ||
elif cmd == GCODE_DFU: | ||
pass | ||
return None |
Oops, something went wrong.