Merge pull request kubernetes#512 from alculquicondor/fix-requeue-inadmissible

Atomically release quota and queue inadmissible workloads
k8s-ci-robot authored Jan 13, 2023
2 parents 01a101e + a599211 commit 6c4779f
Showing 3 changed files with 68 additions and 37 deletions.
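
The diff below replaces the old two-step sequence (delete the workload from the cache to release its quota, then requeue inadmissible workloads) with a single call that runs the cache deletion as a callback while the queue manager's lock is held. The following is a minimal, self-contained Go sketch of that callback-under-lock pattern; the manager and workload types and their fields are simplified stand-ins, not kueue's real types:

package main

import (
	"fmt"
	"sync"
)

// workload is a hypothetical stand-in for kueue's Workload type.
type workload struct {
	name         string
	clusterQueue string
}

// manager mimics the locking pattern from pkg/queue/manager.go in this diff:
// a caller-supplied action runs while the manager lock is held, so releasing
// quota and requeueing previously inadmissible workloads happen atomically
// with respect to anyone else taking the lock.
type manager struct {
	sync.Mutex
	inadmissible map[string][]*workload // parked workloads, keyed by ClusterQueue
	heap         []*workload            // stand-in for the scheduling heaps
}

// QueueAssociatedInadmissibleWorkloadsAfter runs the optional action under the
// lock, then moves the inadmissible workloads of the finished workload's
// ClusterQueue back into the heap.
func (m *manager) QueueAssociatedInadmissibleWorkloadsAfter(w *workload, action func()) {
	m.Lock()
	defer m.Unlock()
	if action != nil {
		action() // e.g. delete the workload from the cache, releasing its quota
	}
	moved := m.inadmissible[w.clusterQueue]
	delete(m.inadmissible, w.clusterQueue)
	m.heap = append(m.heap, moved...)
}

func main() {
	m := &manager{inadmissible: map[string][]*workload{
		"prod": {{name: "parked-wl", clusterQueue: "prod"}},
	}}
	finished := &workload{name: "finished-wl", clusterQueue: "prod"}
	m.QueueAssociatedInadmissibleWorkloadsAfter(finished, func() {
		fmt.Println("released quota for", finished.name) // the "delete from cache" step
	})
	fmt.Printf("requeued %d workload(s)\n", len(m.heap))
}

In the real controller below, the callback is the call to r.cache.DeleteWorkload.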
47 changes: 30 additions & 17 deletions pkg/controller/core/workload_controller.go
@@ -154,14 +154,18 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
 	// the state is unknown, the workload could have been assumed and we need
 	// to clear it from the cache.
 	if wl.Spec.Admission != nil || e.DeleteStateUnknown {
-		if err := r.cache.DeleteWorkload(wl); err != nil {
-			if !e.DeleteStateUnknown {
-				log.Error(err, "Failed to delete workload from cache")
-			}
-		}
-
-		// trigger the move of associated inadmissibleWorkloads if required.
-		r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
+		r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
+			// Delete the workload from cache while holding the queues lock
+			// to guarantee that requeueued workloads are taken into account before
+			// the next scheduling cycle.
+			if err := r.cache.DeleteWorkload(wl); err != nil {
+				if !e.DeleteStateUnknown {
+					log.Error(err, "Failed to delete workload from cache")
+				}
+			}
+		})
 	}
 
 	// Even if the state is unknown, the last cached state tells us whether the
@@ -204,13 +208,18 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {

 	switch {
 	case status == finished:
-		if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
-			log.Error(err, "Failed to delete workload from cache")
-		}
-		r.queues.DeleteWorkload(oldWl)
+		// The workload could have been in the queues if we missed an event.
+		r.queues.DeleteWorkload(wl)
 
-		// trigger the move of associated inadmissibleWorkloads if required.
-		r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
+		// trigger the move of associated inadmissibleWorkloads, if there are any.
+		r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
+			// Delete the workload from cache while holding the queues lock
+			// to guarantee that requeueued workloads are taken into account before
+			// the next scheduling cycle.
+			if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
+				log.Error(err, "Failed to delete workload from cache")
+			}
+		})
 
 	case prevStatus == pending && status == pending:
 		if !r.queues.UpdateWorkload(oldWl, wlCopy) {
@@ -224,11 +233,15 @@
 		}
 
 	case prevStatus == admitted && status == pending:
-		if err := r.cache.DeleteWorkload(oldWl); err != nil {
-			log.Error(err, "Failed to delete workload from cache")
-		}
-		// trigger the move of associated inadmissibleWorkloads if required.
-		r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
+		// trigger the move of associated inadmissibleWorkloads, if there are any.
+		r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, oldWl, func() {
+			// Delete the workload from cache while holding the queues lock
+			// to guarantee that requeueued workloads are taken into account before
+			// the next scheduling cycle.
+			if err := r.cache.DeleteWorkload(oldWl); err != nil {
+				log.Error(err, "Failed to delete workload from cache")
+			}
+		})
 
 		if !r.queues.AddOrUpdateWorkload(wlCopy) {
 			log.V(2).Info("Queue for workload didn't exist; ignored for now")
20 changes: 12 additions & 8 deletions pkg/queue/manager.go
@@ -341,19 +341,23 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
 	}
 }
 
-// QueueAssociatedInadmissibleWorkloads moves all associated workloads from
-// inadmissibleWorkloads to heap. If at least one workload is moved,
-// returns true. Otherwise returns false.
-func (m *Manager) QueueAssociatedInadmissibleWorkloads(ctx context.Context, w *kueue.Workload) {
+// QueueAssociatedInadmissibleWorkloadsAfter requeues into the heaps all
+// previously inadmissible workloads in the same ClusterQueue and cohort (if
+// they exist) as the provided admitted workload to the heaps.
+// An optional action can be executed at the beginning of the function,
+// while holding the lock, to provide atomicity with the operations in the
+// queues.
+func (m *Manager) QueueAssociatedInadmissibleWorkloadsAfter(ctx context.Context, w *kueue.Workload, action func()) {
 	m.Lock()
 	defer m.Unlock()
+	if action != nil {
+		action()
+	}
 
-	q := m.localQueues[workload.QueueKey(w)]
-	if q == nil {
+	if w.Spec.Admission == nil {
 		return
 	}
 
-	cq := m.clusterQueues[q.ClusterQueue]
+	cq := m.clusterQueues[string(w.Spec.Admission.ClusterQueue)]
 	if cq == nil {
 		return
 	}
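
Per the new doc comment above, the action runs at the start of the function while the manager lock is held, so the cache deletion and the requeueing of inadmissible workloads are atomic with respect to the queue operations. The toy Go sketch below (not kueue's scheduler code; all names are illustrative) shows the property this buys: a reader that takes the same lock can never observe the quota released without the associated workloads having been requeued:

package main

import (
	"fmt"
	"sync"
)

// queueState is an illustrative stand-in for the scheduler-visible state.
type queueState struct {
	sync.Mutex
	quotaHeld bool // whether the finishing workload still holds its quota
	requeued  bool // whether the previously inadmissible workload is back in the heap
}

// releaseAndRequeue mirrors the commit's approach: both updates happen inside
// one critical section.
func (s *queueState) releaseAndRequeue() {
	s.Lock()
	defer s.Unlock()
	s.quotaHeld = false // the action(): delete the workload from the cache
	s.requeued = true   // move the inadmissible workloads back to the heaps
}

// observe plays the role of a scheduling cycle reading the queues.
func (s *queueState) observe() (quotaHeld, requeued bool) {
	s.Lock()
	defer s.Unlock()
	return s.quotaHeld, s.requeued
}

func main() {
	s := &queueState{quotaHeld: true}
	done := make(chan struct{})
	go func() {
		s.releaseAndRequeue()
		close(done)
	}()
	// Either the observation happens before the critical section (true, false)
	// or after it (false, true); the state (false, false), which the old
	// two-step sequence could briefly expose, is impossible here.
	q, r := s.observe()
	fmt.Printf("quotaHeld=%v requeued=%v\n", q, r)
	<-done
}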
38 changes: 26 additions & 12 deletions test/integration/scheduler/scheduler_test.go
@@ -154,27 +154,41 @@ var _ = ginkgo.Describe("Scheduler", func() {
ginkgo.It("Should admit workloads according to their priorities", func() {
queue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()

lowPriorityVal, highPriorityVal := int32(10), int32(100)

wlLowPriority := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").Priority(lowPriorityVal).Obj()
gomega.Expect(k8sClient.Create(ctx, wlLowPriority)).Should(gomega.Succeed())
wlHighPriority := testing.MakeWorkload("wl-high-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").Priority(highPriorityVal).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHighPriority)).Should(gomega.Succeed())
const lowPrio, midPrio, highPrio = 0, 10, 100

wlLow := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(lowPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlLow)).Should(gomega.Succeed())
wlMid1 := testing.MakeWorkload("wl-mid-priority-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid1)).Should(gomega.Succeed())
wlMid2 := testing.MakeWorkload("wl-mid-priority-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid2)).Should(gomega.Succeed())
wlHigh1 := testing.MakeWorkload("wl-high-priority-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh1)).Should(gomega.Succeed())
wlHigh2 := testing.MakeWorkload("wl-high-priority-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh2)).Should(gomega.Succeed())

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 0)
// delay creating the queue until after workloads are created.
gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())

ginkgo.By("checking the workload with low priority does not get admitted")
util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLowPriority)
ginkgo.By("checking the workloads with lower priority do not get admitted")
util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLow, wlMid1, wlMid2)

ginkgo.By("checking the workloads with high priority get admitted")
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodClusterQ.Name, wlHigh1, wlHigh2)

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 3)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2)

ginkgo.By("checking the workload with high priority gets admitted")
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodClusterQ.Name, wlHighPriority)
ginkgo.By("after the high priority workloads finish, only the mid priority workloads should be admitted")
util.FinishWorkloads(ctx, k8sClient, wlHigh1, wlHigh2)

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodClusterQ.Name, wlMid1, wlMid2)
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 1)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 4)
})

ginkgo.It("Should admit two small workloads after a big one finishes", func() {
