Skip to content

Commit

Permalink
Add some pci utils in avocado framework
Browse files Browse the repository at this point in the history
Add the following pci utils:
1) unbind - unbind the driver from pci device.
2) bind - bind the driver to pci device.
3) get_vendor_id - get the vendor id of pci device.
4) reset - remove pci device in pci devices list in the system.
5) rescan - rescan the system for pci device.
6) reset_check - check if reset of pci device is successful.
7) rescan_check - check if rescan of the system for pci device is
successful.
8) get_iommu_group - returns the iommu group of pci device.
9) change_domain - change iommu domain of pci device
10) change_domain_check - check whether whether change_domain is
successful.

Signed-off-by: Dheeraj Kumar Srivastava <[email protected]>
  • Loading branch information
dhsrivas committed Aug 30, 2023
1 parent f07f661 commit c81b3bf
Showing 1 changed file with 159 additions and 1 deletion.
160 changes: 159 additions & 1 deletion avocado/utils/pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import os
import re

from avocado.utils import genio, process
from avocado.utils import genio, process, wait


def get_domains():
Expand Down Expand Up @@ -320,6 +320,164 @@ def get_driver(pci_address):
return line.rsplit(None, 1)[-1]


def unbind(driver, full_pci_address):
"""
Unbind the pci device(full_pci_address) from driver
:param driver: driver of the PCI address (full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file(f"/sys/bus/pci/drivers/{driver}/unbind", full_pci_address)
if wait.wait_for(
lambda: os.path.exists(
f"/sys/bus/pci/drivers/\
{driver}/{full_pci_address}"
),
timeout=5,
):
raise ValueError(f"Not able to unbind {full_pci_address} from {driver}")


def bind(driver, full_pci_address):
"""
Bind the pci device(full_pci_address) to driver
:param driver: driver of the PCI address (full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file(f"/sys/bus/pci/drivers/{driver}/bind", full_pci_address)
if not wait.wait_for(
lambda: os.path.exists(
f"/sys/bus/pci/drivers/\
{driver}/{full_pci_address}"
),
timeout=5,
):
raise ValueError(f"Not able to bind {full_pci_address} to {driver}")


def get_vendor_id(full_pci_address):
"""
Get vendor id of a PCI address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: vendor id of PCI address
rtype: str
"""
cmd = f"lspci -n -s {full_pci_address}"
out = process.run(cmd, ignore_status=True, shell=True).stdout_text
if out == "":
raise ValueError(f"Not able to get {full_pci_address} vendor id")
return out.split(" ")[2].strip()


def reset_check(full_pci_address):
"""
Check if reset for "full_pci_address" is successful
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether reset for "full_pci_address" is successful
rtype: bool
"""
cmd = f"lspci -vvs {full_pci_address}"
output = process.run(cmd, ignore_status=True, shell=True).stdout_text
if output != "":
return False
return True


def rescan_check(full_pci_address):
"""
Check if rescan for full_pci_address is successful
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether rescan for full_pci_address is successful
rtype: bool
"""
cmd = f"lspci -vvs {full_pci_address}"
output = process.run(cmd, ignore_status=True, shell=True).stdout_text
if output == "":
return False
return True


def change_domain_check(dom, full_pci_address, def_dom):
"""
Check if the domain changed successfully to "dom" for "full_pci_address"
:param dom: domain type
:param def_dom: default domain of pci device(full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether domain changed successfully to "dom"
rtype: bool
"""
output = genio.read_one_line(
f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type"
)
out = output.rsplit(None, 1)[-1]
if (dom not in ("auto", out)) or (dom == "auto" and out != def_dom):
return False
return True


def reset(full_pci_address):
"""
Remove the full_pci_address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file(f"/sys/bus/pci/devices/{full_pci_address}/remove", "1")
if not wait.wait_for(lambda: reset_check(full_pci_address), timeout=5):
raise ValueError(f"Unsuccessful to remove {full_pci_address}")


def rescan(full_pci_address):
"""
Rescan the system and check for full_pci_address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file("/sys/bus/pci/rescan", "1")
if not wait.wait_for(lambda: rescan_check(full_pci_address), timeout=5):
raise ValueError(f"Unsuccessful to rescan for {full_pci_address}")


def get_iommu_group(full_pci_address):
"""
Return the iommu group of full_pci_address
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: iommu group of full_pci_address
rtype: string
"""
cmd = f"lspci -vvvv -s {full_pci_address}"
out = process.run(cmd, ignore_status=True, shell=True)
for line in out.stdout_text.split("\n"):
if "IOMMU group" in line:
return line.strip().split(" ")[2]
raise ValueError(f"{full_pci_address} group not found")


def change_domain(dom, def_dom, full_pci_address):
"""
Change the domain of pci device(full_pci_address) to dom
:param dom: domain type
:param def_dom: default domain of pci device(full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file(f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type", dom)
if not wait.wait_for(
lambda: change_domain_check(dom, full_pci_address, def_dom), timeout=5
):
raise ValueError(f"Domain type change failed for {full_pci_address}")


def get_memory_address(pci_address):
"""
Gets the memory address of a PCI address. (first match only)
Expand Down

0 comments on commit c81b3bf

Please sign in to comment.