Skip to content

Commit

Permalink
q-dev: fix serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 12, 2024
1 parent 4a13df1 commit fe36156
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 57 deletions.
117 changes: 61 additions & 56 deletions qubes/device_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ class Device:
the backend domain.
devclass (str, optional): The class of the device (e.g., 'usb', 'pci').
"""
ALLOWED_CHARS_KEY = (
string.digits + string.ascii_letters + string.punctuation + ' ')
ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY + string.punctuation + ' '
ALLOWED_CHARS_KEY = set(
string.digits + string.ascii_letters
+ r"!#$%&()*+,-./:;<>?@[\]^_{|}~" + ' ')
ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY.union(set(string.punctuation + ' '))

def __init__(self, backend_domain, ident, devclass=None):
self.__backend_domain = backend_domain
Expand Down Expand Up @@ -197,6 +198,20 @@ def unpack_properties(

return properties, options

@classmethod
def pack_property(cls, serialization: bytes, key: str, value: str):
"""
Add property `key=value` to serialization.
"""
key = sanitize_str(
key, cls.ALLOWED_CHARS_KEY,
error_message='Invalid chars in property name: ')
value = sanitize_str(
serialize_str(value), cls.ALLOWED_CHARS_PARAM,
error_message='Invalid chars in property value: ')
return (serialization + b' '
+ key.encode('ascii') + b'=' + value.encode('ascii'))

@staticmethod
def check_device_properties(
expected_device: 'Device', properties: Dict[str, Any]):
Expand Down Expand Up @@ -344,6 +359,14 @@ def unknown(cls) -> 'DeviceInterface':
def __repr__(self):
return self._interface_encoding

def __hash__(self):
return hash(repr(self))

def __eq__(self, other):
if not isinstance(other, DeviceInterface):
return False
return repr(self) == repr(other)

def __str__(self):
if self.devclass == "block":
return "Block device"
Expand Down Expand Up @@ -398,8 +421,6 @@ def _load_classes(bus: str):

class DeviceInfo(Device):
""" Holds all information about a device """
ALLOWED_CHARS_KEY = Device.ALLOWED_CHARS_KEY + string.punctuation + ' '
ALLOWED_CHARS_PARAM = Device.ALLOWED_CHARS_PARAM + string.punctuation + ' '

def __init__(
self,
Expand Down Expand Up @@ -574,39 +595,31 @@ def serialize(self) -> bytes:
default_attrs = {
'ident', 'devclass', 'vendor', 'product', 'manufacturer', 'name',
'serial', 'self_identity'}
properties = b' '.join(
f'{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in (
(key, getattr(self, key)) for key in default_attrs)
)
properties = b''
for key, value in ((key, getattr(self, key)) for key in default_attrs):
properties = self.pack_property(properties, key, value)
# remove unnecessary space in the beginning
properties = properties[1:]

qname = serialize_str(self.backend_domain.name)
backend_prop = b"backend_domain=" + qname.encode('ascii')
properties += b' ' + backend_prop
properties = self.pack_property(
properties, 'backend_domain', self.backend_domain.name)

if self.attachment:
qname = serialize_str(self.attachment.name)
attachment_prop = b"attachment=" + qname.encode('ascii')
properties += b' ' + attachment_prop
properties = self.pack_property(
properties, 'attachment', self.attachment.name)

interfaces = serialize_str(
properties = self.pack_property(
properties, 'interfaces',
''.join(repr(ifc) for ifc in self.interfaces))
interfaces_prop = b'interfaces=' + interfaces.encode('ascii')
properties += b' ' + interfaces_prop

if self.parent_device is not None:
ident = serialize_str(self.parent_device.ident)
ident_prop = b'parent_ident=' + ident.encode('ascii')
properties += b' ' + ident_prop
devclass = serialize_str(self.parent_device.devclass)
devclass_prop = b'parent_devclass=' + devclass.encode('ascii')
properties += b' ' + devclass_prop

data = b' '.join(
f'_{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in self.data.items())
if data:
properties += b' ' + data
properties = self.pack_property(
properties, 'parent_ident', self.parent_device.ident)
properties = self.pack_property(
properties, 'parent_devclass', self.parent_device.devclass)

for key, value in self.data.items():
properties = self.pack_property(properties, "_" + key, value)

return properties

Expand Down Expand Up @@ -706,22 +719,19 @@ def serialize_str(value: str):
"""
Serialize python string to ensure consistency.
"""
result = repr(str(value))
if result.startswith('"'):
result = "'" + result[1:-1] + "'"
return result
return "'" + str(value).replace("'", r"\'") + "'"


def deserialize_str(value: str):
"""
Deserialize python string to ensure consistency.
"""
return value.replace("\\\'", "'")
return value.replace(r"\'", "'")


def sanitize_str(
untrusted_value: str,
allowed_chars: str,
allowed_chars: set,
replace_char: str = None,
error_message: str = ""
) -> str:
Expand All @@ -732,7 +742,7 @@ def sanitize_str(
characters with the string.
"""
if replace_char is None:
not_allowed_chars = set(untrusted_value) - set(allowed_chars)
not_allowed_chars = set(untrusted_value) - allowed_chars
if not_allowed_chars:
raise ProtocolError(error_message + repr(not_allowed_chars))
return untrusted_value
Expand Down Expand Up @@ -878,32 +888,27 @@ def serialize(self) -> bytes:
"""
Serialize an object to be transmitted via Qubes API.
"""
properties = b' '.join(
f'{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in (
properties = b''
for key, value in (
('required', 'yes' if self.required else 'no'),
('attach_automatically',
'yes' if self.attach_automatically else 'no'),
('ident', self.ident),
('devclass', self.devclass)
)
)
):
properties = self.pack_property(properties, key, value)
# remove unnecessary space in the beginning
properties = properties[1:]

back_name = serialize_str(self.backend_domain.name)
backend_domain_prop = b"backend_domain=" + back_name.encode('ascii')
properties += b' ' + backend_domain_prop
properties = self.pack_property(
properties, 'backend_domain', self.backend_domain.name)

if self.frontend_domain is not None:
front_name = serialize_str(self.frontend_domain.name)
frontend_domain_prop = (
b"frontend_domain=" + front_name.encode('ascii'))
properties += b' ' + frontend_domain_prop

if self.options:
properties += b' ' + b' '.join(
f'_{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in self.options.items()
)
properties = self.pack_property(
properties, 'frontend_domain', self.frontend_domain.name)

for key, value in self.options.items():
properties = self.pack_property(properties, "_" + key, value)

return properties

Expand Down
Loading

0 comments on commit fe36156

Please sign in to comment.