Skip to content

Commit

Permalink
TAS: Add integration tests for job.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed Oct 31, 2024
1 parent 10e5abf commit cf4506d
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 16 deletions.
189 changes: 188 additions & 1 deletion test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
"sigs.k8s.io/kueue/pkg/workload"
Expand Down Expand Up @@ -1134,7 +1136,7 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con
)

ginkgo.BeforeAll(func() {
fwk.StartManager(ctx, cfg, managerAndControllersSetup(true, nil))
fwk.StartManager(ctx, cfg, managerAndControllersSetup(false, true, nil))
})
ginkgo.AfterAll(func() {
fwk.StopManager(ctx)
Expand Down Expand Up @@ -1957,6 +1959,7 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
},
}
fwk.StartManager(ctx, cfg, managerAndControllersSetup(
false,
false,
&configapi.Configuration{WaitForPodsReady: waitForPodsReady},
jobframework.WithWaitForPodsReady(waitForPodsReady),
Expand Down Expand Up @@ -2158,6 +2161,190 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
})
})

// Integration tests for the Job controller with Topology Aware Scheduling (TAS).
//
// Every spec runs against the same fixture, rebuilt in BeforeEach:
//   - one Node labelled block "b1" / rack "r1" with 1 CPU and 1Gi allocatable,
//   - a Topology named "default" with the levels block -> rack,
//   - a ResourceFlavor bound to that topology and selecting the node through
//     the "node-group" label,
//   - a ClusterQueue with a 5 CPU quota on that flavor and a LocalQueue
//     pointing at it.
//
// Each spec creates a Job carrying a topology annotation and checks that the
// derived Workload holds the matching topology request and is admitted with a
// topology assignment on the single available domain.
var _ = ginkgo.Describe("Job controller when TopologyAwareScheduling enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
	const (
		// Node label keys used as the topology levels, listed outermost first.
		tasBlockLabel = "cloud.com/topology-block"
		tasRackLabel  = "cloud.com/topology-rack"
	)

	var (
		ns           *corev1.Namespace
		nodes        []corev1.Node
		topology     *kueuealpha.Topology
		tasFlavor    *kueue.ResourceFlavor
		clusterQueue *kueue.ClusterQueue
		localQueue   *kueue.LocalQueue
	)

	ginkgo.BeforeAll(func() {
		// Arguments: setupTAS=true, enableScheduler=true, default configuration.
		fwk.StartManager(ctx, cfg, managerAndControllersSetup(true, true, nil))
	})

	ginkgo.AfterAll(func() {
		fwk.StopManager(ctx)
	})

	ginkgo.BeforeEach(func() {
		features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.TopologyAwareScheduling, true)

		ns = &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "tas-job-",
			},
		}
		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())

		nodes = []corev1.Node{
			{
				ObjectMeta: metav1.ObjectMeta{
					Name: "b1-r1",
					Labels: map[string]string{
						"node-group":  "tas",
						tasBlockLabel: "b1",
						tasRackLabel:  "r1",
					},
				},
				Status: corev1.NodeStatus{
					Allocatable: corev1.ResourceList{
						corev1.ResourceCPU:    resource.MustParse("1"),
						corev1.ResourceMemory: resource.MustParse("1Gi"),
					},
				},
			},
		}
		// Address the slice elements directly rather than a per-iteration copy:
		// this avoids copying the Node struct and keeps whatever the client
		// writes back into the object in nodes.
		for i := range nodes {
			gomega.Expect(k8sClient.Create(ctx, &nodes[i])).Should(gomega.Succeed())
			gomega.Expect(k8sClient.Status().Update(ctx, &nodes[i])).Should(gomega.Succeed())
		}

		topology = testing.MakeTopology("default").Levels([]string{
			tasBlockLabel,
			tasRackLabel,
		}).Obj()
		gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())

		tasFlavor = testing.MakeResourceFlavor("tas-flavor").
			NodeLabel("node-group", "tas").
			TopologyName("default").Obj()
		gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed())

		clusterQueue = testing.MakeClusterQueue("cluster-queue").
			ResourceGroup(*testing.MakeFlavorQuotas(tasFlavor.Name).Resource(corev1.ResourceCPU, "5").Obj()).
			Obj()
		gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
		util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)

		localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
		gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
	})

	ginkgo.AfterEach(func() {
		// Tear the fixture down so the next spec can recreate the same
		// fixed-name cluster-scoped objects.
		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
		util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
		util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true)
		gomega.Expect(util.DeleteObject(ctx, k8sClient, topology)).Should(gomega.Succeed())
		for i := range nodes {
			util.ExpectObjectToBeDeleted(ctx, k8sClient, &nodes[i], true)
		}
	})

	ginkgo.It("should admit workload which fits in a required topology domain", func() {
		job := testingjob.MakeJob("job", ns.Name).
			Queue(localQueue.Name).
			PodAnnotation(kueuealpha.PodSetRequiredTopologyAnnotation, tasRackLabel).
			Request(corev1.ResourceCPU, "1").
			Obj()
		ginkgo.By("creating a job which requires rack", func() {
			gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
		})

		// The key is built only after Create, because it depends on job.UID.
		wl := &kueue.Workload{}
		wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: ns.Name}

		ginkgo.By("verify the workload is created", func() {
			gomega.Eventually(func(g gomega.Gomega) {
				g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).Should(gomega.Succeed())
				// The pod template is ignored; only the pod set identity and
				// the topology request derived from the annotation are compared.
				g.Expect(wl.Spec.PodSets).Should(gomega.BeComparableTo([]kueue.PodSet{{
					Name:  "main",
					Count: 1,
					TopologyRequest: &kueue.PodSetTopologyRequest{
						Required: ptr.To(tasRackLabel),
					},
				}}, cmpopts.IgnoreFields(kueue.PodSet{}, "Template")))
			}, util.Timeout, util.Interval).Should(gomega.Succeed())
		})

		ginkgo.By("verify the workload is admitted", func() {
			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl)
			util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
		})

		ginkgo.By("verify admission for the workload", func() {
			gomega.Eventually(func(g gomega.Gomega) {
				g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).Should(gomega.Succeed())
				g.Expect(wl.Status.Admission).ShouldNot(gomega.BeNil())
				g.Expect(wl.Status.Admission.PodSetAssignments).Should(gomega.HaveLen(1))
				// Only node b1-r1 exists, so the single pod is expected on
				// block b1 / rack r1.
				g.Expect(wl.Status.Admission.PodSetAssignments[0].TopologyAssignment).Should(gomega.BeComparableTo(
					&kueue.TopologyAssignment{
						Levels:  []string{tasBlockLabel, tasRackLabel},
						Domains: []kueue.TopologyDomainAssignment{{Count: 1, Values: []string{"b1", "r1"}}},
					},
				))
			}, util.Timeout, util.Interval).Should(gomega.Succeed())
		})
	})

	ginkgo.It("should admit workload which fits in a preferred topology domain", func() {
		job := testingjob.MakeJob("job", ns.Name).
			Queue(localQueue.Name).
			PodAnnotation(kueuealpha.PodSetPreferredTopologyAnnotation, tasBlockLabel).
			Request(corev1.ResourceCPU, "1").
			Obj()
		ginkgo.By("creating a job which preferred block", func() {
			gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
		})

		// The key is built only after Create, because it depends on job.UID.
		wl := &kueue.Workload{}
		wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: ns.Name}

		ginkgo.By("verify the workload is created", func() {
			gomega.Eventually(func(g gomega.Gomega) {
				g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).Should(gomega.Succeed())
				// The pod template is ignored; only the pod set identity and
				// the topology request derived from the annotation are compared.
				g.Expect(wl.Spec.PodSets).Should(gomega.BeComparableTo([]kueue.PodSet{{
					Name:  "main",
					Count: 1,
					TopologyRequest: &kueue.PodSetTopologyRequest{
						Preferred: ptr.To(tasBlockLabel),
					},
				}}, cmpopts.IgnoreFields(kueue.PodSet{}, "Template")))
			}, util.Timeout, util.Interval).Should(gomega.Succeed())
		})

		ginkgo.By("verify the workload is admitted", func() {
			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl)
			util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
		})

		ginkgo.By("verify admission for the workload", func() {
			gomega.Eventually(func(g gomega.Gomega) {
				g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).Should(gomega.Succeed())
				g.Expect(wl.Status.Admission).ShouldNot(gomega.BeNil())
				g.Expect(wl.Status.Admission.PodSetAssignments).Should(gomega.HaveLen(1))
				// Only node b1-r1 exists, so the single pod is expected on
				// block b1 / rack r1.
				g.Expect(wl.Status.Admission.PodSetAssignments[0].TopologyAssignment).Should(gomega.BeComparableTo(
					&kueue.TopologyAssignment{
						Levels:  []string{tasBlockLabel, tasRackLabel},
						Domains: []kueue.TopologyDomainAssignment{{Count: 1, Values: []string{"b1", "r1"}}},
					},
				))
			}, util.Timeout, util.Interval).Should(gomega.Succeed())
		})
	})
})

func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
job := &batchv1.Job{}
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
Expand Down
30 changes: 15 additions & 15 deletions test/integration/controller/jobs/job/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/job"
"sigs.k8s.io/kueue/pkg/controller/tas"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/scheduler"
"sigs.k8s.io/kueue/test/integration/framework"
Expand Down Expand Up @@ -85,30 +86,29 @@ func managerSetup(opts ...jobframework.Option) framework.ManagerSetup {
}
}

func managerAndControllersSetup(enableScheduler bool, configuration *config.Configuration, opts ...jobframework.Option) framework.ManagerSetup {
func managerAndControllersSetup(
setupTAS bool,
enableScheduler bool,
configuration *config.Configuration,
opts ...jobframework.Option,
) framework.ManagerSetup {
return func(ctx context.Context, mgr manager.Manager) {
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

managerSetup(opts...)(ctx, mgr)
if configuration == nil {
configuration = &config.Configuration{}
}
mgr.GetScheme().Default(configuration)

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = job.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = job.NewReconciler(mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName), opts...).SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = job.SetupWebhook(mgr, opts...)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
jobframework.EnableIntegration(job.FrameworkName)
if setupTAS {
failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl)
}

if enableScheduler {
sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName))
Expand Down

0 comments on commit cf4506d

Please sign in to comment.