diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 4fd6370..d58e7a5 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -38,6 +38,7 @@ import qubes.devices import qubes.ext import qubes.vm.adminvm +from qubes.ext import utils usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$") # should match valid VM name @@ -46,6 +47,7 @@ HWDATA_PATH = '/usr/share/hwdata' + class USBDevice(qubes.devices.DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): @@ -148,9 +150,9 @@ def interfaces(self) -> List[qubes.devices.DeviceInterface]: @property def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: """ - The parent device if any. + The parent device, if any. - USB device has no parents. + A USB device has no parents. """ return None @@ -266,7 +268,7 @@ def attachment(self): @property def self_identity(self) -> str: """ - Get identification of device not related to port. + Get identification of a device not related to port. """ if self._vendor_id is None: vendor_id = self._load_desc_from_qubesdb()["vendor ID"] @@ -288,7 +290,7 @@ def _get_vendor_and_product_names( """ Return tuple of vendor's and product's names for the ids. - If the id is not known return ("unknown", "unknown"). + If the id is not known, return ("unknown", "unknown"). """ return (USBDevice._load_usb_known_devices() .get(vendor_id, dict()) @@ -425,7 +427,7 @@ def on_domain_init_load(self, vm, event): else: self.devices_cache[vm.name] = {} - async def _attach_and_notify(self, vm, device, options): + async def attach_and_notify(self, vm, device, options): # bypass DeviceCollection logic preventing double attach try: await self.on_device_attach_usb( @@ -437,76 +439,11 @@ async def _attach_and_notify(self, vm, device, options): @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): - """A change in QubesDB means a change in device list.""" + """A change in QubesDB means a change in a device list.""" # pylint: disable=unused-argument,no-self-use - vm.fire_event('device-list-change:usb') current_devices = dict((dev.ident, dev.attachment) - for dev in self.on_device_list_usb(vm, None)) - - # send events about devices detached/attached outside by themselves - # (like device pulled out or manual qubes.USB qrexec call) - # compare cached devices and current devices, collect: - # - newly appeared devices (ident) - # - devices attached from a vm to frontend vm (ident: frontend_vm) - # - devices detached from frontend vm (ident: frontend_vm) - # - disappeared devices, e.g. plugged out (ident) - added = set() - attached = dict() - detached = dict() - removed = set() - cache = self.devices_cache[vm.name] - for dev_id, front_vm in current_devices.items(): - if dev_id not in cache: - added.add(dev_id) - if front_vm is not None: - attached[dev_id] = front_vm - elif cache[dev_id] != front_vm: - cached_front = cache[dev_id] - if front_vm is None: - detached[dev_id] = cached_front - elif cached_front is None: - attached[dev_id] = front_vm - else: - # front changed from one to another, so we signal it as: - # detach from first one and attach to the second one. - detached[dev_id] = cached_front - attached[dev_id] = front_vm - - for dev_id, cached_front in cache.items(): - if dev_id not in current_devices: - removed.add(dev_id) - if cached_front is not None: - detached[dev_id] = cached_front - - for dev_id, front_vm in detached.items(): - dev = USBDevice(vm, dev_id) - asyncio.ensure_future(front_vm.fire_event_async( - 'device-detach:usb', device=dev)) - for dev_id in removed: - device = USBDevice(vm, dev_id) - vm.fire_event('device-removed:usb', device=device) - for dev_id in added: - device = USBDevice(vm, dev_id) - vm.fire_event('device-added:usb', device=device) - for dev_ident, front_vm in attached.items(): - dev = USBDevice(vm, dev_ident) - asyncio.ensure_future(front_vm.fire_event_async( - 'device-attach:usb', device=dev, - options={'identity': dev.self_identity}) - ) - - self.devices_cache[vm.name] = current_devices - - for front_vm in vm.app.domains: - if not front_vm.is_running(): - continue - for assignment in front_vm.devices['usb'].get_assigned_devices(): - if (assignment.backend_domain == vm - and assignment.ident in added - and assignment.ident not in attached - ): - asyncio.ensure_future(self._attach_and_notify( - front_vm, assignment.device, assignment.options)) + for dev in self.on_device_list_usb(vm, None)) + utils.device_list_change(self, current_devices, vm, path, USBDevice) @qubes.ext.handler('device-list:usb') def on_device_list_usb(self, vm, event): @@ -522,7 +459,7 @@ def on_device_list_usb(self, vm, event): untrusted_dev_list = vm.untrusted_qdb.list('/qubes-usb-devices/') if not untrusted_dev_list: return - # just get list of devices, not its every property + # just get a list of devices, not its every property untrusted_dev_list = \ set(path.split('/')[2] for path in untrusted_dev_list) for untrusted_qdb_ident in untrusted_dev_list: @@ -670,7 +607,7 @@ async def on_device_detach_usb(self, vm, event, device): async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument for assignment in vm.devices['usb'].get_assigned_devices(): - asyncio.ensure_future(self._attach_and_notify( + asyncio.ensure_future(self.attach_and_notify( vm, assignment.device, assignment.options)) @qubes.ext.handler('domain-shutdown')