[controller/workload] Trigger reconcile on resource change
Update the queue info of all the relevant pending workloads
whenever a LimitRange or RuntimeClass changes.
trasc committed Mar 20, 2023
1 parent a191123 commit 0490283
Showing 4 changed files with 324 additions and 13 deletions.
45 changes: 45 additions & 0 deletions pkg/controller/core/indexer/indexer.go
@@ -21,6 +21,8 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -31,6 +33,8 @@ const (
	WorkloadClusterQueueKey    = "status.admission.clusterQueue"
	QueueClusterQueueKey       = "spec.clusterQueue"
	LimitRangeHasContainerType = "spec.hasContainerType"
	WorkloadAdmittedKey        = "status.admitted"
	WorkloadRuntimeClassKey    = "spec.runtimeClass"
)

func IndexQueueClusterQueue(obj client.Object) []string {
@@ -74,6 +78,41 @@ func IndexLimitRangeHasContainerType(obj client.Object) []string {
	return nil
}

// IndexWorkloadAdmitted indexes Workloads by the status of their Admitted
// condition; a Workload with no Admitted condition yet is indexed as "False".
func IndexWorkloadAdmitted(obj client.Object) []string {
	wl, ok := obj.(*kueue.Workload)
	if !ok {
		return nil
	}

	cond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadAdmitted)
	if cond == nil {
		return []string{string(metav1.ConditionFalse)}
	}

	return []string{string(cond.Status)}
}

// IndexWorkloadRuntimeClass indexes Workloads by the set of RuntimeClass
// names referenced by their pod sets, so that a RuntimeClass event can be
// mapped back to the Workloads it affects.
func IndexWorkloadRuntimeClass(obj client.Object) []string {
	wl, ok := obj.(*kueue.Workload)
	if !ok {
		return nil
	}
	set := make(map[string]struct{})
	for _, ps := range wl.Spec.PodSets {
		if ps.Template.Spec.RuntimeClassName != nil {
			set[*ps.Template.Spec.RuntimeClassName] = struct{}{}
		}
	}
	if len(set) > 0 {
		ret := make([]string, 0, len(set))
		for k := range set {
			ret = append(ret, k)
		}
		return ret
	}
	return nil
}

// Setup sets the index with the given fields for core apis.
func Setup(ctx context.Context, indexer client.FieldIndexer) error {
	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadQueueKey, IndexWorkloadQueue); err != nil {
@@ -82,6 +121,12 @@ func Setup(ctx context.Context, indexer client.FieldIndexer) error {
	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadClusterQueueKey, IndexWorkloadClusterQueue); err != nil {
		return fmt.Errorf("setting index on clusterQueue for Workload: %w", err)
	}
	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadAdmittedKey, IndexWorkloadAdmitted); err != nil {
		return fmt.Errorf("setting index on admitted for Workload: %w", err)
	}
	if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadRuntimeClassKey, IndexWorkloadRuntimeClass); err != nil {
		return fmt.Errorf("setting index on runtimeClass for Workload: %w", err)
	}
	if err := indexer.IndexField(ctx, &kueue.LocalQueue{}, QueueClusterQueueKey, IndexQueueClusterQueue); err != nil {
		return fmt.Errorf("setting index on clusterQueue for localQueue: %w", err)
	}
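Aside: a minimal sketch of how these indexes are consumed once registered (the event handler added in workload_controller.go below does exactly this). Here `c` is assumed to be the manager's cache-backed client, `ctx` an ambient context, and the namespace name is illustrative:

```go
// List the workloads in "team-a" whose Admitted condition is false or unset,
// using the WorkloadAdmittedKey field index registered in Setup above.
var pending kueue.WorkloadList
if err := c.List(ctx, &pending,
	client.InNamespace("team-a"),
	client.MatchingFields{indexer.WorkloadAdmittedKey: string(metav1.ConditionFalse)},
); err != nil {
	// With a cache-backed client, an error here usually means the field
	// index was not registered before the cache started.
}
```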
105 changes: 94 additions & 11 deletions pkg/controller/core/workload_controller.go
@@ -27,11 +27,13 @@ import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"k8s.io/utils/clock"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/source"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/cache"
@@ -193,7 +195,10 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
}

func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
	wl := e.Object.(*kueue.Workload)
	wl, isWorkload := e.Object.(*kueue.Workload)
	if !isWorkload {
		return true
	}
	defer r.notifyWatchers(wl)
	status := workloadStatus(wl)
	log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
@@ -204,9 +209,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
	}

	wlCopy := wl.DeepCopy()
	handlePodOverhead(r.log, wlCopy, r.client)
	r.handlePodLimitRange(log, wlCopy)
	r.handleLimitsToRequests(wlCopy)
	r.adjustResources(log, wlCopy)

	if wl.Status.Admission == nil {
		if !r.queues.AddOrUpdateWorkload(wlCopy) {
@@ -222,7 +225,10 @@ }
}

func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
	wl := e.Object.(*kueue.Workload)
	wl, isWorkload := e.Object.(*kueue.Workload)
	if !isWorkload {
		return true
	}
	defer r.notifyWatchers(wl)
	status := "unknown"
	if !e.DeleteStateUnknown {
@@ -258,7 +264,10 @@ }
}

func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
	oldWl := e.ObjectOld.(*kueue.Workload)
	oldWl, isWorkload := e.ObjectOld.(*kueue.Workload)
	if !isWorkload {
		return true
	}
	wl := e.ObjectNew.(*kueue.Workload)
	defer r.notifyWatchers(oldWl)
	defer r.notifyWatchers(wl)
@@ -285,9 +294,7 @@

	wlCopy := wl.DeepCopy()
	// We do not handle old workload here as it will be deleted or replaced by new one anyway.
	handlePodOverhead(r.log, wlCopy, r.client)
	r.handlePodLimitRange(log, wlCopy)
	r.handleLimitsToRequests(wlCopy)
	r.adjustResources(log, wlCopy)

	switch {
	case status == finished:
@@ -359,8 +366,13 @@ func (r *WorkloadReconciler) notifyWatchers(wl *kueue.Workload) {

// SetupWithManager sets up the controller with the Manager.
func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
	ruh := &resourceUpdatesHandler{
		r: r,
	}
	return ctrl.NewControllerManagedBy(mgr).
		For(&kueue.Workload{}).
		Watches(&source.Kind{Type: &corev1.LimitRange{}}, ruh).
		Watches(&source.Kind{Type: &nodev1.RuntimeClass{}}, ruh).
		WithEventFilter(r).
		Complete(r)
}
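A side note on `WithEventFilter(r)` above: it type-checks because `WorkloadReconciler` itself implements controller-runtime's `predicate.Predicate` through the `Create`, `Update`, `Delete`, and `Generic` event methods (the first three appear in this diff; `Generic` is assumed to exist elsewhere in the file). A hedged compile-time assertion, assuming `predicate` is imported from `sigs.k8s.io/controller-runtime/pkg/predicate`:

```go
// Fails to compile if the reconciler stops satisfying the predicate
// interface that WithEventFilter expects.
var _ predicate.Predicate = (*WorkloadReconciler)(nil)
```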
@@ -417,14 +429,14 @@ func workloadStatus(w *kueue.Workload) string {
// As a result, the pod's Overhead is not always correct. E.g. if we set a non-existent runtime class name to
// `pod.Spec.RuntimeClassName` and we also set the `pod.Spec.Overhead`, in real world, the pod creation will be
// rejected due to the mismatch with RuntimeClass. However, in the future we assume that they are correct.
func handlePodOverhead(log logr.Logger, wl *kueue.Workload, c client.Client) {
func (r *WorkloadReconciler) handlePodOverhead(log logr.Logger, wl *kueue.Workload) {
	ctx := context.Background()

	for i := range wl.Spec.PodSets {
		podSpec := &wl.Spec.PodSets[i].Template.Spec
		if podSpec.RuntimeClassName != nil && len(podSpec.Overhead) == 0 {
			var runtimeClass nodev1.RuntimeClass
			if err := c.Get(ctx, types.NamespacedName{Name: *podSpec.RuntimeClassName}, &runtimeClass); err != nil {
			if err := r.client.Get(ctx, types.NamespacedName{Name: *podSpec.RuntimeClassName}, &runtimeClass); err != nil {
				log.Error(err, "Could not get RuntimeClass")
				continue
			}
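For illustration, a RuntimeClass whose overhead this code would copy into pod specs that name it but declare no `Overhead` of their own (the name and values are made up; assumes `resource` is imported from `k8s.io/apimachinery/pkg/api/resource`):

```go
rc := nodev1.RuntimeClass{
	ObjectMeta: metav1.ObjectMeta{Name: "kata"},
	Handler:    "kata",
	Overhead: &nodev1.Overhead{
		// PodFixed is the fixed per-pod cost that handlePodOverhead reads
		// when the pod itself declares no overhead.
		PodFixed: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("250m"),
			corev1.ResourceMemory: resource.MustParse("120Mi"),
		},
	},
}
```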
@@ -481,3 +493,74 @@ func (r *WorkloadReconciler) handleLimitsToRequests(wl *kueue.Workload) {
		}
	}
}

// adjustResources applies the in-memory resource adjustments (pod overhead,
// limit ranges, limits-to-requests) to a workload copy before queuing it.
func (r *WorkloadReconciler) adjustResources(log logr.Logger, wl *kueue.Workload) {
	r.handlePodOverhead(log, wl)
	r.handlePodLimitRange(log, wl)
	r.handleLimitsToRequests(wl)
}

// resourceUpdatesHandler requeues pending workloads whenever a LimitRange or
// RuntimeClass changes.
type resourceUpdatesHandler struct {
	r *WorkloadReconciler
}

func (h *resourceUpdatesHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) {
	// TODO: the eventHandler should get a context soon, and this could be dropped
	// https://github.com/kubernetes-sigs/controller-runtime/blob/master/pkg/handler/eventhandler.go
	ctx := context.TODO()
	log := ctrl.LoggerFrom(ctx)
	log.V(5).Info("update resources event (create)")
	h.handle(ctx, e.Object, q)
}

func (h *resourceUpdatesHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	ctx := context.TODO()
	log := ctrl.LoggerFrom(ctx)
	log.V(5).Info("update resources event (update)")
	h.handle(ctx, e.ObjectNew, q)
}

func (h *resourceUpdatesHandler) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) {
	ctx := context.TODO()
	log := ctrl.LoggerFrom(ctx)
	log.V(5).Info("update resources event (delete)")
	h.handle(ctx, e.Object, q)
}

func (h *resourceUpdatesHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
}

// handle fans an event out to the affected pending workloads: a LimitRange
// affects the workloads in its namespace, a RuntimeClass affects the
// workloads that reference it by name (via the spec.runtimeClass index).
func (h *resourceUpdatesHandler) handle(ctx context.Context, obj client.Object, q workqueue.RateLimitingInterface) {
	switch v := obj.(type) {
	case *corev1.LimitRange:
		log := ctrl.LoggerFrom(ctx).WithValues("limitRange", klog.KObj(v))
		ctx = ctrl.LoggerInto(ctx, log)
		h.queueReconcileForPending(ctx, q, client.InNamespace(v.Namespace))
	case *nodev1.RuntimeClass:
		log := ctrl.LoggerFrom(ctx).WithValues("runtimeClass", klog.KObj(v))
		ctx = ctrl.LoggerInto(ctx, log)
		h.queueReconcileForPending(ctx, q, client.MatchingFields{indexer.WorkloadRuntimeClassKey: v.Name})
	default:
		panic(v)
	}
}

// queueReconcileForPending lists the not-yet-admitted workloads matching opts,
// re-applies the resource adjustments, and pushes the updated copies back into
// the queues.
func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, q workqueue.RateLimitingInterface, opts ...client.ListOption) {
	log := ctrl.LoggerFrom(ctx)
	lst := kueue.WorkloadList{}
	opts = append(opts, client.MatchingFields{indexer.WorkloadAdmittedKey: string(metav1.ConditionFalse)})
	err := h.r.client.List(ctx, &lst, opts...)
	if err != nil {
		log.Error(err, "Could not list pending workloads")
	}
	log.V(4).Info("Found pending workloads", "count", len(lst.Items))
	for _, w := range lst.Items {
		wlCopy := w.DeepCopy()
		log := log.WithValues("workload", klog.KObj(wlCopy))
		log.V(5).Info("Queue reconcile for")
		h.r.adjustResources(log, wlCopy)
		if !h.r.queues.AddOrUpdateWorkload(wlCopy) {
			log.V(2).Info("Queue for workload didn't exist")
		}
	}
}
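Similarly, `resourceUpdatesHandler` is accepted by the `Watches` calls in `SetupWithManager` because it satisfies `handler.EventHandler` in its pre-context form (the signatures without a `ctx` parameter, matching the TODO above). A hedged compile-time check, assuming `handler` is imported from `sigs.k8s.io/controller-runtime/pkg/handler`:

```go
// Fails to compile if the handler stops implementing the EventHandler
// interface that Watches expects.
var _ handler.EventHandler = (*resourceUpdatesHandler)(nil)
```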