diff --git a/api/src/opentrons/drivers/serial_communication.py b/api/src/opentrons/drivers/serial_communication.py index ef7ebd04619..d32919a877c 100755 --- a/api/src/opentrons/drivers/serial_communication.py +++ b/api/src/opentrons/drivers/serial_communication.py @@ -15,7 +15,7 @@ class SerialNoResponse(Exception): def get_ports_by_name(device_name): - '''Returns all serial devices with a given name''' + """Returns all serial devices with a given name""" filtered_devices = filter( lambda device: device_name in device[1], list_ports.comports() @@ -25,7 +25,7 @@ def get_ports_by_name(device_name): def get_port_by_VID(vid): - '''Returns first serial device with a given VID''' + """Returns first serial device with a given VID""" for d in list_ports.comports(): if d.vid == vid: return d[0] @@ -33,7 +33,7 @@ def get_port_by_VID(vid): @contextlib.contextmanager def serial_with_temp_timeout(serial_connection, timeout): - '''Implements a temporary timeout for a serial connection''' + """Implements a temporary timeout for a serial connection""" saved_timeout = serial_connection.timeout if timeout is not None: serial_connection.timeout = timeout @@ -56,10 +56,10 @@ def clear_buffer(serial_connection): def _write_to_device_and_return(cmd, ack, device_connection, tag=None): - '''Writes to a serial device. + """Writes to a serial device. - Formats command - Wait for ack return - - return parsed response''' + - return parsed response""" if not tag: tag = device_connection.port @@ -82,8 +82,8 @@ def _write_to_device_and_return(cmd, ack, device_connection, tag=None): def _connect(port_name, baudrate): - ser = serial.Serial( - port=port_name, + ser = serial.serial_for_url( + url=port_name, baudrate=baudrate, timeout=DEFAULT_SERIAL_TIMEOUT ) @@ -92,7 +92,7 @@ def _connect(port_name, baudrate): def _attempt_command_recovery(command, ack, serial_conn, tag=None): - '''Recovery after following a failed write_and_return() atempt''' + """Recovery after following a failed write_and_return() attempt""" if not tag: tag = serial_conn.port with serial_with_temp_timeout(serial_conn, RECOVERY_TIMEOUT) as device: @@ -109,7 +109,7 @@ def _attempt_command_recovery(command, ack, serial_conn, tag=None): def write_and_return( command, ack, serial_connection, timeout=DEFAULT_WRITE_TIMEOUT, tag=None): - '''Write a command and return the response''' + """Write a command and return the response""" clear_buffer(serial_connection) with serial_with_temp_timeout( serial_connection, timeout) as device_connection: @@ -119,12 +119,12 @@ def write_and_return( def connect(device_name=None, port=None, baudrate=115200): - ''' + """ Creates a serial connection :param device_name: defaults to 'Smoothieboard' :param baudrate: integer frequency for serial communication :return: serial.Serial connection - ''' + """ if not port: port = get_ports_by_name(device_name=device_name)[0] log.debug("Device name: {}, Port: {}".format(device_name, port)) diff --git a/api/src/opentrons/drivers/thermocycler/driver.py b/api/src/opentrons/drivers/thermocycler/driver.py index 758783d8fed..2e99a5d6f3f 100644 --- a/api/src/opentrons/drivers/thermocycler/driver.py +++ b/api/src/opentrons/drivers/thermocycler/driver.py @@ -190,6 +190,8 @@ async def enter_programming_mode(self): class TCPoller(threading.Thread): + POLLING_FD_PATH = '/var/run/' + def __init__(self, port, interrupt_callback, temp_status_callback, lid_status_callback, lid_temp_status_callback): if not select: @@ -206,14 +208,16 @@ def __init__(self, port, interrupt_callback, temp_status_callback, # Note: the options and order of operations for opening file # descriptors is very specific. For more info, see: # http://pubs.opengroup.org/onlinepubs/007908799/xsh/open.html - self._send_path = '/var/run/tc_send_fifo_{}'.format(hash(self)) + self._send_path = os.path.join( + self.POLLING_FD_PATH, f"tc_send_fifo_{hash(self)}") os.mkfifo(self._send_path) send_read_fd = os.open( self._send_path, flags=os.O_RDONLY | os.O_NONBLOCK) self._send_read_file = os.fdopen(send_read_fd, 'rb') self._send_write_fd = open(self._send_path, 'wb', buffering=0) - self._halt_path = '/var/run/tc_halt_fifo_{}'.format(hash(self)) + self._halt_path = os.path.join( + self.POLLING_FD_PATH, f"tc_halt_fifo_{hash(self)}") os.mkfifo(self._halt_path) halt_read_fd = os.open( self._halt_path, flags=os.O_RDONLY | os.O_NONBLOCK) diff --git a/api/src/opentrons/hardware_control/emulation/__init__.py b/api/src/opentrons/hardware_control/emulation/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/api/src/opentrons/hardware_control/emulation/app.py b/api/src/opentrons/hardware_control/emulation/app.py new file mode 100644 index 00000000000..a2084b33f84 --- /dev/null +++ b/api/src/opentrons/hardware_control/emulation/app.py @@ -0,0 +1,98 @@ +import asyncio +import re +import logging + +from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator +from opentrons.hardware_control.emulation.tempdeck import TempDeckEmulator +from opentrons.hardware_control.emulation.thermocycler import ThermocyclerEmulator +from opentrons.hardware_control.emulation.smoothie import SmoothieEmulator + +from .command_processor import CommandProcessor + +logger = logging.getLogger(__name__) + + +SMOOTHIE_PORT = 9996 +THERMOCYCLER_PORT = 9997 +TEMPDECK_PORT = 9998 +MAGDECK_PORT = 9999 + + +LINE_REGEX = re.compile(r"(\S+)(.+)") +"""Split the line into command and payload. + +TODO AL 20210222 This regex is a very naive approach and should be revisited + if we are going to expand emulator support. +We can use a regex like "[A-Z][0-9]+\\.*[0-9]*" to match gcodes (i think). And +handle our custom directives like 'version' and 'dfu' separately. +""" + + +class ConnectionHandler: + def __init__(self, command_processor: CommandProcessor, + terminator: bytes = b'\r\n\r\n', + ack: bytes = b'ok\r\nok\r\n'): + """Construct""" + self._command_processor = command_processor + self._terminator = terminator + self._ack = ack + + async def __call__(self, reader: asyncio.StreamReader, + writer: asyncio.StreamWriter) -> None: + """New connection callback.""" + logger.debug("Connected.") + while True: + line = await reader.readuntil(self._terminator) + logger.debug("Received: %s", line) + + m = LINE_REGEX.match(line.decode()) + if m: + groups = m.groups() + cmd = groups[0] + payload = groups[1] + logger.debug("Command: %s, Payload: %s", cmd, payload) + response = self._command_processor.handle(cmd, payload) + if response: + response = f'{response}\r\n' + logger.debug("Sending: %s", response) + writer.write(response.encode()) + + writer.write(self._ack) + await writer.drain() + + +async def run_server(host: str, port: int, handler: ConnectionHandler) -> None: + """Run a server.""" + server = await asyncio.start_server(handler, host, port) + + async with server: + await server.serve_forever() + + +async def run() -> None: + """Run the module emulators.""" + host = "127.0.0.1" + + await asyncio.gather( + run_server(host=host, + port=MAGDECK_PORT, + handler=ConnectionHandler(MagDeckEmulator())), + run_server(host=host, + port=TEMPDECK_PORT, + handler=ConnectionHandler(TempDeckEmulator())), + run_server(host=host, + port=THERMOCYCLER_PORT, + handler=ConnectionHandler(ThermocyclerEmulator(), + terminator=b'\r\n')), + run_server(host=host, + port=SMOOTHIE_PORT, + handler=ConnectionHandler(SmoothieEmulator())), + ) + + +if __name__ == "__main__": + h = logging.StreamHandler() + h.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + logger.addHandler(h) + asyncio.run(run()) diff --git a/api/src/opentrons/hardware_control/emulation/command_processor.py b/api/src/opentrons/hardware_control/emulation/command_processor.py new file mode 100644 index 00000000000..c72bbdcda18 --- /dev/null +++ b/api/src/opentrons/hardware_control/emulation/command_processor.py @@ -0,0 +1,11 @@ +from abc import ABC, abstractmethod +from typing import Optional + + +class CommandProcessor(ABC): + """Interface of gcode command processor.""" + + @abstractmethod + def handle(self, cmd: str, payload: str) -> Optional[str]: + """Handle a command and return a response.""" + ... diff --git a/api/src/opentrons/hardware_control/emulation/magdeck.py b/api/src/opentrons/hardware_control/emulation/magdeck.py new file mode 100644 index 00000000000..3c898ac18ec --- /dev/null +++ b/api/src/opentrons/hardware_control/emulation/magdeck.py @@ -0,0 +1,45 @@ +import logging +from typing import Optional +from opentrons.drivers.mag_deck.driver import GCODES +from .command_processor import CommandProcessor + +logger = logging.getLogger(__name__) + +GCODE_HOME = GCODES['HOME'] +GCODE_MOVE = GCODES['MOVE'] +GCODE_PROBE = GCODES['PROBE_PLATE'] +GCODE_GET_PROBED_DISTANCE = GCODES['GET_PLATE_HEIGHT'] +GCODE_GET_POSITION = GCODES['GET_CURRENT_POSITION'] +GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] +GCODE_DFU = GCODES['PROGRAMMING_MODE'] + +SERIAL = "fake_serial" +MODEL = "magdeck_emulator" +VERSION = 1 + + +class MagDeckEmulator(CommandProcessor): + """Magdeck emulator""" + + def __init__(self) -> None: + self.height = 0 + self.position = 0 + + def handle(self, cmd: str, payload: str) -> Optional[str]: + """Handle a command.""" + logger.info(f"Got command {cmd}") + if cmd == GCODE_HOME: + pass + elif cmd == GCODE_MOVE: + pass + elif cmd == GCODE_PROBE: + pass + elif cmd == GCODE_GET_PROBED_DISTANCE: + return f"height:{self.height}" + elif cmd == GCODE_GET_POSITION: + return f"Z:{self.position}" + elif cmd == GCODE_DEVICE_INFO: + return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" + elif cmd == GCODE_DFU: + pass + return None diff --git a/api/src/opentrons/hardware_control/emulation/smoothie.py b/api/src/opentrons/hardware_control/emulation/smoothie.py new file mode 100644 index 00000000000..ef71ab072de --- /dev/null +++ b/api/src/opentrons/hardware_control/emulation/smoothie.py @@ -0,0 +1,58 @@ +import logging +from typing import Optional + +from opentrons.drivers.smoothie_drivers.driver_3_0 import GCODES + +from .command_processor import CommandProcessor + +logger = logging.getLogger(__name__) + +GCODE_HOME = GCODES['HOME'] +GCODE_MOVE = GCODES['MOVE'] +GCODE_DWELL = GCODES['DWELL'] +GCODE_CURRENT_POSITION = GCODES['CURRENT_POSITION'] +GCODE_LIMIT_SWITCH_STATUS = GCODES['LIMIT_SWITCH_STATUS'] +GCODE_PROBE = GCODES['PROBE'] +GCODE_ABSOLUTE_COORDS = GCODES['ABSOLUTE_COORDS'] +GCODE_RELATIVE_COORDS = GCODES['RELATIVE_COORDS'] +GCODE_RESET_FROM_ERROR = GCODES['RESET_FROM_ERROR'] +GCODE_PUSH_SPEED = GCODES['PUSH_SPEED'] +GCODE_POP_SPEED = GCODES['POP_SPEED'] +GCODE_SET_SPEED = GCODES['SET_SPEED'] +GCODE_STEPS_PER_MM = GCODES['STEPS_PER_MM'] +GCODE_READ_INSTRUMENT_ID = GCODES['READ_INSTRUMENT_ID'] +GCODE_WRITE_INSTRUMENT_ID = GCODES['WRITE_INSTRUMENT_ID'] +GCODE_READ_INSTRUMENT_MODEL = GCODES['READ_INSTRUMENT_MODEL'] +GCODE_WRITE_INSTRUMENT_MODEL = GCODES['WRITE_INSTRUMENT_MODEL'] +GCODE_SET_MAX_SPEED = GCODES['SET_MAX_SPEED'] +GCODE_SET_CURRENT = GCODES['SET_CURRENT'] +GCODE_DISENGAGE_MOTOR = GCODES['DISENGAGE_MOTOR'] +GCODE_HOMING_STATUS = GCODES['HOMING_STATUS'] +GCODE_ACCELERATION = GCODES['ACCELERATION'] +GCODE_WAIT = GCODES['WAIT'] +GCODE_retract = 'M365.3' +GCODE_debounce = 'M365.2' +GCODE_max_travel = 'M365.1' +GCODE_home = 'M365.0' +GCODE_version = 'version' + +v = """Build version: EMULATOR, Build date: CURRENT, MCU: NONE, System Clock: NONE""" + + +class SmoothieEmulator(CommandProcessor): + """Smoothie emulator""" + + def __init__(self) -> None: + self.x = self.y = self.z = self.a = self.b = self.c = 0.00 + + def handle(self, cmd: str, payload: str) -> Optional[str]: + """Handle a command.""" + logger.info(f"Got command {cmd}") + if GCODE_HOMING_STATUS == cmd: + return "X:0 Y:0 Z:0 A:0 B:0 C:0" + elif GCODE_CURRENT_POSITION == cmd: + return f"{cmd}\r\n\r\nok MCS: X:{self.x} Y:{self.y} " \ + f"Z:{self.z} A:{self.a} B:{self.b} C:{self.c}" + elif GCODE_version == cmd: + return v + return None diff --git a/api/src/opentrons/hardware_control/emulation/tempdeck.py b/api/src/opentrons/hardware_control/emulation/tempdeck.py new file mode 100644 index 00000000000..9f20853df39 --- /dev/null +++ b/api/src/opentrons/hardware_control/emulation/tempdeck.py @@ -0,0 +1,41 @@ +import logging +from typing import Optional + +from opentrons.drivers.temp_deck.driver import GCODES + +from .command_processor import CommandProcessor + +logger = logging.getLogger(__name__) + +GCODE_GET_TEMP = GCODES['GET_TEMP'] +GCODE_SET_TEMP = GCODES['SET_TEMP'] +GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] +GCODE_DISENGAGE = GCODES['DISENGAGE'] +GCODE_DFU = GCODES['PROGRAMMING_MODE'] + +SERIAL = "fake_serial" +MODEL = "temp_emulator" +VERSION = 1 + + +class TempDeckEmulator(CommandProcessor): + """TempDeck emulator""" + + def __init__(self) -> None: + self.target_temp = 0 + self.current_temp = 0 + + def handle(self, cmd: str, payload: str) -> Optional[str]: + """Handle a command.""" + logger.info(f"Got command {cmd}") + if cmd == GCODE_GET_TEMP: + return f"T:{self.target_temp} C:{self.current_temp}" + elif cmd == GCODE_SET_TEMP: + pass + elif cmd == GCODE_DISENGAGE: + pass + elif cmd == GCODE_DEVICE_INFO: + return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" + elif cmd == GCODE_DFU: + pass + return None diff --git a/api/src/opentrons/hardware_control/emulation/thermocycler.py b/api/src/opentrons/hardware_control/emulation/thermocycler.py new file mode 100644 index 00000000000..489c743cd3d --- /dev/null +++ b/api/src/opentrons/hardware_control/emulation/thermocycler.py @@ -0,0 +1,83 @@ +import logging +from typing import Optional +import enum +from opentrons.drivers.thermocycler.driver import GCODES +from .command_processor import CommandProcessor + +logger = logging.getLogger(__name__) + +GCODE_OPEN_LID = GCODES['OPEN_LID'] +GCODE_CLOSE_LID = GCODES['CLOSE_LID'] +GCODE_GET_LID_STATUS = GCODES['GET_LID_STATUS'] +GCODE_SET_LID_TEMP = GCODES['SET_LID_TEMP'] +GCODE_GET_LID_TEMP = GCODES['GET_LID_TEMP'] +GCODE_EDIT_PID_PARAMS = GCODES['EDIT_PID_PARAMS'] +GCODE_SET_PLATE_TEMP = GCODES['SET_PLATE_TEMP'] +GCODE_GET_PLATE_TEMP = GCODES['GET_PLATE_TEMP'] +GCODE_SET_RAMP_RATE = GCODES['SET_RAMP_RATE'] +GCODE_DEACTIVATE_ALL = GCODES['DEACTIVATE_ALL'] +GCODE_DEACTIVATE_LID = GCODES['DEACTIVATE_LID'] +GCODE_DEACTIVATE_BLOCK = GCODES['DEACTIVATE_BLOCK'] +GCODE_DEVICE_INFO = GCODES['DEVICE_INFO'] + +SERIAL = "fake_serial" +MODEL = "thermocycler_emulator" +VERSION = 1 + + +class LidStatus(str, enum.Enum): + IN_BETWEEN = 'in_between' + CLOSED = 'closed' + OPEN = 'open' + UNKNOWN = 'unknown' + MAX = 'max' + + +class ThermocyclerEmulator(CommandProcessor): + """Thermocycler emulator""" + + def __init__(self) -> None: + self.target_temp = 0 + self.current_temp = 0 + self.lid_status = LidStatus.CLOSED + self.at_target = None + self.total_hold_time = None + self.time_remaining = None + + def handle(self, cmd: str, payload: str) -> Optional[str]: # noqa(C901) + """ + Handle a command. + + TODO: AL 20210218 create dispatch map annd remove 'noqa(C901)' + """ + logger.info(f"Got command {cmd}") + if cmd == GCODE_OPEN_LID: + pass + elif cmd == GCODE_CLOSE_LID: + pass + elif cmd == GCODE_GET_LID_STATUS: + return f"Lid:{self.lid_status}" + elif cmd == GCODE_SET_LID_TEMP: + pass + elif cmd == GCODE_GET_LID_TEMP: + return f"T:{self.target_temp} C:{self.current_temp} " \ + f"H:none Total_H:none At_target?:0" + elif cmd == GCODE_EDIT_PID_PARAMS: + pass + elif cmd == GCODE_SET_PLATE_TEMP: + pass + elif cmd == GCODE_GET_PLATE_TEMP: + return f"T:{self.target_temp} C:{self.current_temp} " \ + f"H:{self.time_remaining} Total_H:{self.total_hold_time} " \ + f"At_target?:{self.at_target}" + elif cmd == GCODE_SET_RAMP_RATE: + pass + elif cmd == GCODE_DEACTIVATE_ALL: + pass + elif cmd == GCODE_DEACTIVATE_LID: + pass + elif cmd == GCODE_DEACTIVATE_BLOCK: + pass + elif cmd == GCODE_DEVICE_INFO: + return f"serial:{SERIAL} model:{MODEL} version:{VERSION}" + return None diff --git a/api/tests/opentrons/hardware_control/integration/__init__.py b/api/tests/opentrons/hardware_control/integration/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/api/tests/opentrons/hardware_control/integration/conftest.py b/api/tests/opentrons/hardware_control/integration/conftest.py new file mode 100644 index 00000000000..5e7a74f82a7 --- /dev/null +++ b/api/tests/opentrons/hardware_control/integration/conftest.py @@ -0,0 +1,19 @@ +import pytest +import threading +import asyncio +from opentrons.hardware_control.emulation.app import run + + +@pytest.fixture(scope="session") +def emulation_app(): + """Run the emulators""" + def runit(): + asyncio.run(run()) + # TODO 20210219 + # The emulators must be run in a separate thread because our serial + # drivers block the main thread. Remove this thread when that is no + # longer true. + t = threading.Thread(target=runit) + t.daemon = True + t.start() + yield t diff --git a/api/tests/opentrons/hardware_control/integration/test_magdeck.py b/api/tests/opentrons/hardware_control/integration/test_magdeck.py new file mode 100644 index 00000000000..bdff3a7b004 --- /dev/null +++ b/api/tests/opentrons/hardware_control/integration/test_magdeck.py @@ -0,0 +1,23 @@ +import asyncio + +import pytest +from mock import AsyncMock +from opentrons.drivers.rpi_drivers.types import USBPort +from opentrons.hardware_control.emulation.app import MAGDECK_PORT +from opentrons.hardware_control.modules import MagDeck + + +@pytest.fixture +async def magdeck(loop: asyncio.BaseEventLoop, emulation_app) -> MagDeck: + td = await MagDeck.build( + port=f"socket://127.0.0.1:{MAGDECK_PORT}", + execution_manager=AsyncMock(), + usb_port=USBPort(name="", port_number=1, sub_names=[], device_path="", hub=1), + loop=loop + ) + yield td + + +def test_device_info(magdeck): + assert {'model': 'magdeck_emulator', 'serial': 'fake_serial', + 'version': '1'} == magdeck.device_info diff --git a/api/tests/opentrons/hardware_control/integration/test_smoothie.py b/api/tests/opentrons/hardware_control/integration/test_smoothie.py new file mode 100644 index 00000000000..c370173e241 --- /dev/null +++ b/api/tests/opentrons/hardware_control/integration/test_smoothie.py @@ -0,0 +1,19 @@ +import asyncio + +import pytest +from opentrons.config.robot_configs import build_config +from opentrons.drivers.smoothie_drivers.driver_3_0 import SmoothieDriver_3_0_0 +from opentrons.hardware_control.emulation.app import SMOOTHIE_PORT + + +@pytest.fixture +def smoothie(loop: asyncio.BaseEventLoop, emulation_app) -> SmoothieDriver_3_0_0: + conf = build_config({}) + s = SmoothieDriver_3_0_0(config=conf) + s.connect(f"socket://127.0.0.1:{SMOOTHIE_PORT}") + yield s + s.disconnect() + + +def test_get_fw_version(smoothie: SmoothieDriver_3_0_0): + assert smoothie.get_fw_version() == 'EMULATOR' diff --git a/api/tests/opentrons/hardware_control/integration/test_tempdeck.py b/api/tests/opentrons/hardware_control/integration/test_tempdeck.py new file mode 100644 index 00000000000..63811a95d1b --- /dev/null +++ b/api/tests/opentrons/hardware_control/integration/test_tempdeck.py @@ -0,0 +1,24 @@ +import asyncio + +import pytest +from mock import AsyncMock +from opentrons.drivers.rpi_drivers.types import USBPort +from opentrons.hardware_control.emulation.app import TEMPDECK_PORT +from opentrons.hardware_control.modules import TempDeck + + +@pytest.fixture +async def tempdeck(loop: asyncio.BaseEventLoop, emulation_app) -> TempDeck: + td = await TempDeck.build( + port=f"socket://127.0.0.1:{TEMPDECK_PORT}", + execution_manager=AsyncMock(), + usb_port=USBPort(name="", port_number=1, sub_names=[], device_path="", + hub=1), + loop=loop + ) + yield td + + +def test_device_info(tempdeck): + assert {'model': 'temp_emulator', 'serial': 'fake_serial', + 'version': '1'} == tempdeck.device_info diff --git a/api/tests/opentrons/hardware_control/integration/test_thermocycler.py b/api/tests/opentrons/hardware_control/integration/test_thermocycler.py new file mode 100644 index 00000000000..76c65b3b686 --- /dev/null +++ b/api/tests/opentrons/hardware_control/integration/test_thermocycler.py @@ -0,0 +1,42 @@ +import asyncio + +import pytest +from mock import AsyncMock, patch +from opentrons.config import IS_WIN +from opentrons.drivers.rpi_drivers.types import USBPort +from opentrons.drivers.thermocycler.driver import TCPoller +from opentrons.hardware_control.emulation.app import THERMOCYCLER_PORT +from opentrons.hardware_control.modules import Thermocycler + + +@pytest.fixture +async def patch_fd_path(tmpdir): + """Thermocycler uses /var/run as directory for polling files. We need + a directory that does not require root permission.""" + with patch.object(TCPoller, 'POLLING_FD_PATH', new=str(tmpdir)) as p: + yield p + + +@pytest.fixture +async def thermocycler( + loop: asyncio.BaseEventLoop, + patch_fd_path, + emulation_app) -> Thermocycler: + """Thermocycler fixture.""" + td = await Thermocycler.build( + port=f"socket://127.0.0.1:{THERMOCYCLER_PORT}", + execution_manager=AsyncMock(), + usb_port=USBPort(name="", port_number=1, sub_names=[], device_path="", + hub=1), + loop=loop + ) + yield td + # Thermocycler class does not have a public interface to disconnect + td._driver.disconnect() + + +@pytest.mark.skipif(IS_WIN, reason="Cannot be run on Windows") +def test_device_info(thermocycler: Thermocycler): + """""" + assert {'model': 'thermocycler_emulator', 'serial': 'fake_serial', + 'version': '1'} == thermocycler.device_info