diff --git a/tests/e2e-test-framework/framework/const.py b/tests/e2e-test-framework/framework/const.py index aeff9f2a4..eb6206fff 100644 --- a/tests/e2e-test-framework/framework/const.py +++ b/tests/e2e-test-framework/framework/const.py @@ -35,3 +35,14 @@ # health HEALTH_GOOD = "GOOD" HEALTH_BAD = "BAD" + +# fake attach +FAKE_ATTACH_INVOLVED = "FakeAttachInvolved" +FAKE_ATTACH_CLEARED = "FakeAttachCleared" + +# plurals +DRIVES_PLURAL = "drives" +AC_PLURAL = "availablecapacities" +ACR_PLURAL = "availablecapacityreservations" +LVG_PLURAL = "logicalvolumegroups" +VOLUMES_PLURAL = "volumes" diff --git a/tests/e2e-test-framework/framework/utils.py b/tests/e2e-test-framework/framework/utils.py index 85b82c161..81f9a4f4b 100644 --- a/tests/e2e-test-framework/framework/utils.py +++ b/tests/e2e-test-framework/framework/utils.py @@ -2,12 +2,12 @@ import logging from typing import Any, Callable, Dict, List, Optional +from kubernetes.client.rest import ApiException from kubernetes.client.models import ( V1Pod, V1PersistentVolumeClaim, CoreV1Event, ) -from kubernetes.client.rest import ApiException import framework.const as const @@ -36,6 +36,63 @@ def __init__(self, vm_user: str, vm_cred: str, namespace: str): const.SYSLVG_SC: const.STORAGE_TYPE_SYSLVG, } + def get_worker_ips(self) -> List[str]: + """ + Retrieves the IP addresses of worker nodes in the Kubernetes cluster. + + Returns: + List[str]: A list of IP addresses of worker nodes. + """ + nodes = self.core_v1_api.list_node().items + worker_nodes = [ + node + for node in nodes + if "node-role.kubernetes.io/control-plane" + not in node.metadata.labels + ] + assert worker_nodes, "No worker nodes found in the cluster" + logging.info("[ASSERT] Worker nodes found in the cluster.") + + worker_ips = [ + node.status.addresses[0].address for node in worker_nodes + ] + assert worker_ips, "No IP addresses found for worker nodes" + logging.info( + f"[ASSERT] IP addresses found for worker nodes - {worker_ips}" + ) + + return worker_ips + + def get_controlplane_ips(self) -> List[str]: + """ + Retrieves the IP addresses of control plane nodes in the Kubernetes cluster. + + Returns: + List[str]: A list of IP addresses of control plane nodes. + """ + nodes = self.core_v1_api.list_node().items + controlplane_nodes = [ + node + for node in nodes + if "node-role.kubernetes.io/control-plane" in node.metadata.labels + ] + assert ( + controlplane_nodes + ), "No control plane nodes found in the cluster" + logging.info("[ASSERT] Control plane nodes found in the cluster.") + + controlplane_ips = [ + node.status.addresses[0].address for node in controlplane_nodes + ] + assert ( + controlplane_ips + ), "No IP addresses found for control plane nodes" + logging.info( + f"[ASSERT] IP addresses found for control plane nodes - {controlplane_ips}" + ) + + return controlplane_ips + def is_pod_running(self, pod_name: str, timeout=30) -> bool: """ Checks if a given pod is running in the Kubernetes cluster. @@ -127,6 +184,8 @@ def list_pods( key, value = label_split[0], label_split[1] labeled_pods = {p.metadata.name: p for p in pods} for p in pods: + if p.metadata.labels is None: + p.metadata.labels = {} requested_label = p.metadata.labels.get(key, None) if requested_label is None or requested_label != value: del labeled_pods[p.metadata.name] @@ -164,6 +223,8 @@ def list_persistent_volume_claims( key, value = label_split[0], label_split[1] labeled_pods = {p.metadata.name: p for p in pvcs} for p in pvcs: + if p.metadata.labels is None: + p.metadata.labels = {} requested_label = p.metadata.labels.get(key, None) if requested_label is None or requested_label != value: del labeled_pods[p.metadata.name] @@ -287,7 +348,12 @@ def get_events_by_reason( """ if namespace: cr = self.custom_objects_api.get_namespaced_custom_object( - const.CR_GROUP, const.CR_VERSION, namespace, plural, resource_name) + const.CR_GROUP, + const.CR_VERSION, + namespace, + plural, + resource_name, + ) else: cr = self.custom_objects_api.get_cluster_custom_object( const.CR_GROUP, const.CR_VERSION, plural, resource_name @@ -508,7 +574,109 @@ def annotate_pvc( pvc = self.core_v1_api.read_namespaced_persistent_volume_claim( name=resource_name, namespace=namespace ) + if pvc.metadata.annotations is None: + pvc.metadata.annotations = {} pvc.metadata.annotations[annotation_key] = annotation_value self.core_v1_api.patch_namespaced_persistent_volume_claim( name=resource_name, namespace=namespace, body=pvc ) + + def clear_csi_resources(self, namespace: str) -> None: + """ + Clears the CSI resources by deleting the custom objects in the specified namespace. + + Args: + namespace (str): The namespace of the custom objects to be cleared. + + Returns: + None: This function does not return anything. + """ + try: + self.custom_objects_api.delete_collection_namespaced_custom_object( + group=const.CR_GROUP, + version=const.CR_VERSION, + namespace=namespace, + plural=const.VOLUMES_PLURAL, + grace_period_seconds=0, + propagation_policy="Foreground", + ) + logging.info("CR volumes: delete request sent") + for plural in [ + const.DRIVES_PLURAL, + const.AC_PLURAL, + const.ACR_PLURAL, + const.LVG_PLURAL, + ]: + self.custom_objects_api.delete_collection_cluster_custom_object( + group=const.CR_GROUP, + version=const.CR_VERSION, + plural=plural, + grace_period_seconds=0, + propagation_policy="Foreground", + ) + logging.info(f"CR {plural}: delete request sent") + self.core_v1_api.delete_collection_namespaced_persistent_volume_claim( + namespace=namespace + ) + logging.info("waiting for resources to be in the removing state") + time.sleep(10) + lvg_list = self.custom_objects_api.list_cluster_custom_object( + group=const.CR_GROUP, + version=const.CR_VERSION, + plural="logicalvolumegroups", + )["items"] + for lvg in lvg_list: + if "finalizers" in lvg.get("metadata", {}): + lvg["metadata"]["finalizers"] = [] + self.custom_objects_api.replace_cluster_custom_object( + group=const.CR_GROUP, + version=const.CR_VERSION, + namespace=namespace, + plural=const.LVG_PLURAL, + name=lvg["metadata"]["name"], + body=lvg, + ) + for v in self.list_volumes(): + if "finalizers" in v.get("metadata", {}): + v["metadata"]["finalizers"] = [] + self.custom_objects_api.replace_namespaced_custom_object( + const.CR_GROUP, + const.CR_VERSION, + namespace, + plural=const.VOLUMES_PLURAL, + name=v["metadata"]["name"], + body=v, + ) + except ApiException as e: + print( + f"Exception when calling CustomObjectsApi->delete_namespaced_custom_object: {e}" + ) + + def recreate_pod(self, name: str, namespace: str) -> V1Pod: + """ + Recreates a Kubernetes Pod by deleting the existing Pod and waiting for a new Pod to be created. + + Args: + name (str): The name of the pod. + namespace (str): The namespace of the pod. + + Returns: + V1Pod: The recreated Pod. + """ + self.core_v1_api.delete_namespaced_pod( + name=name, namespace=namespace + ) + logging.info( + f"pod {name} deleted, waiting for a new pod to be created" + ) + + time.sleep(5) + pod = self.list_pods(name, namespace=namespace)[ + 0 + ] + assert self.is_pod_ready( + name, timeout=120 + ), "pod not ready after 120 seconds timeout" + logging.info(f"pod {name} is ready") + + return pod