Skip to content

Commit

Permalink
[RayService] a safeguard for preventing overriding the pending cluste…
Browse files Browse the repository at this point in the history
…r during an upgrade (ray-project#2887)

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored Feb 5, 2025
1 parent 268a776 commit b753f1a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
9 changes: 7 additions & 2 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (r *RayServiceReconciler) calculateStatus(ctx context.Context, rayServiceIn
rayServiceInstance.Status.ActiveServiceStatus.Applications = activeClusterServeApplications
rayServiceInstance.Status.PendingServiceStatus.Applications = pendingClusterServeApplications

isPendingClusterServing := false
if headSvc != nil && serveSvc != nil {
pendingClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName
activeClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
Expand All @@ -257,6 +258,7 @@ func (r *RayServiceReconciler) calculateStatus(ctx context.Context, rayServiceIn
if clusterName != pendingClusterName && clusterName != activeClusterName {
panic("clusterName is not equal to pendingCluster or activeCluster")
}
isPendingClusterServing = clusterName == pendingClusterName

// If services point to a different cluster than the active one, promote pending to active
logger.Info("calculateStatus", "clusterSvcPointingTo", clusterName, "pendingClusterName", pendingClusterName, "activeClusterName", activeClusterName)
Expand All @@ -269,7 +271,7 @@ func (r *RayServiceReconciler) calculateStatus(ctx context.Context, rayServiceIn
}
}

if shouldPrepareNewCluster(ctx, rayServiceInstance, activeCluster, pendingCluster) {
if shouldPrepareNewCluster(ctx, rayServiceInstance, activeCluster, pendingCluster, isPendingClusterServing) {
rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{
RayClusterName: utils.GenerateRayClusterName(rayServiceInstance.Name),
}
Expand Down Expand Up @@ -684,7 +686,10 @@ func isClusterSpecHashEqual(rayServiceInstance *rayv1.RayService, cluster *rayv1
return clusterHash == goalClusterHash
}

func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayService, activeRayCluster, pendingRayCluster *rayv1.RayCluster) bool {
func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayService, activeRayCluster, pendingRayCluster *rayv1.RayCluster, isPendingClusterServing bool) bool {
if isPendingClusterServing {
return false
}
if activeRayCluster == nil && pendingRayCluster == nil {
// Both active and pending clusters are nil, which means the RayService has just been created.
// Create a new pending cluster.
Expand Down
52 changes: 50 additions & 2 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ func TestShouldPrepareNewCluster_PrepareNewCluster(t *testing.T) {
},
}

shouldPrepareNewCluster := shouldPrepareNewCluster(ctx, &rayService, nil, nil)
shouldPrepareNewCluster := shouldPrepareNewCluster(ctx, &rayService, nil, nil, false)
assert.True(t, shouldPrepareNewCluster)
}

Expand Down Expand Up @@ -1153,10 +1153,58 @@ func TestShouldPrepareNewCluster_ZeroDowntimeUpgrade(t *testing.T) {

// Update cluster spec in RayService to trigger a zero downtime upgrade.
rayService.Spec.RayClusterSpec.RayVersion = "new-version"
shouldPrepareNewCluster := shouldPrepareNewCluster(ctx, &rayService, activeCluster, nil)
shouldPrepareNewCluster := shouldPrepareNewCluster(ctx, &rayService, activeCluster, nil, false)
assert.True(t, shouldPrepareNewCluster)
}

// TestShouldPrepareNewCluster_PendingCluster exercises shouldPrepareNewCluster when
// there is no active cluster and only a pending cluster exists. The pending cluster's
// annotations are computed from the RayService spec while RayVersion is "old-version";
// each subtest then switches RayVersion to "new-version" before calling the function.
// A new cluster is expected when the pending cluster is not serving, and none is
// expected when it is.
func TestShouldPrepareNewCluster_PendingCluster(t *testing.T) {
	const (
		ns          = "test-namespace"
		pendingName = "pending-cluster"
	)
	ctx := context.TODO()

	rayService := rayv1.RayService{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-service",
			Namespace: ns,
		},
		Spec: rayv1.RayServiceSpec{
			RayClusterSpec: rayv1.RayClusterSpec{
				RayVersion: "old-version",
			},
		},
	}

	// Compute the annotation values before any subtest mutates the spec, so they
	// describe the "old-version" spec.
	specHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec)
	assert.NoError(t, err)
	workerGroupCount := strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs))

	pendingCluster := &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:      pendingName,
			Namespace: ns,
			Annotations: map[string]string{
				utils.HashWithoutReplicasAndWorkersToDeleteKey: specHash,
				utils.NumWorkerGroupsKey:                       workerGroupCount,
				utils.KubeRayVersion:                           utils.KUBERAY_VERSION,
			},
		},
	}

	cases := []struct {
		name                    string
		isPendingClusterServing bool
		want                    bool
	}{
		{
			name:                    "override the pending cluster if it is not serving",
			isPendingClusterServing: false,
			want:                    true,
		},
		{
			name:                    "do not override the pending cluster if it is serving",
			isPendingClusterServing: true,
			want:                    false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Change the RayService spec after the pending cluster's annotations were
			// recorded; every subtest sets it so each one can run on its own.
			rayService.Spec.RayClusterSpec.RayVersion = "new-version"

			got := shouldPrepareNewCluster(ctx, &rayService, nil, pendingCluster, tc.isPendingClusterServing)
			assert.Equal(t, tc.want, got)
		})
	}
}

func TestIsZeroDowntimeUpgradeEnabled(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit b753f1a

Please sign in to comment.