From d261a0fd043bfa2d6eea9bb86c15b2171a7d54b9 Mon Sep 17 00:00:00 2001
From: Piotr Bartman <prbartman@invisiblethingslab.com>
Date: Sat, 17 Feb 2024 08:22:25 +0100
Subject: [PATCH] q-dev: implementation of self_identity

---
 qubesusbproxy/core3ext.py | 67 ++++++++++++++++++++++++++-------------
 1 file changed, 45 insertions(+), 22 deletions(-)

diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py
index 3a2fb57..da08996 100644
--- a/qubesusbproxy/core3ext.py
+++ b/qubesusbproxy/core3ext.py
@@ -54,6 +54,8 @@ def __init__(self, backend_domain, ident):
 
         self._qdb_ident = ident.replace('.', '_')
         self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident
+        self._vendor_id = None
+        self._product_id = None
 
     @property
     def vendor(self) -> str:
@@ -176,7 +178,9 @@ def _load_interfaces_from_qubesdb(self) \
     def _load_desc_from_qubesdb(self) -> Dict[str, str]:
         unknown = "unknown"
         result = {"vendor": unknown,
+                  "vendor ID": "0000",
                   "product": unknown,
+                  "product ID": "0000",
                   "manufacturer": unknown,
                   "name": unknown,
                   "serial": unknown}
@@ -190,29 +194,34 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]:
         if not untrusted_device_desc:
             return result
         try:
-            (untrusted_vendor_product, untrusted_manufacturer,
+            (untrusted_vend_prod_id, untrusted_manufacturer,
              untrusted_name, untrusted_serial
              ) = untrusted_device_desc.split(b' ')
-            untrusted_vendor, untrusted_product = (
-                untrusted_vendor_product.split(b':'))
+            untrusted_vendor_id, untrusted_product_id = (
+                untrusted_vend_prod_id.split(b':'))
         except ValueError:
             # desc doesn't contain correctly formatted data,
             # but it is not empty. We cannot parse it,
-            # but we can still put it to the `serial` just to provide
+            # but we can still put it to the `name` just to provide
             # some information to the user.
-            untrusted_vendor, untrusted_product, untrusted_manufacturer = (
-                unknown.encode(), unknown.encode(), unknown.encode())
+            untrusted_vendor_id, untrusted_product_id = ("0000", "0000")
+            (untrusted_manufacturer, untrusted_serial) = (
+                unknown.encode() for _ in range(2))
             untrusted_name = untrusted_device_desc.replace(b' ', b'_')
+
+        # Data successfully loaded, cache these values
+        self._vendor_id = result["vendor ID"] = self._sanitize(
+            untrusted_vendor_id)
+        self._product_id = result["product ID"] = self._sanitize(
+            untrusted_product_id)
         vendor, product = self._get_vendor_and_product_names(
-            self._sanitize(untrusted_vendor),
-            self._sanitize(untrusted_product),
-        )
+            self._vendor_id, self._product_id)
         self._vendor = result["vendor"] = vendor
         self._product = result["product"] = product
         self._manufacturer = result["manufacturer"] = (
             self._sanitize(untrusted_manufacturer))
-        self._name = result["name"] = (
-            self._sanitize(untrusted_name))
+        self._name = result["name"] = (self._sanitize(untrusted_name))
+        self._name = result["serial"] = (self._sanitize(untrusted_serial))
         return result
 
     @staticmethod
@@ -221,7 +230,7 @@ def _sanitize(
             safe_chars: str =
             string.ascii_letters + string.digits + string.punctuation + ' '
     ) -> str:
-        # b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera'
+        # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera'
         untrusted_device_desc = untrusted_device_desc.decode(
             'unicode_escape', errors='ignore')
         return ''.join(
@@ -254,6 +263,24 @@ def attachment(self):
             return None
         return connected_to
 
+    @property
+    def self_identity(self) -> str:
+        """
+        Get identification of device not related to port.
+        """
+        if self._vendor_id is None:
+            vendor_id = self._load_desc_from_qubesdb()["vendor ID"]
+        else:
+            vendor_id = self._vendor_id
+        if self._product_id is None:
+            product_id = self._load_desc_from_qubesdb()["product ID"]
+        else:
+            product_id = self._product_id
+        interfaces = ''.join(repr(ifc) for ifc in self.interfaces)
+        serial = self.serial if self.serial != "unknown" else ""
+        return \
+            f'{vendor_id}:{product_id}:{serial}:{interfaces}'
+
     @staticmethod
     def _get_vendor_and_product_names(
             vendor_id: str, product_id: str
@@ -320,10 +347,6 @@ class QubesUSBException(qubes.exc.QubesException):
     pass
 
 
-class UnrecognizedDevice(QubesUSBException):
-    pass
-
-
 def modify_qrexec_policy(service, line, add):
     """
     Add/remove *line* to qrexec policy of a *service*.
@@ -407,7 +430,7 @@ async def _attach_and_notify(self, vm, device, options):
         try:
             await self.on_device_attach_usb(
                 vm, 'device-pre-attach:usb', device, options)
-        except UnrecognizedDevice:
+        except qubes.devices.UnrecognizedDevice:
             return
         await vm.fire_event_async(
             'device-attach:usb', device=device, options=options)
@@ -469,7 +492,7 @@ def on_qdb_change(self, vm, event, path):
             dev = USBDevice(vm, dev_ident)
             asyncio.ensure_future(front_vm.fire_event_async(
                 'device-attach:usb', device=dev,
-                options={'identity': dev.full_identity})
+                options={'identity': dev.self_identity})
             )
 
         self.devices_cache[vm.name] = current_devices
@@ -537,7 +560,7 @@ def on_device_list_attached(self, vm, event, **kwargs):
 
         for dev in self.get_all_devices(vm.app):
             if dev.attachment == vm:
-                yield (dev, {'identity': dev.full_identity})
+                yield (dev, {'identity': dev.self_identity})
 
     @qubes.ext.handler('device-pre-attach:usb')
     async def on_device_attach_usb(self, vm, event, device, options):
@@ -548,12 +571,12 @@ async def on_device_attach_usb(self, vm, event, device, options):
                 raise qubes.exc.QubesException(
                     'USB device attach do not support user options')
             identity = options['identity']
-            if device.full_identity != identity:
+            if device.self_identity != identity:
                 print(f"Unrecognized identity, skipping attachment of {device}",
                       file=sys.stderr)
-                raise UnrecognizedDevice(
+                raise qubes.devices.UnrecognizedDevice(
                     "Device presented identity "
-                    f"{device.full_identity} "
+                    f"{device.self_identity} "
                     f"does not match expected {identity}"
                 )