From fc895970a86c96023f1054f34824fece642a11fd Mon Sep 17 00:00:00 2001
From: Kai-Hsun Chen
Date: Mon, 24 Oct 2022 10:41:39 -0700
Subject: [PATCH] [Bug] Update wait function in test_detached_actor (#635)

This commit implements a wait function for head pod restart in test_detached_actor.
---
 tests/compatibility-test.py            | 41 ++++++++---
 tests/config/requirements.txt          |  1 +
 tests/kuberay_utils/utils.py           | 99 +++++++++++++++++++++++++-
 tests/scripts/test_detached_actor_2.py | 13 +++-
 4 files changed, 138 insertions(+), 16 deletions(-)

diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py
index 7f5d7bab8c2..b65835b617c 100755
--- a/tests/compatibility-test.py
+++ b/tests/compatibility-test.py
@@ -8,6 +8,8 @@
 import string
 
 import kuberay_utils.utils as utils
+from kubernetes import client, config
+from kubernetes.stream import stream
 
 logger = logging.getLogger(__name__)
 
@@ -267,37 +269,54 @@ def get_new_value():
             client.close()
 
     def test_detached_actor(self):
-        client = docker.from_env()
-        container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
+        docker_client = docker.from_env()
+        container = docker_client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
                                           network_mode='host', command=["/bin/sh"])
         ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10))
         logger.info(f'namespace: {ray_namespace}')
 
         # Register a detached actor
         utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_detached_actor_1.py')
-        exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_1.py {ray_namespace}', timeout_sec = 180)
+        exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_1.py {ray_namespace}', timeout_sec = 180)
 
         if exit_code != 0:
             raise Exception(f"There was an exception during the execution of test_detached_actor_1.py. The exit code is {exit_code}." +
                             "See above for command output. The output will be printed by the function exec_run_container.")
 
-        # Kill the gcs on head node. If fate sharing is enabled, the whole head node pod will terminate.
-        utils.shell_assert_success(
-            'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"')
-        # Wait for new head node getting created
-        # TODO (kevin85421): Need a better method to wait for the new head pod. (https://github.com/ray-project/kuberay/issues/618)
-        time.sleep(180)
+        # Initialize k8s client
+        config.load_kube_config()
+        k8s_api = client.CoreV1Api()
+
+        # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition,
+        # if we have 0 head pods at this moment, it indicates that the head pod crashed unexpectedly.
+        headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head')
+        assert(len(headpods.items) == 1)
+        old_head_pod = headpods.items[0]
+        old_head_pod_name = old_head_pod.metadata.name
+        restart_count = old_head_pod.status.container_statuses[0].restart_count
+
+        # Kill the gcs_server process on the head node. If fate sharing is enabled, the whole head node pod
+        # will terminate.
+        exec_command = ['pkill gcs_server']
+        utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command)
+
+        # Wait for all pods to become ready and running.
+        utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000)
 
         # Try to connect to the detached actor again.
+        # [Note] When all pods become running and ready, the RayCluster still needs tens of seconds to relaunch actors. Hence,
+        # `test_detached_actor_2.py` will retry until a Ray client connection succeeds.
         utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_detached_actor_2.py')
-        exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_2.py {ray_namespace}', timeout_sec = 180)
+        exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_2.py {ray_namespace}', timeout_sec = 180)
 
         if exit_code != 0:
             raise Exception(f"There was an exception during the execution of test_detached_actor_2.py. The exit code is {exit_code}." +
                             "See above for command output. The output will be printed by the function exec_run_container.")
 
+        k8s_api.api_client.rest_client.pool_manager.clear()
+        k8s_api.api_client.close()
         container.stop()
-        client.close()
+        docker_client.close()
 
 class RayServiceTestCase(unittest.TestCase):
     service_template_file = 'tests/config/ray-service.yaml.template'
diff --git a/tests/config/requirements.txt b/tests/config/requirements.txt
index bdb9670965e..ad3726cd2c6 100644
--- a/tests/config/requirements.txt
+++ b/tests/config/requirements.txt
@@ -1 +1,2 @@
 docker
+kubernetes
diff --git a/tests/kuberay_utils/utils.py b/tests/kuberay_utils/utils.py
index 915d2ccfa49..82a67a746b2 100644
--- a/tests/kuberay_utils/utils.py
+++ b/tests/kuberay_utils/utils.py
@@ -8,6 +8,8 @@
 import docker
 
 from string import Template
+from kubernetes import client, config
+from kubernetes.stream import stream
 
 kindcluster_config_file = 'tests/config/cluster-config.yaml'
 raycluster_service_file = 'tests/config/raycluster-service.yaml'
@@ -168,8 +170,8 @@ def copy_to_container(container, src, dest, filename):
             tar.add(filename)
         finally:
             tar.close()
-        data = open(tf.name, 'rb').read()
-        container.put_archive(dest, data)
+        with open(tf.name, 'rb') as data:
+            container.put_archive(dest, data.read())
     finally:
         os.chdir(oldpwd)
 
@@ -211,3 +213,96 @@ def wait_for_condition(
     if last_ex is not None:
         message += f" Last exception: {last_ex}"
     raise RuntimeError(message)
+
+def get_pod(k8s_api, namespace, label_selector):
+    return k8s_api.list_namespaced_pod(namespace = namespace, label_selector = label_selector)
+
+def pod_exec_command(k8s_api, pod_name, namespace, exec_command, stderr=True, stdin=False, stdout=True, tty=False, silent=False):
+    exec_command = ['/bin/sh', '-c'] + exec_command
+    resp = stream(k8s_api.connect_get_namespaced_pod_exec,
+                  pod_name,
+                  namespace,
+                  command=exec_command,
+                  stderr=stderr, stdin=stdin,
+                  stdout=stdout, tty=tty)
+    if not silent:
+        logger.info(f"cmd: {exec_command}")
+        logger.info(f"response: {resp}")
+    return resp
+
+def wait_for_new_head(k8s_api, old_head_pod_name, old_restart_count, namespace, timeout, retry_interval_ms):
+    """
+    `wait_for_new_head` waits until the new head pod is ready and running. For example, `test_detached_actor` kills
+    the gcs_server process on the head pod. It takes nearly 1 minute for the head pod to be killed, and during that
+    minute the head pod will still be 'Running' and 'Ready'.
+
+    Hence, we need to check `restart_count` or `new_head_pod_name`:
+    (1) `restart_count`: If the pod is restarted by the Pod's restartPolicy, `restart_count` will increase by 1.
+        If the pod is deleted by KubeRay and the reconciler creates a new one, `restart_count` will be 0.
+    (2) `new_head_pod_name`: If the reconciler creates a new head pod, `new_head_pod_name` will be different from
+        `old_head_pod_name`.
+
+    Next, we check the status of the pods to ensure that all pods are "Running" and "Ready".
+
+    After the cluster state converges (all pods are "Running" and "Ready"), Ray processes still need tens of seconds
+    to become ready to serve new connections from Ray clients. Hence, users need to retry until a Ray client
+    connection succeeds.
+
+    Args:
+        k8s_api: Kubernetes client (e.g. client.CoreV1Api()).
+        old_head_pod_name: Name of the old head pod.
+        old_restart_count: Container restart count of the old head pod before the GCS server is killed.
+        namespace: Namespace that the head pod is running in.
+        timeout: Same as `wait_for_condition`.
+        retry_interval_ms: Same as `wait_for_condition`.
+
+    Raises:
+        RuntimeError: If the condition is not met before the timeout expires.
+    """
+    def check_status(k8s_api, old_head_pod_name, old_restart_count, namespace) -> bool:
+        all_pods = k8s_api.list_namespaced_pod(namespace = namespace)
+        headpods = get_pod(k8s_api, namespace=namespace, label_selector='rayNodeType=head')
+        # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. On the other
+        # hand, when we kill a worker, the operator will reconcile a new one immediately without waiting
+        # for the Pod termination to complete. Hence, it is possible to have more than `worker.Replicas`
+        # worker pods in the cluster.
+        if len(headpods.items) != 1:
+            logger.info('Number of head pods is not equal to 1.')
+            return False
+        new_head_pod = headpods.items[0]
+        new_head_pod_name = new_head_pod.metadata.name
+        new_restart_count = new_head_pod.status.container_statuses[0].restart_count
+        # The default container restartPolicy of a Pod is `Always`. Hence, when the GCS server is killed,
+        # Kubernetes is expected to restart the old head pod rather than create a new one.
+        if new_head_pod_name != old_head_pod_name:
+            logger.info('The head pod should be restarted rather than recreated when the GCS server is killed.' +
+                        f' new_head_pod_name: {new_head_pod_name}, old_head_pod_name: {old_head_pod_name}')
+            # TODO (kevin85421): We should `return False` here, but currently ray:nightly has a high probability of
+            #                    creating a new head pod instead of restarting the old one.
+
+        # When the GCS server is killed, it takes nearly 1 minute to kill the head pod. During that minute, the head
+        # pod will still be 'Running' and 'Ready'. Hence, we need to check `restart_count`.
+        else:
+            # TODO (kevin85421): We should remove `else` in the future. Currently, ray:nightly has a high probability of
+            #                    creating a new head pod instead of restarting the old one. The new pod's `restart_count`
+            #                    is 0.
+            if new_restart_count != old_restart_count + 1:
+                logger.info(f'new_restart_count != old_restart_count + 1 => new_restart_count: {new_restart_count},' +
+                            f' old_restart_count: {old_restart_count}')
+                return False
+        # All pods should be "Running" and "Ready". This check is overkill. We added it due to the buggy
+        # behavior of Ray HA. To elaborate, when the GCS server is killed, the head pod should restart,
+        # but worker pods should not. However, currently, worker pods also restart.
+        # See https://github.com/ray-project/kuberay/issues/634 for more details.
+        for pod in all_pods.items:
+            if pod.status.phase != 'Running':
+                logger.info(f'Pod {pod.metadata.name} is not Running.')
+                return False
+            for c in pod.status.container_statuses:
+                if not c.ready:
+                    logger.info(f'Container {c.name} in {pod.metadata.name} is not ready.')
+                    return False
+        return True
+    wait_for_condition(check_status, timeout=timeout, retry_interval_ms=retry_interval_ms, k8s_api=k8s_api,
+                       old_head_pod_name=old_head_pod_name, old_restart_count=old_restart_count, namespace=namespace)
+    # After the cluster state converges, ray processes still need tens of seconds to become ready.
+    # TODO (kevin85421): Make ray processes become ready when pods are "Ready" and "Running".
diff --git a/tests/scripts/test_detached_actor_2.py b/tests/scripts/test_detached_actor_2.py
index 08834cf10b0..b55c5788b16 100644
--- a/tests/scripts/test_detached_actor_2.py
+++ b/tests/scripts/test_detached_actor_2.py
@@ -2,11 +2,10 @@
 import time
 import sys
 
-ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])
-
 def retry_with_timeout(func, timeout=90):
     err = None
-    for _ in range(timeout):
+    start = time.time()
+    while time.time() - start <= timeout:
         try:
             return func()
         except BaseException as e:
@@ -18,7 +17,15 @@ def retry_with_timeout(func, timeout=90):
 def get_detached_actor():
     return ray.get_actor("testCounter")
 
+# Try to connect to Ray cluster.
+print("Try to connect to Ray cluster.")
+retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]), timeout = 180)
+
+# Get TestCounter actor
+print("Get TestCounter actor.")
 tc = retry_with_timeout(get_detached_actor)
+
+print("Try to call remote function \'increment\'.")
 val = retry_with_timeout(lambda: ray.get(tc.increment.remote()))
 print(f"val: {val}")
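
Reviewer note (not part of the patch): the snippet below is a minimal sketch of how the new helpers added to
tests/kuberay_utils/utils.py (get_pod, pod_exec_command, wait_for_new_head) are meant to be combined. It assumes
the test is run from this repository's tests/ directory with a kubeconfig pointing at the test cluster, and it
reuses the 'default' namespace and 'rayNodeType=head' label from compatibility-test.py.

    # Sketch: kill the GCS server on the head pod and block until the cluster recovers.
    from kubernetes import client, config

    import kuberay_utils.utils as utils

    config.load_kube_config()  # assumes a kubeconfig for the test cluster
    k8s_api = client.CoreV1Api()

    # Exactly one head pod is expected per RayCluster instance.
    headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head')
    assert len(headpods.items) == 1
    old_head_pod = headpods.items[0]
    old_head_pod_name = old_head_pod.metadata.name
    old_restart_count = old_head_pod.status.container_statuses[0].restart_count

    # Kill the gcs_server process; with fate sharing enabled, the whole head pod terminates.
    utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default',
                           exec_command=['pkill gcs_server'])

    # Block until the head pod has restarted (or been recreated) and all pods are Running and Ready.
    utils.wait_for_new_head(k8s_api, old_head_pod_name, old_restart_count, 'default',
                            timeout=300, retry_interval_ms=1000)

    # Even after the pods are Ready, a Ray client may need to retry ray.init() for tens of seconds
    # before the detached actor is reachable again (see test_detached_actor_2.py above).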