Skip to content

Commit

Permalink
add Timeout in WaitForCacheSync
Browse files Browse the repository at this point in the history
Signed-off-by: lihanbo <[email protected]>
  • Loading branch information
mrlihanbo committed Nov 4, 2021
1 parent c0e293e commit e964f21
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 45 deletions.
20 changes: 10 additions & 10 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr
return controllerruntime.Result{Requeue: true}, err
}

if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return c.buildResourceInformers(cluster)
}

Expand Down Expand Up @@ -175,16 +180,11 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al
}

c.InformerManager.Start(cluster.Name)
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
if synced == nil {
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrTargets {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return fmt.Errorf("informer for %s hasn't synced", gvr)
}

if err := c.InformerManager.WaitForCacheSyncWithTimeout(singleClusterInformerManager.Context(), util.CacheSyncTimeout, cluster.Name, gvrTargets); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}

return nil
Expand Down
37 changes: 17 additions & 20 deletions pkg/controllers/status/cluster_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
return controllerruntime.Result{Requeue: true}, err
}

// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}

// init the lease controller for every cluster
c.initLeaseController(clusterInformerManager.Context(), cluster)

var currentClusterStatus = clusterv1alpha1.ClusterStatus{}

var online, healthy bool
Expand All @@ -146,9 +136,20 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name)
currentClusterStatus.Conditions = generateReadyCondition(false, false)
setTransitionTime(&cluster.Status, &currentClusterStatus)
c.InformerManager.Stop(cluster.Name)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}

// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}

// init the lease controller for every cluster
c.initLeaseController(clusterInformerManager.Context(), cluster)

clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
klog.Errorf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err)
Expand Down Expand Up @@ -227,17 +228,13 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph
}

c.InformerManager.Start(cluster.Name)
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
if synced == nil {
klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)
}
for _, gvr := range gvrs {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return nil, fmt.Errorf("informer for %s hasn't synced", gvr)
}

if err := c.InformerManager.WaitForCacheSyncWithTimeout(singleClusterInformerManager.Context(), util.CacheSyncTimeout, cluster.Name, gvrs); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return nil, err
}

return singleClusterInformerManager, nil
}

Expand Down
24 changes: 15 additions & 9 deletions pkg/controllers/status/workstatus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt
return controllerruntime.Result{Requeue: true}, err
}

if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return c.buildResourceInformers(cluster, work)
}

Expand Down Expand Up @@ -391,17 +396,18 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha
}

c.InformerManager.Start(cluster.Name)
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
if synced == nil {
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}

var gvrs []schema.GroupVersionResource
for gvr := range gvrTargets {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
gvrs = append(gvrs, gvr)
}

if err := c.InformerManager.WaitForCacheSyncWithTimeout(singleClusterInformerManager.Context(), util.CacheSyncTimeout, cluster.Name, gvrs); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package util

import "time"

const (
// ServiceNamespaceLabel is added to work object, which is report by member cluster, to specify service namespace associated with EndpointSlice.
ServiceNamespaceLabel = "endpointslice.karmada.io/namespace"
Expand Down Expand Up @@ -125,3 +127,8 @@ const (
// ContextKeyObject is the context value key of a resource.
ContextKeyObject ContextKey = "object"
)

const (
	// CacheSyncTimeout is the maximum time to wait for informer caches to
	// sync before giving up and treating the sync attempt as failed.
	CacheSyncTimeout = 30 * time.Second
)
27 changes: 24 additions & 3 deletions pkg/util/informermanager/multi-cluster-manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package informermanager

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -56,7 +58,11 @@ type MultiClusterInformerManager interface {

// WaitForCacheSync waits for all caches to populate.
// Should call after 'ForCluster', otherwise no-ops.
WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool
WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool

// WaitForCacheSyncWithTimeout waits for the cache of given resource to populate with a definitive timeout.
// Should call after 'ForCluster', otherwise no-ops.
WaitForCacheSyncWithTimeout(parentContext context.Context, cacheSyncTimeout time.Duration, cluster string, gvrTargets []schema.GroupVersionResource) error
}

// NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
Expand Down Expand Up @@ -125,10 +131,25 @@ func (m *multiClusterInformerManagerImpl) Stop(cluster string) {
delete(m.managers, cluster)
}

func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool {
func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
manager, exist := m.getManager(cluster)
if !exist {
return nil
}
return manager.WaitForCacheSync()
return manager.WaitForCacheSync(stopCh)
}

// WaitForCacheSyncWithTimeout waits for the caches of the given cluster to
// populate, giving up once cacheSyncTimeout elapses or parentContext is
// cancelled, whichever comes first. It returns an error when no informer
// manager exists for the cluster, or when any of gvrTargets has not synced
// within the time limit.
func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(parentContext context.Context, cacheSyncTimeout time.Duration, cluster string, gvrTargets []schema.GroupVersionResource) error {
	// Derive a bounded context so WaitForCacheSync cannot block forever.
	ctx, cancel := context.WithTimeout(parentContext, cacheSyncTimeout)
	defer cancel()

	synced := m.WaitForCacheSync(cluster, ctx.Done())
	if synced == nil {
		return fmt.Errorf("no informerFactory for cluster %s exists", cluster)
	}
	for _, gvr := range gvrTargets {
		if !synced[gvr] {
			return fmt.Errorf("informer for %s hasn't synced", gvr)
		}
	}
	return nil
}
6 changes: 3 additions & 3 deletions pkg/util/informermanager/single-cluster-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type SingleClusterInformerManager interface {
Stop()

// WaitForCacheSync waits for all caches to populate.
WaitForCacheSync() map[schema.GroupVersionResource]bool
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool

// Context returns the single cluster context.
Context() context.Context
Expand Down Expand Up @@ -135,10 +135,10 @@ func (s *singleClusterInformerManagerImpl) Stop() {
s.cancel()
}

func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool {
func (s *singleClusterInformerManagerImpl) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
s.lock.Lock()
defer s.lock.Unlock()
res := s.informerFactory.WaitForCacheSync(s.ctx.Done())
res := s.informerFactory.WaitForCacheSync(stopCh)
for resource, synced := range res {
if _, exist := s.syncedInformers[resource]; !exist && synced {
s.syncedInformers[resource] = struct{}{}
Expand Down

0 comments on commit e964f21

Please sign in to comment.