Add integration test for kuberay ray service and improve ray service operator #415

Merged · 14 commits · Jul 26, 2022
2 changes: 1 addition & 1 deletion ray-operator/apis/ray/v1alpha1/rayservice_types.go
@@ -12,7 +12,7 @@ type ServiceStatus string
 const (
     FailedToGetOrCreateRayCluster    ServiceStatus = "FailedToGetOrCreateRayCluster"
     WaitForDashboard                 ServiceStatus = "WaitForDashboard"
-    FailedServeDeploy                ServiceStatus = "FailedServeDeploy"
+    WaitForServeDeploymentReady      ServiceStatus = "WaitForServeDeploymentReady"
     FailedToGetServeDeploymentStatus ServiceStatus = "FailedToGetServeDeploymentStatus"
     Running                          ServiceStatus = "Running"
     Restarting                       ServiceStatus = "Restarting"
2 changes: 2 additions & 0 deletions ray-operator/config/samples/ray_v1alpha1_rayservice.yaml
@@ -3,6 +3,8 @@ kind: RayService
 metadata:
   name: rayservice-sample
 spec:
+  serviceUnhealthySecondThreshold: 300 # Health check threshold for the service, in seconds (default: 60).
+  deploymentUnhealthySecondThreshold: 300 # Health check threshold for serve deployments, in seconds (default: 60).
   serveDeploymentGraphConfig:
     importPath: fruit.deployment_graph
     runtimeEnv: |
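
For reference, a minimal sketch of the spec fields these two new YAML keys map to. The field names are inferred from the controller code below (which reads Spec.DeploymentUnhealthySecondThreshold); the pointer types and JSON tags are assumptions, not copied from the repo.

// Sketch of the RayServiceSpec fields behind the new sample keys (types/tags assumed).
type RayServiceSpec struct {
    // Seconds the service may report unhealthy before the operator restarts the cluster (default 60).
    ServiceUnhealthySecondThreshold *int32 `json:"serviceUnhealthySecondThreshold,omitempty"`
    // Seconds a serve deployment may report unhealthy before a restart is triggered (default 60).
    DeploymentUnhealthySecondThreshold *int32 `json:"deploymentUnhealthySecondThreshold,omitempty"`
    // ... other fields (serveDeploymentGraphConfig, rayClusterConfig) omitted.
}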
35 changes: 19 additions & 16 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -116,10 +116,10 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
         // Update RayService Status since reconcileRayCluster may mark RayCluster restart.
         if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
             logger.Error(errStatus, "Fail to update status of RayService after RayCluster changes", "rayServiceInstance", rayServiceInstance)
-            return ctrl.Result{}, err
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
         }
         logger.Info("Done reconcileRayCluster update status, enter next loop to create new ray cluster.")
-        return ctrl.Result{}, nil
+        return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
     }

     logger.Info("Reconciling the Serve component.")
@@ -132,20 +132,23 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
     if activeRayClusterInstance != nil && pendingRayClusterInstance == nil {
         rayServiceInstance.Status.PendingServiceStatus = rayv1alpha1.RayServiceStatus{}
         if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, activeRayClusterInstance, true, logger); err != nil {
-            return ctrlResult, err
+            logger.Error(err, "Fail to reconcileServe.")
+            return ctrlResult, nil
         }
     } else if activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
         if err = r.updateStatusForActiveCluster(ctx, rayServiceInstance, activeRayClusterInstance, logger); err != nil {
             logger.Error(err, "The updating of the status for active ray cluster while we have pending cluster failed")
         }

         if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil {
-            return ctrlResult, err
+            logger.Error(err, "Fail to reconcileServe.")
+            return ctrlResult, nil
         }
     } else if activeRayClusterInstance == nil && pendingRayClusterInstance != nil {
         rayServiceInstance.Status.ActiveServiceStatus = rayv1alpha1.RayServiceStatus{}
         if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil {
-            return ctrlResult, err
+            logger.Error(err, "Fail to reconcileServe.")
+            return ctrlResult, nil
         }
     } else {
         rayServiceInstance.Status.ActiveServiceStatus = rayv1alpha1.RayServiceStatus{}
@@ -168,26 +171,26 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
     if rayClusterInstance != nil {
         if err := r.reconcileIngress(ctx, rayServiceInstance, rayClusterInstance); err != nil {
             err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateIngress, err)
-            return ctrl.Result{}, err
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
         }
         if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, common.HeadService); err != nil {
             err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateService, err)
-            return ctrl.Result{}, err
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
         }
         if err := r.labelHealthyServePods(ctx, rayClusterInstance); err != nil {
             err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateServingPodLabel, err)
-            return ctrl.Result{}, err
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
         }
         if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, common.ServingService); err != nil {
             err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateService, err)
-            return ctrl.Result{}, err
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
        }
     }

     // Final status update for any CR modification.
     if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
         logger.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance)
-        return ctrl.Result{}, err
+        return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
     }

     return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
@@ -735,7 +738,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
             r.markRestart(rayServiceInstance)
         }
         err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.WaitForDashboard, err)
-        return ctrl.Result{}, false, err
+        return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
     }

     rayDashboardClient := utils.GetRayDashboardClientFunc()
@@ -749,8 +752,8 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
logger.Info("Dashboard is unhealthy, restart the cluster.")
r.markRestart(rayServiceInstance)
}
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedServeDeploy, err)
return ctrl.Result{}, false, err
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.WaitForServeDeploymentReady, err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

r.Recorder.Eventf(rayServiceInstance, "Normal", "SubmittedServeDeployment",
@@ -764,7 +767,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
             r.markRestart(rayServiceInstance)
         }
         err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetServeDeploymentStatus, err)
-        return ctrl.Result{}, false, err
+        return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
     }

     r.updateAndCheckDashboardStatus(rayServiceStatus, true, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)
@@ -782,7 +785,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
         r.markRestart(rayServiceInstance)
         rayServiceInstance.Status.ServiceStatus = rayv1alpha1.Restarting
         if err := r.Status().Update(ctx, rayServiceInstance); err != nil {
-            return ctrl.Result{}, false, err
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
         }

         logger.Info("Mark cluster as unhealthy", "rayCluster", rayClusterInstance)
@@ -791,7 +794,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
         return ctrl.Result{RequeueAfter: ServiceRestartRequeueDuration}, false, nil
     }

-    return ctrl.Result{}, true, nil
+    return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, true, nil
 }

 func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClusterInstance *rayv1alpha1.RayCluster) error {
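
The recurring edit in this file swaps bare `return ctrl.Result{}, err` for an explicit requeue. A minimal sketch of the pattern, assuming a package-level constant (the concrete duration the operator uses is not shown in this diff):

package ray

import (
    "time"

    ctrl "sigs.k8s.io/controller-runtime"
)

// Assumed value for illustration; the real constant lives in the controller package.
const ServiceDefaultRequeueDuration = 2 * time.Second

// With controller-runtime, returning a non-nil error requeues the request
// with exponential backoff and logs the error at the manager level.
// Returning ctrl.Result{RequeueAfter: ...} with a nil error instead logs
// the failure locally and retries at a fixed, predictable interval, so a
// RayService that is merely "not ready yet" is not treated as a hard
// reconcile failure.
func requeueAfterDefault() (ctrl.Result, error) {
    return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}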
120 changes: 120 additions & 0 deletions tests/compatibility-test.py
@@ -3,6 +3,7 @@
 import os
 import tempfile
 import unittest
+import subprocess
 from string import Template

 import docker
@@ -101,6 +102,37 @@ def create_kuberay_cluster(template_name):
     shell_assert_success('kubectl apply -f {}'.format(raycluster_service_file))


+def create_kuberay_service(template_name):
+    template = None
+    with open(template_name, mode='r') as f:
+        template = Template(f.read())
+
+    rayservice_spec_buf = template.substitute(
+        {'ray_image': ray_image, 'ray_version': ray_version})
+
+    service_config_file = None
+    with tempfile.NamedTemporaryFile('w', delete=False) as f:
+        f.write(rayservice_spec_buf)
+        service_config_file = f.name
+
+    rtn = shell_run('kubectl wait --for=condition=ready pod -n ray-system --all --timeout=900s')
+    if rtn != 0:
+        shell_run('kubectl get pods -A')
+    assert rtn == 0
+    assert service_config_file is not None
+    shell_assert_success('kubectl apply -f {}'.format(service_config_file))
+
+    shell_run('kubectl get pods -A')
+
+    time.sleep(20)
+
+    wait_for_condition(
+        lambda: shell_run('kubectl get service rayservice-sample-serve-svc -o jsonpath="{.status}"') == 0,
+        timeout=900,
+        retry_interval_ms=5000,
+    )
+
+
 def delete_cluster():
     shell_run('kind delete cluster')
@@ -210,6 +242,14 @@ def ray_ha_supported():
         return False
     return True

+def ray_service_supported():
+    if ray_version == "nightly":
+        return True
+    major, minor, patch = parse_ray_version(ray_version)
+    if major * 100 + minor < 113:
+        return False
+    return True
+

 class RayHATestCase(unittest.TestCase):
     cluster_template_file = 'tests/config/ray-cluster.ray-ha.yaml.template'
@@ -438,6 +478,58 @@ def get_new_value():
         client.close()


+class RayServiceTestCase(unittest.TestCase):
+    service_template_file = 'tests/config/ray-service.yaml.template'
+    service_serve_update_template_file = 'tests/config/ray-service-serve-update.yaml.template'
+    service_cluster_update_template_file = 'tests/config/ray-service-cluster-update.yaml.template'
+
+    @classmethod
+    def setUpClass(cls):
+        if not ray_service_supported():
+            return
+        # Ray Service runs inside a local Kind environment.
+        # We use the Ray nightly version for now.
+        # We wait for the serve service to become ready,
+        # then check for a successful response from it.
+        delete_cluster()
+        create_cluster()
+        apply_kuberay_resources()
+        download_images()
+        create_kuberay_service(RayServiceTestCase.service_template_file)
+
+    def setUp(self):
+        if not ray_service_supported():
+            raise unittest.SkipTest("ray service is not supported")
+
+    def test_ray_serve_work(self):
+        port_forwarding_proc = subprocess.Popen('kubectl port-forward service/rayservice-sample-serve-svc 8000', shell=True)
+        time.sleep(5)
+        curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\''
+        wait_for_condition(
+            lambda: shell_run(curl_cmd) == 0,
+            timeout=5,
+        )
+        create_kuberay_service(RayServiceTestCase.service_serve_update_template_file)
+        curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\''
+        time.sleep(5)
+        wait_for_condition(
+            lambda: shell_run(curl_cmd) == 0,
+            timeout=60,
+        )
+        create_kuberay_service(RayServiceTestCase.service_cluster_update_template_file)
+        time.sleep(5)
+        port_forwarding_proc.kill()
+        time.sleep(5)
+        port_forwarding_proc = subprocess.Popen('kubectl port-forward service/rayservice-sample-serve-svc 8000', shell=True)
+        time.sleep(5)
+        curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\''
+
+        wait_for_condition(
+            lambda: shell_run(curl_cmd) == 0,
+            timeout=180,
+        )
+        port_forwarding_proc.kill()
+
 def parse_environment():
     global ray_version, ray_image, kuberay_sha
     for k, v in os.environ.items():
@@ -450,6 +542,34 @@ def parse_environment():
             kuberay_sha = v


+def wait_for_condition(
+    condition_predictor, timeout=10, retry_interval_ms=100, **kwargs
+):
+    """Wait until a condition is met or time out with an exception.
+
+    Args:
+        condition_predictor: A function that predicts the condition.
+        timeout: Maximum timeout in seconds.
+        retry_interval_ms: Retry interval in milliseconds.
+
+    Raises:
+        RuntimeError: If the condition is not met before the timeout expires.
+    """
+    start = time.time()
+    last_ex = None
+    while time.time() - start <= timeout:
+        try:
+            if condition_predictor(**kwargs):
+                return
+        except Exception as ex:
+            last_ex = ex
+        time.sleep(retry_interval_ms / 1000.0)
+    message = "The condition wasn't met before the timeout expired."
+    if last_ex is not None:
+        message += f" Last exception: {last_ex}"
+    raise RuntimeError(message)


 if __name__ == '__main__':
     parse_environment()
     unittest.main(verbosity=2)