From 755bc49306c62fdebe9aea75e336f03be12fb39d Mon Sep 17 00:00:00 2001 From: sushrk Date: Thu, 14 Nov 2024 08:07:25 +0000 Subject: [PATCH 1/2] Add flags to configure controller parameters --- controllers/core/configmap_controller.go | 2 +- controllers/core/node_controller.go | 7 +++-- controllers/core/pod_controller.go | 9 +++---- main.go | 33 +++++++++++++++++++----- pkg/aws/ec2/api/wrapper.go | 22 ++++++++-------- pkg/config/loader.go | 17 ------------ 6 files changed, 46 insertions(+), 44 deletions(-) diff --git a/controllers/core/configmap_controller.go b/controllers/core/configmap_controller.go index e28037cd..45bf4c0a 100644 --- a/controllers/core/configmap_controller.go +++ b/controllers/core/configmap_controller.go @@ -91,7 +91,7 @@ func (r *ConfigMapReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( ) } } else { - r.Log.Error(err, "failed to retrieve branch ENI cool down period from amazon-vpc-cni configmap, will retain the current cooldown period", "cool down period", curCoolDownPeriod) + r.Log.Info("branch ENI cool down period not configured in amazon-vpc-cni configmap, will retain the current cooldown period", "cool down period", curCoolDownPeriod) } // Check if the Windows IPAM flag has changed diff --git a/controllers/core/node_controller.go b/controllers/core/node_controller.go index d8b9b78b..a794bef7 100644 --- a/controllers/core/node_controller.go +++ b/controllers/core/node_controller.go @@ -57,8 +57,7 @@ var ( // one routines to help high rate churn and larger nodes groups restarting // when the controller has to be restarted for various reasons. const ( - MaxNodeConcurrentReconciles = 10 - NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup" + NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup" ) // NodeReconciler reconciles a Node object @@ -143,7 +142,7 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } -func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHealthz.HealthzHandler) error { +func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles int, healthzHandler *rcHealthz.HealthzHandler) error { // add health check on subpath for node controller healthzHandler.AddControllersHealthCheckers( map[string]healthz.Checker{"health-node-controller": r.Check()}, @@ -153,7 +152,7 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHe return ctrl.NewControllerManagedBy(mgr). For(&corev1.Node{}). - WithOptions(controller.Options{MaxConcurrentReconciles: MaxNodeConcurrentReconciles}). + WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}). Owns(&v1alpha1.CNINode{}). Complete(r) } diff --git a/controllers/core/pod_controller.go b/controllers/core/pod_controller.go index fc8ab5c5..6b31f834 100644 --- a/controllers/core/pod_controller.go +++ b/controllers/core/pod_controller.go @@ -56,8 +56,7 @@ type PodReconciler struct { } var ( - PodRequeueRequest = ctrl.Result{Requeue: true, RequeueAfter: time.Second} - MaxPodConcurrentReconciles = 20 + PodRequeueRequest = ctrl.Result{Requeue: true, RequeueAfter: time.Second} ) // Reconcile handles create/update/delete event by delegating the request to the handler @@ -192,8 +191,8 @@ func getAggregateResources(pod *v1.Pod) map[string]int64 { // list of runnable. 
After Manager acquire the lease the pod controller runnable // will be started and the Pod events will be sent to Reconcile function func (r *PodReconciler) SetupWithManager(ctx context.Context, manager ctrl.Manager, - clientSet *kubernetes.Clientset, pageLimit int, syncPeriod time.Duration, healthzHandler *rcHealthz.HealthzHandler) error { - r.Log.Info("The pod controller is using MaxConcurrentReconciles", "Routines", MaxPodConcurrentReconciles) + clientSet *kubernetes.Clientset, pageLimit int, syncPeriod time.Duration, maxConcurrentReconciles int, healthzHandler *rcHealthz.HealthzHandler) error { + r.Log.Info("The pod controller is using MaxConcurrentReconciles", "Routines", maxConcurrentReconciles) customChecker, err := custom.NewControllerManagedBy(ctx, manager). WithLogger(r.Log.WithName("custom pod controller")). @@ -205,7 +204,7 @@ func (r *PodReconciler) SetupWithManager(ctx context.Context, manager ctrl.Manag }).Options(custom.Options{ PageLimit: pageLimit, ResyncPeriod: syncPeriod, - MaxConcurrentReconciles: MaxPodConcurrentReconciles, + MaxConcurrentReconciles: maxConcurrentReconciles, }).UsingConditions(r.Condition).Complete(r) // add health check on subpath for pod and pod customized controllers diff --git a/main.go b/main.go index 51c070c1..7b927ff3 100644 --- a/main.go +++ b/main.go @@ -108,6 +108,15 @@ func main() { var enableWindowsPrefixDelegation bool var region string var vpcID string + var nodeWorkerCount int + var userClientQPS int + var userClientBurst int + var instanceClientQPS int + var instanceClientBurst int + var apiServerQPS int + var apiServerBurst int + var maxPodConcurrentReconciles int + var maxNodeConcurrentReconciles int flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ -143,6 +152,17 @@ func main() { "Enable the feature flag for Windows prefix delegation") flag.StringVar(®ion, "aws-region", "", "The aws region of the k8s cluster") flag.StringVar(&vpcID, "vpc-id", "", "The VPC ID where EKS cluster is deployed") + flag.IntVar(&nodeWorkerCount, "node-mgr-workers", 10, "The number of node workers") + flag.IntVar(&userClientQPS, "user-client-qps", 12, "The user client QPS rate") + flag.IntVar(&userClientBurst, "user-client-burst", 18, "The user client burst limit") + flag.IntVar(&instanceClientQPS, "instance-qps", 12, "The instance client QPS rate") + flag.IntVar(&instanceClientBurst, "instance-burst", 18, "The instance client burst limit") + // API Server QPS & burst + // Use the same values as default client (https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/client/config/config.go#L85) + flag.IntVar(&apiServerQPS, "apiserver-qps", 20, "The API server client QPS rate") + flag.IntVar(&apiServerBurst, "apiserver-burst", 30, "The API server client burst limit") + flag.IntVar(&maxPodConcurrentReconciles, "max-pod-reconcile", 20, "The maximum number of concurrent reconciles for pod controller") + flag.IntVar(&maxNodeConcurrentReconciles, "max-node-reconcile", 10, "The maximum number of concurrent reconciles for node controller") flag.Parse() @@ -200,8 +220,8 @@ func main() { kubeConfig := ctrl.GetConfigOrDie() // Set the API Server QPS and Burst - kubeConfig.QPS = config.DefaultAPIServerQPS - kubeConfig.Burst = config.DefaultAPIServerBurst + kubeConfig.QPS = float32(apiServerQPS) + kubeConfig.Burst = apiServerBurst kubeConfig.UserAgent = fmt.Sprintf("%s/%s", ec2API.AppName, version.GitVersion) setupLog.Info("starting the controller with leadership setting", @@ -270,7 +290,8 @@ 
func main() { region = "" } - ec2Wrapper, err := ec2API.NewEC2Wrapper(roleARN, clusterName, region, setupLog) + ec2Wrapper, err := ec2API.NewEC2Wrapper(roleARN, clusterName, region, instanceClientQPS, + instanceClientBurst, userClientQPS, userClientBurst, setupLog) if err != nil { setupLog.Error(err, "unable to create ec2 wrapper") } @@ -316,7 +337,7 @@ func main() { } nodeManagerWorkers := asyncWorkers.NewDefaultWorkerPool("node async workers", - 10, 1, ctrl.Log.WithName("node async workers"), ctx) + nodeWorkerCount, 1, ctrl.Log.WithName("node async workers"), ctx) nodeManager, err := manager.NewNodeManager(ctrl.Log.WithName("node manager"), resourceManager, apiWrapper, nodeManagerWorkers, controllerConditions, version.GitVersion, healthzHandler) @@ -334,7 +355,7 @@ func main() { K8sAPI: k8sApi, DataStore: dataStore, Condition: controllerConditions, - }).SetupWithManager(ctx, mgr, clientSet, listPageLimit, syncPeriod, healthzHandler); err != nil { + }).SetupWithManager(ctx, mgr, clientSet, listPageLimit, syncPeriod, maxPodConcurrentReconciles, healthzHandler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "pod") os.Exit(1) } @@ -357,7 +378,7 @@ func main() { Manager: nodeManager, Conditions: controllerConditions, Context: ctx, - }).SetupWithManager(mgr, healthzHandler); err != nil { + }).SetupWithManager(mgr, maxNodeConcurrentReconciles, healthzHandler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Node") os.Exit(1) } diff --git a/pkg/aws/ec2/api/wrapper.go b/pkg/aws/ec2/api/wrapper.go index 9be5a773..29e93156 100644 --- a/pkg/aws/ec2/api/wrapper.go +++ b/pkg/aws/ec2/api/wrapper.go @@ -19,7 +19,6 @@ import ( "strings" "time" - "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils" "github.com/aws/aws-sdk-go/aws" @@ -363,7 +362,8 @@ type ec2Wrapper struct { // NewEC2Wrapper takes the roleARN that will be assumed to make all the EC2 API Calls, if no roleARN // is passed then the ec2 client will be initialized with the instance's service role account. 
-func NewEC2Wrapper(roleARN, clusterName, region string, log logr.Logger) (EC2Wrapper, error) { +func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, instanceClientBurst, + userClientQPS, userClientBurst int, log logr.Logger) (EC2Wrapper, error) { // Register the metrics prometheusRegister() @@ -377,28 +377,28 @@ func NewEC2Wrapper(roleARN, clusterName, region string, log logr.Logger) (EC2Wra // Role ARN is passed, assume the role ARN to make EC2 API Calls if roleARN != "" { // Create the instance service client with low QPS, it will be only used fro associate branch to trunk calls - log.Info("Creating INSTANCE service client with configured QPS", "QPS", config.InstanceServiceClientQPS, "Burst", config.InstanceServiceClientBurst) - instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(config.InstanceServiceClientQPS, - config.InstanceServiceClientBurst, instanceSession) + log.Info("Creating INSTANCE service client with configured QPS", "QPS", instanceClientQPS, "Burst", instanceClientBurst) + instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(instanceClientQPS, instanceClientBurst, + instanceSession) if err != nil { return nil, err } ec2Wrapper.instanceServiceClient = instanceServiceClient // Create the user service client with higher QPS, this will be used to make rest of the EC2 API Calls - log.Info("Creating USER service client with configured QPS", "QPS", config.UserServiceClientQPS, "Burst", config.UserServiceClientQPSBurst) + log.Info("Creating USER service client with configured QPS", "QPS", userClientQPS, "Burst", userClientBurst) userServiceClient, err := ec2Wrapper.getClientUsingAssumedRole(*instanceSession.Config.Region, roleARN, clusterName, region, - config.UserServiceClientQPS, config.UserServiceClientQPSBurst) + userClientQPS, userClientBurst) if err != nil { return nil, err } ec2Wrapper.userServiceClient = userServiceClient } else { - // Role ARN is not provided, assuming that instance service client is whitelisted for ENI branching and use + // Role ARN is not provided, assuming that instance service client is allowlisted for ENI branching and use // the instance service client as the user service client with higher QPS. 
- log.Info("Creating INSTANCE service client with configured USER Service QPS", "QPS", config.InstanceServiceClientQPS, "Burst", config.InstanceServiceClientBurst) - instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(config.UserServiceClientQPS, - config.UserServiceClientQPSBurst, instanceSession) + log.Info("Creating INSTANCE service client with configured USER Service QPS", "QPS", userClientQPS, "Burst", userClientBurst) + instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(userClientQPS, + userClientBurst, instanceSession) if err != nil { return nil, err } diff --git a/pkg/config/loader.go b/pkg/config/loader.go index 833af06a..358e2987 100644 --- a/pkg/config/loader.go +++ b/pkg/config/loader.go @@ -43,23 +43,6 @@ const ( IPv4PDDefaultWarmIPTargetSize = 1 IPv4PDDefaultMinIPTargetSize = 3 IPv4PDDefaultWarmPrefixTargetSize = 0 - - // EC2 API QPS for user service client - // Tested: 15 + 20 limits - // Tested: 15 + 8 limits (not seeing significant degradation from 15+20) - // Tested: 12 + 8 limits (not seeing significant degradation from 15+8) - // Larger number seems not make latency better than 12+8 - UserServiceClientQPS = 12 - UserServiceClientQPSBurst = 18 - - // EC2 API QPS for instance service client - InstanceServiceClientQPS = 12 - InstanceServiceClientBurst = 18 - - // API Server QPS - // Use the same values as default client (https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/client/config/config.go#L85) - DefaultAPIServerQPS = 20 - DefaultAPIServerBurst = 30 ) // LoadResourceConfig returns the Resource Configuration for all resources managed by the VPC Resource Controller. Currently From 06fa8a938759a9f3229ebad2be34f25ed12e1d3e Mon Sep 17 00:00:00 2001 From: sushrk Date: Thu, 14 Nov 2024 21:34:05 +0000 Subject: [PATCH 2/2] update flag names --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 7b927ff3..c11b39c7 100644 --- a/main.go +++ b/main.go @@ -155,8 +155,8 @@ func main() { flag.IntVar(&nodeWorkerCount, "node-mgr-workers", 10, "The number of node workers") flag.IntVar(&userClientQPS, "user-client-qps", 12, "The user client QPS rate") flag.IntVar(&userClientBurst, "user-client-burst", 18, "The user client burst limit") - flag.IntVar(&instanceClientQPS, "instance-qps", 12, "The instance client QPS rate") - flag.IntVar(&instanceClientBurst, "instance-burst", 18, "The instance client burst limit") + flag.IntVar(&instanceClientQPS, "instance-client-qps", 12, "The instance client QPS rate") + flag.IntVar(&instanceClientBurst, "instance-client-burst", 18, "The instance client burst limit") // API Server QPS & burst // Use the same values as default client (https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/client/config/config.go#L85) flag.IntVar(&apiServerQPS, "apiserver-qps", 20, "The API server client QPS rate")