From d985cf118dac454670d967c593aa1d5a3fd456af Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Tue, 5 Dec 2023 14:19:39 +0100 Subject: [PATCH 01/23] q-dev: implement part of new API for DeviceInfo --- qubesusbproxy/core3ext.py | 271 ++++++++++++++++++++++++++++++++++---- 1 file changed, 242 insertions(+), 29 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 34f56cc..76f2255 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -21,16 +21,19 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import asyncio +import base64 import collections import fcntl import grp +import itertools import os import re import string import subprocess -import errno import tempfile +from enum import Enum +from typing import List, Optional, Dict, Tuple import qubes.devices import qubes.ext @@ -41,33 +44,197 @@ usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$") usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ') + class USBDevice(qubes.devices.DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): - super(USBDevice, self).__init__(backend_domain, ident, None) + # super(USBDevice, self).__init__(backend_domain, ident, None) + super(USBDevice, self).__init__( + backend_domain=backend_domain, ident=ident, devclass="usb") self._qdb_ident = ident.replace('.', '_') self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident - # lazy loading - self._description = None + @property + def vendor(self) -> str: + """ + Device vendor from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._vendor is None: + result = self._load_desc_from_qubesdb()["vendor"] + else: + result = self._vendor + return result + + @property + def product(self) -> str: + """ + Device name from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._product is None: + result = self._load_desc_from_qubesdb()["product"] + else: + result = self._product + return result + + @property + def manufacturer(self) -> str: + """ + The name of the manufacturer of the device introduced by device itself + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._manufacturer is None: + result = self._load_desc_from_qubesdb()["manufacturer"] + else: + result = self._manufacturer + return result + + @property + def name(self) -> str: + """ + The name of the device it introduced itself with (could be empty string) + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._name is None: + result = self._load_desc_from_qubesdb()["name"] + else: + result = self._name + return result + + @property + def serial(self) -> str: + """ + The serial number of the device it introduced itself with. + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._serial is None: + result = self._load_desc_from_qubesdb()["serial"] + else: + result = self._serial + return result + + @property + def interfaces(self) -> List[qubes.devices.DeviceInterface]: + """ + List of device interfaces. + + Every device should have at least one interface. + """ + if (len(self._interfaces) == 1 + and self._interfaces[0] == qubes.devices.DeviceInterface.Other): + result = self._load_interfaces_from_qubesdb() + else: + result = self._interfaces + return result @property - def description(self): - if self._description is None: - if not self.backend_domain.is_running(): - # don't cache this value - return "Unknown - domain not running" - untrusted_device_desc = self.backend_domain.untrusted_qdb.read( + def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: + """ + The parent device if any. + + USB device has no parents. + """ + return None + + # @property + # def port_id(self) -> str: + # """ + # Which port the device is connected to. + # """ + # return self.ident.split("-")[1] + + def _load_interfaces_from_qubesdb(self) \ + -> List[qubes.devices.DeviceInterface]: + result = [] + if not self.backend_domain.is_running(): + # don't cache this value + return result + untrusted_interfaces: bytes = ( + self.backend_domain.untrusted_qdb.read( + self._qdb_path + '/interfaces') + ) + if not untrusted_interfaces: + return result + self._interfaces = result = [ + qubes.devices.DeviceInterface.from_str( + self._sanitize(ifc, safe_chars=string.hexdigits) + ) + for ifc in untrusted_interfaces.split(b':') + if ifc + ] + return result + + def _load_desc_from_qubesdb(self) -> Dict[str, str]: + unknown = "unknown" + result = {"vendor": unknown, + "product": unknown, + "manufacturer": unknown, + "name": unknown, + "serial": unknown} + if not self.backend_domain.is_running(): + # don't cache this value + return result + untrusted_device_desc: bytes = ( + self.backend_domain.untrusted_qdb.read( self._qdb_path + '/desc') - if not untrusted_device_desc: - return 'Unknown' - self._description = self._sanitize_desc(untrusted_device_desc) - hw_ident_match = usb_device_hw_ident_re.match(self._description) - if hw_ident_match: - self._description = self._description[ - len(hw_ident_match.group(0)):] - return self._description + ) + if not untrusted_device_desc: + return result + try: + (untrusted_vendor_product, untrusted_manufacturer, + untrusted_name, untrusted_serial + ) = untrusted_device_desc.split(b' ') + untrusted_vendor, untrusted_product = ( + untrusted_vendor_product.split(b':')) + except ValueError: + # desc doesn't contain correctly formatted data, + # but it is not empty. We cannot parse it, + # but we can still put it to the `serial` just to provide + # some information to the user. + untrusted_vendor, untrusted_product, untrusted_manufacturer = ( + unknown.encode(), unknown.encode(), unknown.encode()) + untrusted_name = untrusted_device_desc.replace(b' ', b'_') + vendor, product = self._get_vendor_and_product_names( + self._sanitize(untrusted_vendor), + self._sanitize(untrusted_product), + ) + self._desc_vendor = result["vendor"] = vendor + self._desc_product = result["product"] = product + self._desc_manufacturer = result["manufacturer"] = ( + self._sanitize(untrusted_manufacturer)) + self._desc_name = result["name"] = ( + self._sanitize(untrusted_name)) + return result + + @staticmethod + def _sanitize( + untrusted_device_desc: bytes, + safe_chars: str = + string.ascii_letters + string.digits + string.punctuation + ' ' + ) -> str: + # b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera' + untrusted_device_desc = untrusted_device_desc.decode( + 'unicode_escape', errors='ignore') + return ''.join( + c if c in set(safe_chars) else '_' for c in untrusted_device_desc + ) @property def frontend_domain(self): @@ -96,15 +263,61 @@ def frontend_domain(self): return connected_to @staticmethod - def _sanitize_desc(untrusted_device_desc): - untrusted_device_desc = untrusted_device_desc.decode('ascii', - errors='ignore') - safe_set = set(string.ascii_letters + string.digits + - string.punctuation + ' ') - return ''.join( - c if c in safe_set else '_' for c in untrusted_device_desc - ) + def _get_vendor_and_product_names( + vendor_id: str, product_id: str + ) -> Tuple[str, str]: + """ + Return tuple of vendor's and product's names for the ids. + + If the id is not known return ("unknown", "unknown"). + """ + return (USBDevice._load_usb_known_devices() + .get(vendor_id, dict()) + .get(product_id, ("unknown", "unknown")) + ) + + @staticmethod + def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: + """ + List of known device vendors, devices and interfaces. + + result[vendor_id][device_id] = (vendor_name, product_name) + """ + # Syntax: + # vendor vendor_name <-- 2 spaces between + # device device_name <-- single tab + # interface interface_name <-- two tabs + # ... + # C class class_name + # subclass subclass_name <-- single tab + # prog-if prog-if_name <-- two tabs + result = {} + with open('/usr/share/hwdata/usb.ids', + encoding='utf-8', errors='ignore') as usb_ids: + for line in usb_ids.readlines(): + line = line.rstrip() + if line.startswith('#'): + # skip comments + continue + elif not line: + # skip empty lines + continue + elif line.startswith('\t\t'): + # skip interfaces + continue + elif line.startswith('C '): + # description of classes starts here, we can finish + break + elif line.startswith('\t'): + # save vendor, device pair + device_id, _, device_name = line[1:].split(' ', 2) + result[vendor_id][device_id] = vendor_name, device_name + else: + # new vendor + vendor_id, _, vendor_name = line[:].split(' ', 2) + result[vendor_id] = {} + return result class USBProxyNotInstalled(qubes.exc.QubesException): pass @@ -170,7 +383,7 @@ class USBDeviceExtension(qubes.ext.Extension): def __init__(self): super(USBDeviceExtension, self).__init__() - #include dom0 devices in listing only when usb-proxy is really + # include dom0 devices in listing only when usb-proxy is really # installed there self.usb_proxy_installed_in_dom0 = os.path.exists( '/etc/qubes-rpc/qubes.USB') @@ -178,7 +391,7 @@ def __init__(self): @qubes.ext.handler('domain-init', 'domain-load') def on_domain_init_load(self, vm, event): - '''Initialize watching for changes''' + """Initialize watching for changes""" # pylint: disable=unused-argument,no-self-use vm.watch_qdb_path('/qubes-usb-devices') if event == 'domain-load': @@ -200,7 +413,7 @@ async def _attach_and_notify(self, vm, device, options): @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): - '''A change in QubesDB means a change in device list''' + """A change in QubesDB means a change in device list""" # pylint: disable=unused-argument,no-self-use vm.fire_event('device-list-change:usb') current_devices = dict((dev.ident, dev.frontend_domain) From ab088925553c8f065df200bc229f52e5099e46fd Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Thu, 28 Dec 2023 21:00:56 +0100 Subject: [PATCH 02/23] q-dev: events device-added:usb device-removed:usb --- qubesusbproxy/core3ext.py | 45 +++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 76f2255..7bd0d6b 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -30,6 +30,7 @@ import re import string import subprocess +import sys import tempfile from enum import Enum @@ -48,7 +49,6 @@ class USBDevice(qubes.devices.DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): - # super(USBDevice, self).__init__(backend_domain, ident, None) super(USBDevice, self).__init__( backend_domain=backend_domain, ident=ident, devclass="usb") @@ -137,8 +137,7 @@ def interfaces(self) -> List[qubes.devices.DeviceInterface]: Every device should have at least one interface. """ - if (len(self._interfaces) == 1 - and self._interfaces[0] == qubes.devices.DeviceInterface.Other): + if self._interfaces is None: result = self._load_interfaces_from_qubesdb() else: result = self._interfaces @@ -162,7 +161,7 @@ def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: def _load_interfaces_from_qubesdb(self) \ -> List[qubes.devices.DeviceInterface]: - result = [] + result = [qubes.devices.DeviceInterface.Other] if not self.backend_domain.is_running(): # don't cache this value return result @@ -319,6 +318,7 @@ def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: return result + class USBProxyNotInstalled(qubes.exc.QubesException): pass @@ -397,9 +397,12 @@ def on_domain_init_load(self, vm, event): if event == 'domain-load': # avoid building a cache on domain-init, as it isn't fully set yet, # and definitely isn't running yet - current_devices = dict((dev.ident, dev.frontend_domain) - for dev in self.on_device_list_usb(vm, None)) + current_devices = { + dev.ident: dev.frontend_domain + for dev in self.on_device_list_usb(vm, None) + } self.devices_cache[vm.name] = current_devices + # TODO: fire device-added else: self.devices_cache[vm.name] = {} @@ -427,26 +430,32 @@ def on_qdb_change(self, vm, event, path): connected_devices = dict() disconnected_devices = dict() devices_cache_for_vm = self.devices_cache[vm.name] - for dev, connected_to in current_devices.items(): - if dev not in devices_cache_for_vm: - new_devices.add(dev) - elif devices_cache_for_vm[dev] != current_devices[dev]: - if devices_cache_for_vm[dev] is not None: - disconnected_devices[dev] = devices_cache_for_vm[dev] - if current_devices[dev] is not None: - connected_devices[dev] = current_devices[dev] + for dev_id, connected_to in current_devices.items(): + if dev_id not in devices_cache_for_vm: + new_devices.add(dev_id) + device = USBDevice(vm, dev_id) + vm.fire_event('device-added:usb', device=device) + elif devices_cache_for_vm[dev_id] != current_devices[dev_id]: + if devices_cache_for_vm[dev_id] is not None: + disconnected_devices[dev_id] = devices_cache_for_vm[dev_id] + if current_devices[dev_id] is not None: + connected_devices[dev_id] = current_devices[dev_id] + for dev_id, connected_to in devices_cache_for_vm.items(): + if dev_id not in current_devices: + device = USBDevice(vm, dev_id) + vm.fire_event('device-removed:usb', device=device) self.devices_cache[vm.name] = current_devices # send events about devices detached/attached outside by themselves # (like device pulled out or manual qubes.USB qrexec call) for dev_ident, front_vm in disconnected_devices.items(): - dev = USBDevice(vm, dev_ident) + dev_id = USBDevice(vm, dev_ident) asyncio.ensure_future(front_vm.fire_event_async('device-detach:usb', - device=dev)) + device=dev_id)) for dev_ident, front_vm in connected_devices.items(): - dev = USBDevice(vm, dev_ident) + dev_id = USBDevice(vm, dev_ident) asyncio.ensure_future(front_vm.fire_event_async('device-attach:usb', - device=dev, + device=dev_id, options={})) for front_vm in vm.app.domains: if not front_vm.is_running(): From 30fa0061397d1755c0266040491d891e185ff603 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Sat, 30 Dec 2023 03:03:14 +0100 Subject: [PATCH 03/23] q-dev: DeviceInterface --- qubesusbproxy/core3ext.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 7bd0d6b..1693f9e 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -161,7 +161,7 @@ def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: def _load_interfaces_from_qubesdb(self) \ -> List[qubes.devices.DeviceInterface]: - result = [qubes.devices.DeviceInterface.Other] + result = [qubes.devices.DeviceInterface.unknown()] if not self.backend_domain.is_running(): # don't cache this value return result @@ -172,8 +172,8 @@ def _load_interfaces_from_qubesdb(self) \ if not untrusted_interfaces: return result self._interfaces = result = [ - qubes.devices.DeviceInterface.from_str( - self._sanitize(ifc, safe_chars=string.hexdigits) + qubes.devices.DeviceInterface( + self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb" ) for ifc in untrusted_interfaces.split(b':') if ifc @@ -214,11 +214,11 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: self._sanitize(untrusted_vendor), self._sanitize(untrusted_product), ) - self._desc_vendor = result["vendor"] = vendor - self._desc_product = result["product"] = product - self._desc_manufacturer = result["manufacturer"] = ( + self._vendor = result["vendor"] = vendor + self._product = result["product"] = product + self._manufacturer = result["manufacturer"] = ( self._sanitize(untrusted_manufacturer)) - self._desc_name = result["name"] = ( + self._name = result["name"] = ( self._sanitize(untrusted_name)) return result @@ -292,7 +292,7 @@ def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: # prog-if prog-if_name <-- two tabs result = {} with open('/usr/share/hwdata/usb.ids', - encoding='utf-8', errors='ignore') as usb_ids: + encoding='utf-8', errors='ignore') as usb_ids: # TODO debian etc. for line in usb_ids.readlines(): line = line.rstrip() if line.startswith('#'): @@ -402,7 +402,6 @@ def on_domain_init_load(self, vm, event): for dev in self.on_device_list_usb(vm, None) } self.devices_cache[vm.name] = current_devices - # TODO: fire device-added else: self.devices_cache[vm.name] = {} @@ -416,7 +415,7 @@ async def _attach_and_notify(self, vm, device, options): @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): - """A change in QubesDB means a change in device list""" + """A change in QubesDB means a change in device list.""" # pylint: disable=unused-argument,no-self-use vm.fire_event('device-list-change:usb') current_devices = dict((dev.ident, dev.frontend_domain) From b8513dc9ea2344fcb847fb9f8037c154681a12f1 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Mon, 22 Jan 2024 11:13:16 +0100 Subject: [PATCH 04/23] q-dev: assignments -> get_assigned_devices --- qubesusbproxy/core3ext.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 1693f9e..f74bf93 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -459,8 +459,7 @@ def on_qdb_change(self, vm, event, path): for front_vm in vm.app.domains: if not front_vm.is_running(): continue - for assignment in front_vm.devices['usb'].assignments( - persistent=True): + for assignment in front_vm.devices['usb'].get_assigned_devices(): if assignment.backend_domain == vm and \ assignment.ident in new_devices: asyncio.ensure_future(self._attach_and_notify( @@ -610,7 +609,7 @@ async def on_device_detach_usb(self, vm, event, device): @qubes.ext.handler('domain-start') async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument - for assignment in vm.devices['usb'].assignments(persistent=True): + for assignment in vm.devices['usb'].get_assigned_devices(): device = assignment.device await self.on_device_attach_usb(vm, '', device, options={}) From 1f3aa0c942a6c28acfc11036f87a9bdf735bbc94 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Tue, 23 Jan 2024 01:56:01 +0100 Subject: [PATCH 05/23] q-dev: fire device-attach on domain start fix device-list-change events --- qubesusbproxy/core3ext.py | 89 ++++++++++++++++++++++++--------------- 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index f74bf93..dd02505 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -421,47 +421,66 @@ def on_qdb_change(self, vm, event, path): current_devices = dict((dev.ident, dev.frontend_domain) for dev in self.on_device_list_usb(vm, None)) + # send events about devices detached/attached outside by themselves + # (like device pulled out or manual qubes.USB qrexec call) # compare cached devices and current devices, collect: - # - newly appeared devices - # - devices disconnected from a vm - # - devices connected to a vm - new_devices = set() - connected_devices = dict() - disconnected_devices = dict() - devices_cache_for_vm = self.devices_cache[vm.name] - for dev_id, connected_to in current_devices.items(): - if dev_id not in devices_cache_for_vm: - new_devices.add(dev_id) - device = USBDevice(vm, dev_id) - vm.fire_event('device-added:usb', device=device) - elif devices_cache_for_vm[dev_id] != current_devices[dev_id]: - if devices_cache_for_vm[dev_id] is not None: - disconnected_devices[dev_id] = devices_cache_for_vm[dev_id] - if current_devices[dev_id] is not None: - connected_devices[dev_id] = current_devices[dev_id] - for dev_id, connected_to in devices_cache_for_vm.items(): + # - newly appeared devices (ident) + # - devices attached from a vm to frontend vm (ident: frontend_vm) + # - devices detached from frontend vm (ident: frontend_vm) + # - disappeared devices, e.g. plugged out (ident) + added = set() + attached = dict() + detached = dict() + removed = set() + cache = self.devices_cache[vm.name] + for dev_id, front_vm in current_devices.items(): + if dev_id not in cache: + added.add(dev_id) + if front_vm is not None: + attached[dev_id] = front_vm + elif cache[dev_id] != front_vm: + cached_front = cache[dev_id] + if front_vm is None: + detached[dev_id] = cached_front + elif cached_front is None: + attached[dev_id] = front_vm + else: + # front changed from one to another, so we signal it as: + # detach from first one and attach to the second one. + detached[dev_id] = cached_front + attached[dev_id] = front_vm + + for dev_id, cached_front in cache.items(): if dev_id not in current_devices: - device = USBDevice(vm, dev_id) - vm.fire_event('device-removed:usb', device=device) + removed.add(dev_id) + if cached_front is not None: + detached[dev_id] = cached_front + + for dev_id, front_vm in detached.items(): + dev = USBDevice(vm, dev_id) + asyncio.ensure_future(front_vm.fire_event_async( + 'device-detach:usb', device=dev)) + for dev_id in removed: + device = USBDevice(vm, dev_id) + vm.fire_event('device-removed:usb', device=device) + for dev_id in added: + device = USBDevice(vm, dev_id) + vm.fire_event('device-added:usb', device=device) + for dev_ident, front_vm in attached.items(): + dev = USBDevice(vm, dev_ident) + asyncio.ensure_future(front_vm.fire_event_async( + 'device-attach:usb', device=dev, options={})) self.devices_cache[vm.name] = current_devices - # send events about devices detached/attached outside by themselves - # (like device pulled out or manual qubes.USB qrexec call) - for dev_ident, front_vm in disconnected_devices.items(): - dev_id = USBDevice(vm, dev_ident) - asyncio.ensure_future(front_vm.fire_event_async('device-detach:usb', - device=dev_id)) - for dev_ident, front_vm in connected_devices.items(): - dev_id = USBDevice(vm, dev_ident) - asyncio.ensure_future(front_vm.fire_event_async('device-attach:usb', - device=dev_id, - options={})) + for front_vm in vm.app.domains: if not front_vm.is_running(): continue for assignment in front_vm.devices['usb'].get_assigned_devices(): - if assignment.backend_domain == vm and \ - assignment.ident in new_devices: + if (assignment.backend_domain == vm + and assignment.ident in added + and assignment.ident not in attached + ): asyncio.ensure_future(self._attach_and_notify( front_vm, assignment.device, assignment.options)) @@ -610,8 +629,8 @@ async def on_device_detach_usb(self, vm, event, device): async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument for assignment in vm.devices['usb'].get_assigned_devices(): - device = assignment.device - await self.on_device_attach_usb(vm, '', device, options={}) + asyncio.ensure_future(self._attach_and_notify( + vm, assignment.device, assignment.options)) @qubes.ext.handler('domain-shutdown') async def on_domain_shutdown(self, vm, _event, **_kwargs): From ece1fd0ca53b05bd05bbd5b9a156ce6ff1787bd1 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Tue, 30 Jan 2024 02:44:55 +0100 Subject: [PATCH 06/23] q-dev: usb device full identity --- qubesusbproxy/core3ext.py | 73 ++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index dd02505..17e273a 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -5,6 +5,8 @@ # # Copyright (C) 2016 Marek Marczykowski-Górecki # +# Copyright (C) 2024 Piotr Bartman-Szwarc +# # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,13 +21,11 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# + import asyncio -import base64 import collections import fcntl import grp -import itertools import os import re import string @@ -33,7 +33,6 @@ import sys import tempfile -from enum import Enum from typing import List, Optional, Dict, Tuple import qubes.devices @@ -152,13 +151,6 @@ def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: """ return None - # @property - # def port_id(self) -> str: - # """ - # Which port the device is connected to. - # """ - # return self.ident.split("-")[1] - def _load_interfaces_from_qubesdb(self) \ -> List[qubes.devices.DeviceInterface]: result = [qubes.devices.DeviceInterface.unknown()] @@ -256,8 +248,8 @@ def frontend_domain(self): untrusted_connected_to] except KeyError: self.backend_domain.log.warning( - 'Device {} has invalid VM name in connected-to ' - 'property: '.format(self.ident, untrusted_connected_to)) + f'Device {self.ident} has invalid VM name in connected-to ' + f'property: {untrusted_connected_to}') return None return connected_to @@ -327,6 +319,10 @@ class QubesUSBException(qubes.exc.QubesException): pass +class UnrecognizedDevice(QubesUSBException): + pass + + def modify_qrexec_policy(service, line, add): """ Add/remove *line* to qrexec policy of a *service*. @@ -407,11 +403,13 @@ def on_domain_init_load(self, vm, event): async def _attach_and_notify(self, vm, device, options): # bypass DeviceCollection logic preventing double attach - await self.on_device_attach_usb(vm, - 'device-pre-attach:usb', device, options) - await vm.fire_event_async('device-attach:usb', - device=device, - options=options) + try: + await self.on_device_attach_usb( + vm, 'device-pre-attach:usb', device, options) + except UnrecognizedDevice: + return + await vm.fire_event_async( + 'device-attach:usb', device=device, options=options) @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): @@ -469,7 +467,9 @@ def on_qdb_change(self, vm, event, path): for dev_ident, front_vm in attached.items(): dev = USBDevice(vm, dev_ident) asyncio.ensure_future(front_vm.fire_event_async( - 'device-attach:usb', device=dev, options={})) + 'device-attach:usb', device=dev, + options={'identity': dev.full_identity}) + ) self.devices_cache[vm.name] = current_devices @@ -536,29 +536,46 @@ def on_device_list_attached(self, vm, event, **kwargs): for dev in self.get_all_devices(vm.app): if dev.frontend_domain == vm: - yield (dev, {}) + yield (dev, {'identity': dev.full_identity}) @qubes.ext.handler('device-pre-attach:usb') async def on_device_attach_usb(self, vm, event, device, options): # pylint: disable=unused-argument + + if options: + if list(options.keys()) != ['identity']: + raise qubes.exc.QubesException( + 'USB device attach do not support user options') + identity = options['identity'] + if device.full_identity != identity: + print(f"Unrecognized identity, skipping attachment of {device}", + file=sys.stderr) + raise UnrecognizedDevice( + "Device presented identity " + f"{device.full_identity} " + f"does not match expected {identity}" + ) + if not vm.is_running() or vm.qid == 0: + # print(f"Qube is not running, skipping attachment of {device}", + # file=sys.stderr) return if not isinstance(device, USBDevice): + # print("The device is not recognized as usb device, " + # f"skipping attachment of {device}", + # file=sys.stderr) return - if options: - raise qubes.exc.QubesException( - 'USB device attach do not support options') - if device.frontend_domain: raise qubes.devices.DeviceAlreadyAttached( - 'Device {!s} already attached to {!s}'.format(device, - device.frontend_domain) + 'Device {!s} already attached to {!s}'.format( + device, device.frontend_domain) ) - stubdom_qrexec = (vm.virt_mode == 'hvm' and \ - vm.features.check_with_template('stubdom-qrexec', False)) + stubdom_qrexec = ( + vm.virt_mode == 'hvm' + and vm.features.check_with_template('stubdom-qrexec', False)) name = vm.name + '-dm' if stubdom_qrexec else vm.name From daac099b3cb707d74821fd43286985401e0b79ab Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Thu, 1 Feb 2024 19:20:19 +0100 Subject: [PATCH 07/23] q-dev: frontend_device -> attachment --- qubesusbproxy/core3ext.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 17e273a..3a2fb57 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -44,6 +44,7 @@ usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$") usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ') +HWDATA_PATH = '/usr/share/hwdata' class USBDevice(qubes.devices.DeviceInfo): # pylint: disable=too-few-public-methods @@ -228,7 +229,7 @@ def _sanitize( ) @property - def frontend_domain(self): + def attachment(self): if not self.backend_domain.is_running(): return None untrusted_connected_to = self.backend_domain.untrusted_qdb.read( @@ -283,8 +284,8 @@ def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: # subclass subclass_name <-- single tab # prog-if prog-if_name <-- two tabs result = {} - with open('/usr/share/hwdata/usb.ids', - encoding='utf-8', errors='ignore') as usb_ids: # TODO debian etc. + with open(HWDATA_PATH + '/usb.ids', + encoding='utf-8', errors='ignore') as usb_ids: for line in usb_ids.readlines(): line = line.rstrip() if line.startswith('#'): @@ -394,7 +395,7 @@ def on_domain_init_load(self, vm, event): # avoid building a cache on domain-init, as it isn't fully set yet, # and definitely isn't running yet current_devices = { - dev.ident: dev.frontend_domain + dev.ident: dev.attachment for dev in self.on_device_list_usb(vm, None) } self.devices_cache[vm.name] = current_devices @@ -416,7 +417,7 @@ def on_qdb_change(self, vm, event, path): """A change in QubesDB means a change in device list.""" # pylint: disable=unused-argument,no-self-use vm.fire_event('device-list-change:usb') - current_devices = dict((dev.ident, dev.frontend_domain) + current_devices = dict((dev.ident, dev.attachment) for dev in self.on_device_list_usb(vm, None)) # send events about devices detached/attached outside by themselves @@ -535,7 +536,7 @@ def on_device_list_attached(self, vm, event, **kwargs): return for dev in self.get_all_devices(vm.app): - if dev.frontend_domain == vm: + if dev.attachment == vm: yield (dev, {'identity': dev.full_identity}) @qubes.ext.handler('device-pre-attach:usb') @@ -567,10 +568,10 @@ async def on_device_attach_usb(self, vm, event, device, options): # file=sys.stderr) return - if device.frontend_domain: + if device.attachment: raise qubes.devices.DeviceAlreadyAttached( 'Device {!s} already attached to {!s}'.format( - device, device.frontend_domain) + device, device.attachment) ) stubdom_qrexec = ( @@ -622,7 +623,7 @@ async def on_device_detach_usb(self, vm, event, device): if not isinstance(device, USBDevice): return - connected_to = device.frontend_domain + connected_to = device.attachment # detect race conditions; there is still race here, but much smaller if connected_to is None or connected_to.qid != vm.qid: raise QubesUSBException( From d261a0fd043bfa2d6eea9bb86c15b2171a7d54b9 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Sat, 17 Feb 2024 08:22:25 +0100 Subject: [PATCH 08/23] q-dev: implementation of self_identity --- qubesusbproxy/core3ext.py | 67 ++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 3a2fb57..da08996 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -54,6 +54,8 @@ def __init__(self, backend_domain, ident): self._qdb_ident = ident.replace('.', '_') self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident + self._vendor_id = None + self._product_id = None @property def vendor(self) -> str: @@ -176,7 +178,9 @@ def _load_interfaces_from_qubesdb(self) \ def _load_desc_from_qubesdb(self) -> Dict[str, str]: unknown = "unknown" result = {"vendor": unknown, + "vendor ID": "0000", "product": unknown, + "product ID": "0000", "manufacturer": unknown, "name": unknown, "serial": unknown} @@ -190,29 +194,34 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: if not untrusted_device_desc: return result try: - (untrusted_vendor_product, untrusted_manufacturer, + (untrusted_vend_prod_id, untrusted_manufacturer, untrusted_name, untrusted_serial ) = untrusted_device_desc.split(b' ') - untrusted_vendor, untrusted_product = ( - untrusted_vendor_product.split(b':')) + untrusted_vendor_id, untrusted_product_id = ( + untrusted_vend_prod_id.split(b':')) except ValueError: # desc doesn't contain correctly formatted data, # but it is not empty. We cannot parse it, - # but we can still put it to the `serial` just to provide + # but we can still put it to the `name` just to provide # some information to the user. - untrusted_vendor, untrusted_product, untrusted_manufacturer = ( - unknown.encode(), unknown.encode(), unknown.encode()) + untrusted_vendor_id, untrusted_product_id = ("0000", "0000") + (untrusted_manufacturer, untrusted_serial) = ( + unknown.encode() for _ in range(2)) untrusted_name = untrusted_device_desc.replace(b' ', b'_') + + # Data successfully loaded, cache these values + self._vendor_id = result["vendor ID"] = self._sanitize( + untrusted_vendor_id) + self._product_id = result["product ID"] = self._sanitize( + untrusted_product_id) vendor, product = self._get_vendor_and_product_names( - self._sanitize(untrusted_vendor), - self._sanitize(untrusted_product), - ) + self._vendor_id, self._product_id) self._vendor = result["vendor"] = vendor self._product = result["product"] = product self._manufacturer = result["manufacturer"] = ( self._sanitize(untrusted_manufacturer)) - self._name = result["name"] = ( - self._sanitize(untrusted_name)) + self._name = result["name"] = (self._sanitize(untrusted_name)) + self._name = result["serial"] = (self._sanitize(untrusted_serial)) return result @staticmethod @@ -221,7 +230,7 @@ def _sanitize( safe_chars: str = string.ascii_letters + string.digits + string.punctuation + ' ' ) -> str: - # b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera' + # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' untrusted_device_desc = untrusted_device_desc.decode( 'unicode_escape', errors='ignore') return ''.join( @@ -254,6 +263,24 @@ def attachment(self): return None return connected_to + @property + def self_identity(self) -> str: + """ + Get identification of device not related to port. + """ + if self._vendor_id is None: + vendor_id = self._load_desc_from_qubesdb()["vendor ID"] + else: + vendor_id = self._vendor_id + if self._product_id is None: + product_id = self._load_desc_from_qubesdb()["product ID"] + else: + product_id = self._product_id + interfaces = ''.join(repr(ifc) for ifc in self.interfaces) + serial = self.serial if self.serial != "unknown" else "" + return \ + f'{vendor_id}:{product_id}:{serial}:{interfaces}' + @staticmethod def _get_vendor_and_product_names( vendor_id: str, product_id: str @@ -320,10 +347,6 @@ class QubesUSBException(qubes.exc.QubesException): pass -class UnrecognizedDevice(QubesUSBException): - pass - - def modify_qrexec_policy(service, line, add): """ Add/remove *line* to qrexec policy of a *service*. @@ -407,7 +430,7 @@ async def _attach_and_notify(self, vm, device, options): try: await self.on_device_attach_usb( vm, 'device-pre-attach:usb', device, options) - except UnrecognizedDevice: + except qubes.devices.UnrecognizedDevice: return await vm.fire_event_async( 'device-attach:usb', device=device, options=options) @@ -469,7 +492,7 @@ def on_qdb_change(self, vm, event, path): dev = USBDevice(vm, dev_ident) asyncio.ensure_future(front_vm.fire_event_async( 'device-attach:usb', device=dev, - options={'identity': dev.full_identity}) + options={'identity': dev.self_identity}) ) self.devices_cache[vm.name] = current_devices @@ -537,7 +560,7 @@ def on_device_list_attached(self, vm, event, **kwargs): for dev in self.get_all_devices(vm.app): if dev.attachment == vm: - yield (dev, {'identity': dev.full_identity}) + yield (dev, {'identity': dev.self_identity}) @qubes.ext.handler('device-pre-attach:usb') async def on_device_attach_usb(self, vm, event, device, options): @@ -548,12 +571,12 @@ async def on_device_attach_usb(self, vm, event, device, options): raise qubes.exc.QubesException( 'USB device attach do not support user options') identity = options['identity'] - if device.full_identity != identity: + if device.self_identity != identity: print(f"Unrecognized identity, skipping attachment of {device}", file=sys.stderr) - raise UnrecognizedDevice( + raise qubes.devices.UnrecognizedDevice( "Device presented identity " - f"{device.full_identity} " + f"{device.self_identity} " f"does not match expected {identity}" ) From 785bd62aa39955c026278508723a43f928173e7f Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Tue, 20 Feb 2024 12:52:58 +0100 Subject: [PATCH 09/23] q-dev: port assignment --- qubesusbproxy/core3ext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index da08996..4fd6370 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -571,7 +571,7 @@ async def on_device_attach_usb(self, vm, event, device, options): raise qubes.exc.QubesException( 'USB device attach do not support user options') identity = options['identity'] - if device.self_identity != identity: + if identity != 'any' and device.self_identity != identity: print(f"Unrecognized identity, skipping attachment of {device}", file=sys.stderr) raise qubes.devices.UnrecognizedDevice( From e7acdf4225f767eb99c7be0262e5f2d065b85c91 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Tue, 19 Mar 2024 15:14:04 +0100 Subject: [PATCH 10/23] q-dev: use ext/utils --- qubesusbproxy/core3ext.py | 87 ++++++--------------------------------- 1 file changed, 12 insertions(+), 75 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 4fd6370..d58e7a5 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -38,6 +38,7 @@ import qubes.devices import qubes.ext import qubes.vm.adminvm +from qubes.ext import utils usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$") # should match valid VM name @@ -46,6 +47,7 @@ HWDATA_PATH = '/usr/share/hwdata' + class USBDevice(qubes.devices.DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): @@ -148,9 +150,9 @@ def interfaces(self) -> List[qubes.devices.DeviceInterface]: @property def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: """ - The parent device if any. + The parent device, if any. - USB device has no parents. + A USB device has no parents. """ return None @@ -266,7 +268,7 @@ def attachment(self): @property def self_identity(self) -> str: """ - Get identification of device not related to port. + Get identification of a device not related to port. """ if self._vendor_id is None: vendor_id = self._load_desc_from_qubesdb()["vendor ID"] @@ -288,7 +290,7 @@ def _get_vendor_and_product_names( """ Return tuple of vendor's and product's names for the ids. - If the id is not known return ("unknown", "unknown"). + If the id is not known, return ("unknown", "unknown"). """ return (USBDevice._load_usb_known_devices() .get(vendor_id, dict()) @@ -425,7 +427,7 @@ def on_domain_init_load(self, vm, event): else: self.devices_cache[vm.name] = {} - async def _attach_and_notify(self, vm, device, options): + async def attach_and_notify(self, vm, device, options): # bypass DeviceCollection logic preventing double attach try: await self.on_device_attach_usb( @@ -437,76 +439,11 @@ async def _attach_and_notify(self, vm, device, options): @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): - """A change in QubesDB means a change in device list.""" + """A change in QubesDB means a change in a device list.""" # pylint: disable=unused-argument,no-self-use - vm.fire_event('device-list-change:usb') current_devices = dict((dev.ident, dev.attachment) - for dev in self.on_device_list_usb(vm, None)) - - # send events about devices detached/attached outside by themselves - # (like device pulled out or manual qubes.USB qrexec call) - # compare cached devices and current devices, collect: - # - newly appeared devices (ident) - # - devices attached from a vm to frontend vm (ident: frontend_vm) - # - devices detached from frontend vm (ident: frontend_vm) - # - disappeared devices, e.g. plugged out (ident) - added = set() - attached = dict() - detached = dict() - removed = set() - cache = self.devices_cache[vm.name] - for dev_id, front_vm in current_devices.items(): - if dev_id not in cache: - added.add(dev_id) - if front_vm is not None: - attached[dev_id] = front_vm - elif cache[dev_id] != front_vm: - cached_front = cache[dev_id] - if front_vm is None: - detached[dev_id] = cached_front - elif cached_front is None: - attached[dev_id] = front_vm - else: - # front changed from one to another, so we signal it as: - # detach from first one and attach to the second one. - detached[dev_id] = cached_front - attached[dev_id] = front_vm - - for dev_id, cached_front in cache.items(): - if dev_id not in current_devices: - removed.add(dev_id) - if cached_front is not None: - detached[dev_id] = cached_front - - for dev_id, front_vm in detached.items(): - dev = USBDevice(vm, dev_id) - asyncio.ensure_future(front_vm.fire_event_async( - 'device-detach:usb', device=dev)) - for dev_id in removed: - device = USBDevice(vm, dev_id) - vm.fire_event('device-removed:usb', device=device) - for dev_id in added: - device = USBDevice(vm, dev_id) - vm.fire_event('device-added:usb', device=device) - for dev_ident, front_vm in attached.items(): - dev = USBDevice(vm, dev_ident) - asyncio.ensure_future(front_vm.fire_event_async( - 'device-attach:usb', device=dev, - options={'identity': dev.self_identity}) - ) - - self.devices_cache[vm.name] = current_devices - - for front_vm in vm.app.domains: - if not front_vm.is_running(): - continue - for assignment in front_vm.devices['usb'].get_assigned_devices(): - if (assignment.backend_domain == vm - and assignment.ident in added - and assignment.ident not in attached - ): - asyncio.ensure_future(self._attach_and_notify( - front_vm, assignment.device, assignment.options)) + for dev in self.on_device_list_usb(vm, None)) + utils.device_list_change(self, current_devices, vm, path, USBDevice) @qubes.ext.handler('device-list:usb') def on_device_list_usb(self, vm, event): @@ -522,7 +459,7 @@ def on_device_list_usb(self, vm, event): untrusted_dev_list = vm.untrusted_qdb.list('/qubes-usb-devices/') if not untrusted_dev_list: return - # just get list of devices, not its every property + # just get a list of devices, not its every property untrusted_dev_list = \ set(path.split('/')[2] for path in untrusted_dev_list) for untrusted_qdb_ident in untrusted_dev_list: @@ -670,7 +607,7 @@ async def on_device_detach_usb(self, vm, event, device): async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument for assignment in vm.devices['usb'].get_assigned_devices(): - asyncio.ensure_future(self._attach_and_notify( + asyncio.ensure_future(self.attach_and_notify( vm, assignment.device, assignment.options)) @qubes.ext.handler('domain-shutdown') From f9f84bec165ed7ebfb27b6c6406b3441f910018d Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Wed, 20 Mar 2024 12:37:45 +0100 Subject: [PATCH 11/23] q-dev: device protocol --- qubesusbproxy/core3ext.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index d58e7a5..41e677d 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -35,6 +35,7 @@ import tempfile from typing import List, Optional, Dict, Tuple +import qubes.device_protocol import qubes.devices import qubes.ext import qubes.vm.adminvm @@ -48,7 +49,7 @@ HWDATA_PATH = '/usr/share/hwdata' -class USBDevice(qubes.devices.DeviceInfo): +class USBDevice(qubes.device_protocol.DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): super(USBDevice, self).__init__( @@ -135,7 +136,7 @@ def serial(self) -> str: return result @property - def interfaces(self) -> List[qubes.devices.DeviceInterface]: + def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]: """ List of device interfaces. @@ -148,7 +149,7 @@ def interfaces(self) -> List[qubes.devices.DeviceInterface]: return result @property - def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: + def parent_device(self) -> Optional[qubes.device_protocol.DeviceInfo]: """ The parent device, if any. @@ -157,8 +158,8 @@ def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: return None def _load_interfaces_from_qubesdb(self) \ - -> List[qubes.devices.DeviceInterface]: - result = [qubes.devices.DeviceInterface.unknown()] + -> List[qubes.device_protocol.DeviceInterface]: + result = [qubes.device_protocol.DeviceInterface.unknown()] if not self.backend_domain.is_running(): # don't cache this value return result @@ -169,7 +170,7 @@ def _load_interfaces_from_qubesdb(self) \ if not untrusted_interfaces: return result self._interfaces = result = [ - qubes.devices.DeviceInterface( + qubes.device_protocol.DeviceInterface( self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb" ) for ifc in untrusted_interfaces.split(b':') From 0be9633cd1a153ab1bd05068962ef2f8437fca1f Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Wed, 24 Apr 2024 15:27:08 +0200 Subject: [PATCH 12/23] q-dev: minor optimization --- qubesusbproxy/core3ext.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 41e677d..01aa910 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -236,8 +236,9 @@ def _sanitize( # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' untrusted_device_desc = untrusted_device_desc.decode( 'unicode_escape', errors='ignore') + safe_chars_set = set(safe_chars) return ''.join( - c if c in set(safe_chars) else '_' for c in untrusted_device_desc + c if c in safe_chars_set else '_' for c in untrusted_device_desc ) @property From fca47771629ea4dcdf675d8cc4d663f375e10e12 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 14 May 2024 21:18:26 +0200 Subject: [PATCH 13/23] q-dev: update integ tests --- qubesusbproxy/tests.py | 207 +++++++++++++++++++++++------------------ 1 file changed, 114 insertions(+), 93 deletions(-) diff --git a/qubesusbproxy/tests.py b/qubesusbproxy/tests.py index f9ccb12..a268219 100644 --- a/qubesusbproxy/tests.py +++ b/qubesusbproxy/tests.py @@ -31,12 +31,14 @@ import qubesusbproxy.core3ext import qubes.devices import asyncio + core3 = True except ImportError: pass try: import qubes.qubesutils + core2 = True except ImportError: pass @@ -49,7 +51,6 @@ except FileNotFoundError: pass - GADGET_PREREQ = '&&'.join([ "modprobe dummy_hcd", "modprobe usb_f_mass_storage", @@ -76,15 +77,16 @@ "sleep 2; udevadm settle", ]) + def create_usb_gadget(vm): vm.start() p = vm.run(GADGET_PREREQ, user="root", - passio_popen=True, passio_stderr=True) - (_, stderr) = p.communicate() + passio_popen=True, passio_stderr=True) + (_, _stderr) = p.communicate() if p.returncode != 0: raise unittest.SkipTest("missing USB Gadget subsystem") p = vm.run(GADGET_PREPARE, user="root", - passio_popen=True, passio_stderr=True) + passio_popen=True, passio_stderr=True) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to setup USB gadget: " + stderr.decode()) @@ -97,14 +99,16 @@ def create_usb_gadget(vm): raise RuntimeError("Failed to get dummy device ID") return stdout + def remove_usb_gadget(vm): assert vm.is_running() retcode = vm.run("echo > /sys/kernel/config/usb_gadget/test_g1/UDC", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to disable USB gadget") + def recreate_usb_gadget(vm): '''Re-create the gadget previously created with *create_usb_gadget*, then removed with *remove_usb_gadget*. @@ -115,11 +119,10 @@ def recreate_usb_gadget(vm): "mkdir test_g1; cd test_g1", "echo dummy_udc.0 > UDC", "sleep 2; udevadm settle", - ]) - + ]) p = vm.run(reconnect, user="root", - passio_popen=True, passio_stderr=True) + passio_popen=True, passio_stderr=True) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to re-create USB gadget: " + stderr.decode()) @@ -139,53 +142,64 @@ def test_000_attach_detach(self): self.frontend.start() # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format(self.backend.name, self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + self.dummy_usb_dev)), 0, + "qubes.USBAttach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format(self.backend.name, self.dummy_usb_dev)), 0, - "qubes.USBDetach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + self.dummy_usb_dev)), 0, + "qubes.USBDetach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + "Device disconnection failed") def test_010_attach_detach_vid_pid(self): self.frontend.start() # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format(self.backend.name, "0x1234.0x1234")), 0, - "qubes.USBAttach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + "0x1234.0x1234")), 0, + "qubes.USBAttach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 0, - "Device connection failed") + "Device connection failed") # TODO: check qubesdb entries self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format(self.backend.name, "0x1234.0x1234")), 0, - "qubes.USBDetach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + "0x1234.0x1234")), 0, + "qubes.USBDetach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + "Device disconnection failed") def test_020_detach_on_remove(self): self.frontend.start() self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format(self.backend.name, self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") + user='root', + input="{} {}\n".format( + self.backend.name, + self.dummy_usb_dev)), 0, + "qubes.USBAttach call failed") self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") remove_usb_gadget(self.backend) # FIXME: usb-export script may update qubesdb/disconnect with 1sec delay time.sleep(2) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device not cleaned up") + "Device not cleaned up") # TODO: check for kernel errors? + class TC_10_USBProxy_core2(qubes.tests.extra.ExtraTestCase): def setUp(self): super(TC_10_USBProxy_core2, self).setUp() @@ -208,29 +222,30 @@ def test_020_attach(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, - self.frontend, usb_list[self.usbdev_name]) + self.frontend, + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertEquals(usb_list[self.usbdev_name]['connected-to'], - self.frontend) + self.frontend) def test_030_detach(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) qubes.qubesutils.usb_detach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) # FIXME: usb-export script may update qubesdb with 1sec delay time.sleep(2) @@ -238,15 +253,15 @@ def test_030_detach(self): self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_040_detach_all(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -258,8 +273,8 @@ def test_040_detach_all(self): self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_050_list_attached(self): """ Attached device should not be listed as further attachable """ @@ -267,24 +282,25 @@ def test_050_list_attached(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) usb_list_front_pre = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) + vm=self.frontend) try: qubes.qubesutils.usb_attach(self.qc, - self.frontend, usb_list[self.usbdev_name]) + self.frontend, + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertEquals(usb_list[self.usbdev_name]['connected-to'], - self.frontend) + self.frontend) usb_list_front_post = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) + vm=self.frontend) self.assertEquals(usb_list_front_pre, usb_list_front_post) @@ -293,7 +309,7 @@ def test_060_auto_detach_on_remove(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -304,33 +320,33 @@ def test_060_auto_detach_on_remove(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertNotIn(self.usbdev_name, usb_list) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) with self.assertRaises(qubes.qubesutils.USBProxyNotInstalled): qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) @unittest.expectedFailure def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + usb_list[self.usbdev_name]) except qubes.qubesutils.USBProxyNotInstalled: pass except Exception as e: @@ -362,24 +378,25 @@ def test_000_list(self): usb_list = self.backend.devices['usb'] self.assertIn(self.usbdev_name, [str(dev) for dev in usb_list]) - def test_010_attach_offline(self): + def test_010_assign(self): usb_dev = self.backend.devices['usb'][self.usbdev_ident] ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - persistent=True) + attach_automatically=True, + required=True) self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) - self.assertIsNone(usb_dev.frontend_domain) + self.frontend.devices['usb'].assign(ass)) + self.assertIsNone(usb_dev.attachment) try: self.frontend.start() except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") - self.assertEquals(usb_dev.frontend_domain, - self.frontend) + self.assertEquals(usb_dev.attachment, + self.frontend) def test_020_attach(self): self.frontend.start() @@ -392,11 +409,11 @@ def test_020_attach(self): self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") - self.assertEquals(usb_dev.frontend_domain, - self.frontend) + self.assertEquals(usb_dev.attachment, + self.frontend) def test_030_detach(self): self.frontend.start() @@ -413,22 +430,23 @@ def test_030_detach(self): # FIXME: usb-export script may update qubesdb with 1sec delay self.loop.run_until_complete(asyncio.sleep(2)) - self.assertIsNone(usb_dev.frontend_domain) + self.assertIsNone(usb_dev.attachment) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") - def test_040_detach_offline(self): + def test_040_unassign(self): usb_dev = self.backend.devices['usb'][self.usbdev_ident] ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - persistent=True) + attach_automatically=True, + required=True) self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) - self.assertIsNone(usb_dev.frontend_domain) + self.frontend.devices['usb'].assign(ass)) + self.assertIsNone(usb_dev.attachment) self.loop.run_until_complete( - self.frontend.devices['usb'].detach(ass)) - self.assertIsNone(usb_dev.frontend_domain) + self.frontend.devices['usb'].unassign(ass)) + self.assertIsNone(usb_dev.attachment) def test_050_list_attached(self): """ Attached device should not be listed as further attachable """ @@ -445,11 +463,11 @@ def test_050_list_attached(self): self.skipTest(str(e)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") - self.assertEquals(usb_list[self.usbdev_ident].frontend_domain, - self.frontend) + self.assertEquals(usb_list[self.usbdev_ident].attachment, + self.frontend) usb_list_front_post = list(self.frontend.devices['usb']) @@ -471,17 +489,18 @@ def test_060_auto_detach_on_remove(self): self.assertNotIn(self.usbdev_name, [str(dev) for dev in usb_list]) self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + wait=True), 0, + "Device disconnection failed") def test_061_auto_attach_on_reconnect(self): self.frontend.start() usb_list = self.backend.devices['usb'] ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - persistent=True) + attach_automatically=True, + required=True) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices['usb'].assign(ass)) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -501,14 +520,14 @@ def test_061_auto_attach_on_reconnect(self): self.assertGreater(timeout, 0, 'timeout on device create') self.loop.run_until_complete(asyncio.sleep(5)) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device reconnection failed") + wait=True), 0, + "Device reconnection failed") def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) @@ -521,7 +540,7 @@ def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) @@ -536,8 +555,10 @@ def test_075_attach_not_installed_back(self): def test_080_attach_existing_policy(self): self.frontend.start() # this override policy file, but during normal execution it shouldn't - # exist, so should be ok, especially on testing system - with open('/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), 'w+') as policy_file: + # exist, so should be ok, especially on a testing system + with open( + '/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), + 'w+') as policy_file: policy_file.write('# empty policy\n') ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) self.loop.run_until_complete( @@ -548,7 +569,6 @@ def test_090_attach_stubdom(self): self.frontend.virt_mode = 'hvm' self.frontend.features['stubdom-qrexec'] = True self.frontend.start() - usb_dev = self.backend.devices['usb'][self.usbdev_ident] ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( @@ -558,8 +578,9 @@ def test_090_attach_stubdom(self): time.sleep(5) self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + wait=True), 0, + "Device connection failed") + def list_tests(): tests = [TC_00_USBProxy] From 90945610525612db0aedd2b5ea43029306af691a Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Sat, 18 May 2024 18:54:51 +0200 Subject: [PATCH 14/23] q-dev: fix attaching usb devices on domain start --- qubesusbproxy/core3ext.py | 4 ++-- qubesusbproxy/tests.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 01aa910..9aea5f3 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -609,8 +609,8 @@ async def on_device_detach_usb(self, vm, event, device): async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument for assignment in vm.devices['usb'].get_assigned_devices(): - asyncio.ensure_future(self.attach_and_notify( - vm, assignment.device, assignment.options)) + await self.attach_and_notify( + vm, assignment.device, assignment.options) @qubes.ext.handler('domain-shutdown') async def on_domain_shutdown(self, vm, _event, **_kwargs): diff --git a/qubesusbproxy/tests.py b/qubesusbproxy/tests.py index a268219..30b1d98 100644 --- a/qubesusbproxy/tests.py +++ b/qubesusbproxy/tests.py @@ -395,7 +395,7 @@ def test_010_assign(self): wait=True), 0, "Device connection failed") - self.assertEquals(usb_dev.attachment, + self.assertEqual(usb_dev.attachment, self.frontend) def test_020_attach(self): From f6ad1c3373301e73cfb95ccf71529b15ed7f378d Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Wed, 29 May 2024 17:21:23 +0200 Subject: [PATCH 15/23] q-dev: small fix for unknown devices --- qubesusbproxy/core3ext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 9aea5f3..f522a8b 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -207,7 +207,7 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: # but it is not empty. We cannot parse it, # but we can still put it to the `name` just to provide # some information to the user. - untrusted_vendor_id, untrusted_product_id = ("0000", "0000") + untrusted_vendor_id, untrusted_product_id = (b"0000", b"0000") (untrusted_manufacturer, untrusted_serial) = ( unknown.encode() for _ in range(2)) untrusted_name = untrusted_device_desc.replace(b' ', b'_') From 5cb2a8f315926e05cdd8a5a9d80cb74e4936ba98 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Sat, 1 Jun 2024 12:07:39 +0200 Subject: [PATCH 16/23] q-dev: keep partial backward compatibility --- qubesusbproxy/core3ext.py | 82 ++++++++++++++++++++++-------- qubesusbproxy/utils.py | 102 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 20 deletions(-) create mode 100644 qubesusbproxy/utils.py diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index f522a8b..11b2166 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -22,7 +22,6 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import asyncio import collections import fcntl import grp @@ -35,11 +34,52 @@ import tempfile from typing import List, Optional, Dict, Tuple -import qubes.device_protocol +try: + from qubes.device_protocol import DeviceInfo + from qubes.device_protocol import DeviceInterface + from qubes.ext import utils + from qubes.devices import UnrecognizedDevice +except ImportError: + # This extension supports both the legacy and new device API. + # In the case of the legacy backend, functionality is limited. + from qubes.devices import DeviceInfo as LegacyDeviceInfo + import qubesusbproxy.utils + + class DescriptionOverrider: + @property + def description(self): + return self.vendor + " " + self.product + + class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): + def __init__(self, *args, **kwargs): + # not supported options in legacy code + del kwargs['devclass'] + kwargs['description'] = 'foo' + self.safe_chars = self.safe_chars.replace(' ', '') + super().__init__(*args, **kwargs) + + # needed but not in legacy DeviceInfo + self._vendor = None + self._product = None + self._manufacturer = None + self._name = None + self._serial = None + # `_load_interfaces_from_qubesdb` will never be called + self._interfaces = "?******" + + @property + def fronted_domain(self): + return self.attachment + + class DeviceInterface: + pass + + class UnrecognizedDevice(ValueError): + pass + import qubes.devices import qubes.ext import qubes.vm.adminvm -from qubes.ext import utils usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$") # should match valid VM name @@ -49,9 +89,12 @@ HWDATA_PATH = '/usr/share/hwdata' -class USBDevice(qubes.device_protocol.DeviceInfo): +class USBDevice(DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): + # the superclass can restrict the allowed characters + self.safe_chars = (string.ascii_letters + string.digits + + string.punctuation + ' ') super(USBDevice, self).__init__( backend_domain=backend_domain, ident=ident, devclass="usb") @@ -136,7 +179,7 @@ def serial(self) -> str: return result @property - def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]: + def interfaces(self) -> List[DeviceInterface]: """ List of device interfaces. @@ -149,7 +192,7 @@ def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]: return result @property - def parent_device(self) -> Optional[qubes.device_protocol.DeviceInfo]: + def parent_device(self) -> Optional[DeviceInfo]: """ The parent device, if any. @@ -157,9 +200,8 @@ def parent_device(self) -> Optional[qubes.device_protocol.DeviceInfo]: """ return None - def _load_interfaces_from_qubesdb(self) \ - -> List[qubes.device_protocol.DeviceInterface]: - result = [qubes.device_protocol.DeviceInterface.unknown()] + def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]: + result = [DeviceInterface.unknown()] if not self.backend_domain.is_running(): # don't cache this value return result @@ -170,7 +212,7 @@ def _load_interfaces_from_qubesdb(self) \ if not untrusted_interfaces: return result self._interfaces = result = [ - qubes.device_protocol.DeviceInterface( + DeviceInterface( self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb" ) for ifc in untrusted_interfaces.split(b':') @@ -219,21 +261,21 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: untrusted_product_id) vendor, product = self._get_vendor_and_product_names( self._vendor_id, self._product_id) - self._vendor = result["vendor"] = vendor - self._product = result["product"] = product + self._vendor = result["vendor"] = self._sanitize(vendor.encode()) + self._product = result["product"] = self._sanitize(product.encode()) self._manufacturer = result["manufacturer"] = ( self._sanitize(untrusted_manufacturer)) - self._name = result["name"] = (self._sanitize(untrusted_name)) - self._name = result["serial"] = (self._sanitize(untrusted_serial)) + self._name = result["name"] = self._sanitize(untrusted_name) + self._name = result["serial"] = self._sanitize(untrusted_serial) return result - @staticmethod def _sanitize( - untrusted_device_desc: bytes, - safe_chars: str = - string.ascii_letters + string.digits + string.punctuation + ' ' + self, untrusted_device_desc: bytes, + safe_chars: Optional[str] = None ) -> str: # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' + if safe_chars is None: + safe_chars = self.safe_chars untrusted_device_desc = untrusted_device_desc.decode( 'unicode_escape', errors='ignore') safe_chars_set = set(safe_chars) @@ -434,7 +476,7 @@ async def attach_and_notify(self, vm, device, options): try: await self.on_device_attach_usb( vm, 'device-pre-attach:usb', device, options) - except qubes.devices.UnrecognizedDevice: + except UnrecognizedDevice: return await vm.fire_event_async( 'device-attach:usb', device=device, options=options) @@ -513,7 +555,7 @@ async def on_device_attach_usb(self, vm, event, device, options): if identity != 'any' and device.self_identity != identity: print(f"Unrecognized identity, skipping attachment of {device}", file=sys.stderr) - raise qubes.devices.UnrecognizedDevice( + raise UnrecognizedDevice( "Device presented identity " f"{device.self_identity} " f"does not match expected {identity}" diff --git a/qubesusbproxy/utils.py b/qubesusbproxy/utils.py new file mode 100644 index 0000000..939e609 --- /dev/null +++ b/qubesusbproxy/utils.py @@ -0,0 +1,102 @@ +# coding=utf-8 +# +# The Qubes OS Project, https://www.qubes-os.org +# +# Copyright (C) 2023 Piotr Bartman-Szwarc +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +import asyncio + +import qubes + + +def device_list_change( + ext: qubes.ext.Extension, current_devices, + vm, path, device_class +): + devclass = device_class.__name__[:-len('Device')].lower() + + if path is not None: + vm.fire_event(f'device-list-change:{devclass}') + + added, attached, detached, removed = ( + compare_device_cache(vm, ext.devices_cache, current_devices)) + + # send events about devices detached/attached outside by themselves + for dev_id, front_vm in detached.items(): + dev = device_class(vm, dev_id) + asyncio.ensure_future(front_vm.fire_event_async( + f'device-detach:{devclass}', device=dev)) + for dev_id in removed: + device = device_class(vm, dev_id) + vm.fire_event(f'device-removed:{devclass}', device=device) + for dev_id in added: + device = device_class(vm, dev_id) + vm.fire_event(f'device-added:{devclass}', device=device) + for dev_ident, front_vm in attached.items(): + dev = device_class(vm, dev_ident) + # options are unknown, device already attached + asyncio.ensure_future(front_vm.fire_event_async( + f'device-attach:{devclass}', device=dev, options={})) + + ext.devices_cache[vm.name] = current_devices + + for front_vm in vm.app.domains: + if not front_vm.is_running(): + continue + for assignment in front_vm.devices[devclass].get_assigned_devices(): + if (assignment.backend_domain == vm + and assignment.ident in added + and assignment.ident not in attached + ): + asyncio.ensure_future(ext.attach_and_notify( + front_vm, assignment.device, assignment.options)) + + +def compare_device_cache(vm, devices_cache, current_devices): + # compare cached devices and current devices, collect: + # - newly appeared devices (ident) + # - devices attached from a vm to frontend vm (ident: frontend_vm) + # - devices detached from frontend vm (ident: frontend_vm) + # - disappeared devices, e.g., plugged out (ident) + added = set() + attached = {} + detached = {} + removed = set() + cache = devices_cache[vm.name] + for dev_id, front_vm in current_devices.items(): + if dev_id not in cache: + added.add(dev_id) + if front_vm is not None: + attached[dev_id] = front_vm + elif cache[dev_id] != front_vm: + cached_front = cache[dev_id] + if front_vm is None: + detached[dev_id] = cached_front + elif cached_front is None: + attached[dev_id] = front_vm + else: + # a front changed from one to another, so we signal it as: + # detach from the first one and attach to the second one. + detached[dev_id] = cached_front + attached[dev_id] = front_vm + + for dev_id, cached_front in cache.items(): + if dev_id not in current_devices: + removed.add(dev_id) + if cached_front is not None: + detached[dev_id] = cached_front + return added, attached, detached, removed From cad623aa1957a412615147deb2ec57b5ca659416 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Wed, 5 Jun 2024 09:38:18 +0200 Subject: [PATCH 17/23] q-dev: keep partial backward compatibility for auto-attachment --- qubesusbproxy/core3ext.py | 11 +++++++++-- qubesusbproxy/utils.py | 3 ++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 11b2166..1c29c45 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -39,11 +39,14 @@ from qubes.device_protocol import DeviceInterface from qubes.ext import utils from qubes.devices import UnrecognizedDevice + + def get_assigned_devices(devices): + yield from devices.get_assigned_devices() except ImportError: # This extension supports both the legacy and new device API. # In the case of the legacy backend, functionality is limited. from qubes.devices import DeviceInfo as LegacyDeviceInfo - import qubesusbproxy.utils + from qubesusbproxy import utils class DescriptionOverrider: @property @@ -77,6 +80,10 @@ class DeviceInterface: class UnrecognizedDevice(ValueError): pass + def get_assigned_devices(devices): + yield from devices.assignments(persistent=True) + + import qubes.devices import qubes.ext import qubes.vm.adminvm @@ -650,7 +657,7 @@ async def on_device_detach_usb(self, vm, event, device): @qubes.ext.handler('domain-start') async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument - for assignment in vm.devices['usb'].get_assigned_devices(): + for assignment in get_assigned_devices(vm.devices['usb']): await self.attach_and_notify( vm, assignment.device, assignment.options) diff --git a/qubesusbproxy/utils.py b/qubesusbproxy/utils.py index 939e609..da49b4f 100644 --- a/qubesusbproxy/utils.py +++ b/qubesusbproxy/utils.py @@ -57,7 +57,8 @@ def device_list_change( for front_vm in vm.app.domains: if not front_vm.is_running(): continue - for assignment in front_vm.devices[devclass].get_assigned_devices(): + for assignment in front_vm.devices[devclass].assignments( + persistent=True): if (assignment.backend_domain == vm and assignment.ident in added and assignment.ident not in attached From 227ec98664c6541cf7d49c006ea77220d71498b5 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Fri, 7 Jun 2024 17:03:00 +0200 Subject: [PATCH 18/23] q-dev: keep partial backward compatibility in tests --- qubesusbproxy/core3ext.py | 8 ++-- qubesusbproxy/tests.py | 78 +++++++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 1c29c45..bff803b 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -51,7 +51,7 @@ def get_assigned_devices(devices): class DescriptionOverrider: @property def description(self): - return self.vendor + " " + self.product + return self.name class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): def __init__(self, *args, **kwargs): @@ -70,9 +70,9 @@ def __init__(self, *args, **kwargs): # `_load_interfaces_from_qubesdb` will never be called self._interfaces = "?******" - @property - def fronted_domain(self): - return self.attachment + @property + def frontend_domain(self): + return self.attachment class DeviceInterface: pass diff --git a/qubesusbproxy/tests.py b/qubesusbproxy/tests.py index 30b1d98..79edf44 100644 --- a/qubesusbproxy/tests.py +++ b/qubesusbproxy/tests.py @@ -27,11 +27,35 @@ core2 = False core3 = False +legacy = False try: import qubesusbproxy.core3ext - import qubes.devices import asyncio + try: + from qubes.device_protocol import DeviceAssignment + + def assign(test, collection, assignment): + test.loop.run_until_complete(collection.assign(assignment)) + + def unassign(test, collection, assignment): + test.loop.run_until_complete(collection.unassign(assignment)) + + AUTO_ATTACH = {"attach_automatically": True, "required": True} + except ImportError: + # This extension supports both the legacy and new device API. + # In the case of the legacy backend, functionality is limited. + from qubes.devices import DeviceAssignment + + def assign(test, collection, assignment): + test.loop.run_until_complete(collection.attach(assignment)) + + def unassign(test, collection, assignment): + test.loop.run_until_complete(collection.detach(assignment)) + + legacy = True + AUTO_ATTACH = {"persistent": True} + core3 = True except ImportError: pass @@ -380,11 +404,9 @@ def test_000_list(self): def test_010_assign(self): usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - attach_automatically=True, - required=True) - self.loop.run_until_complete( - self.frontend.devices['usb'].assign(ass)) + ass = DeviceAssignment( + self.backend, self.usbdev_ident, **AUTO_ATTACH) + assign(self, self.frontend.devices['usb'], ass) self.assertIsNone(usb_dev.attachment) try: self.frontend.start() @@ -395,13 +417,12 @@ def test_010_assign(self): wait=True), 0, "Device connection failed") - self.assertEqual(usb_dev.attachment, - self.frontend) + self.assertEqual(usb_dev.attachment, self.frontend) def test_020_attach(self): self.frontend.start() usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -412,13 +433,12 @@ def test_020_attach(self): wait=True), 0, "Device connection failed") - self.assertEquals(usb_dev.attachment, - self.frontend) + self.assertEquals(usb_dev.attachment, self.frontend) def test_030_detach(self): self.frontend.start() usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -438,14 +458,11 @@ def test_030_detach(self): def test_040_unassign(self): usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - attach_automatically=True, - required=True) - self.loop.run_until_complete( - self.frontend.devices['usb'].assign(ass)) + ass = DeviceAssignment( + self.backend, self.usbdev_ident, **AUTO_ATTACH) + assign(self, self.frontend.devices['usb'], ass) self.assertIsNone(usb_dev.attachment) - self.loop.run_until_complete( - self.frontend.devices['usb'].unassign(ass)) + unassign(self, self.frontend.devices['usb'], ass) self.assertIsNone(usb_dev.attachment) def test_050_list_attached(self): @@ -454,7 +471,7 @@ def test_050_list_attached(self): usb_list = self.backend.devices['usb'] usb_list_front_pre = list(self.frontend.devices['usb']) - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( @@ -466,8 +483,7 @@ def test_050_list_attached(self): wait=True), 0, "Device connection failed") - self.assertEquals(usb_list[self.usbdev_ident].attachment, - self.frontend) + self.assertEquals(usb_list[self.usbdev_ident].attachment, self.frontend) usb_list_front_post = list(self.frontend.devices['usb']) @@ -476,7 +492,7 @@ def test_050_list_attached(self): def test_060_auto_detach_on_remove(self): self.frontend.start() usb_list = self.backend.devices['usb'] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -495,12 +511,10 @@ def test_060_auto_detach_on_remove(self): def test_061_auto_attach_on_reconnect(self): self.frontend.start() usb_list = self.backend.devices['usb'] - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident, - attach_automatically=True, - required=True) + ass = DeviceAssignment( + self.backend, self.usbdev_ident, **AUTO_ATTACH) try: - self.loop.run_until_complete( - self.frontend.devices['usb'].assign(ass)) + assign(self, self.frontend.devices['usb'], ass) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -530,7 +544,7 @@ def test_070_attach_not_installed_front(self): user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -543,7 +557,7 @@ def test_075_attach_not_installed_back(self): user="root", wait=True) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( @@ -560,7 +574,7 @@ def test_080_attach_existing_policy(self): '/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), 'w+') as policy_file: policy_file.write('# empty policy\n') - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) @@ -569,7 +583,7 @@ def test_090_attach_stubdom(self): self.frontend.virt_mode = 'hvm' self.frontend.features['stubdom-qrexec'] = True self.frontend.start() - ass = qubes.devices.DeviceAssignment(self.backend, self.usbdev_ident) + ass = DeviceAssignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( self.frontend.devices['usb'].attach(ass)) From 7cbbb49d17ce6c77d1f4d440ff1205cdd2d00e1f Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 17 Jun 2024 08:29:11 +0200 Subject: [PATCH 19/23] q-dev: do not use unicode_escape --- qubesusbproxy/core3ext.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index bff803b..502cb94 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -283,12 +283,30 @@ def _sanitize( # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' if safe_chars is None: safe_chars = self.safe_chars - untrusted_device_desc = untrusted_device_desc.decode( - 'unicode_escape', errors='ignore') safe_chars_set = set(safe_chars) - return ''.join( - c if c in safe_chars_set else '_' for c in untrusted_device_desc - ) + + result = "" + i = 0 + while i < len(untrusted_device_desc): + c = chr(untrusted_device_desc[i]) + if c == '\\': + i += 1 + if i >= len(untrusted_device_desc): + break + c = chr(untrusted_device_desc[i]) + if c == 'x': + i += 2 + if i >= len(untrusted_device_desc): + break + hex_code = untrusted_device_desc[i - 1: i + 1] + c = chr(int(hex_code, base=16)) + + if c in safe_chars_set: + result += c + else: + result += '_' + i += 1 + return result @property def attachment(self): From 54fca9480c4e0606d70c5f657a710909f399e380 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 17 Jun 2024 09:09:37 +0200 Subject: [PATCH 20/23] q-dev: cleanup --- qubesusbproxy/core3ext.py | 1 - 1 file changed, 1 deletion(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 502cb94..3313db3 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -57,7 +57,6 @@ class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): def __init__(self, *args, **kwargs): # not supported options in legacy code del kwargs['devclass'] - kwargs['description'] = 'foo' self.safe_chars = self.safe_chars.replace(' ', '') super().__init__(*args, **kwargs) From 18abd266ee973d895393fa281ae227753c622522 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 17 Jun 2024 21:17:55 +0200 Subject: [PATCH 21/23] q-dev: handle invalid values --- qubesusbproxy/core3ext.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 3313db3..09e3ccd 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -298,7 +298,11 @@ def _sanitize( if i >= len(untrusted_device_desc): break hex_code = untrusted_device_desc[i - 1: i + 1] - c = chr(int(hex_code, base=16)) + try: + hex_value = int(hex_code, 16) + c = chr(hex_value) + except ValueError: + c = '_' if c in safe_chars_set: result += c From 29d32c557400d29c066fdbff305a2283d5b20729 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Tue, 18 Jun 2024 09:02:13 +0200 Subject: [PATCH 22/23] q-dev: handle invalid values --- qubesusbproxy/core3ext.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 09e3ccd..96eb5e6 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -299,6 +299,9 @@ def _sanitize( break hex_code = untrusted_device_desc[i - 1: i + 1] try: + for i in range(2): + if hex_code[i] not in b'0123456789abcdefABCDEF': + raise ValueError() hex_value = int(hex_code, 16) c = chr(hex_value) except ValueError: From f86dbf67683c349ead87feee0caaeb1f9b6c3850 Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Fri, 21 Jun 2024 11:13:07 +0200 Subject: [PATCH 23/23] q-dev: fix loop --- qubesusbproxy/core3ext.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 96eb5e6..799bff6 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -299,8 +299,8 @@ def _sanitize( break hex_code = untrusted_device_desc[i - 1: i + 1] try: - for i in range(2): - if hex_code[i] not in b'0123456789abcdefABCDEF': + for j in range(2): + if hex_code[j] not in b'0123456789abcdefABCDEF': raise ValueError() hex_value = int(hex_code, 16) c = chr(hex_value)