diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py index b65835b617c..b59188398a8 100755 --- a/tests/compatibility-test.py +++ b/tests/compatibility-test.py @@ -174,99 +174,50 @@ def test_kill_head(self): assert rtn == 0 def test_ray_serve(self): - client = docker.from_env() - container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True, - network_mode='host', command=["/bin/sh", "-c", "python"]) - s = container.attach_socket( - params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1}) - s._sock.setblocking(0) - s._sock.sendall(b''' -import ray -import time -import ray.serve as serve -import os -import requests -from ray._private.test_utils import wait_for_condition + docker_client = docker.from_env() + container = docker_client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True, + network_mode='host', command=["/bin/sh"]) + # Deploy a model with ray serve + ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10)) + logger.info(f'namespace: {ray_namespace}') -def retry_with_timeout(func, count=90): - tmp = 0 - err = None - while tmp < count: - try: - return func() - except Exception as e: - err = e - tmp += 1 - assert err is not None - raise err + utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_1.py') + exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_1.py {ray_namespace}', timeout_sec = 180) -ray.init(address='ray://127.0.0.1:10001') + if exit_code != 0: + raise Exception(f"There was an exception during the execution of test_ray_serve_1.py. The exit code is {exit_code}." + + "See above for command output. The output will be printed by the function exec_run_container.") -@serve.deployment -def d(*args): - return f"{os.getpid()}" - -d.deploy() -pid1 = ray.get(d.get_handle().remote()) - -print('ready') - ''') - - count = 0 - while count < 90: - try: - buf = s._sock.recv(4096) - logger.info(buf.decode()) - if buf.decode().find('ready') != -1: - break - except Exception as e: - pass - time.sleep(1) - count += 1 - if count >= 90: - raise Exception('failed to run script') - - # kill the gcs on head node. If fate sharing is enabled - # the whole head node pod will terminate. - utils.shell_assert_success( - 'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"') - # wait for new head node getting created - time.sleep(10) - # make sure the new head is ready - utils.shell_assert_success( - 'kubectl wait --for=condition=Ready pod/$(kubectl get pods -A | grep -e "-head" | awk "{print \$2}") --timeout=900s') - - s._sock.sendall(b''' -def get_new_value(): - return ray.get(d.get_handle().remote()) -pid2 = retry_with_timeout(get_new_value) - -if pid1 == pid2: - print('successful: {} {}'.format(pid1, pid2)) - sys.exit(0) -else: - print('failed: {} {}'.format(pid1, pid2)) - raise Exception('failed') - ''') - - count = 0 - while count < 90: - try: - buf = s._sock.recv(4096) - logger.info(buf.decode()) - if buf.decode().find('successful') != -1: - break - if buf.decode().find('failed') != -1: - raise Exception('test failed {}'.format(buf.decode())) - except Exception as e: - pass - time.sleep(1) - count += 1 - if count >= 90: - raise Exception('failed to run script') + # Initialize k8s client + config.load_kube_config() + k8s_api = client.CoreV1Api() + + # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition, + # if we have 0 head pods at this moment, it indicates that the head pod crashes unexpectedly. + headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head') + assert(len(headpods.items) == 1) + old_head_pod = headpods.items[0] + old_head_pod_name = old_head_pod.metadata.name + restart_count = old_head_pod.status.container_statuses[0].restart_count + + # Kill the gcs_server process on head node. If fate sharing is enabled, the whole head node pod + # will terminate. + exec_command = ['pkill gcs_server'] + utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command) + + # Waiting for all pods become ready and running. + utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000) + + # Try to connect to the deployed model again + utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_2.py') + exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_2.py {ray_namespace}', timeout_sec = 180) + + if exit_code != 0: + raise Exception(f"There was an exception during the execution of test_ray_serve_2.py. The exit code is {exit_code}." + + "See above for command output. The output will be printed by the function exec_run_container.") container.stop() - client.close() + docker_client.close() def test_detached_actor(self): docker_client = docker.from_env() diff --git a/tests/scripts/test_ray_serve_1.py b/tests/scripts/test_ray_serve_1.py new file mode 100644 index 00000000000..01fb8cc47fe --- /dev/null +++ b/tests/scripts/test_ray_serve_1.py @@ -0,0 +1,23 @@ +import requests +from starlette.requests import Request +import ray +from ray import serve +import time +import sys + +# 1: Define a Ray Serve model. +@serve.deployment(route_prefix="/") +class MyModelDeployment: + def __init__(self, msg: str): + self._msg = msg + + def __call__(self): + return self._msg + +ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]) +# 2: Deploy the model. +handle = serve.run(MyModelDeployment.bind(msg="Hello world!")) +# 3: Query the deployment and print the result. +val = ray.get(handle.remote()) +print(val) +assert(val == "Hello world!") diff --git a/tests/scripts/test_ray_serve_2.py b/tests/scripts/test_ray_serve_2.py new file mode 100644 index 00000000000..abcd251adfe --- /dev/null +++ b/tests/scripts/test_ray_serve_2.py @@ -0,0 +1,22 @@ +import ray +import time +import sys +from ray import serve + +def retry_with_timeout(func, timeout=90): + err = None + for _ in range(timeout): + try: + return func() + except BaseException as e: + err = e + finally: + time.sleep(1) + raise err + +retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])) +retry_with_timeout(lambda:serve.start(detached=True)) +val = retry_with_timeout(lambda: ray.get(serve.get_deployment("MyModelDeployment").get_handle().remote())) +print(val) +assert(val == "Hello world!") +