-
Notifications
You must be signed in to change notification settings - Fork 178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(api): add module emulation #7353
Changes from all commits
c44fb41
35bbcc9
f86a6ec
5babd88
e2a8a17
e49cf8c
817cd05
530471f
9a5eb16
72148b2
7759b0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -190,6 +190,8 @@ async def enter_programming_mode(self): | |
|
||
|
||
class TCPoller(threading.Thread): | ||
POLLING_FD_PATH = '/var/run/' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved here in order be able to patch in tests. Otherwise need root permission. |
||
|
||
def __init__(self, port, interrupt_callback, temp_status_callback, | ||
lid_status_callback, lid_temp_status_callback): | ||
if not select: | ||
|
@@ -206,14 +208,16 @@ def __init__(self, port, interrupt_callback, temp_status_callback, | |
# Note: the options and order of operations for opening file | ||
# descriptors is very specific. For more info, see: | ||
# http://pubs.opengroup.org/onlinepubs/007908799/xsh/open.html | ||
self._send_path = '/var/run/tc_send_fifo_{}'.format(hash(self)) | ||
self._send_path = os.path.join( | ||
self.POLLING_FD_PATH, f"tc_send_fifo_{hash(self)}") | ||
os.mkfifo(self._send_path) | ||
send_read_fd = os.open( | ||
self._send_path, flags=os.O_RDONLY | os.O_NONBLOCK) | ||
self._send_read_file = os.fdopen(send_read_fd, 'rb') | ||
self._send_write_fd = open(self._send_path, 'wb', buffering=0) | ||
|
||
self._halt_path = '/var/run/tc_halt_fifo_{}'.format(hash(self)) | ||
self._halt_path = os.path.join( | ||
self.POLLING_FD_PATH, f"tc_halt_fifo_{hash(self)}") | ||
os.mkfifo(self._halt_path) | ||
halt_read_fd = os.open( | ||
self._halt_path, flags=os.O_RDONLY | os.O_NONBLOCK) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import asyncio | ||
import re | ||
import logging | ||
|
||
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator | ||
from opentrons.hardware_control.emulation.tempdeck import TempDeckEmulator | ||
from opentrons.hardware_control.emulation.thermocycler import ThermocyclerEmulator | ||
from opentrons.hardware_control.emulation.smoothie import SmoothieEmulator | ||
|
||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
SMOOTHIE_PORT = 9996 | ||
THERMOCYCLER_PORT = 9997 | ||
TEMPDECK_PORT = 9998 | ||
MAGDECK_PORT = 9999 | ||
|
||
|
||
LINE_REGEX = re.compile(r"(\S+)(.+)") | ||
"""Split the line into command and payload. | ||
|
||
TODO AL 20210222 This regex is a very naive approach and should be revisited | ||
if we are going to expand emulator support. | ||
We can use a regex like "[A-Z][0-9]+\\.*[0-9]*" to match gcodes (i think). And | ||
handle our custom directives like 'version' and 'dfu' separately. | ||
""" | ||
|
||
|
||
class ConnectionHandler: | ||
def __init__(self, command_processor: CommandProcessor, | ||
terminator: bytes = b'\r\n\r\n', | ||
ack: bytes = b'ok\r\nok\r\n'): | ||
"""Construct""" | ||
self._command_processor = command_processor | ||
self._terminator = terminator | ||
self._ack = ack | ||
|
||
async def __call__(self, reader: asyncio.StreamReader, | ||
writer: asyncio.StreamWriter) -> None: | ||
"""New connection callback.""" | ||
logger.debug("Connected.") | ||
while True: | ||
line = await reader.readuntil(self._terminator) | ||
logger.debug("Received: %s", line) | ||
|
||
m = LINE_REGEX.match(line.decode()) | ||
if m: | ||
groups = m.groups() | ||
cmd = groups[0] | ||
payload = groups[1] | ||
logger.debug("Command: %s, Payload: %s", cmd, payload) | ||
response = self._command_processor.handle(cmd, payload) | ||
if response: | ||
response = f'{response}\r\n' | ||
logger.debug("Sending: %s", response) | ||
writer.write(response.encode()) | ||
|
||
writer.write(self._ack) | ||
await writer.drain() | ||
|
||
|
||
async def run_server(host: str, port: int, handler: ConnectionHandler) -> None: | ||
"""Run a server.""" | ||
server = await asyncio.start_server(handler, host, port) | ||
|
||
async with server: | ||
await server.serve_forever() | ||
|
||
|
||
async def run() -> None: | ||
"""Run the module emulators.""" | ||
host = "127.0.0.1" | ||
|
||
await asyncio.gather( | ||
run_server(host=host, | ||
port=MAGDECK_PORT, | ||
handler=ConnectionHandler(MagDeckEmulator())), | ||
run_server(host=host, | ||
port=TEMPDECK_PORT, | ||
handler=ConnectionHandler(TempDeckEmulator())), | ||
run_server(host=host, | ||
port=THERMOCYCLER_PORT, | ||
handler=ConnectionHandler(ThermocyclerEmulator(), | ||
terminator=b'\r\n')), | ||
run_server(host=host, | ||
port=SMOOTHIE_PORT, | ||
handler=ConnectionHandler(SmoothieEmulator())), | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
h = logging.StreamHandler() | ||
h.setLevel(logging.DEBUG) | ||
logger.setLevel(logging.DEBUG) | ||
logger.addHandler(h) | ||
asyncio.run(run()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Optional | ||
|
||
|
||
class CommandProcessor(ABC): | ||
"""Interface of gcode command processor.""" | ||
|
||
@abstractmethod | ||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command and return a response.""" | ||
... |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import logging | ||
from typing import Optional | ||
from opentrons.drivers.mag_deck.driver import GCODES | ||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GCODE_HOME = GCODES['HOME'] | ||
GCODE_MOVE = GCODES['MOVE'] | ||
GCODE_PROBE = GCODES['PROBE_PLATE'] | ||
GCODE_GET_PROBED_DISTANCE = GCODES['GET_PLATE_HEIGHT'] | ||
GCODE_GET_POSITION = GCODES['GET_CURRENT_POSITION'] | ||
GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] | ||
GCODE_DFU = GCODES['PROGRAMMING_MODE'] | ||
|
||
SERIAL = "fake_serial" | ||
MODEL = "magdeck_emulator" | ||
VERSION = 1 | ||
|
||
|
||
class MagDeckEmulator(CommandProcessor): | ||
"""Magdeck emulator""" | ||
|
||
def __init__(self) -> None: | ||
self.height = 0 | ||
self.position = 0 | ||
|
||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command.""" | ||
logger.info(f"Got command {cmd}") | ||
if cmd == GCODE_HOME: | ||
pass | ||
elif cmd == GCODE_MOVE: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't these set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It definitely should. I deliberately did not implement any command that changes the state of any of the emulators. Looking at the c++ code made think that updating the state is not at all trivial for any of these modules and the smoothie. So left it for another day. |
||
pass | ||
elif cmd == GCODE_PROBE: | ||
pass | ||
elif cmd == GCODE_GET_PROBED_DISTANCE: | ||
return f"height:{self.height}" | ||
elif cmd == GCODE_GET_POSITION: | ||
return f"Z:{self.position}" | ||
elif cmd == GCODE_DEVICE_INFO: | ||
return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" | ||
elif cmd == GCODE_DFU: | ||
pass | ||
return None |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import logging | ||
from typing import Optional | ||
|
||
from opentrons.drivers.smoothie_drivers.driver_3_0 import GCODES | ||
|
||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GCODE_HOME = GCODES['HOME'] | ||
GCODE_MOVE = GCODES['MOVE'] | ||
GCODE_DWELL = GCODES['DWELL'] | ||
GCODE_CURRENT_POSITION = GCODES['CURRENT_POSITION'] | ||
GCODE_LIMIT_SWITCH_STATUS = GCODES['LIMIT_SWITCH_STATUS'] | ||
GCODE_PROBE = GCODES['PROBE'] | ||
GCODE_ABSOLUTE_COORDS = GCODES['ABSOLUTE_COORDS'] | ||
GCODE_RELATIVE_COORDS = GCODES['RELATIVE_COORDS'] | ||
GCODE_RESET_FROM_ERROR = GCODES['RESET_FROM_ERROR'] | ||
GCODE_PUSH_SPEED = GCODES['PUSH_SPEED'] | ||
GCODE_POP_SPEED = GCODES['POP_SPEED'] | ||
GCODE_SET_SPEED = GCODES['SET_SPEED'] | ||
GCODE_STEPS_PER_MM = GCODES['STEPS_PER_MM'] | ||
GCODE_READ_INSTRUMENT_ID = GCODES['READ_INSTRUMENT_ID'] | ||
GCODE_WRITE_INSTRUMENT_ID = GCODES['WRITE_INSTRUMENT_ID'] | ||
GCODE_READ_INSTRUMENT_MODEL = GCODES['READ_INSTRUMENT_MODEL'] | ||
GCODE_WRITE_INSTRUMENT_MODEL = GCODES['WRITE_INSTRUMENT_MODEL'] | ||
GCODE_SET_MAX_SPEED = GCODES['SET_MAX_SPEED'] | ||
GCODE_SET_CURRENT = GCODES['SET_CURRENT'] | ||
GCODE_DISENGAGE_MOTOR = GCODES['DISENGAGE_MOTOR'] | ||
GCODE_HOMING_STATUS = GCODES['HOMING_STATUS'] | ||
GCODE_ACCELERATION = GCODES['ACCELERATION'] | ||
GCODE_WAIT = GCODES['WAIT'] | ||
GCODE_retract = 'M365.3' | ||
GCODE_debounce = 'M365.2' | ||
GCODE_max_travel = 'M365.1' | ||
GCODE_home = 'M365.0' | ||
GCODE_version = 'version' | ||
|
||
v = """Build version: EMULATOR, Build date: CURRENT, MCU: NONE, System Clock: NONE""" | ||
|
||
|
||
class SmoothieEmulator(CommandProcessor): | ||
"""Smoothie emulator""" | ||
|
||
def __init__(self) -> None: | ||
self.x = self.y = self.z = self.a = self.b = self.c = 0.00 | ||
|
||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command.""" | ||
logger.info(f"Got command {cmd}") | ||
if GCODE_HOMING_STATUS == cmd: | ||
return "X:0 Y:0 Z:0 A:0 B:0 C:0" | ||
elif GCODE_CURRENT_POSITION == cmd: | ||
return f"{cmd}\r\n\r\nok MCS: X:{self.x} Y:{self.y} " \ | ||
f"Z:{self.z} A:{self.a} B:{self.b} C:{self.c}" | ||
elif GCODE_version == cmd: | ||
return v | ||
return None |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import logging | ||
from typing import Optional | ||
|
||
from opentrons.drivers.temp_deck.driver import GCODES | ||
|
||
from .command_processor import CommandProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
GCODE_GET_TEMP = GCODES['GET_TEMP'] | ||
GCODE_SET_TEMP = GCODES['SET_TEMP'] | ||
GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] | ||
GCODE_DISENGAGE = GCODES['DISENGAGE'] | ||
GCODE_DFU = GCODES['PROGRAMMING_MODE'] | ||
|
||
SERIAL = "fake_serial" | ||
MODEL = "temp_emulator" | ||
VERSION = 1 | ||
|
||
|
||
class TempDeckEmulator(CommandProcessor): | ||
"""TempDeck emulator""" | ||
|
||
def __init__(self) -> None: | ||
self.target_temp = 0 | ||
self.current_temp = 0 | ||
|
||
def handle(self, cmd: str, payload: str) -> Optional[str]: | ||
"""Handle a command.""" | ||
logger.info(f"Got command {cmd}") | ||
if cmd == GCODE_GET_TEMP: | ||
return f"T:{self.target_temp} C:{self.current_temp}" | ||
elif cmd == GCODE_SET_TEMP: | ||
pass | ||
elif cmd == GCODE_DISENGAGE: | ||
pass | ||
elif cmd == GCODE_DEVICE_INFO: | ||
return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" | ||
elif cmd == GCODE_DFU: | ||
pass | ||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Under the covers, if the
url
doesn't specify a protocol (ie "socket://") pyserial will Serial.