Skip to content

Commit

Permalink
Merge pull request #894 from mrlihanbo/cluster_status_bugfix
Browse files Browse the repository at this point in the history
add Timeout in WaitForCacheSync
  • Loading branch information
karmada-bot authored Nov 5, 2021
2 parents f7a8a5f + ac3878e commit 51c911a
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 38 deletions.
29 changes: 20 additions & 9 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr
return controllerruntime.Result{Requeue: true}, err
}

if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return c.buildResourceInformers(cluster)
}

Expand Down Expand Up @@ -175,16 +180,22 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al
}

c.InformerManager.Start(cluster.Name)
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
if synced == nil {
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrTargets {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return fmt.Errorf("informer for %s hasn't synced", gvr)

if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrTargets {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}

return nil
Expand Down
46 changes: 27 additions & 19 deletions pkg/controllers/status/cluster_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
return controllerruntime.Result{Requeue: true}, err
}

// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}

// init the lease controller for every cluster
c.initLeaseController(clusterInformerManager.Context(), cluster)

var currentClusterStatus = clusterv1alpha1.ClusterStatus{}

var online, healthy bool
Expand All @@ -146,9 +136,20 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name)
currentClusterStatus.Conditions = generateReadyCondition(false, false)
setTransitionTime(&cluster.Status, &currentClusterStatus)
c.InformerManager.Stop(cluster.Name)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
}

// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}

// init the lease controller for every cluster
c.initLeaseController(clusterInformerManager.Context(), cluster)

clusterVersion, err := getKubernetesVersion(clusterClient)
if err != nil {
klog.Errorf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err)
Expand Down Expand Up @@ -227,17 +228,24 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph
}

c.InformerManager.Start(cluster.Name)
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
if synced == nil {
klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)
}
for _, gvr := range gvrs {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return nil, fmt.Errorf("informer for %s hasn't synced", gvr)

if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for _, gvr := range gvrs {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return nil, err
}

return singleClusterInformerManager, nil
}

Expand Down
30 changes: 21 additions & 9 deletions pkg/controllers/status/workstatus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt
return controllerruntime.Result{Requeue: true}, err
}

if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return c.buildResourceInformers(cluster, work)
}

Expand Down Expand Up @@ -391,17 +396,24 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha
}

c.InformerManager.Start(cluster.Name)
synced := c.InformerManager.WaitForCacheSync(cluster.Name)
if synced == nil {
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for gvr := range gvrTargets {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return fmt.Errorf("informer for %s hasn't synced", gvr)

if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for gvr := range gvrTargets {
if !synced[gvr] {
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
return err
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package util

import "time"

const (
// ServiceNamespaceLabel is added to work object, which is report by member cluster, to specify service namespace associated with EndpointSlice.
ServiceNamespaceLabel = "endpointslice.karmada.io/namespace"
Expand Down Expand Up @@ -125,3 +127,8 @@ const (
// ContextKeyObject is the context value key of a resource.
ContextKeyObject ContextKey = "object"
)

const (
	// CacheSyncTimeout is the maximum duration to wait for informer caches
	// to sync before giving up, preventing cache-sync waits from blocking
	// a controller indefinitely when a member cluster is unreachable.
	CacheSyncTimeout = 30 * time.Second
)
12 changes: 12 additions & 0 deletions pkg/util/informermanager/multi-cluster-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type MultiClusterInformerManager interface {
// WaitForCacheSync waits for all caches to populate.
// Should call after 'ForCluster', otherwise no-ops.
WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool

// WaitForCacheSyncWithTimeout waits for all caches to populate, giving up once the timeout elapses.
// Should be called after 'ForCluster'; otherwise it is a no-op.
WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
}

// NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
Expand Down Expand Up @@ -132,3 +136,11 @@ func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[s
}
return manager.WaitForCacheSync()
}

// WaitForCacheSyncWithTimeout waits for the caches of the given cluster to
// populate, giving up after cacheSyncTimeout. It returns nil when no informer
// manager has been created for the cluster.
func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
	if manager, exist := m.getManager(cluster); exist {
		return manager.WaitForCacheSyncWithTimeout(cacheSyncTimeout)
	}
	return nil
}
16 changes: 15 additions & 1 deletion pkg/util/informermanager/single-cluster-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type SingleClusterInformerManager interface {
// WaitForCacheSync waits for all caches to populate.
WaitForCacheSync() map[schema.GroupVersionResource]bool

// WaitForCacheSyncWithTimeout waits for all caches to populate, giving up once the given timeout elapses.

// Context returns the single cluster context.
Context() context.Context
}
Expand Down Expand Up @@ -136,9 +139,20 @@ func (s *singleClusterInformerManagerImpl) Stop() {
}

// WaitForCacheSync waits for every registered informer cache to populate,
// bounded only by the manager's own context (no explicit timeout).
func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool {
	ctx := s.ctx
	return s.waitForCacheSync(ctx)
}

// WaitForCacheSyncWithTimeout waits for every registered informer cache to
// populate, aborting once cacheSyncTimeout elapses. The derived context is
// always cancelled to release its timer resources.
func (s *singleClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
	timeoutCtx, cancel := context.WithTimeout(s.ctx, cacheSyncTimeout)
	defer cancel()
	return s.waitForCacheSync(timeoutCtx)
}

func (s *singleClusterInformerManagerImpl) waitForCacheSync(ctx context.Context) map[schema.GroupVersionResource]bool {
s.lock.Lock()
defer s.lock.Unlock()
res := s.informerFactory.WaitForCacheSync(s.ctx.Done())
res := s.informerFactory.WaitForCacheSync(ctx.Done())
for resource, synced := range res {
if _, exist := s.syncedInformers[resource]; !exist && synced {
s.syncedInformers[resource] = struct{}{}
Expand Down

0 comments on commit 51c911a

Please sign in to comment.