Skip to content
This repository has been archived by the owner on Apr 26, 2020. It is now read-only.

Commit

Permalink
Move state signals from Domain to DomainManager
Browse files Browse the repository at this point in the history
- Add handy type aliases
- Migrate (most) typing to python3
- Remove __future__ imports
- Fix gi imports
- Handle more state signals
  • Loading branch information
kalkin committed Jun 13, 2017
1 parent f808502 commit ac7910e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 87 deletions.
31 changes: 0 additions & 31 deletions qubesdbus/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,6 @@ def __init__(self, bus, bus_name, bus_path, data):
bus=bus, bus_name=bus_name,
bus_path=bus_path)

@dbus.service.signal(dbus_interface="org.qubes.Domain")
def Started(self):
""" Signal emited when the domain is started """
pass

@dbus.service.signal(dbus_interface="org.qubes.Domain")
def Halted(self):
""" Signal emited when the domain is halted """
pass

@dbus.service.signal(dbus_interface='org.freedesktop.DBus.Properties',
signature="sa{sv}as")
def PropertiesChanged(self, interface, changed_properties,
invalidated=None):
''' This signal is emitted when a property changes.
''' # pylint: disable=unused-argument
# type: (str, Dict[dbus.String, Any], List[dbus.String]) -> None
if 'state' in changed_properties:
value = changed_properties['state']
if value == 'Running':
self.Started()
elif value == 'Halted':
self.Halted()
super().PropertiesChanged(interface, changed_properties, invalidated)

@dbus.service.signal(
dbus_interface="org.qubes.DomainManager1.domains.Signals",
signature='s')
def StartingSignal(self, name):
self.properties['state'] = name

@dbus.service.method("org.qubes.Domain", out_signature="b")
def Shutdown(self):
app = qubesadmin.Qubes()
Expand Down
146 changes: 90 additions & 56 deletions qubesdbus/domain_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,33 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
''' org.qubes.DomainManager1 Service '''

from __future__ import absolute_import, print_function

import logging
import sys
from typing import Any, Dict, List, Union

import dbus
import dbus.service
import qubesadmin
from gi.repository import GLib
from systemd.journal import JournalHandler

import qubesadmin
import qubesdbus.serialize
from qubesdbus.domain import Domain
from qubesdbus.service import ObjectManager, PropertiesObject

try:
# Check mypy types. pylint: disable=ungrouped-imports, unused-import
from typing import Any, Union
except ImportError:
pass
import gi # isort:skip
gi.require_version('Gtk', '3.0') # isort:skip
from gi.repository import GLib # isort:skip pylint:disable=wrong-import-position

log = logging.getLogger('qubesdbus.DomainManager1')
log.addHandler(
JournalHandler(level=logging.DEBUG,
SYSLOG_IDENTIFIER='qubesdbus.domain_manager'))
log.addHandler(JournalHandler(level=logging.DEBUG, SYSLOG_IDENTIFIER='qubesdbus.domain_manager'))
log.propagate = True

# type aliases
DBusSignalMatch = dbus.connection.SignalMatch
DBusString = Union[str, dbus.String]
DBusProperties = Dict[DBusString, Any]

INTERFACE = 'org.qubes.DomainManager1'

class DomainManager(PropertiesObject, ObjectManager):
''' The `DomainManager` is the equivalent to the `qubes.Qubes` object for
Expand All @@ -55,70 +56,103 @@ class DomainManager(PropertiesObject, ObjectManager):
properties
'''

def __init__(self, data, domains):
# type: (Dict[dbus.String, Any], List[Dict[Union[str,dbus.String], Any]]) -> None
super(DomainManager, self).__init__('DomainManager1',
'org.qubes.DomainManager1', data)
def __init__(self, qubes_data: DBusProperties, domains: List[DBusProperties]) -> None:
super(DomainManager, self).__init__('DomainManager1', INTERFACE, qubes_data)
self.managed_objects = [self._proxify_domain(vm) for vm in domains]
self.state_signals = {
'Starting': self.Starting,
'Running' : self.Started,
'Failed' : self.Failed,
'Halting' : self.Halting,
'Halted' : self.Halted,
}
self.signal_matches = {} # type: Dict[dbus.ObjectPath, List[DBusSignalMatch]]

for domain in self.managed_objects:
self._setup_signals(domain)
obj_path = domain._object_path # pylint: disable=protected-access
self._setup_signals(obj_path)

def _setup_signals(self, obj_path: dbus.ObjectPath):
def emit_state_signal(dbus_interface: DBusString,
changed_properties: DBusProperties,
invalidated: dbus.Array=None # pylint: disable=unused-argument
) -> None:
''' Emit state signal when domain state property is changed. '''
assert dbus_interface == 'org.freedesktop.DBus.Properties'
if 'state' in changed_properties:
state = changed_properties['state']
assert state in self.state_signals
signal_func = self.state_signals[state]
signal_func(INTERFACE, obj_path)

signal_match = self.bus.add_signal_receiver(
emit_state_signal,
signal_name="PropertiesChanged",
dbus_interface='org.freedesktop.DBus.Properties') # type: DBusSignalMatch

if obj_path not in self.signal_matches:
self.signal_matches[obj_path] = list()

self.signal_matches[obj_path] += signal_match

@dbus.service.signal(INTERFACE, signature="so")
def Started(self, interface: DBusString, object_path: dbus.ObjectPath) -> None:
""" Signal emited when the domain is started and running"""
pass

def _setup_signals(self, obj):
obj_path = obj._object_path # pylint: disable=protected-access
@dbus.service.signal(INTERFACE, signature="so")
def Starting(self, interface: DBusString, object_path: dbus.ObjectPath) -> None:
""" Signal emited when the domain is starting """
pass

self.bus.add_signal_receiver(
lambda: self.Started("org.qubes.Domain", obj_path),
signal_name="Started", path=obj_path,
dbus_interface="org.qubes.Domain")
@dbus.service.signal(INTERFACE, signature="so")
def Failed(self, interface: DBusString, object_path: dbus.ObjectPath) -> None:
""" Signal emited when during the start up of the domain something went
wrong and the domain was halted"""
pass

self.bus.add_signal_receiver(
lambda: self.Halted("org.qubes.Domain", obj_path),
signal_name="Halted", path=obj_path,
dbus_interface="org.qubes.Domain")
@dbus.service.signal(INTERFACE, signature="so")
def Halting(self, interface: DBusString, object_path: dbus.ObjectPath) -> None:
""" Signal emited when the domain is shutting down"""
pass

@dbus.service.signal(INTERFACE, signature="so")
def Halted(self, interface: DBusString, object_path: dbus.ObjectPath) -> None:
""" Signal emited when the domain is halted"""
pass

@dbus.service.method(dbus_interface='org.qubes.DomainManager1',
in_signature='a{sv}b')
def AddDomain(self, vm, execute=False):
@dbus.service.method(dbus_interface=INTERFACE, in_signature='a{sv}b')
def AddDomain(self, vm: DBusProperties, execute: bool = False) -> bool:
''' Notify the `DomainManager` when a domain is added. This is
called by `QubesDbusProxy` when 'domain-create-on-disk' event
arrives from `core-admin`. UI programs which need to create an
actual vm should set `execute` to True.
''' # type: (Dict[dbus.String, Any], bool) -> bool
'''
if execute:
log.error('Creating domains via DBus is not implemented yet')
return False
else:
vm['qid'] = len(self.managed_objects)
domain = self._proxify_domain(vm)
self.managed_objects.append(domain)
log.info('Added domain %s', vm['name'])
# pylint: disable=protected-access
self.DomainAdded("org.qubes.DomainManager1", domain._object_path)
return True

@dbus.service.signal("org.qubes.DomainManager1", signature="so")
vm['qid'] = len(self.managed_objects)
domain = self._proxify_domain(vm)
self.managed_objects.append(domain)
log.info('Added domain %s', vm['name'])
# pylint: disable=protected-access
obj_path = domain._object_path # type: dbus.Object_Path
self._setup_signals(obj_path)
self.DomainAdded(INTERFACE, obj_path)
return True

@dbus.service.signal(INTERFACE, signature="so")
def DomainAdded(self, _, object_path):
''' This signal is emitted when a new domain is added '''
self.log.debug("Emiting DomainAdded signal: %s", object_path)

@dbus.service.signal("org.qubes.DomainManager1", signature="so")
def Halted(self, _, object_path):
print("Halted %s" % object_path)
pass

@dbus.service.signal("org.qubes.DomainManager1", signature="so")
def Started(self, _, object_path):
print("Started %s" % object_path)
pass

@dbus.service.signal("org.qubes.DomainManager1", signature="so")
@dbus.service.signal(INTERFACE, signature="so")
def DomainRemoved(self, _, object_path):
''' This signal is emitted when a new domain is removed '''
self.log.debug("Emiting DomainRemoved signal: %s", object_path)

@dbus.service.method(dbus_interface='org.qubes.DomainManager1',
@dbus.service.method(dbus_interface=INTERFACE,
in_signature='ob', out_signature='b')
def RemoveDomain(self, vm_dbus_path, execute=False):
''' Notify the `DomainManager` when a domain is removed. This is
Expand All @@ -134,22 +168,22 @@ def RemoveDomain(self, vm_dbus_path, execute=False):
if vm._object_path == vm_dbus_path:
vm.remove_from_connection()
self.managed_objects.remove(vm)
self.DomainRemoved("org.qubes.DomainManager1", vm._object_path)
self.DomainRemoved(INTERFACE, vm._object_path)
return True
return False

def _proxify_domain(self, vm):
# type: (Dict[Union[str,dbus.String], Any]) -> Domain
# type: (Dict[Union[str,DBusString], Any]) -> Domain
return Domain(self.bus, self.bus_name, self.bus_path, vm)


def main(args=None): # pylint: disable=unused-argument
''' Main function starting the DomainManager1 service. '''
loop = GLib.MainLoop()
app = qubesadmin.Qubes()
data = qubesdbus.serialize.qubes_data(app)
qubes_data = qubesdbus.serialize.qubes_data(app)
domains = [qubesdbus.serialize.domain_data(vm) for vm in app.domains]
_ = DomainManager(data, domains)
_ = DomainManager(qubes_data, domains)
print("Service running...")
loop.run()
print("Service stopped")
Expand Down

0 comments on commit ac7910e

Please sign in to comment.