Support suspension of RayClusters (ray-project#1711)
andrewsykim authored Dec 12, 2023
1 parent 8760d90 commit 86abaab
Showing 13 changed files with 114 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/reference/api.md
@@ -113,6 +113,7 @@ _Appears in:_
| `enableInTreeAutoscaling` _boolean_ | EnableInTreeAutoscaling indicates whether operator should create in tree autoscaling configs |
| `autoscalerOptions` _[AutoscalerOptions](#autoscaleroptions)_ | AutoscalerOptions specifies optional configuration for the Ray autoscaler. |
| `headServiceAnnotations` _object (keys:string, values:string)_ | |
| `suspend` _boolean_ | Suspend indicates whether a RayCluster should be suspended. A suspended RayCluster will have head pods and worker pods deleted. |


#### RayJob
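For context, a minimal Go sketch of how the new suspend field might be toggled from client code using the types this commit adds. The helper name, client wiring, and error handling are illustrative assumptions, not part of the commit:

package example

import (
    "context"

    "sigs.k8s.io/controller-runtime/pkg/client"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// suspendRayCluster is a hypothetical helper: it fetches a RayCluster and
// requests suspension by setting spec.suspend to true, after which the
// operator deletes the cluster's head and worker pods.
func suspendRayCluster(ctx context.Context, c client.Client, name, namespace string) error {
    var cluster rayv1.RayCluster
    if err := c.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, &cluster); err != nil {
        return err
    }
    suspend := true
    cluster.Spec.Suspend = &suspend
    return c.Update(ctx, &cluster)
}

Setting the field back to false (or omitting it) resumes the cluster, which is what the controller and tests further down exercise.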
2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions helm-chart/kuberay-operator/templates/role.yaml
@@ -47,6 +47,7 @@ rules:
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -23,6 +23,9 @@ type RayClusterSpec struct {
// AutoscalerOptions specifies optional configuration for the Ray autoscaler.
AutoscalerOptions *AutoscalerOptions `json:"autoscalerOptions,omitempty"`
HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"`
// Suspend indicates whether a RayCluster should be suspended.
// A suspended RayCluster will have head pods and worker pods deleted.
Suspend *bool `json:"suspend,omitempty"`
}

// HeadGroupSpec are the spec for the head pod
@@ -106,6 +109,7 @@ const (
Ready ClusterState = "ready"
Unhealthy ClusterState = "unhealthy"
Failed ClusterState = "failed"
Suspended ClusterState = "suspended"
)

// RayClusterStatus defines the observed state of RayCluster
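Because Suspend is declared as a *bool, every consumer needs a nil-safe check before dereferencing it. A small helper of the kind the controller changes below repeat inline could look like this (the helper name is hypothetical; the commit itself writes the check out each time):

package example

import rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

// isSuspended reports whether suspension has been requested for the cluster.
// A nil Suspend pointer is treated the same as an explicit false.
func isSuspended(instance *rayv1.RayCluster) bool {
    return instance.Spec.Suspend != nil && *instance.Spec.Suspend
}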
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions ray-operator/config/rbac/role.yaml
@@ -44,6 +44,7 @@ rules:
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
24 changes: 23 additions & 1 deletion ray-operator/controllers/ray/raycluster_controller.go
@@ -135,7 +135,7 @@ type RayClusterReconciler struct {
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete;deletecollection
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services/status,verbs=get;update;patch
@@ -312,6 +312,11 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
return ctrl.Result{}, nil
}

if instance.Spec.Suspend != nil && *instance.Spec.Suspend && instance.Status.State == rayv1.Suspended {
r.Log.Info("RayCluster is suspended, skipping reconcile", "cluster name", request.Name)
return ctrl.Result{}, nil
}

if err := r.reconcileAutoscalerServiceAccount(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
r.Log.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
@@ -582,6 +587,19 @@ func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instan
}

func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
clusterLabel := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), clusterLabel); err != nil {
return err
}

r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted Pods for RayCluster %s/%s due to suspension",
instance.Namespace, instance.Name)
return nil
}

// check if all the pods exist
headPods := corev1.PodList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
@@ -1210,6 +1228,10 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
}
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
newInstance.Status.State = rayv1.Suspended
}

if err := r.updateEndpoints(ctx, newInstance); err != nil {
return nil, err
}
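Taken together, the controller changes implement a simple suspension lifecycle: reconcilePods deletes every pod carrying the cluster's label while suspend is true, calculateStatus flips .status.state to suspended only once no pods remain, and the early return near the top of rayClusterReconcile then skips further reconciliation until suspend is cleared. A compressed, self-contained sketch of that flow, simplified for illustration rather than copied from the reconciler:

package example

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// reconcileSuspension is a simplified sketch of the flow added in this commit;
// the real reconciler interleaves these steps with the rest of its logic.
func reconcileSuspension(ctx context.Context, c client.Client, instance *rayv1.RayCluster, runtimePods corev1.PodList) error {
    suspended := instance.Spec.Suspend != nil && *instance.Spec.Suspend

    // Already suspended and recorded as such: nothing left to reconcile.
    if suspended && instance.Status.State == rayv1.Suspended {
        return nil
    }

    // Suspension requested: delete all pods belonging to this cluster.
    // DeleteAllOf issues a collection delete, which is why the RBAC rules
    // above gain the "deletecollection" verb on pods.
    if suspended {
        if err := c.DeleteAllOf(ctx, &corev1.Pod{},
            client.InNamespace(instance.Namespace),
            client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}); err != nil {
            return err
        }
    }

    // The status only becomes Suspended once all pods are actually gone.
    if suspended && len(runtimePods.Items) == 0 {
        instance.Status.State = rayv1.Suspended
    }
    return nil
}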
67 changes: 67 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -291,6 +291,73 @@ var _ = Context("Inside the default namespace", func() {
listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*2, time.Millisecond*200).Should(Equal(4), fmt.Sprintf("workerGroup %v", workerPods.Items))
})

It("should delete all head and worker pods if suspended", func() {
// Suspend the RayCluster and check that all pods are deleted.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
suspend := true
myRayCluster.Spec.Suspend = &suspend
return k8sClient.Update(ctx, myRayCluster)
})
Expect(err).NotTo(HaveOccurred(), "failed to update test RayCluster resource")

// check that all pods are deleted
Eventually(
listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(0), fmt.Sprintf("workerGroup %v", workerPods.Items))

Eventually(
listResourceFunc(ctx, &headPods, headFilterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(0), fmt.Sprintf("head %v", headPods.Items))
})

It("cluster's .status.state should be updated to 'suspended' shortly after all Pods are terminated", func() {
Eventually(
getClusterState(ctx, "default", myRayCluster.Name),
time.Second*(utils.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS+5), time.Millisecond*500).Should(Equal(rayv1.Suspended))
})

It("should run all head and worker pods if un-suspended", func() {
// Unsuspend the RayCluster and check that all pods are recreated.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
suspend := false
myRayCluster.Spec.Suspend = &suspend
return k8sClient.Update(ctx, myRayCluster)
})
Expect(err).NotTo(HaveOccurred(), "failed to update test RayCluster resource")

// check that all pods are created
Eventually(
listResourceFunc(ctx, &headPods, headFilterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(1), fmt.Sprintf("head %v", headPods.Items))
Eventually(
listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(4), fmt.Sprintf("workerGroup %v", workerPods.Items))

// We need to also manually update Pod statuses back to "Running" or else they will always stay as Pending.
// This is because we don't run kubelets in the unit tests to update the status subresource.
for _, headPod := range headPods.Items {
headPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, &headPod)).Should(BeNil())
}

for _, workerPod := range workerPods.Items {
workerPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, &workerPod)).Should(BeNil())
}
})

It("cluster's .status.state should be updated back to 'ready' after being un-suspended", func() {
Eventually(
getClusterState(ctx, "default", myRayCluster.Name),
time.Second*(utils.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS+5), time.Millisecond*500).Should(Equal(rayv1.Ready))
})
})
})

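The assertions above poll a getClusterState helper that is defined elsewhere in the test suite rather than in this diff. A plausible sketch of such a helper, assuming the suite's shared envtest client, would be:

package example

import (
    "context"

    "sigs.k8s.io/controller-runtime/pkg/client"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// k8sClient stands in for the shared client the test suite sets up in its
// envtest bootstrap; it is assumed here, not defined by this commit.
var k8sClient client.Client

// getClusterState (sketch, not part of this diff) returns a function that
// Gomega's Eventually can poll for the cluster's .status.state.
func getClusterState(ctx context.Context, namespace string, clusterName string) func() rayv1.ClusterState {
    return func() rayv1.ClusterState {
        var cluster rayv1.RayCluster
        if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, &cluster); err != nil {
            return ""
        }
        return cluster.Status.State
    }
}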
