From 8d9887afc74a380b63b50cf97512f073c0342856 Mon Sep 17 00:00:00 2001 From: Ahmad Karimi Date: Mon, 14 Aug 2023 22:01:17 +0330 Subject: [PATCH] check cache syncs before dag rebuild Signed-off-by: Ahmad Karimi --- cmd/contour/serve.go | 2 +- internal/contour/handler.go | 57 +++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index b89d4bb2fd1..9167395e16e 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -1200,7 +1200,7 @@ func informOnResource(obj client.Object, handler cache.ResourceEventHandler, cac if err == nil { if er, ok := handler.(*contour.EventRecorder); ok { if eh, ok := er.Next.(*contour.EventHandler); ok { - eh.UpstreamSyncFuncs = append(eh.UpstreamSyncFuncs, resourceEventHandlerRegistration.HasSynced) + eh.UpstreamCacheSyncs = append(eh.UpstreamCacheSyncs, resourceEventHandlerRegistration.HasSynced) } } } diff --git a/internal/contour/handler.go b/internal/contour/handler.go index 8c278bed28c..c61cf443424 100644 --- a/internal/contour/handler.go +++ b/internal/contour/handler.go @@ -22,7 +22,7 @@ import ( "time" "github.com/sirupsen/logrus" - "k8s.io/client-go/tools/cache/synctrack" + "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/projectcontour/contour/internal/dag" @@ -58,29 +58,24 @@ type EventHandler struct { // an event has been received. seq int - syncTracker *synctrack.SingleFileTracker - - UpstreamSyncFuncs []func() bool + UpstreamCacheSyncs []cache.InformerSynced } func NewEventHandler(config EventHandlerConfig) *EventHandler { return &EventHandler{ - FieldLogger: config.Logger, - builder: config.Builder, - observer: config.Observer, - holdoffDelay: config.HoldoffDelay, - holdoffMaxDelay: config.HoldoffMaxDelay, - statusUpdater: config.StatusUpdater, - update: make(chan any), - sequence: make(chan int, 1), - syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: func() bool { return true }}, - UpstreamSyncFuncs: make([]func() bool, 0), + FieldLogger: config.Logger, + builder: config.Builder, + observer: config.Observer, + holdoffDelay: config.HoldoffDelay, + holdoffMaxDelay: config.HoldoffMaxDelay, + statusUpdater: config.StatusUpdater, + update: make(chan any), + sequence: make(chan int, 1), } } type opAdd struct { - obj any - isInInitialList bool + obj any } type opUpdate struct { @@ -93,9 +88,9 @@ type opDelete struct { func (e *EventHandler) OnAdd(obj any, isInInitialList bool) { if isInInitialList { - e.syncTracker.Start() + time.Sleep(time.Second * 10) } - e.update <- opAdd{obj: obj, isInInitialList: isInInitialList} + e.update <- opAdd{obj: obj} } func (e *EventHandler) OnUpdate(oldObj, newObj any) { @@ -155,7 +150,6 @@ func (e *EventHandler) Start(ctx context.Context) error { // 4. We're stopping. // // Only one of these things can happen at a time. - outerSelect: select { case op := <-e.update: if e.onUpdate(op) { @@ -178,22 +172,23 @@ func (e *EventHandler) Start(ctx context.Context) error { // not to process it. e.incSequence() } - if updateOpAdd, ok := op.(opAdd); ok { - if updateOpAdd.isInInitialList { - e.syncTracker.Finished() + case <-pending: + + // Ensure informer caches are synced + hasSynced := true + for _, syncFunc := range e.UpstreamCacheSyncs { + if !syncFunc() { + e.Warn("at least one informer cache is not synced") + hasSynced = false + break } } - case <-pending: - if !e.syncTracker.HasSynced() { - e.Info("inner cache not in sync, skipping") + + // Schedule a retry for dag rebuild + if !hasSynced { + timer.Reset(e.holdoffDelay) break } - for _, hasSynced := range e.UpstreamSyncFuncs { - if !hasSynced() { - e.Info("an upstream cache is not in sync, skipping") - break outerSelect - } - } e.WithField("last_update", time.Since(lastDAGRebuild)).WithField("outstanding", reset()).Info("performing delayed update") e.rebuildDAG()