diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index 34f56cc..76f2255 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -21,16 +21,19 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import asyncio +import base64 import collections import fcntl import grp +import itertools import os import re import string import subprocess -import errno import tempfile +from enum import Enum +from typing import List, Optional, Dict, Tuple import qubes.devices import qubes.ext @@ -41,33 +44,197 @@ usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$") usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ') + class USBDevice(qubes.devices.DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, ident): - super(USBDevice, self).__init__(backend_domain, ident, None) + # super(USBDevice, self).__init__(backend_domain, ident, None) + super(USBDevice, self).__init__( + backend_domain=backend_domain, ident=ident, devclass="usb") self._qdb_ident = ident.replace('.', '_') self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident - # lazy loading - self._description = None + @property + def vendor(self) -> str: + """ + Device vendor from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._vendor is None: + result = self._load_desc_from_qubesdb()["vendor"] + else: + result = self._vendor + return result + + @property + def product(self) -> str: + """ + Device name from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._product is None: + result = self._load_desc_from_qubesdb()["product"] + else: + result = self._product + return result + + @property + def manufacturer(self) -> str: + """ + The name of the manufacturer of the device introduced by device itself + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._manufacturer is None: + result = self._load_desc_from_qubesdb()["manufacturer"] + else: + result = self._manufacturer + return result + + @property + def name(self) -> str: + """ + The name of the device it introduced itself with (could be empty string) + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._name is None: + result = self._load_desc_from_qubesdb()["name"] + else: + result = self._name + return result + + @property + def serial(self) -> str: + """ + The serial number of the device it introduced itself with. + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._serial is None: + result = self._load_desc_from_qubesdb()["serial"] + else: + result = self._serial + return result + + @property + def interfaces(self) -> List[qubes.devices.DeviceInterface]: + """ + List of device interfaces. + + Every device should have at least one interface. + """ + if (len(self._interfaces) == 1 + and self._interfaces[0] == qubes.devices.DeviceInterface.Other): + result = self._load_interfaces_from_qubesdb() + else: + result = self._interfaces + return result @property - def description(self): - if self._description is None: - if not self.backend_domain.is_running(): - # don't cache this value - return "Unknown - domain not running" - untrusted_device_desc = self.backend_domain.untrusted_qdb.read( + def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: + """ + The parent device if any. + + USB device has no parents. + """ + return None + + # @property + # def port_id(self) -> str: + # """ + # Which port the device is connected to. + # """ + # return self.ident.split("-")[1] + + def _load_interfaces_from_qubesdb(self) \ + -> List[qubes.devices.DeviceInterface]: + result = [] + if not self.backend_domain.is_running(): + # don't cache this value + return result + untrusted_interfaces: bytes = ( + self.backend_domain.untrusted_qdb.read( + self._qdb_path + '/interfaces') + ) + if not untrusted_interfaces: + return result + self._interfaces = result = [ + qubes.devices.DeviceInterface.from_str( + self._sanitize(ifc, safe_chars=string.hexdigits) + ) + for ifc in untrusted_interfaces.split(b':') + if ifc + ] + return result + + def _load_desc_from_qubesdb(self) -> Dict[str, str]: + unknown = "unknown" + result = {"vendor": unknown, + "product": unknown, + "manufacturer": unknown, + "name": unknown, + "serial": unknown} + if not self.backend_domain.is_running(): + # don't cache this value + return result + untrusted_device_desc: bytes = ( + self.backend_domain.untrusted_qdb.read( self._qdb_path + '/desc') - if not untrusted_device_desc: - return 'Unknown' - self._description = self._sanitize_desc(untrusted_device_desc) - hw_ident_match = usb_device_hw_ident_re.match(self._description) - if hw_ident_match: - self._description = self._description[ - len(hw_ident_match.group(0)):] - return self._description + ) + if not untrusted_device_desc: + return result + try: + (untrusted_vendor_product, untrusted_manufacturer, + untrusted_name, untrusted_serial + ) = untrusted_device_desc.split(b' ') + untrusted_vendor, untrusted_product = ( + untrusted_vendor_product.split(b':')) + except ValueError: + # desc doesn't contain correctly formatted data, + # but it is not empty. We cannot parse it, + # but we can still put it to the `serial` just to provide + # some information to the user. + untrusted_vendor, untrusted_product, untrusted_manufacturer = ( + unknown.encode(), unknown.encode(), unknown.encode()) + untrusted_name = untrusted_device_desc.replace(b' ', b'_') + vendor, product = self._get_vendor_and_product_names( + self._sanitize(untrusted_vendor), + self._sanitize(untrusted_product), + ) + self._desc_vendor = result["vendor"] = vendor + self._desc_product = result["product"] = product + self._desc_manufacturer = result["manufacturer"] = ( + self._sanitize(untrusted_manufacturer)) + self._desc_name = result["name"] = ( + self._sanitize(untrusted_name)) + return result + + @staticmethod + def _sanitize( + untrusted_device_desc: bytes, + safe_chars: str = + string.ascii_letters + string.digits + string.punctuation + ' ' + ) -> str: + # b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera' + untrusted_device_desc = untrusted_device_desc.decode( + 'unicode_escape', errors='ignore') + return ''.join( + c if c in set(safe_chars) else '_' for c in untrusted_device_desc + ) @property def frontend_domain(self): @@ -96,15 +263,61 @@ def frontend_domain(self): return connected_to @staticmethod - def _sanitize_desc(untrusted_device_desc): - untrusted_device_desc = untrusted_device_desc.decode('ascii', - errors='ignore') - safe_set = set(string.ascii_letters + string.digits + - string.punctuation + ' ') - return ''.join( - c if c in safe_set else '_' for c in untrusted_device_desc - ) + def _get_vendor_and_product_names( + vendor_id: str, product_id: str + ) -> Tuple[str, str]: + """ + Return tuple of vendor's and product's names for the ids. + + If the id is not known return ("unknown", "unknown"). + """ + return (USBDevice._load_usb_known_devices() + .get(vendor_id, dict()) + .get(product_id, ("unknown", "unknown")) + ) + + @staticmethod + def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: + """ + List of known device vendors, devices and interfaces. + + result[vendor_id][device_id] = (vendor_name, product_name) + """ + # Syntax: + # vendor vendor_name <-- 2 spaces between + # device device_name <-- single tab + # interface interface_name <-- two tabs + # ... + # C class class_name + # subclass subclass_name <-- single tab + # prog-if prog-if_name <-- two tabs + result = {} + with open('/usr/share/hwdata/usb.ids', + encoding='utf-8', errors='ignore') as usb_ids: + for line in usb_ids.readlines(): + line = line.rstrip() + if line.startswith('#'): + # skip comments + continue + elif not line: + # skip empty lines + continue + elif line.startswith('\t\t'): + # skip interfaces + continue + elif line.startswith('C '): + # description of classes starts here, we can finish + break + elif line.startswith('\t'): + # save vendor, device pair + device_id, _, device_name = line[1:].split(' ', 2) + result[vendor_id][device_id] = vendor_name, device_name + else: + # new vendor + vendor_id, _, vendor_name = line[:].split(' ', 2) + result[vendor_id] = {} + return result class USBProxyNotInstalled(qubes.exc.QubesException): pass @@ -170,7 +383,7 @@ class USBDeviceExtension(qubes.ext.Extension): def __init__(self): super(USBDeviceExtension, self).__init__() - #include dom0 devices in listing only when usb-proxy is really + # include dom0 devices in listing only when usb-proxy is really # installed there self.usb_proxy_installed_in_dom0 = os.path.exists( '/etc/qubes-rpc/qubes.USB') @@ -178,7 +391,7 @@ def __init__(self): @qubes.ext.handler('domain-init', 'domain-load') def on_domain_init_load(self, vm, event): - '''Initialize watching for changes''' + """Initialize watching for changes""" # pylint: disable=unused-argument,no-self-use vm.watch_qdb_path('/qubes-usb-devices') if event == 'domain-load': @@ -200,7 +413,7 @@ async def _attach_and_notify(self, vm, device, options): @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') def on_qdb_change(self, vm, event, path): - '''A change in QubesDB means a change in device list''' + """A change in QubesDB means a change in device list""" # pylint: disable=unused-argument,no-self-use vm.fire_event('device-list-change:usb') current_devices = dict((dev.ident, dev.frontend_domain)