diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index da91882c144a..60aab9237520 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -95,6 +95,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr return controllerruntime.Result{Requeue: true}, err } + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) + return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + return c.buildResourceInformers(cluster) } @@ -175,16 +180,22 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al } c.InformerManager.Start(cluster.Name) - synced := c.InformerManager.WaitForCacheSync(cluster.Name) - if synced == nil { - klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) - return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) - } - for _, gvr := range gvrTargets { - if !synced[gvr] { - klog.Errorf("Informer for %s hasn't synced.", gvr) - return fmt.Errorf("informer for %s hasn't synced", gvr) + + if err := func() error { + synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout) + if synced == nil { + return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) } + for _, gvr := range gvrTargets { + if !synced[gvr] { + return fmt.Errorf("informer for %s hasn't synced", gvr) + } + } + return nil + }(); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return err } return nil diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index 47caf7f94d53..4bfc42adbac0 100644 --- 
a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -118,16 +118,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu return controllerruntime.Result{Requeue: true}, err } - // get or create informer for pods and nodes in member cluster - clusterInformerManager, err := c.buildInformerForCluster(cluster) - if err != nil { - klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err) - return controllerruntime.Result{Requeue: true}, err - } - - // init the lease controller for every cluster - c.initLeaseController(clusterInformerManager.Context(), cluster) - var currentClusterStatus = clusterv1alpha1.ClusterStatus{} var online, healthy bool @@ -146,9 +136,20 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name) currentClusterStatus.Conditions = generateReadyCondition(false, false) setTransitionTime(&cluster.Status, &currentClusterStatus) + c.InformerManager.Stop(cluster.Name) return c.updateStatusIfNeeded(cluster, currentClusterStatus) } + // get or create informer for pods and nodes in member cluster + clusterInformerManager, err := c.buildInformerForCluster(cluster) + if err != nil { + klog.Errorf("Failed to get or create informer for Cluster %s. 
Error: %v.", cluster.GetName(), err) + return controllerruntime.Result{Requeue: true}, err + } + + // init the lease controller for every cluster + c.initLeaseController(clusterInformerManager.Context(), cluster) + clusterVersion, err := getKubernetesVersion(clusterClient) if err != nil { klog.Errorf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err) @@ -227,17 +228,24 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph } c.InformerManager.Start(cluster.Name) - synced := c.InformerManager.WaitForCacheSync(cluster.Name) - if synced == nil { - klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name) - return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name) - } - for _, gvr := range gvrs { - if !synced[gvr] { - klog.Errorf("Informer for %s hasn't synced.", gvr) - return nil, fmt.Errorf("informer for %s hasn't synced", gvr) + + if err := func() error { + synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout) + if synced == nil { + return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) + } + for _, gvr := range gvrs { + if !synced[gvr] { + return fmt.Errorf("informer for %s hasn't synced", gvr) + } } + return nil + }(); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return nil, err } + return singleClusterInformerManager, nil } diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index 6f29d2c89f07..f03eccba28ee 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -83,6 +83,11 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt return controllerruntime.Result{Requeue: true}, err } + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop 
sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name) + return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + return c.buildResourceInformers(cluster, work) } @@ -391,17 +396,24 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha } c.InformerManager.Start(cluster.Name) - synced := c.InformerManager.WaitForCacheSync(cluster.Name) - if synced == nil { - klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) - return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) - } - for gvr := range gvrTargets { - if !synced[gvr] { - klog.Errorf("Informer for %s hasn't synced.", gvr) - return fmt.Errorf("informer for %s hasn't synced", gvr) + + if err := func() error { + synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout) + if synced == nil { + return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) } + for gvr := range gvrTargets { + if !synced[gvr] { + return fmt.Errorf("informer for %s hasn't synced", gvr) + } + } + return nil + }(); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return err } + return nil } diff --git a/pkg/util/constants.go b/pkg/util/constants.go index 9ae1eb535a42..6d32c02bd724 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -1,5 +1,7 @@ package util +import "time" + const ( // ServiceNamespaceLabel is added to work object, which is report by member cluster, to specify service namespace associated with EndpointSlice. ServiceNamespaceLabel = "endpointslice.karmada.io/namespace" @@ -125,3 +127,8 @@ const ( // ContextKeyObject is the context value key of a resource. 
ContextKeyObject ContextKey = "object" ) + +const ( + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync + CacheSyncTimeout = 30 * time.Second +) diff --git a/pkg/util/informermanager/multi-cluster-manager.go b/pkg/util/informermanager/multi-cluster-manager.go index 4ca6a0344a29..b2e9a936e28f 100644 --- a/pkg/util/informermanager/multi-cluster-manager.go +++ b/pkg/util/informermanager/multi-cluster-manager.go @@ -57,6 +57,10 @@ type MultiClusterInformerManager interface { // WaitForCacheSync waits for all caches to populate. // Should call after 'ForCluster', otherwise no-ops. WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool + + // WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout. + // Should call after 'ForCluster', otherwise no-ops. + WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool } // NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl. @@ -132,3 +136,11 @@ func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[s } return manager.WaitForCacheSync() } + +func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool { + manager, exist := m.getManager(cluster) + if !exist { + return nil + } + return manager.WaitForCacheSyncWithTimeout(cacheSyncTimeout) +} diff --git a/pkg/util/informermanager/single-cluster-manager.go b/pkg/util/informermanager/single-cluster-manager.go index 57ae4a342c22..48993769bcd4 100644 --- a/pkg/util/informermanager/single-cluster-manager.go +++ b/pkg/util/informermanager/single-cluster-manager.go @@ -46,6 +46,9 @@ type SingleClusterInformerManager interface { // WaitForCacheSync waits for all caches to populate. 
WaitForCacheSync() map[schema.GroupVersionResource]bool + // WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout. + WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool + // Context returns the single cluster context. Context() context.Context } @@ -136,9 +139,20 @@ func (s *singleClusterInformerManagerImpl) Stop() { } func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool { + return s.waitForCacheSync(s.ctx) +} + +func (s *singleClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool { + ctx, cancel := context.WithTimeout(s.ctx, cacheSyncTimeout) + defer cancel() + + return s.waitForCacheSync(ctx) +} + +func (s *singleClusterInformerManagerImpl) waitForCacheSync(ctx context.Context) map[schema.GroupVersionResource]bool { s.lock.Lock() defer s.lock.Unlock() - res := s.informerFactory.WaitForCacheSync(s.ctx.Done()) + res := s.informerFactory.WaitForCacheSync(ctx.Done()) for resource, synced := range res { if _, exist := s.syncedInformers[resource]; !exist && synced { s.syncedInformers[resource] = struct{}{}