From 91e8eae9c9cddd5ab6b7cf942c99e6af04294959 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 22 Oct 2022 23:52:58 +0000 Subject: [PATCH 1/5] Update-ray-serve-test --- tests/compatibility-test.py | 101 ++++++------------------------ tests/scripts/test_ray_serve_1.py | 17 +++++ tests/scripts/test_ray_serve_2.py | 26 ++++++++ 3 files changed, 62 insertions(+), 82 deletions(-) create mode 100644 tests/scripts/test_ray_serve_1.py create mode 100644 tests/scripts/test_ray_serve_2.py diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py index b65835b617c..af3b68b51fe 100755 --- a/tests/compatibility-test.py +++ b/tests/compatibility-test.py @@ -176,94 +176,31 @@ def test_kill_head(self): def test_ray_serve(self): client = docker.from_env() container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True, - network_mode='host', command=["/bin/sh", "-c", "python"]) - s = container.attach_socket( - params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1}) - s._sock.setblocking(0) - s._sock.sendall(b''' -import ray -import time -import ray.serve as serve -import os -import requests -from ray._private.test_utils import wait_for_condition + network_mode='host', command=["/bin/sh"]) + # Deploy a model with ray serve + ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10)) + logger.info(f'namespace: {ray_namespace}') -def retry_with_timeout(func, count=90): - tmp = 0 - err = None - while tmp < count: - try: - return func() - except Exception as e: - err = e - tmp += 1 - assert err is not None - raise err + utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_1.py') + exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_1.py {ray_namespace}', timeout_sec = 180) -ray.init(address='ray://127.0.0.1:10001') + if exit_code != 0: + raise Exception(f"There was an exception during the execution of test_ray_serve_1.py. The exit code is {exit_code}." + + "See above for command output. The output will be printed by the function exec_run_container.") -@serve.deployment -def d(*args): - return f"{os.getpid()}" - -d.deploy() -pid1 = ray.get(d.get_handle().remote()) - -print('ready') - ''') - - count = 0 - while count < 90: - try: - buf = s._sock.recv(4096) - logger.info(buf.decode()) - if buf.decode().find('ready') != -1: - break - except Exception as e: - pass - time.sleep(1) - count += 1 - if count >= 90: - raise Exception('failed to run script') - - # kill the gcs on head node. If fate sharing is enabled - # the whole head node pod will terminate. + # kill the gcs on head node. If fate sharing is enabled, the whole head node pod will terminate. utils.shell_assert_success( 'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"') # wait for new head node getting created - time.sleep(10) - # make sure the new head is ready - utils.shell_assert_success( - 'kubectl wait --for=condition=Ready pod/$(kubectl get pods -A | grep -e "-head" | awk "{print \$2}") --timeout=900s') - - s._sock.sendall(b''' -def get_new_value(): - return ray.get(d.get_handle().remote()) -pid2 = retry_with_timeout(get_new_value) - -if pid1 == pid2: - print('successful: {} {}'.format(pid1, pid2)) - sys.exit(0) -else: - print('failed: {} {}'.format(pid1, pid2)) - raise Exception('failed') - ''') - - count = 0 - while count < 90: - try: - buf = s._sock.recv(4096) - logger.info(buf.decode()) - if buf.decode().find('successful') != -1: - break - if buf.decode().find('failed') != -1: - raise Exception('test failed {}'.format(buf.decode())) - except Exception as e: - pass - time.sleep(1) - count += 1 - if count >= 90: - raise Exception('failed to run script') + time.sleep(180) + + # Try to connect to the deployed model again + utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_2.py') + exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_2.py {ray_namespace}', timeout_sec = 180) + + if exit_code != 0: + raise Exception(f"There was an exception during the execution of test_ray_serve_2.py. The exit code is {exit_code}." + + "See above for command output. The output will be printed by the function exec_run_container.") container.stop() client.close() diff --git a/tests/scripts/test_ray_serve_1.py b/tests/scripts/test_ray_serve_1.py new file mode 100644 index 00000000000..fb4e4614d40 --- /dev/null +++ b/tests/scripts/test_ray_serve_1.py @@ -0,0 +1,17 @@ +import ray +import sys +import ray.serve as serve +import os +import requests +from ray._private.test_utils import wait_for_condition + +ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]) +serve.start(detached=True) + +@serve.deployment +def d(*args): + return "HelloWorld" +d.deploy() +val = ray.get(d.get_handle().remote()) +print(val) +assert(val == "HelloWorld") diff --git a/tests/scripts/test_ray_serve_2.py b/tests/scripts/test_ray_serve_2.py new file mode 100644 index 00000000000..16063ce0962 --- /dev/null +++ b/tests/scripts/test_ray_serve_2.py @@ -0,0 +1,26 @@ +import ray +import sys +import ray.serve as serve +import time + +def retry_with_timeout(func, timeout=90): + err = None + for _ in range(timeout): + try: + return func() + except BaseException as e: + err = e + finally: + time.sleep(1) + raise err + +retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])) +retry_with_timeout(lambda: serve.start(detached=True)) + +@serve.deployment +def d(*args): + return "HelloWorld" + +val = retry_with_timeout(lambda: ray.get(d.get_handle().remote())) +print(val) +assert(val == "HelloWorld") From fa25a7be2ad154c3e10345514b1b71ac27f2fc7e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 24 Oct 2022 21:57:47 +0000 Subject: [PATCH 2/5] update_ray_serve_scripts --- tests/scripts/test_ray_serve_1.py | 30 ++++++++++++++++++------------ tests/scripts/test_ray_serve_2.py | 16 ++++++---------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/tests/scripts/test_ray_serve_1.py b/tests/scripts/test_ray_serve_1.py index fb4e4614d40..01fb8cc47fe 100644 --- a/tests/scripts/test_ray_serve_1.py +++ b/tests/scripts/test_ray_serve_1.py @@ -1,17 +1,23 @@ +import requests +from starlette.requests import Request import ray +from ray import serve +import time import sys -import ray.serve as serve -import os -import requests -from ray._private.test_utils import wait_for_condition -ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]) -serve.start(detached=True) +# 1: Define a Ray Serve model. +@serve.deployment(route_prefix="/") +class MyModelDeployment: + def __init__(self, msg: str): + self._msg = msg -@serve.deployment -def d(*args): - return "HelloWorld" -d.deploy() -val = ray.get(d.get_handle().remote()) + def __call__(self): + return self._msg + +ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]) +# 2: Deploy the model. +handle = serve.run(MyModelDeployment.bind(msg="Hello world!")) +# 3: Query the deployment and print the result. +val = ray.get(handle.remote()) print(val) -assert(val == "HelloWorld") +assert(val == "Hello world!") diff --git a/tests/scripts/test_ray_serve_2.py b/tests/scripts/test_ray_serve_2.py index 16063ce0962..abcd251adfe 100644 --- a/tests/scripts/test_ray_serve_2.py +++ b/tests/scripts/test_ray_serve_2.py @@ -1,7 +1,7 @@ import ray -import sys -import ray.serve as serve import time +import sys +from ray import serve def retry_with_timeout(func, timeout=90): err = None @@ -15,12 +15,8 @@ def retry_with_timeout(func, timeout=90): raise err retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])) -retry_with_timeout(lambda: serve.start(detached=True)) - -@serve.deployment -def d(*args): - return "HelloWorld" - -val = retry_with_timeout(lambda: ray.get(d.get_handle().remote())) +retry_with_timeout(lambda:serve.start(detached=True)) +val = retry_with_timeout(lambda: ray.get(serve.get_deployment("MyModelDeployment").get_handle().remote())) print(val) -assert(val == "HelloWorld") +assert(val == "Hello world!") + From 847c162c98d73615d1b4b8cf4009340e2ce5c3db Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 24 Oct 2022 23:38:52 +0000 Subject: [PATCH 3/5] replace sleep function with the new wait function --- tests/compatibility-test.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py index af3b68b51fe..b59188398a8 100755 --- a/tests/compatibility-test.py +++ b/tests/compatibility-test.py @@ -174,36 +174,50 @@ def test_kill_head(self): assert rtn == 0 def test_ray_serve(self): - client = docker.from_env() - container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True, + docker_client = docker.from_env() + container = docker_client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True, network_mode='host', command=["/bin/sh"]) # Deploy a model with ray serve ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10)) logger.info(f'namespace: {ray_namespace}') utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_1.py') - exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_1.py {ray_namespace}', timeout_sec = 180) + exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_1.py {ray_namespace}', timeout_sec = 180) if exit_code != 0: raise Exception(f"There was an exception during the execution of test_ray_serve_1.py. The exit code is {exit_code}." + "See above for command output. The output will be printed by the function exec_run_container.") - # kill the gcs on head node. If fate sharing is enabled, the whole head node pod will terminate. - utils.shell_assert_success( - 'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"') - # wait for new head node getting created - time.sleep(180) + # Initialize k8s client + config.load_kube_config() + k8s_api = client.CoreV1Api() + + # KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition, + # if we have 0 head pods at this moment, it indicates that the head pod crashes unexpectedly. + headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head') + assert(len(headpods.items) == 1) + old_head_pod = headpods.items[0] + old_head_pod_name = old_head_pod.metadata.name + restart_count = old_head_pod.status.container_statuses[0].restart_count + + # Kill the gcs_server process on head node. If fate sharing is enabled, the whole head node pod + # will terminate. + exec_command = ['pkill gcs_server'] + utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command) + + # Waiting for all pods become ready and running. + utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000) # Try to connect to the deployed model again utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_2.py') - exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_2.py {ray_namespace}', timeout_sec = 180) + exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_2.py {ray_namespace}', timeout_sec = 180) if exit_code != 0: raise Exception(f"There was an exception during the execution of test_ray_serve_2.py. The exit code is {exit_code}." + "See above for command output. The output will be printed by the function exec_run_container.") container.stop() - client.close() + docker_client.close() def test_detached_actor(self): docker_client = docker.from_env() From 03c29b61371b42c963a13cec5277f28c182ec3b2 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 22 Oct 2022 23:52:58 +0000 Subject: [PATCH 4/5] Update-ray-serve-test --- tests/scripts/test_ray_serve_2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/scripts/test_ray_serve_2.py b/tests/scripts/test_ray_serve_2.py index abcd251adfe..65fbc758601 100644 --- a/tests/scripts/test_ray_serve_2.py +++ b/tests/scripts/test_ray_serve_2.py @@ -1,7 +1,7 @@ import ray -import time import sys -from ray import serve +import ray.serve as serve +import time def retry_with_timeout(func, timeout=90): err = None From 8c346eb450be60883b92db51f741c066c5836d89 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 24 Oct 2022 21:57:47 +0000 Subject: [PATCH 5/5] update_ray_serve_scripts --- tests/scripts/test_ray_serve_2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/scripts/test_ray_serve_2.py b/tests/scripts/test_ray_serve_2.py index 65fbc758601..abcd251adfe 100644 --- a/tests/scripts/test_ray_serve_2.py +++ b/tests/scripts/test_ray_serve_2.py @@ -1,7 +1,7 @@ import ray -import sys -import ray.serve as serve import time +import sys +from ray import serve def retry_with_timeout(func, timeout=90): err = None