diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 14724daad6c..fc689a89a82 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -248,6 +248,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 	// Check the current status of ray jobs
 	jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
 	if err != nil {
+		r.Log.Error(err, "failed to get job info", "jobId", rayJobInstance.Status.JobId)
 		err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1.JobDeploymentStatusFailedToGetJobStatus, err)
 		// Dashboard service in head pod takes time to start, it's possible we get connection refused error.
 		// Requeue after few seconds to avoid continuous connection errors.
@@ -255,8 +256,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 	}
 
 	// Update RayJob.Status (Kubernetes CR) from Ray Job Status from Dashboard service
-	if jobInfo != nil && jobInfo.JobStatus != rayJobInstance.Status.JobStatus {
-		r.Log.Info(fmt.Sprintf("Update status from %s to %s", rayJobInstance.Status.JobStatus, jobInfo.JobStatus), "rayjob", rayJobInstance.Status.JobId)
+	if r.shouldUpdateJobStatus(rayJobInstance.Status.JobStatus, rayJobInstance.Status.JobDeploymentStatus, jobInfo) {
 		err = r.updateState(ctx, rayJobInstance, jobInfo, jobInfo.JobStatus, rayv1.JobDeploymentStatusRunning, nil)
 		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 	}
@@ -524,6 +524,18 @@ func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *r
 	return nil
 }
 
+// shouldUpdateJobStatus reports whether the RayJob status should be refreshed from jobInfo.
+// It returns false when jobInfo is nil. Otherwise it returns true if the job status differs
+// from oldJobStatus, or if oldJobDeploymentStatus is JobDeploymentStatusFailedToGetJobStatus,
+// i.e. we didn't have the status before and now we have it.
+func (r *RayJobReconciler) shouldUpdateJobStatus(oldJobStatus rayv1.JobStatus, oldJobDeploymentStatus rayv1.JobDeploymentStatus, jobInfo *utils.RayJobInfo) bool {
+	if jobInfo == nil {
+		return false
+	}
+	jobStatusChanged := oldJobStatus != jobInfo.JobStatus
+	return jobStatusChanged || oldJobDeploymentStatus == rayv1.JobDeploymentStatusFailedToGetJobStatus
+}
+
 // make sure the priority is correct
 func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1.RayJob, jobInfo *utils.RayJobInfo, jobStatus rayv1.JobStatus, jobDeploymentStatus rayv1.JobDeploymentStatus, err error) error {
 	// Let's skip update the APIServer if it's synced.
diff --git a/ray-operator/controllers/ray/rayjob_controller_unit_test.go b/ray-operator/controllers/ray/rayjob_controller_unit_test.go
index b5f907b4917..4625bf27681 100644
--- a/ray-operator/controllers/ray/rayjob_controller_unit_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_unit_test.go
@@ -6,6 +6,7 @@ import (
 
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
+	utils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 	"github.com/stretchr/testify/assert"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -176,3 +177,57 @@ func TestGetSubmitterTemplate(t *testing.T) {
 	}
 	assert.True(t, found)
 }
+
+func TestShouldUpdateJobStatus(t *testing.T) {
+	r := &RayJobReconciler{}
+
+	tests := []struct {
+		name                   string
+		oldJobStatus           rayv1.JobStatus
+		oldJobDeploymentStatus rayv1.JobDeploymentStatus
+		jobInfo                *utils.RayJobInfo
+		expectedShouldUpdate   bool
+	}{
+		{
+			name:                   "jobInfo is nil",
+			oldJobStatus:           rayv1.JobStatusPending,
+			oldJobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
+			jobInfo:                nil,
+			expectedShouldUpdate:   false,
+		},
+		{
+			name:                   "job status changed",
+			oldJobStatus:           rayv1.JobStatusRunning,
+			oldJobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
+			jobInfo: &utils.RayJobInfo{
+				JobStatus: rayv1.JobStatusStopped,
+			},
+			expectedShouldUpdate: true,
+		},
+		{
+			name:                   "job status same but JobDeploymentStatus failed to get status",
+			oldJobStatus:           rayv1.JobStatusRunning,
+			oldJobDeploymentStatus: rayv1.JobDeploymentStatusFailedToGetJobStatus,
+			jobInfo: &utils.RayJobInfo{
+				JobStatus: rayv1.JobStatusRunning,
+			},
+			expectedShouldUpdate: true,
+		},
+		{
+			name:                   "job status same and JobDeploymentStatus not failed to get status",
+			oldJobStatus:           rayv1.JobStatusRunning,
+			oldJobDeploymentStatus: rayv1.JobDeploymentStatusRunning,
+			jobInfo: &utils.RayJobInfo{
+				JobStatus: rayv1.JobStatusRunning,
+			},
+			expectedShouldUpdate: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := r.shouldUpdateJobStatus(tt.oldJobStatus, tt.oldJobDeploymentStatus, tt.jobInfo)
+			assert.Equal(t, tt.expectedShouldUpdate, result)
+		})
+	}
+}