[ISSUE-1186]: fake-attach without DR test automation (e2e), framework utils #1208

Merged: 8 commits, Jul 12, 2024

Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 54 additions & 7 deletions tests/e2e-test-framework/conftest.py
@@ -1,13 +1,20 @@
import logging
import re
from datetime import datetime
from typing import Generator

import pytest

from wiremock.testing.testcontainer import wiremock_container
from wiremock.constants import Config

from framework.test_description_plugin import TestDescriptionPlugin
from framework.qtest_helper import QTestHelper
from framework.docker_helper import Docker
from framework.propagating_thread import PropagatingThread
from framework.utils import Utils
from framework.ssh import SSHCommandExecutor
from framework.drive import DriveUtils

@pytest.mark.trylast
@@ -69,15 +76,15 @@ def pytest_sessionfinish():
logging.info(f"[qTest] {thread.test_name} {thread.get_target_name()} success.")

@pytest.fixture(scope="session")
def vm_user(request) -> str:
return request.config.getoption("--login")

@pytest.fixture(scope="session")
def vm_cred(request) -> str:
return request.config.getoption("--password")

@pytest.fixture(scope="session")
def namespace(request) -> str:
return request.config.getoption("--namespace")

@pytest.fixture(scope="session")
@@ -97,13 +104,53 @@ def wire_mock():
Config.requests_verify = False
yield wire_mock

def get_utils(request) -> Utils:
return Utils(
vm_user=request.config.getoption("--login"),
vm_cred=request.config.getoption("--password"),
namespace=request.config.getoption("--namespace")
)

def get_ssh_executors(request) -> dict[str, SSHCommandExecutor]:
utils = get_utils(request)
worker_ips = utils.get_worker_ips()
executors = {ip: SSHCommandExecutor(ip_address=ip, username=utils.vm_user, password=utils.vm_cred) for ip in worker_ips}
return executors

@pytest.fixture(scope="session")
def utils(request) -> Utils:
return get_utils(request)

@pytest.fixture(scope="session")
def ssh_executors(request) -> dict[str, SSHCommandExecutor]:
return get_ssh_executors(request)

@pytest.fixture(scope="session")
def drive_utils_executors(request) -> dict[str, DriveUtils]:
ssh_execs = get_ssh_executors(request)
return {ip: DriveUtils(executor) for ip, executor in ssh_execs.items()}

@pytest.fixture(scope="function", autouse=True)
def link_requirements_in_background(request):
if pytest.qtest_helper is not None:
requirements_thread = PropagatingThread(target=link_requirements, args=(request,), test_name=request.node.name)
requirements_thread.start()
pytest.threads.append(requirements_thread)


@pytest.fixture(autouse=True)
def keep_drive_count(drive_utils_executors: dict[str, DriveUtils]) -> Generator[None, None, None]:
    # PR author's review note: to make sure we don't disconnect a drive
    # during a test without plugging it again

hosts_per_node_before = {ip: drive_utils.get_all_hosts() for ip, drive_utils in drive_utils_executors.items()}
yield
hosts_per_node_after = {ip: drive_utils.get_all_hosts() for ip, drive_utils in drive_utils_executors.items()}
for ip, drive_utils in drive_utils_executors.items():
drive_utils.rescan_missing_hosts(before=hosts_per_node_before[ip], after=hosts_per_node_after[ip])

@pytest.fixture(autouse=True)
def wipe_drives(drive_utils_executors: dict[str, DriveUtils]) -> Generator[None, None, None]:
yield
    for drive_utils in drive_utils_executors.values():
        drive_utils.wipe_drives()

def link_requirements(request):
for marker in request.node.iter_markers():
if marker.name == "requirements":
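
For orientation, a minimal test consuming these fixtures might look like the sketch below. The test name and assertion are hypothetical and not part of this PR; only the fixture name drive_utils_executors comes from the conftest above. The autouse keep_drive_count and wipe_drives fixtures wrap every such test automatically.

    def test_scsi_hosts_visible(drive_utils_executors):
        # the session fixture maps each worker node IP to a DriveUtils instance
        for ip, drive_utils in drive_utils_executors.items():
            hosts = drive_utils.get_all_hosts()  # e.g. {'18:0:0:0': 18, ...}
            assert hosts, f"no SCSI hosts found on node {ip}"
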
63 changes: 56 additions & 7 deletions tests/e2e-test-framework/framework/drive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Any, Dict, List, TypedDict
from framework.ssh import SSHCommandExecutor


@@ -48,6 +48,44 @@ def restore(self, host_num: int) -> None:
)
self._handle_errors(errors)

def get_all_hosts(self) -> Dict[str, int]:
"""
Retrieves a dictionary of all SCSI hosts in the system.

Returns:
dict: A dictionary mapping the SCSI ID to the host number.
"""
param = "'{print $9}'"
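    # '{print $9}' keeps only the entry-name column of ls -l output,
    # i.e. SCSI IDs such as "18:0:0:0"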
output, errors = self.executor.exec(
f"ls -l /sys/class/scsi_device | awk {param}"
)
self._handle_errors(errors)
scsi_ids = output.splitlines()
return {
scsi_id: int(scsi_id.split(":")[0])
for scsi_id in scsi_ids
if scsi_id
}

def rescan_missing_hosts(
self, before: Dict[str, int], after: Dict[str, int]
) -> None:
"""
Rescans for missing hosts in the system.

Args:
before (Dict[str, int]): A dictionary mapping the SCSI ID to the host number before rescanning.
after (Dict[str, int]): A dictionary mapping the SCSI ID to the host number after rescanning.

Example:
>>> drive = DriveUtils(executor=executor)
        >>> drive.rescan_missing_hosts(before={'18:0:0:0': 18, '18:0:0:1': 18}, after={'18:0:0:0': 18})
"""
for scsi_id, host_num in before.items():
if after.get(scsi_id) is None:
self.restore(host_num=host_num)
logging.info(f"host{host_num} was restored")

def get_host_num(self, drive_path_or_name: str) -> int:
"""
Retrieves the host number associated with the specified drive path or name.
@@ -87,22 +125,31 @@ def _get_drives_to_wipe(self, lsblk_out: dict) -> dict[str, DriveChild]:
"""
to_wipe = {}
for drive in lsblk_out["blockdevices"]:
        if drive["type"] == "disk":
            children = drive.get("children")
            drive_mountpoints = drive.get("mountpoints", [])
            drive_mountpoints = [
                mountpoint
                for mountpoint in drive_mountpoints
                if mountpoint
            ]
            if len(drive_mountpoints) != 0:
                logging.warning(
                    f"found drive with drive mountpoints: \"/dev/{drive['name']}\", skipping..."
                )
                continue
            if children:
                for child in children:
                    child_mountpoints = child.get("mountpoints", [])
                    child_mountpoints = [
                        mountpoint
                        for mountpoint in child_mountpoints
                        if mountpoint
                    ]
                    if len(child_mountpoints) == 0 and child["type"] in [
                        "part",
                        "lvm",
                    ]:
logging.info(
f"found drive \"/dev/{drive['name']}\" with child \"{child['name']}\" with no mountpoints."
)
@@ -187,7 +234,9 @@ def wipe_drives(self) -> None:
self._handle_errors(errors)
output = json.loads(output)
drives_to_wipe = self._get_drives_to_wipe(lsblk_out=output)
    logging.warning(
        f"drives to wipe on node {self.executor.ip_address}: {drives_to_wipe}"
    )

for drive, children in drives_to_wipe.items():
if children["type"] == "part":
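
To make the before/after contract of rescan_missing_hosts concrete, here is a usage sketch; it assumes drive_utils is a DriveUtils bound to one worker node, as in the keep_drive_count fixture above.

    before = drive_utils.get_all_hosts()  # e.g. {'18:0:0:0': 18, '18:0:0:1': 18}
    # ... test code that may detach a SCSI device ...
    after = drive_utils.get_all_hosts()   # e.g. {'18:0:0:0': 18}
    drive_utils.rescan_missing_hosts(before=before, after=after)
    # restore() runs for every SCSI ID present before but absent after,
    # so host 18 is rescanned here to bring '18:0:0:1' back
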
2 changes: 1 addition & 1 deletion tests/e2e-test-framework/framework/ssh.py
@@ -38,7 +38,7 @@ def exec(self, command: str) -> Tuple[str, List[Any]]:
ssh_client.connect(
self.ip_address, username=self.username, password=self.password)

    logging.info(f"SSH connected to {self.ip_address}, executing command: {command}")
_, stdout, stderr = ssh_client.exec_command(command)
output = stdout.read().decode().strip()
error = stderr.readlines()
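
For reference, exec returns a (stdout, stderr-lines) tuple, so callers typically check the second element. A brief sketch with placeholder address and credentials:

    executor = SSHCommandExecutor(ip_address="10.0.0.1", username="user", password="secret")
    output, errors = executor.exec("lsblk --json")  # output: str, errors: stderr lines
    if errors:
        raise RuntimeError(f"lsblk failed on {executor.ip_address}: {errors}")
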
75 changes: 43 additions & 32 deletions tests/e2e-test-framework/framework/utils.py
@@ -327,48 +327,63 @@ def get_drive_cr(self, volume_name: str, namespace: str) -> dict:
)
return drive_cr

def get_pod_node_ip(self, pod_name: str, namespace: str) -> str:
"""
Retrieves the IP address of the node associated with the given pod name and namespace.
Args:
pod_name (str): The name of the pod.
namespace (str): The namespace of the pod.
Returns:
str: The IP address of the node associated with the pod.
"""
pod = self.list_pods(name_prefix=pod_name, namespace=namespace)[0]
node_name = pod.spec.node_name
node = self.core_v1_api.read_node(name=node_name)
return node.status.addresses[0].address

def get_events_by_reason_for_cr(
    self,
    resource_name: str,
    reason: str,
) -> List[CoreV1Event]:
"""
    Retrieves a list of events filtered by the given resource name and reason.

Args:
resource_name (str): The name of the resource.
        reason (str): The reason for filtering events.

Returns:
        List[CoreV1Event]: A list of Kubernetes CoreV1Event objects.
"""
if namespace:
cr = self.custom_objects_api.get_namespaced_custom_object(
const.CR_GROUP,
const.CR_VERSION,
namespace,
plural,
resource_name,
)
else:
cr = self.custom_objects_api.get_cluster_custom_object(
const.CR_GROUP, const.CR_VERSION, plural, resource_name
)
uid = cr["metadata"]["uid"]
field_selector = f"involvedObject.uid={uid}"
field_selector = f"involvedObject.name={resource_name},reason={reason}"
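    # filtering by involvedObject.name avoids fetching the CR first just to read
    # its UID, which the previous uid-based field selector required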
events_list = self.core_v1_api.list_event_for_all_namespaces(
field_selector=field_selector
).items

return events_list

def event_in(self, resource_name: str, reason: str) -> bool:
"""
Checks if an event with the given resource name and reason exists in the Kubernetes API.

Args:
resource_name (str): The name of the resource.
reason (str): The reason for the event.

Returns:
bool: True if the event exists, False otherwise.
"""
events = self.get_events_by_reason_for_cr(
resource_name=resource_name,
reason=reason,
)
if len(events) > 0:
logging.info(f"event {reason} found")
return True
logging.warning(f"event {reason} not found")
return False

def wait_volume(
self,
name: str,
@@ -663,17 +678,13 @@ def recreate_pod(self, name: str, namespace: str) -> V1Pod:
Returns:
V1Pod: The recreated Pod.
"""
    self.core_v1_api.delete_namespaced_pod(name=name, namespace=namespace)
logging.info(
f"pod {name} deleted, waiting for a new pod to be created"
)

time.sleep(5)
    pod = self.list_pods(name, namespace=namespace)[0]
assert self.is_pod_ready(
name, timeout=120
), "pod not ready after 120 seconds timeout"
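
A sketch of how the new utils compose in a test: the pod name, namespace, and event reason below are placeholders, and it assumes the node address returned by get_pod_node_ip matches the keys of the drive_utils_executors fixture.

    node_ip = utils.get_pod_node_ip(pod_name="test-app", namespace="default")
    drive_utils = drive_utils_executors[node_ip]  # conftest fixture keyed by node IP
    # event_in wraps get_events_by_reason_for_cr and logs whether a match was found
    assert utils.event_in(resource_name="drive-cr-name", reason="FakeAttachInvolved")
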