Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

feat: make concurrency of the worker configurable #1400

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@
# Ignore binaries built by the makefile to avoid accidentally committing them
/images/kubefed/hyperfed
/images/kubefed/controller-manager

# editor and IDE paraphernalia
.idea
.vscode

# macOS paraphernalia
.DS_Store
4 changes: 3 additions & 1 deletion charts/kubefed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ chart and their default values.
| controllermanager.clusterHealthCheckFailureThreshold | Minimum consecutive failures for the cluster health to be considered failed after having succeeded. | 3 |
| controllermanager.clusterHealthCheckSuccessThreshold | Minimum consecutive successes for the cluster health to be considered successful after having failed. | 1 |
| controllermanager.clusterHealthCheckTimeout | Duration after which the cluster health check times out. | 3s |
| controllermanager.syncController.adoptResources | Whether to adopt pre-existing resource in member clusters. | Enabled |
| controllermanager.syncController.maxConcurrentReconciles | The maximum number of concurrent Reconciles of sync controller which can be run. | 1 |
| controllermanager.syncController.adoptResources | Whether to adopt pre-existing resource in member clusters. | Enabled |
| controllermanager.statusController.maxConcurrentReconciles | The maximum number of concurrent Reconciles of status controller which can be run. | 1 |
| controllermanager.service.labels | Kubernetes labels attached to the controller manager's services | {} |
| controllermanager.certManager.enabled | Specifies whether to enable the usage of the cert-manager for the certificates generation. | false |
| controllermanager.certManager.rootCertificate.organizations | Specifies the list of organizations to include in the cert-manager generated root certificate. | [] |
Expand Down
13 changes: 13 additions & 0 deletions charts/kubefed/charts/controllermanager/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,25 @@ spec:
`Namespaced` or `Cluster`. `Namespaced` indicates that the KubeFed
namespace will be the only target of the control plane.
type: string
statusController:
properties:
maxConcurrentReconciles:
description: The maximum number of concurrent Reconciles of status
controller which can be run. Defaults to 1.
format: int64
type: integer
type: object
syncController:
properties:
adoptResources:
description: Whether to adopt pre-existing resources in member
clusters. Defaults to "Enabled".
type: string
maxConcurrentReconciles:
description: The maximum number of concurrent Reconciles of sync
controller which can be run. Defaults to 1.
format: int64
type: integer
type: object
required:
- scope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ spec:
successThreshold: {{ .Values.clusterHealthCheckSuccessThreshold | default 1 }}
timeout: {{ .Values.clusterHealthCheckTimeout | default "3s" | quote }}
syncController:
maxConcurrentReconciles: {{ .Values.syncController.maxConcurrentReconciles | default 1 }}
adoptResources: {{ .Values.syncController.adoptResources | default "Enabled" | quote }}
statusController:
maxConcurrentReconciles: {{ .Values.statusController.maxConcurrentReconciles | default 1 }}
featureGates:
{{- if .Values.featureGates }}
- name: PushReconciler
Expand Down
3 changes: 3 additions & 0 deletions charts/kubefed/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ controllermanager:
## Supported options are `configmaps` and `endpoints`
leaderElectResourceLock:
syncController:
maxConcurrentReconciles:
adoptResources:
jimmidyson marked this conversation as resolved.
Show resolved Hide resolved
statusController:
maxConcurrentReconciles:
## Value of feature gates item should be either `Enabled` or `Disabled`
featureGates:
PushReconciler:
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller-manager/app/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ func setOptionsByKubeFedConfig(opts *options.Options) {
opts.ClusterHealthCheckConfig.FailureThreshold = *spec.ClusterHealthCheck.FailureThreshold
opts.ClusterHealthCheckConfig.SuccessThreshold = *spec.ClusterHealthCheck.SuccessThreshold

opts.Config.MaxConcurrentSyncReconciles = *spec.SyncController.MaxConcurrentReconciles
opts.Config.MaxConcurrentStatusReconciles = *spec.StatusController.MaxConcurrentReconciles

opts.Config.SkipAdoptingResources = *spec.SyncController.AdoptResources == corev1b1.AdoptResourcesDisabled

var featureGates = make(map[string]bool)
Expand Down
3 changes: 3 additions & 0 deletions config/kubefedconfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ spec:
successThreshold: 1
timeout: 3s
syncController:
maxConcurrentReconciles: 1
adoptResources: Enabled
jimmidyson marked this conversation as resolved.
Show resolved Hide resolved
statusController:
maxConcurrentReconciles: 1
11 changes: 11 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
DefaultClusterHealthCheckFailureThreshold = 3
DefaultClusterHealthCheckSuccessThreshold = 1
DefaultClusterHealthCheckTimeout = 3 * time.Second

DefaultSyncControllerMaxConcurrentReconciles = 1
DefaultStatusControllerMaxConcurrentReconciles = 1
)

func SetDefaultKubeFedConfig(fedConfig *v1beta1.KubeFedConfig) {
Expand Down Expand Up @@ -87,10 +90,18 @@ func SetDefaultKubeFedConfig(fedConfig *v1beta1.KubeFedConfig) {
spec.SyncController = &v1beta1.SyncControllerConfig{}
}

setInt64(&spec.SyncController.MaxConcurrentReconciles, DefaultSyncControllerMaxConcurrentReconciles)

if spec.SyncController.AdoptResources == nil {
spec.SyncController.AdoptResources = new(v1beta1.ResourceAdoption)
*spec.SyncController.AdoptResources = v1beta1.AdoptResourcesEnabled
}

if spec.StatusController == nil {
spec.StatusController = &v1beta1.StatusControllerConfig{}
}

setInt64(&spec.StatusController.MaxConcurrentReconciles, DefaultStatusControllerMaxConcurrentReconciles)
}

func setDefaultKubeFedFeatureGates(fgc []v1beta1.FeatureGatesConfig) []v1beta1.FeatureGatesConfig {
Expand Down
17 changes: 16 additions & 1 deletion pkg/apis/core/v1beta1/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,26 @@ func TestSetDefaultKubeFedConfig(t *testing.T) {
successCases["spec.clusterHealthCheck.timeout is preserved"] = KubeFedConfigComparison{timeoutKFC, modifiedTimeoutKFC}

// SyncController
syncControllerMaxConcurrentReconcilesKFC := defaultKubeFedConfig()
syncControllerMaxConcurrentReconciles := int64(DefaultSyncControllerMaxConcurrentReconciles + 3)
syncControllerMaxConcurrentReconcilesKFC.Spec.SyncController.MaxConcurrentReconciles = &syncControllerMaxConcurrentReconciles
modifiedSyncControllerMaxConcurrentReconcilesKFC := syncControllerMaxConcurrentReconcilesKFC.DeepCopyObject().(*v1beta1.KubeFedConfig)
SetDefaultKubeFedConfig(modifiedSyncControllerMaxConcurrentReconcilesKFC)
successCases["spec.syncController.maxConcurrentReconciles is preserved"] = KubeFedConfigComparison{syncControllerMaxConcurrentReconcilesKFC, modifiedSyncControllerMaxConcurrentReconcilesKFC}

adoptResourcesKFC := defaultKubeFedConfig()
*adoptResourcesKFC.Spec.SyncController.AdoptResources = v1beta1.AdoptResourcesDisabled
modifiedAdoptResourcesKFC := adoptResourcesKFC.DeepCopyObject().(*v1beta1.KubeFedConfig)
SetDefaultKubeFedConfig(modifiedAdoptResourcesKFC)
successCases["spec.leaderElect.adoptResources is preserved"] = KubeFedConfigComparison{adoptResourcesKFC, modifiedAdoptResourcesKFC}
successCases["spec.syncController.adoptResources is preserved"] = KubeFedConfigComparison{adoptResourcesKFC, modifiedAdoptResourcesKFC}

// StatusController
statusControllerMaxConcurrentReconcilesKFC := defaultKubeFedConfig()
statusControllerMaxConcurrentReconciles := int64(DefaultStatusControllerMaxConcurrentReconciles + 3)
statusControllerMaxConcurrentReconcilesKFC.Spec.StatusController.MaxConcurrentReconciles = &statusControllerMaxConcurrentReconciles
modifiedStatusControllerMaxConcurrentReconcilesKFC := statusControllerMaxConcurrentReconcilesKFC.DeepCopyObject().(*v1beta1.KubeFedConfig)
SetDefaultKubeFedConfig(modifiedStatusControllerMaxConcurrentReconcilesKFC)
successCases["spec.statusController.maxConcurrentReconciles is preserved"] = KubeFedConfigComparison{statusControllerMaxConcurrentReconcilesKFC, modifiedStatusControllerMaxConcurrentReconcilesKFC}

for k, v := range successCases {
if !reflect.DeepEqual(v.original, v.modified) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/core/v1beta1/kubefedconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type KubeFedConfigSpec struct {
ClusterHealthCheck *ClusterHealthCheckConfig `json:"clusterHealthCheck,omitempty"`
// +optional
SyncController *SyncControllerConfig `json:"syncController,omitempty"`
// +optional
StatusController *StatusControllerConfig `json:"statusController,omitempty"`
}

type DurationConfig struct {
Expand Down Expand Up @@ -105,6 +107,10 @@ type ClusterHealthCheckConfig struct {
}

type SyncControllerConfig struct {
// The maximum number of concurrent Reconciles of sync controller which can be run.
// Defaults to 1.
// +optional
MaxConcurrentReconciles *int64 `json:"maxConcurrentReconciles,omitempty"`
// Whether to adopt pre-existing resources in member clusters. Defaults to
// "Enabled".
// +optional
Expand All @@ -118,6 +124,13 @@ const (
AdoptResourcesDisabled ResourceAdoption = "Disabled"
)

type StatusControllerConfig struct {
// The maximum number of concurrent Reconciles of status controller which can be run.
// Defaults to 1.
// +optional
MaxConcurrentReconciles *int64 `json:"maxConcurrentReconciles,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=kubefedconfigs

Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/core/v1beta1/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,19 @@ func ValidateKubeFedConfig(kubeFedConfig, oldKubeFedConfig *v1beta1.KubeFedConfi
case sync.AdoptResources == nil:
allErrs = append(allErrs, field.Required(adoptPath, ""))
default:
allErrs = append(allErrs, validateIntPtrGreaterThan0(syncPath.Child("maxConcurrentReconciles"), sync.MaxConcurrentReconciles)...)
allErrs = append(allErrs, validateEnumStrings(adoptPath, string(*sync.AdoptResources),
[]string{string(v1beta1.AdoptResourcesEnabled), string(v1beta1.AdoptResourcesDisabled)})...)
}

statusController := spec.StatusController
statusControllerPath := specPath.Child("statusController")
if statusController == nil {
allErrs = append(allErrs, field.Required(statusControllerPath, ""))
} else {
allErrs = append(allErrs, validateIntPtrGreaterThan0(statusControllerPath.Child("maxConcurrentReconciles"), statusController.MaxConcurrentReconciles)...)
}

return allErrs
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/core/v1beta1/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,14 @@ func TestValidateKubeFedConfig(t *testing.T) {
invalidSyncControllerNil.Spec.SyncController = nil
errorCases["spec.syncController: Required value"] = invalidSyncControllerNil

invalidSyncControllerMaxConcurrentReconcilesNil := testcommon.ValidKubeFedConfig()
invalidSyncControllerMaxConcurrentReconcilesNil.Spec.SyncController.MaxConcurrentReconciles = nil
errorCases["spec.syncController.maxConcurrentReconciles: Required value"] = invalidSyncControllerMaxConcurrentReconcilesNil

invalidSyncControllerMaxConcurrentReconcilesGreaterThan0 := testcommon.ValidKubeFedConfig()
invalidSyncControllerMaxConcurrentReconcilesGreaterThan0.Spec.SyncController.MaxConcurrentReconciles = zeroIntPtr
errorCases["spec.syncController.maxConcurrentReconciles: Invalid value"] = invalidSyncControllerMaxConcurrentReconcilesGreaterThan0

invalidAdoptResourcesNil := testcommon.ValidKubeFedConfig()
invalidAdoptResourcesNil.Spec.SyncController.AdoptResources = nil
errorCases["spec.syncController.adoptResources: Required value"] = invalidAdoptResourcesNil
Expand All @@ -853,6 +861,18 @@ func TestValidateKubeFedConfig(t *testing.T) {
invalidAdoptResources.Spec.SyncController.AdoptResources = &invalidAdoptResourcesValue
errorCases["spec.syncController.adoptResources: Unsupported value"] = invalidAdoptResources

invalidStatusControllerNil := testcommon.ValidKubeFedConfig()
invalidStatusControllerNil.Spec.StatusController = nil
errorCases["spec.statusController: Required value"] = invalidStatusControllerNil

invalidStatusControllerMaxConcurrentReconcilesNil := testcommon.ValidKubeFedConfig()
invalidStatusControllerMaxConcurrentReconcilesNil.Spec.StatusController.MaxConcurrentReconciles = nil
errorCases["spec.statusController.maxConcurrentReconciles: Required value"] = invalidStatusControllerMaxConcurrentReconcilesNil

invalidStatusControllerMaxConcurrentReconcilesGreaterThan0 := testcommon.ValidKubeFedConfig()
invalidStatusControllerMaxConcurrentReconcilesGreaterThan0.Spec.StatusController.MaxConcurrentReconciles = zeroIntPtr
errorCases["spec.statusController.maxConcurrentReconciles: Invalid value"] = invalidStatusControllerMaxConcurrentReconcilesGreaterThan0

for k, v := range errorCases {
errs := ValidateKubeFedConfig(v, testcommon.ValidKubeFedConfig())
if len(errs) == 0 {
Expand Down
30 changes: 30 additions & 0 deletions pkg/apis/core/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controller/federatedtypeconfig/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newController(config *util.ControllerConfig) (*Controller, error) {
stopChannels: make(map[string]chan struct{}),
}

c.worker = util.NewReconcileWorker("federatedtypeconfig", c.reconcile, util.WorkerTiming{})
c.worker = util.NewReconcileWorker("federatedtypeconfig", c.reconcile, util.WorkerOptions{})

// Only watch the KubeFed namespace to ensure
// restrictive authz can be applied to a namespaced
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/schedulingmanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newSchedulingManager(config *util.ControllerConfig) (*SchedulingManager, er
schedulers: util.NewSafeMap(),
}

c.worker = util.NewReconcileWorker("schedulingmanager", c.reconcile, util.WorkerTiming{})
c.worker = util.NewReconcileWorker("schedulingmanager", c.reconcile, util.WorkerOptions{})

var err error
c.store, c.controller, err = util.NewGenericInformer(
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/schedulingpreference/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ func newSchedulingPreferenceController(config *util.ControllerConfig, scheduling
eventRecorder: recorder,
}

s.worker = util.NewReconcileWorker(strings.ToLower(schedulingType.Kind), s.reconcile, util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
s.worker = util.NewReconcileWorker(strings.ToLower(schedulingType.Kind), s.reconcile, util.WorkerOptions{
WorkerTiming: util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
},
})

eventHandlers := schedulingtypes.SchedulerEventHandlers{
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ func newKubeFedStatusController(controllerConfig *util.ControllerConfig, typeCon
fedNamespace: controllerConfig.KubeFedNamespace,
}

s.worker = util.NewReconcileWorker(strings.ToLower(statusAPIResource.Kind), s.reconcile, util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
s.worker = util.NewReconcileWorker(strings.ToLower(statusAPIResource.Kind), s.reconcile, util.WorkerOptions{
WorkerTiming: util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
},
MaxConcurrentReconciles: int(controllerConfig.MaxConcurrentStatusReconciles),
})

// Build deliverer for triggering cluster reconciliations.
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfi
rawResourceStatusCollection: controllerConfig.RawResourceStatusCollection,
}

s.worker = util.NewReconcileWorker(strings.ToLower(federatedTypeAPIResource.Kind), s.reconcile, util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
s.worker = util.NewReconcileWorker(strings.ToLower(federatedTypeAPIResource.Kind), s.reconcile, util.WorkerOptions{
WorkerTiming: util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
},
MaxConcurrentReconciles: int(controllerConfig.MaxConcurrentSyncReconciles),
})

// Build deliverer for triggering cluster reconciliations.
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/testdata/fixtures/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -774,12 +774,25 @@ spec:
`Namespaced` or `Cluster`. `Namespaced` indicates that the KubeFed
namespace will be the only target of the control plane.
type: string
statusController:
properties:
maxConcurrentReconciles:
description: The maximum number of concurrent Reconciles of status
controller which can be run. Defaults to 1.
format: int64
type: integer
type: object
syncController:
properties:
adoptResources:
description: Whether to adopt pre-existing resources in member
clusters. Defaults to "Enabled".
type: string
maxConcurrentReconciles:
description: The maximum number of concurrent Reconciles of sync
controller which can be run. Defaults to 1.
format: int64
type: integer
type: object
required:
- scope
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/util/controllerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ type ClusterHealthCheckConfig struct {
// controllers.
type ControllerConfig struct {
KubeFedNamespaces
KubeConfig *restclient.Config
ClusterAvailableDelay time.Duration
ClusterUnavailableDelay time.Duration
MinimizeLatency bool
SkipAdoptingResources bool
RawResourceStatusCollection bool
KubeConfig *restclient.Config
ClusterAvailableDelay time.Duration
ClusterUnavailableDelay time.Duration
MinimizeLatency bool
MaxConcurrentSyncReconciles int64
MaxConcurrentStatusReconciles int64
SkipAdoptingResources bool
RawResourceStatusCollection bool
}

func (c *ControllerConfig) LimitedScope() bool {
Expand Down
Loading