diff --git a/pkg/controller/core/indexer/indexer.go b/pkg/controller/core/indexer/indexer.go
index 1318fe74f58..af29f33f3aa 100644
--- a/pkg/controller/core/indexer/indexer.go
+++ b/pkg/controller/core/indexer/indexer.go
@@ -21,6 +21,8 @@ import (
 	"fmt"

 	corev1 "k8s.io/api/core/v1"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -31,6 +33,8 @@ const (
 	WorkloadClusterQueueKey    = "status.admission.clusterQueue"
 	QueueClusterQueueKey       = "spec.clusterQueue"
 	LimitRangeHasContainerType = "spec.hasContainerType"
+	WorkloadAdmittedKey        = "status.admitted"
+	WorkloadRuntimeClassKey    = "spec.runtimeClass"
 )

 func IndexQueueClusterQueue(obj client.Object) []string {
@@ -74,6 +78,41 @@ func IndexLimitRangeHasContainerType(obj client.Object) []string {
 	return nil
 }

+func IndexWorkloadAdmitted(obj client.Object) []string {
+	wl, ok := obj.(*kueue.Workload)
+	if !ok {
+		return nil
+	}
+
+	cond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadAdmitted)
+	if cond == nil {
+		return []string{string(metav1.ConditionFalse)}
+	}
+
+	return []string{string(cond.Status)}
+}
+
+func IndexWorkloadRuntimeClass(obj client.Object) []string {
+	wl, ok := obj.(*kueue.Workload)
+	if !ok {
+		return nil
+	}
+	set := make(map[string]struct{})
+	for _, ps := range wl.Spec.PodSets {
+		if ps.Template.Spec.RuntimeClassName != nil {
+			set[*ps.Template.Spec.RuntimeClassName] = struct{}{}
+		}
+	}
+	if len(set) > 0 {
+		ret := make([]string, 0, len(set))
+		for k := range set {
+			ret = append(ret, k)
+		}
+		return ret
+	}
+	return nil
+}
+
 // Setup sets the index with the given fields for core apis.
 func Setup(ctx context.Context, indexer client.FieldIndexer) error {
 	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadQueueKey, IndexWorkloadQueue); err != nil {
@@ -82,6 +121,12 @@ func Setup(ctx context.Context, indexer client.FieldIndexer) error {
 	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadClusterQueueKey, IndexWorkloadClusterQueue); err != nil {
 		return fmt.Errorf("setting index on clusterQueue for Workload: %w", err)
 	}
+	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadAdmittedKey, IndexWorkloadAdmitted); err != nil {
+		return fmt.Errorf("setting index on admitted for Workload: %w", err)
+	}
+	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadRuntimeClassKey, IndexWorkloadRuntimeClass); err != nil {
+		return fmt.Errorf("setting index on runtimeClass for Workload: %w", err)
+	}
 	if err := indexer.IndexField(ctx, &kueue.LocalQueue{}, QueueClusterQueueKey, IndexQueueClusterQueue); err != nil {
 		return fmt.Errorf("setting index on clusterQueue for localQueue: %w", err)
 	}
diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index 330af2d01be..c68578daa1c 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -27,11 +27,13 @@ import (
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/clock"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/source"

 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/cache"
@@ -193,7 +195,10 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
 }

 func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
-	wl := e.Object.(*kueue.Workload)
+	wl, isWorkload := e.Object.(*kueue.Workload)
+	if !isWorkload {
+		return true
+	}
 	defer r.notifyWatchers(wl)
 	status := workloadStatus(wl)
 	log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
@@ -204,9 +209,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
 	}

 	wlCopy := wl.DeepCopy()
-	handlePodOverhead(r.log, wlCopy, r.client)
-	r.handlePodLimitRange(log, wlCopy)
-	r.handleLimitsToRequests(wlCopy)
+	r.adjustResources(log, wlCopy)

 	if wl.Status.Admission == nil {
 		if !r.queues.AddOrUpdateWorkload(wlCopy) {
@@ -222,7 +225,10 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
 }

 func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
-	wl := e.Object.(*kueue.Workload)
+	wl, isWorkload := e.Object.(*kueue.Workload)
+	if !isWorkload {
+		return true
+	}
 	defer r.notifyWatchers(wl)
 	status := "unknown"
 	if !e.DeleteStateUnknown {
@@ -258,7 +264,10 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
 }

 func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
-	oldWl := e.ObjectOld.(*kueue.Workload)
+	oldWl, isWorkload := e.ObjectOld.(*kueue.Workload)
+	if !isWorkload {
+		return true
+	}
 	wl := e.ObjectNew.(*kueue.Workload)
 	defer r.notifyWatchers(oldWl)
 	defer r.notifyWatchers(wl)
@@ -285,9 +294,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
 	wlCopy := wl.DeepCopy()
 	// We do not handle old workload here as it will be deleted or replaced by new one anyway.
-	handlePodOverhead(r.log, wlCopy, r.client)
-	r.handlePodLimitRange(log, wlCopy)
-	r.handleLimitsToRequests(wlCopy)
+	r.adjustResources(log, wlCopy)

 	switch {
 	case status == finished:
@@ -359,8 +366,13 @@ func (r *WorkloadReconciler) notifyWatchers(wl *kueue.Workload) {

 // SetupWithManager sets up the controller with the Manager.
 func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	ruh := &resourceUpdatesHandler{
+		r: r,
+	}
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&kueue.Workload{}).
+		Watches(&source.Kind{Type: &corev1.LimitRange{}}, ruh).
+		Watches(&source.Kind{Type: &nodev1.RuntimeClass{}}, ruh).
 		WithEventFilter(r).
 		Complete(r)
 }
@@ -417,14 +429,14 @@ func workloadStatus(w *kueue.Workload) string {
 // As a result, the pod's Overhead is not always correct. E.g. if we set a non-existent runtime class name to
 // `pod.Spec.RuntimeClassName` and we also set the `pod.Spec.Overhead`, in real world, the pod creation will be
 // rejected due to the mismatch with RuntimeClass. However, in the future we assume that they are correct.
-func handlePodOverhead(log logr.Logger, wl *kueue.Workload, c client.Client) {
+func (r *WorkloadReconciler) handlePodOverhead(log logr.Logger, wl *kueue.Workload) {
 	ctx := context.Background()

 	for i := range wl.Spec.PodSets {
 		podSpec := &wl.Spec.PodSets[i].Template.Spec
 		if podSpec.RuntimeClassName != nil && len(podSpec.Overhead) == 0 {
 			var runtimeClass nodev1.RuntimeClass
-			if err := c.Get(ctx, types.NamespacedName{Name: *podSpec.RuntimeClassName}, &runtimeClass); err != nil {
+			if err := r.client.Get(ctx, types.NamespacedName{Name: *podSpec.RuntimeClassName}, &runtimeClass); err != nil {
 				log.Error(err, "Could not get RuntimeClass")
 				continue
 			}
@@ -481,3 +493,74 @@ func (r *WorkloadReconciler) handleLimitsToRequests(wl *kueue.Workload) {
 		}
 	}
 }
+
+func (r *WorkloadReconciler) adjustResources(log logr.Logger, wl *kueue.Workload) {
+	r.handlePodOverhead(log, wl)
+	r.handlePodLimitRange(log, wl)
+	r.handleLimitsToRequests(wl)
+}
+
+type resourceUpdatesHandler struct {
+	r *WorkloadReconciler
+}
+
+func (h *resourceUpdatesHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) {
+	//TODO: the eventHandler should get a context soon, and this could be dropped
+	// https://github.com/kubernetes-sigs/controller-runtime/blob/master/pkg/handler/eventhandler.go
+	ctx := context.TODO()
+	log := ctrl.LoggerFrom(ctx)
+	log.V(5).Info("update resources event (create)")
+	h.handle(ctx, e.Object, q)
+}
+
+func (h *resourceUpdatesHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
+	ctx := context.TODO()
+	log := ctrl.LoggerFrom(ctx)
+	log.V(5).Info("update resources event (update)")
+	h.handle(ctx, e.ObjectNew, q)
+}
+
+func (h *resourceUpdatesHandler) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) {
+	ctx := context.TODO()
+	log := ctrl.LoggerFrom(ctx)
+	log.V(5).Info("update resources event (delete)")
+	h.handle(ctx, e.Object, q)
+}
+
+func (h *resourceUpdatesHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
+}
+
+func (h *resourceUpdatesHandler) handle(ctx context.Context, obj client.Object, q workqueue.RateLimitingInterface) {
+	switch v := obj.(type) {
+	case *corev1.LimitRange:
+		log := ctrl.LoggerFrom(ctx).WithValues("limitRange", klog.KObj(v))
+		ctx = ctrl.LoggerInto(ctx, log)
+		h.queueReconcileForPending(ctx, q, client.InNamespace(v.Namespace))
+	case *nodev1.RuntimeClass:
+		log := ctrl.LoggerFrom(ctx).WithValues("runtimeClass", klog.KObj(v))
+		ctx = ctrl.LoggerInto(ctx, log)
+		h.queueReconcileForPending(ctx, q, client.MatchingFields{indexer.WorkloadRuntimeClassKey: v.Name})
+	default:
+		panic(v)
+	}
+}
+
+func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, q workqueue.RateLimitingInterface, opts ...client.ListOption) {
+	log := ctrl.LoggerFrom(ctx)
+	lst := kueue.WorkloadList{}
+	opts = append(opts, client.MatchingFields{indexer.WorkloadAdmittedKey: string(metav1.ConditionFalse)})
+	err := h.r.client.List(ctx, &lst, opts...)
+	if err != nil {
+		log.Error(err, "Could not list pending workloads")
+	}
+	log.V(4).Info("Found pending workloads", "count", len(lst.Items))
+	for _, w := range lst.Items {
+		wlCopy := w.DeepCopy()
+		log := log.WithValues("workload", klog.KObj(wlCopy))
+		log.V(5).Info("Queue reconcile for")
+		h.r.adjustResources(log, wlCopy)
+		if !h.r.queues.AddOrUpdateWorkload(wlCopy) {
+			log.V(2).Info("Queue for workload didn't exist")
+		}
+	}
+}
diff --git a/test/integration/controller/core/workload_controller_test.go b/test/integration/controller/core/workload_controller_test.go
index 01fdf49eada..9a5f9de9515 100644
--- a/test/integration/controller/core/workload_controller_test.go
+++ b/test/integration/controller/core/workload_controller_test.go
@@ -321,7 +321,6 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 		})
 		ginkgo.AfterEach(func() {
 			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
-			gomega.Expect(util.DeleteRuntimeClass(ctx, k8sClient, runtimeClass)).To(gomega.Succeed())
 			gomega.Expect(util.DeleteResourceFlavor(ctx, k8sClient, onDemandFlavor)).To(gomega.Succeed())
 			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
 		})
@@ -424,7 +423,6 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 		})
 		ginkgo.AfterEach(func() {
 			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
-			gomega.Expect(util.DeleteRuntimeClass(ctx, k8sClient, runtimeClass)).To(gomega.Succeed())
 			gomega.Expect(util.DeleteResourceFlavor(ctx, k8sClient, onDemandFlavor)).To(gomega.Succeed())
 			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
 		})
@@ -470,4 +468,185 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 			})
 		})
 	})
+
+	ginkgo.When("When RuntimeClass is defined and changes", func() {
+		ginkgo.BeforeEach(func() {
+			runtimeClass = testing.MakeRuntimeClass("kata", "bar-handler").
+				PodOverhead(corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")}).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, runtimeClass)).To(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed())
+			clusterQueue = testing.MakeClusterQueue("clusterqueue").
+				ResourceGroup(*testing.MakeFlavorQuotas(onDemandFlavor.Name).
+					Resource(corev1.ResourceCPU, "5", "5").Obj()).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
+		})
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+			gomega.Expect(util.DeleteRuntimeClass(ctx, k8sClient, runtimeClass)).To(gomega.Succeed())
+			gomega.Expect(util.DeleteResourceFlavor(ctx, k8sClient, onDemandFlavor)).To(gomega.Succeed())
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
+		})
+
+		ginkgo.It("Should sync the resource requests with the new overhead", func() {
+			ginkgo.By("Create and wait for the first workload admission", func() {
+				wl = testing.MakeWorkload("one", ns.Name).
+					Queue(localQueue.Name).
+					Request(corev1.ResourceCPU, "1").
+					RuntimeClass("kata").
+					Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
+
+				gomega.Eventually(func() bool {
+					read := kueue.Workload{}
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &read); err != nil {
+						return false
+					}
+					return apimeta.IsStatusConditionTrue(read.Status.Conditions, kueue.WorkloadAdmitted)
+				}, util.Timeout, util.Interval).Should(gomega.BeTrue())
+			})
+
+			var wl2 *kueue.Workload
+			ginkgo.By("Create a second workload, should stay pending", func() {
+				wl2 = testing.MakeWorkload("two", ns.Name).
+					Queue(localQueue.Name).
+					Request(corev1.ResourceCPU, "1").
+					RuntimeClass("kata").
+					Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl2)).To(gomega.Succeed())
+
+				gomega.Consistently(func() bool {
+					read := kueue.Workload{}
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl2), &read); err != nil {
+						return false
+					}
+					return apimeta.IsStatusConditionTrue(read.Status.Conditions, kueue.WorkloadAdmitted)
+				}, util.ConsistentDuration, util.Interval).Should(gomega.BeFalse())
+			})
+
+			ginkgo.By("Decreasing the runtimeClass overhead", func() {
+				updatedRC := nodev1.RuntimeClass{}
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(runtimeClass), &updatedRC)).To(gomega.Succeed())
+				updatedRC.Overhead.PodFixed[corev1.ResourceCPU] = resource.MustParse("1")
+				gomega.Expect(k8sClient.Update(ctx, &updatedRC)).To(gomega.Succeed())
+			})
+
+			ginkgo.By("The second workload should also be admitted", func() {
+				gomega.Eventually(func() bool {
+					read := kueue.Workload{}
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl2), &read); err != nil {
+						return false
+					}
+					return apimeta.IsStatusConditionTrue(read.Status.Conditions, kueue.WorkloadAdmitted)
+				}, util.Timeout, util.Interval).Should(gomega.BeTrue())
+			})
+
+			ginkgo.By("Check queue resource consumption", func() {
+				gomega.Eventually(func() kueue.ClusterQueueStatus {
+					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
+					return updatedCQ.Status
+				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
+					PendingWorkloads:  0,
+					AdmittedWorkloads: 2,
+					FlavorsUsage: []kueue.FlavorUsage{{
+						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
+						Resources: []kueue.ResourceUsage{{
+							Name:  corev1.ResourceCPU,
+							Total: resource.MustParse("5"),
+						}},
+					}},
+				}, ignoreCqCondition))
+			})
+		})
+	})
+	ginkgo.When("When LimitRanges are defined and change", func() {
+		var limitRange *corev1.LimitRange
+		ginkgo.BeforeEach(func() {
+			limitRange = testing.MakeLimitRange("limits", ns.Name).WithValue("DefaultRequest", corev1.ResourceCPU, "3").Obj()
+			gomega.Expect(k8sClient.Create(ctx, limitRange)).To(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed())
+			clusterQueue = testing.MakeClusterQueue("clusterqueue").
+				ResourceGroup(*testing.MakeFlavorQuotas(onDemandFlavor.Name).
+					Resource(corev1.ResourceCPU, "5", "5").Obj()).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
+		})
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+			gomega.Expect(util.DeleteResourceFlavor(ctx, k8sClient, onDemandFlavor)).To(gomega.Succeed())
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
+		})
+
+		ginkgo.It("Should sync the resource requests with the limit", func() {
+			ginkgo.By("Create and wait for the first workload admission", func() {
+				wl = testing.MakeWorkload("one", ns.Name).
+					Queue(localQueue.Name).
+					Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
+
+				gomega.Eventually(func() bool {
+					read := kueue.Workload{}
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &read); err != nil {
+						return false
+					}
+					return apimeta.IsStatusConditionTrue(read.Status.Conditions, kueue.WorkloadAdmitted)
+				}, util.Timeout, util.Interval).Should(gomega.BeTrue())
+			})
+
+			var wl2 *kueue.Workload
+			ginkgo.By("Create a second workload, should stay pending", func() {
+				wl2 = testing.MakeWorkload("two", ns.Name).
+					Queue(localQueue.Name).
+					Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl2)).To(gomega.Succeed())
+
+				gomega.Consistently(func() bool {
+					read := kueue.Workload{}
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl2), &read); err != nil {
+						return false
+					}
+					return apimeta.IsStatusConditionTrue(read.Status.Conditions, kueue.WorkloadAdmitted)
+				}, util.ConsistentDuration, util.Interval).Should(gomega.BeFalse())
+			})
+
+			ginkgo.By("Decreasing the limit's default", func() {
+				updatedLr := corev1.LimitRange{}
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(limitRange), &updatedLr)).To(gomega.Succeed())
+				updatedLr.Spec.Limits[0].DefaultRequest[corev1.ResourceCPU] = resource.MustParse("2")
+				gomega.Expect(k8sClient.Update(ctx, &updatedLr)).To(gomega.Succeed())
+			})
+
+			ginkgo.By("The second workload should also be admitted", func() {
+				gomega.Eventually(func() bool {
+					read := kueue.Workload{}
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl2), &read); err != nil {
+						return false
+					}
+					return apimeta.IsStatusConditionTrue(read.Status.Conditions, kueue.WorkloadAdmitted)
+				}, util.Timeout, util.Interval).Should(gomega.BeTrue())
+			})
+
+			ginkgo.By("Check queue resource consumption", func() {
+				gomega.Eventually(func() kueue.ClusterQueueStatus {
+					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
+					return updatedCQ.Status
+				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
+					PendingWorkloads:  0,
+					AdmittedWorkloads: 2,
+					FlavorsUsage: []kueue.FlavorUsage{{
+						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
+						Resources: []kueue.ResourceUsage{{
+							Name:  corev1.ResourceCPU,
+							Total: resource.MustParse("5"),
+						}},
+					}},
+				}, ignoreCqCondition))
+			})
+		})
+	})
 })
diff --git a/test/util/util.go b/test/util/util.go
index 96ec91ae0c2..0f7d602626d 100644
--- a/test/util/util.go
+++ b/test/util/util.go
@@ -89,6 +89,10 @@ func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace)
 	if err := DeleteWorkloadsInNamespace(ctx, c, ns); err != nil {
 		return err
 	}
+	err = c.DeleteAllOf(ctx, &corev1.LimitRange{}, client.InNamespace(ns.Name), client.PropagationPolicy(metav1.DeletePropagationBackground))
+	if err != nil && !apierrors.IsNotFound(err) {
+		return err
+	}
 	if err := c.Delete(ctx, ns); err != nil && !apierrors.IsNotFound(err) {
 		return err
 	}