Skip to content

Commit

Permalink
[Bug] Misuse of Docker API and misunderstanding of Ray HA cause test_…
Browse files Browse the repository at this point in the history
…ray_serve flaky (#650)

Cleans up RayServe compatibility test.

Co-authored-by: Ubuntu <azureuser@Ubuntu.pqxb1uggpgbehcpt4orv5untcb.rx.internal.cloudapp.net>
  • Loading branch information
jasoonn and Ubuntu authored Nov 3, 2022
1 parent 79c6c20 commit 1ab5a00
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 88 deletions.
127 changes: 39 additions & 88 deletions tests/compatibility-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,99 +174,50 @@ def test_kill_head(self):
assert rtn == 0

def test_ray_serve(self):
client = docker.from_env()
container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
network_mode='host', command=["/bin/sh", "-c", "python"])
s = container.attach_socket(
params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1})
s._sock.setblocking(0)
s._sock.sendall(b'''
import ray
import time
import ray.serve as serve
import os
import requests
from ray._private.test_utils import wait_for_condition
docker_client = docker.from_env()
container = docker_client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
network_mode='host', command=["/bin/sh"])
# Deploy a model with ray serve
ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10))
logger.info(f'namespace: {ray_namespace}')

def retry_with_timeout(func, count=90):
tmp = 0
err = None
while tmp < count:
try:
return func()
except Exception as e:
err = e
tmp += 1
assert err is not None
raise err
utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_1.py')
exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_1.py {ray_namespace}', timeout_sec = 180)

ray.init(address='ray://127.0.0.1:10001')
if exit_code != 0:
raise Exception(f"There was an exception during the execution of test_ray_serve_1.py. The exit code is {exit_code}." +
"See above for command output. The output will be printed by the function exec_run_container.")

@serve.deployment
def d(*args):
return f"{os.getpid()}"
d.deploy()
pid1 = ray.get(d.get_handle().remote())
print('ready')
''')

count = 0
while count < 90:
try:
buf = s._sock.recv(4096)
logger.info(buf.decode())
if buf.decode().find('ready') != -1:
break
except Exception as e:
pass
time.sleep(1)
count += 1
if count >= 90:
raise Exception('failed to run script')

# kill the gcs on head node. If fate sharing is enabled
# the whole head node pod will terminate.
utils.shell_assert_success(
'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"')
# wait for new head node getting created
time.sleep(10)
# make sure the new head is ready
utils.shell_assert_success(
'kubectl wait --for=condition=Ready pod/$(kubectl get pods -A | grep -e "-head" | awk "{print \$2}") --timeout=900s')

s._sock.sendall(b'''
def get_new_value():
return ray.get(d.get_handle().remote())
pid2 = retry_with_timeout(get_new_value)
if pid1 == pid2:
print('successful: {} {}'.format(pid1, pid2))
sys.exit(0)
else:
print('failed: {} {}'.format(pid1, pid2))
raise Exception('failed')
''')

count = 0
while count < 90:
try:
buf = s._sock.recv(4096)
logger.info(buf.decode())
if buf.decode().find('successful') != -1:
break
if buf.decode().find('failed') != -1:
raise Exception('test failed {}'.format(buf.decode()))
except Exception as e:
pass
time.sleep(1)
count += 1
if count >= 90:
raise Exception('failed to run script')
# Initialize k8s client
config.load_kube_config()
k8s_api = client.CoreV1Api()

# KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition,
# if we have 0 head pods at this moment, it indicates that the head pod crashes unexpectedly.
headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head')
assert(len(headpods.items) == 1)
old_head_pod = headpods.items[0]
old_head_pod_name = old_head_pod.metadata.name
restart_count = old_head_pod.status.container_statuses[0].restart_count

# Kill the gcs_server process on head node. If fate sharing is enabled, the whole head node pod
# will terminate.
exec_command = ['pkill gcs_server']
utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command)

# Waiting for all pods become ready and running.
utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000)

# Try to connect to the deployed model again
utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_2.py')
exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_2.py {ray_namespace}', timeout_sec = 180)

if exit_code != 0:
raise Exception(f"There was an exception during the execution of test_ray_serve_2.py. The exit code is {exit_code}." +
"See above for command output. The output will be printed by the function exec_run_container.")

container.stop()
client.close()
docker_client.close()

def test_detached_actor(self):
docker_client = docker.from_env()
Expand Down
23 changes: 23 additions & 0 deletions tests/scripts/test_ray_serve_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import requests
from starlette.requests import Request
import ray
from ray import serve
import time
import sys

# 1: Define a Ray Serve model.
@serve.deployment(route_prefix="/")
class MyModelDeployment:
def __init__(self, msg: str):
self._msg = msg

def __call__(self):
return self._msg

ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])
# 2: Deploy the model.
handle = serve.run(MyModelDeployment.bind(msg="Hello world!"))
# 3: Query the deployment and print the result.
val = ray.get(handle.remote())
print(val)
assert(val == "Hello world!")
22 changes: 22 additions & 0 deletions tests/scripts/test_ray_serve_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import ray
import time
import sys
from ray import serve

def retry_with_timeout(func, timeout=90):
err = None
for _ in range(timeout):
try:
return func()
except BaseException as e:
err = e
finally:
time.sleep(1)
raise err

retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]))
retry_with_timeout(lambda:serve.start(detached=True))
val = retry_with_timeout(lambda: ray.get(serve.get_deployment("MyModelDeployment").get_handle().remote()))
print(val)
assert(val == "Hello world!")

0 comments on commit 1ab5a00

Please sign in to comment.