Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

feat: introduce informer cache sync timeout #1460

Merged
merged 4 commits into from
Oct 21, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/kubefed/README.md
Original file line number Diff line number Diff line change
@@ -115,6 +115,7 @@ chart and their default values.
| controllermanager.featureGates.SchedulerPreferences | Scheduler preferences feature. | true |
| controllermanager.clusterAvailableDelay | Time to wait before reconciling on a healthy cluster. | 20s |
| controllermanager.clusterUnavailableDelay | Time to wait before giving up on an unhealthy cluster. | 60s |
| controllermanager.cacheSyncTimeout | Time to wait for all caches to sync before exit. | 5m |
| controllermanager.leaderElectLeaseDuration | The maximum duration that a leader can be stopped before it is replaced by another candidate. | 15s |
| controllermanager.leaderElectRenewDeadline | The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to `controllermanager.leaderElectLeaseDuration`. | 10s |
| controllermanager.leaderElectRetryPeriod | The duration the clients should wait between attempting acquisition and renewal of a leadership. | 5s |
3 changes: 3 additions & 0 deletions charts/kubefed/charts/controllermanager/crds/crds.yaml
Original file line number Diff line number Diff line change
@@ -645,6 +645,9 @@ spec:
availableDelay:
description: Time to wait before reconciling on a healthy cluster.
type: string
cacheSyncTimeout:
description: Time to wait for all caches to sync before exit.
type: string
unavailableDelay:
description: Time to wait before giving up on an unhealthy cluster.
type: string
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ spec:
controllerDuration:
availableDelay: {{ .Values.clusterAvailableDelay | default "20s" | quote }}
unavailableDelay: {{ .Values.clusterUnavailableDelay | default "60s" | quote }}
cacheSyncTimeout: {{ .Values.cacheSyncTimeout | default "5m" | quote }}
leaderElect:
leaseDuration: {{ .Values.leaderElectLeaseDuration | default "15s" | quote }}
renewDeadline: {{ .Values.leaderElectRenewDeadline | default "10s" | quote }}
1 change: 1 addition & 0 deletions charts/kubefed/values.yaml
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ controllermanager:
enabled: true
clusterAvailableDelay:
clusterUnavailableDelay:
cacheSyncTimeout:
leaderElectLeaseDuration:
leaderElectRenewDeadline:
leaderElectRetryPeriod:
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controller-manager.go
Original file line number Diff line number Diff line change
@@ -364,6 +364,7 @@ func setOptionsByKubeFedConfig(opts *options.Options) {

opts.Config.ClusterAvailableDelay = spec.ControllerDuration.AvailableDelay.Duration
opts.Config.ClusterUnavailableDelay = spec.ControllerDuration.UnavailableDelay.Duration
opts.Config.CacheSyncTimeout = spec.ControllerDuration.CacheSyncTimeout.Duration

opts.LeaderElection.ResourceLock = *spec.LeaderElect.ResourceLock
opts.LeaderElection.RetryPeriod = spec.LeaderElect.RetryPeriod.Duration
1 change: 1 addition & 0 deletions config/kubefedconfig.yaml
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ spec:
controllerDuration:
availableDelay: 20s
unavailableDelay: 60s
cacheSyncTimeout: 5m
leaderElect:
leaseDuration: 1500ms
renewDeadline: 1000ms
1 change: 1 addition & 0 deletions pkg/apis/core/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
const (
DefaultClusterAvailableDelay = 20 * time.Second
DefaultClusterUnavailableDelay = 60 * time.Second
DefaultCacheSyncTimeout = 5 * time.Minute

DefaultLeaderElectionLeaseDuration = 15 * time.Second
DefaultLeaderElectionRenewDeadline = 10 * time.Second
@@ -54,6 +55,7 @@ func SetDefaultKubeFedConfig(fedConfig *v1beta1.KubeFedConfig) {
duration := spec.ControllerDuration
setDuration(&duration.AvailableDelay, DefaultClusterAvailableDelay)
setDuration(&duration.UnavailableDelay, DefaultClusterUnavailableDelay)
setDuration(&duration.CacheSyncTimeout, DefaultCacheSyncTimeout)

if spec.LeaderElect == nil {
spec.LeaderElect = &v1beta1.LeaderElectConfig{}
6 changes: 6 additions & 0 deletions pkg/apis/core/v1beta1/defaults/defaults_test.go
Original file line number Diff line number Diff line change
@@ -50,6 +50,12 @@ func TestSetDefaultKubeFedConfig(t *testing.T) {
SetDefaultKubeFedConfig(modifiedUnavailableDelayKFC)
successCases["spec.controllerDuration.unavailableDelay is preserved"] = KubeFedConfigComparison{unavailableDelayKFC, modifiedUnavailableDelayKFC}

cacheSyncTimeoutKFC := defaultKubeFedConfig()
cacheSyncTimeoutKFC.Spec.ControllerDuration.CacheSyncTimeout.Duration = DefaultCacheSyncTimeout + 31*time.Second
modifiedCacheSyncTimeoutKFC := cacheSyncTimeoutKFC.DeepCopyObject().(*v1beta1.KubeFedConfig)
SetDefaultKubeFedConfig(modifiedCacheSyncTimeoutKFC)
successCases["spec.controllerDuration.cacheSyncTimeout is preserved"] = KubeFedConfigComparison{cacheSyncTimeoutKFC, modifiedCacheSyncTimeoutKFC}

// LeaderElect
leaseDurationKFC := defaultKubeFedConfig()
leaseDurationKFC.Spec.LeaderElect.LeaseDuration.Duration = DefaultLeaderElectionLeaseDuration + 11*time.Second
3 changes: 3 additions & 0 deletions pkg/apis/core/v1beta1/kubefedconfig_types.go
Original file line number Diff line number Diff line change
@@ -48,6 +48,9 @@ type DurationConfig struct {
// Time to wait before giving up on an unhealthy cluster.
// +optional
UnavailableDelay *metav1.Duration `json:"unavailableDelay,omitempty"`
// Time to wait for all caches to sync before exit.
// +optional
CacheSyncTimeout *metav1.Duration `json:"cacheSyncTimeout,omitempty"`
}
type LeaderElectConfig struct {
// The duration that non-leader candidates will wait after observing a leadership
1 change: 1 addition & 0 deletions pkg/apis/core/v1beta1/validation/validation.go
Original file line number Diff line number Diff line change
@@ -285,6 +285,7 @@ func ValidateKubeFedConfig(kubeFedConfig, oldKubeFedConfig *v1beta1.KubeFedConfi
} else {
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("availableDelay"), duration.AvailableDelay)...)
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("unavailableDelay"), duration.UnavailableDelay)...)
allErrs = append(allErrs, validateDurationGreaterThan0(durationPath.Child("cacheSyncTimeout"), duration.CacheSyncTimeout)...)
}

elect := spec.LeaderElect
8 changes: 8 additions & 0 deletions pkg/apis/core/v1beta1/validation/validation_test.go
Original file line number Diff line number Diff line change
@@ -732,6 +732,14 @@ func TestValidateKubeFedConfig(t *testing.T) {
invalidUnavailableDelayGreaterThan0.Spec.ControllerDuration.UnavailableDelay.Duration = 0
errorCases["spec.controllerDuration.unavailableDelay: Invalid value"] = invalidUnavailableDelayGreaterThan0

invalidCacheSyncTimeoutNil := testcommon.ValidKubeFedConfig()
invalidCacheSyncTimeoutNil.Spec.ControllerDuration.CacheSyncTimeout = nil
errorCases["spec.controllerDuration.cacheSyncTimeout: Required value"] = invalidCacheSyncTimeoutNil

invalidCacheSyncTimeoutGreaterThan0 := testcommon.ValidKubeFedConfig()
invalidCacheSyncTimeoutGreaterThan0.Spec.ControllerDuration.CacheSyncTimeout.Duration = 0
errorCases["spec.controllerDuration.cacheSyncTimeout: Invalid value"] = invalidCacheSyncTimeoutGreaterThan0

invalidLeaderElectNil := testcommon.ValidKubeFedConfig()
invalidLeaderElectNil.Spec.LeaderElect = nil
errorCases["spec.leaderElect: Required value"] = invalidLeaderElectNil
6 changes: 6 additions & 0 deletions pkg/apis/core/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions pkg/controller/status/controller.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -70,6 +72,8 @@ type KubeFedStatusController struct {
clusterUnavailableDelay time.Duration
smallDelay time.Duration

cacheSyncTimeout time.Duration

typeConfig typeconfig.Interface

client genericclient.Client
@@ -116,6 +120,7 @@ func newKubeFedStatusController(controllerConfig *util.ControllerConfig, typeCon
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
cacheSyncTimeout: controllerConfig.CacheSyncTimeout,
typeConfig: typeConfig,
client: client,
statusClient: statusClient,
@@ -195,6 +200,13 @@ func (s *KubeFedStatusController) Run(stopChan <-chan struct{}) {
}()
}

// waitForSync blocks until every data store reports as synced or the
// configured cacheSyncTimeout elapses. isSynced is polled once right away and
// then once per util.SyncedPollPeriod. The poll's error is returned as-is, so
// a non-nil result means the stores did not sync within the timeout.
func (s *KubeFedStatusController) waitForSync() error {
	allStoresSynced := func() (bool, error) {
		// isSynced only reports a boolean, so the condition never aborts the poll with an error.
		return s.isSynced(), nil
	}
	return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, allStoresSynced)
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (s *KubeFedStatusController) isSynced() bool {
@@ -217,6 +229,7 @@ func (s *KubeFedStatusController) isSynced() bool {
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
klog.V(2).Info("Target clusters' informers not synced")
return false
}
return true
@@ -234,10 +247,8 @@ func (s *KubeFedStatusController) reconcileOnClusterChange() {
}

func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
defer metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", time.Now())

if !s.isSynced() {
return util.StatusNotSynced
if err := s.waitForSync(); err != nil {
klog.Fatalf("failed to wait for all data stores to sync: %v", err)
}

federatedKind := s.typeConfig.GetFederatedType().Kind
@@ -248,6 +259,7 @@ func (s *KubeFedStatusController) reconcile(qualifiedName util.QualifiedName) ut
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished reconciling %v %v (duration: %v)", statusKind, key, time.Since(startTime))
metrics.UpdateControllerReconcileDurationFromStart("statuscontroller", startTime)
}()

fedObject, err := s.objFromCache(s.federatedStore, federatedKind, key)
15 changes: 13 additions & 2 deletions pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
@@ -79,6 +79,8 @@ type KubeFedSyncController struct {
clusterUnavailableDelay time.Duration
smallDelay time.Duration

cacheSyncTimeout time.Duration

typeConfig typeconfig.Interface

fedAccessor FederatedResourceAccessor
@@ -123,6 +125,7 @@ func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfi
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
cacheSyncTimeout: controllerConfig.CacheSyncTimeout,
eventRecorder: recorder,
typeConfig: typeConfig,
hostClusterClient: client,
@@ -203,6 +206,13 @@ func (s *KubeFedSyncController) Run(stopChan <-chan struct{}) {
}()
}

// waitForSync blocks until every data store reports as synced or the
// configured cacheSyncTimeout elapses. isSynced is polled once right away and
// then once per util.SyncedPollPeriod. The poll's error is returned as-is, so
// a non-nil result means the stores did not sync within the timeout.
func (s *KubeFedSyncController) waitForSync() error {
	allStoresSynced := func() (bool, error) {
		// isSynced only reports a boolean, so the condition never aborts the poll with an error.
		return s.isSynced(), nil
	}
	return wait.PollImmediate(util.SyncedPollPeriod, s.cacheSyncTimeout, allStoresSynced)
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (s *KubeFedSyncController) isSynced() bool {
@@ -223,6 +233,7 @@ func (s *KubeFedSyncController) isSynced() bool {
return false
}
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
klog.V(2).Info("Target clusters' informers not synced")
return false
}
return true
@@ -240,8 +251,8 @@ func (s *KubeFedSyncController) reconcileOnClusterChange() {
}

func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
if !s.isSynced() {
return util.StatusNotSynced
if err := s.waitForSync(); err != nil {
klog.Fatalf("failed to wait for all data stores to sync: %v", err)
}

kind := s.typeConfig.GetFederatedType().Kind
6 changes: 4 additions & 2 deletions pkg/controller/util/constants.go
Original file line number Diff line number Diff line change
@@ -20,9 +20,11 @@ import (
"time"
)

// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
const (
NoResyncPeriod time.Duration = 0 * time.Second
// Providing 0 duration to an informer indicates that resync should be delayed as long as possible
NoResyncPeriod = 0 * time.Second

SyncedPollPeriod = 10 * time.Second

NamespaceName = "namespaces"
NamespaceKind = "Namespace"
1 change: 1 addition & 0 deletions pkg/controller/util/controllerconfig.go
Original file line number Diff line number Diff line change
@@ -72,6 +72,7 @@ type ControllerConfig struct {
ClusterAvailableDelay time.Duration
ClusterUnavailableDelay time.Duration
MinimizeLatency bool
CacheSyncTimeout time.Duration
MaxConcurrentSyncReconciles int64
MaxConcurrentStatusReconciles int64
SkipAdoptingResources bool
35 changes: 13 additions & 22 deletions pkg/controller/util/federated_informer.go
Original file line number Diff line number Diff line change
@@ -534,33 +534,24 @@ func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
return key
}

// Checks whether stores for all clusters form the lists (and only these) are there and
// ClustersSynced checks whether stores for all clusters from the list (and only these) are there and
// are synced.
func (fs *federatedStoreImpl) ClustersSynced(clusters []*fedv1b1.KubeFedCluster) bool {
// Get the list of informers to check under a lock and check it outside.
okSoFar, informersToCheck := func() (bool, []informer) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()

if len(fs.federatedInformer.targetInformers) != len(clusters) {
return false, []informer{}
}
informersToCheck := make([]informer, 0, len(clusters))
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
informersToCheck = append(informersToCheck, targetInformer)
} else {
return false, []informer{}
}
}
return true, informersToCheck
}()
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()

if !okSoFar {
if len(fs.federatedInformer.targetInformers) != len(clusters) {
klog.V(4).Infof("The number of target informers mismatch with given clusters")
return false
}
for _, informerToCheck := range informersToCheck {
if !informerToCheck.controller.HasSynced() {
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
if !targetInformer.controller.HasSynced() {
klog.V(4).Infof("Informer of cluster %q not synced", cluster.Name)
return false
}
} else {
klog.V(4).Infof("Informer of cluster %q not found", cluster.Name)
return false
}
}
1 change: 1 addition & 0 deletions test/common/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.