Skip to content

Commit

Permalink
wait for emulator connection method.
Browse files Browse the repository at this point in the history
  • Loading branch information
amit lissack committed Oct 23, 2021
1 parent ac6b618 commit 185be8d
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 12 deletions.
13 changes: 10 additions & 3 deletions api/src/opentrons/hardware_control/emulation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

class ModuleType(str, Enum):
"""Module type enumeration."""

Magnetic = "magnetic"
Temperature = "temperature"
Thermocycler = "thermocycler"
Expand All @@ -38,13 +39,19 @@ def __init__(self, settings: Settings) -> None:
ModuleType.Magnetic, self._status_server, self._settings.magdeck_proxy
)
self._temperature = Proxy(
ModuleType.Temperature, self._status_server, self._settings.temperature_proxy
ModuleType.Temperature,
self._status_server,
self._settings.temperature_proxy,
)
self._thermocycler = Proxy(
ModuleType.Thermocycler, self._status_server, self._settings.thermocycler_proxy
ModuleType.Thermocycler,
self._status_server,
self._settings.thermocycler_proxy,
)
self._heatershaker = Proxy(
ModuleType.Heatershaker, self._status_server, self._settings.heatershaker_proxy
ModuleType.Heatershaker,
self._status_server,
self._settings.heatershaker_proxy,
)

async def run(self) -> None:
Expand Down
107 changes: 103 additions & 4 deletions api/src/opentrons/hardware_control/emulation/module_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import asyncio
import logging
from typing_extensions import Literal
from typing import Dict, List, Set

from opentrons.hardware_control.emulation.app import ModuleType
from typing_extensions import Literal, Final
from typing import Dict, List, Set, Sequence, Optional
from pydantic import BaseModel

from opentrons.hardware_control.emulation.proxy import ProxyListener
Expand All @@ -12,6 +14,8 @@

log = logging.getLogger(__name__)

MessageDelimiter: Final = b"\n"


class ModuleStatusServer(ProxyListener):
"""Server notifying of module connections."""
Expand Down Expand Up @@ -68,7 +72,7 @@ def on_server_disconnected(self, identifier: str) -> None:
.json()
.encode()
)
c.write(b"\n")
c.write(MessageDelimiter)
except KeyError:
log.exception("Failed to find identifier")

Expand All @@ -88,7 +92,7 @@ async def _handle_connection(
m = Message(status="dump", connections=list(self._connections.values()))

writer.write(m.json().encode())
writer.write(b"\n")
writer.write(MessageDelimiter)

self._clients.add(writer)

Expand All @@ -97,6 +101,8 @@ async def _handle_connection(
self._clients.remove(writer)
break

log.info("Client disconnected from module server.")


class Connection(BaseModel):
"""Model a single module connection."""
Expand All @@ -111,3 +117,96 @@ class Message(BaseModel):

status: Literal["connected", "disconnected", "dump"]
connections: List[Connection]


class ModuleServerClient:
"""A module server client."""

def __init__(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""Constructor."""
self._reader = reader
self._writer = writer

@classmethod
async def connect(
cls,
host: str,
port: int,
retries: int = 3,
interval_seconds: float = 0.1,
) -> ModuleServerClient:
"""Connect to the module server.
Args:
host: module server host.
port: module server port.
retries: number of retries
interval_seconds: time between retries.
Returns:
None
Raises:
IOError on retry expiry.
"""
r: Optional[asyncio.StreamReader] = None
w: Optional[asyncio.StreamWriter] = None
for i in range(retries):
try:
r, w = await asyncio.open_connection(host=host, port=port)
except OSError:
await asyncio.sleep(interval_seconds)

if r is not None and w is not None:
return ModuleServerClient(reader=r, writer=w)
else:
raise IOError(
f"Failed to connect to module_server at after {retries} retries."
)

async def read(self) -> Message:
"""Read a message from the module server."""
b = await self._reader.readuntil(MessageDelimiter)
m: Message = Message.parse_raw(b)
return m


async def wait_emulators(
client: ModuleServerClient,
modules: Sequence[ModuleType],
timeout: float,
) -> None:
"""Wait for module emulators to connect.
Args:
client: module server client.
modules: collection of of module types to wait for.
timeout: how long to wait for emulators to connect (in seconds)
Returns:
None
Raises:
asyncio.TimeoutError on timeout.
"""

async def _wait_modules(cl: ModuleServerClient, module_set: Set[str]) -> None:
"""Read messages from module server waiting for modules in module_set to
be connected."""
while module_set:
m: Message = await cl.read()
if m.status == "dump" or m.status == "connected":
for c in m.connections:
if c.module_type in module_set:
module_set.remove(c.module_type)
elif m.status == "disconnected":
for c in m.connections:
if c.module_type in module_set:
module_set.add(c.module_type)

log.debug(f"after message: {m}, awaiting module set is: {module_set}")

await asyncio.wait_for(
_wait_modules(cl=client, module_set=set(n.value for n in modules)),
timeout=timeout,
)
16 changes: 13 additions & 3 deletions api/src/opentrons/hardware_control/emulation/run_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
log = logging.getLogger(__name__)


async def run_emulator_client(host: str, port: int, emulator: AbstractEmulator, retries: int = 3, interval_seconds:float=0.1) -> None:
async def run_emulator_client(
host: str,
port: int,
emulator: AbstractEmulator,
retries: int = 3,
interval_seconds: float = 0.1,
) -> None:
"""Run an emulator as a client.
Args:
Expand All @@ -30,11 +36,15 @@ async def run_emulator_client(host: str, port: int, emulator: AbstractEmulator,
r, w = await asyncio.open_connection(host, port)
break
except IOError:
log.error(f"{emulator.__class__.__name__} failed to connect on try {i + 1}. Retrying in {interval_seconds} seconds.")
log.error(
f"{emulator.__class__.__name__} failed to connect on try {i + 1}. Retrying in {interval_seconds} seconds."
)
await asyncio.sleep(interval_seconds)

if r is None or w is None:
raise IOError(f"Failed to connect to {emulator.__class__.__name__} at {host}:{port} after {retries} retries.")
raise IOError(
f"Failed to connect to {emulator.__class__.__name__} at {host}:{port} after {retries} retries."
)

connection = ConnectionHandler(emulator)
await connection(r, w)
Expand Down
2 changes: 0 additions & 2 deletions api/tests/opentrons/hardware_control/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,4 @@ def runit() -> None:
t = threading.Thread(target=runit)
t.daemon = True
t.start()
# Give it a bit to get going.
sleep(0.5)
yield

0 comments on commit 185be8d

Please sign in to comment.