diff --git a/qubes/api/admin.py b/qubes/api/admin.py index bc0b5f484..59a53d2f4 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -28,6 +28,7 @@ import string import subprocess import pathlib +import sys import libvirt import lxml.etree @@ -1204,31 +1205,9 @@ async def vm_device_available(self, endpoint): # the list is empty self.enforce(len(devices) <= 1) devices = self.fire_event_for_filter(devices, devclass=devclass) - - # dev_info = {dev.ident: dev.serialize() for dev in devices} - dev_info = {} - for dev in devices: - # TODO: - if hasattr(dev, "serialize"): - properties_txt = dev.serialize().decode() - else: - non_default_attrs = set(attr for attr in dir(dev) if - not attr.startswith('_')).difference(( - 'backend_domain', 'ident', 'frontend_domain', - 'description', 'options', 'regex')) - properties_txt = ' '.join( - '{}={!s}'.format(prop, value) for prop, value - in itertools.chain( - ((key, getattr(dev, key)) for key in non_default_attrs), - # keep description as the last one, according to API - # specification - (('description', dev.description),) - )) - self.enforce('\n' not in properties_txt) - dev_info[dev.ident] = properties_txt - + dev_info = {dev.ident: dev.serialize().decode() for dev in devices} return ''.join('{} {}\n'.format(ident, dev_info[ident]) - for ident in sorted(dev_info)) + for ident in sorted(dev_info)) @qubes.api.method('admin.vm.device.{endpoint}.List', endpoints=(ep.name for ep in importlib.metadata.entry_points(group='qubes.devices')), diff --git a/qubes/devices.py b/qubes/devices.py index 3f402f556..4e0cfef5f 100644 --- a/qubes/devices.py +++ b/qubes/devices.py @@ -135,9 +135,10 @@ def devclass(self, devclass: str): class DeviceCategory(Enum): """ + Category of peripheral device. + Arbitrarily selected interfaces that are important to users, thus deserving special recognition such as a custom icon, etc. - """ Other = "*******" @@ -186,12 +187,16 @@ def from_str(interface_encoding: str) -> 'DeviceCategory': class DeviceInterface: + """ + Peripheral device interface wrapper. + """ + def __init__(self, interface_encoding: str, devclass: Optional[str] = None): ifc_padded = interface_encoding.ljust(6, '*') if devclass: if len(ifc_padded) > 6: print( - f"interface_encoding is too long " + f"{interface_encoding=} is too long " f"(is {len(interface_encoding)}, expected max. 6) " f"for given {devclass=}", file=sys.stderr @@ -202,7 +207,7 @@ def __init__(self, interface_encoding: str, devclass: Optional[str] = None): devclass = known_devclasses.get(interface_encoding[0], None) if len(ifc_padded) > 7: print( - f"interface_encoding is too long " + f"{interface_encoding=} is too long " f"(is {len(interface_encoding)}, expected max. 7)", file=sys.stderr ) @@ -226,22 +231,29 @@ def category(self) -> DeviceCategory: """ Immutable Device category such like: 'Mouse', 'Mass_Data' etc. """ return self._category - @property - def unknown(self) -> 'DeviceInterface': - return DeviceInterface(" ******") + @classmethod + def unknown(cls) -> 'DeviceInterface': + """ Value for unknown device interface. """ + return cls(" ******") - @property def __repr__(self): return self._interface_encoding - @property def __str__(self): if self.devclass == "block": return "Block device" if self.devclass in ("usb", "pci"): - self._load_classes(self.devclass).get( - self._interface_encoding[1:], - f"Unclassified {self.devclass} device") + result = self._load_classes(self.devclass).get( + self._interface_encoding[1:], None) + if result is None: + result = self._load_classes(self.devclass).get( + self._interface_encoding[1:-2] + '**', None) + if result is None: + result = self._load_classes(self.devclass).get( + self._interface_encoding[1:-4] + '****', None) + if result is None: + result = f"Unclassified {self.devclass} device" + return result return repr(self) @staticmethod @@ -410,7 +422,7 @@ def interfaces(self) -> List[DeviceInterface]: Every device should have at least one interface. """ if not self._interfaces: - return [DeviceInterface.unknown] + return [DeviceInterface.unknown()] return self._interfaces @property @@ -468,7 +480,7 @@ def serialize(self) -> bytes: backend_domain_name.encode('ascii')) properties += b' ' + base64.b64encode(backend_domain_prop) - interfaces = ''.join(ifc._interface_encoding for ifc in self.interfaces) + interfaces = ''.join(repr(ifc) for ifc in self.interfaces) interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii') properties += b' ' + base64.b64encode(interfaces_prop) diff --git a/qubes/ext/pci.py b/qubes/ext/pci.py index 5f5df7988..0243dc68a 100644 --- a/qubes/ext/pci.py +++ b/qubes/ext/pci.py @@ -23,8 +23,10 @@ import functools import os import re +import string import subprocess -from typing import Optional, List +import sys +from typing import Optional, List, Dict, Tuple import libvirt import lxml @@ -98,6 +100,20 @@ def pcidev_class(dev_xmldesc): return "unknown" +def pcidev_interface(dev_xmldesc): + sysfs_path = dev_xmldesc.findtext('path') + assert sysfs_path + try: + with open(sysfs_path + '/class', encoding='ascii') as f_class: + class_id = f_class.read().strip() + except OSError: + return "000000" + + if class_id.startswith('0x'): + class_id = class_id[2:] + return class_id + + def attached_devices(app): """Return map device->domain-name for all currently attached devices""" @@ -178,7 +194,7 @@ def vendor(self) -> str: Lazy loaded. """ if self._vendor is None: - result = self._load_desc_from_qubesdb()["vendor"] + result = self._load_desc()["vendor"] else: result = self._vendor return result @@ -193,58 +209,13 @@ def product(self) -> str: Lazy loaded. """ if self._product is None: - result = self._load_desc_from_qubesdb()["product"] + result = self._load_desc()["product"] else: result = self._product return result @property - def manufacturer(self) -> str: - """ - The name of the manufacturer of the device introduced by device itself - - Could be empty string or "unknown". - - Lazy loaded. - """ - if self._manufacturer is None: - result = self._load_desc_from_qubesdb()["manufacturer"] - else: - result = self._manufacturer - return result - - @property - def name(self) -> str: - """ - The name of the device it introduced itself with (could be empty string) - - Could be empty string or "unknown". - - Lazy loaded. - """ - if self._name is None: - result = self._load_desc_from_qubesdb()["name"] - else: - result = self._name - return result - - @property - def serial(self) -> str: - """ - The serial number of the device it introduced itself with. - - Could be empty string or "unknown". - - Lazy loaded. - """ - if self._serial is None: - result = self._load_desc_from_qubesdb()["serial"] - else: - result = self._serial - return result - - @property - def interfaces(self) -> List[qubes.devices.DeviceCategory]: + def interfaces(self) -> List[qubes.devices.DeviceInterface]: """ List of device interfaces. @@ -255,8 +226,10 @@ def interfaces(self) -> List[qubes.devices.DeviceCategory]: self.backend_domain.app.vmm.libvirt_conn.nodeDeviceLookupByName( self.libvirt_name ) - self._interfaces = [pcidev_class(lxml.etree.fromstring( - hostdev_details.XMLDesc()))] + interface_encoding = pcidev_interface(lxml.etree.fromstring( + hostdev_details.XMLDesc())) + self._interfaces = [qubes.devices.DeviceInterface( + interface_encoding, devclass='pci')] return self._interfaces @property @@ -285,6 +258,40 @@ def description(self): hostdev_details.XMLDesc())) return self._description + def _load_desc(self) -> Dict[str, str]: + unknown = "unknown" + result = {"vendor": unknown, + "product": unknown, + "manufacturer": unknown, + "name": unknown, + "serial": unknown} + if not self.backend_domain.is_running(): + # don't cache this value + return result + hostdev_details = \ + self.backend_domain.app.vmm.libvirt_conn.nodeDeviceLookupByName( + self.libvirt_name + ) + hostdev_xml = lxml.etree.fromstring(hostdev_details.XMLDesc()) + self._vendor = result["vendor"] = hostdev_xml.findtext( + 'capability/vendor') + self._product = result["product"] = hostdev_xml.findtext( + 'capability/product') + return result + + @staticmethod + def _sanitize( + untrusted_device_desc: bytes, + safe_chars: str = + string.ascii_letters + string.digits + string.punctuation + ' ' + ) -> str: + # b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera' + untrusted_device_desc = untrusted_device_desc.decode( + 'unicode_escape', errors='ignore') + return ''.join( + c if c in set(safe_chars) else '_' for c in untrusted_device_desc + ) + @property def frontend_domain(self): # TODO: cache this