diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 521d77a89112..da41431b11f0 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -222,7 +222,6 @@ func startWorkStatusController(ctx controllerscontext.Context) (bool, error) { RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: 1, ObjectWatcher: ctx.ObjectWatcher, PredicateFunc: helper.NewExecutionPredicateOnAgent(), ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent, @@ -242,7 +241,6 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error) RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: 1, PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index c1cbf4ff0f8b..08f7f9d5c7b5 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -247,7 +247,6 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: 1, ObjectWatcher: ctx.ObjectWatcher, PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), ClusterClientSetFunc: util.NewClusterDynamicClientSet, @@ -284,7 +283,6 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, RESTMapper: ctx.Mgr.GetRESTMapper(), InformerManager: informermanager.GetInstance(), StopChan: ctx.StopChan, - WorkerNumber: 1, PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go index 2c97dc262979..6c6ab8c33cd9 100644 --- a/pkg/clusterdiscovery/clusterapi/clusterapi.go +++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go @@ -2,6 +2,7 @@ package clusterapi import ( "context" + "flag" "fmt" "os" @@ -27,6 +28,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager/keys" ) +func init() { + flag.IntVar(&concurrentReconciles, "clusterapi-workers", concurrentReconciles, "Max concurrent workers for clusterapi worker.") +} + +var ( + concurrentReconciles = 5 +) + const ( resourceCluster = "clusters" ) @@ -58,7 +67,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error { d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile) - d.Processor.Run(1, d.stopCh) + d.Processor.Run(concurrentReconciles, d.stopCh) d.discoveryCluster() <-d.stopCh diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index 8cf584b33bd7..31c57f7136d6 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -2,6 +2,7 @@ package binding import ( "context" + "flag" "fmt" corev1 "k8s.io/api/core/v1" @@ -15,6 +16,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -30,6 +32,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/overridemanager" ) +func init() { + flag.IntVar(&rbConcurrentReconciles, "resourcebinding-workers", rbConcurrentReconciles, "Max concurrent workers for binding controller.") +} + +var ( + rbConcurrentReconciles = 5 +) + // ControllerName is the controller name that will be used when reporting events. const ControllerName = "binding-controller" @@ -169,6 +179,8 @@ func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manag Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). + WithOptions(controller.Options{ + MaxConcurrentReconciles: rbConcurrentReconciles}). Complete(c) } diff --git a/pkg/controllers/binding/cluster_resource_binding_controller.go b/pkg/controllers/binding/cluster_resource_binding_controller.go index db1beb9405b0..9ea4579afba1 100644 --- a/pkg/controllers/binding/cluster_resource_binding_controller.go +++ b/pkg/controllers/binding/cluster_resource_binding_controller.go @@ -2,6 +2,7 @@ package binding import ( "context" + "flag" "fmt" corev1 "k8s.io/api/core/v1" @@ -15,6 +16,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -30,6 +32,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/overridemanager" ) +func init() { + flag.IntVar(&crbConcurrentReconciles, "clusterresourcebinding-workers", crbConcurrentReconciles, "Max concurrent workers for clusterresourcebinding controller.") +} + +var ( + crbConcurrentReconciles = 5 +) + // ClusterResourceBindingControllerName is the controller name that will be used when reporting events. const ClusterResourceBindingControllerName = "cluster-resource-binding-controller" @@ -158,6 +168,8 @@ func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntim Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn). Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())). + WithOptions(controller.Options{ + MaxConcurrentReconciles: crbConcurrentReconciles}). Complete(c) } diff --git a/pkg/controllers/cluster/cluster_controller.go b/pkg/controllers/cluster/cluster_controller.go index 116d302441c5..cba4b864241e 100644 --- a/pkg/controllers/cluster/cluster_controller.go +++ b/pkg/controllers/cluster/cluster_controller.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "flag" "fmt" "sync" "time" @@ -19,6 +20,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -26,6 +28,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/names" ) +func init() { + flag.IntVar(&concurrentReconciles, "cluster-workers", concurrentReconciles, "Max concurrent workers for cluster controller.") +} + +var ( + concurrentReconciles = 5 +) + const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "cluster-controller" @@ -104,7 +114,8 @@ func (c *Controller) Start(ctx context.Context) error { // SetupWithManager creates a controller and register to controller manager. func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { return utilerrors.NewAggregate([]error{ - controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).Complete(c), + controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithOptions(controller.Options{ + MaxConcurrentReconciles: concurrentReconciles}).Complete(c), mgr.Add(c), }) } diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 5e6bef76e26e..dd8944beb095 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -2,6 +2,7 @@ package execution import ( "context" + "flag" "fmt" corev1 "k8s.io/api/core/v1" @@ -15,6 +16,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -27,6 +29,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/objectwatcher" ) +func init() { + flag.IntVar(&concurrentReconciles, "execution-workers", concurrentReconciles, "Max concurrent workers for execution controller.") +} + +var ( + concurrentReconciles = 5 +) + const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "execution-controller" @@ -100,6 +110,8 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { For(&workv1alpha1.Work{}). WithEventFilter(predicate.GenerationChangedPredicate{}). WithEventFilter(c.PredicateFunc). + WithOptions(controller.Options{ + MaxConcurrentReconciles: concurrentReconciles}). Complete(c) } diff --git a/pkg/controllers/hpa/hpa_controller.go b/pkg/controllers/hpa/hpa_controller.go index cbde5f3ffbd3..bcdc7a96fd6a 100644 --- a/pkg/controllers/hpa/hpa_controller.go +++ b/pkg/controllers/hpa/hpa_controller.go @@ -2,6 +2,7 @@ package hpa import ( "context" + "flag" autoscalingv1 "k8s.io/api/autoscaling/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -14,6 +15,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/kind/pkg/errors" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" @@ -25,6 +27,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/restmapper" ) +func init() { + flag.IntVar(&concurrentReconciles, "hpa-workers", concurrentReconciles, "Max concurrent workers for hpa controller.") +} + +var ( + concurrentReconciles = 5 +) + // ControllerName is the controller name that will be used when reporting events. const ControllerName = "hpa-controller" @@ -153,7 +163,8 @@ func (c *HorizontalPodAutoscalerController) getTargetPlacement(objRef autoscalin // SetupWithManager creates a controller and register to controller manager. func (c *HorizontalPodAutoscalerController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&autoscalingv1.HorizontalPodAutoscaler{}).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&autoscalingv1.HorizontalPodAutoscaler{}).WithOptions(controller.Options{ + MaxConcurrentReconciles: concurrentReconciles}).Complete(c) } func (c *HorizontalPodAutoscalerController) deleteWorks(workName string) error { diff --git a/pkg/controllers/mcs/endpointslice_controller.go b/pkg/controllers/mcs/endpointslice_controller.go index 18ab1189eca2..24ceafbcbf3c 100644 --- a/pkg/controllers/mcs/endpointslice_controller.go +++ b/pkg/controllers/mcs/endpointslice_controller.go @@ -2,6 +2,7 @@ package mcs import ( "context" + "flag" discoveryv1 "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -12,6 +13,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -21,6 +23,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/names" ) +func init() { + flag.IntVar(&endpointConcurrentReconciles, "endpointslice-workers", endpointConcurrentReconciles, "Max concurrent workers for endpointslice controller.") +} + +var ( + endpointConcurrentReconciles = 5 +) + // EndpointSliceControllerName is the controller name that will be used when reporting events. const EndpointSliceControllerName = "endpointslice-controller" @@ -73,7 +83,8 @@ func (c *EndpointSliceController) SetupWithManager(mgr controllerruntime.Manager return false }, } - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(serviceImportPredicateFun).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(serviceImportPredicateFun).WithOptions(controller.Options{ + MaxConcurrentReconciles: endpointConcurrentReconciles}).Complete(c) } func (c *EndpointSliceController) collectEndpointSliceFromWork(work *workv1alpha1.Work) (controllerruntime.Result, error) { diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index e7bf199c0aeb..0231cfedd8a4 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -2,6 +2,7 @@ package mcs import ( "context" + "flag" "fmt" "reflect" "sync" @@ -20,6 +21,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/kind/pkg/errors" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -33,6 +35,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/names" ) +func init() { + flag.IntVar(&serviceExportConcurrentReconciles, "serviceexport-workers", serviceExportConcurrentReconciles, "Max concurrent workers for serviceexport controller.") +} + +var ( + serviceExportConcurrentReconciles = 5 +) + // ServiceExportControllerName is the controller name that will be used when reporting events. const ServiceExportControllerName = "service-export-controller" @@ -43,7 +53,6 @@ type ServiceExportController struct { RESTMapper meta.RESTMapper StopChan <-chan struct{} InformerManager informermanager.MultiClusterInformerManager - WorkerNumber int // WorkerNumber is the number of worker goroutines PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys. ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) // eventHandlers holds the handlers which used to handle events reported from member clusters. @@ -107,13 +116,14 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr // SetupWithManager creates a controller and register to controller manager. func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ + MaxConcurrentReconciles: serviceExportConcurrentReconciles}).Complete(c) } // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *ServiceExportController) RunWorkQueue() { c.worker = util.NewAsyncWorker("service-export", nil, c.syncServiceExportOrEndpointSlice) - c.worker.Run(c.WorkerNumber, c.StopChan) + c.worker.Run(serviceExportConcurrentReconciles, c.StopChan) } func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key util.QueueKey) error { diff --git a/pkg/controllers/mcs/service_import_controller.go b/pkg/controllers/mcs/service_import_controller.go index 1cbdb4492b5f..c16309547a1e 100644 --- a/pkg/controllers/mcs/service_import_controller.go +++ b/pkg/controllers/mcs/service_import_controller.go @@ -2,6 +2,7 @@ package mcs import ( "context" + "flag" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -12,11 +13,20 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "github.com/karmada-io/karmada/pkg/util/names" ) +func init() { + flag.IntVar(&serviceImportConcurrentReconciles, "serviceimport-workers", serviceImportConcurrentReconciles, "Max concurrent workers for serviceimport controller.") +} + +var ( + serviceImportConcurrentReconciles = 5 +) + // ServiceImportControllerName is the controller name that will be used when reporting events. const ServiceImportControllerName = "service-import-controller" @@ -48,7 +58,8 @@ func (c *ServiceImportController) Reconcile(ctx context.Context, req controllerr // SetupWithManager creates a controller and register to controller manager. func (c *ServiceImportController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&mcsv1alpha1.ServiceImport{}).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&mcsv1alpha1.ServiceImport{}).WithOptions(controller.Options{ + MaxConcurrentReconciles: serviceImportConcurrentReconciles}).Complete(c) } func (c *ServiceImportController) deleteDerivedService(svcImport types.NamespacedName) (controllerruntime.Result, error) { diff --git a/pkg/controllers/namespace/namespace_sync_controller.go b/pkg/controllers/namespace/namespace_sync_controller.go index 9431dd3033b8..41b0f4786ef2 100644 --- a/pkg/controllers/namespace/namespace_sync_controller.go +++ b/pkg/controllers/namespace/namespace_sync_controller.go @@ -2,6 +2,7 @@ package namespace import ( "context" + "flag" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -12,6 +13,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -25,6 +27,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/names" ) +func init() { + flag.IntVar(&concurrentReconciles, "namespace-sync-workers", concurrentReconciles, "Max concurrent workers for namespace sync controller.") +} + +var ( + concurrentReconciles = 5 +) + const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "namespace-sync-controller" @@ -158,5 +168,6 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). For(&corev1.Namespace{}).Watches(&source.Kind{Type: &clusterv1alpha1.Cluster{}}, handler.EnqueueRequestsFromMapFunc(namespaceFn), - predicate).Complete(c) + predicate).WithOptions(controller.Options{ + MaxConcurrentReconciles: concurrentReconciles}).Complete(c) } diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index 92ffdc194297..e6d613a1c03d 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -2,6 +2,7 @@ package status import ( "context" + "flag" "fmt" "net/http" "strings" @@ -26,6 +27,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -35,6 +37,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/informermanager" ) +func init() { + flag.IntVar(&clusterStatusConcurrentReconciles, "cluster-status-workers", clusterStatusConcurrentReconciles, "Max concurrent workers for cluster status controller.") +} + +var ( + clusterStatusConcurrentReconciles = 5 +) + const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "cluster-status-controller" @@ -112,7 +122,8 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr // SetupWithManager creates a controller and register to controller manager. func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ + MaxConcurrentReconciles: clusterStatusConcurrentReconciles}).Complete(c) } func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) { diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 8487fe72d9e2..d348422d744a 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -3,6 +3,7 @@ package status import ( "context" "encoding/json" + "flag" "fmt" "reflect" @@ -18,6 +19,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -31,6 +33,16 @@ import ( "github.com/karmada-io/karmada/pkg/util/restmapper" ) +func init() { + flag.IntVar(&workStatusConcurrentReconciles, "work-status-workers", workStatusConcurrentReconciles, "Max concurrent workers for work status controller.") + flag.IntVar(&resourceStatusConcurrentReconciles, "resource-status-workers", resourceStatusConcurrentReconciles, "Max concurrent workers for resource worker.") +} + +var ( + workStatusConcurrentReconciles = 5 + resourceStatusConcurrentReconciles = 5 +) + // WorkStatusControllerName is the controller name that will be used when reporting events. const WorkStatusControllerName = "work-status-controller" @@ -42,7 +54,6 @@ type WorkStatusController struct { InformerManager informermanager.MultiClusterInformerManager eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster. StopChan <-chan struct{} - WorkerNumber int // WorkerNumber is the number of worker goroutines worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. ObjectWatcher objectwatcher.ObjectWatcher PredicateFunc predicate.Predicate @@ -117,7 +128,7 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler { // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *WorkStatusController) RunWorkQueue() { c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus) - c.worker.Run(c.WorkerNumber, c.StopChan) + c.worker.Run(resourceStatusConcurrentReconciles, c.StopChan) } // generateKey generates a key from obj, the key contains cluster, GVK, namespace and name. @@ -448,5 +459,6 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1. // SetupWithManager creates a controller and register to controller manager. func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{ + MaxConcurrentReconciles: workStatusConcurrentReconciles}).Complete(c) } diff --git a/pkg/controllers/unifiedauth/unified_auth_controller.go b/pkg/controllers/unifiedauth/unified_auth_controller.go index 0618e34fd004..9f2d2eabf11b 100644 --- a/pkg/controllers/unifiedauth/unified_auth_controller.go +++ b/pkg/controllers/unifiedauth/unified_auth_controller.go @@ -2,6 +2,7 @@ package unifiedauth import ( "context" + "flag" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -15,6 +16,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -28,6 +30,14 @@ import ( "github.com/karmada-io/karmada/pkg/util/names" ) +func init() { + flag.IntVar(&concurrentReconciles, "unified-auth-workers", concurrentReconciles, "Max concurrent workers for unified-auth controller.") +} + +var ( + concurrentReconciles = 5 +) + const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "unified-auth-controller" @@ -244,7 +254,10 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { return utilerrors.NewAggregate([]error{ controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(clusterPredicateFunc). Watches(&source.Kind{Type: &rbacv1.ClusterRole{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleMapFunc())). - Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())).Complete(c), + Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())). + WithOptions(controller.Options{ + MaxConcurrentReconciles: concurrentReconciles}). + Complete(c), mgr.Add(c), }) } diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index a36e7b427c27..e09328a665ac 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -2,6 +2,7 @@ package detector import ( "context" + "flag" "fmt" "reflect" "sync" @@ -38,6 +39,15 @@ import ( "github.com/karmada-io/karmada/pkg/util/restmapper" ) +func init() { + flag.IntVar(&concurrentReconciles, "detector-workers", concurrentReconciles, "Max concurrent workers for detector controller.") +} + +var ( + concurrentReconciles = 5 + rbConcurrentReconciles = flag.Lookup("resourcebinding-workers").Value.(flag.Getter).Get().(int) +) + // ResourceDetector is a resource watcher which watches all resources and reconcile the events. type ResourceDetector struct { // DiscoveryClientSet is used to resource discovery. @@ -94,7 +104,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { // setup binding reconcile worker d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", ClusterWideKeyFunc, d.ReconcileResourceBinding) - d.bindingReconcileWorker.Run(1, d.stopCh) + d.bindingReconcileWorker.Run(rbConcurrentReconciles, d.stopCh) // watch and enqueue ResourceBinding changes. resourceBindingGVR := schema.GroupVersionResource{ @@ -119,7 +129,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler) d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler) d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile) - d.Processor.Run(1, d.stopCh) + d.Processor.Run(concurrentReconciles, d.stopCh) go d.discoverResources(30 * time.Second) <-d.stopCh