Skip to content

Commit

Permalink
scripts.
Browse files Browse the repository at this point in the history
  • Loading branch information
amit lissack committed Oct 23, 2021
1 parent 185be8d commit c25c1e7
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 97 deletions.
11 changes: 1 addition & 10 deletions api/src/opentrons/hardware_control/emulation/app.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import asyncio
import logging
from enum import Enum

from opentrons.hardware_control.emulation.module_server import ModuleStatusServer
from opentrons.hardware_control.emulation.parser import Parser
from opentrons.hardware_control.emulation.proxy import Proxy
from opentrons.hardware_control.emulation.run_emulator import run_emulator_server
from opentrons.hardware_control.emulation.settings import Settings
from opentrons.hardware_control.emulation.smoothie import SmoothieEmulator
from opentrons.hardware_control.emulation.types import ModuleType

logger = logging.getLogger(__name__)


class ModuleType(str, Enum):
"""Module type enumeration."""

Magnetic = "magnetic"
Temperature = "temperature"
Thermocycler = "thermocycler"
Heatershaker = "heatershaker"


class Application:
"""The emulator application."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
import logging

from opentrons.hardware_control.emulation.app import ModuleType
from opentrons.hardware_control.emulation.types import ModuleType
from typing_extensions import Literal, Final
from typing import Dict, List, Set, Sequence, Optional
from pydantic import BaseModel
Expand Down
6 changes: 4 additions & 2 deletions api/src/opentrons/hardware_control/emulation/run_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ async def run_emulator_client(
break
except IOError:
log.error(
f"{emulator.__class__.__name__} failed to connect on try {i + 1}. Retrying in {interval_seconds} seconds."
f"{emulator.__class__.__name__} failed to connect on "
f"try {i + 1}. Retrying in {interval_seconds} seconds."
)
await asyncio.sleep(interval_seconds)

if r is None or w is None:
raise IOError(
f"Failed to connect to {emulator.__class__.__name__} at {host}:{port} after {retries} retries."
f"Failed to connect to {emulator.__class__.__name__} at "
f"{host}:{port} after {retries} retries."
)

connection = ConnectionHandler(emulator)
Expand Down

This file was deleted.

Empty file.
54 changes: 54 additions & 0 deletions api/src/opentrons/hardware_control/emulation/scripts/run_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Script for starting up emulation up with module emulators."""
import logging
import asyncio
from argparse import ArgumentParser
from typing import List

from opentrons.hardware_control.emulation.app import Application
from opentrons.hardware_control.emulation.scripts.run_module_emulator import (
emulator_builder,
)
from opentrons.hardware_control.emulation.settings import Settings
from .run_module_emulator import run as run_module_by_name


async def run(settings: Settings, modules: List[str]) -> None:
"""Run the emulator app with connected module emulators.
Args:
settings: App settings.
modules: The module emulators to start.
Returns:
None
"""
loop = asyncio.get_event_loop()

app_task = loop.create_task(Application(settings=settings).run())
module_tasks = [
loop.create_task(
run_module_by_name(settings=settings, emulator_name=n, host="localhost")
)
for n in modules
]
await asyncio.gather(app_task, *module_tasks)


def main() -> None:
"""Entry point."""
a = ArgumentParser()
a.add_argument(
"--m",
action="append",
choices=emulator_builder.keys(),
help="which module(s) to emulate.",
)
args = a.parse_args()

logging.basicConfig(format="%(asctime)s:%(message)s", level=logging.DEBUG)
asyncio.run(run(Settings(), args.m))


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Script for starting up a python module emulator."""
import logging
import asyncio
from argparse import ArgumentParser
from typing import Dict, Callable
from typing_extensions import Final

from opentrons.hardware_control.emulation.abstract_emulator import AbstractEmulator
from opentrons.hardware_control.emulation.types import ModuleType
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator
from opentrons.hardware_control.emulation.parser import Parser
from opentrons.hardware_control.emulation.tempdeck import TempDeckEmulator
from opentrons.hardware_control.emulation.thermocycler import ThermocyclerEmulator


from opentrons.hardware_control.emulation.run_emulator import run_emulator_client
from opentrons.hardware_control.emulation.settings import Settings, ProxySettings

emulator_builder: Final[Dict[str, Callable[[Settings], AbstractEmulator]]] = {
ModuleType.Magnetic.value: lambda s: MagDeckEmulator(Parser()),
ModuleType.Temperature.value: lambda s: TempDeckEmulator(Parser()),
ModuleType.Thermocycler.value: lambda s: ThermocyclerEmulator(Parser()),
}

emulator_port: Final[Dict[str, Callable[[Settings], ProxySettings]]] = {
ModuleType.Magnetic.value: lambda s: s.magdeck_proxy,
ModuleType.Temperature.value: lambda s: s.temperature_proxy,
ModuleType.Thermocycler.value: lambda s: s.thermocycler_proxy,
}


async def run(settings: Settings, emulator_name: str, host: str) -> None:
"""Run an emulator.
Args:
settings: emulator settings
emulator_name: Name of emulator. This must be a key in emulator_builder
host: host to connect to.
Returns:
None
"""
e = emulator_builder[emulator_name](settings)
proxy_settings = emulator_port[emulator_name](settings)
await run_emulator_client(host, proxy_settings.emulator_port, e)


def main() -> None:
"""Entry point."""
a = ArgumentParser()
a.add_argument(
"emulator",
type=str,
choices=emulator_builder.keys(),
help="which module to emulate.",
)
a.add_argument("host", type=str, help="the emulator host")
args = a.parse_args()

logging.basicConfig(format="%(asctime)s:%(message)s", level=logging.DEBUG)
asyncio.run(run(Settings(), args.emulator, args.host))


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions api/src/opentrons/hardware_control/emulation/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from enum import Enum


class ModuleType(str, Enum):
"""Module type enumeration."""

Magnetic = "magnetic"
Temperature = "temperature"
Thermocycler = "thermocycler"
Heatershaker = "heatershaker"
44 changes: 12 additions & 32 deletions api/tests/opentrons/hardware_control/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
from time import sleep
from typing import Iterator

import pytest
import threading
import asyncio
from opentrons.hardware_control.emulation.app import Application
from opentrons.hardware_control.emulation.magdeck import MagDeckEmulator
from opentrons.hardware_control.emulation.parser import Parser
from opentrons.hardware_control.emulation.run_emulator import run_emulator_client
from opentrons.hardware_control.emulation.scripts import run_app
from opentrons.hardware_control.emulation.types import ModuleType
from opentrons.hardware_control.emulation.settings import (
Settings,
SmoothieSettings,
PipetteSettings,
)
from opentrons.hardware_control.emulation.tempdeck import TempDeckEmulator
from opentrons.hardware_control.emulation.thermocycler import ThermocyclerEmulator


@pytest.fixture(scope="session")
Expand All @@ -33,32 +28,17 @@ def emulator_settings() -> Settings:
def emulation_app(emulator_settings: Settings) -> Iterator[None]:
"""Run the emulators"""

async def _run_emulation_environment() -> None:
await asyncio.gather(
# Start application
Application(settings=emulator_settings).run(),
# Add magdeck emulator
run_emulator_client(
host="localhost",
port=emulator_settings.magdeck_proxy.emulator_port,
emulator=MagDeckEmulator(Parser()),
),
# Add temperature emulator
run_emulator_client(
host="localhost",
port=emulator_settings.temperature_proxy.emulator_port,
emulator=TempDeckEmulator(Parser()),
),
# Add thermocycler emulator
run_emulator_client(
host="localhost",
port=emulator_settings.thermocycler_proxy.emulator_port,
emulator=ThermocyclerEmulator(Parser()),
),
)

def runit() -> None:
asyncio.run(_run_emulation_environment())
asyncio.run(
run_app.run(
settings=emulator_settings,
modules=[
ModuleType.Magnetic,
ModuleType.Temperature,
ModuleType.Thermocycler,
],
)
)

# TODO 20210219
# The emulators must be run in a separate thread because our serial
Expand Down

0 comments on commit c25c1e7

Please sign in to comment.