-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(api): call module functions across threads #5194
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,9 @@ | |
import logging | ||
import asyncio | ||
import functools | ||
from typing import Generic, TypeVar, Any | ||
from .adapters import SynchronousAdapter | ||
from .modules.mod_abc import AbstractModule | ||
|
||
MODULE_LOG = logging.getLogger(__name__) | ||
|
||
|
@@ -13,14 +15,64 @@ class ThreadManagerException(Exception): | |
pass | ||
|
||
|
||
async def call_coroutine_threadsafe( | ||
loop: asyncio.AbstractEventLoop, | ||
coro, *args, **kwargs) -> asyncio.Future: | ||
fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop) | ||
wrapped = asyncio.wrap_future(fut) | ||
return await wrapped | ||
|
||
|
||
WrappedObj = TypeVar('WrappedObj') | ||
|
||
|
||
class CallBridger(Generic[WrappedObj]): | ||
def __init__( | ||
self, | ||
wrapped_obj: WrappedObj, | ||
loop: asyncio.AbstractEventLoop) -> None: | ||
self.wrapped_obj = wrapped_obj | ||
self._loop = loop | ||
|
||
def __getattribute__(self, attr_name: str) -> Any: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As this is really identical to the |
||
# Almost every attribute retrieved from us will be for people actually | ||
# looking for an attribute of the managed object, so check there first. | ||
managed_obj = object.__getattribute__(self, 'wrapped_obj') | ||
loop = object.__getattribute__(self, '_loop') | ||
try: | ||
attr = getattr(managed_obj, attr_name) | ||
except AttributeError: | ||
# Maybe this actually was for us? Let’s find it | ||
return object.__getattribute__(self, attr_name) | ||
|
||
if asyncio.iscoroutinefunction(attr): | ||
# Return coroutine result of async function | ||
# executed in managed thread to calling thread | ||
|
||
@functools.wraps(attr) | ||
async def wrapper(*args, **kwargs): | ||
return await call_coroutine_threadsafe( | ||
loop, attr, *args, **kwargs) | ||
|
||
return wrapper | ||
|
||
elif asyncio.iscoroutine(attr): | ||
# Return awaitable coroutine properties run in managed thread/loop | ||
fut = asyncio.run_coroutine_threadsafe(attr, loop) | ||
wrapped = asyncio.wrap_future(fut) | ||
return wrapped | ||
|
||
return attr | ||
|
||
|
||
# TODO: BC 2020-02-25 instead of overwriting __get_attribute__ in this class | ||
# use inspect.getmembers to iterate over appropriate members of adapted | ||
# instance and setattr on the outer instance with the proper threadsafe | ||
# resolution logic injected. This approach avoids requiring calls to | ||
# object.__get_attribute__(self,...) to opt out of the overwritten | ||
# functionality. It is more readable and protected from | ||
# unintentional recursion. | ||
class ThreadManager(): | ||
class ThreadManager: | ||
""" A wrapper to make every call into :py:class:`.hardware_control.API` | ||
execute within the same thread. | ||
|
||
|
@@ -40,14 +92,15 @@ class ThreadManager(): | |
>>> api_single_thread.sync.home() # call as blocking sync | ||
""" | ||
|
||
def __init__(self, builder, *args, **kwargs) -> None: | ||
def __init__(self, builder, *args, **kwargs): | ||
""" Build the ThreadManager. | ||
|
||
:param builder: The API function to use | ||
""" | ||
|
||
self._loop = None | ||
self.managed_obj = None | ||
self.bridged_obj = None | ||
self._sync_managed_obj = None | ||
is_running = threading.Event() | ||
self._is_running = is_running | ||
|
@@ -71,6 +124,7 @@ def _build_and_start_loop(self, builder, *args, **kwargs): | |
loop=loop, | ||
**kwargs)) | ||
self.managed_obj = managed_obj | ||
self.bridged_obj = CallBridger(managed_obj, loop) | ||
self._sync_managed_obj = SynchronousAdapter(managed_obj) | ||
except Exception: | ||
MODULE_LOG.exception('Exception in Thread Manager build') | ||
|
@@ -92,38 +146,31 @@ def clean_up(self): | |
loop.call_soon_threadsafe(loop.stop) | ||
except Exception: | ||
pass | ||
object.__getattribute__(self, 'wrap_module').cache_clear() | ||
object.__getattribute__(self, '_thread').join() | ||
|
||
def __del__(self): | ||
self.clean_up() | ||
|
||
@staticmethod | ||
async def call_coroutine_threadsafe(loop, coro, *args, **kwargs): | ||
fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop) | ||
wrapped = asyncio.wrap_future(fut) | ||
return await wrapped | ||
@functools.lru_cache(8) | ||
def wrap_module( | ||
self, module: AbstractModule) -> CallBridger[AbstractModule]: | ||
return CallBridger(module, object.__getattribute__(self, '_loop')) | ||
|
||
def __getattribute__(self, attr_name): | ||
# Almost every attribute retrieved from us will be for people actually | ||
# looking for an attribute of the managed object, so check there first. | ||
managed_obj = object.__getattribute__(self, 'managed_obj') | ||
loop = object.__getattribute__(self, '_loop') | ||
try: | ||
attr = getattr(managed_obj, attr_name) | ||
except AttributeError: | ||
# Maybe this actually was for us? Let’s find it | ||
return object.__getattribute__(self, attr_name) | ||
|
||
if asyncio.iscoroutinefunction(attr): | ||
# Return coroutine result of async function | ||
# executed in managed thread to calling thread | ||
return functools.partial(self.call_coroutine_threadsafe, | ||
loop, | ||
attr) | ||
elif asyncio.iscoroutine(attr): | ||
# Return awaitable coroutine properties run in managed thread/loop | ||
fut = asyncio.run_coroutine_threadsafe(attr, loop) | ||
wrapped = asyncio.wrap_future(fut, loop=asyncio.get_event_loop()) | ||
return wrapped | ||
|
||
return attr | ||
# hardware_control.api.API.attached_modules is the only hardware | ||
# API method that returns something other than data. The module | ||
# objects it returns have associated methods that can be called. | ||
# That means they need the same wrapping treatment as the API | ||
# itself. | ||
if attr_name == 'attached_modules': | ||
amitlissack marked this conversation as resolved.
Show resolved
Hide resolved
|
||
wrap = object.__getattribute__(self, 'wrap_module') | ||
managed = object.__getattribute__(self, 'managed_obj') | ||
attr = getattr(managed, attr_name) | ||
return [wrap(mod) for mod in attr] | ||
else: | ||
try: | ||
return getattr( | ||
object.__getattribute__(self, 'bridged_obj'), attr_name) | ||
except AttributeError: | ||
return object.__getattribute__(self, attr_name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this another interface like
ThreadManager
that wraps an API object? What is aCallBridger
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a threadmanager lite, I suppose. The
ThreadManager
is-aCallBridger
, but it also is the actual owner of the thread. It could probably be changed to use theCallBridger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a future state where calling methods on a HardwareController or API or whatever it is does not happen through a
ThreadManager
orCallBridger
any other wrapper that hides the interface?It's probably beyond the scope of this PR, but It's worth stating that the developer experience of using these wrappers is not pleasant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is; it would be when the Smoothie serial driver and the module serial drivers are async. Right now, the serial transactions block and therefore block the event loop.