From a113b12075c41137e3c537faafc16470118bc862 Mon Sep 17 00:00:00 2001 From: Piotr Bartman Date: Tue, 23 Jan 2024 13:20:45 +0100 Subject: [PATCH] q-dev: readable serialization --- qubes/devices.py | 97 +++++++++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 35 deletions(-) diff --git a/qubes/devices.py b/qubes/devices.py index e64b70624..c0c223b6d 100644 --- a/qubes/devices.py +++ b/qubes/devices.py @@ -55,7 +55,6 @@ `domain-qdb-change:path`) to detect changes and fire `device-list-change:class` event. """ -import base64 import itertools import sys from enum import Enum @@ -88,6 +87,12 @@ class DeviceAlreadyAssigned(qubes.exc.QubesException, KeyError): """ +class UnexpectedDeviceProperty(qubes.exc.QubesException, ValueError): + """ + Device has unexpected property such as backend_domain, devclass etc. + """ + + class Device: def __init__(self, backend_domain, ident, devclass=None): self.__backend_domain = backend_domain @@ -230,7 +235,7 @@ def __init__(self, interface_encoding: str, devclass: Optional[str] = None): ) ifc_full = ifc_padded elif len(ifc_padded) == 6: - ifc_full = ' ' + ifc_padded + ifc_full = '?' + ifc_padded else: ifc_full = ifc_padded @@ -251,7 +256,7 @@ def category(self) -> DeviceCategory: @classmethod def unknown(cls) -> 'DeviceInterface': """ Value for unknown device interface. """ - return cls(" ******") + return cls("?******") def __repr__(self): return self._interface_encoding @@ -482,26 +487,27 @@ def serialize(self) -> bytes: 'ident', 'devclass', 'vendor', 'product', 'manufacturer', 'name', 'serial'} properties = b' '.join( - base64.b64encode(f'{prop}={value!s}'.encode('ascii')) + f'{prop}={self.serialize_str(value)}'.encode('ascii') for prop, value in ( (key, getattr(self, key)) for key in default_attrs) ) - backend_domain_name = self.backend_domain.name - backend_domain_prop = (b'backend_domain=' + - backend_domain_name.encode('ascii')) - properties += b' ' + base64.b64encode(backend_domain_prop) + back_name = self.serialize_str(self.backend_domain.name) + backend_domain_prop = (b"backend_domain=" + back_name.encode('ascii')) + properties += b' ' + backend_domain_prop - interfaces = ''.join(repr(ifc) for ifc in self.interfaces) - interfaces_prop = b'interfaces=' + str(interfaces).encode('ascii') - properties += b' ' + base64.b64encode(interfaces_prop) + interfaces = self.serialize_str( + ''.join(repr(ifc) for ifc in self.interfaces)) + interfaces_prop = (b'interfaces=' + interfaces.encode('ascii')) + properties += b' ' + interfaces_prop if self.parent_device is not None: - parent_prop = b'parent=' + self.parent_device.ident.encode('ascii') - properties += b' ' + base64.b64encode(parent_prop) + parent_ident = self.serialize_str(self.parent_device.ident) + parent_prop = (b'parent=' + parent_ident.encode('ascii')) + properties += b' ' + parent_prop data = b' '.join( - base64.b64encode(f'_{prop}={value!s}'.encode('ascii')) + f'_{prop}={self.serialize_str(value)}'.encode('ascii') for prop, value in ((key, self.data[key]) for key in self.data) ) if data: @@ -520,7 +526,7 @@ def deserialize( result = DeviceInfo._deserialize( cls, serialization, expected_backend_domain, expected_devclass) except Exception as exc: - print(exc, file=sys.stderr) # TODO + print(exc, file=sys.stderr) ident = serialization.split(b' ')[0].decode( 'ascii', errors='ignore') result = UnknownDevice( @@ -537,23 +543,37 @@ def _deserialize( expected_backend_domain: 'qubes.vm.BaseVM', expected_devclass: Optional[str] = None, ) -> 'DeviceInfo': - properties_str = [ - base64.b64decode(line).decode('ascii', errors='ignore') - for line in serialization.split(b' ')[1:]] + decoded = serialization.decode('ascii', errors='ignore') + _ident, _, rest = decoded.partition(' ') + keys = [] + values = [] + key, _, rest = rest.partition("='") + keys.append(key) + while "='" in rest: + value_key, _, rest = rest.partition("='") + value, _, key = value_key.rpartition("' ") + values.append(DeviceInfo.deserialize_str(value)) + keys.append(key) + value = rest[:-1] # ending ' + values.append(DeviceInfo.deserialize_str(value)) properties = dict() - for line in properties_str: - key, _, param = line.partition("=") + for key, value in zip(keys, values): if key.startswith("_"): - properties[key[1:]] = param + # it's handled in cls.__init__ + properties[key[1:]] = value else: - properties[key] = param + properties[key] = value if properties['backend_domain'] != expected_backend_domain.name: - raise ValueError("TODO") # TODO + raise UnexpectedDeviceProperty( + f"Got device exposed by {properties['backend_domain']}" + f"when expected devices from {expected_backend_domain.name}.") properties['backend_domain'] = expected_backend_domain - # if expected_devclass and properties['devclass'] != expected_devclass: - # raise ValueError("TODO") # TODO + if expected_devclass and properties['devclass'] != expected_devclass: + raise UnexpectedDeviceProperty( + f"Got {properties['devclass']} device " + f"when expected {expected_devclass}.") interfaces = properties['interfaces'] interfaces = [ @@ -569,6 +589,14 @@ def _deserialize( return cls(**properties) + @staticmethod + def serialize_str(string: str): + return repr(string) + + @staticmethod + def deserialize_str(string: str): + return string.replace("\\\'", "'") + @property def frontend_domain(self): return self.data.get("frontend_domain", None) @@ -690,7 +718,7 @@ class DeviceCollection: :param vm: VM for which we manage devices :param bus: device bus - This class emits following events on VM object: # TODO pre-assign, assign, pre-unassign, unassign + This class emits following events on VM object: .. event:: device-added: (device) @@ -802,7 +830,6 @@ async def assign(self, device_assignment: DeviceAssignment): 'device {!s} of class {} already assigned to {!s}'.format( device, self._bus, self._vm)) - # TODO: check if needed await self._vm.fire_event_async( 'device-pre-assign:' + self._bus, pre_event=True, device=device, options=device_assignment.options) @@ -859,27 +886,28 @@ async def update_assignment( else: await self.detach(assignment) - async def detach(self, device_assignment: DeviceAssignment): # TODO: argument should be just device + async def detach(self, device: Device): """ Detach device from domain. """ - for assignment in self.get_attached_devices(): - if device_assignment == assignment: + for assign in self.get_attached_devices(): + if device == assign: # load all options - device_assignment = assignment + assignment = assign break else: raise DeviceNotAssigned( - f'device {device_assignment.ident!s} of class {self._bus} not ' + f'device {device.ident!s} of class {self._bus} not ' f'attached to {self._vm!s}') - if device_assignment.required and not self._vm.is_halted(): + if assignment.required and not self._vm.is_halted(): raise qubes.exc.QubesVMNotHaltedError( self._vm, "Can not detach a required device from a non halted qube. " "You need to unassign device first.") - device = device_assignment.device + # use local object + device = assignment.device await self._vm.fire_event_async( 'device-pre-detach:' + self._bus, pre_event=True, device=device) @@ -906,7 +934,6 @@ async def unassign(self, device_assignment: DeviceAssignment): "Can not remove an assignment from a non halted qube.") device = device_assignment.device - # TODO: check if needed await self._vm.fire_event_async( 'device-pre-unassign:' + self._bus, pre_event=True, device=device)