Skip to content

Commit

Permalink
Merge branch 'devel20210830'
Browse files Browse the repository at this point in the history
  • Loading branch information
marmarek committed Sep 28, 2021
2 parents baf5037 + ecf9723 commit 8f07e75
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 22 deletions.
23 changes: 21 additions & 2 deletions qubesadmin/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def __init__(self, app, api_method='admin.Events', enable_cache=True):
#: event handlers - dict of event -> handlers
self.handlers = {}

#: used to stop processing events
self._reader_task = None

if enable_cache:
self.app.cache_enabled = True

Expand Down Expand Up @@ -135,9 +138,15 @@ async def listen_for_events(self, vm=None, reconnect=True):
'''
while True:
try:
await self._listen_for_events(vm)
self._reader_task = asyncio.create_task(
self._listen_for_events(vm))
await self._reader_task
except (OSError, qubesadmin.exc.QubesDaemonCommunicationError):
pass
except asyncio.CancelledError:
break
finally:
self._reader_task = None
if not reconnect:
break
self.app.log.warning(
Expand Down Expand Up @@ -198,6 +207,11 @@ async def _listen_for_events(self, vm=None):
cleanup_func()
return some_event_received

def stop(self):
"""Stop currently running dispatcher"""
if self._reader_task:
self._reader_task.cancel()

def handle(self, subject, event, **kwargs):
"""Call handlers for given event"""
# pylint: disable=protected-access
Expand Down Expand Up @@ -235,4 +249,9 @@ def handle(self, subject, event, **kwargs):
for h_func in h_func_set
if fnmatch.fnmatch(event, h_name)]
for handler in handlers:
handler(subject, event, **kwargs)
try:
handler(subject, event, **kwargs)
except: # pylint: disable=bare-except
self.app.log.exception(
'Failed to handle event: %s, %s, %s',
subject, event, kwargs)
33 changes: 13 additions & 20 deletions qubesadmin/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,42 @@
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

''' Utilities for common events-based actions '''
""" Utilities for common events-based actions """

import functools

import qubesadmin.events
import qubesadmin.exc



class Interrupt(Exception):
'''Interrupt events processing'''


def interrupt_on_vm_shutdown(vms, subject, event):
'''Interrupt events processing when given VM was shutdown'''
def interrupt_on_vm_shutdown(vms, dispatcher, subject, event):
"""Interrupt events processing when given VM was shutdown"""
# pylint: disable=unused-argument
if event == 'connection-established':
if all(vm.is_halted() for vm in sorted(vms)):
raise Interrupt
for vm in sorted(vms):
if vm.is_halted():
vms.remove(vm)
elif event == 'domain-shutdown' and subject in vms:
vms.remove(subject)
if not vms:
raise Interrupt
if not vms:
dispatcher.stop()


async def wait_for_domain_shutdown(vms):
''' Helper function to wait for domain shutdown.
""" Helper function to wait for domain shutdown.
This function wait for domain shutdown, but do not initiate the shutdown
itself.
:param vms: QubesVM object collection to wait for shutdown on
'''
"""
if not vms:
return
app = list(vms)[0].app
vms = set(vms)
events = qubesadmin.events.EventsDispatcher(app, enable_cache=False)
events.add_handler('domain-shutdown',
functools.partial(interrupt_on_vm_shutdown, vms))
functools.partial(interrupt_on_vm_shutdown, vms, events))
events.add_handler('connection-established',
functools.partial(interrupt_on_vm_shutdown, vms))
try:
await events.listen_for_events()
except Interrupt:
pass
functools.partial(interrupt_on_vm_shutdown, vms, events))
await events.listen_for_events()
10 changes: 10 additions & 0 deletions qubesadmin/tests/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ def test_002_handler_glob_partial(self):
self.dispatcher.handle('', 'some-event', arg1='value1')
self.assertFalse(handler.called)

def test_003_handler_error(self):
handler = unittest.mock.Mock()
self.dispatcher.add_handler('some-event', handler)
handler2 = unittest.mock.Mock(side_effect=AssertionError)
self.dispatcher.add_handler('some-event', handler2)
# should catch the exception
self.dispatcher.handle('', 'some-event', arg1='value1')
handler.assert_called_once_with(None, 'some-event', arg1='value1')
handler2.assert_called_once_with(None, 'some-event', arg1='value1')

async def mock_get_events_reader(self, stream, cleanup_func, expected_vm,
vm=None):
self.assertEqual(expected_vm, vm)
Expand Down
1 change: 1 addition & 0 deletions qubesadmin/tests/tools/qvm_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def test_015_wait_all_kill_timeout(self):
self.app.expected_calls[
('some-vm', 'admin.vm.CurrentState', None, None)] = [
b'0\x00power_state=Running',
b'0\x00power_state=Running',
]
self.app.expected_calls[
('other-vm', 'admin.vm.CurrentState', None, None)] = [
Expand Down

0 comments on commit 8f07e75

Please sign in to comment.