Skip to content

Commit

Permalink
fix(api): call module functions across threads (#5194)
Browse files Browse the repository at this point in the history
Moving to the threadmanager broke the case of calling module functions
through the threadmanager, since the module objects would be returned
directly.

They are now wrapped in CallBridger objects that have the
__getattribute__ override the same as thread managers but don't do the
actual thread managing. This is LRU cached so we don't build a bunch of
them.
  • Loading branch information
sfoster1 authored Mar 12, 2020
1 parent c143bcf commit ba1afe2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 34 deletions.
107 changes: 77 additions & 30 deletions api/src/opentrons/hardware_control/thread_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import logging
import asyncio
import functools
from typing import Generic, TypeVar, Any
from .adapters import SynchronousAdapter
from .modules.mod_abc import AbstractModule

MODULE_LOG = logging.getLogger(__name__)

Expand All @@ -13,14 +15,64 @@ class ThreadManagerException(Exception):
pass


async def call_coroutine_threadsafe(
loop: asyncio.AbstractEventLoop,
coro, *args, **kwargs) -> asyncio.Future:
fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop)
wrapped = asyncio.wrap_future(fut)
return await wrapped


WrappedObj = TypeVar('WrappedObj')


class CallBridger(Generic[WrappedObj]):
def __init__(
self,
wrapped_obj: WrappedObj,
loop: asyncio.AbstractEventLoop) -> None:
self.wrapped_obj = wrapped_obj
self._loop = loop

def __getattribute__(self, attr_name: str) -> Any:
# Almost every attribute retrieved from us will be for people actually
# looking for an attribute of the managed object, so check there first.
managed_obj = object.__getattribute__(self, 'wrapped_obj')
loop = object.__getattribute__(self, '_loop')
try:
attr = getattr(managed_obj, attr_name)
except AttributeError:
# Maybe this actually was for us? Let’s find it
return object.__getattribute__(self, attr_name)

if asyncio.iscoroutinefunction(attr):
# Return coroutine result of async function
# executed in managed thread to calling thread

@functools.wraps(attr)
async def wrapper(*args, **kwargs):
return await call_coroutine_threadsafe(
loop, attr, *args, **kwargs)

return wrapper

elif asyncio.iscoroutine(attr):
# Return awaitable coroutine properties run in managed thread/loop
fut = asyncio.run_coroutine_threadsafe(attr, loop)
wrapped = asyncio.wrap_future(fut)
return wrapped

return attr


# TODO: BC 2020-02-25 instead of overwriting __get_attribute__ in this class
# use inspect.getmembers to iterate over appropriate members of adapted
# instance and setattr on the outer instance with the proper threadsafe
# resolution logic injected. This approach avoids requiring calls to
# object.__get_attribute__(self,...) to opt out of the overwritten
# functionality. It is more readable and protected from
# unintentional recursion.
class ThreadManager():
class ThreadManager:
""" A wrapper to make every call into :py:class:`.hardware_control.API`
execute within the same thread.
Expand All @@ -40,14 +92,15 @@ class ThreadManager():
>>> api_single_thread.sync.home() # call as blocking sync
"""

def __init__(self, builder, *args, **kwargs) -> None:
def __init__(self, builder, *args, **kwargs):
""" Build the ThreadManager.
:param builder: The API function to use
"""

self._loop = None
self.managed_obj = None
self.bridged_obj = None
self._sync_managed_obj = None
is_running = threading.Event()
self._is_running = is_running
Expand All @@ -71,6 +124,7 @@ def _build_and_start_loop(self, builder, *args, **kwargs):
loop=loop,
**kwargs))
self.managed_obj = managed_obj
self.bridged_obj = CallBridger(managed_obj, loop)
self._sync_managed_obj = SynchronousAdapter(managed_obj)
except Exception:
MODULE_LOG.exception('Exception in Thread Manager build')
Expand All @@ -92,38 +146,31 @@ def clean_up(self):
loop.call_soon_threadsafe(loop.stop)
except Exception:
pass
object.__getattribute__(self, 'wrap_module').cache_clear()
object.__getattribute__(self, '_thread').join()

def __del__(self):
self.clean_up()

@staticmethod
async def call_coroutine_threadsafe(loop, coro, *args, **kwargs):
fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop)
wrapped = asyncio.wrap_future(fut)
return await wrapped
@functools.lru_cache(8)
def wrap_module(
self, module: AbstractModule) -> CallBridger[AbstractModule]:
return CallBridger(module, object.__getattribute__(self, '_loop'))

def __getattribute__(self, attr_name):
# Almost every attribute retrieved from us will be for people actually
# looking for an attribute of the managed object, so check there first.
managed_obj = object.__getattribute__(self, 'managed_obj')
loop = object.__getattribute__(self, '_loop')
try:
attr = getattr(managed_obj, attr_name)
except AttributeError:
# Maybe this actually was for us? Let’s find it
return object.__getattribute__(self, attr_name)

if asyncio.iscoroutinefunction(attr):
# Return coroutine result of async function
# executed in managed thread to calling thread
return functools.partial(self.call_coroutine_threadsafe,
loop,
attr)
elif asyncio.iscoroutine(attr):
# Return awaitable coroutine properties run in managed thread/loop
fut = asyncio.run_coroutine_threadsafe(attr, loop)
wrapped = asyncio.wrap_future(fut, loop=asyncio.get_event_loop())
return wrapped

return attr
# hardware_control.api.API.attached_modules is the only hardware
# API method that returns something other than data. The module
# objects it returns have associated methods that can be called.
# That means they need the same wrapping treatment as the API
# itself.
if attr_name == 'attached_modules':
wrap = object.__getattribute__(self, 'wrap_module')
managed = object.__getattribute__(self, 'managed_obj')
attr = getattr(managed, attr_name)
return [wrap(mod) for mod in attr]
else:
try:
return getattr(
object.__getattribute__(self, 'bridged_obj'), attr_name)
except AttributeError:
return object.__getattribute__(self, attr_name)
2 changes: 1 addition & 1 deletion api/src/opentrons/legacy_api/modules/tempdeck.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def connect(self):
self._driver.connect(self._port)
self._device_info = self._driver.get_device_info()
self._poll_stop_event = Event()
Thread(target=self._poll_temperature).start()
Thread(target=self._poll_temperature, daemon=True).start()
else:
# Sanity check Should never happen, because connect should never
# be called without a port on Module
Expand Down
6 changes: 3 additions & 3 deletions api/tests/opentrons/protocol_api/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from opentrons.hardware_control import API, adapters, types, ThreadManager


def test_hw_manager(loop):
async def test_hw_manager(loop):
# When built without an input it should build its own adapter
mgr = HardwareManager(None)
adapter = mgr.hardware
Expand All @@ -17,8 +17,8 @@ def test_hw_manager(loop):

# When built with a hardware API input it should wrap it with a new
# synchronous adapter and not build its own API
mgr = HardwareManager(
API.build_hardware_simulator(loop=loop))
sim = await API.build_hardware_simulator(loop=loop)
mgr = HardwareManager(sim)
assert isinstance(mgr.hardware, adapters.SynchronousAdapter)
passed = mgr.hardware
# When disconnecting from a real external adapter, it should create
Expand Down

0 comments on commit ba1afe2

Please sign in to comment.