Skip to content

Commit

Permalink
q-dev: full_identity of device
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jun 12, 2024
1 parent 7febeff commit 2b033d4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 18 deletions.
2 changes: 1 addition & 1 deletion qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import asyncio
import functools
import itertools
import os
import string
import subprocess
Expand All @@ -38,6 +37,7 @@
import qubes.backup
import qubes.config
import qubes.devices
import qubes.ext
import qubes.firewall
import qubes.storage
import qubes.utils
Expand Down
66 changes: 51 additions & 15 deletions qubes/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,27 @@ def _deserialize(
def frontend_domain(self):
return self.data.get("frontend_domain", None)

@property
def full_identity(self) -> str:
"""
Get user understandable identification of device not related to ports.
In addition to the description returns presented interfaces.
It is used to auto-attach usb devices, so an attacking device needs to
mimic not only a name, but also interfaces of trusted device (and have
to be plugged to the same port). For a common user it is all the data
she uses to recognize the device.
"""
allowed_chars = string.digits + string.ascii_letters + '-_.'
description = ""
for char in self.description:
if char in allowed_chars:
description += char
else:
description += "_"
interfaces = ''.join(repr(ifc) for ifc in self.interfaces)
return f'{description}:{interfaces}'


def serialize_str(value: str):
return repr(str(value))
Expand All @@ -624,6 +645,7 @@ def sanitize_str(
"""
if replace_char is None:
if any(x not in allowed_chars for x in untrusted_value):
print(untrusted_value, file=sys.stderr) # TODO
raise qubes.api.ProtocolError(error_message)
return untrusted_value
result = ""
Expand Down Expand Up @@ -759,15 +781,19 @@ def serialize(self) -> bytes:
properties += b' ' + backend_domain_prop

if self.frontend_domain is not None:
front_name = serialize_str(self.frontend_domain.name)
if isinstance(self.frontend_domain, str):
front_name = serialize_str(self.frontend_domain) # TODO
else:
front_name = serialize_str(self.frontend_domain.name)
frontend_domain_prop = (
b"frontend_domain=" + front_name.encode('ascii'))
properties += b' ' + frontend_domain_prop

properties += b' ' + b' '.join(
f'_{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in self.options.items()
)
if self.options:
properties += b' ' + b' '.join(
f'_{prop}={serialize_str(value)}'.encode('ascii')
for prop, value in self.options.items()
)

return properties

Expand Down Expand Up @@ -800,7 +826,8 @@ def _deserialize(
allowed_chars_key = string.digits + string.ascii_letters + '-_.'
allowed_chars_value = allowed_chars_key + ',+:'

untrusted_decoded = untrusted_serialization.decode('ascii', 'strict')
untrusted_decoded = untrusted_serialization.decode(
'ascii', 'strict').strip()
keys = []
values = []
untrusted_key, _, untrusted_rest = untrusted_decoded.partition("='")
Expand Down Expand Up @@ -837,7 +864,7 @@ def _deserialize(

if properties['backend_domain'] != expected_backend_domain.name:
raise UnexpectedDeviceProperty(
f"Got device exposed by {properties['backend_domain']}"
f"Got device exposed by {properties['backend_domain']} "
f"when expected devices from {expected_backend_domain.name}.")
properties['backend_domain'] = expected_backend_domain

Expand Down Expand Up @@ -963,31 +990,40 @@ async def attach(self, device_assignment: DeviceAssignment):
'device-attach:' + self._bus,
device=device, options=device_assignment.options)

async def assign(self, device_assignment: DeviceAssignment):
async def assign(self, assignment: DeviceAssignment):
"""
Assign device to domain.
"""
if device_assignment.devclass is None:
device_assignment.devclass = self._bus
elif device_assignment.devclass != self._bus:
if assignment.devclass is None:
assignment.devclass = self._bus
elif assignment.devclass != self._bus:
raise ValueError(
'Trying to assign DeviceAssignment of a different device class')

device = device_assignment.device
device = assignment.device
if device in self.get_assigned_devices():
raise DeviceAlreadyAssigned(
'device {!s} of class {} already assigned to {!s}'.format(
device, self._bus, self._vm))

if (assignment.devclass not in ('pci', 'testclass')
and assignment.required):
raise qubes.exc.QubesValueError(
"Only pci devices can be set as required.")
if (assignment.devclass not in ('pci', 'testclass', 'mic', 'usb')
and assignment.attach_automatically):
raise qubes.exc.QubesValueError(
"Only pci, mic and usb devices can be automatically attached.")

await self._vm.fire_event_async(
'device-pre-assign:' + self._bus,
pre_event=True, device=device, options=device_assignment.options)
pre_event=True, device=device, options=assignment.options)

self._set.add(device_assignment)
self._set.add(assignment)

await self._vm.fire_event_async(
'device-assign:' + self._bus,
device=device, options=device_assignment.options)
device=device, options=assignment.options)

def load_assignment(self, device_assignment: DeviceAssignment):
"""Load DeviceAssignment retrieved from qubes.xml
Expand Down
1 change: 0 additions & 1 deletion qubes/vm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ def __xml__(self):
node.set('backend-domain', device.backend_domain.name)
node.set('id', device.ident)
node.set('required', 'yes' if device.required else 'no')
# TODO: serial
for key, val in device.options.items():
option_node = lxml.etree.Element('option')
option_node.set('name', key)
Expand Down
2 changes: 1 addition & 1 deletion qubes/vm/qubesvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@ async def start(self, start_guid=True, notify_function=None,
qmemman_client = None
try:
for devclass in self.devices:
for dev in self.devices[devclass].get_assigned_devices():
for dev in self.devices[devclass].get_assigned_devices(): # TODO: fix
if isinstance(dev, qubes.devices.UnknownDevice):
raise qubes.exc.QubesException(
'{} device {} not available'.format(
Expand Down

0 comments on commit 2b033d4

Please sign in to comment.