Fix XGBoost conditions bug #1737

Merged · 1 commit · Jan 22, 2023
25 changes: 8 additions & 17 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
@@ -25,7 +25,6 @@ import (
 	"github.com/kubeflow/common/pkg/controller.v1/control"
 	"github.com/kubeflow/common/pkg/controller.v1/expectation"
 	commonutil "github.com/kubeflow/common/pkg/util"
-	logger "github.com/kubeflow/common/pkg/util"
 	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common"
 	"github.com/kubeflow/training-operator/pkg/common/util"
@@ -376,13 +375,15 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
 		return err
 	}
 
+	logger := commonutil.LoggerForJob(xgboostJob)
+
 	// Set StartTime.
 	if jobStatus.StartTime == nil {
 		now := metav1.Now()
 		jobStatus.StartTime = &now
 		// enqueue a sync to check if job past ActiveDeadlineSeconds
 		if xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds != nil {
-			logger.LoggerForJob(xgboostJob).Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds)
+			logger.Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds)
 			r.WorkQueue.AddAfter(xgboostJobKey, time.Duration(*xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second)
 		}
 	}
@@ -403,7 +404,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
 				msg := fmt.Sprintf("XGBoostJob %s is running.", xgboostJob.Name)
 				err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, xgboostJobRunningReason, msg)
 				if err != nil {
-					logger.LoggerForJob(xgboostJob).Infof("Append job condition error: %v", err)
+					logger.Infof("Append job condition error: %v", err)
 					return err
 				}
 			}
@@ -418,7 +419,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
 				}
 				err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, xgboostJobSucceededReason, msg)
 				if err != nil {
-					logger.LoggerForJob(xgboostJob).Infof("Append job condition error: %v", err)
+					logger.Infof("Append job condition error: %v", err)
 					return err
 				}
 				trainingoperatorcommon.SuccessfulJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
@@ -431,7 +432,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
 				r.Recorder.Event(xgboostJob, corev1.EventTypeWarning, xgboostJobRestartingReason, msg)
 				err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRestarting, xgboostJobRestartingReason, msg)
 				if err != nil {
-					logger.LoggerForJob(xgboostJob).Infof("Append job condition error: %v", err)
+					logger.Infof("Append job condition error: %v", err)
 					return err
 				}
 				trainingoperatorcommon.RestartedJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
@@ -444,23 +445,13 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
 				}
 				err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobFailed, xgboostJobFailedReason, msg)
 				if err != nil {
-					logger.LoggerForJob(xgboostJob).Infof("Append job condition error: %v", err)
+					logger.Infof("Append job condition error: %v", err)
 					return err
 				}
 				trainingoperatorcommon.FailedJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
 			}
 		}
 	}
 
-	// Some workers are still running, leave a running condition.
-	msg := fmt.Sprintf("XGBoostJob %s is running.", xgboostJob.Name)
-	logger.LoggerForJob(xgboostJob).Infof(msg)
-
-	if err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, xgboostJobRunningReason, msg); err != nil {
-		logger.LoggerForJob(xgboostJob).Error(err, "failed to update XGBoost Job conditions")
-		return err
-	}
-
 	return nil
 }

@@ -484,7 +475,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobSt
 	result := r.Status().Update(context.Background(), xgboostjob)
 
 	if result != nil {
-		logger.LoggerForJob(xgboostjob).Error(result, "failed to update XGBoost Job conditions in the API server")
+		commonutil.LoggerForJob(xgboostjob).Error(result, "failed to update XGBoost Job conditions in the API server")
 		return result
 	}
 
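Why the trailing block was removed: after the replica loop has already recorded a terminal condition (Succeeded, Restarting, or Failed), the old code went on to record another JobRunning condition regardless, so the most recent condition on the job could contradict the state the loop had just set. The Go sketch below is a minimal, self-contained model of that ordering problem; the JobCondition type and the appendCondition/latest helpers are hypothetical stand-ins for illustration only, not the kubeflow/common API.

package main

import (
	"fmt"
	"time"
)

// JobCondition is a hypothetical, simplified stand-in for the condition entries
// kept in a job's status; it is not the kubeflow/common type.
type JobCondition struct {
	Type           string // e.g. "Running", "Succeeded", "Failed"
	Status         bool
	LastUpdateTime time.Time
}

// appendCondition models "record the latest observed state" as a plain append.
func appendCondition(conds []JobCondition, condType string) []JobCondition {
	return append(conds, JobCondition{Type: condType, Status: true, LastUpdateTime: time.Now()})
}

// latest returns the most recently recorded condition, which is what a client
// watching the status would usually treat as the job's current state.
func latest(conds []JobCondition) JobCondition {
	return conds[len(conds)-1]
}

func main() {
	// Old code path: the replica loop records a terminal condition, and the
	// trailing block then records Running again.
	old := appendCondition(nil, "Succeeded")
	old = appendCondition(old, "Running")
	fmt.Println("old path, latest condition:", latest(old).Type) // Running

	// Fixed code path: nothing is recorded after the terminal condition.
	fixed := appendCondition(nil, "Succeeded")
	fmt.Println("fixed path, latest condition:", latest(fixed).Type) // Succeeded
}

The rest of the diff is mechanical: the duplicate import of github.com/kubeflow/common/pkg/util under the alias logger is dropped, and a single logger := commonutil.LoggerForJob(xgboostJob) at the top of UpdateJobStatus replaces the repeated logger.LoggerForJob(xgboostJob) calls.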