Drop v1.Node from workItem
Signed-off-by: Davanum Srinivas <[email protected]>
dims committed Jan 15, 2025
1 parent bf18a51 commit b85c677
Showing 2 changed files with 44 additions and 34 deletions.
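In short: a work item no longer carries a *v1.Node pointer through the queue; it carries just the node name and provider ID, and the live Node is looked up from the informer cache at processing time. A minimal sketch of that pattern, assuming the client-go typed workqueue used in the diff (the enqueue helper below is illustrative, not part of the commit):

package tagging

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/util/workqueue"
)

// workItem carries only the node's identity; the Node object itself is
// re-read from the informer cache when the item is processed.
type workItem struct {
	name           string
	providerID     string
	action         func(node *v1.Node) error
	requeuingCount int
	enqueueTime    time.Time
}

// enqueue mirrors the shape of enqueueNode in this commit.
func enqueue(q workqueue.TypedRateLimitingInterface[*workItem], node *v1.Node, action func(node *v1.Node) error) {
	q.Add(&workItem{
		name:        node.GetName(),
		providerID:  node.Spec.ProviderID,
		action:      action,
		enqueueTime: time.Now(),
	})
}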
69 changes: 38 additions & 31 deletions pkg/controllers/tagging/tagging_controller.go
@@ -44,14 +44,15 @@ func init() {

// workItem contains the node name, provider ID and an action for that node
type workItem struct {
node *v1.Node
name string
providerID string
action func(node *v1.Node) error
requeuingCount int
enqueueTime time.Time
}

func (w workItem) String() string {
return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.name, w.requeuingCount, w.enqueueTime)
}

const (
@@ -82,7 +83,7 @@ type Controller struct {
nodeInformer coreinformers.NodeInformer
kubeClient clientset.Interface
cloud *awsv1.Cloud
workqueue workqueue.RateLimitingInterface
workqueue workqueue.TypedRateLimitingInterface[*workItem]
nodesSynced cache.InformerSynced

// Value controlling Controller monitoring period, i.e. how often does Controller
@@ -116,20 +117,23 @@ func NewTaggingController(
return nil, err
}

var rateLimiter workqueue.TypedRateLimiter[any]
var rateLimiter workqueue.TypedRateLimiter[*workItem]
var rateLimitEnabled bool
if rateLimit > 0.0 && burstLimit > 0 {
klog.Infof("Rate limit enabled on controller with rate %f and burst %d.", rateLimit, burstLimit)

// This is the workqueue.DefaultControllerRateLimiter() but in case where throttling is enabled on the controller,
// the rate and burst values are set to the provided values.
rateLimiter = workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(rateLimit), burstLimit)},
rateLimiter = workqueue.NewTypedMaxOfRateLimiter[*workItem](
workqueue.NewTypedItemExponentialFailureRateLimiter[*workItem](5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and it's only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[*workItem]{Limiter: rate.NewLimiter(rate.Limit(rateLimit), burstLimit)},
)

rateLimitEnabled = true
} else {
klog.Infof("Rate limit disabled on controller.")
rateLimiter = workqueue.DefaultTypedControllerRateLimiter[any]()
rateLimiter = workqueue.DefaultTypedControllerRateLimiter[*workItem]()
rateLimitEnabled = false
}

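For readers unfamiliar with the typed workqueue API, the wiring above combines two generic rate limiters and takes the larger delay of the two: a per-item exponential backoff and an overall token bucket. A self-contained sketch under that assumption (the 10 qps / 100 burst values below are placeholders; the controller uses its rateLimit and burstLimit parameters):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

type workItem struct{ name string }

func main() {
	// Max-of combination: per-item exponential backoff for retries plus an
	// overall token bucket shared by every item in the queue.
	limiter := workqueue.NewTypedMaxOfRateLimiter[*workItem](
		workqueue.NewTypedItemExponentialFailureRateLimiter[*workItem](5*time.Millisecond, 1000*time.Second),
		&workqueue.TypedBucketRateLimiter[*workItem]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	item := &workItem{name: "node-1"}
	for i := 0; i < 4; i++ {
		// Each When call counts as another failure for this item, so the
		// per-item delay doubles: roughly 5ms, 10ms, 20ms, 40ms here.
		fmt.Println(limiter.When(item))
	}
	limiter.Forget(item) // reset the per-item backoff once the item succeeds
}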
@@ -139,9 +143,10 @@
cloud: awsCloud,
tags: tags,
resources: resources,
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
Name: TaggingControllerClientName,
}),
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
rateLimiter,
workqueue.TypedRateLimitingQueueConfig[*workItem]{Name: TaggingControllerClientName},
),
nodesSynced: nodeInformer.Informer().HasSynced,
nodeMonitorPeriod: nodeMonitorPeriod,
rateLimitEnabled: rateLimitEnabled,
@@ -205,31 +210,32 @@ func (tc *Controller) work() {
// process reads each message in the queue and performs either
// tag or untag function on the Node object
func (tc *Controller) process() bool {
obj, shutdown := tc.workqueue.Get()
item, shutdown := tc.workqueue.Get()
if shutdown {
return false
}

klog.Infof("Starting to process %s", obj)

err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)
klog.Infof("Starting to process %s", item)
node, exists, err := tc.nodeInformer.Informer().GetIndexer().GetByKey(item.name)
if err != nil {
klog.Errorf("Error occurred while getting node from informer %s", item)
utilruntime.HandleError(err)
}
if !exists {
klog.Errorf("Error occurred node missing in informer %s", item)
return false
}

workItem, ok := obj.(*workItem)
if !ok {
tc.workqueue.Forget(obj)
err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
utilruntime.HandleError(err)
return nil
}
err = func(workItem *workItem, node *v1.Node) error {
defer tc.workqueue.Done(workItem)

timeTaken := time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %f seconds", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.name, err)
utilruntime.HandleError(err)
return nil
}
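Because the queue now holds only the node name, process() has to resolve the live object from the informer's store before invoking the action. A minimal sketch of that lookup, assuming a core/v1 NodeInformer like the controller's (the getNode helper itself is illustrative):

package tagging

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
)

// getNode resolves a node name against the informer cache, mirroring the
// GetByKey call in process() above. Nodes are cluster-scoped, so the cache
// key is simply the node name.
func getNode(nodeInformer coreinformers.NodeInformer, name string) (*v1.Node, error) {
	obj, exists, err := nodeInformer.Informer().GetIndexer().GetByKey(name)
	if err != nil {
		return nil, fmt.Errorf("reading node %s from informer cache: %w", name, err)
	}
	if !exists {
		// Node was deleted after the work item was enqueued; nothing to do.
		return nil, nil
	}
	return obj.(*v1.Node), nil
}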
@@ -238,11 +244,11 @@ func (tc *Controller) process() bool {
if variant.IsVariantNode(string(instanceID)) {
klog.Infof("Skip processing the node %s since it is a %s node",
instanceID, variant.NodeType(string(instanceID)))
tc.workqueue.Forget(obj)
tc.workqueue.Forget(workItem)
return nil
}

err = workItem.action(workItem.node)
err = workItem.action(node)

if err != nil {
if workItem.requeuingCount < maxRequeuingCount {
@@ -263,12 +269,12 @@
klog.Infof("Processing latency %f seconds", timeTaken)
}

tc.workqueue.Forget(obj)
tc.workqueue.Forget(workItem)
return nil
}(obj)
}(item, node.(*v1.Node))

if err != nil {
klog.Errorf("Error occurred while processing %s", obj)
klog.Errorf("Error occurred while processing %s", item)
utilruntime.HandleError(err)
}

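The retry branch trimmed by the hunk above follows the usual workqueue pattern: a failed item goes back on the queue with backoff until a retry cap is hit, after which it is dropped. A hedged sketch of that policy, reusing the workItem type and workqueue import from the sketch near the top of this page (maxRequeues is a placeholder for the file's maxRequeuingCount constant; this is not the controller's exact code):

// handleResult requeues transient failures with backoff and gives up once
// the retry budget is spent.
func handleResult(q workqueue.TypedRateLimitingInterface[*workItem], item *workItem, err error) {
	const maxRequeues = 9 // placeholder for maxRequeuingCount

	if err == nil || item.requeuingCount >= maxRequeues {
		// Success, or retries exhausted: drop the item and clear its backoff state.
		q.Forget(item)
		return
	}
	item.requeuingCount++
	q.AddRateLimited(item) // re-enqueued after the rate limiter's computed delay
}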
@@ -369,7 +375,8 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {
// action for the object for a workitem and enqueue to the workqueue
func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
item := &workItem{
node: node,
name: node.GetName(),
providerID: node.Spec.ProviderID,
action: action,
requeuingCount: 0,
enqueueTime: time.Now(),
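With enqueueNode snapshotting only name and providerID, call sites keep the same shape: they hand over the live node plus the action to run. An illustrative wrapper (tagEc2Instance is assumed to be the tag-path counterpart of untagEc2Instance above; the real controller chooses between the two based on node lifecycle):

// exampleEnqueue shows the call shape only.
func exampleEnqueue(tc *Controller, node *v1.Node) {
	tc.enqueueNode(node, tc.tagEc2Instance)   // node joined: apply tags
	tc.enqueueNode(node, tc.untagEc2Instance) // node left: remove tags
}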
9 changes: 6 additions & 3 deletions pkg/controllers/tagging/tagging_controller_test.go
@@ -205,7 +205,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
klog.SetOutput(os.Stderr)
}()

clientset := fake.NewSimpleClientset(testcase.currNode)
clientset := fake.NewClientset(testcase.currNode)
informer := informers.NewSharedInformerFactory(clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()

@@ -221,8 +221,11 @@
nodeMonitorPeriod: 1 * time.Second,
tags: map[string]string{"key2": "value2", "key1": "value1"},
resources: []string{"instance"},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[*workItem](),
workqueue.TypedRateLimitingQueueConfig[*workItem]{Name: "Tagging"},
),
rateLimitEnabled: testcase.rateLimited,
}

if testcase.toBeTagged {
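On the test side the same typed queue is built directly, fed by a fake clientset (fake.NewClientset is the newer constructor the diff swaps in for NewSimpleClientset). A minimal, self-contained sketch of that setup, with placeholder names and values:

package tagging

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/util/workqueue"
)

// newTestFixtures mirrors the test setup above: a fake clientset seeded with
// one node, a shared informer factory, and a typed, rate-limited workqueue.
func newTestFixtures() (workqueue.TypedRateLimitingInterface[*workItem], informers.SharedInformerFactory) {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}}
	clientset := fake.NewClientset(node)
	factory := informers.NewSharedInformerFactory(clientset, time.Second)
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[*workItem](),
		workqueue.TypedRateLimitingQueueConfig[*workItem]{Name: "Tagging"},
	)
	return queue, factory
}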
