Skip to content

Commit

Permalink
q-dev: update device_protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 12, 2024
1 parent 8da3246 commit 54b9f9b
Showing 1 changed file with 76 additions and 14 deletions.
90 changes: 76 additions & 14 deletions qubes/device_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import string
import sys
from enum import Enum
from typing import Optional, Dict, Any, List, Union
from typing import Optional, Dict, Any, List, Union, Tuple

import qubes.utils

Expand All @@ -52,8 +52,19 @@ def qbool(value):


class Device:
ALLOWED_CHARS_KEY = string.digits + string.ascii_letters + '-_.'
ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY + ',+:'
"""
Basic class of a *bus* device with *ident* exposed by a *backend domain*.
Attributes:
backend_domain (QubesVM): The domain which exposes devices,
e.g.`sys-usb`.
ident (str): A unique identifier for the device within
the backend domain.
devclass (str, optional): The class of the device (e.g., 'usb', 'pci').
"""
ALLOWED_CHARS_KEY = (
string.digits + string.ascii_letters + string.punctuation + ' ')
ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY + string.punctuation + ' '

def __init__(self, backend_domain, ident, devclass=None):
self.__backend_domain = backend_domain
Expand All @@ -64,16 +75,20 @@ def __hash__(self):
return hash((str(self.backend_domain), self.ident))

def __eq__(self, other):
return (
self.backend_domain == other.backend_domain and
self.ident == other.ident
)
if isinstance(other, Device):
return (
self.backend_domain == other.backend_domain and
self.ident == other.ident
)
raise TypeError(f"Comparing instances of 'Device' and '{type(other)}' "
"is not supported")

def __lt__(self, other):
if isinstance(other, Device):
return (self.backend_domain.name, self.ident) < \
(other.backend_domain.name, other.ident)
raise NotImplementedError()
raise TypeError(f"Comparing instances of 'Device' and '{type(other)}' "
"is not supported")

def __repr__(self):
return "[%s]:%s" % (self.backend_domain, self.ident)
Expand Down Expand Up @@ -125,7 +140,20 @@ def devclass(self, devclass: str):
self.__bus = devclass

@classmethod
def unpack_properties(cls, untrusted_serialization: bytes):
def unpack_properties(
cls, untrusted_serialization: bytes
) -> Tuple[Dict, Dict]:
"""
Unpacks basic device properties from a serialized encoded string.
Returns:
tuple: A tuple containing two dictionaries, properties and options,
extracted from the serialization.
Raises:
ValueError: If unexpected characters are found in property
names or values.
"""
ut_decoded = untrusted_serialization.decode(
'ascii', errors='strict').strip()

Expand Down Expand Up @@ -170,7 +198,17 @@ def unpack_properties(cls, untrusted_serialization: bytes):
return properties, options

@staticmethod
def check_device_properties(expected_device, properties):
def check_device_properties(
expected_device: 'Device', properties: Dict[str, Any]):
"""
Validates properties against an expected device configuration.
Modifies `properties`.
Raises:
UnexpectedDeviceProperty: If any property does not match
the expected values.
"""
expected = expected_device
exp_vm_name = expected.backend_domain.name
if properties.get('backend_domain', exp_vm_name) != exp_vm_name:
Expand All @@ -194,10 +232,9 @@ def check_device_properties(expected_device, properties):
properties['devclass'] = expected.devclass



class DeviceCategory(Enum):
"""
Category of peripheral device.
Category of a peripheral device.
Arbitrarily selected interfaces that are important to users,
thus deserving special recognition such as a custom icon, etc.
Expand Down Expand Up @@ -228,6 +265,9 @@ class DeviceCategory(Enum):

@staticmethod
def from_str(interface_encoding: str) -> 'DeviceCategory':
"""
Returns `DeviceCategory` from data encoded in string.
"""
result = DeviceCategory.Other
if len(interface_encoding) != len(DeviceCategory.Other.value):
return result
Expand Down Expand Up @@ -527,7 +567,7 @@ def attachment(self) -> Optional[QubesVM]:

def serialize(self) -> bytes:
"""
Serialize object to be transmitted via Qubes API.
Serialize an object to be transmitted via Qubes API.
"""
# 'backend_domain', 'attachment', 'interfaces', 'data', 'parent_device'
# are not string, so they need special treatment
Expand Down Expand Up @@ -577,6 +617,9 @@ def deserialize(
expected_backend_domain: QubesVM,
expected_devclass: Optional[str] = None,
) -> 'DeviceInfo':
"""
Recovers a serialized object, see: :py:meth:`serialize`.
"""
ident, _, rest = serialization.partition(b' ')
ident = ident.decode('ascii', errors='ignore')
device = UnknownDevice(
Expand All @@ -587,6 +630,7 @@ def deserialize(

try:
device = cls._deserialize(rest, device)
# pylint: disable=broad-exception-caught
except Exception as exc:
print(exc, file=sys.stderr)

Expand All @@ -598,6 +642,9 @@ def _deserialize(
untrusted_serialization: bytes,
expected_device: Device
) -> 'DeviceInfo':
"""
Actually deserializes the object.
"""
properties, options = cls.unpack_properties(untrusted_serialization)
properties.update(options)

Expand Down Expand Up @@ -656,13 +703,19 @@ def self_identity(self) -> str:


def serialize_str(value: str):
"""
Serialize python string to ensure consistency.
"""
result = repr(str(value))
if result.startswith('"'):
result = "'" + result[1:-1] + "'"
return result


def deserialize_str(value: str):
"""
Deserialize python string to ensure consistency.
"""
return value.replace("\\\'", "'")


Expand Down Expand Up @@ -694,7 +747,7 @@ def sanitize_str(

class UnknownDevice(DeviceInfo):
# pylint: disable=too-few-public-methods
"""Unknown device - for example exposed by domain not running currently"""
"""Unknown device - for example, exposed by domain not running currently"""

def __init__(self, backend_domain, ident, *, devclass, **kwargs):
super().__init__(backend_domain, ident, devclass=devclass, **kwargs)
Expand Down Expand Up @@ -822,6 +875,9 @@ def options(self, options: Optional[Dict[str, Any]]):
self.__options = options or {}

def serialize(self) -> bytes:
"""
Serialize an object to be transmitted via Qubes API.
"""
properties = b' '.join(
f'{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in (
Expand Down Expand Up @@ -857,6 +913,9 @@ def deserialize(
serialization: bytes,
expected_device: Device,
) -> 'DeviceAssignment':
"""
Recovers a serialized object, see: :py:meth:`serialize`.
"""
try:
result = cls._deserialize(serialization, expected_device)
except Exception as exc:
Expand All @@ -869,6 +928,9 @@ def _deserialize(
untrusted_serialization: bytes,
expected_device: Device,
) -> 'DeviceAssignment':
"""
Actually deserializes the object.
"""
properties, options = cls.unpack_properties(untrusted_serialization)
properties['options'] = options

Expand Down

0 comments on commit 54b9f9b

Please sign in to comment.