Skip to content

Commit

Permalink
events: add support for async event handlers
Browse files Browse the repository at this point in the history
See documentation for details.
  • Loading branch information
marmarek committed Jul 4, 2017
1 parent 6238254 commit ea1a04c
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 10 deletions.
2 changes: 1 addition & 1 deletion contrib/check-events
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class EventVisitor(ast.NodeVisitor):
# events this way
return

if name.endswith('.fire_event') or name.endswith('.fire_event_pre'):
if name.endswith('.fire_event') or name.endswith('.fire_event_async'):
# here we throw events; event name is the first argument; sometimes
# it is expressed as 'event-stem:' + some_variable
eventnode = node.args[0]
Expand Down
69 changes: 69 additions & 0 deletions doc/qubes-events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,75 @@ returned to the caller as list. The order of this list is undefined.
effect = o.fire_event('event1')
Asynchronous event handling
---------------------------

Event handlers can be defined as coroutine. This way they can execute long
running actions without blocking the whole qubesd process. To define
asynchronous event handler, annotate a coroutine (a function defined with
`async def`, or decorated with `py:func:`asyncio.coroutine`) with
py:func:`qubes.events.handler` decorator. By definition, order of
such handlers is undefined.

Asynchronous events can be fired using
:py:meth:`qubes.events.Emitter.fire_event_async` method. It will handle both
synchronous and asynchronous handlers. It's an error to register asynchronous
handler (a coroutine) for synchronous event (the one fired with
:py:meth:`qubes.events.Emitter.fire_event`) - it will result in
:py:exc:`RuntimeError` exception.

.. code-block:: python
import asyncio
import qubes.events
class MyClass(qubes.events.Emitter):
@qubes.events.handler('event1', 'event2')
@asyncio.coroutine
def event_handler(self, event):
if event == 'event1':
print('Got event 1, starting long running action')
yield from asyncio.sleep(10)
print('Done')
o = MyClass()
loop = asyncio.get_event_loop()
loop.run_until_complete(o.fire_event_async('event1'))
Asynchronous event handlers can also return value - but only a collection, not
yield individual values (because of python limitation):

.. code-block:: python
import asyncio
import qubes.events
class MyClass(qubes.events.Emitter):
@qubes.events.handler('event1')
@asyncio.coroutine
def event_handler(self, event):
print('Got event, starting long running action')
yield from asyncio.sleep(10)
return ('result1', 'result2')
@qubes.events.handler('event1')
@asyncio.coroutine
def another_handler(self, event):
print('Got event, starting long running action')
yield from asyncio.sleep(10)
return ('result3', 'result4')
@qubes.events.handler('event1')
def synchronous_handler(self, event):
yield 'sync result'
o = MyClass()
loop = asyncio.get_event_loop()
# returns ['sync result', 'result1', 'result2', 'result3', 'result4'],
# possibly not in order
effects = loop.run_until_complete(o.fire_event_async('event1'))
Module contents
---------------

Expand Down
70 changes: 61 additions & 9 deletions qubes/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
Events are fired when something happens, like VM start or stop, property change
etc.
'''

import asyncio
import collections

import itertools
Expand All @@ -36,14 +36,14 @@ def handler(*events):
To hook an event, decorate a method in your plugin class with this
decorator.
It probably makes no sense to specify more than one handler for specific
event in one class, because handlers are not run concurrently and there is
no guarantee of the order of execution.
Some event handlers may be defined as coroutine. In such a case, *async*
should be set to :py:obj:``True``.
See appropriate event documentation for details.
.. note::
For hooking events from extensions, see :py:func:`qubes.ext.handler`.
:param str event: event type
:param str events: events
'''

def decorator(func):
Expand Down Expand Up @@ -141,13 +141,14 @@ def _fire_event(self, event, kwargs, pre_event=False):
'''

if not self.events_enabled:
return []
return [], []

order = itertools.chain((self,), self.__class__.__mro__)
if not pre_event:
order = reversed(list(order))

effects = []
async_effects = []
for i in order:
try:
handlers_dict = i.__handlers__
Expand All @@ -160,9 +161,11 @@ def _fire_event(self, event, kwargs, pre_event=False):
key=(lambda handler: hasattr(handler, 'ha_bound')),
reverse=True):
effect = func(self, event, **kwargs)
if effect is not None:
if asyncio.iscoroutinefunction(func):
async_effects.append(effect)
elif effect is not None:
effects.extend(effect)
return effects
return effects, async_effects

def fire_event(self, event, pre_event=False, **kwargs):
'''Call all handlers for an event.
Expand All @@ -173,6 +176,13 @@ def fire_event(self, event, pre_event=False, **kwargs):
(specified in class definition), then handlers from extensions. Aside
from above, remaining order is undefined.
This method call only synchronous handlers. If any asynchronous
handler is registered for the event, :py:class:``RuntimeError`` is
raised.
.. seealso::
:py:meth:`fire_event_async`
:param str event: event identifier
:param pre_event: is this -pre- event? reverse handlers calling order
:returns: list of effects
Expand All @@ -181,4 +191,46 @@ def fire_event(self, event, pre_event=False, **kwargs):
events.
'''

return self._fire_event(event, kwargs, pre_event=pre_event)
sync_effects, async_effects = self._fire_event(event, kwargs,
pre_event=pre_event)
if async_effects:
raise RuntimeError(
'unexpected async-handler(s) {!r} for sync event {!s}'.format(
async_effects, event))
return sync_effects


@asyncio.coroutine
def fire_event_async(self, event, pre_event=False, **kwargs):
'''Call all handlers for an event, allowing async calls.
Handlers are called for class and all parent classes, in **reversed**
or **true** (depending on *pre_event* parameter)
method resolution order. For each class first are called bound handlers
(specified in class definition), then handlers from extensions. Aside
from above, remaining order is undefined.
This method call both synchronous and asynchronous handlers. Order of
asynchronous calls is, by definition, undefined.
.. seealso::
:py:meth:`fire_event`
:param str event: event identifier
:param pre_event: is this -pre- event? reverse handlers calling order
:returns: list of effects
All *kwargs* are passed verbatim. They are different for different
events.
'''

sync_effects, async_effects = self._fire_event(event,
kwargs, pre_event=pre_event)
effects = sync_effects
if async_effects:
async_tasks, _ = yield from asyncio.wait(async_effects)
for task in async_tasks:
effect = task.result()
if effect is not None:
effects.extend(effect)
return effects
12 changes: 12 additions & 0 deletions qubes/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ def fire_event(self, event, **kwargs):
self.fired_events[(event, ev_kwargs)] += 1
return effects

@asyncio.coroutine
def fire_event_async(self, event, pre_event=False, **kwargs):
effects = yield from super(TestEmitter, self).fire_event_async(
event, pre_event=pre_event, **kwargs)
ev_kwargs = frozenset(
(key,
frozenset(value.items()) if isinstance(value, dict) else value)
for key, value in kwargs.items()
)
self.fired_events[(event, ev_kwargs)] += 1
return effects


def expectedFailureIfTemplate(templates):
"""
Expand Down
34 changes: 34 additions & 0 deletions qubes/tests/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import asyncio

import qubes.events
import qubes.tests
Expand Down Expand Up @@ -134,3 +135,36 @@ def on_testevent_2(subject, event):
['testevent_2', 'testevent_1'])
self.assertEqual(list(effect2),
['testevent_1'])

def test_005_fire_for_effect_async(self):
class TestEmitter(qubes.events.Emitter):
@qubes.events.handler('testevent')
@asyncio.coroutine
def on_testevent_1(self, event):
pass

@qubes.events.handler('testevent')
@asyncio.coroutine
def on_testevent_2(self, event):
yield from asyncio.sleep(0.01)
return ['testvalue1']

@qubes.events.handler('testevent')
@asyncio.coroutine
def on_testevent_3(self, event):
return ('testvalue2', 'testvalue3')

@qubes.events.handler('testevent')
def on_testevent_4(self, event):
return ('testvalue4',)

loop = asyncio.get_event_loop()
emitter = TestEmitter()
emitter.events_enabled = True

effect = loop.run_until_complete(emitter.fire_event_async('testevent'))
loop.close()
asyncio.set_event_loop(None)

self.assertCountEqual(effect,
('testvalue1', 'testvalue2', 'testvalue3', 'testvalue4'))

0 comments on commit ea1a04c

Please sign in to comment.