From ba1afe2ff4d748f29c53a4d398d826d69d9f0de4 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 12 Mar 2020 09:39:48 -0400 Subject: [PATCH] fix(api): call module functions across threads (#5194) Moving to the threadmanager broke the case of calling module functions through the threadmanager, since the module objects would be returned directly. They are now wrapped in CallBridger objects that have the __getattribute__ override the same as thread managers but don't do the actual thread managing. This is LRU cached so we don't build a bunch of them. --- .../hardware_control/thread_manager.py | 107 +++++++++++++----- .../opentrons/legacy_api/modules/tempdeck.py | 2 +- api/tests/opentrons/protocol_api/test_util.py | 6 +- 3 files changed, 81 insertions(+), 34 deletions(-) diff --git a/api/src/opentrons/hardware_control/thread_manager.py b/api/src/opentrons/hardware_control/thread_manager.py index 8fe1e3e0c82..037fbd78d28 100644 --- a/api/src/opentrons/hardware_control/thread_manager.py +++ b/api/src/opentrons/hardware_control/thread_manager.py @@ -4,7 +4,9 @@ import logging import asyncio import functools +from typing import Generic, TypeVar, Any from .adapters import SynchronousAdapter +from .modules.mod_abc import AbstractModule MODULE_LOG = logging.getLogger(__name__) @@ -13,6 +15,56 @@ class ThreadManagerException(Exception): pass +async def call_coroutine_threadsafe( + loop: asyncio.AbstractEventLoop, + coro, *args, **kwargs) -> asyncio.Future: + fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop) + wrapped = asyncio.wrap_future(fut) + return await wrapped + + +WrappedObj = TypeVar('WrappedObj') + + +class CallBridger(Generic[WrappedObj]): + def __init__( + self, + wrapped_obj: WrappedObj, + loop: asyncio.AbstractEventLoop) -> None: + self.wrapped_obj = wrapped_obj + self._loop = loop + + def __getattribute__(self, attr_name: str) -> Any: + # Almost every attribute retrieved from us will be for people actually + # looking for an attribute of the managed object, so check there first. + managed_obj = object.__getattribute__(self, 'wrapped_obj') + loop = object.__getattribute__(self, '_loop') + try: + attr = getattr(managed_obj, attr_name) + except AttributeError: + # Maybe this actually was for us? Let’s find it + return object.__getattribute__(self, attr_name) + + if asyncio.iscoroutinefunction(attr): + # Return coroutine result of async function + # executed in managed thread to calling thread + + @functools.wraps(attr) + async def wrapper(*args, **kwargs): + return await call_coroutine_threadsafe( + loop, attr, *args, **kwargs) + + return wrapper + + elif asyncio.iscoroutine(attr): + # Return awaitable coroutine properties run in managed thread/loop + fut = asyncio.run_coroutine_threadsafe(attr, loop) + wrapped = asyncio.wrap_future(fut) + return wrapped + + return attr + + # TODO: BC 2020-02-25 instead of overwriting __get_attribute__ in this class # use inspect.getmembers to iterate over appropriate members of adapted # instance and setattr on the outer instance with the proper threadsafe @@ -20,7 +72,7 @@ class ThreadManagerException(Exception): # object.__get_attribute__(self,...) to opt out of the overwritten # functionality. It is more readable and protected from # unintentional recursion. -class ThreadManager(): +class ThreadManager: """ A wrapper to make every call into :py:class:`.hardware_control.API` execute within the same thread. @@ -40,7 +92,7 @@ class ThreadManager(): >>> api_single_thread.sync.home() # call as blocking sync """ - def __init__(self, builder, *args, **kwargs) -> None: + def __init__(self, builder, *args, **kwargs): """ Build the ThreadManager. :param builder: The API function to use @@ -48,6 +100,7 @@ def __init__(self, builder, *args, **kwargs) -> None: self._loop = None self.managed_obj = None + self.bridged_obj = None self._sync_managed_obj = None is_running = threading.Event() self._is_running = is_running @@ -71,6 +124,7 @@ def _build_and_start_loop(self, builder, *args, **kwargs): loop=loop, **kwargs)) self.managed_obj = managed_obj + self.bridged_obj = CallBridger(managed_obj, loop) self._sync_managed_obj = SynchronousAdapter(managed_obj) except Exception: MODULE_LOG.exception('Exception in Thread Manager build') @@ -92,38 +146,31 @@ def clean_up(self): loop.call_soon_threadsafe(loop.stop) except Exception: pass + object.__getattribute__(self, 'wrap_module').cache_clear() object.__getattribute__(self, '_thread').join() def __del__(self): self.clean_up() - @staticmethod - async def call_coroutine_threadsafe(loop, coro, *args, **kwargs): - fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop) - wrapped = asyncio.wrap_future(fut) - return await wrapped + @functools.lru_cache(8) + def wrap_module( + self, module: AbstractModule) -> CallBridger[AbstractModule]: + return CallBridger(module, object.__getattribute__(self, '_loop')) def __getattribute__(self, attr_name): - # Almost every attribute retrieved from us will be for people actually - # looking for an attribute of the managed object, so check there first. - managed_obj = object.__getattribute__(self, 'managed_obj') - loop = object.__getattribute__(self, '_loop') - try: - attr = getattr(managed_obj, attr_name) - except AttributeError: - # Maybe this actually was for us? Let’s find it - return object.__getattribute__(self, attr_name) - - if asyncio.iscoroutinefunction(attr): - # Return coroutine result of async function - # executed in managed thread to calling thread - return functools.partial(self.call_coroutine_threadsafe, - loop, - attr) - elif asyncio.iscoroutine(attr): - # Return awaitable coroutine properties run in managed thread/loop - fut = asyncio.run_coroutine_threadsafe(attr, loop) - wrapped = asyncio.wrap_future(fut, loop=asyncio.get_event_loop()) - return wrapped - - return attr + # hardware_control.api.API.attached_modules is the only hardware + # API method that returns something other than data. The module + # objects it returns have associated methods that can be called. + # That means they need the same wrapping treatment as the API + # itself. + if attr_name == 'attached_modules': + wrap = object.__getattribute__(self, 'wrap_module') + managed = object.__getattribute__(self, 'managed_obj') + attr = getattr(managed, attr_name) + return [wrap(mod) for mod in attr] + else: + try: + return getattr( + object.__getattribute__(self, 'bridged_obj'), attr_name) + except AttributeError: + return object.__getattribute__(self, attr_name) diff --git a/api/src/opentrons/legacy_api/modules/tempdeck.py b/api/src/opentrons/legacy_api/modules/tempdeck.py index 7a1d8516bc3..29eac98e927 100644 --- a/api/src/opentrons/legacy_api/modules/tempdeck.py +++ b/api/src/opentrons/legacy_api/modules/tempdeck.py @@ -140,7 +140,7 @@ def connect(self): self._driver.connect(self._port) self._device_info = self._driver.get_device_info() self._poll_stop_event = Event() - Thread(target=self._poll_temperature).start() + Thread(target=self._poll_temperature, daemon=True).start() else: # Sanity check Should never happen, because connect should never # be called without a port on Module diff --git a/api/tests/opentrons/protocol_api/test_util.py b/api/tests/opentrons/protocol_api/test_util.py index 2bcba4678f0..63f79aaa071 100644 --- a/api/tests/opentrons/protocol_api/test_util.py +++ b/api/tests/opentrons/protocol_api/test_util.py @@ -4,7 +4,7 @@ from opentrons.hardware_control import API, adapters, types, ThreadManager -def test_hw_manager(loop): +async def test_hw_manager(loop): # When built without an input it should build its own adapter mgr = HardwareManager(None) adapter = mgr.hardware @@ -17,8 +17,8 @@ def test_hw_manager(loop): # When built with a hardware API input it should wrap it with a new # synchronous adapter and not build its own API - mgr = HardwareManager( - API.build_hardware_simulator(loop=loop)) + sim = await API.build_hardware_simulator(loop=loop) + mgr = HardwareManager(sim) assert isinstance(mgr.hardware, adapters.SynchronousAdapter) passed = mgr.hardware # When disconnecting from a real external adapter, it should create