Skip to content

Commit

Permalink
q-dev: pci devices draft
Browse files Browse the repository at this point in the history
fix DeviceInterface.unknown
  • Loading branch information
piotrbartman committed Jun 12, 2024
1 parent 471e6a5 commit 7632208
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 88 deletions.
27 changes: 3 additions & 24 deletions qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import string
import subprocess
import pathlib
import sys

import libvirt
import lxml.etree
Expand Down Expand Up @@ -1204,31 +1205,9 @@ async def vm_device_available(self, endpoint):
# the list is empty
self.enforce(len(devices) <= 1)
devices = self.fire_event_for_filter(devices, devclass=devclass)

# dev_info = {dev.ident: dev.serialize() for dev in devices}
dev_info = {}
for dev in devices:
# TODO:
if hasattr(dev, "serialize"):
properties_txt = dev.serialize().decode()
else:
non_default_attrs = set(attr for attr in dir(dev) if
not attr.startswith('_')).difference((
'backend_domain', 'ident', 'frontend_domain',
'description', 'options', 'regex'))
properties_txt = ' '.join(
'{}={!s}'.format(prop, value) for prop, value
in itertools.chain(
((key, getattr(dev, key)) for key in non_default_attrs),
# keep description as the last one, according to API
# specification
(('description', dev.description),)
))
self.enforce('\n' not in properties_txt)
dev_info[dev.ident] = properties_txt

dev_info = {dev.ident: dev.serialize().decode() for dev in devices}
return ''.join('{} {}\n'.format(ident, dev_info[ident])
for ident in sorted(dev_info))
for ident in sorted(dev_info))

@qubes.api.method('admin.vm.device.{endpoint}.List', endpoints=(ep.name
for ep in importlib.metadata.entry_points(group='qubes.devices')),
Expand Down
38 changes: 25 additions & 13 deletions qubes/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ def devclass(self, devclass: str):

class DeviceCategory(Enum):
"""
Category of peripheral device.
Arbitrarily selected interfaces that are important to users,
thus deserving special recognition such as a custom icon, etc.
"""
Other = "*******"

Expand Down Expand Up @@ -186,12 +187,16 @@ def from_str(interface_encoding: str) -> 'DeviceCategory':


class DeviceInterface:
"""
Peripheral device interface wrapper.
"""

def __init__(self, interface_encoding: str, devclass: Optional[str] = None):
ifc_padded = interface_encoding.ljust(6, '*')
if devclass:
if len(ifc_padded) > 6:
print(
f"interface_encoding is too long "
f"{interface_encoding=} is too long "
f"(is {len(interface_encoding)}, expected max. 6) "
f"for given {devclass=}",
file=sys.stderr
Expand All @@ -202,7 +207,7 @@ def __init__(self, interface_encoding: str, devclass: Optional[str] = None):
devclass = known_devclasses.get(interface_encoding[0], None)
if len(ifc_padded) > 7:
print(
f"interface_encoding is too long "
f"{interface_encoding=} is too long "
f"(is {len(interface_encoding)}, expected max. 7)",
file=sys.stderr
)
Expand All @@ -226,22 +231,29 @@ def category(self) -> DeviceCategory:
""" Immutable Device category such like: 'Mouse', 'Mass_Data' etc. """
return self._category

@property
def unknown(self) -> 'DeviceInterface':
return DeviceInterface(" ******")
@classmethod
def unknown(cls) -> 'DeviceInterface':
""" Value for unknown device interface. """
return cls(" ******")

@property
def __repr__(self):
return self._interface_encoding

@property
def __str__(self):
if self.devclass == "block":
return "Block device"
if self.devclass in ("usb", "pci"):
self._load_classes(self.devclass).get(
self._interface_encoding[1:],
f"Unclassified {self.devclass} device")
result = self._load_classes(self.devclass).get(
self._interface_encoding[1:], None)
if result is None:
result = self._load_classes(self.devclass).get(
self._interface_encoding[1:-2] + '**', None)
if result is None:
result = self._load_classes(self.devclass).get(
self._interface_encoding[1:-4] + '****', None)
if result is None:
result = f"Unclassified {self.devclass} device"
return result
return repr(self)

@staticmethod
Expand Down Expand Up @@ -410,7 +422,7 @@ def interfaces(self) -> List[DeviceInterface]:
Every device should have at least one interface.
"""
if not self._interfaces:
return [DeviceInterface.unknown]
return [DeviceInterface.unknown()]
return self._interfaces

@property
Expand Down Expand Up @@ -468,7 +480,7 @@ def serialize(self) -> bytes:
backend_domain_name.encode('ascii'))
properties += b' ' + base64.b64encode(backend_domain_prop)

interfaces = ''.join(ifc._interface_encoding for ifc in self.interfaces)
interfaces = ''.join(repr(ifc) for ifc in self.interfaces)
interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii')
properties += b' ' + base64.b64encode(interfaces_prop)

Expand Down
109 changes: 58 additions & 51 deletions qubes/ext/pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import functools
import os
import re
import string
import subprocess
from typing import Optional, List
import sys
from typing import Optional, List, Dict, Tuple

import libvirt
import lxml
Expand Down Expand Up @@ -98,6 +100,20 @@ def pcidev_class(dev_xmldesc):
return "unknown"


def pcidev_interface(dev_xmldesc):
sysfs_path = dev_xmldesc.findtext('path')
assert sysfs_path
try:
with open(sysfs_path + '/class', encoding='ascii') as f_class:
class_id = f_class.read().strip()
except OSError:
return "000000"

if class_id.startswith('0x'):
class_id = class_id[2:]
return class_id


def attached_devices(app):
"""Return map device->domain-name for all currently attached devices"""

Expand Down Expand Up @@ -178,7 +194,7 @@ def vendor(self) -> str:
Lazy loaded.
"""
if self._vendor is None:
result = self._load_desc_from_qubesdb()["vendor"]
result = self._load_desc()["vendor"]
else:
result = self._vendor
return result
Expand All @@ -193,58 +209,13 @@ def product(self) -> str:
Lazy loaded.
"""
if self._product is None:
result = self._load_desc_from_qubesdb()["product"]
result = self._load_desc()["product"]
else:
result = self._product
return result

@property
def manufacturer(self) -> str:
"""
The name of the manufacturer of the device introduced by device itself
Could be empty string or "unknown".
Lazy loaded.
"""
if self._manufacturer is None:
result = self._load_desc_from_qubesdb()["manufacturer"]
else:
result = self._manufacturer
return result

@property
def name(self) -> str:
"""
The name of the device it introduced itself with (could be empty string)
Could be empty string or "unknown".
Lazy loaded.
"""
if self._name is None:
result = self._load_desc_from_qubesdb()["name"]
else:
result = self._name
return result

@property
def serial(self) -> str:
"""
The serial number of the device it introduced itself with.
Could be empty string or "unknown".
Lazy loaded.
"""
if self._serial is None:
result = self._load_desc_from_qubesdb()["serial"]
else:
result = self._serial
return result

@property
def interfaces(self) -> List[qubes.devices.DeviceCategory]:
def interfaces(self) -> List[qubes.devices.DeviceInterface]:
"""
List of device interfaces.
Expand All @@ -255,8 +226,10 @@ def interfaces(self) -> List[qubes.devices.DeviceCategory]:
self.backend_domain.app.vmm.libvirt_conn.nodeDeviceLookupByName(
self.libvirt_name
)
self._interfaces = [pcidev_class(lxml.etree.fromstring(
hostdev_details.XMLDesc()))]
interface_encoding = pcidev_interface(lxml.etree.fromstring(
hostdev_details.XMLDesc()))
self._interfaces = [qubes.devices.DeviceInterface(
interface_encoding, devclass='pci')]
return self._interfaces

@property
Expand Down Expand Up @@ -285,6 +258,40 @@ def description(self):
hostdev_details.XMLDesc()))
return self._description

def _load_desc(self) -> Dict[str, str]:
unknown = "unknown"
result = {"vendor": unknown,
"product": unknown,
"manufacturer": unknown,
"name": unknown,
"serial": unknown}
if not self.backend_domain.is_running():
# don't cache this value
return result
hostdev_details = \
self.backend_domain.app.vmm.libvirt_conn.nodeDeviceLookupByName(
self.libvirt_name
)
hostdev_xml = lxml.etree.fromstring(hostdev_details.XMLDesc())
self._vendor = result["vendor"] = hostdev_xml.findtext(
'capability/vendor')
self._product = result["product"] = hostdev_xml.findtext(
'capability/product')
return result

@staticmethod
def _sanitize(
untrusted_device_desc: bytes,
safe_chars: str =
string.ascii_letters + string.digits + string.punctuation + ' '
) -> str:
# b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera'
untrusted_device_desc = untrusted_device_desc.decode(
'unicode_escape', errors='ignore')
return ''.join(
c if c in set(safe_chars) else '_' for c in untrusted_device_desc
)

@property
def frontend_domain(self):
# TODO: cache this
Expand Down

0 comments on commit 7632208

Please sign in to comment.