Skip to content

Commit

Permalink
Add EventsDispatcher.stop()
Browse files Browse the repository at this point in the history
Avoid using exceptions to interrupt the dispatcher
  • Loading branch information
marmarek committed Sep 28, 2021
1 parent f186f7a commit f7ebf96
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
16 changes: 15 additions & 1 deletion qubesadmin/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def __init__(self, app, api_method='admin.Events', enable_cache=True):
#: event handlers - dict of event -> handlers
self.handlers = {}

#: used to stop processing events
self._reader_task = None

if enable_cache:
self.app.cache_enabled = True

Expand Down Expand Up @@ -135,9 +138,15 @@ async def listen_for_events(self, vm=None, reconnect=True):
'''
while True:
try:
await self._listen_for_events(vm)
self._reader_task = asyncio.create_task(
self._listen_for_events(vm))
await self._reader_task
except (OSError, qubesadmin.exc.QubesDaemonCommunicationError):
pass
except asyncio.CancelledError:
break
finally:
self._reader_task = None
if not reconnect:
break
self.app.log.warning(
Expand Down Expand Up @@ -198,6 +207,11 @@ async def _listen_for_events(self, vm=None):
cleanup_func()
return some_event_received

def stop(self):
"""Stop currently running dispatcher"""
if self._reader_task:
self._reader_task.cancel()

def handle(self, subject, event, **kwargs):
"""Call handlers for given event"""
# pylint: disable=protected-access
Expand Down
33 changes: 13 additions & 20 deletions qubesadmin/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,42 @@
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

''' Utilities for common events-based actions '''
""" Utilities for common events-based actions """

import functools

import qubesadmin.events
import qubesadmin.exc



class Interrupt(Exception):
'''Interrupt events processing'''


def interrupt_on_vm_shutdown(vms, subject, event):
'''Interrupt events processing when given VM was shutdown'''
def interrupt_on_vm_shutdown(vms, dispatcher, subject, event):
"""Interrupt events processing when given VM was shutdown"""
# pylint: disable=unused-argument
if event == 'connection-established':
if all(vm.is_halted() for vm in sorted(vms)):
raise Interrupt
for vm in sorted(vms):
if vm.is_halted():
vms.remove(vm)
elif event == 'domain-shutdown' and subject in vms:
vms.remove(subject)
if not vms:
raise Interrupt
if not vms:
dispatcher.stop()


async def wait_for_domain_shutdown(vms):
''' Helper function to wait for domain shutdown.
""" Helper function to wait for domain shutdown.
This function wait for domain shutdown, but do not initiate the shutdown
itself.
:param vms: QubesVM object collection to wait for shutdown on
'''
"""
if not vms:
return
app = list(vms)[0].app
vms = set(vms)
events = qubesadmin.events.EventsDispatcher(app, enable_cache=False)
events.add_handler('domain-shutdown',
functools.partial(interrupt_on_vm_shutdown, vms))
functools.partial(interrupt_on_vm_shutdown, vms, events))
events.add_handler('connection-established',
functools.partial(interrupt_on_vm_shutdown, vms))
try:
await events.listen_for_events()
except Interrupt:
pass
functools.partial(interrupt_on_vm_shutdown, vms, events))
await events.listen_for_events()
1 change: 1 addition & 0 deletions qubesadmin/tests/tools/qvm_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def test_015_wait_all_kill_timeout(self):
self.app.expected_calls[
('some-vm', 'admin.vm.CurrentState', None, None)] = [
b'0\x00power_state=Running',
b'0\x00power_state=Running',
]
self.app.expected_calls[
('other-vm', 'admin.vm.CurrentState', None, None)] = [
Expand Down

0 comments on commit f7ebf96

Please sign in to comment.