Skip to content

Commit

Permalink
Support using UUIDs instead of VM names
Browse files Browse the repository at this point in the history
This supports using UUIDs (instead of VM names) in the Admin API.  It
also provides the UUIDs to qrexec-policy-daemon, which will use them to
support the @uuid: keyword.

All admin APIs now support UUIDs as source and destination.  Specific
APIs changed include:

- admin.vm.CreateDisposable: if the "uuid" argument is used, the VM UUID
  is returned instead of the name.
- admin.vm.List: the UUID is included in the returned list.
- internal.GetSystemInfo: the UUID is returned in the "uuid" key of each
  VM's JSON object.

Fixes: QubesOS/qubes-issues#8862
  • Loading branch information
DemiMarie committed Jan 14, 2024
1 parent 49c9fc9 commit 21c1bf2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 28 deletions.
54 changes: 38 additions & 16 deletions qubes/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import functools
import io
import os
import re
import shutil
import socket
import struct
import traceback
from typing import Union
import uuid

import qubes.exc

Expand Down Expand Up @@ -94,6 +97,33 @@ def apply_filters(iterable, filters):
iterable = filter(selector, iterable)
return iterable

# pylint: disable=line-too-long
_uuid_regex = re.compile(rb"\A@uuid:[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\Z")
_qube_name_regex = re.compile(rb"\A[A-Za-z][A-Za-z0-9_.-]*\Z")
def decode_vm(
untrusted_input: bytes, domains: qubes.app.VMCollection
) -> qubes.vm.qubesvm.QubesVM:
lookup: Union[uuid.UUID, str]
vm = untrusted_input.decode("ascii", "strict")
if untrusted_input.startswith(b"@uuid:"):
if (len(untrusted_input) != 42 or
not _uuid_regex.match(untrusted_input)):
raise qubes.exc.QubesVMNotFoundError(f"Invalid VM UUID {vm[6:]}")

Check warning on line 111 in qubes/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

qubes/api/__init__.py#L111

Added line #L111 was not covered by tests
lookup = uuid.UUID(vm[6:])
else:
if (len(untrusted_input) > 31 or
not _qube_name_regex.match(untrusted_input)):
raise qubes.exc.QubesVMNotFoundError(f"Invalid VM name {vm}")

Check warning on line 116 in qubes/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

qubes/api/__init__.py#L116

Added line #L116 was not covered by tests
lookup = vm
try:
return domains[lookup]
except KeyError:

Check warning on line 120 in qubes/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

qubes/api/__init__.py#L120

Added line #L120 was not covered by tests
# normally this should filtered out by qrexec policy, but there are
# two cases it might not be:
# 1. The call comes from dom0, which bypasses qrexec policy
# 2. Domain was removed between checking the policy and here
# we inform the client accordingly
raise qubes.exc.QubesVMNotFoundError(vm)

Check warning on line 126 in qubes/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

qubes/api/__init__.py#L126

Added line #L126 was not covered by tests

class AbstractQubesAPI:
'''Common code for Qubes Management Protocol handling
Expand All @@ -114,25 +144,17 @@ class AbstractQubesAPI:
#: the preferred socket location (to be overridden in child's class)
SOCKNAME = None

def __init__(self, app, src, method_name, dest, arg, send_event=None):
app: qubes.Qubes
src: qubes.vm.qubesvm.QubesVM
def __init__(self, app: qubes.Qubes, src: bytes, method_name, dest: bytes, arg, send_event=None):
#: :py:class:`qubes.Qubes` object
self.app = app

try:
vm = src.decode('ascii')
#: source qube
self.src = self.app.domains[vm]

vm = dest.decode('ascii')
#: destination qube
self.dest = self.app.domains[vm]
except KeyError:
# normally this should filtered out by qrexec policy, but there are
# two cases it might not be:
# 1. The call comes from dom0, which bypasses qrexec policy
# 2. Domain was removed between checking the policy and here
# we inform the client accordingly
raise qubes.exc.QubesVMNotFoundError(vm)
#: source qube
self.src = decode_vm(src, app.domains)

#: destination qube
self.dest = decode_vm(dest, app.domains)

#: argument
self.arg = arg.decode('ascii')
Expand Down
15 changes: 10 additions & 5 deletions qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ async def vm_list(self):
else:
domains = self.fire_event_for_filter([self.dest])

return ''.join('{} class={} state={}\n'.format(
return ''.join('{} class={} state={} uuid={}\n'.format(
vm.name,
vm.__class__.__name__,
vm.get_power_state())
vm.get_power_state(),
vm.uuid)
for vm in sorted(domains))

@qubes.api.method('admin.vm.property.List', no_payload=True,
Expand Down Expand Up @@ -1134,10 +1135,14 @@ async def _vm_create(self, vm_type, allow_pool=False,
raise
self.app.save()

@qubes.api.method('admin.vm.CreateDisposable', no_payload=True,
@qubes.api.method('admin.vm.CreateDisposable',
scope='global', write=True)
async def create_disposable(self):
async def create_disposable(self, untrusted_payload):
self.enforce(not self.arg)
if untrusted_payload not in (b"", b"uuid"):
raise qubes.exc.QubesValueError(

Check warning on line 1143 in qubes/api/admin.py

View check run for this annotation

Codecov / codecov/patch

qubes/api/admin.py#L1143

Added line #L1143 was not covered by tests
"Invalid payload for admin.vm.CreateDisposable: "
"expected the empty string or 'uuid'")

if self.dest.name == 'dom0':
dispvm_template = self.src.default_dispvm
Expand All @@ -1150,7 +1155,7 @@ async def create_disposable(self):
# TODO: move this to extension (in race-free fashion, better than here)
dispvm.tags.add('disp-created-by-' + str(self.src))

return dispvm.name
return str(dispvm.uuid) if untrusted_payload else dispvm.name

@qubes.api.method('admin.vm.Remove', no_payload=True,
scope='global', write=True)
Expand Down
1 change: 1 addition & 0 deletions qubes/api/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def get_system_info(app):
'guivm': (domain.guivm.name if getattr(domain, 'guivm', None)
else None),
'power_state': domain.get_power_state(),
'uuid': str(domain.uuid),
} for domain in app.domains
}}
return system_info
Expand Down
19 changes: 12 additions & 7 deletions qubes/tests/api_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def setUp(self):
app.save = unittest.mock.Mock()
self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1',
template='test-template')
self.uuid = b"@uuid:" + str(self.vm.uuid).encode("ascii", "strict")
self.app = app
libvirt_attrs = {
'libvirt_conn.lookupByUUID.return_value.isActive.return_value':
Expand Down Expand Up @@ -123,19 +124,23 @@ def call_internal_mgmt_func(self, method, dest, arg=b'', payload=b''):
mgmt_obj.execute(untrusted_payload=payload))
return response


class TC_00_VMs(AdminAPITestCase):
def test_000_vm_list(self):
value = self.call_mgmt_func(b'admin.vm.List', b'dom0')
self.assertEqual(value,
'dom0 class=AdminVM state=Running\n'
'test-template class=TemplateVM state=Halted\n'
'test-vm1 class=AppVM state=Halted\n')
'dom0 class=AdminVM state=Running uuid=00000000-0000-0000-0000-000000000000\n'
f'test-template class=TemplateVM state=Halted uuid={self.template.uuid}\n'
f'test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n')

def test_001_vm_list_single(self):
value = self.call_mgmt_func(b'admin.vm.List', b'test-vm1')
self.assertEqual(value,
'test-vm1 class=AppVM state=Halted\n')
f"test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n")

def test_001_vm_list_single_uuid(self):
value = self.call_mgmt_func(b'admin.vm.List', self.uuid)
self.assertEqual(value,
f"test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n")

def test_002_vm_list_filter(self):
with tempfile.TemporaryDirectory() as tmpdir:
Expand All @@ -152,8 +157,8 @@ def test_002_vm_list_filter(self):
value = loop.run_until_complete(
mgmt_obj.execute(untrusted_payload=b''))
self.assertEqual(value,
'dom0 class=AdminVM state=Running\n'
'test-vm1 class=AppVM state=Halted\n')
'dom0 class=AdminVM state=Running uuid=00000000-0000-0000-0000-000000000000\n'
f"test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n")

def test_010_vm_property_list(self):
# this test is kind of stupid, but at least check if appropriate
Expand Down
5 changes: 5 additions & 0 deletions qubes/tests/api_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import qubes.vm.adminvm
from unittest import mock
import json
import uuid

def mock_coro(f):
async def coro_f(*args, **kwargs):
Expand Down Expand Up @@ -150,6 +151,7 @@ def test_010_get_system_info(self):
self.dom0.template_for_dispvms = False
self.dom0.label.icon = 'icon-dom0'
self.dom0.get_power_state.return_value = 'Running'
self.dom0.uuid = uuid.UUID("00000000-0000-0000-0000-000000000000")
del self.dom0.guivm

vm = mock.NonCallableMock(spec=qubes.vm.qubesvm.QubesVM)
Expand All @@ -160,6 +162,7 @@ def test_010_get_system_info(self):
vm.label.icon = 'icon-vm'
vm.guivm = vm
vm.get_power_state.return_value = 'Halted'
vm.uuid = uuid.uuid4()
self.domains['vm'] = vm

ret = json.loads(self.call_mgmt_func(b'internal.GetSystemInfo'))
Expand All @@ -173,6 +176,7 @@ def test_010_get_system_info(self):
'icon': 'icon-dom0',
'guivm': None,
'power_state': 'Running',
'uuid': "00000000-0000-0000-0000-000000000000",
},
'vm': {
'tags': ['tag3', 'tag4'],
Expand All @@ -182,6 +186,7 @@ def test_010_get_system_info(self):
'icon': 'icon-vm',
'guivm': 'vm',
'power_state': 'Halted',
"uuid": str(vm.uuid)
}
}
})

0 comments on commit 21c1bf2

Please sign in to comment.