Fix flaky test: hold CQ until all workloads are observed (#1544)
Change-Id: If255ea869a61c1ea5a96e9855c9047c6de422b01
alculquicondor authored Jan 4, 2024
1 parent d5c83e3 commit 1579d75
Showing 1 changed file with 52 additions and 34 deletions.
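
The flake this change addresses: the scheduler could start admitting workloads as soon as the first ones were created, so admission order sometimes reflected creation timing rather than priority. The fix creates the ClusterQueues with StopPolicy: Hold, creates every workload while the queues are held, and only then resumes scheduling by setting the policy back to None. A minimal sketch of that hold-then-resume pattern, reusing the helpers visible in the diff below and assuming the test file's existing imports and suite-level ctx/k8sClient (illustrative only, not part of the commit):

	// Create the ClusterQueue held, so nothing is admitted during setup.
	cq := testing.MakeClusterQueue("prod-cq").
		ResourceGroup(*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj()).
		StopPolicy(kueue.Hold).
		Obj()
	gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())

	// ... create all workloads while the ClusterQueue is held ...

	// Resume the ClusterQueue only after every workload exists, so the
	// scheduler observes the complete set and admits strictly by priority.
	gomega.Eventually(func() error {
		var latest kueue.ClusterQueue
		if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &latest); err != nil {
			return err
		}
		latest.Spec.StopPolicy = ptr.To(kueue.None)
		return k8sClient.Update(ctx, &latest)
	}, util.Timeout, util.Interval).Should(gomega.Succeed())
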
86 changes: 52 additions & 34 deletions test/integration/scheduler/scheduler_test.go
@@ -92,19 +92,22 @@ var _ = ginkgo.Describe("Scheduler", func() {
podsCountQueue *kueue.LocalQueue
podsCountOnlyQueue *kueue.LocalQueue
preemptionQueue *kueue.LocalQueue
cqsStopPolicy *kueue.StopPolicy
)

ginkgo.BeforeEach(func() {
ginkgo.JustBeforeEach(func() {
gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, spotTaintedFlavor)).To(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, spotUntaintedFlavor)).To(gomega.Succeed())
cqsStopPolicy := ptr.Deref(cqsStopPolicy, kueue.None)

prodClusterQ = testing.MakeClusterQueue("prod-cq").
ResourceGroup(
*testing.MakeFlavorQuotas("spot-tainted").Resource(corev1.ResourceCPU, "5", "5").Obj(),
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
).
Cohort("prod-cohort").
StopPolicy(cqsStopPolicy).
Obj()
gomega.Expect(k8sClient.Create(ctx, prodClusterQ)).Should(gomega.Succeed())

@@ -113,6 +116,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(),
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
).
StopPolicy(cqsStopPolicy).
Obj()
gomega.Expect(k8sClient.Create(ctx, devClusterQ)).Should(gomega.Succeed())

@@ -123,6 +127,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
Resource(corev1.ResourcePods, "5").
Obj(),
).
StopPolicy(cqsStopPolicy).
Obj()
gomega.Expect(k8sClient.Create(ctx, podsCountClusterQ)).Should(gomega.Succeed())

@@ -132,6 +137,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
Resource(corev1.ResourcePods, "5").
Obj(),
).
StopPolicy(cqsStopPolicy).
Obj()
gomega.Expect(k8sClient.Create(ctx, podsCountOnlyClusterQ)).Should(gomega.Succeed())

@@ -142,6 +148,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
StopPolicy(cqsStopPolicy).
Obj()
gomega.Expect(k8sClient.Create(ctx, preemptionClusterQ)).Should(gomega.Succeed())

@@ -161,7 +168,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
gomega.Expect(k8sClient.Create(ctx, preemptionQueue)).Should(gomega.Succeed())
})

ginkgo.AfterEach(func() {
ginkgo.JustAfterEach(func() {
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, prodClusterQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, devClusterQ, true)
@@ -372,44 +379,55 @@ var _ = ginkgo.Describe("Scheduler", func() {
})
})

ginkgo.It("Should admit workloads according to their priorities", func() {
queue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
ginkgo.When("Hold at startup", func() {
ginkgo.BeforeEach(func() {
cqsStopPolicy = ptr.To(kueue.Hold)
})
ginkgo.AfterEach(func() {
cqsStopPolicy = nil
})
ginkgo.It("Should admit workloads according to their priorities", func() {
const lowPrio, midPrio, highPrio = 0, 10, 100

wlLow := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(lowPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlLow)).Should(gomega.Succeed())
wlMid1 := testing.MakeWorkload("wl-mid-priority-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid1)).Should(gomega.Succeed())
wlMid2 := testing.MakeWorkload("wl-mid-priority-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid2)).Should(gomega.Succeed())
wlHigh1 := testing.MakeWorkload("wl-high-priority-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh1)).Should(gomega.Succeed())
wlHigh2 := testing.MakeWorkload("wl-high-priority-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh2)).Should(gomega.Succeed())

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 5)
util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0)

const lowPrio, midPrio, highPrio = 0, 10, 100
gomega.Eventually(func() error {
var cq kueue.ClusterQueue
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodClusterQ), &cq)).To(gomega.Succeed())
cq.Spec.StopPolicy = ptr.To(kueue.None)
return k8sClient.Update(ctx, &cq)
}, util.Timeout, util.Interval).Should(gomega.Succeed())

wlLow := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(lowPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlLow)).Should(gomega.Succeed())
wlMid1 := testing.MakeWorkload("wl-mid-priority-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid1)).Should(gomega.Succeed())
wlMid2 := testing.MakeWorkload("wl-mid-priority-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid2)).Should(gomega.Succeed())
wlHigh1 := testing.MakeWorkload("wl-high-priority-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh1)).Should(gomega.Succeed())
wlHigh2 := testing.MakeWorkload("wl-high-priority-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh2)).Should(gomega.Succeed())
ginkgo.By("checking the workloads with lower priority do not get admitted")
util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLow, wlMid1, wlMid2)

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0)
// delay creating the queue until after workloads are created.
gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
ginkgo.By("checking the workloads with high priority get admitted")
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlHigh1, wlHigh2)

ginkgo.By("checking the workloads with lower priority do not get admitted")
util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLow, wlMid1, wlMid2)
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 3)
util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2)

ginkgo.By("checking the workloads with high priority get admitted")
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlHigh1, wlHigh2)
ginkgo.By("after the high priority workloads finish, only the mid priority workloads should be admitted")
util.FinishWorkloads(ctx, k8sClient, wlHigh1, wlHigh2)

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 3)
util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2)

ginkgo.By("after the high priority workloads finish, only the mid priority workloads should be admitted")
util.FinishWorkloads(ctx, k8sClient, wlHigh1, wlHigh2)

util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlMid1, wlMid2)
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1)
util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 4)
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlMid1, wlMid2)
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1)
util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 4)
})
})

ginkgo.It("Should admit two small workloads after a big one finishes", func() {

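Note on the BeforeEach to JustBeforeEach change above: Ginkgo runs every BeforeEach in the spec tree (outermost to innermost) before any JustBeforeEach, so moving the flavor and queue setup into JustBeforeEach lets the nested When("Hold at startup") block set cqsStopPolicy first and have the ClusterQueues created already held. A minimal ordering sketch, assuming the test file's existing imports (ginkgo/v2, k8s.io/utils/ptr, and the kueue API package); the bodies are placeholders, not the commit's code:

	var cqsStopPolicy *kueue.StopPolicy

	var _ = ginkgo.Describe("Scheduler", func() {
		ginkgo.JustBeforeEach(func() {
			// Runs after every BeforeEach in the tree, so it sees the
			// per-When override below.
			stopPolicy := ptr.Deref(cqsStopPolicy, kueue.None)
			_ = stopPolicy // the ClusterQueues are built with StopPolicy(stopPolicy) here
		})

		ginkgo.When("Hold at startup", func() {
			ginkgo.BeforeEach(func() { cqsStopPolicy = ptr.To(kueue.Hold) })
			ginkgo.AfterEach(func() { cqsStopPolicy = nil })

			ginkgo.It("creates the ClusterQueues already held", func() {
				// Workloads created here stay pending until the test flips
				// StopPolicy back to kueue.None.
			})
		})
	})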