From 4029ec6824a004d7d3a19d98f1d11a89cea63936 Mon Sep 17 00:00:00 2001 From: Antoni Zawodny Date: Mon, 22 Jul 2024 14:32:11 +0200 Subject: [PATCH] feat: List resources from the watch cache Signed-off-by: Antoni Zawodny --- pkg/cache/cluster.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 3f662effc..c7452de17 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -553,6 +553,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso } listRetry.Steps = int(c.listRetryLimit) + opts.ResourceVersion = "0" err := retry.OnError(listRetry, c.listRetryFunc, func() error { var ierr error res, ierr = resClient.List(ctx, opts) @@ -605,6 +606,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc } func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) { + timeoutSeconds := int64(c.watchResyncTimeout.Seconds()) kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) { defer func() { if r := recover(); r != nil { @@ -622,6 +624,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.TimeoutSeconds = &timeoutSeconds res, err := resClient.Watch(ctx, options) if errors.IsNotFound(err) { c.stopWatching(api.GroupKind, ns) @@ -633,17 +636,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo return err } - defer func() { - w.Stop() - resourceVersion = "" - }() - - var watchResyncTimeoutCh <-chan time.Time - if c.watchResyncTimeout > 0 { - shouldResync := time.NewTimer(c.watchResyncTimeout) - defer shouldResync.Stop() - watchResyncTimeoutCh = shouldResync.C - } + defer w.Stop() for { select { @@ -651,12 +644,9 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo case <-ctx.Done(): return nil - // re-synchronize API state and restart watch periodically - case <-watchResyncTimeoutCh: - return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host) - // re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version case <-w.Done(): + resourceVersion = "" return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host) case event, ok := <-w.ResultChan(): @@ -666,8 +656,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo obj, ok := event.Object.(*unstructured.Unstructured) if !ok { + resourceVersion = "" return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object) } + resourceVersion = obj.GetResourceVersion() c.processEvent(event.Type, obj) if kube.IsCRD(obj) {