Skip to content

Commit

Permalink
Fix controller reconcile concurrent
Browse files Browse the repository at this point in the history
Signed-off-by: pigletfly <[email protected]>
  • Loading branch information
pigletfly committed Feb 8, 2022
1 parent 346c025 commit 34aed67
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 20 deletions.
2 changes: 0 additions & 2 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 1,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
Expand All @@ -242,7 +241,6 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 1,
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
Expand Down
2 changes: 0 additions & 2 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 1,
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
Expand Down Expand Up @@ -284,7 +283,6 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
WorkerNumber: 1,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
Expand Down
11 changes: 10 additions & 1 deletion pkg/clusterdiscovery/clusterapi/clusterapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clusterapi

import (
"context"
"flag"
"fmt"
"os"

Expand All @@ -27,6 +28,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
)

func init() {
flag.IntVar(&concurrentReconciles, "clusterapi-workers", concurrentReconciles, "Max concurrent workers for clusterapi worker.")
}

var (
concurrentReconciles = 5
)

const (
resourceCluster = "clusters"
)
Expand Down Expand Up @@ -58,7 +67,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error {

d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile)
d.Processor.Run(1, d.stopCh)
d.Processor.Run(concurrentReconciles, d.stopCh)
d.discoveryCluster()

<-d.stopCh
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package binding

import (
"context"
"flag"
"fmt"

corev1 "k8s.io/api/core/v1"
Expand All @@ -15,6 +16,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -30,6 +32,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/overridemanager"
)

func init() {
flag.IntVar(&rbConcurrentReconciles, "resourcebinding-workers", rbConcurrentReconciles, "Max concurrent workers for binding controller.")
}

var (
rbConcurrentReconciles = 5
)

// ControllerName is the controller name that will be used when reporting events.
const ControllerName = "binding-controller"

Expand Down Expand Up @@ -169,6 +179,8 @@ func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manag
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn).
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{
MaxConcurrentReconciles: rbConcurrentReconciles}).
Complete(c)
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/binding/cluster_resource_binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package binding

import (
"context"
"flag"
"fmt"

corev1 "k8s.io/api/core/v1"
Expand All @@ -15,6 +16,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -30,6 +32,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/overridemanager"
)

func init() {
flag.IntVar(&crbConcurrentReconciles, "clusterresourcebinding-workers", crbConcurrentReconciles, "Max concurrent workers for clusterresourcebinding controller.")
}

var (
crbConcurrentReconciles = 5
)

// ClusterResourceBindingControllerName is the controller name that will be used when reporting events.
const ClusterResourceBindingControllerName = "cluster-resource-binding-controller"

Expand Down Expand Up @@ -158,6 +168,8 @@ func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntim
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn).
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{
MaxConcurrentReconciles: crbConcurrentReconciles}).
Complete(c)
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/controllers/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"flag"
"fmt"
"sync"
"time"
Expand All @@ -19,13 +20,22 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
)

func init() {
flag.IntVar(&concurrentReconciles, "cluster-workers", concurrentReconciles, "Max concurrent workers for cluster controller.")
}

var (
concurrentReconciles = 5
)

const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "cluster-controller"
Expand Down Expand Up @@ -104,7 +114,8 @@ func (c *Controller) Start(ctx context.Context) error {
// SetupWithManager creates a controller and register to controller manager.
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
return utilerrors.NewAggregate([]error{
controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).Complete(c),
controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithOptions(controller.Options{
MaxConcurrentReconciles: concurrentReconciles}).Complete(c),
mgr.Add(c),
})
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package execution

import (
"context"
"flag"
"fmt"

corev1 "k8s.io/api/core/v1"
Expand All @@ -15,6 +16,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -27,6 +29,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
)

func init() {
flag.IntVar(&concurrentReconciles, "execution-workers", concurrentReconciles, "Max concurrent workers for execution controller.")
}

var (
concurrentReconciles = 5
)

const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "execution-controller"
Expand Down Expand Up @@ -100,6 +110,8 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
For(&workv1alpha1.Work{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithEventFilter(c.PredicateFunc).
WithOptions(controller.Options{
MaxConcurrentReconciles: concurrentReconciles}).
Complete(c)
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/controllers/hpa/hpa_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hpa

import (
"context"
"flag"

autoscalingv1 "k8s.io/api/autoscaling/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -14,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/kind/pkg/errors"

workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
Expand All @@ -25,6 +27,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

func init() {
flag.IntVar(&concurrentReconciles, "hpa-workers", concurrentReconciles, "Max concurrent workers for hpa controller.")
}

var (
concurrentReconciles = 5
)

// ControllerName is the controller name that will be used when reporting events.
const ControllerName = "hpa-controller"

Expand Down Expand Up @@ -153,7 +163,8 @@ func (c *HorizontalPodAutoscalerController) getTargetPlacement(objRef autoscalin

// SetupWithManager creates a controller and register to controller manager.
func (c *HorizontalPodAutoscalerController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&autoscalingv1.HorizontalPodAutoscaler{}).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&autoscalingv1.HorizontalPodAutoscaler{}).WithOptions(controller.Options{
MaxConcurrentReconciles: concurrentReconciles}).Complete(c)
}

func (c *HorizontalPodAutoscalerController) deleteWorks(workName string) error {
Expand Down
13 changes: 12 additions & 1 deletion pkg/controllers/mcs/endpointslice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mcs

import (
"context"
"flag"

discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -12,6 +13,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -21,6 +23,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
)

func init() {
flag.IntVar(&endpointConcurrentReconciles, "endpointslice-workers", endpointConcurrentReconciles, "Max concurrent workers for endpointslice controller.")
}

var (
endpointConcurrentReconciles = 5
)

// EndpointSliceControllerName is the controller name that will be used when reporting events.
const EndpointSliceControllerName = "endpointslice-controller"

Expand Down Expand Up @@ -73,7 +83,8 @@ func (c *EndpointSliceController) SetupWithManager(mgr controllerruntime.Manager
return false
},
}
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(serviceImportPredicateFun).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(serviceImportPredicateFun).WithOptions(controller.Options{
MaxConcurrentReconciles: endpointConcurrentReconciles}).Complete(c)
}

func (c *EndpointSliceController) collectEndpointSliceFromWork(work *workv1alpha1.Work) (controllerruntime.Result, error) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mcs

import (
"context"
"flag"
"fmt"
"reflect"
"sync"
Expand All @@ -20,6 +21,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/kind/pkg/errors"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
Expand All @@ -33,6 +35,14 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
)

func init() {
flag.IntVar(&serviceExportConcurrentReconciles, "serviceexport-workers", serviceExportConcurrentReconciles, "Max concurrent workers for serviceexport controller.")
}

var (
serviceExportConcurrentReconciles = 5
)

// ServiceExportControllerName is the controller name that will be used when reporting events.
const ServiceExportControllerName = "service-export-controller"

Expand All @@ -43,7 +53,6 @@ type ServiceExportController struct {
RESTMapper meta.RESTMapper
StopChan <-chan struct{}
InformerManager informermanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
// eventHandlers holds the handlers which used to handle events reported from member clusters.
Expand Down Expand Up @@ -107,13 +116,14 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr

// SetupWithManager creates a controller and register to controller manager.
func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
MaxConcurrentReconciles: serviceExportConcurrentReconciles}).Complete(c)
}

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *ServiceExportController) RunWorkQueue() {
c.worker = util.NewAsyncWorker("service-export", nil, c.syncServiceExportOrEndpointSlice)
c.worker.Run(c.WorkerNumber, c.StopChan)
c.worker.Run(serviceExportConcurrentReconciles, c.StopChan)
}

func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key util.QueueKey) error {
Expand Down
Loading

0 comments on commit 34aed67

Please sign in to comment.