diff --git a/api/src/opentrons/hardware_control/emulation/app.py b/api/src/opentrons/hardware_control/emulation/app.py index 74de5960aa59..f71ad647c082 100644 --- a/api/src/opentrons/hardware_control/emulation/app.py +++ b/api/src/opentrons/hardware_control/emulation/app.py @@ -14,6 +14,7 @@ class ModuleType(str, Enum): """Module type enumeration.""" + Magnetic = "magnetic" Temperature = "temperature" Thermocycler = "thermocycler" @@ -38,13 +39,19 @@ def __init__(self, settings: Settings) -> None: ModuleType.Magnetic, self._status_server, self._settings.magdeck_proxy ) self._temperature = Proxy( - ModuleType.Temperature, self._status_server, self._settings.temperature_proxy + ModuleType.Temperature, + self._status_server, + self._settings.temperature_proxy, ) self._thermocycler = Proxy( - ModuleType.Thermocycler, self._status_server, self._settings.thermocycler_proxy + ModuleType.Thermocycler, + self._status_server, + self._settings.thermocycler_proxy, ) self._heatershaker = Proxy( - ModuleType.Heatershaker, self._status_server, self._settings.heatershaker_proxy + ModuleType.Heatershaker, + self._status_server, + self._settings.heatershaker_proxy, ) async def run(self) -> None: diff --git a/api/src/opentrons/hardware_control/emulation/module_server.py b/api/src/opentrons/hardware_control/emulation/module_server.py index ba59ca3faa25..232d5aa287f5 100644 --- a/api/src/opentrons/hardware_control/emulation/module_server.py +++ b/api/src/opentrons/hardware_control/emulation/module_server.py @@ -2,8 +2,10 @@ import asyncio import logging -from typing_extensions import Literal -from typing import Dict, List, Set + +from opentrons.hardware_control.emulation.app import ModuleType +from typing_extensions import Literal, Final +from typing import Dict, List, Set, Sequence, Optional from pydantic import BaseModel from opentrons.hardware_control.emulation.proxy import ProxyListener @@ -12,6 +14,8 @@ log = logging.getLogger(__name__) +MessageDelimiter: Final = b"\n" + class ModuleStatusServer(ProxyListener): """Server notifying of module connections.""" @@ -68,7 +72,7 @@ def on_server_disconnected(self, identifier: str) -> None: .json() .encode() ) - c.write(b"\n") + c.write(MessageDelimiter) except KeyError: log.exception("Failed to find identifier") @@ -88,7 +92,7 @@ async def _handle_connection( m = Message(status="dump", connections=list(self._connections.values())) writer.write(m.json().encode()) - writer.write(b"\n") + writer.write(MessageDelimiter) self._clients.add(writer) @@ -97,6 +101,8 @@ async def _handle_connection( self._clients.remove(writer) break + log.info("Client disconnected from module server.") + class Connection(BaseModel): """Model a single module connection.""" @@ -111,3 +117,96 @@ class Message(BaseModel): status: Literal["connected", "disconnected", "dump"] connections: List[Connection] + + +class ModuleServerClient: + """A module server client.""" + + def __init__( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + """Constructor.""" + self._reader = reader + self._writer = writer + + @classmethod + async def connect( + cls, + host: str, + port: int, + retries: int = 3, + interval_seconds: float = 0.1, + ) -> ModuleServerClient: + """Connect to the module server. + + Args: + host: module server host. + port: module server port. + retries: number of retries + interval_seconds: time between retries. + + Returns: + None + Raises: + IOError on retry expiry. + """ + r: Optional[asyncio.StreamReader] = None + w: Optional[asyncio.StreamWriter] = None + for i in range(retries): + try: + r, w = await asyncio.open_connection(host=host, port=port) + except OSError: + await asyncio.sleep(interval_seconds) + + if r is not None and w is not None: + return ModuleServerClient(reader=r, writer=w) + else: + raise IOError( + f"Failed to connect to module_server at after {retries} retries." + ) + + async def read(self) -> Message: + """Read a message from the module server.""" + b = await self._reader.readuntil(MessageDelimiter) + m: Message = Message.parse_raw(b) + return m + + +async def wait_emulators( + client: ModuleServerClient, + modules: Sequence[ModuleType], + timeout: float, +) -> None: + """Wait for module emulators to connect. + + Args: + client: module server client. + modules: collection of of module types to wait for. + timeout: how long to wait for emulators to connect (in seconds) + + Returns: + None + Raises: + asyncio.TimeoutError on timeout. + """ + + async def _wait_modules(cl: ModuleServerClient, module_set: Set[str]) -> None: + """Read messages from module server waiting for modules in module_set to + be connected.""" + while module_set: + m: Message = await cl.read() + if m.status == "dump" or m.status == "connected": + for c in m.connections: + if c.module_type in module_set: + module_set.remove(c.module_type) + elif m.status == "disconnected": + for c in m.connections: + if c.module_type in module_set: + module_set.add(c.module_type) + + log.debug(f"after message: {m}, awaiting module set is: {module_set}") + + await asyncio.wait_for( + _wait_modules(cl=client, module_set=set(n.value for n in modules)), + timeout=timeout, + ) diff --git a/api/src/opentrons/hardware_control/emulation/run_emulator.py b/api/src/opentrons/hardware_control/emulation/run_emulator.py index bd2a74e0603e..25bee731082e 100644 --- a/api/src/opentrons/hardware_control/emulation/run_emulator.py +++ b/api/src/opentrons/hardware_control/emulation/run_emulator.py @@ -8,7 +8,13 @@ log = logging.getLogger(__name__) -async def run_emulator_client(host: str, port: int, emulator: AbstractEmulator, retries: int = 3, interval_seconds:float=0.1) -> None: +async def run_emulator_client( + host: str, + port: int, + emulator: AbstractEmulator, + retries: int = 3, + interval_seconds: float = 0.1, +) -> None: """Run an emulator as a client. Args: @@ -30,11 +36,15 @@ async def run_emulator_client(host: str, port: int, emulator: AbstractEmulator, r, w = await asyncio.open_connection(host, port) break except IOError: - log.error(f"{emulator.__class__.__name__} failed to connect on try {i + 1}. Retrying in {interval_seconds} seconds.") + log.error( + f"{emulator.__class__.__name__} failed to connect on try {i + 1}. Retrying in {interval_seconds} seconds." + ) await asyncio.sleep(interval_seconds) if r is None or w is None: - raise IOError(f"Failed to connect to {emulator.__class__.__name__} at {host}:{port} after {retries} retries.") + raise IOError( + f"Failed to connect to {emulator.__class__.__name__} at {host}:{port} after {retries} retries." + ) connection = ConnectionHandler(emulator) await connection(r, w) diff --git a/api/tests/opentrons/hardware_control/integration/conftest.py b/api/tests/opentrons/hardware_control/integration/conftest.py index 91cac71aca0d..86e401e0f689 100644 --- a/api/tests/opentrons/hardware_control/integration/conftest.py +++ b/api/tests/opentrons/hardware_control/integration/conftest.py @@ -67,6 +67,4 @@ def runit() -> None: t = threading.Thread(target=runit) t.daemon = True t.start() - # Give it a bit to get going. - sleep(0.5) yield