From ac02e3a8515d2bfd711cac7efa4f58bad5460a8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Wo=C5=BAniak?= Date: Wed, 20 Nov 2024 11:24:54 +0100 Subject: [PATCH] TAS: support rank-based ordering for JobSet (#3591) * TAS: support rank-based ordering for JobSet * Update test/e2e/tas/jobset_test.go Co-authored-by: Patryk Bundyra * Update test/e2e/tas/jobset_test.go Co-authored-by: Patryk Bundyra --------- Co-authored-by: Patryk Bundyra --- pkg/controller/tas/topology_ungater.go | 96 ++++- pkg/controller/tas/topology_ungater_test.go | 388 +++++++++++++++++++- pkg/util/testingjobs/jobset/wrappers.go | 13 + test/e2e/tas/jobset_test.go | 171 +++++++++ 4 files changed, 648 insertions(+), 20 deletions(-) create mode 100644 test/e2e/tas/jobset_test.go diff --git a/pkg/controller/tas/topology_ungater.go b/pkg/controller/tas/topology_ungater.go index af9502606d..f0f1fdaeb4 100644 --- a/pkg/controller/tas/topology_ungater.go +++ b/pkg/controller/tas/topology_ungater.go @@ -19,6 +19,7 @@ package tas import ( "context" "errors" + "fmt" "slices" "strconv" "strings" @@ -38,6 +39,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" @@ -57,6 +59,11 @@ const ( ungateBatchPeriod = time.Second ) +type replicatedJobsInfo struct { + replicasCount int + jobIndexLabel string +} + var ( errPendingUngateOps = errors.New("pending ungate operations") ) @@ -396,39 +403,92 @@ func assignGatedPodsToDomainsGreedy( func readRanksIfAvailable(log logr.Logger, psa *kueue.PodSetAssignment, pods []*corev1.Pod) (map[int]*corev1.Pod, bool) { + if len(pods) == 0 { + // If there are no pods then we are done. We do this special check to + // ensure we have at least one pod as the code below determines if + // rank-ordering is enabled based on the first Pod. 
+ return nil, false + } + if podIndexLabel, rjInfo := determineRanksLookup(pods[0]); podIndexLabel != nil { + result, err := readRanksForLabels(psa, pods, *podIndexLabel, rjInfo) + if err != nil { + log.Error(err, "failed to read rank information from Pods") + return nil, false + } + return result, true + } + // couldn't determine the labels to lookup the Pod ranks + return nil, false +} + +func determineRanksLookup(pod *corev1.Pod) (*string, *replicatedJobsInfo) { + // Check if this is JobSet + if jobCount, _ := readIntFromLabel(pod, jobset.ReplicatedJobReplicas); jobCount != nil { + return ptr.To(batchv1.JobCompletionIndexAnnotation), &replicatedJobsInfo{ + jobIndexLabel: jobset.JobIndexKey, + replicasCount: *jobCount, + } + } + // Check if this is batch/Job + if _, found := pod.Labels[batchv1.JobCompletionIndexAnnotation]; found { + return ptr.To(batchv1.JobCompletionIndexAnnotation), nil + } + return nil, nil +} + +func readRanksForLabels( + psa *kueue.PodSetAssignment, + pods []*corev1.Pod, + podIndexLabel string, + rjInfo *replicatedJobsInfo, +) (map[int]*corev1.Pod, error) { result := make(map[int]*corev1.Pod, 0) - count := int(*psa.Count) + podSetSize := int(*psa.Count) for _, pod := range pods { - rank := readIntFromLabel(log, pod, batchv1.JobCompletionIndexAnnotation) - if rank == nil { + podIndex, err := readIntFromLabel(pod, podIndexLabel) + if err != nil { // the Pod has no rank information - ranks cannot be used - return nil, false + return nil, err } - if _, found := result[*rank]; found { - // there is a conflict in ranks, they cannot be used - return nil, false + rank := *podIndex + if rjInfo != nil { + jobIndex, err := readIntFromLabel(pod, rjInfo.jobIndexLabel) + if err != nil { + // the Pod has no Job index information - ranks cannot be used + return nil, err + } + singleJobSize := podSetSize / rjInfo.replicasCount + if *podIndex >= singleJobSize { + // the pod index exceeds size, this scenario is not + // supported by the rank-based ordering of pods. + return nil, fmt.Errorf("pod index %v of Pod %q exceeds the single Job size: %v", *podIndex, klog.KObj(pod), singleJobSize) + } + rank = *podIndex + *jobIndex*singleJobSize } - if *rank >= count { - // the rank exceeds parallelism, this scenario is not supported by - // the rank-based ordering of pods. - return nil, false + if rank >= podSetSize { + // the rank exceeds the PodSet size, this scenario is not supported + // by the rank-based ordering of pods. 
+ return nil, fmt.Errorf("rank %v of Pod %q exceeds PodSet size %v", rank, klog.KObj(pod), podSetSize) } - result[*rank] = pod + if _, found := result[rank]; found { + // there is a conflict in ranks, they cannot be used + return nil, fmt.Errorf("conflicting rank %v found for pod %q", rank, klog.KObj(pod)) + } + result[rank] = pod } - return result, true + return result, nil } -func readIntFromLabel(log logr.Logger, pod *corev1.Pod, labelKey string) *int { +func readIntFromLabel(pod *corev1.Pod, labelKey string) (*int, error) { v, found := pod.Labels[labelKey] if !found { - return nil + return nil, fmt.Errorf("no label %q for Pod %q", labelKey, klog.KObj(pod)) } i, err := strconv.Atoi(v) if err != nil { - log.Error(err, "failed to parse index annotation", "value", v) - return nil + return nil, fmt.Errorf("failed to parse label value %q for Pod %q", v, klog.KObj(pod)) } - return ptr.To(i) + return ptr.To(i), nil } func isAdmittedByTAS(w *kueue.Workload) bool { diff --git a/pkg/controller/tas/topology_ungater_test.go b/pkg/controller/tas/topology_ungater_test.go index 3e526d98fb..d4b1042077 100644 --- a/pkg/controller/tas/topology_ungater_test.go +++ b/pkg/controller/tas/topology_ungater_test.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" "sigs.k8s.io/controller-runtime/pkg/reconcile" + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -1163,12 +1164,19 @@ func TestReconcile(t *testing.T) { }, }, { - Count: 2, + Count: 1, Values: []string{ "b1", "r2", }, }, + { + Count: 1, + Values: []string{ + "b2", + "r1", + }, + }, }, }). Obj(), @@ -1238,7 +1246,14 @@ func TestReconcile(t *testing.T) { tasBlockLabel: "b1", tasRackLabel: "r2", }, - Count: 2, + Count: 1, + }, + { + NodeSelector: map[string]string{ + tasBlockLabel: "b2", + tasRackLabel: "r1", + }, + Count: 1, }, }, }, @@ -1313,6 +1328,375 @@ func TestReconcile(t *testing.T) { }, }, }, + "ranks: support rank-based ordering for JobSet - for all Pods": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "4"). + AssignmentPodCount(4). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 2, + Values: []string{ + "b1", + "r1", + }, + }, + { + Count: 1, + Values: []string{ + "b1", + "r2", + }, + }, + { + Count: 1, + Values: []string{ + "b2", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("p0", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + *testingpod.MakePod("p1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + *testingpod.MakePod("p2", "ns"). 
+ Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + *testingpod.MakePod("p3", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + cmpNS: true, + wantPods: []corev1.Pod{ + *testingpod.MakePod("p0", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("p1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("p2", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r2"). + Obj(), + *testingpod.MakePod("p3", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b2"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 2, + }, + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r2", + }, + Count: 1, + }, + { + NodeSelector: map[string]string{ + tasBlockLabel: "b2", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, + "ranks: support rank-based ordering for JobSet - some Pods already scheduled": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "4"). + AssignmentPodCount(4). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 2, + Values: []string{ + "b1", + "r1", + }, + }, + { + Count: 1, + Values: []string{ + "b1", + "r2", + }, + }, + { + Count: 1, + Values: []string{ + "b2", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("p0", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). 
+ Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + *testingpod.MakePod("p1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("p2", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r2"). + Obj(), + *testingpod.MakePod("p3", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + cmpNS: true, + wantPods: []corev1.Pod{ + *testingpod.MakePod("p0", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("p1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("p2", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "0"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r2"). + Obj(), + *testingpod.MakePod("p3", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b2"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 2, + }, + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r2", + }, + Count: 1, + }, + { + NodeSelector: map[string]string{ + tasBlockLabel: "b2", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, + "ranks: support rank-based ordering for JobSet - only subset of pods is observed so far": { + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 4).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "4"). + AssignmentPodCount(4). 
+ TopologyAssignment(&kueue.TopologyAssignment{ + Levels: defaultTestLevels, + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 2, + Values: []string{ + "b1", + "r1", + }, + }, + { + Count: 1, + Values: []string{ + "b1", + "r2", + }, + }, + { + Count: 1, + Values: []string{ + "b2", + "r1", + }, + }, + }, + }). + Obj(), + ). + Admitted(true). + Obj(), + }, + pods: []corev1.Pod{ + *testingpod.MakePod("p1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + *testingpod.MakePod("p3", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + TopologySchedulingGate(). + Obj(), + }, + cmpNS: true, + wantPods: []corev1.Pod{ + *testingpod.MakePod("p1", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "0"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b1"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + *testingpod.MakePod("p3", "ns"). + Annotation(kueuealpha.WorkloadAnnotation, "unit-test"). + Label(batchv1.JobCompletionIndexAnnotation, "1"). + Label(jobset.JobIndexKey, "1"). + Label(jobset.ReplicatedJobReplicas, "2"). + Label(kueuealpha.PodSetLabel, kueue.DefaultPodSetName). + NodeSelector(tasBlockLabel, "b2"). + NodeSelector(tasRackLabel, "r1"). + Obj(), + }, + wantCounts: []counts{ + { + NodeSelector: map[string]string{ + tasBlockLabel: "b1", + tasRackLabel: "r1", + }, + Count: 1, + }, + { + NodeSelector: map[string]string{ + tasBlockLabel: "b2", + tasRackLabel: "r1", + }, + Count: 1, + }, + }, + }, } for name, tc := range testCases { diff --git a/pkg/util/testingjobs/jobset/wrappers.go b/pkg/util/testingjobs/jobset/wrappers.go index 04fe364800..d257020bb9 100644 --- a/pkg/util/testingjobs/jobset/wrappers.go +++ b/pkg/util/testingjobs/jobset/wrappers.go @@ -135,6 +135,19 @@ func (j *JobSetWrapper) Request(replicatedJobName string, r corev1.ResourceName, return j } +// Limit adds a resource limit to the first container of the target replicatedJob. +func (j *JobSetWrapper) Limit(replicatedJobName string, r corev1.ResourceName, v string) *JobSetWrapper { + for i, replicatedJob := range j.Spec.ReplicatedJobs { + if replicatedJob.Name == replicatedJobName { + if j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[0].Resources.Limits == nil { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[0].Resources.Limits = map[corev1.ResourceName]resource.Quantity{} + } + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v) + } + } + return j +} + // PriorityClass updates JobSet priorityclass. func (j *JobSetWrapper) PriorityClass(pc string) *JobSetWrapper { for i := range j.Spec.ReplicatedJobs { diff --git a/test/e2e/tas/jobset_test.go b/test/e2e/tas/jobset_test.go new file mode 100644 index 0000000000..b882078d05 --- /dev/null +++ b/test/e2e/tas/jobset_test.go @@ -0,0 +1,171 @@ +/* +Copyright 2024 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tase2e + +import ( + "fmt" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/util/testing" + testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" + "sigs.k8s.io/kueue/test/util" +) + +var _ = ginkgo.Describe("TopologyAwareScheduling for JobSet", func() { + var ns *corev1.Namespace + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "e2e-tas-jobset-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.When("Creating a JobSet", func() { + var ( + topology *kueuealpha.Topology + tasFlavor *kueue.ResourceFlavor + localQueue *kueue.LocalQueue + clusterQueue *kueue.ClusterQueue + ) + ginkgo.BeforeEach(func() { + topology = testing.MakeTopology("datacenter").Levels([]string{ + topologyLevelBlock, + topologyLevelRack, + topologyLevelHostname, + }).Obj() + gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed()) + + tasFlavor = testing.MakeResourceFlavor("tas-flavor"). + NodeLabel(tasNodeGroupLabel, instanceType).TopologyName(topology.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed()) + clusterQueue = testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("tas-flavor"). + Resource(extraResource, "8"). + Obj(), + ). + Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) + util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue) + + localQueue = testing.MakeLocalQueue("main", ns.Name).ClusterQueue("cluster-queue").Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteAllJobsetsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed()) + // Force remove workloads to be sure that cluster queue can be removed. 
+ gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed()) + gomega.Expect(util.DeleteObject(ctx, k8sClient, localQueue)).Should(gomega.Succeed()) + gomega.Expect(util.DeleteObject(ctx, k8sClient, topology)).Should(gomega.Succeed()) + util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true) + }) + + ginkgo.It("Should place pods based on the ranks-ordering", func() { + replicas := 3 + parallelism := 2 + numPods := replicas * parallelism + sampleJob := testingjobset.MakeJobSet("ranks-jobset", ns.Name). + Queue("main"). + ReplicatedJobs( + testingjobset.ReplicatedJobRequirements{ + Name: "replicated-job-1", + Replicas: int32(replicas), + Parallelism: int32(parallelism), + Completions: int32(parallelism), + Image: util.E2eTestSleepImage, + Args: []string{"60s"}, + PodAnnotations: map[string]string{ + kueuealpha.PodSetPreferredTopologyAnnotation: topologyLevelBlock, + }, + }, + ). + Request("replicated-job-1", extraResource, "1"). + Limit("replicated-job-1", extraResource, "1"). + Obj() + sampleJob.Annotations = map[string]string{constants.QueueLabel: localQueue.Name} + gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed()) + + ginkgo.By("JobSet is unsuspended", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sampleJob), sampleJob)).To(gomega.Succeed()) + g.Expect(sampleJob.Spec.Suspend).Should(gomega.Equal(ptr.To(false))) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + pods := &corev1.PodList{} + ginkgo.By("ensure all pods are created", func() { + gomega.Eventually(func(g gomega.Gomega) { + gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed()) + g.Expect(pods.Items).Should(gomega.HaveLen(numPods)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("ensure all pods are scheduled", func() { + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", ""), + } + gomega.Eventually(func(g gomega.Gomega) { + gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name), listOpts)).To(gomega.Succeed()) + g.Expect(pods.Items).Should(gomega.HaveLen(numPods)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("verify the assignment of pods are as expected with rank-based ordering", func() { + gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed()) + gotAssignment := readRankAssignmentsFromJobSetPods(pods.Items) + wantAssignment := map[string]string{ + "0/0": "kind-worker", + "0/1": "kind-worker2", + "1/0": "kind-worker3", + "1/1": "kind-worker4", + "2/0": "kind-worker5", + "2/1": "kind-worker6", + } + gomega.Expect(wantAssignment).Should(gomega.BeComparableTo(gotAssignment)) + }) + }) + }) +}) + +func readRankAssignmentsFromJobSetPods(pods []corev1.Pod) map[string]string { + assignment := make(map[string]string, len(pods)) + for _, pod := range pods { + podIndex := pod.Labels[batchv1.JobCompletionIndexAnnotation] + jobIndex := pod.Labels[jobset.JobIndexKey] + key := fmt.Sprintf("%v/%v", jobIndex, podIndex) + assignment[key] = pod.Spec.NodeName + } + return assignment +}
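
The core of this change is the rank that readRanksForLabels computes for each JobSet Pod: rank = podIndex + jobIndex*singleJobSize, where singleJobSize = podSetSize / replicatedJobReplicas, so Pods of replicated-Job index 0 occupy ranks 0..singleJobSize-1, index 1 the next block, and so on. The standalone Go sketch below is not part of the patch; the helper name jobSetRank and the plain label-map input are illustrative only, and the label-key strings are assumed to match the batchv1.JobCompletionIndexAnnotation, jobset.JobIndexKey and jobset.ReplicatedJobReplicas constants the patch imports. It only mirrors the arithmetic so the ordering expected by the unit and e2e tests can be checked by hand.

package main

import (
	"fmt"
	"strconv"
)

// Label keys read by the patch (string values assumed to match the imported
// batchv1 and jobset constants).
const (
	completionIndexLabel = "batch.kubernetes.io/job-completion-index"
	jobIndexLabel        = "jobset.sigs.k8s.io/job-index"
	jobReplicasLabel     = "jobset.sigs.k8s.io/replicatedjob-replicas"
)

// jobSetRank mirrors the arithmetic in readRanksForLabels:
// rank = podIndex + jobIndex*singleJobSize, singleJobSize = podSetSize/replicas.
func jobSetRank(labels map[string]string, podSetSize int) (int, error) {
	podIndex, err := strconv.Atoi(labels[completionIndexLabel])
	if err != nil {
		return 0, fmt.Errorf("missing or invalid completion index: %w", err)
	}
	jobIndex, err := strconv.Atoi(labels[jobIndexLabel])
	if err != nil {
		return 0, fmt.Errorf("missing or invalid job index: %w", err)
	}
	replicas, err := strconv.Atoi(labels[jobReplicasLabel])
	if err != nil {
		return 0, fmt.Errorf("missing or invalid replica count: %w", err)
	}
	if replicas <= 0 {
		return 0, fmt.Errorf("replica count must be positive, got %d", replicas)
	}
	singleJobSize := podSetSize / replicas
	if podIndex >= singleJobSize {
		// Same guard as the patch: a pod index beyond the per-Job size is unsupported.
		return 0, fmt.Errorf("pod index %d exceeds the single Job size %d", podIndex, singleJobSize)
	}
	return podIndex + jobIndex*singleJobSize, nil
}

func main() {
	// The "for all Pods" unit-test case: 2 replicated Jobs x 2 pods, PodSet size 4.
	for _, labels := range []map[string]string{
		{completionIndexLabel: "0", jobIndexLabel: "0", jobReplicasLabel: "2"}, // p0
		{completionIndexLabel: "1", jobIndexLabel: "0", jobReplicasLabel: "2"}, // p1
		{completionIndexLabel: "0", jobIndexLabel: "1", jobReplicasLabel: "2"}, // p2
		{completionIndexLabel: "1", jobIndexLabel: "1", jobReplicasLabel: "2"}, // p3
	} {
		rank, err := jobSetRank(labels, 4)
		fmt.Println(rank, err)
	}
}

With podSetSize=4 and two replicated Jobs this yields ranks 0..3 in pod order p0..p3, which is why, in the updated unit test, p2 (job index 1, completion index 0) maps to rank 2 and receives the b1/r2 domain while p3 maps to rank 3 and receives b2/r1.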