From a8b060e8fc9ec64726c8fbcbe5f0a2e4d20f9e5f Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 6 Jan 2025 13:02:24 +0100 Subject: [PATCH] device interface denied list: make add/remove idempotent --- qubes/api/admin.py | 33 ++++++++++++++++--------------- qubes/tests/api_admin.py | 42 +++++++++++++++++++++++++++------------- qubes/vm/qubesvm.py | 15 ++++++++------ 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 00e3139d7..6f4d22bed 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1659,16 +1659,20 @@ async def vm_device_denied_add(self, untrusted_payload): payload = untrusted_payload.decode("ascii", errors="strict") to_add = DeviceInterface.from_str_bulk(payload) - if not to_add: - return + if len(set(to_add)) != len(to_add): + raise qubes.exc.QubesValueError( + "Duplicated device interfaces in payload.") - # may contain duplicates self.fire_event_for_permission(interfaces=to_add) to_add_enc = "".join(map(repr, to_add)) + prev = self.dest.devices_denied + # "auto" ignoring of duplications self.dest.devices_denied = self.dest.devices_denied + to_add_enc - self.app.save() + # do not save if nothing changed + if prev != self.dest.devices_denied: + self.app.save() @qubes.api.method( "admin.vm.device.denied.Remove", scope="local", write=True) @@ -1688,22 +1692,19 @@ async def vm_device_denied_remove(self, untrusted_payload): else: to_remove = denied.copy() - if not to_remove: - return + if len(set(to_remove)) != len(to_remove): + raise qubes.exc.QubesValueError( + "Duplicated device interfaces in payload.") # may contain missing values self.fire_event_for_permission(interfaces=to_remove) - for interface in to_remove: - try: - denied.remove(interface) - except ValueError: - raise qubes.exc.QubesValueError( - f"Interface {interface} are not listed " - f"in the devices denied list of {self.dest} vm.") - new_denied = "".join(map(repr, denied)) - self.dest.devices_denied = new_denied - self.app.save() + # ignore missing values + new_denied = "".join(repr(i) for i in denied if i not in to_remove) + + if new_denied != self.dest.devices_denied: + self.dest.devices_denied = new_denied + self.app.save() @qubes.api.method( "admin.vm.firewall.Get", no_payload=True, scope="local", read=True diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index c1c7ca9fe..ad9bc8b0d 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -3938,7 +3938,7 @@ def test_663_vm_device_denied_add_multiple(self): self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"uabcdefm******") self.assertEqual(self.vm.devices_denied, - "b******p012345p53**2*uabcdefm******") + "b******m******p012345p53**2*uabcdef") self.assertTrue(self.app.save.called) def test_664_vm_device_denied_add_repeated(self): @@ -3946,20 +3946,24 @@ def test_664_vm_device_denied_add_repeated(self): with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", b"", b"u112233u112233") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_665_vm_device_denied_add_present(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"b******") + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"b******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_666_vm_device_denied_add_nothing(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", - b"", b"b******") + self.call_mgmt_func(b"admin.vm.device.denied.Add", b"test-vm1", + b"", b"") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_670_vm_device_denied_remove(self): @@ -3975,6 +3979,8 @@ def test_671_vm_device_denied_remove_repeated(self): with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", b"", b"b******b******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_672_vm_device_denied_remove_all(self): @@ -3986,16 +3992,26 @@ def test_672_vm_device_denied_remove_all(self): def test_673_vm_device_denied_remove_missing(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"m******") + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"m******") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) + def test_673_vm_device_denied_remove_missing_and_present(self): + self.vm.devices_denied = "b******p012345p53**2*" + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"m******p53**2*") + self.assertEqual(self.vm.devices_denied, + "b******p012345") + self.assertTrue(self.app.save.called) + def test_674_vm_device_denied_remove_nothing(self): self.vm.devices_denied = "b******p012345p53**2*" - with self.assertRaises(qubes.exc.QubesValueError): - self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", - b"", b"") + self.call_mgmt_func(b"admin.vm.device.denied.Remove", b"test-vm1", + b"", b"") + self.assertEqual(self.vm.devices_denied, + "b******p012345p53**2*") self.assertFalse(self.app.save.called) def test_700_pool_set_revisions_to_keep(self): diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index dd933eade..aa896f328 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -139,12 +139,15 @@ def _setter_denied_list(self, prop, value): value = str(value) if len(value) == 0: return value - interfaces = DeviceInterface.from_str_bulk(value) - if len(interfaces) != len(set(interfaces)): - raise qubes.exc.QubesPropertyValueError( - self, prop, value, - "Interface code list contains duplicates.") - # block, usb, mic, pci, * + + # remove duplicates + value = "".join( + sorted(map(repr, set(DeviceInterface.from_str_bulk(value))))) + + # The requirements for the interface encoding are more relaxed + # in the DeviceInterface class compared to the denied list. + # allowed classes: block, usb, mic, pci, * + # allowed interface encoding: hex digits + * pattern = r"^([bump\*][0123456789abcdef\*]{6})*$" if not re.fullmatch(pattern, value): raise qubes.exc.QubesPropertyValueError(