From 7d9734b5daf918758571468dc872bbf712cf52f1 Mon Sep 17 00:00:00 2001 From: Laura Cox <31892318+Laura-Danielle@users.noreply.github.com> Date: Thu, 11 Mar 2021 16:20:24 -0500 Subject: [PATCH] feat(api): Add physical USB Port information to the hardware controller (#7359) --- api/src/opentrons/algorithms/__init__.py | 7 + api/src/opentrons/algorithms/dfs.py | 55 +++++ api/src/opentrons/algorithms/graph.py | 196 ++++++++++++++++++ api/src/opentrons/algorithms/types.py | 40 ++++ .../drivers/rpi_drivers/interfaces.py | 40 ++++ .../opentrons/drivers/rpi_drivers/types.py | 95 ++++++++- api/src/opentrons/drivers/rpi_drivers/usb.py | 154 ++++++++++++++ .../drivers/rpi_drivers/usb_simulator.py | 103 +++++++++ api/src/opentrons/hardware_control/api.py | 16 +- .../opentrons/hardware_control/controller.py | 5 +- .../hardware_control/modules/magdeck.py | 9 + .../hardware_control/modules/mod_abc.py | 12 +- .../hardware_control/modules/tempdeck.py | 9 + .../hardware_control/modules/thermocycler.py | 9 + .../hardware_control/modules/types.py | 10 +- .../hardware_control/modules/utils.py | 5 +- .../opentrons/hardware_control/simulator.py | 4 + .../implementations/protocol_context.py | 1 + .../fixture_alphabetical_graph.json | 12 ++ .../algorithms/fixture_numerical_graph.json | 12 ++ api/tests/opentrons/algorithms/test_dfs.py | 127 ++++++++++++ .../opentrons/drivers/rpi_drivers/test_usb.py | 82 ++++++++ .../modules/test_hc_magdeck.py | 23 +- .../modules/test_hc_tempdeck.py | 27 ++- .../modules/test_hc_thermocycler.py | 23 +- .../hardware_control/test_modules.py | 7 + .../service/legacy/routers/test_modules.py | 13 ++ 27 files changed, 1067 insertions(+), 29 deletions(-) create mode 100644 api/src/opentrons/algorithms/__init__.py create mode 100644 api/src/opentrons/algorithms/dfs.py create mode 100644 api/src/opentrons/algorithms/graph.py create mode 100644 api/src/opentrons/algorithms/types.py create mode 100644 api/src/opentrons/drivers/rpi_drivers/interfaces.py create mode 100644 api/src/opentrons/drivers/rpi_drivers/usb.py create mode 100644 api/src/opentrons/drivers/rpi_drivers/usb_simulator.py create mode 100644 api/tests/opentrons/algorithms/fixture_alphabetical_graph.json create mode 100644 api/tests/opentrons/algorithms/fixture_numerical_graph.json create mode 100644 api/tests/opentrons/algorithms/test_dfs.py create mode 100644 api/tests/opentrons/drivers/rpi_drivers/test_usb.py diff --git a/api/src/opentrons/algorithms/__init__.py b/api/src/opentrons/algorithms/__init__.py new file mode 100644 index 00000000000..6920017f1ab --- /dev/null +++ b/api/src/opentrons/algorithms/__init__.py @@ -0,0 +1,7 @@ + +""" +Algorithms Module. + +This module should only contain generic implementations for +common computer science algorithms that can be used in different applications. +""" diff --git a/api/src/opentrons/algorithms/dfs.py b/api/src/opentrons/algorithms/dfs.py new file mode 100644 index 00000000000..c6b98c39e53 --- /dev/null +++ b/api/src/opentrons/algorithms/dfs.py @@ -0,0 +1,55 @@ +""" +Depth first search. + +Search a generic graph down to its leaf +nodes first before back-tracking up the tree. +""" + +from typing import List, Set, Generic + +from .graph import Graph +from .types import VertexLike, VertexName + + +class DFS(Generic[VertexName]): + """ + Depth first search class. + + This class will build a graph object and then + perform a depth first search on the graph. + """ + + def __init__(self, graph: List[VertexLike]) -> None: + """ + DFS Initializer. + + :param graph: A list of nodes you wish to add to + the graph. + """ + self._graph = Graph.build(graph) + + @property + def graph(self) -> Graph: + """ + DFS property: graph. + + :returns: the graph object in which + dfs is being performed on. + """ + return self._graph + + def dfs(self) -> Set[VertexName]: + """ + Depth first search. + + :returns: the set of visited vertices + in depth first search order. + """ + visited_vertices: Set[VertexName] = set() + for node in self.graph.graph: + if node not in visited_vertices: + visited_vertices.add(node.name) + for neighbor in node.neighbors: + if neighbor not in visited_vertices: + visited_vertices.add(neighbor) + return visited_vertices diff --git a/api/src/opentrons/algorithms/graph.py b/api/src/opentrons/algorithms/graph.py new file mode 100644 index 00000000000..8cf286f1340 --- /dev/null +++ b/api/src/opentrons/algorithms/graph.py @@ -0,0 +1,196 @@ + +""" +Graph Builder. + +Generic graph builder classes. +""" +from __future__ import annotations + +from typing import List, Dict, Callable, Sequence, Generic + +from .types import VertexLike, VertexName + + +class Vertex(Generic[VertexName, VertexLike]): + """ + Vertex class. + + A class to hold information about each vertex + of a graph. + """ + + def __init__( + self, vertex: VertexLike, + neighbors: List[VertexName]) -> None: + """ + Vertex class initializer. + + :param vertex: A node dataclass + :param neighbors: A list of node names who + are neighbors to the node dataclass + """ + self._vertex = vertex + self._neighbors = neighbors + + @property + def name(self) -> VertexName: + """ + Vertex class property: name. + + :returns: Name of the vertex + """ + return self._vertex.name + + @property + def vertex(self) -> VertexLike: + """ + Vertex class property: vertex. + + :returns: The node dataclass + """ + return self._vertex + + @property + def neighbors(self) -> List[VertexName]: + """ + Vertex class property: neighbors. + + :returns: The list of node names who are + neighbors with the vertex. + """ + return self._neighbors + + def add_neighbor(self, vertex_name: VertexName) -> None: + """ + Add a neighbor. + + :param vertex_name: The name of the neighbor + """ + self._neighbors.append(vertex_name) + self._neighbors.sort() + + def remove_neighbor(self, vertex_name: VertexName) -> None: + """ + Remove a neighbor. + + :param vertex_name: The name of the neighbor + """ + if vertex_name in self._neighbors: + self._neighbors.remove(vertex_name) + + +def default_sort(vertex: Vertex) -> VertexName: + """ + Sort function default for a graph. + + By default, a graph's nodes will be searched + by the name of the node. Generally, the name + should either be a string or an integer. + """ + return vertex.name + + +class Graph(Generic[VertexName, VertexLike]): + """ + Graph class. + + A class to handle functions moving through the + graph. + """ + + # Note, the type of sort_by is actually + # Callable[[Vertex], VertexName] however there + # is an issue when using generics for functions + # passed into sort. See + # https://github.com/python/typing/issues/760 + def __init__( + self, sorted_graph: List[Vertex], + lookup_table: Dict[VertexName, Vertex], + sort_by: Callable[[Vertex], str]) -> None: + """ + Graph class initializer. + + :param sorted_graph: The initial graph, sorted + and converted to vertex objects. + :param lookup_table: A lookup table keyed by vertex + name and with a value of the vertex object + :param sort_by: The callable function used to sort + the graph nodes in priority order. + """ + self._sort_by = sort_by + self._sorted_graph = sorted_graph + self._lookup_table = lookup_table + + @classmethod + def build(cls, graph: List[VertexLike], + sort_by: Callable = default_sort) -> Graph: + """ + Graph class builder. + + :param graph: A list of nodes to add to the graph. + :param sort_by: The function used to sort the graph + in priority order. + :returns: A graph class + """ + sorted_graph = [] + lookup_table = {} + for vertex in graph: + vertex_obj = cls.build_vertex(vertex) + lookup_table[vertex.name] = vertex_obj + sorted_graph.append(vertex_obj) + sorted_graph.sort(key=sort_by) + return cls(sorted_graph, lookup_table, sort_by) + + @property + def graph(self) -> Sequence[Vertex]: + """ + Graph class property: graph. + + :returns: A list of sorted vertex objects + """ + return self._sorted_graph + + @staticmethod + def build_vertex(vertex: VertexLike) -> Vertex: + """ + Build a vertex. + + Use this to sort the neighbors and then build + a vertex using a node dataclass. + :param vertex: A node dataclass + :returns: vertex object + """ + vertex.sub_names.sort() + return Vertex(vertex, vertex.sub_names) + + def add_vertex(self, vertex: VertexLike) -> None: + """ + Add a vertex. + + :param vertex: A node dataclass + """ + new_vertex = self.build_vertex(vertex) + if new_vertex not in self._lookup_table.values(): + self._lookup_table[new_vertex.name] = new_vertex + self._sorted_graph.append(new_vertex) + self._sorted_graph.sort(key=self._sort_by) + + def remove_vertex(self, vertex: VertexLike) -> None: + """ + Remove a vertex. + + :param vertex: A node dataclass + """ + if vertex.name in self._lookup_table.keys(): + vertex_to_remove = self._lookup_table[vertex.name] + del self._lookup_table[vertex.name] + self._sorted_graph.remove(vertex_to_remove) + + def get_vertex(self, vertex_name: VertexName) -> Vertex: + """ + Get a vertex. + + :param vertex_name: The name of the vertex + :returns: The vertex object + """ + return self._lookup_table[vertex_name] diff --git a/api/src/opentrons/algorithms/types.py b/api/src/opentrons/algorithms/types.py new file mode 100644 index 00000000000..cc69b952517 --- /dev/null +++ b/api/src/opentrons/algorithms/types.py @@ -0,0 +1,40 @@ +""" +Algorithm types. + +Any type definitions required for the algorithms +module should be put in this file. +""" + +from typing import TypeVar, Generic, List +from dataclasses import dataclass + + +VertexName = TypeVar('VertexName') + + +@dataclass(frozen=True) +class GenericNode(Generic[VertexName]): + """ + Generic graph node dataclass. + + A dataclass to hold information about a + graph node. Information should not be + mutated about the node. + """ + + name: VertexName + sub_names: List[str] + + def __hash__(self) -> int: + """ + Hash function. + + To have a unique set of nodes, they must + all have a unique hash. Lists are not + hashable which is why we need to unpack + the list here. + """ + return hash((self.name, *self.sub_names)) + + +VertexLike = TypeVar('VertexLike', bound=GenericNode) diff --git a/api/src/opentrons/drivers/rpi_drivers/interfaces.py b/api/src/opentrons/drivers/rpi_drivers/interfaces.py new file mode 100644 index 00000000000..f3656c960b0 --- /dev/null +++ b/api/src/opentrons/drivers/rpi_drivers/interfaces.py @@ -0,0 +1,40 @@ +from typing import List, Set +from typing_extensions import Protocol + +from .types import USBPort + + +class USBDriverInterface(Protocol): + + @staticmethod + def read_bus() -> List[str]: + ... + + @staticmethod + def convert_port_path(full_port_path: str) -> USBPort: + ... + + @property + def usb_dev(self) -> List[USBPort]: + ... + + @usb_dev.setter + def usb_dev(self, ports: List[USBPort]) -> None: + ... + + @property + def sorted_ports(self) -> Set: + ... + + @sorted_ports.setter + def sorted_ports(self, sorted: Set) -> None: + ... + + def read_usb_bus(self) -> List[USBPort]: + ... + + def find_port(self, device_path: str) -> USBPort: + ... + + def sort_ports(self) -> None: + ... diff --git a/api/src/opentrons/drivers/rpi_drivers/types.py b/api/src/opentrons/drivers/rpi_drivers/types.py index ea7c3cb3759..82a0ef26782 100644 --- a/api/src/opentrons/drivers/rpi_drivers/types.py +++ b/api/src/opentrons/drivers/rpi_drivers/types.py @@ -1,7 +1,9 @@ import enum from itertools import groupby -from typing import List, Optional +from dataclasses import dataclass +from typing import List, Optional, Tuple from opentrons.hardware_control.types import BoardRevision +from opentrons.algorithms.types import GenericNode class PinDir(enum.Enum): @@ -10,6 +12,97 @@ class PinDir(enum.Enum): output = enum.auto() +@dataclass(frozen=True) +class USBPort(GenericNode): + port_number: Optional[int] + device_path: str + hub: Optional[int] + + @classmethod + def build(cls, port_path: str) -> 'USBPort': + """ + Build a USBPort dataclass. + + An example port path: + `1-1.3/1-1.3:1.0/tty/ttyACM1/dev` + + :param port_path: Full path of a usb device + :returns: Tuple of the port number, hub and name + """ + full_name, device_path = port_path.split(':') + port_nodes = cls.get_unique_nodes(full_name) + hub, port, name = cls.find_hub(port_nodes) + return cls( + name=name, + port_number=port, + sub_names=[], + device_path=device_path, + hub=hub) + + @staticmethod + def find_hub( + port_nodes: List[str] + ) -> Tuple[Optional[int], int, str]: + """ + Find Hub. + + Here we need to determine if a port is a hub + or not. A hub path might look like: + `1-1.4/1-1.4/1-1.4.1/1-1.4.1`. When this function + is used, the nodes will look like: ['1-1.4', '1-1.4.1']. + This function will then be used to check if it is a + port hub based on the values given. In the case of the + example above, it will determine this device is + connected to a hub and return both the port and + the hub number. The hub would always be the first number, + in this case `4` and the port number of the hub would be `1`. + + :param port_nodes: A list of unique port id(s) + :returns: Tuple of the port number, hub and name + """ + if len(port_nodes) > 1: + port_info = port_nodes[1].split('.') + hub: Optional[int] = int(port_info[1]) + port = int(port_info[2]) + name = port_nodes[1] + else: + port = int(port_nodes[0].split('.')[1]) + hub = None + name = port_nodes[0] + return hub, port, name + + @staticmethod + def get_unique_nodes(full_name: str) -> List[str]: + """ + Get Unique Nodes. + + A path might look like: `1-1.3/1-1.3`. In this + instance we know that the device is on Bus 1 and + port 3 of the pi. We only need one unique id + here, so we will filter it out. + + :param full_name: Full path of the physical + USB Path. + :returns: List of separated USB port paths + """ + port_nodes = [] + for node in full_name.split('/'): + if node not in port_nodes: + port_nodes.append(node) + return port_nodes + + def __hash__(self) -> int: + """ + Hash function. + + To have a unique set of nodes, they must + all have a unique hash. Lists are not + hashable which is why we need to unpack + the list here. + """ + return hash((self.name, *self.sub_names)) + + class GPIOPin: @classmethod diff --git a/api/src/opentrons/drivers/rpi_drivers/usb.py b/api/src/opentrons/drivers/rpi_drivers/usb.py new file mode 100644 index 00000000000..590371267c0 --- /dev/null +++ b/api/src/opentrons/drivers/rpi_drivers/usb.py @@ -0,0 +1,154 @@ +""" +USB Driver. + +A class to convert info from the usb bus into a +more readable format. +""" + +import subprocess +import re +from typing import List, Set + +from opentrons.algorithms.dfs import DFS + +from .interfaces import USBDriverInterface +from .types import USBPort + + +# Example usb path might look like: +# '/sys/bus/usb/devices/usb1/1-1/1-1.3/1-1.3:1.0/tty/ttyACM1/dev'. +# There is only 1 bus that supports USB on the raspberry pi. +BUS_PATH = '/sys/bus/usb/devices/usb1/' +PORT_PATTERN = r'(/\d-\d(\.?\d)+)+:' +DEVICE_PATH = r'\d.\d/tty/tty(\w{4})/dev' +USB_PORT_INFO = re.compile(PORT_PATTERN + DEVICE_PATH) + + +class USBBus(USBDriverInterface): + def __init__(self): + self._usb_dev: List[USBPort] = self.read_usb_bus() + self._dfs: DFS = DFS(self._usb_dev) + self._sorted = self._dfs.dfs() + + @staticmethod + def read_bus() -> List[str]: + """ + Read the USB Bus information. + + Use the sys bus path to find all of the USBs with + active devices connected to them. + """ + read = [''] + try: + read = subprocess.check_output( + ['find', BUS_PATH, '-name', 'dev']).decode().splitlines() + except Exception: + pass + return read + + @staticmethod + def convert_port_path(full_port_path: str) -> USBPort: + """ + Convert port path. + + Take the value returned from the USB bus and format + that information into a dataclass + :param full_port_path: The string port path + :returns: The USBPort dataclass + """ + return USBPort.build(full_port_path.strip('/')) + + @property + def usb_dev(self) -> List[USBPort]: + """ + USBBus property: usb_dev. + + :returns: The list of ports found from + the usb bus. + """ + return self._usb_dev + + @usb_dev.setter + def usb_dev(self, ports: List[USBPort]) -> None: + """ + USBBus setter: usb_dev. + + :param ports: The list of ports found from + the usb bus. + """ + self._usb_dev = ports + + @property + def sorted_ports(self) -> Set: + """ + USBBus property: sorted_ports. + + :returns: The set of sorted ports + """ + return self._sorted + + @sorted_ports.setter + def sorted_ports(self, sorted: Set) -> None: + """ + USBBus setter: sorted_ports. + + :param sorted: The updated set of usb ports. + """ + self._sorted = sorted + + def read_usb_bus(self) -> List[USBPort]: + """ + Read usb bus + + Take the value returned from the USB bus and match + the paths to the expected port paths for modules. + :returns: A list of matching ports as dataclasses + """ + active_ports = self.read_bus() + port_matches = [] + for port in active_ports: + match = USB_PORT_INFO.search(port) + if match: + port_matches.append(self.convert_port_path(match.group(0))) + return port_matches + + def find_port(self, device_path: str) -> USBPort: + """ + Find port. + + Take the value returned from the USB bus and match + the paths to the expected port paths for modules. + :param device_path: The device path of a module, which + generally contains tty/tty* in its name. + :returns: The matching port, or an empty port dataclass + """ + for s in self.sorted_ports: + vertex = self._dfs.graph.get_vertex(s) + port = vertex.vertex + if port.device_path.find(device_path): + return port + return USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path=device_path) + + def sort_ports(self) -> None: + """ + Sort ports. + + Check the cached bus read vs the new bus read. Update + graph and sorted ports accordingly. + :param device_path: The device path of a module, which + generally contains tty/tty* in its name. + :returns: The matching port, or an empty port dataclass + """ + updated_bus = self.read_usb_bus() + remove_difference = set(self.usb_dev) - set(updated_bus) + add_difference = set(updated_bus) - set(self.usb_dev) + + if remove_difference or add_difference: + for d in remove_difference: + self._dfs.graph.remove_vertex(d) + for d in add_difference: + self._dfs.graph.add_vertex(d) + self.sorted_ports = self._dfs.dfs() + self.usb_dev = updated_bus diff --git a/api/src/opentrons/drivers/rpi_drivers/usb_simulator.py b/api/src/opentrons/drivers/rpi_drivers/usb_simulator.py new file mode 100644 index 00000000000..70b28847a65 --- /dev/null +++ b/api/src/opentrons/drivers/rpi_drivers/usb_simulator.py @@ -0,0 +1,103 @@ +""" +USB Simulating Driver. + +A class to convert info from the usb bus into a +more readable format. +""" +from typing import List, Set + +from .interfaces import USBDriverInterface +from .types import USBPort + + +class USBBusSimulator(USBDriverInterface): + def __init__(self): + self._usb_dev: List[USBPort] = self.read_usb_bus() + self._sorted = set() + + @staticmethod + def read_bus() -> List[str]: + """ + Read the USB Bus information. + + Use the sys bus path to find all of the USBs with + active devices connected to them. + """ + return [''] + + @staticmethod + def convert_port_path(full_port_path: str) -> USBPort: + """ + Convert port path. + + Take the value returned from the USB bus and format + that information into a dataclass + :param full_port_path: The string port path + :returns: The USBPort dataclass + """ + pass + + @property + def usb_dev(self) -> List[USBPort]: + """ + USBBus property: usb_dev. + + :returns: The list of ports found from + the usb bus. + """ + return self._usb_dev + + @usb_dev.setter + def usb_dev(self, ports: List[USBPort]) -> None: + """ + USBBus setter: usb_dev. + + :param ports: The list of ports found from + the usb bus. + """ + self._usb_dev = ports + + @property + def sorted_ports(self) -> Set: + """ + USBBus property: sorted_ports. + + :returns: The set of sorted ports + """ + return self._sorted + + @sorted_ports.setter + def sorted_ports(self, sorted: Set) -> None: + """ + USBBus setter: sorted_ports. + + :param sorted: The updated set of usb ports. + """ + self._sorted = sorted + + def read_usb_bus(self) -> List[USBPort]: + """ + Read usb bus + + Take the value returned from the USB bus and match + the paths to the expected port paths for modules. + :returns: A list of matching ports as dataclasses + """ + return [] + + def find_port(self, device_path: str) -> USBPort: + """ + Find port. + + Take the value returned from the USB bus and match + the paths to the expected port paths for modules. + :param device_path: The device path of a module, which + generally contains tty/tty* in its name. + :returns: The matching port, or an empty port dataclass + """ + return USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path=device_path) + + def sort_ports(self) -> None: + pass diff --git a/api/src/opentrons/hardware_control/api.py b/api/src/opentrons/hardware_control/api.py index b4d931bd71c..44f82667ed3 100644 --- a/api/src/opentrons/hardware_control/api.py +++ b/api/src/opentrons/hardware_control/api.py @@ -1720,9 +1720,9 @@ def set_pipette_speed(self, mount, def _unregister_modules(self, mods_at_ports: List[modules.ModuleAtPort]) -> None: removed_modules = [] - for port, mod in mods_at_ports: # type: ignore + for mod in mods_at_ports: for attached_mod in self._attached_modules: - if attached_mod.port == port: + if attached_mod.port == mod.port: removed_modules.append(attached_mod) for removed_mod in removed_modules: try: @@ -1747,18 +1747,20 @@ async def register_modules( # destroy removed mods self._unregister_modules(removed_mods_at_ports) + self._backend._usb.sort_ports() # build new mods - for port, name in new_mods_at_ports: + for mod in new_mods_at_ports: new_instance = await self._backend.build_module( - port=port, - model=name, + port=mod.port, + usb_port=self._backend._usb.find_port(mod.port), + model=mod.name, interrupt_callback=self.pause_with_message, loop=self.loop, execution_manager=self._execution_manager) self._attached_modules.append(new_instance) - self._log.info(f"Module {name} discovered and attached" - f" at port {port}, new_instance: {new_instance}") + self._log.info(f"Module {mod.name} discovered and attached" + f" at port {mod.port}, new_instance: {new_instance}") def get_instrument_max_height( self, diff --git a/api/src/opentrons/hardware_control/controller.py b/api/src/opentrons/hardware_control/controller.py index d763aea9c32..daa15c4e293 100644 --- a/api/src/opentrons/hardware_control/controller.py +++ b/api/src/opentrons/hardware_control/controller.py @@ -11,7 +11,7 @@ aionotify = None # type: ignore from opentrons.drivers.smoothie_drivers import driver_3_0 -from opentrons.drivers.rpi_drivers import build_gpio_chardev +from opentrons.drivers.rpi_drivers import build_gpio_chardev, types, usb import opentrons.config from opentrons.config import pipette_config from opentrons.config.types import RobotConfig @@ -78,6 +78,7 @@ def __init__(self, config: RobotConfig, gpio: GPIODriverLike): config=self.config, gpio_chardev=self._gpio_chardev, handle_locks=False) self._cached_fw_version: Optional[str] = None + self._usb = usb.USBBus() try: self._module_watcher = aionotify.Watcher() self._module_watcher.watch( @@ -243,12 +244,14 @@ async def watch_modules(self, loop: asyncio.AbstractEventLoop, async def build_module( self, port: str, + usb_port: types.USBPort, model: str, interrupt_callback: modules.InterruptCallback, loop: asyncio.AbstractEventLoop, execution_manager: ExecutionManager) -> modules.AbstractModule: return await modules.build( port=port, + usb_port=usb_port, which=model, simulating=False, interrupt_callback=interrupt_callback, diff --git a/api/src/opentrons/hardware_control/modules/magdeck.py b/api/src/opentrons/hardware_control/modules/magdeck.py index 4f49e21de8a..b5e3b840e4f 100644 --- a/api/src/opentrons/hardware_control/modules/magdeck.py +++ b/api/src/opentrons/hardware_control/modules/magdeck.py @@ -4,6 +4,7 @@ from opentrons.drivers.mag_deck import ( SimulatingDriver, MagDeck as MagDeckDriver) from opentrons.drivers.mag_deck.driver import mag_locks +from opentrons.drivers.rpi_drivers.types import USBPort from ..execution_manager import ExecutionManager from . import update, mod_abc, types @@ -46,6 +47,7 @@ class MagDeck(mod_abc.AbstractModule): @classmethod async def build(cls, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, interrupt_callback: types.InterruptCallback = None, simulating=False, @@ -54,6 +56,7 @@ async def build(cls, # MagDeck does not currently use interrupts, so the callback is not # passed on mod = cls(port=port, + usb_port=usb_port, simulating=simulating, loop=loop, execution_manager=execution_manager, @@ -84,11 +87,13 @@ def _build_driver( def __init__(self, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, simulating: bool, loop: asyncio.AbstractEventLoop = None, sim_model: str = None) -> None: super().__init__(port=port, + usb_port=usb_port, simulating=simulating, loop=loop, execution_manager=execution_manager, @@ -168,6 +173,10 @@ def live_data(self) -> types.LiveData: def port(self) -> str: return self._port + @property + def usb_port(self) -> USBPort: + return self._usb_port + @property def is_simulated(self) -> bool: return isinstance(self._driver, SimulatingDriver) diff --git a/api/src/opentrons/hardware_control/modules/mod_abc.py b/api/src/opentrons/hardware_control/modules/mod_abc.py index 89f4b59574b..a2edb1f9054 100644 --- a/api/src/opentrons/hardware_control/modules/mod_abc.py +++ b/api/src/opentrons/hardware_control/modules/mod_abc.py @@ -6,6 +6,7 @@ from typing import Mapping, Optional from opentrons.config import IS_ROBOT, ROBOT_FIRMWARE_DIR from opentrons.hardware_control.util import use_or_initialize_loop +from opentrons.drivers.rpi_drivers.types import USBPort from ..execution_manager import ExecutionManager from .types import BundledFirmware, UploadFunction, InterruptCallback, LiveData @@ -19,6 +20,7 @@ class AbstractModule(abc.ABC): @abc.abstractmethod async def build(cls, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, interrupt_callback: InterruptCallback = None, simulating: bool = False, @@ -35,11 +37,13 @@ async def build(cls, @abc.abstractmethod def __init__(self, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, simulating: bool = False, loop: asyncio.AbstractEventLoop = None, sim_model: str = None) -> None: self._port = port + self._usb_port = usb_port self._loop = use_or_initialize_loop(loop) self._execution_manager = execution_manager self._device_info: Mapping[str, str] @@ -111,7 +115,13 @@ def is_simulated(self) -> bool: @property @abc.abstractmethod def port(self) -> str: - """ The port where the module is connected. """ + """ The virtual port where the module is connected. """ + pass + + @property + @abc.abstractmethod + def usb_port(self) -> USBPort: + """ The physical port where the module is connected. """ pass @abc.abstractmethod diff --git a/api/src/opentrons/hardware_control/modules/tempdeck.py b/api/src/opentrons/hardware_control/modules/tempdeck.py index 150e4ff2b7e..2fc65759db2 100644 --- a/api/src/opentrons/hardware_control/modules/tempdeck.py +++ b/api/src/opentrons/hardware_control/modules/tempdeck.py @@ -4,6 +4,7 @@ from typing import Mapping, Union, Optional from opentrons.drivers.temp_deck import ( SimulatingDriver, TempDeck as TempDeckDriver) +from opentrons.drivers.rpi_drivers.types import USBPort from opentrons.drivers.temp_deck.driver import temp_locks from ..execution_manager import ExecutionManager from . import update, mod_abc, types @@ -59,6 +60,7 @@ class TempDeck(mod_abc.AbstractModule): @classmethod async def build(cls, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, interrupt_callback: types.InterruptCallback = None, simulating: bool = False, @@ -69,6 +71,7 @@ async def build(cls, # TempDeck does not currently use interrupts, so the callback is not # passed on mod = cls(port=port, + usb_port=usb_port, simulating=simulating, loop=loop, execution_manager=execution_manager, @@ -99,11 +102,13 @@ def _build_driver( def __init__(self, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, simulating: bool, loop: asyncio.AbstractEventLoop = None, sim_model: str = None) -> None: super().__init__(port=port, + usb_port=usb_port, simulating=simulating, loop=loop, execution_manager=execution_manager, @@ -200,6 +205,10 @@ def status(self) -> str: def port(self) -> str: return self._port + @property + def usb_port(self) -> USBPort: + return self._usb_port + @property def is_simulated(self) -> bool: return isinstance(self._driver, SimulatingDriver) diff --git a/api/src/opentrons/hardware_control/modules/thermocycler.py b/api/src/opentrons/hardware_control/modules/thermocycler.py index c2ef8bddc77..b2e6ee746a4 100644 --- a/api/src/opentrons/hardware_control/modules/thermocycler.py +++ b/api/src/opentrons/hardware_control/modules/thermocycler.py @@ -1,6 +1,7 @@ import asyncio import logging from typing import Union, Optional, List, Callable +from opentrons.drivers.rpi_drivers.types import USBPort from ..execution_manager import ExecutionManager from . import types, update, mod_abc from opentrons.drivers.thermocycler.driver import ( @@ -19,6 +20,7 @@ class Thermocycler(mod_abc.AbstractModule): @classmethod async def build(cls, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, interrupt_callback: types.InterruptCallback = None, simulating: bool = False, @@ -28,6 +30,7 @@ async def build(cls, """ mod = cls(port=port, + usb_port=usb_port, interrupt_callback=interrupt_callback, simulating=simulating, loop=loop, @@ -60,12 +63,14 @@ def _build_driver( def __init__(self, port: str, + usb_port: USBPort, execution_manager: ExecutionManager, interrupt_callback: types.InterruptCallback = None, simulating: bool = False, loop: asyncio.AbstractEventLoop = None, sim_model: str = None) -> None: super().__init__(port=port, + usb_port=usb_port, simulating=simulating, loop=loop, execution_manager=execution_manager) @@ -323,6 +328,10 @@ async def _connect(self): def port(self): return self._port + @property + def usb_port(self) -> USBPort: + return self._usb_port + async def prep_for_update(self): await self._driver.enter_programming_mode() diff --git a/api/src/opentrons/hardware_control/modules/types.py b/api/src/opentrons/hardware_control/modules/types.py index 593f92ede87..644e5f2a8b0 100644 --- a/api/src/opentrons/hardware_control/modules/types.py +++ b/api/src/opentrons/hardware_control/modules/types.py @@ -1,4 +1,4 @@ -from collections import namedtuple +from dataclasses import dataclass from typing import ( Dict, NamedTuple, Callable, Any, Tuple, Awaitable, Mapping, Union) from pathlib import Path @@ -10,11 +10,15 @@ UploadFunction = Callable[[str, str, Dict[str, Any]], Awaitable[Tuple[bool, str]]] -ModuleAtPort = namedtuple('ModuleAtPort', ('port', 'name')) - LiveData = Mapping[str, Union[str, Mapping[str, Union[float, str, None]]]] +@dataclass(frozen=True) +class ModuleAtPort: + port: str + name: str + + class BundledFirmware(NamedTuple): """ Represents a versioned firmware file, generally bundled into the fs""" version: str diff --git a/api/src/opentrons/hardware_control/modules/utils.py b/api/src/opentrons/hardware_control/modules/utils.py index 3a51ef7b36f..d21fe63612f 100644 --- a/api/src/opentrons/hardware_control/modules/utils.py +++ b/api/src/opentrons/hardware_control/modules/utils.py @@ -1,10 +1,11 @@ import asyncio import logging -from glob import glob import re +from glob import glob from typing import List, Optional from opentrons.config import IS_ROBOT, IS_LINUX +from opentrons.drivers.rpi_drivers.types import USBPort # NOTE: Must import all modules so they actually create the subclasses from . import update, tempdeck, magdeck, thermocycler, types # noqa: F401 from .mod_abc import AbstractModule @@ -30,12 +31,14 @@ async def build( port: str, which: str, simulating: bool, + usb_port: USBPort, interrupt_callback: InterruptCallback, loop: asyncio.AbstractEventLoop, execution_manager: ExecutionManager, sim_model: str = None) -> AbstractModule: return await MODULE_HW_BY_NAME[which].build( port, + usb_port, interrupt_callback=interrupt_callback, simulating=simulating, loop=loop, diff --git a/api/src/opentrons/hardware_control/simulator.py b/api/src/opentrons/hardware_control/simulator.py index 5498db309e1..fd618215ad7 100644 --- a/api/src/opentrons/hardware_control/simulator.py +++ b/api/src/opentrons/hardware_control/simulator.py @@ -18,6 +18,7 @@ from opentrons.drivers.smoothie_drivers import SimulatingDriver from opentrons.drivers.rpi_drivers.gpio_simulator import SimulatingGPIOCharDev +from opentrons.drivers.rpi_drivers import types as usb_types, usb_simulator from . import modules from .execution_manager import ExecutionManager @@ -148,6 +149,7 @@ def _sanitize_attached_instrument( self._run_flag.set() self._log = MODULE_LOG.getChild(repr(self)) self._strict_attached = bool(strict_attached_instruments) + self._usb = usb_simulator.USBBusSimulator() @property def gpio_chardev(self) -> GPIODriverLike: @@ -267,6 +269,7 @@ def save_current(self): async def build_module( self, port: str, + usb_port: usb_types.USBPort, model: str, interrupt_callback: modules.InterruptCallback, loop: asyncio.AbstractEventLoop, @@ -275,6 +278,7 @@ async def build_module( ) -> modules.AbstractModule: return await modules.build( port=port, + usb_port=usb_port, which=model, simulating=True, interrupt_callback=interrupt_callback, diff --git a/api/src/opentrons/protocols/implementations/protocol_context.py b/api/src/opentrons/protocols/implementations/protocol_context.py index 177fe41738c..ac33af3703f 100644 --- a/api/src/opentrons/protocols/implementations/protocol_context.py +++ b/api/src/opentrons/protocols/implementations/protocol_context.py @@ -203,6 +203,7 @@ def load_module( hc_mod_instance = SynchronousAdapter( mod_type( port='', + usb_port=self._hw_manager.hardware._backend._usb.find_port(''), simulating=True, loop=self._hw_manager.hardware.loop, execution_manager=ExecutionManager( diff --git a/api/tests/opentrons/algorithms/fixture_alphabetical_graph.json b/api/tests/opentrons/algorithms/fixture_alphabetical_graph.json new file mode 100644 index 00000000000..12afaca3b1d --- /dev/null +++ b/api/tests/opentrons/algorithms/fixture_alphabetical_graph.json @@ -0,0 +1,12 @@ +{ + "A": ["B", "E"], + "B": ["F", "A"], + "C": ["G"], + "H": ["I", "E", "D"], + "I": ["H", "F"], + "J": ["F", "G"], + "D": ["E", "H"], + "E": ["A", "D", "H"], + "F": ["B", "I", "J", "G"], + "G": ["F", "J", "C"] +} \ No newline at end of file diff --git a/api/tests/opentrons/algorithms/fixture_numerical_graph.json b/api/tests/opentrons/algorithms/fixture_numerical_graph.json new file mode 100644 index 00000000000..68c33c0216f --- /dev/null +++ b/api/tests/opentrons/algorithms/fixture_numerical_graph.json @@ -0,0 +1,12 @@ +{ + "1": [2, 5], + "2": [6, 1], + "3": [7], + "8": [9, 5, 4], + "9": [8, 6], + "10": [6, 7], + "4": [5, 8], + "5": [1, 4, 8], + "6": [2, 9, 10, 7], + "7": [6, 10, 3] +} \ No newline at end of file diff --git a/api/tests/opentrons/algorithms/test_dfs.py b/api/tests/opentrons/algorithms/test_dfs.py new file mode 100644 index 00000000000..a957fc23b94 --- /dev/null +++ b/api/tests/opentrons/algorithms/test_dfs.py @@ -0,0 +1,127 @@ +"""Test the DFS module. + +Generic testing of a depth first search algo +""" + +import pytest +import json +import os +from typing import Callable, List, Tuple, Any + +from opentrons.algorithms import dfs, types + + +def convert_to_vertex( + graph_dict: dict, + cast_type: Callable) -> List[types.GenericNode]: + """ + Convert to Vertex. + + Helper function to convert a json file to a list of + generic nodes to help build the graph. + """ + graph = [] + for key, value in graph_dict.items(): + vertex = types.GenericNode( + name=cast_type(key), + sub_names=value) + graph.append(vertex) + return graph + + +def load_graph() -> Tuple[Tuple]: + """ + Load Graphs. + + Helper function to load the test json graph files. + """ + cwd = os.getcwd() + test_path = 'tests/opentrons/algorithms' + with open(f'{cwd}/{test_path}/fixture_alphabetical_graph.json') as f: + alphabet = convert_to_vertex(json.load(f), str) + with open(f'{cwd}/{test_path}/fixture_numerical_graph.json') as f: + numbers = convert_to_vertex(json.load(f), int) + return (alphabet, 'string'), (numbers, 'integer') + + +@pytest.fixture(scope="session", params=load_graph()) +def dfs_graph(request: Any) -> Tuple[dfs.DFS, str]: + """ + Build DFS class. + + Fixture that sets up a dfs class for either an + alphabetical or numerical graph. + """ + graph = request.param[0] + _type = request.param[1] + yield dfs.DFS(graph), _type + + +def test_vertices(dfs_graph: dfs.DFS) -> None: + """ + Test vertices. + + Test adding and removing the vertices of a graph. + + Here we should check for the new vertice in the lookup + table of the graph as well as the sorted graph attribute. + """ + _dfs, _type = dfs_graph + graph = _dfs.graph + if _type == 'string': + additional_vertex = types.GenericNode( + name='K', sub_names=['H', 'J', 'A']) + else: + additional_vertex = types.GenericNode( + name=12, sub_names=[1, 9, 5] + ) + graph.add_vertex(additional_vertex) + vertex_obj = graph.get_vertex(additional_vertex.name) + assert additional_vertex.name in graph._lookup_table.keys() + assert vertex_obj in graph._sorted_graph + graph.remove_vertex(additional_vertex) + assert additional_vertex not in graph._lookup_table.keys() + assert vertex_obj not in graph._sorted_graph + + +def test_neighbors(dfs_graph: dfs.DFS) -> None: + """ + Test neighbors. + + Test adding neighbors to a vertex. + Neighbors act as keys for a given vertex. + """ + _dfs, _type = dfs_graph + graph = _dfs.graph + if _type == 'string': + key = 'A' + neighbor = 'J' + og_neighbors = ['B', 'E'] + sorted_neighbors = ['B', 'E', 'J'] + else: + key = 1 + neighbor = 4 + og_neighbors = [2, 5] + sorted_neighbors = [2, 4, 5] + vertex = graph.get_vertex(key) + assert vertex.neighbors == og_neighbors + vertex.add_neighbor(neighbor) + assert vertex.neighbors == sorted_neighbors + vertex.remove_neighbor(neighbor) + assert vertex.neighbors == og_neighbors + + +def test_depth_first_search(dfs_graph: dfs.DFS) -> None: + """ + Test the depth first search algorithm. + + The method should dig down into the bottom leaf node + before backtracking up to find unvisited nodes. + """ + _dfs, _type = dfs_graph + visited_vertices = _dfs.dfs() + if _type == 'string': + sort = {'A', 'B', 'F', 'G', 'C', 'J', 'I', 'H', 'D', 'E'} + else: + sort = {1, 2, 6, 7, 3, 10, 9, 8, 4, 5} + assert sort == visited_vertices diff --git a/api/tests/opentrons/drivers/rpi_drivers/test_usb.py b/api/tests/opentrons/drivers/rpi_drivers/test_usb.py new file mode 100644 index 00000000000..fa975740fa0 --- /dev/null +++ b/api/tests/opentrons/drivers/rpi_drivers/test_usb.py @@ -0,0 +1,82 @@ +import pytest + +from mock import patch, MagicMock +from opentrons.drivers.rpi_drivers.usb import USBBus +from opentrons.drivers.rpi_drivers.types import USBPort + +fake_bus = [ + '/sys/bus/usb/devices/usb1/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.3/1-1.3:1.0/tty/ttyACM1/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.5/1-1.5:1.0/tty/ttyAMA0/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.4/1-1.4/1-1.4.1/1-1.4.1:1.0/tty/ttyAMA1/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.4/1-1.4/1-1.4.3/1-1.4.3:1.0/tty/ttyACM1/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.3/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.1/dev', + '/sys/bus/usb/devices/usb1/1-1/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.4/dev', + '/sys/bus/usb/devices/usb1/1-1/1-1.4/1-1.4:1.2/0003:046D:C52B.0017/hidraw/hidraw0/dev', # NOQA + '/sys/bus/usb/devices/usb1/1-1/1-1.4/1-1.4:1.2/usbmisc/hiddev0/dev' +] + +filtered_ports = [ + '1-1.3/1-1.3:1.0/tty/ttyACM1/dev', + '1-1.5/1-1.5:1.0/tty/ttyAMA0/dev', + '1-1.4/1-1.4/1-1.4.1/1-1.4.1:1.0/tty/ttyAMA1/dev', + '1-1.4/1-1.4/1-1.4.3/1-1.4.3:1.0/tty/ttyACM1/dev' +] + + +@pytest.fixture +def usb_class() -> USBBus: + + @staticmethod + def fake_read_bus(): + return fake_bus + + with patch.object(USBBus, 'read_bus', fake_read_bus): + yield USBBus() + + +def test_usb_list_output(usb_class: USBBus) -> None: + expected_ports = [USBPort.build(p) for p in filtered_ports] + assert usb_class.usb_dev == expected_ports + regular_port = expected_ports[0] + hub_port = expected_ports[2] + + assert regular_port.name == '1-1.3' + assert regular_port.hub is None + assert regular_port.sub_names == [] + assert hub_port.name == '1-1.4.1' + assert hub_port.hub == 4 + assert hub_port.sub_names == [] + + +def test_find_device(usb_class: USBBus) -> None: + device_paths = [ + '/tty/ttyACM1/dev', + '/tty/ttyAMA1/dev', + '/tty/ttyAMA0/dev'] + for dp in device_paths: + port = usb_class.find_port(dp) + assert port in usb_class.usb_dev + + +def test_unplug_device(usb_class: USBBus) -> None: + import copy + + copy_bus = copy.copy(fake_bus) + copy_ports = copy.copy(filtered_ports) + u = '/sys/bus/usb/devices/usb1/1-1/1-1.3/1-1.3:1.0/tty/ttyACM1/dev' + copy_bus.remove(u) + copy_ports.remove('1-1.3/1-1.3:1.0/tty/ttyACM1/dev') + + usb_class.read_bus = MagicMock(return_value=copy_bus) + usb_class.sort_ports() + + expected_ports = [USBPort.build(p) for p in copy_ports] + assert usb_class.usb_dev == expected_ports + + +def test_sorted_usb_class(usb_class: USBBus) -> None: + expected_sorted = {'1-1.3', '1-1.4.1', '1-1.4.3', '1-1.5'} + assert usb_class.sorted_ports == expected_sorted diff --git a/api/tests/opentrons/hardware_control/modules/test_hc_magdeck.py b/api/tests/opentrons/hardware_control/modules/test_hc_magdeck.py index 83597c99295..3ea38b19f53 100644 --- a/api/tests/opentrons/hardware_control/modules/test_hc_magdeck.py +++ b/api/tests/opentrons/hardware_control/modules/test_hc_magdeck.py @@ -1,8 +1,19 @@ +import pytest from opentrons.hardware_control import modules, ExecutionManager +from opentrons.drivers.rpi_drivers.types import USBPort -async def test_sim_initialization(loop): + +@pytest.fixture +def usb_port(): + return USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_sim_magdeck0') + + +async def test_sim_initialization(loop, usb_port): mag = await modules.build(port='/dev/ot_module_sim_magdeck0', + usb_port=usb_port, which='magdeck', simulating=True, interrupt_callback=lambda x: None, @@ -11,8 +22,9 @@ async def test_sim_initialization(loop): assert isinstance(mag, modules.AbstractModule) -async def test_sim_data(loop): +async def test_sim_data(loop, usb_port): mag = await modules.build(port='/dev/ot_module_sim_magdeck0', + usb_port=usb_port, which='magdeck', simulating=True, interrupt_callback=lambda x: None, @@ -27,8 +39,9 @@ async def test_sim_data(loop): assert 'data' in mag.live_data -async def test_sim_state_update(loop): +async def test_sim_state_update(loop, usb_port): mag = await modules.build(port='/dev/ot_module_sim_magdeck0', + usb_port=usb_port, which='magdeck', simulating=True, interrupt_callback=lambda x: None, @@ -42,8 +55,8 @@ async def test_sim_state_update(loop): assert mag.status == 'disengaged' -async def test_revision_model_parsing(loop): - mag = await modules.build('', 'magdeck', True, lambda x: None, loop=loop, +async def test_revision_model_parsing(loop, usb_port): + mag = await modules.build('', 'magdeck', True, usb_port, lambda x: None, loop=loop, execution_manager=ExecutionManager(loop=loop)) mag._device_info['model'] = 'mag_deck_v1.1' assert mag.model() == 'magneticModuleV1' diff --git a/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py b/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py index 0778465f7c1..8493e19b240 100644 --- a/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py +++ b/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py @@ -1,10 +1,22 @@ +import pytest import asyncio from opentrons.hardware_control import modules, ExecutionManager from opentrons.hardware_control.modules import tempdeck -async def test_sim_initialization(loop): +from opentrons.drivers.rpi_drivers.types import USBPort + + +@pytest.fixture +def usb_port(): + return USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_sim_tempdeck0') + + +async def test_sim_initialization(loop, usb_port): temp = await modules.build(port='/dev/ot_module_sim_tempdeck0', + usb_port=usb_port, which='tempdeck', simulating=True, interrupt_callback=lambda x: None, @@ -13,8 +25,9 @@ async def test_sim_initialization(loop): assert isinstance(temp, modules.AbstractModule) -async def test_sim_state(loop): +async def test_sim_state(loop, usb_port): temp = await modules.build(port='/dev/ot_module_sim_tempdeck0', + usb_port=usb_port, which='tempdeck', simulating=True, interrupt_callback=lambda x: None, @@ -33,8 +46,9 @@ async def test_sim_state(loop): assert status['version'] == 'dummyVersionTD' -async def test_sim_update(loop): +async def test_sim_update(loop, usb_port): temp = await modules.build(port='/dev/ot_module_sim_tempdeck0', + usb_port=usb_port, which='tempdeck', simulating=True, interrupt_callback=lambda x: None, @@ -50,9 +64,10 @@ async def test_sim_update(loop): assert temp.status == 'idle' -async def test_poller(monkeypatch, loop): +async def test_poller(monkeypatch, loop, usb_port): temp = modules.tempdeck.TempDeck( port='/dev/ot_module_sim_tempdeck0', + usb_port=usb_port, execution_manager=ExecutionManager(loop=loop), simulating=True, loop=loop) @@ -69,8 +84,8 @@ def update_called(): assert hit -async def test_revision_model_parsing(loop): - mag = await modules.build('', 'tempdeck', True, lambda x: None, loop=loop, +async def test_revision_model_parsing(loop, usb_port): + mag = await modules.build('', 'tempdeck', True, usb_port, lambda x: None, loop=loop, execution_manager=ExecutionManager(loop=loop)) mag._device_info['model'] = 'temp_deck_v20' assert mag.model() == 'temperatureModuleV2' diff --git a/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py b/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py index 4c711682b6d..2504b9900c2 100644 --- a/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py +++ b/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py @@ -1,10 +1,21 @@ +import pytest import asyncio from unittest import mock from opentrons.hardware_control import modules, ExecutionManager +from opentrons.drivers.rpi_drivers.types import USBPort -async def test_sim_initialization(loop): + +@pytest.fixture +def usb_port(): + return USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_sim_thermocycler0') + + +async def test_sim_initialization(loop, usb_port): therm = await modules.build(port='/dev/ot_module_sim_thermocycler0', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None, @@ -16,6 +27,7 @@ async def test_sim_initialization(loop): async def test_lid(loop): therm = await modules.build(port='/dev/ot_module_sim_thermocycler0', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None, @@ -37,8 +49,9 @@ async def test_lid(loop): assert therm.lid_status == 'open' -async def test_sim_state(loop): +async def test_sim_state(loop, usb_port): therm = await modules.build(port='/dev/ot_module_sim_thermocycler0', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None, @@ -57,8 +70,9 @@ async def test_sim_state(loop): assert status['version'] == 'dummyVersionTC' -async def test_sim_update(loop): +async def test_sim_update(loop, usb_port): therm = await modules.build(port='/dev/ot_module_sim_thermocycler0', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None, @@ -102,8 +116,9 @@ async def test_sim_update(loop): assert therm.lid_target is None -async def test_set_temperature(monkeypatch, loop): +async def test_set_temperature(monkeypatch, loop, usb_port): hw_tc = await modules.build(port='/dev/ot_module_sim_thermocycler0', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None, diff --git a/api/tests/opentrons/hardware_control/test_modules.py b/api/tests/opentrons/hardware_control/test_modules.py index ab5501cee6a..6be7902745f 100644 --- a/api/tests/opentrons/hardware_control/test_modules.py +++ b/api/tests/opentrons/hardware_control/test_modules.py @@ -10,6 +10,7 @@ from opentrons.hardware_control.modules import ModuleAtPort from opentrons.hardware_control.modules.types import BundledFirmware from opentrons.hardware_control.modules import tempdeck, magdeck +from opentrons.drivers.rpi_drivers.types import USBPort async def test_get_modules_simulating(): @@ -74,9 +75,13 @@ def async_return(result): } # test temperature module update with avrdude bootloader + usb_port = USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_sim_tempdeck0') tempdeck = await modules.build( port='/dev/ot_module_sim_tempdeck0', + usb_port=usb_port, which='tempdeck', simulating=True, interrupt_callback=lambda x: None, @@ -109,6 +114,7 @@ async def mock_find_avrdude_bootloader_port(): magdeck = await modules.build( port='/dev/ot_module_sim_magdeck0', + usb_port=usb_port, which='magdeck', simulating=True, interrupt_callback=lambda x: None, @@ -126,6 +132,7 @@ async def mock_find_avrdude_bootloader_port(): thermocycler = await modules.build( port='/dev/ot_module_sim_thermocycler0', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None, diff --git a/robot-server/tests/service/legacy/routers/test_modules.py b/robot-server/tests/service/legacy/routers/test_modules.py index 0e09ae58a0c..ca26f7adee9 100644 --- a/robot-server/tests/service/legacy/routers/test_modules.py +++ b/robot-server/tests/service/legacy/routers/test_modules.py @@ -7,13 +7,18 @@ from opentrons.hardware_control.modules import MagDeck, Thermocycler, TempDeck from opentrons.hardware_control.modules import utils, UpdateError, \ BundledFirmware +from opentrons.drivers.rpi_drivers.types import USBPort @pytest.fixture def magdeck(): + usb_port = USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_magdeck1') m = asyncio.get_event_loop().run_until_complete( utils.build( port='/dev/ot_module_magdeck1', + usb_port=usb_port, which='magdeck', simulating=True, interrupt_callback=lambda x: None, @@ -28,9 +33,13 @@ def magdeck(): @pytest.fixture def tempdeck(): + usb_port = USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_tempdeck1') t = asyncio.get_event_loop().run_until_complete( utils.build( port='/dev/ot_module_tempdeck1', + usb_port=usb_port, which='tempdeck', simulating=True, interrupt_callback=lambda x: None, @@ -50,9 +59,13 @@ def tempdeck(): @pytest.fixture def thermocycler(): + usb_port = USBPort( + name='', sub_names=[], hub=None, + port_number=None, device_path='/dev/ot_module_thermocycler1') t = asyncio.get_event_loop().run_until_complete( utils.build( port='/dev/ot_module_thermocycler1', + usb_port=usb_port, which='thermocycler', simulating=True, interrupt_callback=lambda x: None,