diff --git a/.github/workflows/actions/compatibility/action.yaml b/.github/workflows/actions/compatibility/action.yaml index 83fdf127489..08a53eb7065 100644 --- a/.github/workflows/actions/compatibility/action.yaml +++ b/.github/workflows/actions/compatibility/action.yaml @@ -48,28 +48,12 @@ runs: docker images ls -a shell: bash - - name: Download Artifact Apiserver - uses: actions/download-artifact@v2 - with: - name: apiserver_img - path: /tmp - - - name: Load KubeRay Apiserver Docker Image - run: | - docker load --input /tmp/apiserver.tar - docker images ls -a - shell: bash - - name: Run ${{ inputs.ray_version }} compatibility test # compatibility test depends on operator & apiserver images built in previous steps. run: | - pushd manifests/base/ - kustomize edit set image kuberay/operator=kuberay/operator:${{ steps.vars.outputs.sha_short }} - kustomize edit set image kuberay/apiserver=kuberay/apiserver:${{ steps.vars.outputs.sha_short }} - popd echo "Using Ray image ${{ inputs.ray_version }}" PYTHONPATH="./tests/" \ RAY_IMAGE="rayproject/ray:${{ inputs.ray_version }}" \ OPERATOR_IMAGE="kuberay/operator:${{ steps.vars.outputs.sha_short }}" \ - APISERVER_IMAGE="kuberay/apiserver:${{ steps.vars.outputs.sha_short }}" python ./tests/compatibility-test.py + python ./tests/compatibility-test.py shell: bash diff --git a/.github/workflows/actions/configuration/action.yaml b/.github/workflows/actions/configuration/action.yaml index 8b3226de798..026737ab92a 100644 --- a/.github/workflows/actions/configuration/action.yaml +++ b/.github/workflows/actions/configuration/action.yaml @@ -48,29 +48,13 @@ runs: docker images ls -a shell: bash - - name: Download Artifact Apiserver - uses: actions/download-artifact@v2 - with: - name: apiserver_img - path: /tmp - - - name: Load KubeRay Apiserver Docker Image - run: | - docker load --input /tmp/apiserver.tar - docker images ls -a - shell: bash - - name: Run configuration tests for sample YAML files. # compatibility test depends on operator & apiserver images built in previous steps. run: | - pushd manifests/base/ - kustomize edit set image kuberay/operator=kuberay/operator:${{ steps.vars.outputs.sha_short }} - kustomize edit set image kuberay/apiserver=kuberay/apiserver:${{ steps.vars.outputs.sha_short }} - popd echo "Using Ray image ${{ inputs.ray_version }}" cd tests/framework GITHUB_ACTIONS=true \ RAY_IMAGE="rayproject/ray:${{ inputs.ray_version }}" \ OPERATOR_IMAGE="kuberay/operator:${{ steps.vars.outputs.sha_short }}" \ - APISERVER_IMAGE="kuberay/apiserver:${{ steps.vars.outputs.sha_short }}" python test_sample_raycluster_yamls.py + python test_sample_raycluster_yamls.py shell: bash diff --git a/ray-operator/DEVELOPMENT.md b/ray-operator/DEVELOPMENT.md index b8d4e24c5e0..e3265778641 100644 --- a/ray-operator/DEVELOPMENT.md +++ b/ray-operator/DEVELOPMENT.md @@ -135,13 +135,13 @@ python3 ../scripts/rbac-check.py We have some [end-to-end tests](https://github.com/ray-project/kuberay/blob/master/.github/workflows/actions/compatibility/action.yaml) on GitHub Actions. These tests operate small Ray clusters running within a [kind](https://kind.sigs.k8s.io/) (Kubernetes-in-docker) environment. To run the tests yourself, follow these steps: -* Step1: Install related dependencies, including [kind](https://kind.sigs.k8s.io/), [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/), and [kustomize](https://kustomize.io/). 
+* Step1: Install related dependencies, including [kind](https://kind.sigs.k8s.io/) and [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/). * Step2: You must be in `/path/to/your/kuberay/`. ```bash - # [Usage]: RAY_IMAGE=$RAY_IMAGE OPERATOR_IMAGE=$OPERATOR_IMAGE APISERVER_IMAGE=$APISERVER_IMAGE python3 tests/compatibility-test.py + # [Usage]: RAY_IMAGE=$RAY_IMAGE OPERATOR_IMAGE=$OPERATOR_IMAGE python3 tests/compatibility-test.py # These 3 environment variables are optional. # [Example]: - RAY_IMAGE=rayproject/ray:2.0.0 OPERATOR_IMAGE=kuberay/operator:nightly APISERVER_IMAGE=kuberay/apiserver:nightly python3 tests/compatibility-test.py + RAY_IMAGE=rayproject/ray:2.0.0 OPERATOR_IMAGE=kuberay/operator:nightly python3 tests/compatibility-test.py ``` diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py index b446ab39166..d6d93c6d197 100755 --- a/tests/compatibility-test.py +++ b/tests/compatibility-test.py @@ -8,7 +8,14 @@ import docker from kuberay_utils import utils -from kubernetes import client, config +from framework.prototype import ( + CONST, + CurlServiceRule, + K8S_CLUSTER_MANAGER, + OperatorManager, + RuleSet, + shell_subprocess_run +) logger = logging.getLogger(__name__) logging.basicConfig( @@ -23,7 +30,6 @@ # Docker images ray_image = 'rayproject/ray:1.9.0' kuberay_operator_image = 'kuberay/operator:nightly' -kuberay_apiserver_image = 'kuberay/apiserver:nightly' class BasicRayTestCase(unittest.TestCase): @@ -36,12 +42,15 @@ def setUpClass(cls): # from another local ray container. The local ray container # outside Kind environment has the same ray version as the # ray cluster running inside Kind environment. - utils.delete_cluster() - utils.create_cluster() - images = [ray_image, kuberay_operator_image, kuberay_apiserver_image] - utils.download_images(images) - utils.apply_kuberay_resources(images, kuberay_operator_image, kuberay_apiserver_image) - utils.create_kuberay_cluster(BasicRayTestCase.cluster_template_file, + K8S_CLUSTER_MANAGER.delete_kind_cluster() + K8S_CLUSTER_MANAGER.create_kind_cluster() + image_dict = { + CONST.RAY_IMAGE_KEY: ray_image, + CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image + } + operator_manager = OperatorManager(image_dict) + operator_manager.prepare_operator() + utils.create_ray_cluster(BasicRayTestCase.cluster_template_file, ray_version, ray_image) def test_simple_code(self): @@ -138,38 +147,42 @@ class RayFTTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - if not utils.ray_ft_supported(ray_version): - raise unittest.SkipTest("ray ft is not supported") - utils.delete_cluster() - utils.create_cluster() - images = [ray_image, kuberay_operator_image, kuberay_apiserver_image] - utils.download_images(images) - utils.apply_kuberay_resources(images, kuberay_operator_image, kuberay_apiserver_image) - utils.create_kuberay_cluster(RayFTTestCase.cluster_template_file, + if not utils.is_feature_supported(ray_version, CONST.RAY_FT): + raise unittest.SkipTest(f"{CONST.RAY_FT} is not supported") + K8S_CLUSTER_MANAGER.delete_kind_cluster() + K8S_CLUSTER_MANAGER.create_kind_cluster() + image_dict = { + CONST.RAY_IMAGE_KEY: ray_image, + CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image + } + operator_manager = OperatorManager(image_dict) + operator_manager.prepare_operator() + utils.create_ray_cluster(RayFTTestCase.cluster_template_file, ray_version, ray_image) + @unittest.skip("Skip test_kill_head due to its flakiness.") def test_kill_head(self): # This test will delete head node and wait for a new replacement to # come 
up. - utils.shell_assert_success( + shell_subprocess_run( 'kubectl delete pod $(kubectl get pods -A | grep -e "-head" | awk "{print \$2}")') # wait for new head node to start time.sleep(80) - utils.shell_assert_success('kubectl get pods -A') + shell_subprocess_run('kubectl get pods -A') # make sure the new head is ready # shell_assert_success('kubectl wait --for=condition=Ready pod/$(kubectl get pods -A | grep -e "-head" | awk "{print \$2}") --timeout=900s') # make sure both head and worker pods are ready - rtn = utils.shell_run( - 'kubectl wait --for=condition=ready pod -l rayCluster=raycluster-compatibility-test --all --timeout=900s') + rtn = shell_subprocess_run( + 'kubectl wait --for=condition=ready pod -l rayCluster=raycluster-compatibility-test --all --timeout=900s', check = False) if rtn != 0: - utils.shell_run('kubectl get pods -A') - utils.shell_run( + shell_subprocess_run('kubectl get pods -A') + shell_subprocess_run( 'kubectl describe pod $(kubectl get pods | grep -e "-head" | awk "{print \$1}")') - utils.shell_run( + shell_subprocess_run( 'kubectl logs $(kubectl get pods | grep -e "-head" | awk "{print \$1}")') - utils.shell_run( + shell_subprocess_run( 'kubectl logs -n $(kubectl get pods -A | grep -e "-operator" | awk \'{print $1 " " $2}\')') assert rtn == 0 @@ -188,13 +201,9 @@ def test_ray_serve(self): raise Exception(f"There was an exception during the execution of test_ray_serve_1.py. The exit code is {exit_code}." + "See above for command output. The output will be printed by the function exec_run_container.") - # Initialize k8s client - config.load_kube_config() - k8s_api = client.CoreV1Api() - # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition, # if we have 0 head pods at this moment, it indicates that the head pod crashes unexpectedly. - headpods = utils.get_pod(k8s_api, namespace='default', label_selector='ray.io/node-type=head') + headpods = utils.get_pod(namespace='default', label_selector='ray.io/node-type=head') assert(len(headpods.items) == 1) old_head_pod = headpods.items[0] old_head_pod_name = old_head_pod.metadata.name @@ -203,10 +212,10 @@ def test_ray_serve(self): # Kill the gcs_server process on head node. If fate sharing is enabled, the whole head node pod # will terminate. exec_command = ['pkill gcs_server'] - utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command) + utils.pod_exec_command(pod_name=old_head_pod_name, namespace='default', exec_command=exec_command) # Waiting for all pods become ready and running. - utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000) + utils.wait_for_new_head(old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000) # Try to connect to the deployed model again utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_2.py') @@ -234,13 +243,9 @@ def test_detached_actor(self): raise Exception(f"There was an exception during the execution of test_detached_actor_1.py. The exit code is {exit_code}." + "See above for command output. The output will be printed by the function exec_run_container.") - # Initialize k8s client - config.load_kube_config() - k8s_api = client.CoreV1Api() - # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition, # if we have 0 head pods at this moment, it indicates that the head pod crashes unexpectedly. 
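For reference, the head-pod lookup and GCS-kill sequence that these tests now drive through `K8S_CLUSTER_MANAGER` boils down to a label-selector query plus a pod exec against the CoreV1 API. A minimal standalone sketch, assuming the kubeconfig already points at the kind cluster (the helper names `get_head_pod` and `kill_gcs_server` are illustrative, not part of this change):

```python
# Minimal sketch; helper names are illustrative, not part of this PR.
from kubernetes import client, config
from kubernetes.stream import stream

def get_head_pod(namespace="default"):
    config.load_kube_config()
    v1 = client.CoreV1Api()
    headpods = v1.list_namespaced_pod(
        namespace=namespace, label_selector="ray.io/node-type=head")
    # KubeRay allows at most one head pod per RayCluster at a time;
    # zero head pods here would mean the head crashed unexpectedly.
    assert len(headpods.items) == 1
    return headpods.items[0]

def kill_gcs_server(pod_name, namespace="default"):
    # With fate sharing enabled, killing gcs_server takes the whole head pod down.
    v1 = client.CoreV1Api()
    return stream(v1.connect_get_namespaced_pod_exec,
                  pod_name, namespace,
                  command=["/bin/sh", "-c", "pkill gcs_server"],
                  stderr=True, stdin=False, stdout=True, tty=False)
```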
- headpods = utils.get_pod(k8s_api, namespace='default', label_selector='ray.io/node-type=head') + headpods = utils.get_pod(namespace='default', label_selector='ray.io/node-type=head') assert(len(headpods.items) == 1) old_head_pod = headpods.items[0] old_head_pod_name = old_head_pod.metadata.name @@ -249,10 +254,10 @@ def test_detached_actor(self): # Kill the gcs_server process on head node. If fate sharing is enabled, the whole head node pod # will terminate. exec_command = ['pkill gcs_server'] - utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command) + utils.pod_exec_command(pod_name=old_head_pod_name, namespace='default', exec_command=exec_command) # Waiting for all pods become ready and running. - utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000) + utils.wait_for_new_head(old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000) # Try to connect to the detached actor again. # [Note] When all pods become running and ready, the RayCluster still needs tens of seconds to relaunch actors. Hence, @@ -264,69 +269,47 @@ def test_detached_actor(self): raise Exception(f"There was an exception during the execution of test_detached_actor_2.py. The exit code is {exit_code}." + "See above for command output. The output will be printed by the function exec_run_container.") - k8s_api.api_client.rest_client.pool_manager.clear() - k8s_api.api_client.close() container.stop() docker_client.close() class RayServiceTestCase(unittest.TestCase): + """Integration tests for RayService""" service_template_file = 'tests/config/ray-service.yaml.template' - service_serve_update_template_file = 'tests/config/ray-service-serve-update.yaml.template' - service_cluster_update_template_file = 'tests/config/ray-service-cluster-update.yaml.template' + # The previous logic for testing updates was problematic. + # We need to test RayService updates. @classmethod def setUpClass(cls): - if not utils.ray_service_supported(ray_version): - raise unittest.SkipTest("ray service is not supported") - # Ray Service is running inside a local Kind environment. - # We use the Ray nightly version now. - # We wait for the serve service ready. - # The test will check the successful response from serve service. 
- utils.delete_cluster() - utils.create_cluster() - images = [ray_image, kuberay_operator_image, kuberay_apiserver_image] - utils.download_images(images) - utils.apply_kuberay_resources(images, kuberay_operator_image, kuberay_apiserver_image) - utils.create_kuberay_service( - RayServiceTestCase.service_template_file, ray_version, ray_image) + if not utils.is_feature_supported(ray_version, CONST.RAY_SERVICE): + raise unittest.SkipTest(f"{CONST.RAY_SERVICE} is not supported") + K8S_CLUSTER_MANAGER.delete_kind_cluster() + K8S_CLUSTER_MANAGER.create_kind_cluster() + image_dict = { + CONST.RAY_IMAGE_KEY: ray_image, + CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image + } + operator_manager = OperatorManager(image_dict) + operator_manager.prepare_operator() def test_ray_serve_work(self): - time.sleep(5) - curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\'' - utils.wait_for_condition( - lambda: utils.shell_run(curl_cmd) == 0, - timeout=15, - ) - utils.create_kuberay_service( - RayServiceTestCase.service_serve_update_template_file, - ray_version, ray_image) - curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\'' - time.sleep(5) - utils.wait_for_condition( - lambda: utils.shell_run(curl_cmd) == 0, - timeout=60, - ) - utils.create_kuberay_service( - RayServiceTestCase.service_cluster_update_template_file, - ray_version, ray_image) - time.sleep(5) - curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\'' - utils.wait_for_condition( - lambda: utils.shell_run(curl_cmd) == 0, - timeout=180, - ) - + """Create a RayService, send a request to RayService via `curl`, and compare the result.""" + cr_event = utils.create_ray_service( + RayServiceTestCase.service_template_file, ray_version, ray_image) + # When Pods are READY and RUNNING, RayService still needs tens of seconds to be ready + # for serving requests. This `sleep` function is a workaround, and should be removed + # when https://github.com/ray-project/kuberay/pull/730 is merged. 
+ time.sleep(60) + cr_event.rulesets = [RuleSet([CurlServiceRule()])] + cr_event.check_rule_sets() def parse_environment(): - global ray_version, ray_image, kuberay_operator_image, kuberay_apiserver_image + global ray_version, ray_image, kuberay_operator_image for k, v in os.environ.items(): if k == 'RAY_IMAGE': ray_image = v ray_version = ray_image.split(':')[-1] elif k == 'OPERATOR_IMAGE': kuberay_operator_image = v - elif k == 'APISERVER_IMAGE': - kuberay_apiserver_image = v if __name__ == '__main__': @@ -334,5 +317,4 @@ def parse_environment(): logger.info('Setting Ray image to: {}'.format(ray_image)) logger.info('Setting Ray version to: {}'.format(ray_version)) logger.info('Setting KubeRay operator image to: {}'.format(kuberay_operator_image)) - logger.info('Setting KubeRay apiserver image to: {}'.format(kuberay_apiserver_image)) unittest.main(verbosity=2) diff --git a/tests/config/ray-service.yaml.template b/tests/config/ray-service.yaml.template index 4797bed4b61..9091679498d 100644 --- a/tests/config/ray-service.yaml.template +++ b/tests/config/ray-service.yaml.template @@ -8,7 +8,7 @@ spec: serveConfig: importPath: fruit.deployment_graph runtimeEnv: | - working_dir: "https://github.com/ray-project/test_dag/archive/c620251044717ace0a4c19d766d43c5099af8a77.zip" + working_dir: "https://github.com/ray-project/test_dag/archive/41d09119cbdf8450599f993f51318e9e27c59098.zip" deployments: - name: MangoStand numReplicas: 1 diff --git a/tests/config/cluster-config.yaml b/tests/framework/config/kind-config.yaml similarity index 100% rename from tests/config/cluster-config.yaml rename to tests/framework/config/kind-config.yaml diff --git a/tests/framework/prototype.py b/tests/framework/prototype.py index e894a4e2849..85b17c54341 100644 --- a/tests/framework/prototype.py +++ b/tests/framework/prototype.py @@ -4,18 +4,41 @@ import unittest import time import subprocess +from pathlib import Path import yaml from kubernetes import client, config -import docker import jsonpatch -# Utility functions +# Global variables logger = logging.getLogger(__name__) logging.basicConfig( format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s', datefmt='%Y-%m-%d:%H:%M:%S', level=logging.INFO) +class CONST: + """Constants""" + __slots__ = () + # Docker images + OPERATOR_IMAGE_KEY = "kuberay-operator-image" + RAY_IMAGE_KEY = "ray-image" + + # Kubernetes API clients + K8S_CR_CLIENT_KEY = "k8s-cr-api-client" + K8S_V1_CLIENT_KEY = "k8s-v1-api-client" + + # Paths + REPO_ROOT = Path(__file__).absolute().parent.parent.parent + HELM_CHART_ROOT = REPO_ROOT.joinpath("helm-chart") + DEFAULT_KIND_CONFIG = REPO_ROOT.joinpath("tests/framework/config/kind-config.yaml") + + # Ray features + RAY_FT = "RAY_FT" + RAY_SERVICE = "RAY_SERVICE" + +CONST = CONST() + +# Utility functions def search_path(yaml_object, steps, default_value = None): """ Search the position in `yaml_object` based on steps. The following example uses @@ -38,41 +61,83 @@ def search_path(yaml_object, steps, default_value = None): return default_value return curr -# Functions for cluster preparation. 
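As a usage note, `search_path` walks a parsed CR dict with a list of keys (list indices are given as numeric strings), mirroring the lookup that `HeadPodNameRule.assert_rule` performs later in this file. A small illustration, with a pared-down CR dict assumed only for the example:

```python
from framework.prototype import search_path

# Pared-down RayCluster CR used only for illustration.
cr = {"spec": {"headGroupSpec": {"template": {"spec": {"containers": [{"name": "ray-head"}]}}}}}
steps = "spec.headGroupSpec.template.spec.containers.0.name".split(".")
assert search_path(cr, steps) == "ray-head"
# Missing paths fall back to `default_value`.
assert search_path(cr, ["spec", "missing"], default_value=0) == 0
```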
-def delete_kind_cluster() -> None: - """Delete a KinD cluster""" - shell_subprocess_run("kind delete cluster") - -def create_kind_cluster(): - """Create a KinD cluster""" - shell_subprocess_run("kind create cluster --wait 900s") - -def install_crd(): - """Install Custom Resource Definition (CRD)""" - shell_subprocess_run("kubectl create -k ../../manifests/cluster-scope-resources") - -def download_images(docker_images): - """Download Docker images from DockerHub""" - docker_client = docker.from_env() - for image in docker_images: - # Only pull the image from DockerHub when the image does not - # exist in the local docker registry. - if shell_subprocess_run(f'docker image inspect {image} > /dev/null', check = False) != 0: - docker_client.images.pull(image) - docker_client.close() - -def kind_load_images(docker_images): - """Load downloaded images into KinD cluster""" - for image in docker_images: - shell_subprocess_run(f'kind load docker-image {image}') - -def install_kuberay_operator(): - """Install kuberay operator with image kuberay/operator:nightly""" - shell_subprocess_run('kubectl apply -k ../../manifests/base') - -def check_cluster_exist(): - """Check whether KinD cluster exists or not""" - return shell_subprocess_run("kubectl cluster-info --context kind-kind", check = False) == 0 +class KubernetesClusterManager: + """ + KubernetesClusterManager controlls the lifecycle of KinD cluster and Kubernetes API client. + """ + def __init__(self) -> None: + self.k8s_client_dict = {} + + def delete_kind_cluster(self) -> None: + """Delete a KinD cluster""" + shell_subprocess_run("kind delete cluster") + for _, k8s_client in self.k8s_client_dict.items(): + k8s_client.api_client.rest_client.pool_manager.clear() + k8s_client.api_client.close() + self.k8s_client_dict = {} + + def create_kind_cluster(self, kind_config = None) -> None: + """Create a KinD cluster""" + # To use NodePort service, `kind_config` needs to set `extraPortMappings` properly. + kind_config = CONST.DEFAULT_KIND_CONFIG if not kind_config else kind_config + shell_subprocess_run(f"kind create cluster --wait 900s --config {kind_config}") + config.load_kube_config() + self.k8s_client_dict.update({ + CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(), + CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi() + }) + + def check_cluster_exist(self) -> bool: + """Check whether KinD cluster exists or not""" + return shell_subprocess_run("kubectl cluster-info --context kind-kind", check = False) == 0 + +K8S_CLUSTER_MANAGER = KubernetesClusterManager() + +class OperatorManager: + """ + OperatorManager controlls the lifecycle of KubeRay operator. It will download Docker images, + load images into an existing KinD cluster, and install CRD and KubeRay operator. + """ + def __init__(self, docker_image_dict) -> None: + for key in [CONST.OPERATOR_IMAGE_KEY, CONST.RAY_IMAGE_KEY]: + if key not in docker_image_dict: + raise Exception(f"Image {key} does not exist!") + self.docker_image_dict = docker_image_dict + + def prepare_operator(self): + """Prepare KubeRay operator for an existing KinD cluster""" + self.__kind_prepare_images() + self.__install_crd_and_operator() + + def __kind_prepare_images(self): + """Download images and load images into KinD cluster""" + def download_images(): + """Download Docker images from DockerHub""" + logger.info("Download Docker images: %s", self.docker_image_dict) + for key in self.docker_image_dict: + # Only pull the image from DockerHub when the image does not + # exist in the local docker registry. 
+ image = self.docker_image_dict[key] + if shell_subprocess_run( + f'docker image inspect {image} > /dev/null', check = False) != 0: + shell_subprocess_run(f'docker pull {image}') + else: + logger.info("Image %s exists", image) + + download_images() + logger.info("Load images into KinD cluster") + for key in self.docker_image_dict: + image = self.docker_image_dict[key] + shell_subprocess_run(f'kind load docker-image {image}') + + def __install_crd_and_operator(self): + """Install both CRD and KubeRay operator by kuberay-operator chart""" + logger.info("Install both CRD and KubeRay operator by kuberay-operator chart") + repo, tag = self.docker_image_dict[CONST.OPERATOR_IMAGE_KEY].split(':') + shell_subprocess_run( + f"helm install kuberay-operator {CONST.HELM_CHART_ROOT}/kuberay-operator/ " + f"--set image.repository={repo},image.tag={tag}" + ) def check_pod_running(pods) -> bool: """"Check whether all of the pods are in running state""" @@ -82,11 +147,22 @@ def check_pod_running(pods) -> bool: return True def shell_subprocess_run(command, check = True): - """Command will be executed through the shell. - If check=True, it will raise an error when the returncode of the execution is not 0""" + """ + Command will be executed through the shell. If check=True, it will raise an error when + the returncode of the execution is not 0. + """ logger.info("Execute command: %s", command) return subprocess.run(command, shell = True, check = check).returncode +def shell_subprocess_check_output(command): + """ + Run command and return STDOUT as encoded bytes. + """ + logger.info("Execute command (check_output): %s", command) + output = subprocess.check_output(command, shell=True) + logger.info("Output: %s", output) + return output + def get_expected_head_pods(custom_resource): """Get the number of head pods in custom_resource""" resource_kind = custom_resource["kind"] @@ -121,14 +197,14 @@ def show_cluster_info(cr_namespace): """Show system information""" shell_subprocess_run(f'kubectl get all -n={cr_namespace}') shell_subprocess_run(f'kubectl describe pods -n={cr_namespace}') + # With "--tail=-1", every line in the log will be printed. The default value of "tail" is not + # -1 when using selector. shell_subprocess_run(f'kubectl logs -n={cr_namespace} -l ray.io/node-type=head --tail=-1') - # --tail=-1 print all the lines in the log. 
- # Added because the default value when using selecter is not -1 - operator_namespace = subprocess.check_output('kubectl get pods ' + operator_namespace = shell_subprocess_check_output('kubectl get pods ' '-l app.kubernetes.io/component=kuberay-operator -A ' - '-o jsonpath={".items[0].metadata.namespace"}', shell=True) + '-o jsonpath={".items[0].metadata.namespace"}') shell_subprocess_run("kubectl logs -l app.kubernetes.io/component=kuberay-operator -n " - f'{operator_namespace.decode("utf-8") } --tail=-1') + f'{operator_namespace.decode("utf-8")} --tail=-1') # Configuration Test Framework Abstractions: (1) Mutator (2) Rule (3) RuleSet (4) CREvent class Mutator: @@ -223,26 +299,28 @@ def trigger_condition(self, custom_resource=None) -> bool: def assert_rule(self, custom_resource=None, cr_namespace='default'): expected_val = search_path(custom_resource, "spec.headGroupSpec.template.spec.containers.0.name".split('.')) - headpods = client.CoreV1Api().list_namespaced_pod( + headpods = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY].list_namespaced_pod( namespace = cr_namespace, label_selector='ray.io/node-type=head') assert headpods.items[0].spec.containers[0].name == expected_val class HeadSvcRule(Rule): """The labels of the head pod and the selectors of the head service must match.""" def assert_rule(self, custom_resource=None, cr_namespace='default'): - head_services = client.CoreV1Api().list_namespaced_service( + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] + head_services = k8s_v1_api.list_namespaced_service( namespace= cr_namespace, label_selector="ray.io/node-type=head") assert len(head_services.items) == 1 selector_dict = head_services.items[0].spec.selector selector = ','.join(map(lambda key: f"{key}={selector_dict[key]}", selector_dict)) - headpods = client.CoreV1Api().list_namespaced_pod( + headpods = k8s_v1_api.list_namespaced_pod( namespace =cr_namespace, label_selector=selector) assert len(headpods.items) == 1 class EasyJobRule(Rule): """Submit a very simple Ray job to test the basic functionality of the Ray cluster.""" def assert_rule(self, custom_resource=None, cr_namespace='default'): - headpods = client.CoreV1Api().list_namespaced_pod( + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] + headpods = k8s_v1_api.list_namespaced_pod( namespace = cr_namespace, label_selector='ray.io/node-type=head') headpod_name = headpods.items[0].metadata.name shell_subprocess_run(f"kubectl exec {headpod_name} -n {cr_namespace} --" + @@ -255,18 +333,19 @@ def assert_rule(self, custom_resource=None, cr_namespace='default'): shell_subprocess_run(f"kubectl run curl --image=radial/busyboxplus:curl -n {cr_namespace} " "--command -- /bin/sh -c \"while true; do sleep 10;done\"") success_create = False + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] for _ in range(30): - resp = client.CoreV1Api().read_namespaced_pod(name="curl", namespace=cr_namespace) + resp = k8s_v1_api.read_namespaced_pod(name="curl", namespace=cr_namespace) if resp.status.phase != 'Pending': success_create = True break time.sleep(1) if not success_create: raise Exception("CurlServiceRule create curl pod timeout") - output = subprocess.check_output(f"kubectl exec curl -n {cr_namespace} " + output = shell_subprocess_check_output(f"kubectl exec curl -n {cr_namespace} " "-- curl -X POST -H 'Content-Type: application/json' " f"{custom_resource['metadata']['name']}-serve-svc.{cr_namespace}.svc.cluster.local:8000" - " -d '[\"MANGO\", 2]'", 
shell=True) + " -d '[\"MANGO\", 2]'") assert output == b'6' shell_subprocess_run(f"kubectl delete pod curl -n {cr_namespace}") @@ -274,12 +353,10 @@ class RayClusterAddCREvent(CREvent): """CREvent for RayCluster addition""" def exec(self): if not self.filepath: - k8s_cr_api = client.CustomObjectsApi() + k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] k8s_cr_api.create_namespaced_custom_object( group = 'ray.io',version = 'v1alpha1', namespace = self.namespace, plural = 'rayclusters', body = self.custom_resource_object) - k8s_cr_api.api_client.rest_client.pool_manager.clear() - k8s_cr_api.api_client.close() else: shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}") @@ -291,7 +368,7 @@ def wait(self): # (1) The number of head pods and worker pods are as expected. # (2) All head pods and worker pods are "Running". converge = False - k8s_v1_api = client.CoreV1Api() + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] for _ in range(self.timeout): headpods = k8s_v1_api.list_namespaced_pod( namespace = self.namespace, label_selector='ray.io/node-type=head') @@ -304,10 +381,6 @@ def wait(self): logger.info("--- RayClusterAddCREvent %s seconds ---", time.time() - start_time) break time.sleep(1) - # I hope to move k8s_v1_api to constructor and close it in the destructor, - # but test_sample_raycluster_yamls.py will fail with unknown reasons. - k8s_v1_api.api_client.rest_client.pool_manager.clear() - k8s_v1_api.api_client.close() if not converge: logger.info("RayClusterAddCREvent wait() failed to converge in %d seconds.", @@ -319,15 +392,13 @@ def wait(self): def clean_up(self): """Delete added RayCluster""" - k8s_cr_api = client.CustomObjectsApi() + k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] k8s_cr_api.delete_namespaced_custom_object( group = 'ray.io', version = 'v1alpha1', namespace = self.namespace, plural = 'rayclusters', name = self.custom_resource_object['metadata']['name']) - k8s_cr_api.api_client.rest_client.pool_manager.clear() - k8s_cr_api.api_client.close() # Wait pods to be deleted converge = False - k8s_v1_api = client.CoreV1Api() + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] start_time = time.time() for _ in range(self.timeout): headpods = k8s_v1_api.list_namespaced_pod( @@ -339,8 +410,7 @@ def clean_up(self): logger.info("--- Cleanup RayCluster %s seconds ---", time.time() - start_time) break time.sleep(1) - k8s_v1_api.api_client.rest_client.pool_manager.clear() - k8s_v1_api.api_client.close() + if not converge: logger.info("RayClusterAddCREvent clean_up() failed to converge in %d seconds.", self.timeout) @@ -353,12 +423,10 @@ class RayServiceAddCREvent(CREvent): def exec(self): """Wait for RayService to converge""""" if not self.filepath: - k8s_cr_api = client.CustomObjectsApi() + k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] k8s_cr_api.create_namespaced_custom_object( group = 'ray.io',version = 'v1alpha1', namespace = self.namespace, plural = 'rayservices', body = self.custom_resource_object) - k8s_cr_api.api_client.rest_client.pool_manager.clear() - k8s_cr_api.api_client.close() else: shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}") @@ -372,7 +440,7 @@ def wait(self): # (2) All head pods and worker pods are "Running". 
# (3) Service named "rayservice-sample-serve" presents converge = False - k8s_v1_api = client.CoreV1Api() + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] for _ in range(self.timeout): headpods = k8s_v1_api.list_namespaced_pod( namespace = self.namespace, label_selector='ray.io/node-type=head') @@ -388,10 +456,6 @@ def wait(self): logger.info("--- RayServiceAddCREvent %s seconds ---", time.time() - start_time) break time.sleep(1) - # I hope to move k8s_v1_api to constructor and close it in the destructor, - # but test_sample_raycluster_yamls.py will fail with unknown reasons. - k8s_v1_api.api_client.rest_client.pool_manager.clear() - k8s_v1_api.api_client.close() if not converge: logger.info("RayServiceAddCREvent wait() failed to converge in %d seconds.", @@ -403,15 +467,13 @@ def wait(self): def clean_up(self): """Delete added RayService""" - k8s_cr_api = client.CustomObjectsApi() + k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] k8s_cr_api.delete_namespaced_custom_object( group = 'ray.io', version = 'v1alpha1', namespace = self.namespace, plural = 'rayservices', name = self.custom_resource_object['metadata']['name']) - k8s_cr_api.api_client.rest_client.pool_manager.clear() - k8s_cr_api.api_client.close() # Wait pods to be deleted converge = False - k8s_v1_api = client.CoreV1Api() + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] start_time = time.time() for _ in range(self.timeout): headpods = k8s_v1_api.list_namespaced_pod( @@ -423,8 +485,7 @@ def clean_up(self): logger.info("--- Cleanup RayService %s seconds ---", time.time() - start_time) break time.sleep(1) - k8s_v1_api.api_client.rest_client.pool_manager.clear() - k8s_v1_api.api_client.close() + if not converge: logger.info("RayServiceAddCREvent clean_up() failed to converge in %d seconds.", self.timeout) @@ -434,23 +495,19 @@ def clean_up(self): class GeneralTestCase(unittest.TestCase): """TestSuite""" - def __init__(self, methodName, docker_images, cr_event): + def __init__(self, methodName, docker_image_dict, cr_event): super().__init__(methodName) self.cr_event = cr_event - self.images = docker_images + self.operator_manager = OperatorManager(docker_image_dict) @classmethod def setUpClass(cls): - delete_kind_cluster() + K8S_CLUSTER_MANAGER.delete_kind_cluster() def setUp(self): - if not check_cluster_exist(): - create_kind_cluster() - install_crd() - download_images(self.images) - kind_load_images(self.images) - install_kuberay_operator() - config.load_kube_config() + if not K8S_CLUSTER_MANAGER.check_cluster_exist(): + K8S_CLUSTER_MANAGER.create_kind_cluster() + self.operator_manager.prepare_operator() def runtest(self): """Run a configuration test""" @@ -461,7 +518,7 @@ def tearDown(self) -> None: self.cr_event.clean_up() except Exception as ex: logger.error(str(ex)) - delete_kind_cluster() + K8S_CLUSTER_MANAGER.delete_kind_cluster() if __name__ == '__main__': TEMPLATE_NAME = 'config/ray-cluster.mini.yaml.template' @@ -506,11 +563,14 @@ def tearDown(self) -> None: rs = RuleSet([HeadPodNameRule(), EasyJobRule(), HeadSvcRule()]) mut = Mutator(base_cr, patch_list) - images = ['rayproject/ray:2.0.0', 'kuberay/operator:nightly', 'kuberay/apiserver:nightly'] + image_dict = { + CONST.RAY_IMAGE_KEY: 'rayproject/ray:2.0.0', + CONST.OPERATOR_IMAGE_KEY: 'kuberay/operator:nightly' + } test_cases = unittest.TestSuite() for new_cr in mut.mutate(): addEvent = RayClusterAddCREvent(new_cr, [rs], 90, NAMESPACE) - test_cases.addTest(GeneralTestCase('runtest', images, 
addEvent)) + test_cases.addTest(GeneralTestCase('runtest', image_dict, addEvent)) runner = unittest.TextTestRunner() runner.run(test_cases) diff --git a/tests/framework/test_sample_raycluster_yamls.py b/tests/framework/test_sample_raycluster_yamls.py index c1dd1b29f0e..e6f5be2e09c 100644 --- a/tests/framework/test_sample_raycluster_yamls.py +++ b/tests/framework/test_sample_raycluster_yamls.py @@ -11,6 +11,7 @@ HeadPodNameRule, EasyJobRule, HeadSvcRule, + CONST ) logger = logging.getLogger(__name__) @@ -53,12 +54,11 @@ } rs = RuleSet([HeadPodNameRule(), EasyJobRule(), HeadSvcRule()]) - images = [ - os.getenv('RAY_IMAGE', default='rayproject/ray:2.0.0'), - os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'), - os.getenv('APISERVER_IMAGE', default='kuberay/apiserver:nightly') - ] - logger.info(images) + image_dict = { + CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.0.0'), + CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'), + } + logger.info(image_dict) # Build a test plan logger.info("Build a test plan ...") test_cases = unittest.TestSuite() @@ -68,7 +68,7 @@ continue logger.info('[TEST %d]: %s', index, new_cr['name']) addEvent = RayClusterAddCREvent(new_cr['cr'], [rs], 90, NAMESPACE, new_cr['path']) - test_cases.addTest(GeneralTestCase('runtest', images, addEvent)) + test_cases.addTest(GeneralTestCase('runtest', image_dict, addEvent)) # Execute all tests runner = unittest.TextTestRunner() diff --git a/tests/framework/test_sample_rayservice_yamls.py b/tests/framework/test_sample_rayservice_yamls.py index 5f1598e8a77..9d790972a10 100644 --- a/tests/framework/test_sample_rayservice_yamls.py +++ b/tests/framework/test_sample_rayservice_yamls.py @@ -10,6 +10,7 @@ RayServiceAddCREvent, EasyJobRule, CurlServiceRule, + CONST ) @@ -29,19 +30,19 @@ ) rs = RuleSet([EasyJobRule(), CurlServiceRule()]) - images = [ - os.getenv('RAY_IMAGE', default='rayproject/ray:2.0.0'), - os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'), - os.getenv('APISERVER_IMAGE', default='kuberay/apiserver:nightly') - ] - logger.info(images) + image_dict = { + CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.0.0'), + CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'), + } + logger.info(image_dict) + # Build a test plan logger.info("Build a test plan ...") test_cases = unittest.TestSuite() for index, new_cr in enumerate(sample_yaml_files): logger.info('[TEST %d]: %s', index, new_cr['name']) addEvent = RayServiceAddCREvent(new_cr['cr'], [rs], 90, NAMESPACE, new_cr['path']) - test_cases.addTest(GeneralTestCase('runtest', images, addEvent)) + test_cases.addTest(GeneralTestCase('runtest', image_dict, addEvent)) # Execute all tests runner = unittest.TextTestRunner() diff --git a/tests/kuberay_utils/utils.py b/tests/kuberay_utils/utils.py index 20197dca1fe..9d33ea7d7c5 100644 --- a/tests/kuberay_utils/utils.py +++ b/tests/kuberay_utils/utils.py @@ -7,13 +7,16 @@ import time import tempfile import yaml -import docker from kubernetes.stream import stream -from kubernetes import config -from framework.prototype import RayClusterAddCREvent +from framework.prototype import ( + CONST, + K8S_CLUSTER_MANAGER, + RayClusterAddCREvent, + RayServiceAddCREvent, + shell_subprocess_run +) -kindcluster_config_file = 'tests/config/cluster-config.yaml' raycluster_service_file = 'tests/config/raycluster-service.yaml' logger = logging.getLogger(__name__) @@ -23,70 +26,16 @@ level=logging.INFO ) - -def 
parse_ray_version(version_str): - tmp = version_str.split('.') - assert len(tmp) == 3 - major = int(tmp[0]) - minor = int(tmp[1]) - patch = int(tmp[2]) - return major, minor, patch - - -def ray_ft_supported(ray_version): +def is_feature_supported(ray_version, feature): + """Return True if `feature` is supported in `ray_version`""" if ray_version == "nightly": return True - major, minor, _ = parse_ray_version(ray_version) - return major * 100 + minor > 113 - - -def ray_service_supported(ray_version): - if ray_version == "nightly": - return True - major, minor, _ = parse_ray_version(ray_version) - return major * 100 + minor > 113 - - -def shell_run(cmd, silent = False): - logger.info('executing cmd: {}'.format(cmd)) - if silent: - cmd += ' > /dev/null' - return os.system(cmd) - + major, minor, _ = [int(s) for s in ray_version.split('.')] + if feature in [CONST.RAY_FT, CONST.RAY_SERVICE]: + return major * 100 + minor > 113 + return False -def shell_assert_success(cmd): - assert shell_run(cmd) == 0 - - -def shell_assert_failure(cmd): - assert shell_run(cmd) != 0 - - -def create_cluster(): - """Create a KinD cluster""" - # Use `--wait 10m` flag to block until the control plane reaches a ready status. - shell_assert_success('kind create cluster --wait 10m --config {}'.format(kindcluster_config_file)) - rtn = shell_run('kubectl wait --for=condition=ready pod -n kube-system --all --timeout=300s') - if rtn != 0: - shell_run('kubectl get pods -A') - # Initialize Kubernetes config - config.load_kube_config() - assert rtn == 0 - - -def apply_kuberay_resources(images, kuberay_operator_image, kuberay_apiserver_image): - for image in images: - shell_assert_success('kind load docker-image {}'.format(image)) - shell_assert_success('kubectl create -k manifests/cluster-scope-resources') - # use kustomize to build the yaml, then change the image to the one we want to testing. - shell_assert_success( - ('rm -f kustomization.yaml && kustomize create --resources manifests/base && ' + - 'kustomize edit set image ' + - 'kuberay/operator:nightly={0} kuberay/apiserver:nightly={1} && ' + - 'kubectl apply -k .').format(kuberay_operator_image, kuberay_apiserver_image)) - - -def create_kuberay_cluster(template_name, ray_version, ray_image): +def create_ray_cluster(template_name, ray_version, ray_image): """Create a RayCluster and a NodePort service.""" context = {} with open(template_name, encoding="utf-8") as ray_cluster_template: @@ -105,76 +54,52 @@ def create_kuberay_cluster(template_name, ray_version, ray_image): try: # Deploy a NodePort service to expose ports for users. - shell_assert_success(f'kubectl apply -f {raycluster_service_file}') + shell_subprocess_run(f'kubectl apply -f {raycluster_service_file}') # Create a RayCluster ray_cluster_add_event = RayClusterAddCREvent( - custom_resource_object = context['cr'], + custom_resource_object = context['cr'], rulesets = [], timeout = 90, namespace='default', filepath = context['filepath'] ) ray_cluster_add_event.trigger() - return + return ray_cluster_add_event except Exception as ex: - # RayClusterAddCREvent fails to converge. 
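Both `create_ray_cluster` here and the new `create_ray_service` below share the same render-then-parse flow: substitute the image/version placeholders, write the result to a temp file for `kubectl`, and pick the CR object out of the YAML stream. A condensed sketch of that shared pattern (the function name `render_cr_yaml` is illustrative):

```python
import tempfile
from string import Template
import yaml

def render_cr_yaml(template_path, ray_version, ray_image, kind):
    """Render a $ray_image/$ray_version template and return (CR dict, temp file path)."""
    with open(template_path, encoding="utf-8") as template_file:
        rendered = Template(template_file.read()).substitute(
            {"ray_image": ray_image, "ray_version": ray_version})
    with tempfile.NamedTemporaryFile("w", delete=False, suffix=".yaml") as yaml_file:
        yaml_file.write(rendered)
        filepath = yaml_file.name
    # Templates may contain multiple documents; keep the one whose kind matches.
    cr = next(obj for obj in yaml.safe_load_all(rendered) if obj["kind"] == kind)
    return cr, filepath
```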
- logger.error(str(ex)) - shell_run('kubectl describe pod $(kubectl get pods | grep -e "-head" | awk "{print \$1}")') - shell_run('kubectl logs $(kubectl get pods | grep -e "-head" | awk "{print \$1}")') - shell_run('kubectl logs -n $(kubectl get pods -A | grep -e "-operator" | awk \'{print $1 " " $2}\')') - raise Exception("create_kuberay_cluster fails") - -def create_kuberay_service(template_name, ray_version, ray_image): - template = None - with open(template_name, mode='r') as f: - template = Template(f.read()) - - rayservice_spec_buf = template.substitute( - {'ray_image': ray_image, 'ray_version': ray_version}) - - service_config_file = None - with tempfile.NamedTemporaryFile('w', delete=False) as f: - f.write(rayservice_spec_buf) - service_config_file = f.name + logger.error(f"RayClusterAddCREvent fails to converge: {str(ex)}") + raise Exception("create_ray_cluster fails") - rtn = shell_run( - 'kubectl wait --for=condition=ready pod -n ray-system --all --timeout=900s') - if rtn != 0: - shell_run('kubectl get pods -A') - assert rtn == 0 - assert service_config_file is not None - shell_assert_success('kubectl apply -f {}'.format(service_config_file)) - - shell_run('kubectl get pods -A') - - time.sleep(20) - - shell_assert_success('kubectl apply -f {}'.format(raycluster_service_file)) - - wait_for_condition( - lambda: shell_run( - 'kubectl get service rayservice-sample-serve-svc -o jsonpath="{.status}"') == 0, - timeout=900, - retry_interval_ms=5000, - ) - - -def delete_cluster(): - shell_run('kind delete cluster') +def create_ray_service(template_name, ray_version, ray_image): + """Create a RayService without a NodePort service.""" + context = {} + with open(template_name, encoding="utf-8") as ray_service_template: + template = Template(ray_service_template.read()) + yamlfile = template.substitute( + {'ray_image': ray_image, 'ray_version': ray_version} + ) + with tempfile.NamedTemporaryFile('w', delete=False) as ray_service_yaml: + ray_service_yaml.write(yamlfile) + context['filepath'] = ray_service_yaml.name + for k8s_object in yaml.safe_load_all(yamlfile): + if k8s_object['kind'] == 'RayService': + context['cr'] = k8s_object + break -def download_images(images): - """Pull images from DockerHub if do not exist.""" - client = docker.from_env() - for image in images: - if shell_run(f'docker image inspect {image}', silent=True) != 0: - # Only pull the image from DockerHub when the image does not - # exist in the local docker registry. 
- logger.info("Download docker image %s", image) - client.images.pull(image) - else: - logger.info("Image %s exists", image) - client.close() + try: + # Create a RayService + ray_service_add_event = RayServiceAddCREvent( + custom_resource_object = context['cr'], + rulesets = [], + timeout = 90, + namespace='default', + filepath = context['filepath'] + ) + ray_service_add_event.trigger() + return ray_service_add_event + except Exception as ex: + logger.error(f"RayServiceAddCREvent fails to converge: {str(ex)}") + raise Exception("create_ray_service fails") def copy_to_container(container, src, dest, filename): oldpwd = os.getcwd() @@ -230,12 +155,15 @@ def wait_for_condition( message += f" Last exception: {last_ex}" raise RuntimeError(message) -def get_pod(k8s_api, namespace, label_selector): - return k8s_api.list_namespaced_pod(namespace = namespace, label_selector = label_selector) +def get_pod(namespace, label_selector): + return K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY].list_namespaced_pod( + namespace = namespace, label_selector = label_selector + ) -def pod_exec_command(k8s_api, pod_name, namespace, exec_command, stderr=True, stdin=False, stdout=True, tty=False, silent=False): +def pod_exec_command(pod_name, namespace, exec_command, stderr=True, stdin=False, stdout=True, tty=False, silent=False): exec_command = ['/bin/sh', '-c'] + exec_command - resp = stream(k8s_api.connect_get_namespaced_pod_exec, + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] + resp = stream(k8s_v1_api.connect_get_namespaced_pod_exec, pod_name, namespace, command=exec_command, @@ -246,7 +174,7 @@ def pod_exec_command(k8s_api, pod_name, namespace, exec_command, stderr=True, st logger.info(f"response: {resp}") return resp -def wait_for_new_head(k8s_api, old_head_pod_name, old_restart_count, namespace, timeout, retry_interval_ms): +def wait_for_new_head(old_head_pod_name, old_restart_count, namespace, timeout, retry_interval_ms): """ `wait_for_new_head` is used to wait for new head is ready and running. For example, `test_detached_actor` kills the gcs_server process on the head pod. It takes nearly 1 min to kill the head pod, and the head pod will still @@ -264,7 +192,6 @@ def wait_for_new_head(k8s_api, old_head_pod_name, old_restart_count, namespace, become ready to serve new connections from ray clients. So, users need to retry until a Ray client connection succeeds. Args: - k8s_api: Kubernetes client (e.g. client.CoreV1Api()) old_head_pod_name: Name of the old head pod. old_restart_count: If the Pod is restarted by Kubernetes Pod RestartPolicy, the restart_count will increase by 1. namespace: Namespace that the head pod is running in. @@ -274,13 +201,14 @@ def wait_for_new_head(k8s_api, old_head_pod_name, old_restart_count, namespace, Raises: RuntimeError: If the condition is not met before the timeout expires, raise the RuntimeError. """ - def check_status(k8s_api, old_head_pod_name, old_restart_count, namespace) -> bool: - all_pods = k8s_api.list_namespaced_pod(namespace = namespace) - headpods = get_pod(k8s_api, namespace=namespace, label_selector='ray.io/node-type=head') + k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] + def check_status(old_head_pod_name, old_restart_count, namespace) -> bool: + all_pods = k8s_v1_api.list_namespaced_pod(namespace = namespace) + headpods = get_pod(namespace=namespace, label_selector='ray.io/node-type=head') # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. 
On the other # hands, when we kill a worker, the operator will reconcile a new one immediately without waiting # for the Pod termination to complete. Hence, it is possible to have more than `worker.Replicas` - # worker pods in the cluster. + # worker pods in the cluster. if len(headpods.items) != 1: logger.info('Number of headpods is not equal to 1.') return False @@ -318,7 +246,7 @@ def check_status(k8s_api, old_head_pod_name, old_restart_count, namespace) -> bo logger.info(f'Container {c.name} in {pod.metadata.name} is not ready.') return False return True - wait_for_condition(check_status, timeout=timeout, retry_interval_ms=retry_interval_ms, k8s_api=k8s_api, + wait_for_condition(check_status, timeout=timeout, retry_interval_ms=retry_interval_ms, old_head_pod_name=old_head_pod_name, old_restart_count=old_restart_count, namespace=namespace) # After the cluster state converges, ray processes still need tens of seconds to become ready. # TODO (kevin85421): Make ray processes become ready when pods are "Ready" and "Running".
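For completeness, the retry helper these tests lean on behaves roughly as sketched below; the real `wait_for_condition` lives in `tests/kuberay_utils/utils.py`, so treat this as a simplified reconstruction rather than the verbatim implementation:

```python
import time

def wait_for_condition(condition_predictor, timeout=30, retry_interval_ms=1000, **kwargs):
    """Poll condition_predictor(**kwargs) until it returns True or the timeout expires."""
    start = time.time()
    last_ex = None
    while time.time() - start <= timeout:
        try:
            if condition_predictor(**kwargs):
                return
        except Exception as ex:  # tolerate transient API errors while polling
            last_ex = ex
        time.sleep(retry_interval_ms / 1000.0)
    message = f"The condition wasn't met before the timeout expired ({timeout}s)."
    if last_ex is not None:
        message += f" Last exception: {last_ex}"
    raise RuntimeError(message)
```

`wait_for_new_head` above uses it as `wait_for_condition(check_status, timeout=timeout, retry_interval_ms=retry_interval_ms, old_head_pod_name=..., old_restart_count=..., namespace=...)`.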