Skip to content

Commit

Permalink
check cache syncs before dag rebuild
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmad Karimi <[email protected]>
  • Loading branch information
therealak12 committed Aug 14, 2023
1 parent c3b5e6f commit 8d9887a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ func informOnResource(obj client.Object, handler cache.ResourceEventHandler, cac
if err == nil {
if er, ok := handler.(*contour.EventRecorder); ok {
if eh, ok := er.Next.(*contour.EventHandler); ok {
eh.UpstreamSyncFuncs = append(eh.UpstreamSyncFuncs, resourceEventHandlerRegistration.HasSynced)
eh.UpstreamCacheSyncs = append(eh.UpstreamCacheSyncs, resourceEventHandlerRegistration.HasSynced)
}
}
}
Expand Down
57 changes: 26 additions & 31 deletions internal/contour/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/projectcontour/contour/internal/dag"
Expand Down Expand Up @@ -58,29 +58,24 @@ type EventHandler struct {
// an event has been received.
seq int

syncTracker *synctrack.SingleFileTracker

UpstreamSyncFuncs []func() bool
UpstreamCacheSyncs []cache.InformerSynced
}

func NewEventHandler(config EventHandlerConfig) *EventHandler {
return &EventHandler{
FieldLogger: config.Logger,
builder: config.Builder,
observer: config.Observer,
holdoffDelay: config.HoldoffDelay,
holdoffMaxDelay: config.HoldoffMaxDelay,
statusUpdater: config.StatusUpdater,
update: make(chan any),
sequence: make(chan int, 1),
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: func() bool { return true }},
UpstreamSyncFuncs: make([]func() bool, 0),
FieldLogger: config.Logger,
builder: config.Builder,
observer: config.Observer,
holdoffDelay: config.HoldoffDelay,
holdoffMaxDelay: config.HoldoffMaxDelay,
statusUpdater: config.StatusUpdater,
update: make(chan any),
sequence: make(chan int, 1),
}
}

type opAdd struct {
obj any
isInInitialList bool
obj any
}

type opUpdate struct {
Expand All @@ -93,9 +88,9 @@ type opDelete struct {

func (e *EventHandler) OnAdd(obj any, isInInitialList bool) {
if isInInitialList {
e.syncTracker.Start()
time.Sleep(time.Second * 10)
}
e.update <- opAdd{obj: obj, isInInitialList: isInInitialList}
e.update <- opAdd{obj: obj}
}

func (e *EventHandler) OnUpdate(oldObj, newObj any) {
Expand Down Expand Up @@ -155,7 +150,6 @@ func (e *EventHandler) Start(ctx context.Context) error {
// 4. We're stopping.
//
// Only one of these things can happen at a time.
outerSelect:
select {
case op := <-e.update:
if e.onUpdate(op) {
Expand All @@ -178,22 +172,23 @@ func (e *EventHandler) Start(ctx context.Context) error {
// not to process it.
e.incSequence()
}
if updateOpAdd, ok := op.(opAdd); ok {
if updateOpAdd.isInInitialList {
e.syncTracker.Finished()
case <-pending:

// Ensure informer caches are synced
hasSynced := true
for _, syncFunc := range e.UpstreamCacheSyncs {
if !syncFunc() {
e.Warn("at least one informer cache is not synced")
hasSynced = false
break
}
}
case <-pending:
if !e.syncTracker.HasSynced() {
e.Info("inner cache not in sync, skipping")

// Schedule a retry for dag rebuild
if !hasSynced {
timer.Reset(e.holdoffDelay)
break
}
for _, hasSynced := range e.UpstreamSyncFuncs {
if !hasSynced() {
e.Info("an upstream cache is not in sync, skipping")
break outerSelect
}
}

e.WithField("last_update", time.Since(lastDAGRebuild)).WithField("outstanding", reset()).Info("performing delayed update")
e.rebuildDAG()
Expand Down

0 comments on commit 8d9887a

Please sign in to comment.