Improve pod/wl cleanup for pod groups (kubernetes#1373)
* Add a StopReason argument to the Stop method of the
  JobWithCustomStop interface (see the sketch after this list).

* Change the Stop method of the pod controller to delete
  even suspended pods when the workload is being deleted.

* Change pod-group finalization for the workload-deletion case:
  all pods in the group are now finalized right after the pod
  group has been stopped.

* Add new unit tests for the pod controller and webhook.
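
For orientation, here is a minimal sketch of a JobWithCustomStop implementation that makes use of the new StopReason argument. Only the Stop signature and the StopReason constants come from this change; MyJob, its pod list, and the pending-phase stand-in for "suspended" are illustrative assumptions, not the actual pod-controller code.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/kueue/pkg/controller/jobframework"
	"sigs.k8s.io/kueue/pkg/podset"
)

// MyJob is an illustrative wrapper around a group of pods managed as one job.
type MyJob struct {
	pods []corev1.Pod
}

// Stop mirrors the updated JobWithCustomStop signature: the StopReason tells the
// implementation why the framework is stopping the job, so it can decide, for
// example, to delete even suspended pods when the owning workload is deleted.
func (j *MyJob) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo,
	reason jobframework.StopReason, eventMsg string) (bool, error) {
	stoppedAny := false
	for i := range j.pods {
		pod := &j.pods[i]
		// Stay idempotent: skip pods that are already terminating.
		if !pod.DeletionTimestamp.IsZero() {
			continue
		}
		// A suspended pod normally needs no API call, unless the workload
		// itself is being deleted and everything has to go. Pending phase is
		// used here as a simplified stand-in for the real podSuspended check.
		if reason != jobframework.StopReasonWorkloadDeleted && pod.Status.Phase == corev1.PodPending {
			continue
		}
		if err := c.Delete(ctx, pod); err != nil {
			return false, err
		}
		stoppedAny = true
	}
	return stoppedAny, nil
}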
achernevskii authored Nov 29, 2023
1 parent 808319d commit 1636c16
Showing 8 changed files with 123 additions and 47 deletions.
11 changes: 10 additions & 1 deletion pkg/controller/jobframework/interface.go
@@ -62,11 +62,20 @@ type JobWithReclaimablePods interface {
ReclaimablePods() []kueue.ReclaimablePod
}

type StopReason int

const (
StopReasonWorkloadDeleted StopReason = iota
StopReasonWorkloadEvicted
StopReasonNoMatchingWorkload
StopReasonNotAdmitted
)

type JobWithCustomStop interface {
// Stop implements a custom stop procedure.
// The function should be idempotent: not do any API calls if the job is already stopped.
// Returns whether the Job stopped with this call or an error
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, eventMsg string) (bool, error)
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) (bool, error)
}

// JobWithFinalize interface should be implemented by generic jobs,
15 changes: 9 additions & 6 deletions pkg/controller/jobframework/reconciler.go
@@ -246,7 +246,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
// and drop the finalizer.
if wl != nil && !wl.DeletionTimestamp.IsZero() {
log.V(2).Info("The workload is marked for deletion")
err := r.stopJob(ctx, job, object, wl, "Workload is deleted")
err := r.stopJob(ctx, job, wl, StopReasonWorkloadDeleted, "Workload is deleted")
if err != nil {
log.Error(err, "Suspending job with deleted workload")
}
@@ -311,7 +311,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques

// 6. handle eviction
if evCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted); evCond != nil && evCond.Status == metav1.ConditionTrue {
if err := r.stopJob(ctx, job, object, wl, evCond.Message); err != nil {
if err := r.stopJob(ctx, job, wl, StopReasonWorkloadEvicted, evCond.Message); err != nil {
return ctrl.Result{}, err
}
if workload.HasQuotaReservation(wl) {
@@ -367,7 +367,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
if !workload.IsAdmitted(wl) {
// the job must be suspended if the workload is not yet admitted.
log.V(2).Info("Running job is not admitted by a cluster queue, suspending")
err := r.stopJob(ctx, job, object, wl, "Not admitted by cluster queue")
err := r.stopJob(ctx, job, wl, StopReasonNotAdmitted, "Not admitted by cluster queue")
if err != nil {
log.Error(err, "Suspending job with non admitted workload")
}
@@ -455,7 +455,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
return nil, fmt.Errorf("finalizing job with no matching workload: %w", err)
}
} else {
if err := r.stopJob(ctx, job, object, w, "No matching Workload"); err != nil {
if err := r.stopJob(ctx, job, w, StopReasonNoMatchingWorkload, "No matching Workload"); err != nil {
return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
}
}
@@ -597,14 +597,17 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli

// stopJob will suspend the job, and also restore node affinity, reset job status if needed.
// Returns whether any operation was done to stop the job or an error.
func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, eventMsg string) error {
func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.Workload, stopReason StopReason, eventMsg string) error {
object := job.Object()

info := GetPodSetsInfoFromWorkload(wl)

if jws, implements := job.(JobWithCustomStop); implements {
stoppedNow, err := jws.Stop(ctx, r.client, info, eventMsg)
stoppedNow, err := jws.Stop(ctx, r.client, info, stopReason, eventMsg)
if stoppedNow {
r.record.Eventf(object, corev1.EventTypeNormal, "Stopped", eventMsg)
}

return err
}

2 changes: 1 addition & 1 deletion pkg/controller/jobs/job/job_controller.go
@@ -157,7 +157,7 @@ func (j *Job) Suspend() {
j.Spec.Suspend = ptr.To(true)
}

func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, eventMsg string) (bool, error) {
func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, eventMsg string) (bool, error) {
stoppedNow := false
if !j.IsSuspended() {
j.Suspend()
40 changes: 12 additions & 28 deletions pkg/controller/jobs/pod/pod_controller.go
@@ -248,7 +248,7 @@ func (p *Pod) GVK() schema.GroupVersionKind {
return gvk
}

func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, eventMsg string) (bool, error) {
func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, stopReason jobframework.StopReason, eventMsg string) (bool, error) {
var podsInGroup []corev1.Pod

if p.isGroup {
@@ -258,7 +258,8 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo,
}

for i := range podsInGroup {
if podSuspended(&podsInGroup[i]) || !podsInGroup[i].DeletionTimestamp.IsZero() {
// If the workload is being deleted, delete even finished Pods.
if !podsInGroup[i].DeletionTimestamp.IsZero() || (stopReason != jobframework.StopReasonWorkloadDeleted && podSuspended(&podsInGroup[i])) {
continue
}
podInGroup := fromObject(&podsInGroup[i])
@@ -293,26 +294,17 @@ }
}
}

return true, nil
}

// hasIssuedAllStops will return true if all pods in the group has been stopped
func (p *Pod) hasIssuedAllStops() bool {
var podsInGroup []corev1.Pod

if p.isGroup {
podsInGroup = p.list.Items
} else {
podsInGroup = []corev1.Pod{p.pod}
}

for i := range podsInGroup {
if podsInGroup[i].ObjectMeta.DeletionTimestamp.IsZero() {
return false
// If related workload is deleted, the generic reconciler will stop the pod group and finalize the workload.
// However, it won't finalize the pods. Since the Stop method for the pod group deletes all the pods in the
// group, the pods will be finalized here.
if p.isGroup && stopReason == jobframework.StopReasonWorkloadDeleted {
err := p.Finalize(ctx, c)
if err != nil {
return false, err
}
}

return true
return true, nil
}

func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
Expand Down Expand Up @@ -397,15 +389,7 @@ func (p *Pod) Load(ctx context.Context, c client.Client, key types.NamespacedNam
return false, err
}

wl, toDelete, err := p.FindMatchingWorkloads(ctx, c)
if err != nil {
return false, err
}

// If related workload is deleted, the generic reconciler will stop the pod group and finalize the workload.
// However, it won't finalize the pods. Since the Stop method for the pod group deletes all the pods in the
// group, the pods will be finalized here, on the next reconciliation.
return wl == nil && len(toDelete) == 0 && p.hasIssuedAllStops(), nil
return false, nil
}

func (p *Pod) constructGroupPodSets(podsInGroup corev1.PodList) ([]kueue.PodSet, error) {
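
The comment added inside Stop above notes that, when the workload is deleted, the generic reconciler finalizes the workload but not the pods, which is why Stop for a pod group now calls Finalize right after issuing the deletions. As a rough illustration only, a finalization pass over a pod group could look like the sketch below; the finalizer name and the update strategy are assumptions, not the actual (*Pod).Finalize implementation.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// finalizePodGroup is a hypothetical helper: it strips a kueue-style finalizer
// from every pod in the group so the deletions issued by Stop can complete.
func finalizePodGroup(ctx context.Context, c client.Client, pods []corev1.Pod) error {
	// Assumed finalizer name, for illustration only.
	const podFinalizer = "kueue.x-k8s.io/managed"
	for i := range pods {
		pod := &pods[i]
		if controllerutil.RemoveFinalizer(pod, podFinalizer) {
			if err := c.Update(ctx, pod); err != nil {
				return err
			}
		}
	}
	return nil
}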
32 changes: 22 additions & 10 deletions pkg/controller/jobs/pod/pod_controller_test.go
@@ -144,12 +144,13 @@ func TestReconciler(t *testing.T) {
Image("", nil)

testCases := map[string]struct {
initObjects []client.Object
pods []corev1.Pod
wantPods []corev1.Pod
workloads []kueue.Workload
wantWorkloads []kueue.Workload
wantErr error
initObjects []client.Object
pods []corev1.Pod
wantPods []corev1.Pod
workloads []kueue.Workload
wantWorkloads []kueue.Workload
// wantErrs should be the same length and order as pods
wantErrs []error
workloadCmpOpts []cmp.Option
// If true, the test will delete workloads before running reconcile
deleteWorkloads bool
@@ -210,7 +211,7 @@ func TestReconciler(t *testing.T) {
Admitted(true).
Obj(),
},
wantErr: jobframework.ErrNoMatchingWorkloads,
wantErrs: []error{jobframework.ErrNoMatchingWorkloads},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"the workload is created when queue name is set": {
@@ -966,6 +967,7 @@ func TestReconciler(t *testing.T) {
Queue("test-queue").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{},
@@ -1410,6 +1412,9 @@ func TestReconciler(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
if tc.wantErrs != nil && len(tc.wantErrs) != len(tc.pods) {
t.Fatalf("pods and wantErrs in the test should be the same length and order")
}
ctx, _ := utiltesting.ContextWithLog(t)
clientBuilder := utiltesting.NewClientBuilder()
if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil {
@@ -1451,13 +1456,20 @@ func TestReconciler(t *testing.T) {
recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"})
reconciler := NewReconciler(kClient, recorder)

for _, pod := range tc.pods {
podKey := client.ObjectKeyFromObject(&pod)
for i := range tc.pods {
podKey := client.ObjectKeyFromObject(&tc.pods[i])
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: podKey,
})

if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
var wantErr error
if tc.wantErrs == nil {
wantErr = nil
} else {
wantErr = tc.wantErrs[i]
}

if diff := cmp.Diff(wantErr, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Reconcile returned error (-want,+got):\n%s", diff)
}
}
1 change: 0 additions & 1 deletion pkg/controller/jobs/pod/pod_webhook.go
@@ -122,7 +122,6 @@ func volumesShape(volumes []corev1.Volume) (result []corev1.Volume) {
}

func getRoleHash(p *Pod) (string, error) {

shape := map[string]interface{}{
"metadata": map[string]interface{}{
"labels": omitKueueLabels(p.pod.ObjectMeta.Labels),
63 changes: 63 additions & 0 deletions pkg/controller/jobs/pod/pod_webhook_test.go
@@ -282,6 +282,69 @@ func TestDefault(t *testing.T) {
}
}

func TestGetRoleHash(t *testing.T) {
testCases := map[string]struct {
pods []*Pod
// If true, hash for all the pods in test should be equal
wantEqualHash bool
wantErr error
}{
"kueue.x-k8s.io/* labels shouldn't affect the role": {
pods: []*Pod{
{pod: *testingpod.MakePod("pod1", "test-ns").
Label("kueue.x-k8s.io/managed", "true").
Obj()},
{pod: *testingpod.MakePod("pod2", "test-ns").
Obj()},
},
wantEqualHash: true,
},
"volume name shouldn't affect the role": {
pods: []*Pod{
{pod: *testingpod.MakePod("pod1", "test-ns").
Volume(corev1.Volume{
Name: "volume1",
}).
Obj()},
{pod: *testingpod.MakePod("pod1", "test-ns").
Volume(corev1.Volume{
Name: "volume2",
}).
Obj()},
},
wantEqualHash: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {

var previousHash string
for i := range tc.pods {
hash, err := getRoleHash(tc.pods[i])

if diff := cmp.Diff(tc.wantErr, err); diff != "" {
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
}

if previousHash != "" {
if tc.wantEqualHash {
if previousHash != hash {
t.Errorf("Hash of pod shapes shouldn't be different %s!=%s", previousHash, hash)
}
} else {
if previousHash == hash {
t.Errorf("Hash of pod shapes shouldn't be equal %s==%s", previousHash, hash)
}
}
}

previousHash = hash
}
})
}
}

func TestValidateCreate(t *testing.T) {
testCases := map[string]struct {
pod *corev1.Pod
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/pod/wrappers.go
@@ -192,3 +192,9 @@ func (p *PodWrapper) Delete() *PodWrapper {
p.Pod.DeletionTimestamp = &t
return p
}

// Volume adds a new volume for the pod object
func (p *PodWrapper) Volume(v corev1.Volume) *PodWrapper {
p.Pod.Spec.Volumes = append(p.Pod.Spec.Volumes, v)
return p
}
