diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 8b845a92759..5045f5ba76f 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -107,6 +107,7 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque if rayServiceInstance, err = r.getRayServiceInstance(ctx, request); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + originalRayServiceInstance := rayServiceInstance.DeepCopy() r.cleanUpServeConfigCache(rayServiceInstance) // TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not. @@ -212,14 +213,74 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque } // Final status update for any CR modification. - if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil { - logger.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance) - return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err + if r.inconsistentRayServiceStatuses(originalRayServiceInstance.Status, rayServiceInstance.Status) { + if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil { + logger.Error(errStatus, "Failed to update RayService status", "rayServiceInstance", rayServiceInstance) + return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err + } } return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil } +// Checks whether the old and new RayServiceStatus are inconsistent by comparing different fields. +// If the only differences between the old and new status are the LastUpdateTime and HealthLastUpdateTime fields, +// the status update will not be triggered. +// The RayClusterStatus field is only for observability in RayService CR, and changes to it will not trigger the status update. +func (r *RayServiceReconciler) inconsistentRayServiceStatus(oldStatus rayv1alpha1.RayServiceStatus, newStatus rayv1alpha1.RayServiceStatus) bool { + if oldStatus.RayClusterName != newStatus.RayClusterName { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService RayClusterName changed from %s to %s", oldStatus.RayClusterName, newStatus.RayClusterName)) + return true + } + + if oldStatus.DashboardStatus.IsHealthy != newStatus.DashboardStatus.IsHealthy { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService DashboardStatus changed from %v to %v", oldStatus.DashboardStatus, newStatus.DashboardStatus)) + return true + } + + if oldStatus.ApplicationStatus.Status != newStatus.ApplicationStatus.Status || + oldStatus.ApplicationStatus.Message != newStatus.ApplicationStatus.Message { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ApplicationStatus changed from %v to %v", oldStatus.ApplicationStatus, newStatus.ApplicationStatus)) + return true + } + + if len(oldStatus.ServeStatuses) != len(newStatus.ServeStatuses) { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService number of ServeStatus changed from %v to %v", len(oldStatus.ServeStatuses), len(newStatus.ServeStatuses))) + return true + } + + for i := 0; i < len(oldStatus.ServeStatuses); i++ { + if oldStatus.ServeStatuses[i].Name != newStatus.ServeStatuses[i].Name || + oldStatus.ServeStatuses[i].Status != newStatus.ServeStatuses[i].Status || + oldStatus.ServeStatuses[i].Message != newStatus.ServeStatuses[i].Message { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ServeDeploymentStatus changed from %v to %v", oldStatus.ServeStatuses[i], newStatus.ServeStatuses[i])) + return true + } + } + + return false +} + +// Determine whether to update the status of the RayService instance. +func (r *RayServiceReconciler) inconsistentRayServiceStatuses(oldStatus rayv1alpha1.RayServiceStatuses, newStatus rayv1alpha1.RayServiceStatuses) bool { + if oldStatus.ServiceStatus != newStatus.ServiceStatus { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ServiceStatus changed from %s to %s", oldStatus.ServiceStatus, newStatus.ServiceStatus)) + return true + } + + if r.inconsistentRayServiceStatus(oldStatus.ActiveServiceStatus, newStatus.ActiveServiceStatus) { + r.Log.Info("inconsistentRayServiceStatus RayService ActiveServiceStatus changed") + return true + } + + if r.inconsistentRayServiceStatus(oldStatus.PendingServiceStatus, newStatus.PendingServiceStatus) { + r.Log.Info("inconsistentRayServiceStatus RayService PendingServiceStatus changed") + return true + } + + return false +} + // SetupWithManager sets up the controller with the Manager. func (r *RayServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 4448ddc34cc..3acb1cd05f1 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -382,6 +382,94 @@ var _ = Context("Inside the default namespace", func() { time.Second*15, time.Millisecond*500).Should(Equal(initialPendingClusterName), "New active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) }) + It("Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.", func() { + // Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY. + initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)() + Eventually( + checkServiceHealth(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status) + + // ServiceUnhealthySecondThreshold is a global variable in rayservice_controller.go. + // If the time elapsed since the last update of the service HEALTHY status exceeds ServiceUnhealthySecondThreshold seconds, + // the RayService controller will consider the active RayCluster as unhealthy and prepare a new RayCluster. + orignalServeDeploymentUnhealthySecondThreshold := ServiceUnhealthySecondThreshold + ServiceUnhealthySecondThreshold = 500 + + // Only update the LastUpdateTime and HealthLastUpdateTime fields in the active RayCluster. + oldTime := myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime.DeepCopy() + newTime := oldTime.Add(time.Duration(5) * time.Minute) // 300 seconds + fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "UNHEALTHY")) + + // Confirm not switch to a new RayCluster because ServiceUnhealthySecondThreshold is 500 seconds. + Consistently( + getRayClusterNameFunc(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) + + // Check if all the ServeStatuses[i].Status are UNHEALTHY. + checkAllServeStatusesUnhealthy := func(ctx context.Context, rayService *rayiov1alpha1.RayService) bool { + if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService); err != nil { + return false + } + for _, serveStatus := range rayService.Status.ActiveServiceStatus.ServeStatuses { + if serveStatus.Status != "UNHEALTHY" { + return false + } + } + return true + } + + // The status update not only includes the LastUpdateTime and HealthLastUpdateTime fields, but also the ServeStatuses[i].Status field. + // Hence, all the ServeStatuses[i].Status should be updated to UNHEALTHY. + // + // Note: LastUpdateTime/HealthLastUpdateTime will be overwritten via metav1.Now() in rayservice_controller.go. + // Hence, we cannot use `newTime`` to check whether the status is updated or not. + Eventually( + checkAllServeStatusesUnhealthy(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status) + + fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "HEALTHY")) + + // Confirm not switch to a new RayCluster because ServiceUnhealthySecondThreshold is 500 seconds. + Consistently( + getRayClusterNameFunc(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) + + // The status update not only includes the LastUpdateTime and HealthLastUpdateTime fields, but also the ServeStatuses[i].Status field. + // Hence, the status should be updated. + Eventually( + checkServiceHealth(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status) + ServiceUnhealthySecondThreshold = orignalServeDeploymentUnhealthySecondThreshold + }) + + It("Status should not be updated if the only differences are the LastUpdateTime and HealthLastUpdateTime fields.", func() { + // Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY. + initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)() + Eventually( + checkServiceHealth(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status) + + // Only update the LastUpdateTime and HealthLastUpdateTime fields in the active RayCluster. + oldTime := myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime.DeepCopy() + newTime := oldTime.Add(time.Duration(5) * time.Minute) // 300 seconds + fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "HEALTHY")) + + // Confirm not switch to a new RayCluster + Consistently( + getRayClusterNameFunc(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) + + // The status is still the same as before. + Eventually( + checkServiceHealth(ctx, myRayService), + time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status) + + // Status should not be updated if the only differences are the LastUpdateTime and HealthLastUpdateTime fields. + // Unlike the test "Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.", + // the status update will not be triggered, so we can check whether the LastUpdateTime/HealthLastUpdateTime fields are updated or not by `oldTime`. + Expect(myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime).Should(Equal(oldTime), "myRayService status = %v", myRayService.Status) + }) + It("Update workerGroup.replicas in RayService and should not switch to new Ray Cluster", func() { // Certain field updates should not trigger new RayCluster preparation, such as updates // to `Replicas` and `WorkersToDelete` triggered by the autoscaler during scaling up/down. diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 7493a62882d..1b05da48540 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -6,7 +6,9 @@ import ( "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" ) func TestGenerateRayClusterJsonHash(t *testing.T) { @@ -60,3 +62,115 @@ func TestCompareRayClusterJsonHash(t *testing.T) { assert.Nil(t, err) assert.True(t, equal) } + +func TestInconsistentRayServiceStatuses(t *testing.T) { + r := &RayServiceReconciler{ + Log: ctrl.Log.WithName("controllers").WithName("RayService"), + } + + timeNow := metav1.Now() + oldStatus := v1alpha1.RayServiceStatuses{ + ActiveServiceStatus: v1alpha1.RayServiceStatus{ + RayClusterName: "new-cluster", + DashboardStatus: v1alpha1.DashboardStatus{ + IsHealthy: true, + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + ApplicationStatus: v1alpha1.AppStatus{ + Status: "running", + Message: "OK", + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + ServeStatuses: []v1alpha1.ServeDeploymentStatus{ + { + Name: "serve-1", + Status: "unhealthy", + Message: "error", + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + }, + }, + PendingServiceStatus: v1alpha1.RayServiceStatus{ + RayClusterName: "old-cluster", + DashboardStatus: v1alpha1.DashboardStatus{ + IsHealthy: true, + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + ApplicationStatus: v1alpha1.AppStatus{ + Status: "stopped", + Message: "stopped", + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + ServeStatuses: []v1alpha1.ServeDeploymentStatus{ + { + Name: "serve-1", + Status: "healthy", + Message: "Serve is healthy", + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + }, + }, + ServiceStatus: v1alpha1.WaitForDashboard, + } + + // Test 1: Update ServiceStatus only. + newStatus := oldStatus.DeepCopy() + newStatus.ServiceStatus = v1alpha1.WaitForServeDeploymentReady + assert.True(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus)) + + // Test 2: Test RayServiceStatus + newStatus = oldStatus.DeepCopy() + newStatus.ActiveServiceStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)} + assert.False(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus)) + + newStatus.ActiveServiceStatus.DashboardStatus.IsHealthy = !oldStatus.ActiveServiceStatus.DashboardStatus.IsHealthy + assert.True(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus)) +} + +func TestInconsistentRayServiceStatus(t *testing.T) { + timeNow := metav1.Now() + oldStatus := v1alpha1.RayServiceStatus{ + RayClusterName: "cluster-1", + DashboardStatus: v1alpha1.DashboardStatus{ + IsHealthy: true, + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + ApplicationStatus: v1alpha1.AppStatus{ + Status: "running", + Message: "Application is running", + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + ServeStatuses: []v1alpha1.ServeDeploymentStatus{ + { + Name: "serve-1", + Status: "healthy", + Message: "Serve is healthy", + LastUpdateTime: &timeNow, + HealthLastUpdateTime: &timeNow, + }, + }, + } + + r := &RayServiceReconciler{ + Log: ctrl.Log.WithName("controllers").WithName("RayService"), + } + + // Test 1: Only LastUpdateTime and HealthLastUpdateTime are updated. + newStatus := oldStatus.DeepCopy() + newStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)} + assert.False(t, r.inconsistentRayServiceStatus(oldStatus, *newStatus)) + + // Test 2: Not only LastUpdateTime and HealthLastUpdateTime are updated. + newStatus = oldStatus.DeepCopy() + newStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)} + newStatus.DashboardStatus.IsHealthy = !oldStatus.DashboardStatus.IsHealthy + assert.True(t, r.inconsistentRayServiceStatus(oldStatus, *newStatus)) +}