Skip to content

Commit

Permalink
fix the suspend=true added to the job by the default job webhook has …
Browse files Browse the repository at this point in the history
…not taken effect

Signed-off-by: fjding <[email protected]>
  • Loading branch information
fjding committed May 11, 2023
1 parent 804a280 commit a3e923c
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 18 deletions.
8 changes: 4 additions & 4 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ func (h *parentWorkloadHandler) queueReconcileForChildJob(object client.Object,
}

type Job struct {
batchv1.Job
*batchv1.Job
}

func (j *Job) Object() client.Object {
return &j.Job
return j.Job
}

func (j *Job) IsSuspended() bool {
Expand Down Expand Up @@ -265,7 +265,7 @@ func (r *JobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &batchv1.Job{}, parentWorkloadKey, func(o client.Object) []string {
job := o.(*batchv1.Job)
if pwName := jobframework.ParentWorkloadName(&Job{*job}); pwName != "" {
if pwName := jobframework.ParentWorkloadName(&Job{job}); pwName != "" {
return []string{pwName}
}
return nil
Expand All @@ -287,7 +287,7 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {

func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
fjr := (*jobframework.JobReconciler)(r)
return fjr.ReconcileGenericJob(ctx, req, &Job{})
return fjr.ReconcileGenericJob(ctx, req, &Job{&batchv1.Job{}})
}

func GetWorkloadNameForJob(jobName string) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestPodsReady(t *testing.T) {

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
batchJob := &Job{tc.job}
batchJob := &Job{&tc.job}
got := batchJob.PodsReady()
if tc.want != got {
t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/jobs/job/job_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error {
}
}

jobframework.ApplyDefaultForSuspend(&Job{*job}, w.manageJobsWithoutQueueName)
jobframework.ApplyDefaultForSuspend(&Job{job}, w.manageJobsWithoutQueueName)
return nil
}

Expand All @@ -84,7 +84,7 @@ func (w *JobWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) err
job := obj.(*batchv1.Job)
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.V(5).Info("Validating create", "job", klog.KObj(job))
return validateCreate(&Job{*job}).ToAggregate()
return validateCreate(&Job{job}).ToAggregate()
}

func validateCreate(job jobframework.GenericJob) field.ErrorList {
Expand All @@ -100,7 +100,7 @@ func (w *JobWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.
newJob := newObj.(*batchv1.Job)
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.V(5).Info("Validating update", "job", klog.KObj(newJob))
return validateUpdate(&Job{*oldJob}, &Job{*newJob}).ToAggregate()
return validateUpdate(&Job{oldJob}, &Job{newJob}).ToAggregate()
}

func validateUpdate(oldJob, newJob jobframework.GenericJob) field.ErrorList {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/jobs/job/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestValidateCreate(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
gotErr := validateCreate(&Job{*tc.job})
gotErr := validateCreate(&Job{tc.job})

if diff := cmp.Diff(tc.wantErr, gotErr); diff != "" {
t.Errorf("validateCreate() mismatch (-want +got):\n%s", diff)
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestValidateUpdate(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
gotErr := validateUpdate(&Job{*tc.oldJob}, &Job{*tc.newJob})
gotErr := validateUpdate(&Job{tc.oldJob}, &Job{tc.newJob})

if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.IgnoreFields(field.Error{})); diff != "" {
t.Errorf("validateUpdate() mismatch (-want +got):\n%s", diff)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/jobs/mpijob/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func NewReconciler(
}

type MPIJob struct {
kubeflow.MPIJob
*kubeflow.MPIJob
}

func (j *MPIJob) Object() client.Object {
return &j.MPIJob
return j.MPIJob
}

func (j *MPIJob) IsSuspended() bool {
Expand Down Expand Up @@ -251,7 +251,7 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {

func (r *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
fjr := (*jobframework.JobReconciler)(r)
return fjr.ReconcileGenericJob(ctx, req, &MPIJob{})
return fjr.ReconcileGenericJob(ctx, req, &MPIJob{&kubeflow.MPIJob{}})
}

func orderedReplicaTypes(jobSpec *kubeflow.MPIJobSpec) []kubeflow.MPIReplicaType {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/mpijob/mpijob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestCalcPriorityClassName(t *testing.T) {

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
mpiJob := MPIJob{tc.job}
mpiJob := MPIJob{&tc.job}
gotPriorityClassName := mpiJob.PriorityClass()
if tc.wantPriorityClassName != gotPriorityClassName {
t.Errorf("Unexpected response (want: %v, got: %v)", tc.wantPriorityClassName, gotPriorityClassName)
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/jobs/mpijob/mpijob_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (w *MPIJobWebhook) Default(ctx context.Context, obj runtime.Object) error {
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.V(5).Info("Applying defaults", "job", klog.KObj(job))

jobframework.ApplyDefaultForSuspend(&MPIJob{*job}, w.manageJobsWithoutQueueName)
jobframework.ApplyDefaultForSuspend(&MPIJob{job}, w.manageJobsWithoutQueueName)
return nil
}

Expand All @@ -72,7 +72,7 @@ func (w *MPIJobWebhook) ValidateCreate(ctx context.Context, obj runtime.Object)
job := obj.(*kubeflow.MPIJob)
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.Info("Validating create", "job", klog.KObj(job))
return validateCreate(&MPIJob{*job}).ToAggregate()
return validateCreate(&MPIJob{job}).ToAggregate()
}

func validateCreate(job jobframework.GenericJob) field.ErrorList {
Expand All @@ -82,9 +82,9 @@ func validateCreate(job jobframework.GenericJob) field.ErrorList {
// ValidateUpdate implements webhook.CustomValidator so a webhook will be registered for the type
func (w *MPIJobWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error {
oldJob := oldObj.(*kubeflow.MPIJob)
oldGenJob := &MPIJob{*oldJob}
oldGenJob := &MPIJob{oldJob}
newJob := newObj.(*kubeflow.MPIJob)
newGenJob := &MPIJob{*newJob}
newGenJob := &MPIJob{newJob}
log := ctrl.LoggerFrom(ctx).WithName("job-webhook")
log.Info("Validating update", "job", klog.KObj(newJob))
allErrs := jobframework.ValidateUpdateForQueueName(oldGenJob, newGenJob)
Expand Down

0 comments on commit a3e923c

Please sign in to comment.