From 6087e1f80b812b50b7c49ec2a116ccf5f494e42d Mon Sep 17 00:00:00 2001
From: Hamzah Qudsi
Date: Tue, 30 Aug 2022 12:26:38 -0400
Subject: [PATCH] pkg/kates: fix bug where Accumulator was not coalescing
 changes mid-watch

For testing, we add a new test case TestBatchNoNotifyBeforeSync.

Also run make format/go.

Signed-off-by: Hamzah Qudsi
---
 pkg/kates/accumulator.go      | 82 ++++++++++++++++++++---------------
 pkg/kates/accumulator_test.go | 72 +++++++++++++++++++++++++++++-
 pkg/kates/client.go           | 49 ++++++++++++---------
 3 files changed, 146 insertions(+), 57 deletions(-)

diff --git a/pkg/kates/accumulator.go b/pkg/kates/accumulator.go
index 5e7d977ef59..83fe0e6f22d 100644
--- a/pkg/kates/accumulator.go
+++ b/pkg/kates/accumulator.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"reflect"
 	"sync"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 )
@@ -55,6 +56,9 @@ type Accumulator struct {
 	excluded map[string]bool
 	synced   int
 	changed  chan struct{}
+	lastUpdateSent time.Time
+	maxSize        int
+	maxInterval    time.Duration
 	mutex    sync.Mutex
 }
 
@@ -165,47 +169,53 @@ func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Acc
 		client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
 	}
 
-	acc := &Accumulator{client, fields, map[string]bool{}, 0, changed, sync.Mutex{}}
-
-	// This coalesces reads from rawUpdateCh to notifications that changes are available to be
-	// processed. This loop along with the logic in storeField guarantees the 3
-	// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
-	// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
-	// continue to coalesce updates in the background while business logic is executing in order
-	// to ensure graceful load shedding.
-	go func() {
-		canSend := false
-
-		for {
-			var rawUp rawUpdate
-			if canSend {
-				select {
-				case changed <- struct{}{}:
-					canSend = false
-					continue
-				case rawUp = <-rawUpdateCh:
-				case <-ctx.Done():
-					return
-				}
-			} else {
-				select {
-				case rawUp = <-rawUpdateCh:
-				case <-ctx.Done():
-					return
-				}
-			}
+	acc := &Accumulator{
+		client:      client,
+		fields:      fields,
+		excluded:    map[string]bool{},
+		synced:      0,
+		changed:     changed,
+		maxInterval: client.maxAccumulatorInterval,
+		mutex:       sync.Mutex{},
+	}
 
-			// Don't overwrite canSend if storeField returns false. We may not yet have
-			// had a chance to send a notification down the changed channel.
-			if acc.storeUpdate(rawUp) {
-				canSend = true
-			}
-		}
-	}()
+	go acc.Listen(ctx, rawUpdateCh)
 
 	return acc, nil
 }
 
+// Listen coalesces reads from rawUpdateCh into notifications that changes are available to
+// be processed. This loop, along with the logic in storeField, guarantees the 3
+// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. ensuring
+// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
+// continue to coalesce updates in the background while business logic is executing, in
+// order to ensure graceful load shedding.
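+//
+// Notifications are rate-limited: maybeNotify only signals the changed channel once all
+// watched Kinds have synced and at least maxInterval has elapsed since the previous
+// notification, and a 100ms ticker re-checks that condition so a batch that has gone
+// quiet is still flushed once the interval expires. (lastUpdateSent starts at the zero
+// time, so the first notification goes out as soon as bootstrap completes.)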
+func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh chan rawUpdate) {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	for {
+		select {
+		case rawUp := <-rawUpdateCh:
+			synced := a.storeUpdate(rawUp)
+			since := rawUp.ts.Sub(a.lastUpdateSent)
+			a.maybeNotify(synced, since)
+		case <-ticker.C:
+			synced := a.synced >= len(a.fields)
+			since := time.Since(a.lastUpdateSent)
+			a.maybeNotify(synced, since)
+		case <-ctx.Done():
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (a *Accumulator) maybeNotify(synced bool, since time.Duration) {
+	if synced && since >= a.maxInterval {
+		a.changed <- struct{}{}
+		a.lastUpdateSent = time.Now()
+	}
+}
+
 func (a *Accumulator) Changed() <-chan struct{} {
 	return a.changed
 }
diff --git a/pkg/kates/accumulator_test.go b/pkg/kates/accumulator_test.go
index d990fcc7b9f..768a1514a69 100644
--- a/pkg/kates/accumulator_test.go
+++ b/pkg/kates/accumulator_test.go
@@ -2,6 +2,7 @@ package kates
 
 import (
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -18,7 +19,7 @@ type Snap struct {
 func TestBootstrapNoNotifyBeforeSync(t *testing.T) {
 	// Create a set of 10 configmaps to give us some resources to watch.
 	ctx, cli := testClient(t, nil)
-	var cms [10]*ConfigMap
+	var cms [10]*ConfigMap
 	for i := 0; i < 10; i++ {
 		cm := &ConfigMap{
 			TypeMeta: TypeMeta{
@@ -92,3 +93,72 @@ func TestBootstrapNotifyEvenOnEmptyWatch(t *testing.T) {
 	// that satisfy the selector, the ConfigMaps field should be empty.
 	assert.Equal(t, 0, len(snap.ConfigMaps))
 }
+
+// Make sure we coalesce raw changes before sending an update when a batch of resources
+// is created/modified in quick succession.
+func TestBatchNoNotifyBeforeSync(t *testing.T) {
+	ctx, cli := testClient(t, nil)
+	// Set a long interval to make sure all changes are batched before sending.
+	cli.MaxAccumulatorInterval(10 * time.Second)
+	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch"})
+	require.NoError(t, err)
+
+	snap := &Snap{}
+
+	// Listen for changes from the Accumulator. Here it will listen for only 2 updates.
+	// The first update should be the one sent during bootstrap; no resources should have
+	// changed in this update. The second update should contain the resource changes.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		numUpdates := 0
+		for {
+			<-acc.Changed()
+			updated, err := acc.Update(ctx, snap)
+			require.NoError(t, err)
+			if updated {
+				numUpdates++
+			}
+			if numUpdates == 2 {
+				break
+			}
+		}
+	}()
+
+	// Use a separate client to create resources to avoid any potential uses of the cache.
+	_, cli2 := testClient(t, nil)
+
+	// Create a set of 10 ConfigMaps after the Accumulator is watching to simulate getting
+	// a bunch of resources at once mid-watch.
+	var cms [10]*ConfigMap
+	for i := 0; i < 10; i++ {
+		cm := &ConfigMap{
+			TypeMeta: TypeMeta{
+				Kind: "ConfigMap",
+			},
+			ObjectMeta: ObjectMeta{
+				Name: fmt.Sprintf("test-batch-%d", i),
+				Labels: map[string]string{
+					"test": "test-batch",
+				},
+			},
+		}
+		err := cli2.Upsert(ctx, cm, cm, &cm)
+		require.NoError(t, err)
+		cms[i] = cm
+	}
+	wg.Wait()
+
+	// After receiving 2 updates from the Accumulator, we should have 10 ConfigMaps
+	// in our Snapshot due to the Accumulator coalescing changes before sending an update.
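+	// With the interval set to 10s, the second notification is sent only once maxInterval
+	// has elapsed, by which point all 10 creations have been coalesced into a single batch.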
+	assert.Equal(t, 10, len(snap.ConfigMaps))
+
+	t.Cleanup(func() {
+		for _, cm := range cms {
+			if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
+				t.Error(err)
+			}
+		}
+	})
+}
diff --git a/pkg/kates/client.go b/pkg/kates/client.go
index 8c89aafc60d..fdb3c4f5fe6 100644
--- a/pkg/kates/client.go
+++ b/pkg/kates/client.go
@@ -77,12 +77,13 @@ import (
 // 2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on
 //    all watches prior to notifying the user that resources are available to process.
 type Client struct {
-	config    *ConfigFlags
-	cli       dynamic.Interface
-	mapper    meta.RESTMapper
-	disco     discovery.CachedDiscoveryInterface
-	mutex     sync.Mutex
-	canonical map[string]*Unstructured
+	config                 *ConfigFlags
+	cli                    dynamic.Interface
+	mapper                 meta.RESTMapper
+	disco                  discovery.CachedDiscoveryInterface
+	mutex                  sync.Mutex
+	canonical              map[string]*Unstructured
+	maxAccumulatorInterval time.Duration
 
 	// This is an internal interface for testing, it lets us deliberately introduce delays into the
 	// implementation, e.g. effectively increasing the latency to the api server in a controllable
@@ -149,14 +150,15 @@ func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
 	}
 
 	return &Client{
-		config:       config,
-		cli:          cli,
-		mapper:       mapper,
-		disco:        disco,
-		canonical:    make(map[string]*Unstructured),
-		watchAdded:   func(oldObj, newObj *Unstructured) {},
-		watchUpdated: func(oldObj, newObj *Unstructured) {},
-		watchDeleted: func(oldObj, newObj *Unstructured) {},
+		config:                 config,
+		cli:                    cli,
+		mapper:                 mapper,
+		disco:                  disco,
+		canonical:              make(map[string]*Unstructured),
+		maxAccumulatorInterval: 5 * time.Second,
+		watchAdded:             func(oldObj, newObj *Unstructured) {},
+		watchUpdated:           func(oldObj, newObj *Unstructured) {},
+		watchDeleted:           func(oldObj, newObj *Unstructured) {},
 	}, nil
 }
@@ -211,6 +213,12 @@ func InCluster() bool {
 		err == nil && !fi.IsDir()
 }
 
+// MaxAccumulatorInterval sets the interval used to rate-limit change notifications from
+// Accumulators. It only affects Accumulators created by subsequent calls to Watch.
+func (c *Client) MaxAccumulatorInterval(interval time.Duration) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.maxAccumulatorInterval = interval
+}
+
 // DynamicInterface is an accessor method to the k8s dynamic client
 func (c *Client) DynamicInterface() dynamic.Interface {
 	return c.cli
 }
@@ -371,7 +379,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 	// resource instances of the kind being watched
 	lw := newListWatcher(ctx, cli, query, func(lw *lw) {
 		if lw.hasSynced() {
-			target <- rawUpdate{query.Name, true, nil, nil}
+			target <- rawUpdate{query.Name, true, nil, nil, time.Now()}
 		}
 	})
 	informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)
@@ -410,7 +418,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 				// better/faster tests.
 				c.watchAdded(nil, obj.(*Unstructured))
 				lw.countAddEvent()
-				target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured)}
+				target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured), time.Now()}
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
 				old := oldObj.(*Unstructured)
 				new := newObj.(*Unstructured)
@@ -421,7 +429,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 				// nicer prettier set of hooks, but for now all we need is this hack for
 				// better/faster tests.
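+				// The raw update below is stamped with time.Now() so the Accumulator can
+				// compare the event's arrival time against lastUpdateSent when it
+				// rate-limits notifications.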
 				c.watchUpdated(old, new)
-				target <- rawUpdate{query.Name, lw.hasSynced(), old, new}
+				target <- rawUpdate{query.Name, lw.hasSynced(), old, new, time.Now()}
 			},
 			DeleteFunc: func(obj interface{}) {
 				var old *Unstructured
@@ -444,7 +452,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 				c.mutex.Lock()
 				delete(c.canonical, key)
 				c.mutex.Unlock()
-				target <- rawUpdate{query.Name, lw.hasSynced(), old, nil}
+				target <- rawUpdate{query.Name, lw.hasSynced(), old, nil, time.Now()}
 			},
 		},
 	)
@@ -457,7 +465,8 @@ type rawUpdate struct {
 	synced bool
 	old    *unstructured.Unstructured
 	new    *unstructured.Unstructured
-}
+	ts     time.Time
+}
 
 type lw struct {
 	// All these fields are read-only and initialized on construction.
@@ -499,7 +508,7 @@ func (lw *lw) countAddEvent() {
 // dispatched to consider things synced at the dispatch layer.
 //
 // So to track syncedness properly for our users, when we do our first List() we remember how many
-// resourcees there are and we do not consider ourselves synced until we have dispatched at least as
+// resources there are and we do not consider ourselves synced until we have dispatched at least as
 // many Add events as there are resources.
 func (lw *lw) hasSynced() (result bool) {
 	lw.withMutex(func() {