Skip to content

Commit

Permalink
q-dev: DeviceInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 12, 2024
1 parent 69adb22 commit 471e6a5
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 61 deletions.
194 changes: 145 additions & 49 deletions qubes/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,48 +133,146 @@ def devclass(self, devclass: str):
self.__bus = devclass


class DeviceInterface(Enum):
# USB interfaces:
# https://www.usb.org/defined-class-codes#anchor_BaseClass03h
Other = "******"
USB_Audio = "01****"
USB_CDC = "02****" # Communications Device Class
USB_HID = "03****"
USB_HID_Keyboard = "03**01"
USB_HID_Mouse = "03**02"
# USB_Physical = "05****"
# USB_Still_Imaging = "06****" # Camera
USB_Printer = "07****"
USB_Mass_Storage = "08****"
USB_Hub = "09****"
USB_CDC_Data = "0a****"
USB_Smart_Card = "0b****"
# USB_Content_Security = "0d****"
USB_Video = "0e****" # Video Camera
# USB_Personal_Healthcare = "0f****"
USB_Audio_Video = "10****"
# USB_Billboard = "11****"
# USB_C_Bridge = "12****"
# and more...
class DeviceCategory(Enum):
"""
Arbitrarily selected interfaces that are important to users,
thus deserving special recognition such as a custom icon, etc.
"""
Other = "*******"

Communication = ("u02****", "p07****") # eg. modems
Input = ("u03****", "p09****") # HID etc.
Keyboard = ("u03**01", "p0900**")
Mouse = ("u03**02", "p0902**")
Printer = ("u07****",)
Scanner = ("p0903**",)
# Multimedia = Audio, Video, Displays etc.
Multimedia = ("u01****", "u0e****", "u06****", "u10****", "p03****",
"p04****")
Wireless = ("ue0****", "p0d****")
Bluetooth = ("ue00101", "p0d11**")
Mass_Data = ("b******", "u08****", "p01****")
Network = ("p02****",)
Memory = ("p05****",)
PCI_Bridge = ("p06****",)
Docking_Station = ("p0a****",)
Processor = ("p0b****", "p40****")
PCI_Serial_Bus = ("p0c****",)
PCI_USB = ("p0c03**",)

@staticmethod
def from_str(interface_encoding: str) -> 'DeviceInterface':
result = DeviceInterface.Other
def from_str(interface_encoding: str) -> 'DeviceCategory':
result = DeviceCategory.Other
if len(interface_encoding) != len(DeviceCategory.Other.value):
return result
best_score = 0

for interface in DeviceInterface:
pattern = interface.value
score = 0
for t, p in zip(interface_encoding, pattern):
if t == p:
score += 1
elif p != "*":
score = -1 # inconsistent with pattern
break
for interface in DeviceCategory:
for pattern in interface.value:
score = 0
for t, p in zip(interface_encoding, pattern):
if t == p:
score += 1
elif p != "*":
score = -1 # inconsistent with pattern
break

if score > best_score:
best_score = score
result = interface
if score > best_score:
best_score = score
result = interface

return result


class DeviceInterface:
def __init__(self, interface_encoding: str, devclass: Optional[str] = None):
ifc_padded = interface_encoding.ljust(6, '*')
if devclass:
if len(ifc_padded) > 6:
print(
f"interface_encoding is too long "
f"(is {len(interface_encoding)}, expected max. 6) "
f"for given {devclass=}",
file=sys.stderr
)
ifc_full = devclass[0] + ifc_padded
else:
known_devclasses = {'p': 'pci', 'u': 'usb', 'b': 'block'}
devclass = known_devclasses.get(interface_encoding[0], None)
if len(ifc_padded) > 7:
print(
f"interface_encoding is too long "
f"(is {len(interface_encoding)}, expected max. 7)",
file=sys.stderr
)
ifc_full = ifc_padded
elif len(ifc_padded) == 6:
ifc_full = ' ' + ifc_padded
else:
ifc_full = ifc_padded

self._devclass = devclass
self._interface_encoding = ifc_full
self._category = DeviceCategory.from_str(self._interface_encoding)

@property
def devclass(self) -> Optional[str]:
""" Immutable Device class such like: 'usb', 'pci' etc. """
return self._devclass

@property
def category(self) -> DeviceCategory:
""" Immutable Device category such like: 'Mouse', 'Mass_Data' etc. """
return self._category

@property
def unknown(self) -> 'DeviceInterface':
return DeviceInterface(" ******")

@property
def __repr__(self):
return self._interface_encoding

@property
def __str__(self):
if self.devclass == "block":
return "Block device"
if self.devclass in ("usb", "pci"):
self._load_classes(self.devclass).get(
self._interface_encoding[1:],
f"Unclassified {self.devclass} device")
return repr(self)

@staticmethod
def _load_classes(bus: str):
"""
List of known device classes, subclasses and programming interfaces.
"""
# Syntax:
# C class class_name
# subclass subclass_name <-- single tab
# prog-if prog-if_name <-- two tabs
result = {}
with open(f'/usr/share/hwdata/{bus}.ids',
encoding='utf-8', errors='ignore') as pciids:
class_id = None
subclass_id = None
for line in pciids.readlines():
line = line.rstrip()
if line.startswith('\t\t') and class_id and subclass_id:
(progif_id, _, progif_name) = line[2:].split(' ', 2)
result[class_id + subclass_id + progif_id] = \
f"{class_name}: {subclass_name} ({progif_name})"
elif line.startswith('\t') and class_id:
(subclass_id, _, subclass_name) = line[1:].split(' ', 2)
# store both prog-if specific entry and generic one
result[class_id + subclass_id + '**'] = \
f"{class_name}: {subclass_name}"
elif line.startswith('C '):
(_, class_id, _, class_name) = line.split(' ', 3)
result[class_id + '****'] = class_name
subclass_id = None

return result

Expand Down Expand Up @@ -312,7 +410,7 @@ def interfaces(self) -> List[DeviceInterface]:
Every device should have at least one interface.
"""
if not self._interfaces:
return [DeviceInterface.Other]
return [DeviceInterface.unknown]
return self._interfaces

@property
Expand Down Expand Up @@ -370,7 +468,7 @@ def serialize(self) -> bytes:
backend_domain_name.encode('ascii'))
properties += b' ' + base64.b64encode(backend_domain_prop)

interfaces = ''.join(ifc.value for ifc in self.interfaces)
interfaces = ''.join(ifc._interface_encoding for ifc in self.interfaces)
interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii')
properties += b' ' + base64.b64encode(interfaces_prop)

Expand Down Expand Up @@ -435,8 +533,8 @@ def _deserialize(

interfaces = properties['interfaces']
interfaces = [
DeviceInterface.from_str(interfaces[i:i + 6])
for i in range(0, len(interfaces), 6)]
DeviceInterface(interfaces[i:i + 7])
for i in range(0, len(interfaces), 7)]
properties['interfaces'] = interfaces

if 'parent' in properties:
Expand Down Expand Up @@ -563,10 +661,9 @@ def __init__(self, vm, bus):
'qubes.devices', self._bus)

async def attach(self, device_assignment: DeviceAssignment):
'''Attach (add) device to domain.
:param DeviceInfo device: device object
'''
"""
Attach device to domain.
"""

if device_assignment.devclass is None:
device_assignment.devclass = self._bus
Expand Down Expand Up @@ -623,10 +720,9 @@ def update_persistent(self, device: DeviceInfo, persistent: bool):
self._set.discard(assignment)

async def detach(self, device_assignment: DeviceAssignment):
'''Detach (remove) device from domain.
:param DeviceInfo device: device object
'''
"""
Detach device from domain.
"""

if device_assignment.devclass is None:
device_assignment.devclass = self._bus
Expand Down
11 changes: 10 additions & 1 deletion qubes/ext/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import collections
import re
import string
from typing import Optional
from typing import Optional, List

import lxml.etree

Expand Down Expand Up @@ -111,6 +111,15 @@ def device_node(self):
"""Device node in backend domain"""
return '/dev/' + self.ident.replace('_', '/')

@property
def interfaces(self) -> List[qubes.devices.DeviceInterface]:
"""
List of device interfaces.
Every device should have at least one interface.
"""
return [qubes.devices.DeviceInterface("******", "block")]

@property
def parent_device(self) -> Optional[qubes.devices.Device]:
"""
Expand Down
Loading

0 comments on commit 471e6a5

Please sign in to comment.