Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Misuse of Docker API and misunderstanding of Ray HA cause test_ray_serve flaky #650

Merged
merged 5 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 39 additions & 88 deletions tests/compatibility-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,99 +174,50 @@ def test_kill_head(self):
assert rtn == 0

def test_ray_serve(self):
client = docker.from_env()
container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
network_mode='host', command=["/bin/sh", "-c", "python"])
s = container.attach_socket(
params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1})
s._sock.setblocking(0)
s._sock.sendall(b'''
import ray
import time
import ray.serve as serve
import os
import requests
from ray._private.test_utils import wait_for_condition
docker_client = docker.from_env()
container = docker_client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
network_mode='host', command=["/bin/sh"])
# Deploy a model with ray serve
ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10))
logger.info(f'namespace: {ray_namespace}')

def retry_with_timeout(func, count=90):
tmp = 0
err = None
while tmp < count:
try:
return func()
except Exception as e:
err = e
tmp += 1
assert err is not None
raise err
utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_1.py')
exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_1.py {ray_namespace}', timeout_sec = 180)

ray.init(address='ray://127.0.0.1:10001')
if exit_code != 0:
raise Exception(f"There was an exception during the execution of test_ray_serve_1.py. The exit code is {exit_code}." +
"See above for command output. The output will be printed by the function exec_run_container.")

@serve.deployment
def d(*args):
return f"{os.getpid()}"

d.deploy()
pid1 = ray.get(d.get_handle().remote())

print('ready')
''')

count = 0
while count < 90:
try:
buf = s._sock.recv(4096)
logger.info(buf.decode())
if buf.decode().find('ready') != -1:
break
except Exception as e:
pass
time.sleep(1)
count += 1
if count >= 90:
raise Exception('failed to run script')

# kill the gcs on head node. If fate sharing is enabled
# the whole head node pod will terminate.
utils.shell_assert_success(
'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"')
# wait for new head node getting created
time.sleep(10)
# make sure the new head is ready
utils.shell_assert_success(
'kubectl wait --for=condition=Ready pod/$(kubectl get pods -A | grep -e "-head" | awk "{print \$2}") --timeout=900s')

s._sock.sendall(b'''
def get_new_value():
return ray.get(d.get_handle().remote())
pid2 = retry_with_timeout(get_new_value)

if pid1 == pid2:
print('successful: {} {}'.format(pid1, pid2))
sys.exit(0)
else:
print('failed: {} {}'.format(pid1, pid2))
raise Exception('failed')
''')

count = 0
while count < 90:
try:
buf = s._sock.recv(4096)
logger.info(buf.decode())
if buf.decode().find('successful') != -1:
break
if buf.decode().find('failed') != -1:
raise Exception('test failed {}'.format(buf.decode()))
except Exception as e:
pass
time.sleep(1)
count += 1
if count >= 90:
raise Exception('failed to run script')
# Initialize k8s client
config.load_kube_config()
k8s_api = client.CoreV1Api()

# KubeRay allows at most one head pod per RayCluster instance at any given time. In addition,
# if there are 0 head pods at this moment, it indicates that the head pod has crashed unexpectedly.
headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head')
assert(len(headpods.items) == 1)
old_head_pod = headpods.items[0]
old_head_pod_name = old_head_pod.metadata.name
restart_count = old_head_pod.status.container_statuses[0].restart_count

# Kill the gcs_server process on head node. If fate sharing is enabled, the whole head node pod
# will terminate.
exec_command = ['pkill gcs_server']
utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command)

# Wait for all pods to become ready and running.
utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000)

# Try to connect to the deployed model again
utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_ray_serve_2.py')
exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_ray_serve_2.py {ray_namespace}', timeout_sec = 180)

if exit_code != 0:
raise Exception(f"There was an exception during the execution of test_ray_serve_2.py. The exit code is {exit_code}." +
"See above for command output. The output will be printed by the function exec_run_container.")

container.stop()
client.close()
docker_client.close()

def test_detached_actor(self):
docker_client = docker.from_env()
Expand Down
23 changes: 23 additions & 0 deletions tests/scripts/test_ray_serve_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import requests
from starlette.requests import Request
import ray
from ray import serve
import time
import sys

# 1: Define a Ray Serve model.
@serve.deployment(route_prefix="/")
class MyModelDeployment:
    """Minimal Ray Serve deployment that answers every call with a fixed message."""

    def __init__(self, msg: str):
        # Message returned verbatim by __call__.
        self._msg = msg

    def __call__(self):
        return self._msg


def main():
    """Deploy MyModelDeployment and verify that it answers a query.

    The Ray namespace is taken from the first command-line argument. The
    script exits with a non-zero status if the argument is missing or if
    the deployment does not return the expected message; the calling test
    inspects only the exit code.
    """
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <ray_namespace>")

    ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])
    # 2: Deploy the model.
    handle = serve.run(MyModelDeployment.bind(msg="Hello world!"))
    # 3: Query the deployment and print the result.
    val = ray.get(handle.remote())
    print(val)
    # Explicit check instead of `assert`: assertions are stripped when Python
    # runs with -O / PYTHONOPTIMIZE, which would make this script always pass.
    if val != "Hello world!":
        raise RuntimeError(f"unexpected response from MyModelDeployment: {val!r}")


if __name__ == "__main__":
    main()
22 changes: 22 additions & 0 deletions tests/scripts/test_ray_serve_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import ray
import time
import sys
from ray import serve

def retry_with_timeout(func, timeout=90, interval=1):
    """Call *func* until it succeeds, making at most *timeout* attempts.

    Args:
        func: Zero-argument callable to invoke.
        timeout: Maximum number of attempts (must be >= 1). With the default
            one-second interval this is roughly the number of seconds spent
            waiting between attempts.
        interval: Seconds to sleep between a failed attempt and the next one.

    Returns:
        Whatever *func* returns on its first successful call.

    Raises:
        ValueError: If *timeout* is less than 1 (no attempt could be made).
        Exception: The exception raised by the last attempt when every
            attempt fails.
    """
    if timeout < 1:
        raise ValueError("timeout must be at least 1")

    last_err = None
    for attempt in range(timeout):
        try:
            # Return immediately on success; no trailing sleep.
            return func()
        except Exception as e:
            # Only ordinary errors are retried. KeyboardInterrupt and
            # SystemExit propagate so the script can still be stopped.
            last_err = e
        # Sleep only when another attempt will follow.
        if attempt < timeout - 1:
            time.sleep(interval)
    raise last_err

def main():
    """Reconnect to the Ray cluster and query the previously deployed model.

    The Ray namespace is taken from the first command-line argument and must
    match the one used when MyModelDeployment was deployed. Every step is
    wrapped in retry_with_timeout because the cluster may still be coming
    back up when this script starts. The script exits with a non-zero status
    if the argument is missing or the response is not the expected message.
    """
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <ray_namespace>")
    namespace = sys.argv[1]

    retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=namespace))
    # Start (or reconnect to) Ray Serve in detached mode before looking up the deployment.
    retry_with_timeout(lambda: serve.start(detached=True))
    val = retry_with_timeout(
        lambda: ray.get(serve.get_deployment("MyModelDeployment").get_handle().remote())
    )
    print(val)
    # Explicit check instead of `assert`: assertions are stripped when Python
    # runs with -O / PYTHONOPTIMIZE, which would make this script always pass.
    if val != "Hello world!":
        raise RuntimeError(f"unexpected response from MyModelDeployment: {val!r}")


if __name__ == "__main__":
    main()