Skip to content

Commit

Permalink
fix workqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs authored and shaofan-hs committed Nov 30, 2023
1 parent 266398d commit 91d29b6
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 224 deletions.
29 changes: 18 additions & 11 deletions controller/workqueue/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
Expand All @@ -36,14 +37,14 @@ var (
DefaultNumOfPriorityLotteries = []int{1, 2, 4, 8, 16}
)

func DefaultGetPriorityFuncBuilder(cli client.Client) GetPriorityFunc {
return GetPriorityFuncBuilder(cli, DefaultWorkQueuePriority, DefaultWorkQueuePriorityLabel)
func DefaultGetPriorityFuncBuilder(cli client.Client, objectGetter func() client.Object) GetPriorityFunc {
return GetPriorityFuncBuilder(cli, objectGetter, DefaultWorkQueuePriorityLabel, DefaultWorkQueuePriority)
}

// GetPriorityFunc is the function to get the priority of an item
// We use the label to get the priority of an item
// If the label is not set in the item, we will get the priority from the namespace label
func GetPriorityFuncBuilder(cli client.Client, defaultWorkQueuePriority int, workQueuePriorityLabel string) GetPriorityFunc {
func GetPriorityFuncBuilder(cli client.Client, objectGetter func() client.Object, workQueuePriorityLabel string, defaultWorkQueuePriority int) GetPriorityFunc {
if cli == nil {
panic("cli is required")
}
Expand All @@ -52,26 +53,32 @@ func GetPriorityFuncBuilder(cli client.Client, defaultWorkQueuePriority int, wor
}

return func(item interface{}) int {
clientObject, ok := item.(client.Object)
req, ok := item.(reconcile.Request)
if !ok {
return defaultWorkQueuePriority
}

object := objectGetter()
err := cli.Get(context.Background(), req.NamespacedName, object)
if err != nil {
klog.Errorf("Failed to get object: %v, error: %v", req.NamespacedName, err)
return defaultWorkQueuePriority
}

var priorityLableValue string
labels := clientObject.GetLabels()
labels := object.GetLabels()
if len(labels) != 0 {
priorityLableValue = labels[workQueuePriorityLabel]
}

if priorityLableValue == "" {
name := clientObject.GetNamespace()
if name == "" {
if req.Namespace == "" {
return defaultWorkQueuePriority
}

namespace := &corev1.Namespace{}
if err := cli.Get(context.Background(), client.ObjectKey{Name: name}, namespace); err != nil {
klog.V(4).ErrorS(err, "Failed to get namespace", "namespace", name)
if err := cli.Get(context.Background(), client.ObjectKey{Name: req.Namespace}, namespace); err != nil {
klog.Errorf("Failed to get namespace: %v, error: %v", req.Namespace, err)
return defaultWorkQueuePriority
} else {
labels := namespace.GetLabels()
Expand All @@ -88,7 +95,7 @@ func GetPriorityFuncBuilder(cli client.Client, defaultWorkQueuePriority int, wor

priority, err := strconv.Atoi(priorityLableValue)
if err != nil {
klog.V(4).ErrorS(err, "Failed to convert label value to int", "priorityLableValue", priorityLableValue)
klog.Errorf("Failed to convert label value: %q to int, error: %v", priorityLableValue, err)
return defaultWorkQueuePriority
}
return priority
Expand Down
266 changes: 156 additions & 110 deletions controller/workqueue/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,162 +17,208 @@
package workqueue

import (
"context"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("Test defaults", func() {
const (
timeout = time.Second * 3
interval = time.Millisecond * 100

testConfigmap = "test1"
testConfigmap = "configmap1"
testNamespace = "default"
)

var (
configmapData = map[string]string{"hello": "world"}
objectGetter = func() client.Object {
return &corev1.ConfigMap{}
}
req = reconcile.Request{NamespacedName: client.ObjectKey{
Name: testConfigmap,
Namespace: testNamespace,
}}
)

Context("Use default workqueue priority", func() {
It("Should get default workqueue priority if the item has no labels", func() {
getPriorityFunc := DefaultGetPriorityFuncBuilder(k8sClient)
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: testConfigmap,
Namespace: testNamespace,
},
Data: configmapData,
}
err := ensureConfigmap(k8sClient, testNamespace, testConfigmap, DefaultWorkQueuePriorityLabel, "")
Expect(err).NotTo(HaveOccurred())

priority := getPriorityFunc(configmap)
getPriorityFunc := DefaultGetPriorityFuncBuilder(k8sClient, objectGetter)
priority := getPriorityFunc(req)
Expect(priority).To(Equal(DefaultWorkQueuePriority))
})

It("Should get workqueue priority from item labels", func() {
getPriorityFunc := DefaultGetPriorityFuncBuilder(k8sClient)
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: testConfigmap,
Namespace: testNamespace,
Labels: map[string]string{
DefaultWorkQueuePriorityLabel: "3",
},
},
Data: configmapData,
}
It("Should get workqueue priority from default item priority label", func() {
err := ensureConfigmap(k8sClient, testNamespace, testConfigmap, DefaultWorkQueuePriorityLabel, "3")
Expect(err).NotTo(HaveOccurred())

priority := getPriorityFunc(configmap)
getPriorityFunc := DefaultGetPriorityFuncBuilder(k8sClient, objectGetter)
priority := getPriorityFunc(req)
Expect(priority).To(Equal(3))
})

It("Should get workqueue priority from namesapce labels", func() {
getPriorityFunc := DefaultGetPriorityFuncBuilder(k8sClient)
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: testConfigmap,
Namespace: testNamespace,
},
}
It("Should get workqueue priority from default namesapce priority label", func() {
err := ensureConfigmap(k8sClient, testNamespace, testConfigmap, DefaultWorkQueuePriorityLabel, "")
Expect(err).NotTo(HaveOccurred())

namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testNamespace,
Labels: map[string]string{
DefaultWorkQueuePriorityLabel: "4",
},
},
}
err := k8sClient.Update(ctx, namespace)
err = ensureNamespace(k8sClient, testNamespace, DefaultWorkQueuePriorityLabel, "4")
Expect(err).NotTo(HaveOccurred())

Eventually(func() bool {
namespace := &corev1.Namespace{}
if err := k8sClient.Get(ctx, client.ObjectKey{Name: testNamespace}, namespace); err != nil {
return false
}
if namespace.Labels == nil {
return false
}
return namespace.Labels[DefaultWorkQueuePriorityLabel] == "4"
}, timeout, interval).Should(BeTrue())

priority := getPriorityFunc(configmap)
getPriorityFunc := DefaultGetPriorityFuncBuilder(k8sClient, objectGetter)
priority := getPriorityFunc(req)
Expect(priority).To(Equal(4))
})
})

Context("Use custom workqueue priority", func() {
It("Should get custom workqueue priority if the item has no labels", func() {
getPriorityFunc := GetPriorityFuncBuilder(k8sClient, 1, "custom-priority")
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: testConfigmap,
Namespace: testNamespace,
},
Data: configmapData,
}
It("Should get default workqueue priority if the item has no labels", func() {
err := ensureConfigmap(k8sClient, testNamespace, testConfigmap, "custom-priority", "")
Expect(err).NotTo(HaveOccurred())

priority := getPriorityFunc(configmap)
getPriorityFunc := GetPriorityFuncBuilder(k8sClient, objectGetter, "custom-priority", 1)
priority := getPriorityFunc(req)
Expect(priority).To(Equal(1))
})

It("Should get custom workqueue priority from item labels", func() {
getPriorityFunc := GetPriorityFuncBuilder(k8sClient, 1, "custom-priority")
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: testConfigmap,
Namespace: testNamespace,
Labels: map[string]string{
"custom-priority": "2",
},
},
Data: configmapData,
}
It("Should get workqueue priority from custom item priority label", func() {
err := ensureConfigmap(k8sClient, testNamespace, testConfigmap, "custom-priority", "3")
Expect(err).NotTo(HaveOccurred())

priority := getPriorityFunc(configmap)
Expect(priority).To(Equal(2))
getPriorityFunc := GetPriorityFuncBuilder(k8sClient, objectGetter, "custom-priority", 1)
priority := getPriorityFunc(req)
Expect(priority).To(Equal(3))
})

It("Should get custom workqueue priority from namesapce labels", func() {
getPriorityFunc := GetPriorityFuncBuilder(k8sClient, 1, "custom-priority")
configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: testConfigmap,
Namespace: testNamespace,
},
}
It("Should get workqueue priority from custom namesapce priority label", func() {
err := ensureConfigmap(k8sClient, testNamespace, testConfigmap, "custom-priority", "")
Expect(err).NotTo(HaveOccurred())

namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testNamespace,
Labels: map[string]string{
"custom-priority": "3",
},
},
}
err := k8sClient.Update(ctx, namespace)
err = ensureNamespace(k8sClient, testNamespace, "custom-priority", "4")
Expect(err).NotTo(HaveOccurred())

Eventually(func() bool {
namespace := &corev1.Namespace{}
if err := k8sClient.Get(ctx, client.ObjectKey{Name: testNamespace}, namespace); err != nil {
return false
}
if namespace.Labels == nil {
return false
}
return namespace.Labels["custom-priority"] == "3"
}, timeout, interval).Should(BeTrue())

priority := getPriorityFunc(configmap)
Expect(priority).To(Equal(3))
getPriorityFunc := GetPriorityFuncBuilder(k8sClient, objectGetter, "custom-priority", 1)
priority := getPriorityFunc(req)
Expect(priority).To(Equal(4))
})
})
})

func ensureConfigmap(cli client.Client, namespace, name, priorityLabelKey, priorityLabelValue string) error {
// Ensure the configmap exists
configmap := &corev1.ConfigMap{}
err := cli.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, configmap)
if err != nil {
if errors.IsNotFound(err) {
labels := map[string]string{}
if priorityLabelValue != "" {
labels[priorityLabelKey] = priorityLabelValue
}

err := cli.Create(context.Background(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
})
return err
}
return err
}

// If the label is already set, we don't need to update it
labelValue, ok := configmap.Labels[priorityLabelKey]
if !ok && priorityLabelValue == "" {
return nil
} else if ok && labelValue == priorityLabelValue {
return nil
}

// If the label is not set, we need to set it
if priorityLabelValue == "" {
configmap.Labels = map[string]string{}
} else {
configmap.Labels = map[string]string{
priorityLabelKey: priorityLabelValue,
}
}

// Update the configmap
err = cli.Update(context.Background(), configmap)
if err != nil {
return err
}
Eventually(func() bool {
configmap1 := &corev1.ConfigMap{}
err := cli.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, configmap1)
if err != nil {
return false
}
return configmap1.ResourceVersion >= configmap.ResourceVersion
}, time.Second*3, time.Millisecond*100).Should(BeTrue())

return nil
}

func ensureNamespace(cli client.Client, name, priorityLabelKey, priorityLabelValue string) error {
// Ensure the namespace exists
namespace := &corev1.Namespace{}
err := cli.Get(context.Background(), client.ObjectKey{Name: name}, namespace)
if err != nil {
if errors.IsNotFound(err) {
labels := map[string]string{}
if priorityLabelValue != "" {
labels[priorityLabelKey] = priorityLabelValue
}
err := cli.Create(context.Background(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
},
})
return err
}
return err
}

// If the label is already set, we don't need to update it
labelValue, ok := namespace.Labels[priorityLabelKey]
if !ok && priorityLabelValue == "" {
return nil
} else if ok && labelValue == priorityLabelValue {
return nil
}

// If the label is not set, we need to set it
if priorityLabelValue == "" {
namespace.Labels = map[string]string{}
} else {
namespace.Labels = map[string]string{
priorityLabelKey: priorityLabelValue,
}
}

// Update the namespace
err = cli.Update(context.Background(), namespace)
if err != nil {
return err
}
Eventually(func() bool {
namespace1 := &corev1.Namespace{}
err := cli.Get(context.Background(), client.ObjectKey{Name: name}, namespace1)
if err != nil {
return false
}
return namespace1.ResourceVersion >= namespace.ResourceVersion
}, time.Second*3, time.Millisecond*100).Should(BeTrue())

return nil
}
Loading

0 comments on commit 91d29b6

Please sign in to comment.