From eafdc7f6144cc71be6b6b1dc40f1229ed9613da7 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 21 Oct 2024 08:48:15 +0200 Subject: [PATCH] review comments --- pkg/controller/tas/indexer.go | 5 +- pkg/controller/tas/topology_ungater.go | 108 ++-- pkg/controller/tas/topology_ungater_test.go | 566 ++++++++++++++++++-- pkg/util/testingjobs/pod/wrappers.go | 16 +- 4 files changed, 596 insertions(+), 99 deletions(-) diff --git a/pkg/controller/tas/indexer.go b/pkg/controller/tas/indexer.go index b11a044624..0bf6ad1159 100644 --- a/pkg/controller/tas/indexer.go +++ b/pkg/controller/tas/indexer.go @@ -31,7 +31,10 @@ const ( func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { return indexer.IndexField(ctx, &corev1.Pod{}, workloadNameKey, func(o client.Object) []string { - pod := o.(*corev1.Pod) + pod, ok := o.(*corev1.Pod) + if !ok { + return nil + } value, found := pod.Annotations[kueuealpha.WorkloadAnnotation] if !found { return nil diff --git a/pkg/controller/tas/topology_ungater.go b/pkg/controller/tas/topology_ungater.go index 7f3973000b..83f00dd0a2 100644 --- a/pkg/controller/tas/topology_ungater.go +++ b/pkg/controller/tas/topology_ungater.go @@ -18,6 +18,7 @@ package tas import ( "context" + "slices" "time" "github.com/go-logr/logr" @@ -41,9 +42,12 @@ import ( "sigs.k8s.io/kueue/pkg/util/parallelize" utilpod "sigs.k8s.io/kueue/pkg/util/pod" utiltas "sigs.k8s.io/kueue/pkg/util/tas" + "sigs.k8s.io/kueue/pkg/workload" ) const ( + // batch reconciles in 1s intervals to avoid cascades of reconciles when + // a large Job creates its Pods. ungateBatchPeriod = time.Second ) @@ -85,43 +89,28 @@ type podHandler struct { } func (h *podHandler) Create(_ context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { - pod, isPod := e.Object.(*corev1.Pod) - if !isPod { - return - } - h.queueReconcileForPod(pod, q) + h.queueReconcileForPod(e.Object, false, q) } -func (h *podHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { - oldPod, isOldPod := e.ObjectOld.(*corev1.Pod) - newPod, isNewPod := e.ObjectNew.(*corev1.Pod) - if !isOldPod || !isNewPod { - return - } - h.queueReconcileForPod(oldPod, q) - h.queueReconcileForPod(newPod, q) +func (h *podHandler) Update(_ context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.queueReconcileForPod(e.ObjectNew, false, q) } func (h *podHandler) Delete(_ context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { - pod, isPod := e.Object.(*corev1.Pod) - if !isPod { - return - } - h.queueReconcileForPod(pod, q) + h.queueReconcileForPod(e.Object, true, q) } -func (h *podHandler) queueReconcileForPod(pod *corev1.Pod, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { - if pod == nil { - return - } - if !utilpod.HasGate(pod, kueuealpha.TopologySchedulingGate) { +func (h *podHandler) queueReconcileForPod(object client.Object, deleted bool, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + pod, isPod := object.(*corev1.Pod) + if !isPod { return } if wlName, found := pod.Annotations[kueuealpha.WorkloadAnnotation]; found { - q.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{ + key := types.NamespacedName{ Name: wlName, Namespace: pod.Namespace, - }}, ungateBatchPeriod) + } + q.AddAfter(reconcile.Request{NamespacedName: key}, ungateBatchPeriod) } } @@ -129,7 +118,7 @@ func (h *podHandler) Generic(context.Context, event.GenericEvent, workqueue.Type } func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - log := ctrl.LoggerFrom(ctx).WithValues("workload", req.NamespacedName.Name) + log := ctrl.LoggerFrom(ctx).WithValues("workload", req.NamespacedName.String()) log.V(2).Info("Reconcile Topology Ungater") wl := &kueue.Workload{} @@ -137,11 +126,14 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) if client.IgnoreNotFound(err) != nil { return reconcile.Result{}, err } - log.Info("workload not found") + log.V(5).Info("workload not found") return reconcile.Result{}, nil } - if wl.Status.Admission == nil { - log.Info("workload is not admitted") + if !isWorkloadAdmittedByTAS(wl) { + // this is a safeguard. In particular, it helps to prevent the race + // condition if the workload is evicted before the reconcile is + // triggered. + log.V(5).Info("workload is not admitted by TAS") return reconcile.Result{}, nil } @@ -153,7 +145,7 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) log.Error(err, "failed to identify pods to ungate", "podset", psa.Name, "count", psa.Count) return reconcile.Result{}, err } else { - log.Info("identified pods to ungate for podset", "podset", psa.Name, "count", len(toUngate)) + log.V(2).Info("identified pods to ungate for podset", "podset", psa.Name, "count", len(toUngate)) allToUngate = append(allToUngate, toUngate...) } } @@ -161,6 +153,7 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) var err error if len(allToUngate) > 0 { log.V(2).Info("identified pods to ungate", "count", len(allToUngate)) + err = parallelize.Until(ctx, len(allToUngate), func(i int) error { podWithUngateInfo := &allToUngate[i] e := utilclient.Patch(ctx, r.client, podWithUngateInfo.pod, true, func() (bool, error) { @@ -175,6 +168,7 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) return true, nil }) if e != nil { + // We won't observe this cleanup in the event handler. log.Error(e, "failed ungating pod", "pod", klog.KObj(podWithUngateInfo.pod)) } return e @@ -187,44 +181,35 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) } func (r *topologyUngater) Create(event event.CreateEvent) bool { - wl, isWl := event.Object.(*kueue.Workload) - if isWl { - return isTASWorkload(wl) - } - return true + return isWorkloadAdmittedByTAS(event.Object) } func (r *topologyUngater) Delete(event event.DeleteEvent) bool { - wl, isWl := event.Object.(*kueue.Workload) - if isWl { - return isTASWorkload(wl) - } - return true + return isWorkloadAdmittedByTAS(event.Object) } func (r *topologyUngater) Update(event event.UpdateEvent) bool { - _, isOldWl := event.ObjectOld.(*kueue.Workload) - newWl, isNewWl := event.ObjectNew.(*kueue.Workload) - if isOldWl && isNewWl { - return isTASWorkload(newWl) - } - return true + return isWorkloadAdmittedByTAS(event.ObjectNew) } -func isTASWorkload(wl *kueue.Workload) bool { - if wl.Status.Admission == nil { +func (r *topologyUngater) Generic(event event.GenericEvent) bool { + return false +} + +func isWorkloadAdmittedByTAS(object client.Object) bool { + wl, isWl := object.(*kueue.Workload) + if !isWl { return false } - for _, psa := range wl.Status.Admission.PodSetAssignments { - if psa.TopologyAssignment != nil { - return true - } - } - return false + return isAdmittedByTAS(wl) } -func (r *topologyUngater) Generic(event event.GenericEvent) bool { - return false +func isAdmittedByTAS(w *kueue.Workload) bool { + return w.Status.Admission != nil && workload.IsAdmitted(w) && + slices.ContainsFunc(w.Status.Admission.PodSetAssignments, + func(psa kueue.PodSetAssignment) bool { + return psa.TopologyAssignment != nil + }) } func (r *topologyUngater) podsetPodsToUngate(ctx context.Context, log logr.Logger, wl *kueue.Workload, psa *kueue.PodSetAssignment) ([]podWithUngateInfo, error) { @@ -292,9 +277,14 @@ func (r *topologyUngater) podsForDomain(ctx context.Context, ns, wlName, psName }); err != nil { return nil, err } - result := make([]*corev1.Pod, 0) - for _, p := range pods.Items { - result = append(result, &p) + result := make([]*corev1.Pod, 0, len(pods.Items)) + for i := range pods.Items { + if pods.Items[i].Status.Phase == corev1.PodFailed { + // ignore failed pods as they need to be replaced, and so we don't + // want to count them as already ungated Pods. + continue + } + result = append(result, &pods.Items[i]) } return result, nil } diff --git a/pkg/controller/tas/topology_ungater_test.go b/pkg/controller/tas/topology_ungater_test.go index 3e0765a941..89ba0479eb 100644 --- a/pkg/controller/tas/topology_ungater_test.go +++ b/pkg/controller/tas/topology_ungater_test.go @@ -31,7 +31,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" - "sigs.k8s.io/kueue/pkg/constants" + utilpod "sigs.k8s.io/kueue/pkg/util/pod" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" @@ -82,11 +82,15 @@ func TestReconcile(t *testing.T) { result := make(map[string]*counts, len(pods)) for i := range pods { pod := pods[i] + // don't consider failed pods as ungated + if utilpod.HasGate(&pod, kueuealpha.TopologySchedulingGate) || + pod.Status.Phase == corev1.PodFailed { + continue + } key := mapToJSON(t, pod.Spec.NodeSelector) if _, found := result[key]; !found { result[key] = &counts{ NodeSelector: maps.Clone(pod.Spec.NodeSelector), - Count: 0, } } result[key].Count++ @@ -129,18 +133,14 @@ func TestReconcile(t *testing.T) { pods: []corev1.Pod{ *testingpod.MakePod("pod", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). TopologySchedulingGate(). Obj(), }, wantPods: []corev1.Pod{ *testingpod.MakePod("pod", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). Obj(), }, wantCounts: []counts{ @@ -181,31 +181,23 @@ func TestReconcile(t *testing.T) { pods: []corev1.Pod{ *testingpod.MakePod("pod1", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). TopologySchedulingGate(). Obj(), *testingpod.MakePod("pod2", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). TopologySchedulingGate(). Obj(), }, wantPods: []corev1.Pod{ *testingpod.MakePod("pod1", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). Obj(), *testingpod.MakePod("pod2", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). Obj(), }, wantCounts: []counts{ @@ -253,31 +245,23 @@ func TestReconcile(t *testing.T) { pods: []corev1.Pod{ *testingpod.MakePod("pod1", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). TopologySchedulingGate(). Obj(), *testingpod.MakePod("pod2", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). TopologySchedulingGate(). Obj(), }, wantPods: []corev1.Pod{ *testingpod.MakePod("pod1", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). Obj(), *testingpod.MakePod("pod2", "ns"). Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). - Label(constants.ManagedByKueueLabel, "true"). Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). - KueueFinalizer(). Obj(), }, wantCounts: []counts{ @@ -297,6 +281,528 @@ func TestReconcile(t *testing.T) { }, }, }, + "workload without admission - pod remains gated": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 2).Request(corev1.ResourceCPU, "1").Obj()). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + }, + "workload admitted but without topology assignment - pod remains gated": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 2).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(2). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + }, + "workload with admission (reserved quota), but not admitted - pod remains gated": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 2).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(2). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(false). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + }, + "workload admitted by TAS with single pod without the Workload annotation - Pod remains gated": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + }, + "workload admitted by TAS with single pod without the PodSet label - Pod remains gated": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + TopologySchedulingGate(). + Obj(), + }, + }, + "workload admitted by TAS with single pod without topology gate, remains gated by another gate": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Gate("example.com/gate"). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Gate("example.com/gate"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, + "workload admitted by TAS with single pod with topology gate and another gate, ungated, but remains gated by another gate": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Gate("example.com/gate"). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Gate("example.com/gate"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, + "expect single pod; one already running - don't ungate second pod": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod-already-running", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + StatusPhase(corev1.PodRunning). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod-gated", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod-already-running", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + StatusPhase(corev1.PodRunning). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod-gated", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, + "expect single pod; one ungated pod failed - ungate second pod": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod-already-running", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + StatusPhase(corev1.PodFailed). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod-gated", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod-already-running", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + StatusPhase(corev1.PodFailed). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod-gated", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, + "two pods, one already ungated, second to ungate": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "2"). + AssignmentPodCount(2). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 2, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns").UID("x"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod2", "ns").UID("y"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod1", "ns").UID("x"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod2", "ns").UID("y"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 2, + }, + }, + }, + "expect single pod; one ungated pod succeeded - don't ungate second pod": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "b1", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("pod-already-succeeded", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + StatusPhase(corev1.PodSucceeded). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod-gated", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantPods: []corev1.Pod{ + *testingpod.MakePod("pod-already-succeeded", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + StatusPhase(corev1.PodSucceeded). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("pod-gated", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, } for name, tc := range testCases { @@ -323,24 +829,24 @@ func TestReconcile(t *testing.T) { } } topologyUngater := newTopologyUngater(kClient) + key := client.ObjectKeyFromObject(&tc.workloads[0]) + request := reconcile.Request{NamespacedName: key} + _, err := topologyUngater.Reconcile(ctx, request) - reconcileRequest := reconcile.Request{ - NamespacedName: client.ObjectKeyFromObject(&tc.workloads[0]), - } - - _, err := topologyUngater.Reconcile(ctx, reconcileRequest) - - if diff := gocmp.Diff(tc.wantErr, err, cmpopts.EquateErrors(), cmpopts.EquateEmpty()); diff != "" { + if diff := gocmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" { t.Errorf("Reconcile returned error (-want,+got):\n%s", diff) } var gotPods corev1.PodList if err := kClient.List(ctx, &gotPods); err != nil { - if tc.wantPods != nil || !apierrors.IsNotFound(err) { + if !apierrors.IsNotFound(err) { t.Fatalf("Could not get Pod after reconcile: %v", err) } } + // don't assert on the node selector directly, because the Pod + // assignments to domains may differ, depending on the order of + // listing the pods by the client. extPodCmpOpts := append(podCmpOpts, cmpopts.IgnoreFields(corev1.PodSpec{}, "NodeSelector")) if diff := gocmp.Diff(tc.wantPods, gotPods.Items, extPodCmpOpts...); diff != "" { t.Errorf("Pods after reconcile (-want,+got):\n%s", diff) diff --git a/pkg/util/testingjobs/pod/wrappers.go b/pkg/util/testingjobs/pod/wrappers.go index 054d6ad1a1..4bef1e7d6f 100644 --- a/pkg/util/testingjobs/pod/wrappers.go +++ b/pkg/util/testingjobs/pod/wrappers.go @@ -130,19 +130,17 @@ func (p *PodWrapper) RoleHash(h string) *PodWrapper { // KueueSchedulingGate adds kueue scheduling gate to the Pod func (p *PodWrapper) KueueSchedulingGate() *PodWrapper { - if p.Spec.SchedulingGates == nil { - p.Spec.SchedulingGates = make([]corev1.PodSchedulingGate, 0) - } - p.Spec.SchedulingGates = append(p.Spec.SchedulingGates, corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}) - return p + return p.Gate("kueue.x-k8s.io/admission") } // TopologySchedulingGate adds kueue scheduling gate to the Pod func (p *PodWrapper) TopologySchedulingGate() *PodWrapper { - if p.Spec.SchedulingGates == nil { - p.Spec.SchedulingGates = make([]corev1.PodSchedulingGate, 0) - } - utilpod.Gate(&p.Pod, kueuealpha.TopologySchedulingGate) + return p.Gate(kueuealpha.TopologySchedulingGate) +} + +// Gate adds kueue scheduling gate to the Pod by the gate name +func (p *PodWrapper) Gate(gateName string) *PodWrapper { + utilpod.Gate(&p.Pod, gateName) return p }