Atomically free quota and queue inadmissible workloads
Change-Id: I009a498577f7843b1ab0eeb2661a52ade37d1460
alculquicondor committed Jan 12, 2023
1 parent 01a101e commit a599211
Showing 3 changed files with 68 additions and 37 deletions.
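
At a high level, the commit replaces two separate steps in the workload reconciler — delete the workload from the cache to free its quota, then requeue any inadmissible workloads — with a single call that runs the cache deletion as a callback under the queue manager's lock. The standalone sketch below illustrates that locking pattern only; the types and names are hypothetical stand-ins rather than Kueue's actual ones.

package main

import (
	"fmt"
	"sync"
)

// manager is a stripped-down stand-in for a queue manager that parks
// inadmissible workloads and exposes admissible ones to a scheduler.
type manager struct {
	sync.Mutex
	inadmissible []string // workloads waiting for quota
	heap         []string // workloads visible to the next scheduling cycle
}

// requeueInadmissibleAfter runs the optional action and then moves every
// inadmissible workload to the heap, all under the same lock. A concurrent
// scheduling cycle therefore cannot observe the freed quota without also
// seeing the requeued workloads.
func (m *manager) requeueInadmissibleAfter(action func()) {
	m.Lock()
	defer m.Unlock()
	if action != nil {
		action()
	}
	m.heap = append(m.heap, m.inadmissible...)
	m.inadmissible = nil
}

func main() {
	m := &manager{inadmissible: []string{"wl-a", "wl-b"}}
	m.requeueInadmissibleAfter(func() {
		// Here a real reconciler would delete the finished workload from the
		// scheduler cache so its quota becomes available again.
		fmt.Println("quota freed")
	})
	fmt.Println(m.heap) // [wl-a wl-b]
}
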
47 changes: 30 additions & 17 deletions pkg/controller/core/workload_controller.go
@@ -154,14 +154,18 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
// the state is unknown, the workload could have been assumed and we need
// to clear it from the cache.
if wl.Spec.Admission != nil || e.DeleteStateUnknown {
if err := r.cache.DeleteWorkload(wl); err != nil {
if !e.DeleteStateUnknown {
log.Error(err, "Failed to delete workload from cache")
}
}

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Delete the workload from cache while holding the queues lock
// to guarantee that requeued workloads are taken into account before
// the next scheduling cycle.
if err := r.cache.DeleteWorkload(wl); err != nil {
if !e.DeleteStateUnknown {
log.Error(err, "Failed to delete workload from cache")
}
}
})
}

// Even if the state is unknown, the last cached state tells us whether the
@@ -204,13 +208,18 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {

switch {
case status == finished:
if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
log.Error(err, "Failed to delete workload from cache")
}
r.queues.DeleteWorkload(oldWl)
// The workload could have been in the queues if we missed an event.
r.queues.DeleteWorkload(wl)

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Delete the workload from cache while holding the queues lock
// to guarantee that requeued workloads are taken into account before
// the next scheduling cycle.
if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
log.Error(err, "Failed to delete workload from cache")
}
})

case prevStatus == pending && status == pending:
if !r.queues.UpdateWorkload(oldWl, wlCopy) {
@@ -224,11 +233,15 @@
}

case prevStatus == admitted && status == pending:
if err := r.cache.DeleteWorkload(oldWl); err != nil {
log.Error(err, "Failed to delete workload from cache")
}
// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, oldWl, func() {
// Delete the workload from cache while holding the queues lock
// to guarantee that requeued workloads are taken into account before
// the next scheduling cycle.
if err := r.cache.DeleteWorkload(oldWl); err != nil {
log.Error(err, "Failed to delete workload from cache")
}
})

if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
20 changes: 12 additions & 8 deletions pkg/queue/manager.go
@@ -341,19 +341,23 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
}
}

// QueueAssociatedInadmissibleWorkloads moves all associated workloads from
// inadmissibleWorkloads to heap. If at least one workload is moved,
// returns true. Otherwise returns false.
func (m *Manager) QueueAssociatedInadmissibleWorkloads(ctx context.Context, w *kueue.Workload) {
// QueueAssociatedInadmissibleWorkloadsAfter requeues into the heaps all
// previously inadmissible workloads in the same ClusterQueue and cohort (if
// they exist) as the provided admitted workload.
// An optional action can be executed at the beginning of the function,
// while holding the lock, to provide atomicity with the operations in the
// queues.
func (m *Manager) QueueAssociatedInadmissibleWorkloadsAfter(ctx context.Context, w *kueue.Workload, action func()) {
m.Lock()
defer m.Unlock()
if action != nil {
action()
}

q := m.localQueues[workload.QueueKey(w)]
if q == nil {
if w.Spec.Admission == nil {
return
}

cq := m.clusterQueues[q.ClusterQueue]
cq := m.clusterQueues[string(w.Spec.Admission.ClusterQueue)]
if cq == nil {
return
}
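
Because the function only invokes the action after a nil check, a call site that has nothing to run under the queues lock can presumably pass nil. A hypothetical caller, with the surrounding reconciler fields assumed rather than taken from this diff, might read:

// Requeue inadmissible workloads that share wl's ClusterQueue or cohort,
// without doing any extra work while the queues lock is held.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, nil)
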
38 changes: 26 additions & 12 deletions test/integration/scheduler/scheduler_test.go
@@ -154,27 +154,41 @@ var _ = ginkgo.Describe("Scheduler", func() {
ginkgo.It("Should admit workloads according to their priorities", func() {
queue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()

lowPriorityVal, highPriorityVal := int32(10), int32(100)

wlLowPriority := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").Priority(lowPriorityVal).Obj()
gomega.Expect(k8sClient.Create(ctx, wlLowPriority)).Should(gomega.Succeed())
wlHighPriority := testing.MakeWorkload("wl-high-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").Priority(highPriorityVal).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHighPriority)).Should(gomega.Succeed())
const lowPrio, midPrio, highPrio = 0, 10, 100

wlLow := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(lowPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlLow)).Should(gomega.Succeed())
wlMid1 := testing.MakeWorkload("wl-mid-priority-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid1)).Should(gomega.Succeed())
wlMid2 := testing.MakeWorkload("wl-mid-priority-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlMid2)).Should(gomega.Succeed())
wlHigh1 := testing.MakeWorkload("wl-high-priority-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh1)).Should(gomega.Succeed())
wlHigh2 := testing.MakeWorkload("wl-high-priority-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj()
gomega.Expect(k8sClient.Create(ctx, wlHigh2)).Should(gomega.Succeed())

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 0)
// delay creating the queue until after workloads are created.
gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())

ginkgo.By("checking the workload with low priority does not get admitted")
util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLowPriority)
ginkgo.By("checking the workloads with lower priority do not get admitted")
util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLow, wlMid1, wlMid2)

ginkgo.By("checking the workloads with high priority get admitted")
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodClusterQ.Name, wlHigh1, wlHigh2)

util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 3)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2)

ginkgo.By("checking the workload with high priority gets admitted")
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodClusterQ.Name, wlHighPriority)
ginkgo.By("after the high priority workloads finish, only the mid priority workloads should be admitted")
util.FinishWorkloads(ctx, k8sClient, wlHigh1, wlHigh2)

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodClusterQ.Name, wlMid1, wlMid2)
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 1)
util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2)
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 4)
})
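
For context on the expected metrics (an inference, since the ClusterQueue definition sits outside this hunk): assuming prodClusterQ still provides the 5 CPUs of quota implied by the original 5-CPU requests, the two high-priority 2-CPU workloads fit (2 + 2 = 4 ≤ 5) while any third workload would not (4 + 2 = 6 > 5), leaving 2 admitted and 3 pending; once the high-priority pair finishes, both mid-priority workloads fit (again 4 ≤ 5) and the low-priority one still does not, which yields 1 pending, 2 active, and 4 admitted in total.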

ginkgo.It("Should admit two small workloads after a big one finishes", func() {
