Skip to content

Commit

Permalink
device interface denied list: reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrbartman committed Jan 11, 2025
1 parent a8b060e commit ffbe0c5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 60 deletions.
16 changes: 9 additions & 7 deletions qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1632,8 +1632,8 @@ async def vm_device_set_required(self, endpoint, untrusted_payload):
self.app.save()

@qubes.api.method(
"admin.vm.device.denied.List",
no_payload=True, scope="local", read=True)
"admin.vm.device.denied.List", no_payload=True, scope="local", read=True
)
async def vm_device_denied_list(self):
"""
List all denied device interfaces for the VM.
Expand All @@ -1647,8 +1647,7 @@ async def vm_device_denied_list(self):
denied = self.dest.devices_denied
return "\n".join(map(repr, DeviceInterface.from_str_bulk(denied)))

@qubes.api.method(
"admin.vm.device.denied.Add", scope="local", write=True)
@qubes.api.method("admin.vm.device.denied.Add", scope="local", write=True)
async def vm_device_denied_add(self, untrusted_payload):
"""
Add device interface(s) to the denied list for the VM.
Expand All @@ -1661,7 +1660,8 @@ async def vm_device_denied_add(self, untrusted_payload):

if len(set(to_add)) != len(to_add):
raise qubes.exc.QubesValueError(
"Duplicated device interfaces in payload.")
"Duplicated device interfaces in payload."
)

self.fire_event_for_permission(interfaces=to_add)

Expand All @@ -1675,7 +1675,8 @@ async def vm_device_denied_add(self, untrusted_payload):
self.app.save()

@qubes.api.method(
"admin.vm.device.denied.Remove", scope="local", write=True)
"admin.vm.device.denied.Remove", scope="local", write=True
)
async def vm_device_denied_remove(self, untrusted_payload):
"""
Remove device interface(s) from the denied list for the VM.
Expand All @@ -1694,7 +1695,8 @@ async def vm_device_denied_remove(self, untrusted_payload):

if len(set(to_remove)) != len(to_remove):
raise qubes.exc.QubesValueError(
"Duplicated device interfaces in payload.")
"Duplicated device interfaces in payload."
)

# may contain missing values
self.fire_event_for_permission(interfaces=to_remove)
Expand Down
2 changes: 1 addition & 1 deletion qubes/device_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]:
f"(is {len(interfaces)}, expected multiple of 7)",
)
return [
DeviceInterface(interfaces[i: i + 7])
DeviceInterface(interfaces[i : i + 7])
for i in range(0, len(interfaces), 7)
]

Expand Down
106 changes: 60 additions & 46 deletions qubes/tests/api_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3913,105 +3913,119 @@ def test_655_vm_device_set_mode_invalid_value(self):
self.assertFalse(self.app.save.called)

def test_660_vm_device_denied_list_empty(self):
actual = self.call_mgmt_func(b"admin.vm.device.denied.List",
b"test-vm1")
actual = self.call_mgmt_func(
b"admin.vm.device.denied.List", b"test-vm1"
)
self.assertEqual(actual, "")
self.assertFalse(self.app.save.called)

def test_661_vm_device_denied_list(self):
self.vm.devices_denied = "b******p012345pff**2*"
actual = self.call_mgmt_func(b"admin.vm.device.denied.List",
b"test-vm1")
actual = self.call_mgmt_func(
b"admin.vm.device.denied.List", b"test-vm1"
)
self.assertEqual(actual, "b******\np012345\npff**2*")
self.assertFalse(self.app.save.called)

def test_662_vm_device_denied_add(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1",
b"", b"uabcdef")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*uabcdef")
self.call_mgmt_func(
b"admin.vm.device.denied.Add", b"test-vm1", b"", b"uabcdef"
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*uabcdef")
self.assertTrue(self.app.save.called)

def test_663_vm_device_denied_add_multiple(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1",
b"", b"uabcdefm******")
self.assertEqual(self.vm.devices_denied,
"b******m******p012345p53**2*uabcdef")
self.call_mgmt_func(
b"admin.vm.device.denied.Add", b"test-vm1", b"", b"uabcdefm******"
)
self.assertEqual(
self.vm.devices_denied, "b******m******p012345p53**2*uabcdef"
)
self.assertTrue(self.app.save.called)

def test_664_vm_device_denied_add_repeated(self):
self.vm.devices_denied = "b******p012345p53**2*"
with self.assertRaises(qubes.exc.QubesValueError):
self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1",
b"", b"u112233u112233")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Add",
b"test-vm1",
b"",
b"u112233u112233",
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*")
self.assertFalse(self.app.save.called)

def test_665_vm_device_denied_add_present(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1",
b"", b"b******")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Add", b"test-vm1", b"", b"b******"
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*")
self.assertFalse(self.app.save.called)

def test_666_vm_device_denied_add_nothing(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1",
b"", b"")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Add", b"test-vm1", b"", b""
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*")
self.assertFalse(self.app.save.called)

def test_670_vm_device_denied_remove(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1",
b"", b"b******")
self.assertEqual(self.vm.devices_denied,
"p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******"
)
self.assertEqual(self.vm.devices_denied, "p012345p53**2*")
self.assertTrue(self.app.save.called)

def test_671_vm_device_denied_remove_repeated(self):
self.vm.devices_denied = "b******p012345p53**2*"
with self.assertRaises(qubes.exc.QubesValueError):
self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1",
b"", b"b******b******")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Remove",
b"test-vm1",
b"",
b"b******b******",
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*")
self.assertFalse(self.app.save.called)

def test_672_vm_device_denied_remove_all(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1",
b"", b"all")
self.call_mgmt_func(
b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"all"
)
self.assertEqual(self.vm.devices_denied, "")
self.assertTrue(self.app.save.called)

def test_673_vm_device_denied_remove_missing(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1",
b"", b"m******")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"m******"
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*")
self.assertFalse(self.app.save.called)

def test_673_vm_device_denied_remove_missing_and_present(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1",
b"", b"m******p53**2*")
self.assertEqual(self.vm.devices_denied,
"b******p012345")
self.call_mgmt_func(
b"admin.vm.device.denied.Remove",
b"test-vm1",
b"",
b"m******p53**2*",
)
self.assertEqual(self.vm.devices_denied, "b******p012345")
self.assertTrue(self.app.save.called)

def test_674_vm_device_denied_remove_nothing(self):
self.vm.devices_denied = "b******p012345p53**2*"
self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1",
b"", b"")
self.assertEqual(self.vm.devices_denied,
"b******p012345p53**2*")
self.call_mgmt_func(
b"admin.vm.device.denied.Remove", b"test-vm1", b"", b""
)
self.assertEqual(self.vm.devices_denied, "b******p012345p53**2*")
self.assertFalse(self.app.save.called)

def test_700_pool_set_revisions_to_keep(self):
Expand Down
18 changes: 12 additions & 6 deletions qubes/vm/qubesvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,15 @@ def _setter_default_user(self, prop, value):


def _setter_denied_list(self, prop, value):
""" Helper for setting denied list """
"""Helper for setting denied list"""
value = str(value)
if len(value) == 0:
return value

# remove duplicates
value = "".join(
sorted(map(repr, set(DeviceInterface.from_str_bulk(value)))))
sorted(map(repr, set(DeviceInterface.from_str_bulk(value))))
)

# The requirements for the interface encoding are more relaxed
# in the DeviceInterface class compared to the denied list.
Expand All @@ -151,10 +152,13 @@ def _setter_denied_list(self, prop, value):
pattern = r"^([bump\*][0123456789abcdef\*]{6})*$"
if not re.fullmatch(pattern, value):
raise qubes.exc.QubesPropertyValueError(

Check warning on line 154 in qubes/vm/qubesvm.py

View check run for this annotation

Codecov / codecov/patch

qubes/vm/qubesvm.py#L154

Added line #L154 was not covered by tests
self, prop, value,
self,
prop,
value,
"Interface code list should be in the form chhhhhhchhhhhh...,"
'where c is one of "b", "u", "m", "p", "*" '
'and h is a hexdigit or "*".')
'and h is a hexdigit or "*".',
)
return value


Expand Down Expand Up @@ -856,10 +860,12 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
)

devices_denied = qubes.property(
"devices_denied", default="",
"devices_denied",
default="",
type=str,
setter=_setter_denied_list,
doc="List of device interface codes that are denied for this VM.")
doc="List of device interface codes that are denied for this VM.",
)

# for changes in keyboard_layout, see also the same property in AdminVM
keyboard_layout = qubes.property(
Expand Down

0 comments on commit ffbe0c5

Please sign in to comment.