Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some pci utils in avocado framework #5755

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 164 additions & 1 deletion avocado/utils/pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import os
import re

from avocado.utils import genio, process
from avocado.utils import genio, process, wait


def get_domains():
Expand Down Expand Up @@ -320,6 +320,169 @@ def get_driver(pci_address):
return line.rsplit(None, 1)[-1]


def unbind(driver, full_pci_address):
"""
Unbind the pci device(full_pci_address) from driver

:param driver: driver of the PCI address (full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/unbind", full_pci_address)
if wait.wait_for(
lambda: os.path.exists(
f"/sys/bus/pci/drivers/\
{driver}/{full_pci_address}"
),
timeout=5,
):
raise ValueError(f"Not able to unbind {full_pci_address} from {driver}")


def bind(driver, full_pci_address):
"""
Bind the pci device(full_pci_address) to driver

:param driver: driver of the PCI address (full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/bind", full_pci_address)
if not wait.wait_for(
lambda: os.path.exists(
f"/sys/bus/pci/drivers/\
{driver}/{full_pci_address}"
),
timeout=5,
):
raise ValueError(f"Not able to bind {full_pci_address} to {driver}")


def get_vendor_id(full_pci_address):
"""
Get vendor id of a PCI address

:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: vendor id of PCI address
rtype: str
"""
cmd = f"lspci -n -s {full_pci_address}"
out = process.run(cmd, ignore_status=True, shell=True).stdout_text
if out == "":
raise ValueError(f"Not able to get {full_pci_address} vendor id")
return out.split(" ")[2].strip()


def reset_check(full_pci_address):
"""
Check if reset for "full_pci_address" is successful

:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether reset for "full_pci_address" is successful
rtype: bool
"""
cmd = f"lspci -vvs {full_pci_address}"
output = process.run(cmd, ignore_status=True, shell=True).stdout_text
if output != "":
return False
return True


def rescan_check(full_pci_address):
"""
Check if rescan for full_pci_address is successful

:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether rescan for full_pci_address is successful
rtype: bool
"""
cmd = f"lspci -vvs {full_pci_address}"
output = process.run(cmd, ignore_status=True, shell=True).stdout_text
if output == "":
return False
return True


def change_domain_check(dom, full_pci_address, def_dom):
"""
Check if the domain changed successfully to "dom" for "full_pci_address"

:param dom: domain type
:param def_dom: default domain of pci device(full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: whether domain changed successfully to "dom"
rtype: bool
"""
try:
output = genio.read_one_line(
f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type"
)
out = output.rsplit(None, 1)[-1]
if (dom not in ("auto", out)) or (dom == "auto" and out != def_dom):
return False
return True
except OSError as details:
raise ValueError(f"Change domain check failed: {details}")


def reset(full_pci_address):
"""
Remove the full_pci_address

:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(f"/sys/bus/pci/devices/{full_pci_address}/remove", "1")
if not wait.wait_for(lambda: reset_check(full_pci_address), timeout=5):
raise ValueError(f"Unsuccessful to remove {full_pci_address}")


def rescan(full_pci_address):
"""
Rescan the system and check for full_pci_address

:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail("/sys/bus/pci/rescan", "1")
if not wait.wait_for(lambda: rescan_check(full_pci_address), timeout=5):
raise ValueError(f"Unsuccessful to rescan for {full_pci_address}")


def get_iommu_group(full_pci_address):
"""
Return the iommu group of full_pci_address

:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: iommu group of full_pci_address
rtype: string
"""
cmd = f"lspci -vvvv -s {full_pci_address}"
out = process.run(cmd, ignore_status=True, shell=True)
for line in out.stdout_text.split("\n"):
if "IOMMU group" in line:
return line.strip().split(" ")[2]
raise ValueError(f"{full_pci_address} group not found")


def change_domain(dom, def_dom, full_pci_address):
"""
Change the domain of pci device(full_pci_address) to dom

:param dom: domain type
:param def_dom: default domain of pci device(full_pci_address)
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
return: None
"""
genio.write_file_or_fail(
f"/sys/bus/pci/devices/{full_pci_address}/iommu_group/type", dom
)
if not wait.wait_for(
lambda: change_domain_check(dom, full_pci_address, def_dom), timeout=5
):
raise ValueError(f"Domain type change failed for {full_pci_address}")


def get_memory_address(pci_address):
"""
Gets the memory address of a PCI address. (first match only)
Expand Down