Skip to content

Commit

Permalink
q-dev: add block and pci tests to improve testcov + typos
Browse files Browse the repository at this point in the history
update pci description format
  • Loading branch information
piotrbartman committed Jun 12, 2024
1 parent ba66cba commit 93f60d5
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 67 deletions.
6 changes: 3 additions & 3 deletions qubes/ext/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]:
@property
def parent_device(self) -> Optional[qubes.device_protocol.Device]:
"""
The parent device if any.
The parent device, if any.
If the device is part of another device (e.g. it's a single
If the device is part of another device (e.g., it's a single
partition of an usb stick), the parent device id should be here.
"""
if self._parent is None:
Expand Down Expand Up @@ -602,7 +602,7 @@ def on_device_pre_detached_block(self, vm, event, device):
if not vm.is_running():
return

# need to enumerate attached device to find frontend_dev option (at
# need to enumerate attached devices to find frontend_dev option (at
# least)
for attached_device, options in self.on_device_list_attached(vm, event):
if attached_device == device:
Expand Down
19 changes: 3 additions & 16 deletions qubes/ext/pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def attached_devices(app):


def _device_desc(hostdev_xml):
return '{devclass}: {vendor} {product}'.format(
return '{devclass}: {product} ({vendor})'.format(
devclass=pcidev_class(hostdev_xml),
vendor=hostdev_xml.findtext('capability/vendor'),
product=hostdev_xml.findtext('capability/product'),
Expand Down Expand Up @@ -235,7 +235,7 @@ def interfaces(self) -> List[qubes.device_protocol.DeviceInterface]:
@property
def parent_device(self) -> Optional[qubes.device_protocol.DeviceInfo]:
"""
The parent device if any.
The parent device, if any.
PCI device has no parents.
"""
Expand All @@ -261,7 +261,7 @@ def description(self):
@property
def self_identity(self) -> str:
"""
Get identification of device not related to port.
Get identification of the device not related to port.
"""
allowed_chars = string.digits + string.ascii_letters + '-_.'
if self._vendor_id is None:
Expand Down Expand Up @@ -306,19 +306,6 @@ def _load_desc(self) -> Dict[str, str]:
"//product/@id")[0]
return result

@staticmethod
def _sanitize(
untrusted_device_desc: bytes,
safe_chars: str =
string.ascii_letters + string.digits + string.punctuation + ' '
) -> str:
# b'USB\\x202.0\\x20Camera' -> 'USB 2.0 Camera'
untrusted_device_desc = untrusted_device_desc.decode(
'unicode_escape', errors='ignore')
return ''.join(
c if c in set(safe_chars) else '_' for c in untrusted_device_desc
)

@property
def frontend_domain(self):
# TODO: cache this
Expand Down
74 changes: 63 additions & 11 deletions qubes/tests/devices_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import qubes.tests
import qubes.ext.block
from qubes.device_protocol import DeviceInterface, Device, DeviceInfo

modules_disk = '''
<disk type='block' device='disk'>
Expand Down Expand Up @@ -102,6 +103,13 @@ def list(self, prefix):


class TestApp(object):
class Domains(dict):
def __init__(self):
super().__init__()

def __iter__(self):
return iter(self.values())

def __init__(self):
#: jinja2 environment for libvirt XML templates
self.env = jinja2.Environment(
Expand All @@ -112,7 +120,19 @@ def __init__(self):
]),
undefined=jinja2.StrictUndefined,
autoescape=True)
self.domains = {}
self.domains = TestApp.Domains()


class TestDeviceCollection(object):
def __init__(self, backend_vm, devclass):
self._exposed = []
self.backend_vm = backend_vm
self.devclass = devclass

def __getitem__(self, ident):
for dev in self._exposed:
if dev.ident == ident:
return dev


class TestVM(qubes.tests.TestEmitter):
Expand All @@ -136,13 +156,16 @@ def __init__(
'XMLDesc.return_value': domain_xml
})
self.devices = {
'testclass': qubes.devices.DeviceCollection(self, 'testclass')
'testclass': TestDeviceCollection(self, 'testclass')
}

def __eq__(self, other):
if isinstance(other, TestVM):
return self.name == other.name

def __str__(self):
return self.name


class TC_00_Block(qubes.tests.QubesTestCase):

Expand All @@ -156,7 +179,33 @@ def test_000_device_get(self):
'/qubes-block-devices/sda/desc': b'Test_ (device)',
'/qubes-block-devices/sda/size': b'1024000',
'/qubes-block-devices/sda/mode': b'w',
})
'/qubes-block-devices/sda/parent': b'1-1.1:1.0',
}, domain_xml=domain_xml_template.format(""))
parent = DeviceInfo(vm, '1-1.1', devclass='usb')
vm.devices['usb'] = TestDeviceCollection(backend_vm=vm, devclass='usb')
vm.devices['usb']._exposed.append(parent)
vm.is_running = lambda: True

dom0 = TestVM({}, name='dom0',
domain_xml=domain_xml_template.format(""))

disk = '''
<disk type="block" device="disk">
<driver name="phy" />
<source dev="/dev/sda" />
<target dev="xvdi" />
<readonly />
<backenddomain name="test-vm" />
</disk>
'''
front = TestVM({}, domain_xml=domain_xml_template.format(disk), name='front-vm')

vm.app.domains[0] = dom0
vm.app.domains['test-vm'] = vm
vm.app.domains['front-vm'] = front
front.app.domains = vm.app.domains
dom0.app.domains = vm.app.domains

device_info = self.ext.device_get(vm, 'sda')
self.assertIsInstance(device_info, qubes.ext.block.BlockDevice)
self.assertEqual(device_info.backend_domain, vm)
Expand All @@ -167,6 +216,16 @@ def test_000_device_get(self):
self.assertEqual(device_info._serial, 'Test')
self.assertEqual(device_info.size, 1024000)
self.assertEqual(device_info.mode, 'w')
self.assertEqual(device_info.manufacturer,
'sub-device of test-vm:1-1.1')
self.assertEqual(device_info.device_node, '/dev/sda')
self.assertEqual(device_info.interfaces,
[DeviceInterface("b******")])
self.assertEqual(device_info.parent_device,
Device(vm, '1-1.1', devclass='usb'))
self.assertEqual(device_info.attachment, front)
self.assertEqual(device_info.self_identity,
'1-1.1:0000:0000::?******:1.0')
self.assertEqual(
device_info.data.get('test_frontend_domain', None), None)
self.assertEqual(device_info.device_node, '/dev/sda')
Expand Down Expand Up @@ -325,6 +384,7 @@ def test_031_list_attached(self):
options = devices[0][1]
self.assertEqual(dev.backend_domain, vm.app.domains['sys-usb'])
self.assertEqual(dev.ident, 'sda')
self.assertEqual(dev.attachment, None)
self.assertEqual(options['frontend-dev'], 'xvdi')
self.assertEqual(options['read-only'], 'yes')

Expand Down Expand Up @@ -584,14 +644,6 @@ def test_051_detach_not_attached(self):
'/qubes-block-devices/sda/size': b'1024000',
'/qubes-block-devices/sda/mode': b'r',
})
device_xml = (
'<disk type="block" device="disk">\n'
' <driver name="phy" />\n'
' <source dev="/dev/sda" />\n'
' <target dev="xvdi" />\n'
' <readonly />\n\n'
' <backenddomain name="sys-usb" />\n'
'</disk>')
vm = TestVM({}, domain_xml=domain_xml_template.format(''))
vm.app.domains['test-vm'] = vm
vm.app.domains['sys-usb'] = TestVM({}, name='sys-usb')
Expand Down
136 changes: 99 additions & 37 deletions qubes/tests/devices_pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.

from unittest import mock

import jinja2

import qubes.tests
import qubes.ext.pci
from qubes.device_protocol import DeviceInterface


class TestVM(object):
Expand All @@ -38,38 +36,10 @@ def __eq__(self, other):
if isinstance(other, TestVM):
return self.name == other.name

class TC_00_Block(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.ext = qubes.ext.pci.PCIDeviceExtension()

def test_000_unsupported_device(self):
vm = TestVM()
vm.app.configure_mock(**{
'vmm.libvirt_conn.listAllDevices.return_value':
[mock.Mock(**{"XMLDesc.return_value": """<device>
<name>pci_0000_00_14_0</name>
<path>/sys/devices/pci0000:00/0000:00:14.0</path>
<parent>computer</parent>
<driver>
<name>pciback</name>
</driver>
<capability type='pci'>
<class>0x0c0330</class>
<domain>0</domain>
<bus>0</bus>
<slot>20</slot>
<function>0</function>
<product id='0x8cb1'>9 Series Chipset Family USB xHCI Controller</product>
<vendor id='0x8086'>Intel Corporation</vendor>
</capability>
</device>
""",
"listCaps.return_value": ["pci"]
}),
mock.Mock(**{"XMLDesc.return_value": """<device>
<name>pci_1000_00_14_0</name>
<path>/sys/devices/pci1000:00/1000:00:14.0</path>
PCI_XML = """<device>
<name>pci_{}_00_14_0</name>
<path>/sys/devices/pci{}:00/{}:00:14.0</path>
<parent>computer</parent>
<driver>
<name>pciback</name>
Expand All @@ -84,11 +54,103 @@ def test_000_unsupported_device(self):
<vendor id='0x8086'>Intel Corporation</vendor>
</capability>
</device>
""",
"listCaps.return_value": ["pci"]
"""


def mock_file_open(filename: str, *_args, **_kwargs):
if filename == "/usr/share/hwdata/pci.ids":
# short version of pci.ids
content = """
#
# List of PCI ID's
#
# (...)
#
0001 SafeNet (wrong ID)
0010 Allied Telesis, Inc (Wrong ID)
# This is a relabelled RTL-8139
8139 AT-2500TX V3 Ethernet
# List of known device classes, subclasses and programming interfaces
# Syntax:
# C class class_name
# subclass subclass_name <-- single tab
# prog-if prog-if_name <-- two tabs
C 00 Unclassified device
\t00 Non-VGA unclassified device
C 01 Mass storage controller
\t01 IDE interface
\t\t00 ISA Compatibility mode-only controller
C 0c Serial bus controller
\t00 FireWire (IEEE 1394)
\t\t00 Generic
\t\t10 OHCI
\t01 ACCESS Bus
\t02 SSA
\t03 USB controller
\t\t00 UHCI
\t\t10 OHCI
\t\t20 EHCI
\t\t30 XHCI
\t\t40 USB4 Host Interface
\t\t80 Unspecified
\t\tfe USB Device
\t04 Fibre Channel
\t05 SMBus
\t06 InfiniBand
\t07 IPMI Interface
\t\t00 SMIC
\t\t01 KCS
\t\t02 BT (Block Transfer)
\t08 SERCOS interface
\t09 CANBUS
\t80 Serial bus controller
"""
elif filename.startswith("/sys/devices/pci"):
content = "0x0c0330"
else:
raise OSError()

file_object = mock.mock_open(read_data=content).return_value
file_object.__iter__.return_value = content
return file_object


class TC_00_Block(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.ext = qubes.ext.pci.PCIDeviceExtension()

@mock.patch('builtins.open', new=mock_file_open)
def test_000_unsupported_device(self):
vm = TestVM()
vm.app.configure_mock(**{
'vmm.libvirt_conn.nodeDeviceLookupByName.return_value':
mock.Mock(**{"XMLDesc.return_value":
PCI_XML.format(*["0000"] * 3)
}),
]
'vmm.libvirt_conn.listAllDevices.return_value':
[mock.Mock(**{"XMLDesc.return_value":
PCI_XML.format(*["0000"] * 3),
"listCaps.return_value": ["pci"]
}),
mock.Mock(**{"XMLDesc.return_value":
PCI_XML.format(*["1000"] * 3),
"listCaps.return_value": ["pci"]
}),
]
})
devices = list(self.ext.on_device_list_pci(vm, 'device-list:pci'))
self.assertEqual(len(devices), 1)
self.assertEqual(devices[0].ident, "00_14.0")
self.assertEqual(devices[0].vendor, "Intel Corporation")
self.assertEqual(devices[0].product,
"9 Series Chipset Family USB xHCI Controller")
self.assertEqual(devices[0].interfaces, [DeviceInterface("p0c0330")])
self.assertEqual(devices[0].parent_device, None)
self.assertEqual(devices[0].libvirt_name, "pci_0000_00_14_0")
self.assertEqual(devices[0].description,
"USB controller: 9 Series Chipset Family "
"USB xHCI Controller (Intel Corporation)")
self.assertEqual(devices[0].self_identity, "0x8086:0x8cb1::p0c0330")

0 comments on commit 93f60d5

Please sign in to comment.