-
Notifications
You must be signed in to change notification settings - Fork 179
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(api): Add hardware_control submodule (#2349)
The hardware_control submodule is the controller of the robot's hardware. Its API can move the robot at a level of abstraction that portrays the robot as a whole, not including the labware. In this PR, the module is a stub, and not used by default Closes #2232 * fixup: More verbose names for file and thread locks * fixup: use functools.wraps in log_call decorator
- Loading branch information
Showing
7 changed files
with
359 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
""" | ||
hardware_control: The sole authority for controlling the hardware of an OT2. | ||
The hardware_control module presents a unified api for the lowest level of | ||
hardware command that takes into account the robot as a whole. For instance, | ||
it presents an API for moving a specific pipette mount (not a specific motor | ||
or axis) to a deck-absolute point (not a Smoothie-coordinate point). | ||
This module is not for use outside the opentrons api module. Higher-level | ||
functions are available elsewhere. | ||
""" | ||
|
||
import asyncio | ||
import functools | ||
import logging | ||
|
||
from . import simulator | ||
try: | ||
from . import controller | ||
except ModuleNotFoundError: | ||
# implies windows | ||
controller = None # type: ignore | ||
|
||
|
||
mod_log = logging.getLogger(__name__) | ||
|
||
|
||
def _log_call(func): | ||
@functools.wraps(func) | ||
def _log_call_inner(*args, **kwargs): | ||
args[0]._log.debug(func.__name__) | ||
return func(*args, **kwargs) | ||
return _log_call_inner | ||
|
||
|
||
class API: | ||
""" This API is the primary interface to the hardware controller. | ||
Because the hardware manager controls access to the system's hardware | ||
as a whole, it is designed as a class of which only one should be | ||
instantiated at a time. This class's methods should be the only method | ||
of external access to the hardware. Each method may be minimal - it may | ||
only delegate the call to another submodule of the hardware manager - | ||
but its purpose is to be gathered here to provide a single interface. | ||
""" | ||
|
||
CLS_LOG = mod_log.getChild('API') | ||
|
||
def __init__(self, | ||
backend: object, | ||
config: dict = None, | ||
loop: asyncio.AbstractEventLoop = None) -> None: | ||
""" Initialize an API instance. | ||
This should rarely be explicitly invoked by an external user; instead, | ||
one of the factory methods build_hardware_controller or | ||
build_hardware_simulator should be used. | ||
""" | ||
self._log = self.CLS_LOG.getChild(str(id(self))) | ||
self._backend = backend | ||
if None is loop: | ||
self._loop = asyncio.get_event_loop() | ||
else: | ||
self._loop = loop | ||
|
||
@classmethod | ||
def build_hardware_controller( | ||
cls, config: dict = None, | ||
loop: asyncio.AbstractEventLoop = None) -> 'API': | ||
""" Build a hardware controller that will actually talk to hardware. | ||
This method should not be used outside of a real robot, and on a | ||
real robot only one true hardware controller may be active at one | ||
time. | ||
""" | ||
if None is controller: | ||
raise RuntimeError( | ||
'The hardware controller may only be instantiated on a robot') | ||
return cls(controller.Controller(config, loop), | ||
config=config, loop=loop) | ||
|
||
@classmethod | ||
def build_hardware_simulator( | ||
cls, config: dict = None, | ||
loop: asyncio.AbstractEventLoop = None) -> 'API': | ||
""" Build a simulating hardware controller. | ||
This method may be used both on a real robot and on dev machines. | ||
Multiple simulating hardware controllers may be active at one time. | ||
""" | ||
return cls(simulator.Simulator(config, loop), | ||
config=config, loop=loop) | ||
|
||
# Query API | ||
@_log_call | ||
def get_connected_hardware(self): | ||
""" Get the cached hardware connected to the robot. | ||
""" | ||
pass | ||
|
||
# Incidentals (i.e. not motion) API | ||
@_log_call | ||
async def turn_on_button_light(self): | ||
pass | ||
|
||
@_log_call | ||
async def turn_off_button_light(self): | ||
pass | ||
|
||
@_log_call | ||
async def turn_on_rail_lights(self): | ||
pass | ||
|
||
@_log_call | ||
async def turn_off_rail_lights(self): | ||
pass | ||
|
||
@_log_call | ||
async def identify(self, seconds): | ||
pass | ||
|
||
@_log_call | ||
async def cache_instrument_models(self): | ||
pass | ||
|
||
@_log_call | ||
async def update_smoothie_firmware(self, firmware_file): | ||
pass | ||
|
||
# Global actions API | ||
@_log_call | ||
async def pause(self): | ||
pass | ||
|
||
@_log_call | ||
async def resume(self): | ||
pass | ||
|
||
@_log_call | ||
async def halt(self): | ||
pass | ||
|
||
# Gantry/frame (i.e. not pipette) action API | ||
@_log_call | ||
async def home(self, *args, **kwargs): | ||
pass | ||
|
||
@_log_call | ||
async def home_z(self): | ||
pass | ||
|
||
@_log_call | ||
async def move_to(self, mount, position=None, position_rel=None): | ||
pass | ||
|
||
# Gantry/frame (i.e. not pipette) config API | ||
@_log_call | ||
async def head_speed(self, combined_speed=None, | ||
x=None, y=None, z=None, a=None, b=None, c=None): | ||
pass | ||
|
||
# Pipette action API | ||
@_log_call | ||
async def aspirate(self, mount, volume=None, rate=None): | ||
pass | ||
|
||
@_log_call | ||
async def dispense(self, mount, volume=None, rate=None): | ||
pass | ||
|
||
@_log_call | ||
async def blow_out(self, mount): | ||
pass | ||
|
||
@_log_call | ||
async def air_gap(self, mount, volume=None): | ||
pass | ||
|
||
@_log_call | ||
async def pick_up_tip(self, mount, tip_length): | ||
pass | ||
|
||
@_log_call | ||
async def drop_tip(self, mount): | ||
pass | ||
|
||
# Pipette config api | ||
@_log_call | ||
async def calibrate_plunger( | ||
self, mount, top=None, bottom=None, blow_out=None, drop_tip=None): | ||
pass | ||
|
||
@_log_call | ||
async def set_flow_rate(self, mount, aspirate=None, dispense=None): | ||
pass | ||
|
||
@_log_call | ||
async def set_pick_up_current(self, mount, amperes): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import os | ||
import fcntl | ||
import threading | ||
|
||
from opentrons.util import environment | ||
|
||
_lock = threading.Lock() | ||
|
||
|
||
class _Locker: | ||
""" A class that combines a threading.Lock and a file lock to ensure | ||
controllers are unique both between processes and within a process. | ||
There should be one instance of this per process. | ||
""" | ||
LOCK_FILE_PATH = environment.settings['HARDWARE_CONTROLLER_LOCKFILE'] | ||
|
||
def __init__(self): | ||
global _lock | ||
|
||
self._thread_lock_acquired = _lock.acquire(blocking=False) | ||
self._file_lock_acquired = self._try_acquire_file_lock() | ||
if not (self._thread_lock_acquired and self._file_lock_acquired): | ||
raise RuntimeError( | ||
'Only one hardware controller may be instantiated') | ||
|
||
def _try_acquire_file_lock(self): | ||
self._file = open(self.LOCK_FILE_PATH, 'w') | ||
try: | ||
fcntl.lockf(self._file, fcntl.LOCK_EX | fcntl.LOCK_NB) | ||
except OSError: | ||
return False | ||
else: | ||
return True | ||
|
||
def __del__(self): | ||
global _lock | ||
if self._file_lock_acquired: | ||
fcntl.lockf(self._file, fcntl.LOCK_UN) | ||
if self._thread_lock_acquired: | ||
_lock.release() | ||
|
||
|
||
class Controller: | ||
""" The concrete instance of the controller for actually controlling | ||
hardware. | ||
This class may only be instantiated on a robot, and only one instance | ||
may be active at any time. | ||
""" | ||
|
||
def __init__(self, config, loop): | ||
""" Build a Controller instance. | ||
If another controller is already instantiated on the system (or if | ||
this is instantiated somewhere other than a robot) then this method | ||
will raise a RuntimeError. | ||
""" | ||
if not os.environ.get('RUNNING_ON_PI'): | ||
raise RuntimeError('{} may only be instantiated on a robot' | ||
.format(self.__class__.__name__)) | ||
self._lock = _Locker() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
class Simulator: | ||
""" This is a subclass of hardware_control that only simulates the | ||
hardware actions. It is suitable for use on a dev machine or on | ||
a robot with no smoothie connected. | ||
""" | ||
|
||
def __init__(self, config, loop): | ||
self._config = config | ||
self._loop = loop |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
66 changes: 66 additions & 0 deletions
66
api/tests/opentrons/hardware_control/test_instantiation.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import subprocess | ||
import threading | ||
|
||
import pytest | ||
from opentrons import hardware_control as hc | ||
|
||
if not hc.controller: | ||
pytest.skip('hardware controller not available (probably windows)', | ||
allow_module_level=True) | ||
|
||
|
||
def test_controller_runs_only_on_pi(): | ||
with pytest.raises(RuntimeError): | ||
c = hc.API.build_hardware_controller() # noqa | ||
|
||
|
||
def test_controller_instantiates( | ||
hardware_controller_lockfile, running_on_pi, loop): | ||
c = hc.API.build_hardware_controller(loop=loop) | ||
assert None is not c | ||
|
||
|
||
def test_controller_unique_per_thread( | ||
hardware_controller_lockfile, running_on_pi, loop): | ||
c = hc.API.build_hardware_controller(loop=loop) # noqa | ||
with pytest.raises(RuntimeError): | ||
_ = hc.API.build_hardware_controller(loop=loop) # noqa | ||
|
||
def _create_in_new_thread(): | ||
with pytest.raises(RuntimeError): | ||
_ = hc.API.build_hardware_controller(loop=loop) # noqa | ||
|
||
thread = threading.Thread(target=_create_in_new_thread) | ||
thread.start() | ||
thread.join() | ||
|
||
async def _create_in_coroutine(): | ||
_ = hc.API.build_hardware_controller(loop=loop) # noqa | ||
|
||
fut = _create_in_coroutine() | ||
with pytest.raises(RuntimeError): | ||
loop.run_until_complete(fut) | ||
|
||
|
||
def test_controller_unique_per_proc( | ||
hardware_controller_lockfile, running_on_pi, loop): | ||
c = hc.API.build_hardware_controller(loop=loop) # noqa | ||
|
||
script = '''import os | ||
os.environ.pop('RUNNING_ON_PI') | ||
import opentrons.hardware_control as hc | ||
os.environ['RUNNING_ON_PI'] = '1' | ||
try: | ||
hc.API.build_hardware_controller() | ||
except RuntimeError: | ||
print('ok') | ||
except Exception as e: | ||
print('unexpected exception: {}'.format(repr(e))) | ||
else: | ||
print('no exception') | ||
''' | ||
|
||
cmd = ['python', '-c', script] | ||
|
||
out = subprocess.check_output(cmd) | ||
assert out.split(b'\n')[-2] == b'ok' |