diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index aa89e453c0..0bf50b1fdc 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -34,10 +34,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" "sigs.k8s.io/kueue/pkg/workload" @@ -1134,7 +1136,7 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con ) ginkgo.BeforeAll(func() { - fwk.StartManager(ctx, cfg, managerAndControllersSetup(true, nil)) + fwk.StartManager(ctx, cfg, managerAndControllersSetup(false, true, nil)) }) ginkgo.AfterAll(func() { fwk.StopManager(ctx) @@ -1957,6 +1959,7 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe }, } fwk.StartManager(ctx, cfg, managerAndControllersSetup( + false, false, &configapi.Configuration{WaitForPodsReady: waitForPodsReady}, jobframework.WithWaitForPodsReady(waitForPodsReady), @@ -2158,6 +2161,140 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe }) }) +var _ = ginkgo.Describe("Job controller when TopologyAwareScheduling enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + const ( + nodeGroupLabel = "node-group" + tasBlockLabel = "cloud.com/topology-block" + ) + + var ( + ns *corev1.Namespace + nodes []corev1.Node + topology *kueuealpha.Topology + tasFlavor *kueue.ResourceFlavor + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + ) + + ginkgo.BeforeAll(func() { + fwk.StartManager(ctx, cfg, managerAndControllersSetup(true, true, nil)) + }) + + ginkgo.AfterAll(func() { + fwk.StopManager(ctx) + }) + + ginkgo.BeforeEach(func() { + features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.TopologyAwareScheduling, true) + + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "tas-job-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + nodes = []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + Labels: map[string]string{ + nodeGroupLabel: "tas", + tasBlockLabel: "b1", + }, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + }, + } + for _, node := range nodes { + gomega.Expect(k8sClient.Create(ctx, &node)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Status().Update(ctx, &node)).Should(gomega.Succeed()) + } + + topology = testing.MakeTopology("default").Levels([]string{ + tasBlockLabel, + }).Obj() + gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed()) + + tasFlavor = testing.MakeResourceFlavor("tas-flavor"). + NodeLabel(nodeGroupLabel, "tas"). + TopologyName("default").Obj() + gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed()) + + clusterQueue = testing.MakeClusterQueue("cluster-queue"). + ResourceGroup(*testing.MakeFlavorQuotas(tasFlavor.Name).Resource(corev1.ResourceCPU, "5").Obj()). + Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) + util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue) + + localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true) + gomega.Expect(util.DeleteObject(ctx, k8sClient, topology)).Should(gomega.Succeed()) + for _, node := range nodes { + util.ExpectObjectToBeDeleted(ctx, k8sClient, &node, true) + } + }) + + ginkgo.It("should admit workload which fits in a required topology domain", func() { + job := testingjob.MakeJob("job", ns.Name). + Queue(localQueue.Name). + PodAnnotation(kueuealpha.PodSetRequiredTopologyAnnotation, tasBlockLabel). + Request(corev1.ResourceCPU, "1"). + Obj() + ginkgo.By("creating a job which requires block", func() { + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + }) + + wl := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: ns.Name} + + ginkgo.By("verify the workload is created", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).Should(gomega.Succeed()) + g.Expect(wl.Spec.PodSets).Should(gomega.BeComparableTo([]kueue.PodSet{{ + Name: "main", + Count: 1, + TopologyRequest: &kueue.PodSetTopologyRequest{ + Required: ptr.To(tasBlockLabel), + }, + }}, cmpopts.IgnoreFields(kueue.PodSet{}, "Template"))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("verify the workload is admitted", func() { + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) + }) + + ginkgo.By("verify admission for the workload", func() { + wl := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: ns.Name} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).Should(gomega.Succeed()) + g.Expect(wl.Status.Admission).ShouldNot(gomega.BeNil()) + g.Expect(wl.Status.Admission.PodSetAssignments).Should(gomega.HaveLen(1)) + g.Expect(wl.Status.Admission.PodSetAssignments[0].TopologyAssignment).Should(gomega.BeComparableTo( + &kueue.TopologyAssignment{ + Levels: []string{tasBlockLabel}, + Domains: []kueue.TopologyDomainAssignment{{Count: 1, Values: []string{"b1"}}}, + }, + )) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) +}) + func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) { job := &batchv1.Job{} gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { diff --git a/test/integration/controller/jobs/job/suite_test.go b/test/integration/controller/jobs/job/suite_test.go index 3e60ec7799..0dff9d4d26 100644 --- a/test/integration/controller/jobs/job/suite_test.go +++ b/test/integration/controller/jobs/job/suite_test.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/controller/tas" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/scheduler" "sigs.k8s.io/kueue/test/integration/framework" @@ -85,30 +86,29 @@ func managerSetup(opts ...jobframework.Option) framework.ManagerSetup { } } -func managerAndControllersSetup(enableScheduler bool, configuration *config.Configuration, opts ...jobframework.Option) framework.ManagerSetup { +func managerAndControllersSetup( + setupTASControllers bool, + enableScheduler bool, + configuration *config.Configuration, + opts ...jobframework.Option, +) framework.ManagerSetup { return func(ctx context.Context, mgr manager.Manager) { - err := indexer.Setup(ctx, mgr.GetFieldIndexer()) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - - cCache := cache.New(mgr.GetClient()) - queues := queue.NewManager(mgr.GetClient(), cCache) - + managerSetup(opts...)(ctx, mgr) if configuration == nil { configuration = &config.Configuration{} } mgr.GetScheme().Default(configuration) + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) - err = job.SetupIndexes(ctx, mgr.GetFieldIndexer()) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - err = job.NewReconciler(mgr.GetClient(), - mgr.GetEventRecorderFor(constants.JobControllerName), opts...).SetupWithManager(mgr) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - err = job.SetupWebhook(mgr, opts...) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - jobframework.EnableIntegration(job.FrameworkName) + if setupTASControllers { + failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl) + } if enableScheduler { sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName))