From e964f21b0fdfec6632999730d728460f8bdcad0c Mon Sep 17 00:00:00 2001 From: lihanbo Date: Sat, 30 Oct 2021 16:06:32 +0800 Subject: [PATCH] add Timeout in WaitForCacheSync Signed-off-by: lihanbo --- .../mcs/service_export_controller.go | 20 +++++----- .../status/cluster_status_controller.go | 37 +++++++++---------- .../status/workstatus_controller.go | 24 +++++++----- pkg/util/constants.go | 7 ++++ .../informermanager/multi-cluster-manager.go | 27 ++++++++++++-- .../informermanager/single-cluster-manager.go | 6 +-- 6 files changed, 76 insertions(+), 45 deletions(-) diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index da91882c144a..5a41ad5b1823 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -95,6 +95,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr return controllerruntime.Result{Requeue: true}, err } + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) + return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + return c.buildResourceInformers(cluster) } @@ -175,16 +180,11 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al } c.InformerManager.Start(cluster.Name) - synced := c.InformerManager.WaitForCacheSync(cluster.Name) - if synced == nil { - klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) - return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) - } - for _, gvr := range gvrTargets { - if !synced[gvr] { - klog.Errorf("Informer for %s hasn't synced.", gvr) - return fmt.Errorf("informer for %s hasn't synced", gvr) - } + + if err := c.InformerManager.WaitForCacheSyncWithTimeout(singleClusterInformerManager.Context(), util.CacheSyncTimeout, cluster.Name, gvrTargets); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return err } return nil diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index 47caf7f94d53..c02d449a5061 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -118,16 +118,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu return controllerruntime.Result{Requeue: true}, err } - // get or create informer for pods and nodes in member cluster - clusterInformerManager, err := c.buildInformerForCluster(cluster) - if err != nil { - klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err) - return controllerruntime.Result{Requeue: true}, err - } - - // init the lease controller for every cluster - c.initLeaseController(clusterInformerManager.Context(), cluster) - var currentClusterStatus = clusterv1alpha1.ClusterStatus{} var online, healthy bool @@ -146,9 +136,20 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name) currentClusterStatus.Conditions = generateReadyCondition(false, false) setTransitionTime(&cluster.Status, ¤tClusterStatus) + c.InformerManager.Stop(cluster.Name) return c.updateStatusIfNeeded(cluster, currentClusterStatus) } + // get or create informer for pods and nodes in member cluster + clusterInformerManager, err := c.buildInformerForCluster(cluster) + if err != nil { + klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err) + return controllerruntime.Result{Requeue: true}, err + } + + // init the lease controller for every cluster + c.initLeaseController(clusterInformerManager.Context(), cluster) + clusterVersion, err := getKubernetesVersion(clusterClient) if err != nil { klog.Errorf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err) @@ -227,17 +228,13 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph } c.InformerManager.Start(cluster.Name) - synced := c.InformerManager.WaitForCacheSync(cluster.Name) - if synced == nil { - klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name) - return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name) - } - for _, gvr := range gvrs { - if !synced[gvr] { - klog.Errorf("Informer for %s hasn't synced.", gvr) - return nil, fmt.Errorf("informer for %s hasn't synced", gvr) - } + + if err := c.InformerManager.WaitForCacheSyncWithTimeout(singleClusterInformerManager.Context(), util.CacheSyncTimeout, cluster.Name, gvrs); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return nil, err } + return singleClusterInformerManager, nil } diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 8d378c195a25..0e5d67b0b98f 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -83,6 +83,11 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt return controllerruntime.Result{Requeue: true}, err } + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) + return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + return c.buildResourceInformers(cluster, work) } @@ -391,17 +396,18 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha } c.InformerManager.Start(cluster.Name) - synced := c.InformerManager.WaitForCacheSync(cluster.Name) - if synced == nil { - klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) - return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) - } + + var gvrs []schema.GroupVersionResource for gvr := range gvrTargets { - if !synced[gvr] { - klog.Errorf("Informer for %s hasn't synced.", gvr) - return fmt.Errorf("informer for %s hasn't synced", gvr) - } + gvrs = append(gvrs, gvr) } + + if err := c.InformerManager.WaitForCacheSyncWithTimeout(singleClusterInformerManager.Context(), util.CacheSyncTimeout, cluster.Name, gvrs); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return err + } + return nil } diff --git a/pkg/util/constants.go b/pkg/util/constants.go index 9ae1eb535a42..6d32c02bd724 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -1,5 +1,7 @@ package util +import "time" + const ( // ServiceNamespaceLabel is added to work object, which is report by member cluster, to specify service namespace associated with EndpointSlice. ServiceNamespaceLabel = "endpointslice.karmada.io/namespace" @@ -125,3 +127,8 @@ const ( // ContextKeyObject is the context value key of a resource. ContextKeyObject ContextKey = "object" ) + +const ( + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync + CacheSyncTimeout = 30 * time.Second +) diff --git a/pkg/util/informermanager/multi-cluster-manager.go b/pkg/util/informermanager/multi-cluster-manager.go index 4ca6a0344a29..a70c2b930c5a 100644 --- a/pkg/util/informermanager/multi-cluster-manager.go +++ b/pkg/util/informermanager/multi-cluster-manager.go @@ -1,6 +1,8 @@ package informermanager import ( + "context" + "fmt" "sync" "time" @@ -56,7 +58,11 @@ type MultiClusterInformerManager interface { // WaitForCacheSync waits for all caches to populate. // Should call after 'ForCluster', otherwise no-ops. - WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool + WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool + + // WaitForCacheSyncWithTimeout waits for the cache of given resource to populate with a definitive timeout. + // Should call after 'ForCluster', otherwise no-ops. + WaitForCacheSyncWithTimeout(parentContext context.Context, cacheSyncTimeout time.Duration, cluster string, gvrTargets []schema.GroupVersionResource) error } // NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl. @@ -125,10 +131,25 @@ func (m *multiClusterInformerManagerImpl) Stop(cluster string) { delete(m.managers, cluster) } -func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool { +func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { manager, exist := m.getManager(cluster) if !exist { return nil } - return manager.WaitForCacheSync() + return manager.WaitForCacheSync(stopCh) +} + +func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(parentContext context.Context, cacheSyncTimeout time.Duration, cluster string, gvrTargets []schema.GroupVersionResource) error { + sourceStartCtx, cancel := context.WithTimeout(parentContext, cacheSyncTimeout) + defer cancel() + synced := m.WaitForCacheSync(cluster, sourceStartCtx.Done()) + if synced == nil { + return fmt.Errorf("no informerFactory for cluster %s exist", cluster) + } + for _, gvr := range gvrTargets { + if !synced[gvr] { + return fmt.Errorf("informer for %s hasn't synced", gvr) + } + } + return nil } diff --git a/pkg/util/informermanager/single-cluster-manager.go b/pkg/util/informermanager/single-cluster-manager.go index 57ae4a342c22..52479162fa17 100644 --- a/pkg/util/informermanager/single-cluster-manager.go +++ b/pkg/util/informermanager/single-cluster-manager.go @@ -44,7 +44,7 @@ type SingleClusterInformerManager interface { Stop() // WaitForCacheSync waits for all caches to populate. - WaitForCacheSync() map[schema.GroupVersionResource]bool + WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool // Context returns the single cluster context. Context() context.Context @@ -135,10 +135,10 @@ func (s *singleClusterInformerManagerImpl) Stop() { s.cancel() } -func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool { +func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { s.lock.Lock() defer s.lock.Unlock() - res := s.informerFactory.WaitForCacheSync(s.ctx.Done()) + res := s.informerFactory.WaitForCacheSync(stopCh) for resource, synced := range res { if _, exist := s.syncedInformers[resource]; !exist && synced { s.syncedInformers[resource] = struct{}{}