Skip to content

Commit

Permalink
optimize tagging controller workqueue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
kmala committed Jan 23, 2025
1 parent 933cfde commit 2271e31
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 64 deletions.
17 changes: 2 additions & 15 deletions pkg/controllers/tagging/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,15 @@ limitations under the License.
package tagging

import (
"sync"

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"sync"
)

var register sync.Once

var (
workItemDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
Help: "workitem latency of workitem being in the queue and time it takes to process",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20),
},
[]string{"latency_type"})

workItemError = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "cloudprovider_aws_tagging_controller_work_item_errors_total",
Expand All @@ -43,15 +35,10 @@ var (
// registerMetrics registers tagging-controller metrics.
func registerMetrics() {
register.Do(func() {
legacyregistry.MustRegister(workItemDuration)
legacyregistry.MustRegister(workItemError)
})
}

func recordWorkItemLatencyMetrics(latencyType string, timeTaken float64) {
workItemDuration.With(metrics.Labels{"latency_type": latencyType}).Observe(timeTaken)
}

func recordWorkItemErrorMetrics(errorType string, instanceID string) {
workItemError.With(metrics.Labels{"error_type": errorType, "instance_id": instanceID}).Inc()
}
100 changes: 57 additions & 43 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
Expand All @@ -42,16 +43,21 @@ func init() {
registerMetrics()
}

// workItem contains the node and an action for that node
// taggingControllerNode contains the node details required for tag/untag of node resources.
type taggingControllerNode struct {
providerID string
name string
}

// workItem contains the node name, provider id and an action for that node.
type workItem struct {
node *v1.Node
action func(node *v1.Node) error
requeuingCount int
enqueueTime time.Time
name string
providerID string
action string
}

func (w workItem) String() string {
return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
return fmt.Sprintf("[Node: %s, Action: %s]", w.name, w.action)
}

const (
Expand All @@ -62,17 +68,15 @@ const (
// The label for depicting total number of errors a work item encounter and succeed
totalErrorsWorkItemErrorMetric = "total_errors"

// The label for depicting total time when work item gets queued to processed
workItemProcessingTimeWorkItemMetric = "work_item_processing_time"

// The label for depicting total time when work item gets queued to dequeued
workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time"

// The label for depicting total number of errors a work item encounter and fail
errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted"

// The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API.
newNodeEventualConsistencyGracePeriod = time.Minute * 5

addTag = "ADD"

deleteTag = "DELETE"
)

// Controller is the controller implementation for tagging cluster resources.
Expand Down Expand Up @@ -150,7 +154,7 @@ func NewTaggingController(
tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.tagNodesResources)
tc.enqueueNode(node, addTag)
},
UpdateFunc: func(oldObj, newObj interface{}) {
node := newObj.(*v1.Node)
Expand All @@ -163,11 +167,11 @@ func NewTaggingController(
return
}

tc.enqueueNode(node, tc.tagNodesResources)
tc.enqueueNode(node, addTag)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.untagNodeResources)
tc.enqueueNode(node, deleteTag)
},
})

Expand Down Expand Up @@ -213,21 +217,17 @@ func (tc *Controller) process() bool {
err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)

workItem, ok := obj.(*workItem)
workItem, ok := obj.(workItem)
if !ok {
tc.workqueue.Forget(obj)
err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
utilruntime.HandleError(err)
return nil
}

timeTaken := time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %f seconds", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
err = fmt.Errorf("error in getting instanceID for node %s, error: %v", workItem.name, err)
utilruntime.HandleError(err)
return nil
}
Expand All @@ -239,26 +239,31 @@ func (tc *Controller) process() bool {
tc.workqueue.Forget(obj)
return nil
}

err = workItem.action(workItem.node)

if workItem.action == addTag {
err = tc.tagNodesResources(&taggingControllerNode{
name: workItem.name,
providerID: workItem.providerID,
})
} else {
err = tc.untagNodeResources(&taggingControllerNode{
name: workItem.name,
providerID: workItem.providerID,
})
}
if err != nil {
if workItem.requeuingCount < maxRequeuingCount {
numRetries := tc.workqueue.NumRequeues(workItem)
if numRetries < maxRequeuingCount {
// Put the item back on the workqueue to handle any transient errors.
workItem.requeuingCount++
tc.workqueue.AddRateLimited(workItem)

recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID))
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), numRetries)
}

klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
} else {
klog.Infof("Finished processing %s", workItem)
timeTaken = time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
klog.Infof("Processing latency %f seconds", timeTaken)
}

tc.workqueue.Forget(obj)
Expand All @@ -275,11 +280,19 @@ func (tc *Controller) process() bool {

// tagNodesResources tag node resources
// If we want to tag more resources, modify this function appropriately
func (tc *Controller) tagNodesResources(node *v1.Node) error {
func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
err := tc.tagEc2Instance(node)
v1node, err := tc.nodeInformer.Lister().Get(node.name)
if err != nil {
// If node not found, just ignore it as its okay to not add tags when the node object is deleted.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
err = tc.tagEc2Instance(v1node)
if err != nil {
return err
}
Expand Down Expand Up @@ -332,7 +345,7 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {

// untagNodeResources untag node resources
// If we want to untag more resources, modify this function appropriately
func (tc *Controller) untagNodeResources(node *v1.Node) error {
func (tc *Controller) untagNodeResources(node *taggingControllerNode) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
Expand All @@ -348,13 +361,13 @@ func (tc *Controller) untagNodeResources(node *v1.Node) error {

// untagEc2Instances deletes the provided tags to each EC2 instances in
// the cluster.
func (tc *Controller) untagEc2Instance(node *v1.Node) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
func (tc *Controller) untagEc2Instance(node *taggingControllerNode) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.providerID).MapToAWSInstanceID()

err := tc.cloud.UntagResource(string(instanceID), tc.tags)

if err != nil {
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.name, err)
return err
}

Expand All @@ -365,12 +378,13 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {

// enqueueNode takes in the object and an
// action for the object for a workitem and enqueue to the workqueue
func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
item := &workItem{
node: node,
action: action,
requeuingCount: 0,
enqueueTime: time.Now(),
func (tc *Controller) enqueueNode(node *v1.Node, action string) {
// if the struct has fields which are all comparable then the workqueue add will handle make sure multiple adds of the same object
// will only have one item in the workqueue.
item := workItem{
name: node.GetName(),
providerID: node.Spec.ProviderID,
action: action,
}

if tc.rateLimitEnabled {
Expand Down
86 changes: 80 additions & 6 deletions pkg/controllers/tagging/tagging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -221,27 +222,32 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
nodeMonitorPeriod: 1 * time.Second,
tags: map[string]string{"key2": "value2", "key1": "value1"},
resources: []string{"instance"},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Millisecond, 5*time.Millisecond),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
}

if testcase.toBeTagged {
tc.enqueueNode(testcase.currNode, tc.tagNodesResources)
tc.enqueueNode(testcase.currNode, addTag)
} else {
tc.enqueueNode(testcase.currNode, tc.untagNodeResources)
tc.enqueueNode(testcase.currNode, deleteTag)
}

if tc.rateLimitEnabled {
// If rate limit is enabled, sleep for 10 ms to wait for the item to be added to the queue since the base delay is 5 ms.
time.Sleep(10 * time.Millisecond)
}

cnt := 0
for tc.workqueue.Len() > 0 {
tc.process()

cnt++
// sleep briefly because of exponential backoff when requeueing failed workitem
// resulting in workqueue to be empty if checked immediately
time.Sleep(1500 * time.Millisecond)
time.Sleep(7 * time.Millisecond)
}

for _, msg := range testcase.expectedMessages {
Expand All @@ -256,12 +262,80 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
if !strings.Contains(logBuf.String(), "requeuing count exceeded") {
t.Errorf("\nExceeded requeue count but did not stop: \n%v\n", logBuf.String())
}
if cnt != maxRequeuingCount+1 {
t.Errorf("the node got requeued %d, more than the max requeuing count of %d", cnt, maxRequeuingCount)
}
}
}
})
}
}

func TestMultipleEnqueues(t *testing.T) {
awsServices := awsv1.NewFakeAWSServices(TestClusterID)
fakeAws, _ := awsv1.NewAWSCloud(config.CloudConfig{}, awsServices)

testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
}
testNode1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-0002",
},
}
clientset := fake.NewSimpleClientset(testNode, testNode1)
informer := informers.NewSharedInformerFactory(clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()

if err := syncNodeStore(nodeInformer, clientset); err != nil {
t.Errorf("unexpected error: %v", err)
}

tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
tc.enqueueNode(testNode, addTag)
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// adding the same node with similar operation shouldn't add to the workqueue
tc.enqueueNode(testNode, addTag)
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// adding the same node with different operation should add to the workqueue
tc.enqueueNode(testNode, deleteTag)
if tc.workqueue.Len() != 2 {
t.Errorf("invalid work queue length, expected 2, got %d", tc.workqueue.Len())
}
// adding the different node should add to the workqueue
tc.enqueueNode(testNode1, addTag)
if tc.workqueue.Len() != 3 {
t.Errorf("invalid work queue length, expected 3, got %d", tc.workqueue.Len())
}
// should handle the add tag properly
tc.process()
if tc.workqueue.Len() != 2 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// should handle the delete tag properly
tc.process()
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
}

func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
Expand Down

0 comments on commit 2271e31

Please sign in to comment.