diff --git a/pkg/controllers/tagging/metrics.go b/pkg/controllers/tagging/metrics.go
index 6ac3a9db2f..32980d7a04 100644
--- a/pkg/controllers/tagging/metrics.go
+++ b/pkg/controllers/tagging/metrics.go
@@ -14,23 +14,15 @@ limitations under the License.
 package tagging
 
 import (
+	"sync"
+
 	"k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/legacyregistry"
-	"sync"
 )
 
 var register sync.Once
 
 var (
-	workItemDuration = metrics.NewHistogramVec(
-		&metrics.HistogramOpts{
-			Name:           "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
-			Help:           "workitem latency of workitem being in the queue and time it takes to process",
-			StabilityLevel: metrics.ALPHA,
-			Buckets:        metrics.ExponentialBuckets(0.5, 1.5, 20),
-		},
-		[]string{"latency_type"})
-
 	workItemError = metrics.NewCounterVec(
 		&metrics.CounterOpts{
 			Name:           "cloudprovider_aws_tagging_controller_work_item_errors_total",
@@ -43,15 +35,10 @@ var (
 // registerMetrics registers tagging-controller metrics.
 func registerMetrics() {
 	register.Do(func() {
-		legacyregistry.MustRegister(workItemDuration)
 		legacyregistry.MustRegister(workItemError)
 	})
 }
 
-func recordWorkItemLatencyMetrics(latencyType string, timeTaken float64) {
-	workItemDuration.With(metrics.Labels{"latency_type": latencyType}).Observe(timeTaken)
-}
-
 func recordWorkItemErrorMetrics(errorType string, instanceID string) {
 	workItemError.With(metrics.Labels{"error_type": errorType, "instance_id": instanceID}).Inc()
 }
diff --git a/pkg/controllers/tagging/tagging_controller.go b/pkg/controllers/tagging/tagging_controller.go
index 909c8237eb..c5fa9645c5 100644
--- a/pkg/controllers/tagging/tagging_controller.go
+++ b/pkg/controllers/tagging/tagging_controller.go
@@ -22,6 +22,7 @@ import (
 
 	"golang.org/x/time/rate"
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	coreinformers "k8s.io/client-go/informers/core/v1"
@@ -42,16 +43,21 @@ func init() {
 	registerMetrics()
 }
 
-// workItem contains the node and an action for that node
+// taggingControllerNode contains the node details required for tag/untag of node resources.
+type taggingControllerNode struct {
+	providerID string
+	name       string
+}
+
+// workItem contains the node name, provider id and an action for that node.
 type workItem struct {
-	node           *v1.Node
-	action         func(node *v1.Node) error
-	requeuingCount int
-	enqueueTime    time.Time
+	name       string
+	providerID string
+	action     string
 }
 
 func (w workItem) String() string {
-	return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
+	return fmt.Sprintf("[Node: %s, Action: %s]", w.name, w.action)
 }
 
 const (
@@ -62,17 +68,15 @@ const (
 	// The label for depicting total number of errors a work item encounter and succeed
 	totalErrorsWorkItemErrorMetric = "total_errors"
 
-	// The label for depicting total time when work item gets queued to processed
-	workItemProcessingTimeWorkItemMetric = "work_item_processing_time"
-
-	// The label for depicting total time when work item gets queued to dequeued
-	workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time"
-
 	// The label for depicting total number of errors a work item encounter and fail
 	errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted"
 
 	// The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API.
 	newNodeEventualConsistencyGracePeriod = time.Minute * 5
+
+	addTag = "ADD"
+
+	deleteTag = "DELETE"
 )
 
 // Controller is the controller implementation for tagging cluster resources.
@@ -150,7 +154,7 @@ func NewTaggingController(
 	tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			node := obj.(*v1.Node)
-			tc.enqueueNode(node, tc.tagNodesResources)
+			tc.enqueueNode(node, addTag)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			node := newObj.(*v1.Node)
@@ -163,11 +167,11 @@ func NewTaggingController(
 				return
 			}
 
-			tc.enqueueNode(node, tc.tagNodesResources)
+			tc.enqueueNode(node, addTag)
 		},
 		DeleteFunc: func(obj interface{}) {
 			node := obj.(*v1.Node)
-			tc.enqueueNode(node, tc.untagNodeResources)
+			tc.enqueueNode(node, deleteTag)
 		},
 	})
 
@@ -213,7 +217,7 @@ func (tc *Controller) process() bool {
 	err := func(obj interface{}) error {
 		defer tc.workqueue.Done(obj)
 
-		workItem, ok := obj.(*workItem)
+		workItem, ok := obj.(workItem)
 		if !ok {
 			tc.workqueue.Forget(obj)
 			err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
@@ -221,13 +225,9 @@ func (tc *Controller) process() bool {
 			return nil
 		}
 
-		timeTaken := time.Since(workItem.enqueueTime).Seconds()
-		recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
-		klog.Infof("Dequeuing latency %f seconds", timeTaken)
-
-		instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
+		instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
 		if err != nil {
-			err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
+			err = fmt.Errorf("error in getting instanceID for node %s, error: %v", workItem.name, err)
 			utilruntime.HandleError(err)
 			return nil
 		}
@@ -239,26 +239,31 @@ func (tc *Controller) process() bool {
 			tc.workqueue.Forget(obj)
 			return nil
 		}
-
-		err = workItem.action(workItem.node)
-
+		if workItem.action == addTag {
+			err = tc.tagNodesResources(&taggingControllerNode{
+				name:       workItem.name,
+				providerID: workItem.providerID,
+			})
+		} else {
+			err = tc.untagNodeResources(&taggingControllerNode{
+				name:       workItem.name,
+				providerID: workItem.providerID,
+			})
+		}
 		if err != nil {
-			if workItem.requeuingCount < maxRequeuingCount {
+			numRetries := tc.workqueue.NumRequeues(workItem)
+			if numRetries < maxRequeuingCount {
 				// Put the item back on the workqueue to handle any transient errors.
-				workItem.requeuingCount++
 				tc.workqueue.AddRateLimited(workItem)
 
 				recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID))
-				return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
+				return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), numRetries)
 			}
 
 			klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
 			recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
 		} else {
 			klog.Infof("Finished processing %s", workItem)
-			timeTaken = time.Since(workItem.enqueueTime).Seconds()
-			recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
-			klog.Infof("Processing latency %f seconds", timeTaken)
 		}
 
 		tc.workqueue.Forget(obj)
@@ -275,11 +280,19 @@ func (tc *Controller) process() bool {
 
 // tagNodesResources tag node resources
 // If we want to tag more resources, modify this function appropriately
-func (tc *Controller) tagNodesResources(node *v1.Node) error {
+func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
 	for _, resource := range tc.resources {
 		switch resource {
 		case opt.Instance:
-			err := tc.tagEc2Instance(node)
+			v1node, err := tc.nodeInformer.Lister().Get(node.name)
+			if err != nil {
+				// If the node is not found, just ignore it, as it's okay to not add tags when the node object is deleted.
+				if apierrors.IsNotFound(err) {
+					return nil
+				}
+				return err
+			}
+			err = tc.tagEc2Instance(v1node)
 			if err != nil {
 				return err
 			}
@@ -332,7 +345,7 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {
 
 // untagNodeResources untag node resources
 // If we want to untag more resources, modify this function appropriately
-func (tc *Controller) untagNodeResources(node *v1.Node) error {
+func (tc *Controller) untagNodeResources(node *taggingControllerNode) error {
 	for _, resource := range tc.resources {
 		switch resource {
 		case opt.Instance:
@@ -348,13 +361,13 @@ func (tc *Controller) untagNodeResources(node *v1.Node) error {
 
 // untagEc2Instances deletes the provided tags to each EC2 instances in
 // the cluster.
-func (tc *Controller) untagEc2Instance(node *v1.Node) error {
-	instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
+func (tc *Controller) untagEc2Instance(node *taggingControllerNode) error {
+	instanceID, _ := awsv1.KubernetesInstanceID(node.providerID).MapToAWSInstanceID()
 
 	err := tc.cloud.UntagResource(string(instanceID), tc.tags)
 
 	if err != nil {
-		klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
+		klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.name, err)
 		return err
 	}
 
@@ -365,12 +378,13 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {
 
 // enqueueNode takes in the object and an
 // action for the object for a workitem and enqueue to the workqueue
-func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
-	item := &workItem{
-		node:           node,
-		action:         action,
-		requeuingCount: 0,
-		enqueueTime:    time.Now(),
+func (tc *Controller) enqueueNode(node *v1.Node, action string) {
+	// if the struct has fields which are all comparable then the workqueue Add will make sure multiple adds of the same object
+	// will only have one item in the workqueue.
+	item := workItem{
+		name:       node.GetName(),
+		providerID: node.Spec.ProviderID,
+		action:     action,
 	}
 
 	if tc.rateLimitEnabled {
diff --git a/pkg/controllers/tagging/tagging_controller_test.go b/pkg/controllers/tagging/tagging_controller_test.go
index 89142bec12..595910ece2 100644
--- a/pkg/controllers/tagging/tagging_controller_test.go
+++ b/pkg/controllers/tagging/tagging_controller_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
 
+	"golang.org/x/time/rate"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/informers"
@@ -221,14 +222,18 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
 				nodeMonitorPeriod: 1 * time.Second,
 				tags:              map[string]string{"key2": "value2", "key1": "value1"},
 				resources:         []string{"instance"},
-				workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
-				rateLimitEnabled:  testcase.rateLimited,
+				workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
+					workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 5*time.Millisecond),
+					// 10 qps, 100 bucket size. This is only for retry speed and it's only the overall factor (not per item).
+					&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+				), "Tagging"),
+				rateLimitEnabled: testcase.rateLimited,
 			}
 
 			if testcase.toBeTagged {
-				tc.enqueueNode(testcase.currNode, tc.tagNodesResources)
+				tc.enqueueNode(testcase.currNode, addTag)
 			} else {
-				tc.enqueueNode(testcase.currNode, tc.untagNodeResources)
+				tc.enqueueNode(testcase.currNode, deleteTag)
 			}
 
 			if tc.rateLimitEnabled {
@@ -236,12 +241,13 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
 				time.Sleep(10 * time.Millisecond)
 			}
 
+			cnt := 0
 			for tc.workqueue.Len() > 0 {
 				tc.process()
-
+				cnt++
 				// sleep briefly because of exponential backoff when requeueing failed workitem
 				// resulting in workqueue to be empty if checked immediately
-				time.Sleep(1500 * time.Millisecond)
+				time.Sleep(7 * time.Millisecond)
 			}
 
 			for _, msg := range testcase.expectedMessages {
@@ -256,12 +262,80 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
 					if !strings.Contains(logBuf.String(), "requeuing count exceeded") {
 						t.Errorf("\nExceeded requeue count but did not stop: \n%v\n", logBuf.String())
 					}
+					if cnt != maxRequeuingCount+1 {
+						t.Errorf("the node got requeued %d times, more than the max requeuing count of %d", cnt, maxRequeuingCount)
+					}
 				}
 			}
 		})
 	}
 }
 
+func TestMultipleEnqueues(t *testing.T) {
+	awsServices := awsv1.NewFakeAWSServices(TestClusterID)
+	fakeAws, _ := awsv1.NewAWSCloud(config.CloudConfig{}, awsServices)
+
+	testNode := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:              "node0",
+			CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+		},
+		Spec: v1.NodeSpec{
+			ProviderID: "i-0001",
+		},
+	}
+	testNode1 := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:              "node1",
+			CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+		},
+		Spec: v1.NodeSpec{
+			ProviderID: "i-0002",
+		},
+	}
+	clientset := fake.NewSimpleClientset(testNode, testNode1)
+	informer := informers.NewSharedInformerFactory(clientset, time.Second)
+	nodeInformer := informer.Core().V1().Nodes()
+
+	if err := syncNodeStore(nodeInformer, clientset); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+
+	tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	tc.enqueueNode(testNode, addTag)
+	if tc.workqueue.Len() != 1 {
+		t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
%d", tc.workqueue.Len()) + } + // adding the same node with similar operation shouldn't add to the workqueue + tc.enqueueNode(testNode, addTag) + if tc.workqueue.Len() != 1 { + t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len()) + } + // adding the same node with different operation should add to the workqueue + tc.enqueueNode(testNode, deleteTag) + if tc.workqueue.Len() != 2 { + t.Errorf("invalid work queue length, expected 2, got %d", tc.workqueue.Len()) + } + // adding the different node should add to the workqueue + tc.enqueueNode(testNode1, addTag) + if tc.workqueue.Len() != 3 { + t.Errorf("invalid work queue length, expected 3, got %d", tc.workqueue.Len()) + } + // should handle the add tag properly + tc.process() + if tc.workqueue.Len() != 2 { + t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len()) + } + // should handle the delete tag properly + tc.process() + if tc.workqueue.Len() != 1 { + t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len()) + } +} + func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error { nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) if err != nil {