[Bug] Update wait function in test_detached_actor #635

Merged 9 commits on Oct 24, 2022
41 changes: 30 additions & 11 deletions tests/compatibility-test.py
@@ -8,6 +8,8 @@
import string

import kuberay_utils.utils as utils
from kubernetes import client, config
from kubernetes.stream import stream


logger = logging.getLogger(__name__)
@@ -267,37 +269,54 @@ def get_new_value():
client.close()

def test_detached_actor(self):
client = docker.from_env()
container = client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
docker_client = docker.from_env()
container = docker_client.containers.run(ray_image, remove=True, detach=True, stdin_open=True, tty=True,
network_mode='host', command=["/bin/sh"])
ray_namespace = ''.join(random.choices(string.ascii_lowercase, k=10))
logger.info(f'namespace: {ray_namespace}')

# Register a detached actor
utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_detached_actor_1.py')
exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_1.py {ray_namespace}', timeout_sec = 180)
exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_1.py {ray_namespace}', timeout_sec = 180)

if exit_code != 0:
raise Exception(f"There was an exception during the execution of test_detached_actor_1.py. The exit code is {exit_code}." +
"See above for command output. The output will be printed by the function exec_run_container.")

# Kill the gcs on head node. If fate sharing is enabled, the whole head node pod will terminate.
utils.shell_assert_success(
'kubectl exec -it $(kubectl get pods -A| grep -e "-head" | awk "{print \\$2}") -- /bin/bash -c "ps aux | grep gcs_server | grep -v grep | awk \'{print \$2}\' | xargs kill"')
# Wait for new head node getting created
# TODO (kevin85421): Need a better method to wait for the new head pod. (https://github.com/ray-project/kuberay/issues/618)
time.sleep(180)
# Initialize k8s client
config.load_kube_config()
k8s_api = client.CoreV1Api()

# KubeRay only allows at most 1 head pod per RayCluster instance at the same time. In addition,
# if we have 0 head pods at this moment, it indicates that the head pod crashed unexpectedly.
headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head')
assert(len(headpods.items) == 1)
old_head_pod = headpods.items[0]
old_head_pod_name = old_head_pod.metadata.name
restart_count = old_head_pod.status.container_statuses[0].restart_count

# Kill the gcs_server process on the head node. If fate sharing is enabled, the whole head node pod
# will terminate.
exec_command = ['pkill gcs_server']
utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default', exec_command=exec_command)

# Wait for all pods to become ready and running.
utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default', timeout=300, retry_interval_ms=1000)

# Try to connect to the detached actor again.
# [Note] When all pods become running and ready, the RayCluster still needs tens of seconds to relaunch actors. Hence,
# `test_detached_actor_2.py` will retry until a Ray client connection succeeds.
utils.copy_to_container(container, 'tests/scripts', '/usr/local/', 'test_detached_actor_2.py')
exit_code, output = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_2.py {ray_namespace}', timeout_sec = 180)
exit_code, _ = utils.exec_run_container(container, f'python3 /usr/local/test_detached_actor_2.py {ray_namespace}', timeout_sec = 180)

if exit_code != 0:
raise Exception(f"There was an exception during the execution of test_detached_actor_2.py. The exit code is {exit_code}." +
"See above for command output. The output will be printed by the function exec_run_container.")

k8s_api.api_client.rest_client.pool_manager.clear()
k8s_api.api_client.close()
container.stop()
client.close()
docker_client.close()

class RayServiceTestCase(unittest.TestCase):
service_template_file = 'tests/config/ray-service.yaml.template'
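The key change in compatibility-test.py is that the test now kills the GCS server through the Kubernetes Python client instead of shelling out to `kubectl exec`. As a minimal, self-contained sketch of that call sequence (assuming a reachable cluster whose head pod carries the `rayNodeType=head` label, as in this test):

from kubernetes import client, config
from kubernetes.stream import stream

# Load credentials from the local kubeconfig and build a CoreV1 API client.
config.load_kube_config()
k8s_api = client.CoreV1Api()

# Look up the single head pod of the RayCluster (label selector taken from this test).
headpods = k8s_api.list_namespaced_pod(namespace='default', label_selector='rayNodeType=head')
head_pod_name = headpods.items[0].metadata.name

# Kill the GCS server inside the head pod; with fate sharing enabled, the container exits
# and Kubernetes restarts it according to the Pod's restartPolicy.
resp = stream(k8s_api.connect_get_namespaced_pod_exec,
              head_pod_name,
              'default',
              command=['/bin/sh', '-c', 'pkill gcs_server'],
              stderr=True, stdin=False, stdout=True, tty=False)
print(resp)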
1 change: 1 addition & 0 deletions tests/config/requirements.txt
@@ -1 +1,2 @@
docker
kubernetes
99 changes: 97 additions & 2 deletions tests/kuberay_utils/utils.py
@@ -8,6 +8,8 @@
import docker

from string import Template
from kubernetes import client, config
from kubernetes.stream import stream

kindcluster_config_file = 'tests/config/cluster-config.yaml'
raycluster_service_file = 'tests/config/raycluster-service.yaml'
@@ -168,8 +170,8 @@ def copy_to_container(container, src, dest, filename):
tar.add(filename)
finally:
tar.close()
data = open(tf.name, 'rb').read()
container.put_archive(dest, data)
with open(tf.name, 'rb') as data:
container.put_archive(dest, data.read())
finally:
os.chdir(oldpwd)

@@ -211,3 +213,96 @@ def wait_for_condition(
if last_ex is not None:
message += f" Last exception: {last_ex}"
raise RuntimeError(message)

def get_pod(k8s_api, namespace, label_selector):
return k8s_api.list_namespaced_pod(namespace = namespace, label_selector = label_selector)

def pod_exec_command(k8s_api, pod_name, namespace, exec_command, stderr=True, stdin=False, stdout=True, tty=False, silent=False):
exec_command = ['/bin/sh', '-c'] + exec_command
resp = stream(k8s_api.connect_get_namespaced_pod_exec,
pod_name,
namespace,
command=exec_command,
stderr=stderr, stdin=stdin,
stdout=stdout, tty=tty)
if not silent:
logger.info(f"cmd: {exec_command}")
logger.info(f"response: {resp}")
return resp

def wait_for_new_head(k8s_api, old_head_pod_name, old_restart_count, namespace, timeout, retry_interval_ms):
"""
`wait_for_new_head` waits until the new head pod is ready and running. For example, `test_detached_actor` kills
the gcs_server process on the head pod. It takes nearly 1 minute for the head pod to be killed, and the head pod
stays 'Running' and 'Ready' during that minute.

Hence, we need to check `restart_count` or `new_head_pod_name`.
(1) `restart_count`: If the pod is restarted by the restartPolicy of a Pod, `restart_count` will increase by 1.
If the pod is deleted by KubeRay and the reconciler creates a new one, `restart_count` will be 0.
(2) `new_head_pod_name`: If the reconciler creates a new head pod, `new_head_pod_name` will be different from
`old_head_pod_name`.

Next, we check the status of every pod to ensure that all pods are "Running" and "Ready".

After the cluster state converges (all pods are "Running" and "Ready"), the Ray processes still need tens of seconds to
become ready to serve new connections from Ray clients. Hence, callers need to retry until a Ray client connection succeeds.

Args:
k8s_api: Kubernetes client (e.g. client.CoreV1Api())
old_head_pod_name: Name of the old head pod.
old_restart_count: Restart count of the old head pod's container before the GCS server is killed. If Kubernetes
restarts the Pod via its restartPolicy, the restart count will increase by 1.
namespace: Namespace that the head pod is running in.
timeout: Same as `wait_for_condition`.
retry_interval_ms: Same as `wait_for_condition`.

Raises:
RuntimeError: Raised if the condition is not met before the timeout expires.
"""
def check_status(k8s_api, old_head_pod_name, old_restart_count, namespace) -> bool:
all_pods = k8s_api.list_namespaced_pod(namespace = namespace)
headpods = get_pod(k8s_api, namespace=namespace, label_selector='rayNodeType=head')
# KubeRay only allows at most 1 head pod per RayCluster instance at the same time. On the other
# hand, when we kill a worker, the operator will reconcile a new one immediately without waiting
# for the Pod termination to complete. Hence, it is possible to have more than `worker.Replicas`
# worker pods in the cluster.
if len(headpods.items) != 1:
logger.info('Number of headpods is not equal to 1.')
return False
new_head_pod = headpods.items[0]
new_head_pod_name = new_head_pod.metadata.name
new_restart_count = new_head_pod.status.container_statuses[0].restart_count
# The default container restartPolicy of a Pod is `Always`. Hence, when the GCS server is killed,
# the old head pod should be restarted in place rather than replaced by a new one.
if new_head_pod_name != old_head_pod_name:
logger.info(f'Expected the old head pod to restart rather than be replaced by a new pod.' +
f' new_head_pod_name: {new_head_pod_name}, old_head_pod_name: {old_head_pod_name}')
# TODO (kevin85421): We should `return False` here, but currently ray:nightly is likely to create
# a new head pod instead of restarting the old one.

# When the GCS server is killed, it takes nearly 1 minute for the head pod to be killed. During that
# minute, the head pod will still be 'Running' and 'Ready'. Hence, we need to check `restart_count`.
else:
# TODO (kevin85421): We should remove `else` in the future. Currently, ray:nightly is likely to
# create a new head pod instead of restarting the old one. The new pod's `restart_count`
# is 0.
if new_restart_count != old_restart_count + 1:
logger.info(f'new_restart_count != old_restart_count + 1 => new_restart_count: {new_restart_count},' +
f' old_restart_count: {old_restart_count}')
return False
# All pods should be "Running" and "Ready". This check is overkill; we added it due to the buggy
# behavior of Ray HA. To elaborate, when the GCS server is killed, the head pod should restart,
# but worker pods should not. However, currently, the worker pods also restart.
# See https://github.com/ray-project/kuberay/issues/634 for more details.
for pod in all_pods.items:
if pod.status.phase != 'Running':
logger.info(f'Pod {pod.metadata.name} is not Running.')
return False
for c in pod.status.container_statuses:
if not c.ready:
logger.info(f'Container {c.name} in {pod.metadata.name} is not ready.')
return False
return True
wait_for_condition(check_status, timeout=timeout, retry_interval_ms=retry_interval_ms, k8s_api=k8s_api,
old_head_pod_name=old_head_pod_name, old_restart_count=old_restart_count, namespace=namespace)
# After the cluster state converges, ray processes still need tens of seconds to become ready.
# TODO (kevin85421): Make ray processes become ready when pods are "Ready" and "Running".
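
Taken together, the helpers added to kuberay_utils/utils.py are meant to be used in the sequence below, which condenses the flow from `test_detached_actor` (the 'default' namespace, label selector, and timeouts are the values that test uses):

import kuberay_utils.utils as utils
from kubernetes import client, config

config.load_kube_config()
k8s_api = client.CoreV1Api()

# Record the current head pod and its restart count before disturbing the cluster.
headpods = utils.get_pod(k8s_api, namespace='default', label_selector='rayNodeType=head')
assert len(headpods.items) == 1
old_head_pod = headpods.items[0]
old_head_pod_name = old_head_pod.metadata.name
restart_count = old_head_pod.status.container_statuses[0].restart_count

# Kill the GCS server, then block until the head pod has either restarted in place
# (restart count incremented) or been replaced by a pod with a different name, and
# every pod in the namespace is "Running" and "Ready" again.
utils.pod_exec_command(k8s_api, pod_name=old_head_pod_name, namespace='default',
                       exec_command=['pkill gcs_server'])
utils.wait_for_new_head(k8s_api, old_head_pod_name, restart_count, 'default',
                        timeout=300, retry_interval_ms=1000)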
13 changes: 10 additions & 3 deletions tests/scripts/test_detached_actor_2.py
@@ -2,11 +2,10 @@
import time
import sys

ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])

def retry_with_timeout(func, timeout=90):
err = None
for _ in range(timeout):
start = time.time()
while time.time() - start <= timeout:
try:
return func()
except BaseException as e:
@@ -18,7 +17,15 @@ def retry_with_timeout(func, timeout=90):
def get_detached_actor():
return ray.get_actor("testCounter")

# Try to connect to Ray cluster.
print("Try to connect to Ray cluster.")
retry_with_timeout(lambda: ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1]), timeout = 180)

# Get TestCounter actor
print("Get TestCounter actor.")
tc = retry_with_timeout(get_detached_actor)

print("Try to call remote function \'increment\'.")
val = retry_with_timeout(lambda: ray.get(tc.increment.remote()))
print(f"val: {val}")

Expand Down