diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 3a2fb57..da08996 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -54,6 +54,8 @@ def __init__(self, backend_domain, ident): self._qdb_ident = ident.replace('.', '_') self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident + self._vendor_id = None + self._product_id = None @property def vendor(self) -> str: @@ -176,7 +178,9 @@ def _load_interfaces_from_qubesdb(self) \ def _load_desc_from_qubesdb(self) -> Dict[str, str]: unknown = "unknown" result = {"vendor": unknown, + "vendor ID": "0000", "product": unknown, + "product ID": "0000", "manufacturer": unknown, "name": unknown, "serial": unknown} @@ -190,29 +194,34 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: if not untrusted_device_desc: return result try: - (untrusted_vendor_product, untrusted_manufacturer, + (untrusted_vend_prod_id, untrusted_manufacturer, untrusted_name, untrusted_serial ) = untrusted_device_desc.split(b' ') - untrusted_vendor, untrusted_product = ( - untrusted_vendor_product.split(b':')) + untrusted_vendor_id, untrusted_product_id = ( + untrusted_vend_prod_id.split(b':')) except ValueError: # desc doesn't contain correctly formatted data, # but it is not empty. We cannot parse it, - # but we can still put it to the `serial` just to provide + # but we can still put it to the `name` just to provide # some information to the user. - untrusted_vendor, untrusted_product, untrusted_manufacturer = ( - unknown.encode(), unknown.encode(), unknown.encode()) + untrusted_vendor_id, untrusted_product_id = ("0000", "0000") + (untrusted_manufacturer, untrusted_serial) = ( + unknown.encode() for _ in range(2)) untrusted_name = untrusted_device_desc.replace(b' ', b'_') + + # Data successfully loaded, cache these values + self._vendor_id = result["vendor ID"] = self._sanitize( + untrusted_vendor_id) + self._product_id = result["product ID"] = self._sanitize( + untrusted_product_id) vendor, product = self._get_vendor_and_product_names( - self._sanitize(untrusted_vendor), - self._sanitize(untrusted_product), - ) + self._vendor_id, self._product_id) self._vendor = result["vendor"] = vendor self._product = result["product"] = product self._manufacturer = result["manufacturer"] = ( self._sanitize(untrusted_manufacturer)) - self._name = result["name"] = ( - self._sanitize(untrusted_name)) + self._name = result["name"] = (self._sanitize(untrusted_name)) + self._name = result["serial"] = (self._sanitize(untrusted_serial)) return result @staticmethod @@ -221,7 +230,7 @@ def _sanitize( safe_chars: str = string.ascii_letters + string.digits + string.punctuation + ' ' ) -> str: - # b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera' + # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' untrusted_device_desc = untrusted_device_desc.decode( 'unicode_escape', errors='ignore') return ''.join( @@ -254,6 +263,24 @@ def attachment(self): return None return connected_to + @property + def self_identity(self) -> str: + """ + Get identification of device not related to port. + """ + if self._vendor_id is None: + vendor_id = self._load_desc_from_qubesdb()["vendor ID"] + else: + vendor_id = self._vendor_id + if self._product_id is None: + product_id = self._load_desc_from_qubesdb()["product ID"] + else: + product_id = self._product_id + interfaces = ''.join(repr(ifc) for ifc in self.interfaces) + serial = self.serial if self.serial != "unknown" else "" + return \ + f'{vendor_id}:{product_id}:{serial}:{interfaces}' + @staticmethod def _get_vendor_and_product_names( vendor_id: str, product_id: str @@ -320,10 +347,6 @@ class QubesUSBException(qubes.exc.QubesException): pass -class UnrecognizedDevice(QubesUSBException): - pass - - def modify_qrexec_policy(service, line, add): """ Add/remove *line* to qrexec policy of a *service*. @@ -407,7 +430,7 @@ async def _attach_and_notify(self, vm, device, options): try: await self.on_device_attach_usb( vm, 'device-pre-attach:usb', device, options) - except UnrecognizedDevice: + except qubes.devices.UnrecognizedDevice: return await vm.fire_event_async( 'device-attach:usb', device=device, options=options) @@ -469,7 +492,7 @@ def on_qdb_change(self, vm, event, path): dev = USBDevice(vm, dev_ident) asyncio.ensure_future(front_vm.fire_event_async( 'device-attach:usb', device=dev, - options={'identity': dev.full_identity}) + options={'identity': dev.self_identity}) ) self.devices_cache[vm.name] = current_devices @@ -537,7 +560,7 @@ def on_device_list_attached(self, vm, event, **kwargs): for dev in self.get_all_devices(vm.app): if dev.attachment == vm: - yield (dev, {'identity': dev.full_identity}) + yield (dev, {'identity': dev.self_identity}) @qubes.ext.handler('device-pre-attach:usb') async def on_device_attach_usb(self, vm, event, device, options): @@ -548,12 +571,12 @@ async def on_device_attach_usb(self, vm, event, device, options): raise qubes.exc.QubesException( 'USB device attach do not support user options') identity = options['identity'] - if device.full_identity != identity: + if device.self_identity != identity: print(f"Unrecognized identity, skipping attachment of {device}", file=sys.stderr) - raise UnrecognizedDevice( + raise qubes.devices.UnrecognizedDevice( "Device presented identity " - f"{device.full_identity} " + f"{device.self_identity} " f"does not match expected {identity}" )