diff --git a/qubes/devices.py b/qubes/devices.py index 4faad4bb3..3f402f556 100644 --- a/qubes/devices.py +++ b/qubes/devices.py @@ -133,48 +133,146 @@ def devclass(self, devclass: str): self.__bus = devclass -class DeviceInterface(Enum): - # USB interfaces: - # https://www.usb.org/defined-class-codes#anchor_BaseClass03h - Other = "******" - USB_Audio = "01****" - USB_CDC = "02****" # Communications Device Class - USB_HID = "03****" - USB_HID_Keyboard = "03**01" - USB_HID_Mouse = "03**02" - # USB_Physical = "05****" - # USB_Still_Imaging = "06****" # Camera - USB_Printer = "07****" - USB_Mass_Storage = "08****" - USB_Hub = "09****" - USB_CDC_Data = "0a****" - USB_Smart_Card = "0b****" - # USB_Content_Security = "0d****" - USB_Video = "0e****" # Video Camera - # USB_Personal_Healthcare = "0f****" - USB_Audio_Video = "10****" - # USB_Billboard = "11****" - # USB_C_Bridge = "12****" - # and more... +class DeviceCategory(Enum): + """ + Arbitrarily selected interfaces that are important to users, + thus deserving special recognition such as a custom icon, etc. + + """ + Other = "*******" + + Communication = ("u02****", "p07****") # eg. modems + Input = ("u03****", "p09****") # HID etc. + Keyboard = ("u03**01", "p0900**") + Mouse = ("u03**02", "p0902**") + Printer = ("u07****",) + Scanner = ("p0903**",) + # Multimedia = Audio, Video, Displays etc. + Multimedia = ("u01****", "u0e****", "u06****", "u10****", "p03****", + "p04****") + Wireless = ("ue0****", "p0d****") + Bluetooth = ("ue00101", "p0d11**") + Mass_Data = ("b******", "u08****", "p01****") + Network = ("p02****",) + Memory = ("p05****",) + PCI_Bridge = ("p06****",) + Docking_Station = ("p0a****",) + Processor = ("p0b****", "p40****") + PCI_Serial_Bus = ("p0c****",) + PCI_USB = ("p0c03**",) @staticmethod - def from_str(interface_encoding: str) -> 'DeviceInterface': - result = DeviceInterface.Other + def from_str(interface_encoding: str) -> 'DeviceCategory': + result = DeviceCategory.Other + if len(interface_encoding) != len(DeviceCategory.Other.value): + return result best_score = 0 - for interface in DeviceInterface: - pattern = interface.value - score = 0 - for t, p in zip(interface_encoding, pattern): - if t == p: - score += 1 - elif p != "*": - score = -1 # inconsistent with pattern - break + for interface in DeviceCategory: + for pattern in interface.value: + score = 0 + for t, p in zip(interface_encoding, pattern): + if t == p: + score += 1 + elif p != "*": + score = -1 # inconsistent with pattern + break - if score > best_score: - best_score = score - result = interface + if score > best_score: + best_score = score + result = interface + + return result + + +class DeviceInterface: + def __init__(self, interface_encoding: str, devclass: Optional[str] = None): + ifc_padded = interface_encoding.ljust(6, '*') + if devclass: + if len(ifc_padded) > 6: + print( + f"interface_encoding is too long " + f"(is {len(interface_encoding)}, expected max. 6) " + f"for given {devclass=}", + file=sys.stderr + ) + ifc_full = devclass[0] + ifc_padded + else: + known_devclasses = {'p': 'pci', 'u': 'usb', 'b': 'block'} + devclass = known_devclasses.get(interface_encoding[0], None) + if len(ifc_padded) > 7: + print( + f"interface_encoding is too long " + f"(is {len(interface_encoding)}, expected max. 7)", + file=sys.stderr + ) + ifc_full = ifc_padded + elif len(ifc_padded) == 6: + ifc_full = ' ' + ifc_padded + else: + ifc_full = ifc_padded + + self._devclass = devclass + self._interface_encoding = ifc_full + self._category = DeviceCategory.from_str(self._interface_encoding) + + @property + def devclass(self) -> Optional[str]: + """ Immutable Device class such like: 'usb', 'pci' etc. """ + return self._devclass + + @property + def category(self) -> DeviceCategory: + """ Immutable Device category such like: 'Mouse', 'Mass_Data' etc. """ + return self._category + + @property + def unknown(self) -> 'DeviceInterface': + return DeviceInterface(" ******") + + @property + def __repr__(self): + return self._interface_encoding + + @property + def __str__(self): + if self.devclass == "block": + return "Block device" + if self.devclass in ("usb", "pci"): + self._load_classes(self.devclass).get( + self._interface_encoding[1:], + f"Unclassified {self.devclass} device") + return repr(self) + + @staticmethod + def _load_classes(bus: str): + """ + List of known device classes, subclasses and programming interfaces. + """ + # Syntax: + # C class class_name + # subclass subclass_name <-- single tab + # prog-if prog-if_name <-- two tabs + result = {} + with open(f'/usr/share/hwdata/{bus}.ids', + encoding='utf-8', errors='ignore') as pciids: + class_id = None + subclass_id = None + for line in pciids.readlines(): + line = line.rstrip() + if line.startswith('\t\t') and class_id and subclass_id: + (progif_id, _, progif_name) = line[2:].split(' ', 2) + result[class_id + subclass_id + progif_id] = \ + f"{class_name}: {subclass_name} ({progif_name})" + elif line.startswith('\t') and class_id: + (subclass_id, _, subclass_name) = line[1:].split(' ', 2) + # store both prog-if specific entry and generic one + result[class_id + subclass_id + '**'] = \ + f"{class_name}: {subclass_name}" + elif line.startswith('C '): + (_, class_id, _, class_name) = line.split(' ', 3) + result[class_id + '****'] = class_name + subclass_id = None return result @@ -312,7 +410,7 @@ def interfaces(self) -> List[DeviceInterface]: Every device should have at least one interface. """ if not self._interfaces: - return [DeviceInterface.Other] + return [DeviceInterface.unknown] return self._interfaces @property @@ -370,7 +468,7 @@ def serialize(self) -> bytes: backend_domain_name.encode('ascii')) properties += b' ' + base64.b64encode(backend_domain_prop) - interfaces = ''.join(ifc.value for ifc in self.interfaces) + interfaces = ''.join(ifc._interface_encoding for ifc in self.interfaces) interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii') properties += b' ' + base64.b64encode(interfaces_prop) @@ -435,8 +533,8 @@ def _deserialize( interfaces = properties['interfaces'] interfaces = [ - DeviceInterface.from_str(interfaces[i:i + 6]) - for i in range(0, len(interfaces), 6)] + DeviceInterface(interfaces[i:i + 7]) + for i in range(0, len(interfaces), 7)] properties['interfaces'] = interfaces if 'parent' in properties: @@ -563,10 +661,9 @@ def __init__(self, vm, bus): 'qubes.devices', self._bus) async def attach(self, device_assignment: DeviceAssignment): - '''Attach (add) device to domain. - - :param DeviceInfo device: device object - ''' + """ + Attach device to domain. + """ if device_assignment.devclass is None: device_assignment.devclass = self._bus @@ -623,10 +720,9 @@ def update_persistent(self, device: DeviceInfo, persistent: bool): self._set.discard(assignment) async def detach(self, device_assignment: DeviceAssignment): - '''Detach (remove) device from domain. - - :param DeviceInfo device: device object - ''' + """ + Detach device from domain. + """ if device_assignment.devclass is None: device_assignment.devclass = self._bus diff --git a/qubes/ext/block.py b/qubes/ext/block.py index 175b03d6f..2b6112c63 100644 --- a/qubes/ext/block.py +++ b/qubes/ext/block.py @@ -22,7 +22,7 @@ import collections import re import string -from typing import Optional +from typing import Optional, List import lxml.etree @@ -111,6 +111,15 @@ def device_node(self): """Device node in backend domain""" return '/dev/' + self.ident.replace('_', '/') + @property + def interfaces(self) -> List[qubes.devices.DeviceInterface]: + """ + List of device interfaces. + + Every device should have at least one interface. + """ + return [qubes.devices.DeviceInterface("******", "block")] + @property def parent_device(self) -> Optional[qubes.devices.Device]: """ diff --git a/qubes/ext/pci.py b/qubes/ext/pci.py index 09f8839da..5f5df7988 100644 --- a/qubes/ext/pci.py +++ b/qubes/ext/pci.py @@ -24,6 +24,8 @@ import os import re import subprocess +from typing import Optional, List + import libvirt import lxml import lxml.etree @@ -83,7 +85,7 @@ def pcidev_class(dev_xmldesc): with open(sysfs_path + '/class', encoding='ascii') as f_class: class_id = f_class.read().strip() except OSError: - return "Unknown" + return "unknown" if not qubes.ext.pci.pci_classes: qubes.ext.pci.pci_classes = load_pci_classes() @@ -93,7 +95,7 @@ def pcidev_class(dev_xmldesc): # ignore prog-if return qubes.ext.pci.pci_classes[class_id[0:4]] except KeyError: - return "Unknown" + return "unknown" def attached_devices(app): @@ -166,11 +168,111 @@ def __init__(self, backend_domain, ident, libvirt_name=None): # lazy loading self._description = None + @property + def vendor(self) -> str: + """ + Device vendor from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._vendor is None: + result = self._load_desc_from_qubesdb()["vendor"] + else: + result = self._vendor + return result + + @property + def product(self) -> str: + """ + Device name from local database `/usr/share/hwdata/usb.ids` + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._product is None: + result = self._load_desc_from_qubesdb()["product"] + else: + result = self._product + return result + + @property + def manufacturer(self) -> str: + """ + The name of the manufacturer of the device introduced by device itself + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._manufacturer is None: + result = self._load_desc_from_qubesdb()["manufacturer"] + else: + result = self._manufacturer + return result + + @property + def name(self) -> str: + """ + The name of the device it introduced itself with (could be empty string) + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._name is None: + result = self._load_desc_from_qubesdb()["name"] + else: + result = self._name + return result + + @property + def serial(self) -> str: + """ + The serial number of the device it introduced itself with. + + Could be empty string or "unknown". + + Lazy loaded. + """ + if self._serial is None: + result = self._load_desc_from_qubesdb()["serial"] + else: + result = self._serial + return result + + @property + def interfaces(self) -> List[qubes.devices.DeviceCategory]: + """ + List of device interfaces. + + Every device should have at least one interface. + """ + if self._interfaces is None: + hostdev_details = \ + self.backend_domain.app.vmm.libvirt_conn.nodeDeviceLookupByName( + self.libvirt_name + ) + self._interfaces = [pcidev_class(lxml.etree.fromstring( + hostdev_details.XMLDesc()))] + return self._interfaces + + @property + def parent_device(self) -> Optional[qubes.devices.DeviceInfo]: + """ + The parent device if any. + + PCI device has no parents. + """ + return None + @property def libvirt_name(self): # pylint: disable=no-member # noinspection PyUnresolvedReferences - return 'pci_0000_{}_{}_{}'.format(self.bus, self.device, self.function) + return f'pci_0000_{self.bus}_{self.device}_{self.function}' @property def description(self): @@ -183,11 +285,11 @@ def description(self): hostdev_details.XMLDesc())) return self._description - # @property - # def frontend_domain(self): # TODO: possibly could be removed - # # TODO: cache this - # all_attached = attached_devices(self.backend_domain.app) - # return all_attached.get(self.ident, None) + @property + def frontend_domain(self): + # TODO: cache this + all_attached = attached_devices(self.backend_domain.app) + return all_attached.get(self.ident, None) class PCIDeviceExtension(qubes.ext.Extension): diff --git a/qubes/tests/devices.py b/qubes/tests/devices.py index 562ea22ab..91e51b0c6 100644 --- a/qubes/tests/devices.py +++ b/qubes/tests/devices.py @@ -21,7 +21,7 @@ # import qubes.devices -from qubes.devices import DeviceInfo, DeviceInterface +from qubes.devices import DeviceInfo, DeviceCategory import qubes.tests @@ -245,7 +245,7 @@ def test_010_serialize(self): manufacturer="", name="Some untrusted garbage", serial=None, - interfaces=[DeviceInterface.Other, DeviceInterface.USB_HID], + interfaces=[DeviceCategory.Other, DeviceCategory.USB_HID], # additional_info="", # TODO # date="06.12.23", # TODO ) @@ -277,7 +277,7 @@ def test_020_deserialize(self): manufacturer="", name="Some untrusted garbage", serial=None, - interfaces=[DeviceInterface.Other, DeviceInterface.USB_HID], + interfaces=[DeviceCategory.Other, DeviceCategory.USB_HID], # additional_info="", # TODO # date="06.12.23", # TODO )