Skip to content

Commit

Permalink
Fix job finalization logic in generic reconciler (kubernetes#1177)
Browse files Browse the repository at this point in the history
* Fix job finalization logic in generic reconciler

* In the case of a finished job, the job should be
  finalized before updating the Workload status:
  if an error occurs during finalization, the job
  will never be reached by the reconciler again
  once the workload has a 'Finished' condition.

* Add test case for pod integration, change old test

* Add a test case validating that the 'Finished'
  workload status condition is added even if the
  pod is already finalized, which could happen due
  to a client error while updating workload conditions.

* Simplify 'TestReconciler_ErrorFinalizingPod'.

* Fix pod_controller_test.go for go race detector

* Replace Pod mock with the client interceptor func
  • Loading branch information
achernevskii authored Oct 5, 2023
1 parent e162f85 commit 2dd73b7
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 6 deletions.
10 changes: 6 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,16 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques

// 2. handle job is finished.
if condition, finished := job.Finished(); finished && wl != nil {
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil {
log.Error(err, "Updating workload status")
}
// Execute job finalization logic
if err := r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
}

err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil {
log.Error(err, "Updating workload status")
}

return ctrl.Result{}, nil
}

Expand Down
158 changes: 156 additions & 2 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ limitations under the License.
package pod

import (
"context"
"fmt"
"syscall"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand Down Expand Up @@ -339,6 +343,39 @@ func TestReconciler(t *testing.T) {
}),
),
},
"workload status condition is added even if the pod is finalized": {
pod: *basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
StatusPhase(corev1.PodSucceeded).
Obj(),
wantPod: basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
StatusPhase(corev1.PodSucceeded).
Obj(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
Reason: "JobFinished",
Message: "Job finished successfully",
}).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"pod without scheduling gate is terminated if workload is not admitted": {
pod: *basePodWrapper.
Clone().
Expand Down Expand Up @@ -428,7 +465,7 @@ func TestReconciler(t *testing.T) {

var gotPod corev1.Pod
if err := kClient.Get(ctx, podKey, &gotPod); err != nil {
if tc.wantPod != nil || !errors.IsNotFound(err) {
if tc.wantPod != nil || !apierrors.IsNotFound(err) {
t.Fatalf("Could not get Pod after reconcile: %v", err)
}
}
Expand All @@ -448,6 +485,123 @@ func TestReconciler(t *testing.T) {
}
}

// TestReconciler_ErrorFinalizingPod reconciles a succeeded, Kueue-managed pod
// twice against a fake client whose first Pod update fails with a
// "connection refused" error. It expects the first reconcile to surface that
// error, and the second reconcile to succeed, leaving the pod without the
// Kueue finalizer and the workload carrying a Finished condition.
func TestReconciler_ErrorFinalizingPod(t *testing.T) {
	ctx, _ := utiltesting.ContextWithLog(t)

	builder := utiltesting.NewClientBuilder()
	if err := SetupIndexes(ctx, utiltesting.AsIndexer(builder)); err != nil {
		t.Fatalf("Could not setup indexes: %v", err)
	}

	podTemplate := testingpod.MakePod("pod", "ns").
		UID("test-uid").
		Queue("user-queue").
		Request(corev1.ResourceCPU, "1").
		Image("", nil)

	// The pod under test: managed by Kueue, already succeeded, finalizer still set.
	finishedPod := *podTemplate.
		Clone().
		Label("kueue.x-k8s.io/managed", "true").
		KueueFinalizer().
		StatusPhase(corev1.PodSucceeded).
		Obj()

	admittedWl := *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
		PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()).
		ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
		Admitted(true).
		Obj()

	// podUpdates counts Pod update requests seen by the interceptor.
	podUpdates := 0
	errConnRefused := fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)

	failFirstPodUpdate := interceptor.Funcs{
		Update: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.UpdateOption) error {
			gvk := obj.GetObjectKind().GroupVersionKind()
			isPod := gvk.GroupVersion().String() == "v1" && gvk.Kind == "Pod"
			if !isPod {
				// Anything that is not a core/v1 Pod is passed straight through.
				return c.Update(ctx, obj, opts...)
			}

			defer func() { podUpdates++ }()
			if podUpdates == 0 {
				// return a connection refused error for the first update request.
				return errConnRefused
			}
			// Every later Pod update is executed as a regular update operation.
			return c.Update(ctx, obj, opts...)
		},
	}

	kClient := builder.
		WithObjects(&finishedPod).
		WithStatusSubresource(&admittedWl).
		WithInterceptorFuncs(failFirstPodUpdate).
		Build()

	if err := ctrl.SetControllerReference(&finishedPod, &admittedWl, kClient.Scheme()); err != nil {
		t.Fatalf("Could not setup owner reference in Workloads: %v", err)
	}
	if err := kClient.Create(ctx, &admittedWl); err != nil {
		t.Fatalf("Could not create workload: %v", err)
	}

	recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"})
	reconciler := NewReconciler(kClient, recorder)

	podKey := client.ObjectKeyFromObject(&finishedPod)
	request := reconcile.Request{NamespacedName: podKey}

	// First reconcile: the injected update failure must be returned to the caller.
	_, firstErr := reconciler.Reconcile(ctx, request)
	if diff := cmp.Diff(errConnRefused, firstErr, cmpopts.EquateErrors()); diff != "" {
		t.Errorf("Expected reconcile error (-want,+got):\n%s", diff)
	}

	// Reconcile for the second time
	if _, err := reconciler.Reconcile(ctx, request); err != nil {
		t.Errorf("Got unexpected error while running reconcile:\n%v", err)
	}

	var gotPod corev1.Pod
	if err := kClient.Get(ctx, podKey, &gotPod); err != nil {
		t.Fatalf("Could not get Pod after second reconcile: %v", err)
	}

	// Validate that pod has no finalizer after the second reconcile
	wantPod := *podTemplate.
		Clone().
		Label("kueue.x-k8s.io/managed", "true").
		StatusPhase(corev1.PodSucceeded).
		Obj()
	if diff := cmp.Diff(wantPod, gotPod, podCmpOpts...); diff != "" {
		t.Errorf("Pod after second reconcile (-want,+got):\n%s", diff)
	}

	var gotWorkloads kueue.WorkloadList
	if err := kClient.List(ctx, &gotWorkloads); err != nil {
		t.Fatalf("Could not get Workloads after second reconcile: %v", err)
	}

	// Workload should be finished after the second reconcile
	finishedCondition := metav1.Condition{
		Type:    kueue.WorkloadFinished,
		Status:  metav1.ConditionTrue,
		Reason:  "JobFinished",
		Message: "Job finished successfully",
	}
	wantWl := *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
		PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()).
		ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
		Admitted(true).
		Condition(finishedCondition).
		Obj()
	if diff := cmp.Diff([]kueue.Workload{wantWl}, gotWorkloads.Items, defaultWorkloadCmpOpts...); diff != "" {
		t.Errorf("Workloads after second reconcile (-want,+got):\n%s", diff)
	}
}

func TestIsPodOwnerManagedByQueue(t *testing.T) {
testCases := map[string]struct {
ownerReference metav1.OwnerReference
Expand Down

0 comments on commit 2dd73b7

Please sign in to comment.