add Timeout in WaitForCacheSync #894

Merged · 1 commit · Nov 5, 2021
29 changes: 20 additions & 9 deletions pkg/controllers/mcs/service_export_controller.go
@@ -95,6 +95,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr
 		return controllerruntime.Result{Requeue: true}, err
 	}
 
+	if !util.IsClusterReady(&cluster.Status) {
+		klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
+		return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
+	}
+
 	return c.buildResourceInformers(cluster)
 }

@@ -175,16 +180,22 @@ func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1al
 	}
 
 	c.InformerManager.Start(cluster.Name)
-	synced := c.InformerManager.WaitForCacheSync(cluster.Name)
-	if synced == nil {
-		klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
-		return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
-	}
-	for _, gvr := range gvrTargets {
-		if !synced[gvr] {
-			klog.Errorf("Informer for %s hasn't synced.", gvr)
-			return fmt.Errorf("informer for %s hasn't synced", gvr)
+
+	if err := func() error {
+		synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
+		if synced == nil {
+			return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
+		}
+		for _, gvr := range gvrTargets {
+			if !synced[gvr] {
+				return fmt.Errorf("informer for %s hasn't synced", gvr)
+			}
 		}
+		return nil
+	}(); err != nil {
+		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
+		c.InformerManager.Stop(cluster.Name)
+		return err
 	}
 
 	return nil
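The change above follows one pattern: every cache-sync check runs inside an immediately invoked function literal, so each failure path funnels through a single handler that stops the cluster's informers before returning the error, letting the next reconcile rebuild them instead of waiting on a wedged cache. Below is a minimal, self-contained sketch of that shape; the fakeManager type and the GVR strings are illustrative stand-ins, not Karmada's real types:

package main

import (
	"fmt"
	"time"
)

// fakeManager stands in for Karmada's informer manager; only the two
// methods the pattern needs are modeled here.
type fakeManager struct{ synced map[string]bool }

// WaitForCacheSyncWithTimeout returns the canned sync map; a real manager
// would block until every informer synced or the timeout elapsed.
func (f *fakeManager) WaitForCacheSyncWithTimeout(cluster string, timeout time.Duration) map[string]bool {
	return f.synced
}

// Stop tears down the informers registered for the given cluster.
func (f *fakeManager) Stop(cluster string) {
	fmt.Println("stopped informers for", cluster)
}

// syncCaches mirrors the shape of the diff: all sync checks run inside an
// immediately invoked func literal, so every failure path ends in one
// handler that stops the cluster's informers before returning.
func syncCaches(m *fakeManager, cluster string, gvrs []string) error {
	if err := func() error {
		synced := m.WaitForCacheSyncWithTimeout(cluster, 30*time.Second)
		if synced == nil {
			return fmt.Errorf("no informerFactory for cluster %s exist", cluster)
		}
		for _, gvr := range gvrs {
			if !synced[gvr] {
				return fmt.Errorf("informer for %s hasn't synced", gvr)
			}
		}
		return nil
	}(); err != nil {
		m.Stop(cluster)
		return err
	}
	return nil
}

func main() {
	m := &fakeManager{synced: map[string]bool{"v1/services": true}}
	err := syncCaches(m, "member1", []string{"v1/services", "discovery.k8s.io/v1/endpointslices"})
	fmt.Println(err) // informer for discovery.k8s.io/v1/endpointslices hasn't synced
}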
46 changes: 27 additions & 19 deletions pkg/controllers/status/cluster_status_controller.go
@@ -118,16 +118,6 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
 		return controllerruntime.Result{Requeue: true}, err
 	}
 
-	// get or create informer for pods and nodes in member cluster
-	clusterInformerManager, err := c.buildInformerForCluster(cluster)
-	if err != nil {
-		klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
-		return controllerruntime.Result{Requeue: true}, err
-	}
-
-	// init the lease controller for every cluster
-	c.initLeaseController(clusterInformerManager.Context(), cluster)
-
 	var currentClusterStatus = clusterv1alpha1.ClusterStatus{}
 
 	var online, healthy bool
@@ -146,9 +136,20 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
 		klog.V(2).Infof("Cluster(%s) still offline after retry, ensuring offline is set.", cluster.Name)
 		currentClusterStatus.Conditions = generateReadyCondition(false, false)
 		setTransitionTime(&cluster.Status, &currentClusterStatus)
+		c.InformerManager.Stop(cluster.Name)
Review comment (Member):
@mrlihanbo Hello Hanbo, how are you doing?
Can you remember why the informer is stopped here when a cluster goes offline?
#2930 is now trying to solve an issue caused by this change.
Also cc @Garrybest to help recall.

Review comment (Member):
Maybe because we want to re-establish the informer after the apiserver becomes healthy again? 🤔
That reason may not be very convincing, though, since I don't remember this line either. 🤣

Review comment (Member):
I talked to @mrlihanbo; he said this is probably for suppressing repetitive warning logs, especially for clusters that stay offline for a long time.
 		return c.updateStatusIfNeeded(cluster, currentClusterStatus)
 	}
 
+	// get or create informer for pods and nodes in member cluster
+	clusterInformerManager, err := c.buildInformerForCluster(cluster)
+	if err != nil {
+		klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
+		return controllerruntime.Result{Requeue: true}, err
+	}
+
+	// init the lease controller for every cluster
+	c.initLeaseController(clusterInformerManager.Context(), cluster)
+
 	clusterVersion, err := getKubernetesVersion(clusterClient)
 	if err != nil {
 		klog.Errorf("Failed to get server version of the member cluster: %v, err is : %v", cluster.Name, err)
@@ -227,17 +228,24 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph
 	}
 
 	c.InformerManager.Start(cluster.Name)
-	synced := c.InformerManager.WaitForCacheSync(cluster.Name)
-	if synced == nil {
-		klog.Errorf("The informer factory for cluster(%s) does not exist.", cluster.Name)
-		return nil, fmt.Errorf("informer factory for cluster(%s) does not exist", cluster.Name)
-	}
-	for _, gvr := range gvrs {
-		if !synced[gvr] {
-			klog.Errorf("Informer for %s hasn't synced.", gvr)
-			return nil, fmt.Errorf("informer for %s hasn't synced", gvr)
+
+	if err := func() error {
+		synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
+		if synced == nil {
+			return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
+		}
+		for _, gvr := range gvrs {
+			if !synced[gvr] {
+				return fmt.Errorf("informer for %s hasn't synced", gvr)
+			}
 		}
+		return nil
+	}(); err != nil {
+		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
+		c.InformerManager.Stop(cluster.Name)
+		return nil, err
 	}
 
 	return singleClusterInformerManager, nil
 }
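The net effect of this file's changes: building informers and starting the lease controller now happen after the offline check rather than before it, and an offline cluster additionally has its informers stopped. A rough, self-contained sketch of the reordered flow; all names besides Stop are illustrative stand-ins, not Karmada's actual functions:

package main

import "fmt"

// manager is an illustrative stand-in for the controller's InformerManager.
type manager struct{}

// Stop tears down all informers registered for the cluster.
func (manager) Stop(cluster string) { fmt.Println("informers stopped for", cluster) }

// build stands in for buildInformerForCluster.
func (manager) build(cluster string) error {
	fmt.Println("informers built for", cluster)
	return nil
}

// syncClusterStatus sketches the reordered flow: the offline branch runs
// first, stops the cluster's informers (silencing repeated cache-sync
// warnings for long-dead clusters), and returns early; informers and the
// lease controller are only set up for clusters that are reachable.
func syncClusterStatus(m manager, cluster string, online, healthy bool) error {
	if !online && !healthy {
		m.Stop(cluster)
		return nil // record the NotReady condition and bail out
	}
	if err := m.build(cluster); err != nil { // moved below the offline check
		return err
	}
	// ...init the lease controller, then collect version/node/pod status...
	return nil
}

func main() {
	var m manager
	_ = syncClusterStatus(m, "member1", false, false) // offline path
	_ = syncClusterStatus(m, "member2", true, true)   // healthy path
}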
30 changes: 21 additions & 9 deletions pkg/controllers/status/workstatus_controller.go
@@ -83,6 +83,11 @@ func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerrunt
 		return controllerruntime.Result{Requeue: true}, err
 	}
 
+	if !util.IsClusterReady(&cluster.Status) {
+		klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
+		return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
+	}
+
 	return c.buildResourceInformers(cluster, work)
 }

@@ -391,17 +396,24 @@ func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha
 	}
 
 	c.InformerManager.Start(cluster.Name)
-	synced := c.InformerManager.WaitForCacheSync(cluster.Name)
-	if synced == nil {
-		klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
-		return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
-	}
-	for gvr := range gvrTargets {
-		if !synced[gvr] {
-			klog.Errorf("Informer for %s hasn't synced.", gvr)
-			return fmt.Errorf("informer for %s hasn't synced", gvr)
+
+	if err := func() error {
+		synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, util.CacheSyncTimeout)
+		if synced == nil {
+			return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
+		}
+		for gvr := range gvrTargets {
+			if !synced[gvr] {
+				return fmt.Errorf("informer for %s hasn't synced", gvr)
+			}
 		}
+		return nil
+	}(); err != nil {
+		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
+		c.InformerManager.Stop(cluster.Name)
+		return err
 	}
 
 	return nil
 }
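Both this controller and the ServiceExport controller now gate informer construction on cluster readiness: a not-ready cluster produces an error plus a requeue, so the item is retried later instead of waiting on caches that cannot sync. A minimal sketch of that guard, with simplified stand-ins for util.IsClusterReady and the controller-runtime result:

package main

import "fmt"

// clusterStatus is a pared-down stand-in for clusterv1alpha1.ClusterStatus.
type clusterStatus struct{ ready bool }

// isClusterReady stands in for util.IsClusterReady, which inspects the
// cluster's Ready condition.
func isClusterReady(s *clusterStatus) bool { return s.ready }

// reconcile sketches the new guard: when the cluster is not ready, return
// an error and request a requeue so the work is retried once the cluster
// recovers, rather than building informers against an unreachable API server.
func reconcile(cluster string, status *clusterStatus) (requeue bool, err error) {
	if !isClusterReady(status) {
		return true, fmt.Errorf("cluster(%s) not ready", cluster)
	}
	// ...buildResourceInformers(cluster, work) would run here...
	return false, nil
}

func main() {
	requeue, err := reconcile("member1", &clusterStatus{ready: false})
	fmt.Println(requeue, err) // true cluster(member1) not ready
}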
7 changes: 7 additions & 0 deletions pkg/util/constants.go
@@ -1,5 +1,7 @@
 package util
 
+import "time"
+
 const (
 	// ServiceNamespaceLabel is added to work object, which is report by member cluster, to specify service namespace associated with EndpointSlice.
 	ServiceNamespaceLabel = "endpointslice.karmada.io/namespace"
@@ -125,3 +127,8 @@ const (
 	// ContextKeyObject is the context value key of a resource.
 	ContextKeyObject ContextKey = "object"
 )
+
+const (
+	// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
+	CacheSyncTimeout = 30 * time.Second
+)
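One Go detail worth noting about the new constant: 30 * time.Second is a constant expression of type time.Duration, so util.CacheSyncTimeout can be passed anywhere a time.Duration is expected without conversion. A tiny sketch:

package main

import (
	"fmt"
	"time"
)

// CacheSyncTimeout mirrors the new constant: the untyped constant 30
// multiplied by time.Second yields a typed time.Duration constant.
const CacheSyncTimeout = 30 * time.Second

func main() {
	var d time.Duration = CacheSyncTimeout // assignable with no conversion
	fmt.Println(d)                         // 30s
}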
12 changes: 12 additions & 0 deletions pkg/util/informermanager/multi-cluster-manager.go
@@ -57,6 +57,10 @@ type MultiClusterInformerManager interface {
 	// WaitForCacheSync waits for all caches to populate.
 	// Should call after 'ForCluster', otherwise no-ops.
 	WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool
+
+	// WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout.
+	// Should call after 'ForCluster', otherwise no-ops.
+	WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
 }
 
 // NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
@@ -132,3 +136,11 @@ func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[s
 	}
 	return manager.WaitForCacheSync()
 }
+
+func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
+	manager, exist := m.getManager(cluster)
+	if !exist {
+		return nil
+	}
+	return manager.WaitForCacheSyncWithTimeout(cacheSyncTimeout)
+}
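From a caller's perspective, the new multi-cluster method keeps the same nil-means-no-factory contract as WaitForCacheSync while bounding the wait. The sketch below shows the expected call order, assuming the karmada-io/karmada import paths of this era and the Start(cluster) method the controllers above use; ForCluster's setup is elided since its signature isn't shown in this diff:

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"

	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

// ensureSynced starts the informers for one cluster and waits, at most
// util.CacheSyncTimeout, for the given resources to sync. The manager must
// already have been initialized for the cluster via ForCluster.
func ensureSynced(m informermanager.MultiClusterInformerManager, cluster string, gvrs []schema.GroupVersionResource) error {
	m.Start(cluster)
	synced := m.WaitForCacheSyncWithTimeout(cluster, util.CacheSyncTimeout)
	if synced == nil { // nil still means "no informer factory for this cluster"
		return fmt.Errorf("no informerFactory for cluster %s exist", cluster)
	}
	for _, gvr := range gvrs {
		if !synced[gvr] {
			return fmt.Errorf("informer for %s hasn't synced", gvr)
		}
	}
	return nil
}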
16 changes: 15 additions & 1 deletion pkg/util/informermanager/single-cluster-manager.go
@@ -46,6 +46,9 @@ type SingleClusterInformerManager interface {
 	// WaitForCacheSync waits for all caches to populate.
 	WaitForCacheSync() map[schema.GroupVersionResource]bool
 
+	// WaitForCacheSyncWithTimeout waits for all caches to populate with a definitive timeout.
+	WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
+
 	// Context returns the single cluster context.
 	Context() context.Context
 }
@@ -136,9 +139,20 @@ func (s *singleClusterInformerManagerImpl) Stop() {
 }
 
 func (s *singleClusterInformerManagerImpl) WaitForCacheSync() map[schema.GroupVersionResource]bool {
+	return s.waitForCacheSync(s.ctx)
+}
+
+func (s *singleClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
+	ctx, cancel := context.WithTimeout(s.ctx, cacheSyncTimeout)
+	defer cancel()
+
+	return s.waitForCacheSync(ctx)
+}
+
+func (s *singleClusterInformerManagerImpl) waitForCacheSync(ctx context.Context) map[schema.GroupVersionResource]bool {
 	s.lock.Lock()
 	defer s.lock.Unlock()
-	res := s.informerFactory.WaitForCacheSync(s.ctx.Done())
+	res := s.informerFactory.WaitForCacheSync(ctx.Done())
 	for resource, synced := range res {
 		if _, exist := s.syncedInformers[resource]; !exist && synced {
 			s.syncedInformers[resource] = struct{}{}
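The implementation detail that makes this work: context.WithTimeout derives a child context whose Done() channel closes either when the parent s.ctx is cancelled or when the timeout fires, and shared informer factories accept exactly such a stop channel. The sketch below demonstrates the same mechanism against the upstream client-go factory (not Karmada's wrapper), using a fake clientset so it runs without a real cluster:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// A fake clientset is enough to drive a shared informer factory locally.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	factory.Core().V1().Pods().Informer() // register at least one informer

	// The child context bounds the wait the same way the new
	// WaitForCacheSyncWithTimeout does: ctx.Done() closes when the parent
	// is cancelled or when 30s elapse, whichever comes first.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	factory.Start(ctx.Done())
	for informerType, synced := range factory.WaitForCacheSync(ctx.Done()) {
		fmt.Printf("%v synced: %v\n", informerType, synced)
	}
}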