Skip to content

Commit

Permalink
Add flags to configure controller parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
sushrk committed Nov 14, 2024
1 parent b709b9c commit 755bc49
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 44 deletions.
2 changes: 1 addition & 1 deletion controllers/core/configmap_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *ConfigMapReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
)
}
} else {
r.Log.Error(err, "failed to retrieve branch ENI cool down period from amazon-vpc-cni configmap, will retain the current cooldown period", "cool down period", curCoolDownPeriod)
r.Log.Info("branch ENI cool down period not configured in amazon-vpc-cni configmap, will retain the current cooldown period", "cool down period", curCoolDownPeriod)
}

// Check if the Windows IPAM flag has changed
Expand Down
7 changes: 3 additions & 4 deletions controllers/core/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ var (
// one routines to help high rate churn and larger nodes groups restarting
// when the controller has to be restarted for various reasons.
const (
MaxNodeConcurrentReconciles = 10
NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup"
NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup"
)

// NodeReconciler reconciles a Node object
Expand Down Expand Up @@ -143,7 +142,7 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}

func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHealthz.HealthzHandler) error {
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles int, healthzHandler *rcHealthz.HealthzHandler) error {
// add health check on subpath for node controller
healthzHandler.AddControllersHealthCheckers(
map[string]healthz.Checker{"health-node-controller": r.Check()},
Expand All @@ -153,7 +152,7 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHe

return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Node{}).
WithOptions(controller.Options{MaxConcurrentReconciles: MaxNodeConcurrentReconciles}).
WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
Owns(&v1alpha1.CNINode{}).
Complete(r)
}
Expand Down
9 changes: 4 additions & 5 deletions controllers/core/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ type PodReconciler struct {
}

var (
PodRequeueRequest = ctrl.Result{Requeue: true, RequeueAfter: time.Second}
MaxPodConcurrentReconciles = 20
PodRequeueRequest = ctrl.Result{Requeue: true, RequeueAfter: time.Second}
)

// Reconcile handles create/update/delete event by delegating the request to the handler
Expand Down Expand Up @@ -192,8 +191,8 @@ func getAggregateResources(pod *v1.Pod) map[string]int64 {
// list of runnable. After Manager acquire the lease the pod controller runnable
// will be started and the Pod events will be sent to Reconcile function
func (r *PodReconciler) SetupWithManager(ctx context.Context, manager ctrl.Manager,
clientSet *kubernetes.Clientset, pageLimit int, syncPeriod time.Duration, healthzHandler *rcHealthz.HealthzHandler) error {
r.Log.Info("The pod controller is using MaxConcurrentReconciles", "Routines", MaxPodConcurrentReconciles)
clientSet *kubernetes.Clientset, pageLimit int, syncPeriod time.Duration, maxConcurrentReconciles int, healthzHandler *rcHealthz.HealthzHandler) error {
r.Log.Info("The pod controller is using MaxConcurrentReconciles", "Routines", maxConcurrentReconciles)

customChecker, err := custom.NewControllerManagedBy(ctx, manager).
WithLogger(r.Log.WithName("custom pod controller")).
Expand All @@ -205,7 +204,7 @@ func (r *PodReconciler) SetupWithManager(ctx context.Context, manager ctrl.Manag
}).Options(custom.Options{
PageLimit: pageLimit,
ResyncPeriod: syncPeriod,
MaxConcurrentReconciles: MaxPodConcurrentReconciles,
MaxConcurrentReconciles: maxConcurrentReconciles,
}).UsingConditions(r.Condition).Complete(r)

// add health check on subpath for pod and pod customized controllers
Expand Down
33 changes: 27 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ func main() {
var enableWindowsPrefixDelegation bool
var region string
var vpcID string
var nodeWorkerCount int
var userClientQPS int
var userClientBurst int
var instanceClientQPS int
var instanceClientBurst int
var apiServerQPS int
var apiServerBurst int
var maxPodConcurrentReconciles int
var maxNodeConcurrentReconciles int

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080",
"The address the metric endpoint binds to.")
Expand Down Expand Up @@ -143,6 +152,17 @@ func main() {
"Enable the feature flag for Windows prefix delegation")
flag.StringVar(&region, "aws-region", "", "The aws region of the k8s cluster")
flag.StringVar(&vpcID, "vpc-id", "", "The VPC ID where EKS cluster is deployed")
flag.IntVar(&nodeWorkerCount, "node-mgr-workers", 10, "The number of node workers")
flag.IntVar(&userClientQPS, "user-client-qps", 12, "The user client QPS rate")
flag.IntVar(&userClientBurst, "user-client-burst", 18, "The user client burst limit")
flag.IntVar(&instanceClientQPS, "instance-qps", 12, "The instance client QPS rate")
flag.IntVar(&instanceClientBurst, "instance-burst", 18, "The instance client burst limit")
// API Server QPS & burst
// Use the same values as default client (https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/client/config/config.go#L85)
flag.IntVar(&apiServerQPS, "apiserver-qps", 20, "The API server client QPS rate")
flag.IntVar(&apiServerBurst, "apiserver-burst", 30, "The API server client burst limit")
flag.IntVar(&maxPodConcurrentReconciles, "max-pod-reconcile", 20, "The maximum number of concurrent reconciles for pod controller")
flag.IntVar(&maxNodeConcurrentReconciles, "max-node-reconcile", 10, "The maximum number of concurrent reconciles for node controller")

flag.Parse()

Expand Down Expand Up @@ -200,8 +220,8 @@ func main() {

kubeConfig := ctrl.GetConfigOrDie()
// Set the API Server QPS and Burst
kubeConfig.QPS = config.DefaultAPIServerQPS
kubeConfig.Burst = config.DefaultAPIServerBurst
kubeConfig.QPS = float32(apiServerQPS)
kubeConfig.Burst = apiServerBurst
kubeConfig.UserAgent = fmt.Sprintf("%s/%s", ec2API.AppName, version.GitVersion)

setupLog.Info("starting the controller with leadership setting",
Expand Down Expand Up @@ -270,7 +290,8 @@ func main() {
region = ""
}

ec2Wrapper, err := ec2API.NewEC2Wrapper(roleARN, clusterName, region, setupLog)
ec2Wrapper, err := ec2API.NewEC2Wrapper(roleARN, clusterName, region, instanceClientQPS,
instanceClientBurst, userClientQPS, userClientBurst, setupLog)
if err != nil {
setupLog.Error(err, "unable to create ec2 wrapper")
}
Expand Down Expand Up @@ -316,7 +337,7 @@ func main() {
}

nodeManagerWorkers := asyncWorkers.NewDefaultWorkerPool("node async workers",
10, 1, ctrl.Log.WithName("node async workers"), ctx)
nodeWorkerCount, 1, ctrl.Log.WithName("node async workers"), ctx)
nodeManager, err := manager.NewNodeManager(ctrl.Log.WithName("node manager"), resourceManager,
apiWrapper, nodeManagerWorkers, controllerConditions, version.GitVersion, healthzHandler)

Expand All @@ -334,7 +355,7 @@ func main() {
K8sAPI: k8sApi,
DataStore: dataStore,
Condition: controllerConditions,
}).SetupWithManager(ctx, mgr, clientSet, listPageLimit, syncPeriod, healthzHandler); err != nil {
}).SetupWithManager(ctx, mgr, clientSet, listPageLimit, syncPeriod, maxPodConcurrentReconciles, healthzHandler); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "pod")
os.Exit(1)
}
Expand All @@ -357,7 +378,7 @@ func main() {
Manager: nodeManager,
Conditions: controllerConditions,
Context: ctx,
}).SetupWithManager(mgr, healthzHandler); err != nil {
}).SetupWithManager(mgr, maxNodeConcurrentReconciles, healthzHandler); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Node")
os.Exit(1)
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/aws/ec2/api/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strings"
"time"

"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -363,7 +362,8 @@ type ec2Wrapper struct {

// NewEC2Wrapper takes the roleARN that will be assumed to make all the EC2 API Calls, if no roleARN
// is passed then the ec2 client will be initialized with the instance's service role account.
func NewEC2Wrapper(roleARN, clusterName, region string, log logr.Logger) (EC2Wrapper, error) {
func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, instanceClientBurst,
userClientQPS, userClientBurst int, log logr.Logger) (EC2Wrapper, error) {
// Register the metrics
prometheusRegister()

Expand All @@ -377,28 +377,28 @@ func NewEC2Wrapper(roleARN, clusterName, region string, log logr.Logger) (EC2Wra
// Role ARN is passed, assume the role ARN to make EC2 API Calls
if roleARN != "" {
// Create the instance service client with low QPS, it will be only used fro associate branch to trunk calls
log.Info("Creating INSTANCE service client with configured QPS", "QPS", config.InstanceServiceClientQPS, "Burst", config.InstanceServiceClientBurst)
instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(config.InstanceServiceClientQPS,
config.InstanceServiceClientBurst, instanceSession)
log.Info("Creating INSTANCE service client with configured QPS", "QPS", instanceClientQPS, "Burst", instanceClientBurst)
instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(instanceClientQPS, instanceClientBurst,
instanceSession)
if err != nil {
return nil, err
}
ec2Wrapper.instanceServiceClient = instanceServiceClient

// Create the user service client with higher QPS, this will be used to make rest of the EC2 API Calls
log.Info("Creating USER service client with configured QPS", "QPS", config.UserServiceClientQPS, "Burst", config.UserServiceClientQPSBurst)
log.Info("Creating USER service client with configured QPS", "QPS", userClientQPS, "Burst", userClientBurst)
userServiceClient, err := ec2Wrapper.getClientUsingAssumedRole(*instanceSession.Config.Region, roleARN, clusterName, region,
config.UserServiceClientQPS, config.UserServiceClientQPSBurst)
userClientQPS, userClientBurst)
if err != nil {
return nil, err
}
ec2Wrapper.userServiceClient = userServiceClient
} else {
// Role ARN is not provided, assuming that instance service client is whitelisted for ENI branching and use
// Role ARN is not provided, assuming that instance service client is allowlisted for ENI branching and use
// the instance service client as the user service client with higher QPS.
log.Info("Creating INSTANCE service client with configured USER Service QPS", "QPS", config.InstanceServiceClientQPS, "Burst", config.InstanceServiceClientBurst)
instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(config.UserServiceClientQPS,
config.UserServiceClientQPSBurst, instanceSession)
log.Info("Creating INSTANCE service client with configured USER Service QPS", "QPS", userClientQPS, "Burst", userClientBurst)
instanceServiceClient, err := ec2Wrapper.getInstanceServiceClient(userClientQPS,
userClientBurst, instanceSession)
if err != nil {
return nil, err
}
Expand Down
17 changes: 0 additions & 17 deletions pkg/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,6 @@ const (
IPv4PDDefaultWarmIPTargetSize = 1
IPv4PDDefaultMinIPTargetSize = 3
IPv4PDDefaultWarmPrefixTargetSize = 0

// EC2 API QPS for user service client
// Tested: 15 + 20 limits
// Tested: 15 + 8 limits (not seeing significant degradation from 15+20)
// Tested: 12 + 8 limits (not seeing significant degradation from 15+8)
// Larger number seems not make latency better than 12+8
UserServiceClientQPS = 12
UserServiceClientQPSBurst = 18

// EC2 API QPS for instance service client
InstanceServiceClientQPS = 12
InstanceServiceClientBurst = 18

// API Server QPS
// Use the same values as default client (https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/client/config/config.go#L85)
DefaultAPIServerQPS = 20
DefaultAPIServerBurst = 30
)

// LoadResourceConfig returns the Resource Configuration for all resources managed by the VPC Resource Controller. Currently
Expand Down

0 comments on commit 755bc49

Please sign in to comment.