From 5271ca9fa86b9a9fa6383ed3869b234331bfc181 Mon Sep 17 00:00:00 2001
From: Hamzah Qudsi
Date: Tue, 30 Aug 2022 12:26:38 -0400
Subject: [PATCH] pkg/kates: batch raw updates for a certain interval before
 sending changes

The Accumulator struct attempts to coalesce changes into a single
snapshot update as a way to do graceful load shedding. However, while
this was the behavior on bootstrap, it didn't always happen mid-watch:
each received event turned into its own snapshot update, which didn't
really satisfy this requirement.

We add a new option to batch changes for a specified interval before
sending a snapshot update. This provides a way to configure how
aggressively load shedding takes place. The default interval is 0,
which preserves the previous behavior.

A snapshot update won't be sent until all resources are fully
bootstrapped, regardless of what interval is set. This is to ensure
that the other requirements for the Accumulator are still satisfied.

For testing, we add a new test case, TestBatchNoNotifyBeforeSync.

Signed-off-by: Hamzah Qudsi
---
 pkg/kates/accumulator.go      | 93 +++++++++++++++++++++--------------
 pkg/kates/accumulator_test.go | 72 ++++++++++++++++++++++++++-
 pkg/kates/client.go           | 49 ++++++++++--------
 3 files changed, 157 insertions(+), 57 deletions(-)

diff --git a/pkg/kates/accumulator.go b/pkg/kates/accumulator.go
index 5e7d977ef59..311c35e1362 100644
--- a/pkg/kates/accumulator.go
+++ b/pkg/kates/accumulator.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"reflect"
 	"sync"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 )
@@ -80,6 +81,12 @@ const (
 	ObjectDelete
 )
 
+type changeStatus int
+const (
+	awaitingDispatch changeStatus = iota
+	dispatched
+)
+
 func (dt DeltaType) MarshalJSON() ([]byte, error) {
 	switch dt {
 	case ObjectAdd:
@@ -165,47 +172,61 @@ func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Acc
 		client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
 	}
 
-	acc := &Accumulator{client, fields, map[string]bool{}, 0, changed, sync.Mutex{}}
-
-	// This coalesces reads from rawUpdateCh to notifications that changes are available to be
-	// processed. This loop along with the logic in storeField guarantees the 3
-	// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
-	// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
-	// continue to coalesce updates in the background while business logic is executing in order
-	// to ensure graceful load shedding.
-	go func() {
-		canSend := false
-
-		for {
-			var rawUp rawUpdate
-			if canSend {
-				select {
-				case changed <- struct{}{}:
-					canSend = false
-					continue
-				case rawUp = <-rawUpdateCh:
-				case <-ctx.Done():
-					return
-				}
-			} else {
-				select {
-				case rawUp = <-rawUpdateCh:
-				case <-ctx.Done():
-					return
-				}
-			}
+	acc := &Accumulator{
+		client:   client,
+		fields:   fields,
+		excluded: map[string]bool{},
+		synced:   0,
+		changed:  changed,
+		mutex:    sync.Mutex{},
+	}
 
-			// Don't overwrite canSend if storeField returns false. We may not yet have
-			// had a chance to send a notification down the changed channel.
-			if acc.storeUpdate(rawUp) {
-				canSend = true
-			}
-		}
-	}()
+	go acc.Listen(ctx, rawUpdateCh, client.maxAccumulatorInterval)
 
 	return acc, nil
 }
 
+// Listen coalesces reads from rawUpdateCh into notifications that changes are available to be
+// processed. This loop along with the logic in storeField guarantees the 3
+// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
+// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
+// continue to coalesce updates in the background while business logic is executing in order
+// to ensure graceful load shedding.
+func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh chan rawUpdate, interval time.Duration) {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	var changeStatus changeStatus
+	var lastChangeSent time.Time
+	var since time.Duration
+	var synced bool
+
+	maybeNotify := func() {
+		if synced && changeStatus == awaitingDispatch && since >= interval {
+			a.changed <- struct{}{}
+			lastChangeSent = time.Now()
+			changeStatus = dispatched
+		}
+	}
+
+	for {
+		select {
+		case rawUp := <-rawUpdateCh:
+			synced = a.storeUpdate(rawUp)
+			if synced {
+				changeStatus = awaitingDispatch
+			}
+			since = rawUp.ts.Sub(lastChangeSent)
+			maybeNotify()
+		case <-ticker.C:
+			synced = a.synced >= len(a.fields)
+			since = time.Since(lastChangeSent)
+			maybeNotify()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
 func (a *Accumulator) Changed() <-chan struct{} {
 	return a.changed
 }
diff --git a/pkg/kates/accumulator_test.go b/pkg/kates/accumulator_test.go
index d990fcc7b9f..768a1514a69 100644
--- a/pkg/kates/accumulator_test.go
+++ b/pkg/kates/accumulator_test.go
@@ -2,6 +2,7 @@ package kates
 
 import (
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -18,7 +19,7 @@ type Snap struct {
 func TestBootstrapNoNotifyBeforeSync(t *testing.T) {
 	// Create a set of 10 configmaps to give us some resources to watch.
 	ctx, cli := testClient(t, nil)
-	var cms [10]*ConfigMap 
+	var cms [10]*ConfigMap
 	for i := 0; i < 10; i++ {
 		cm := &ConfigMap{
 			TypeMeta: TypeMeta{
@@ -92,3 +93,72 @@ func TestBootstrapNotifyEvenOnEmptyWatch(t *testing.T) {
 	// that satisfy the selector, the ConfigMaps field should be empty.
 	assert.Equal(t, 0, len(snap.ConfigMaps))
 }
+
+// Make sure we coalesce raw changes before sending an update when a batch of resources
+// is created/modified in quick succession.
+func TestBatchNoNotifyBeforeSync(t *testing.T) {
+	ctx, cli := testClient(t, nil)
+	// Set a long interval to make sure all changes are batched before sending.
+	cli.MaxAccumulatorInterval(10 * time.Second)
+	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch"})
+	require.NoError(t, err)
+
+	snap := &Snap{}
+
+	// Listen for changes from the Accumulator. Here it will listen for only 2 updates.
+	// The first update should be the one sent during bootstrap. No resources should have changed
+	// in this update. The second update should contain resource changes.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		numUpdates := 0
+		for {
+			<-acc.Changed()
+			updated, err := acc.Update(ctx, snap)
+			require.NoError(t, err)
+			if updated {
+				numUpdates++
+			}
+			if numUpdates == 2 {
+				break
+			}
+		}
+	}()
+
+	// Use a separate client to create resources to avoid any potential uses of the cache.
+	_, cli2 := testClient(t, nil)
+
+	// Create a set of 10 ConfigMaps after the Accumulator is watching to simulate getting
+	// a bunch of resources at once mid-watch.
+	var cms [10]*ConfigMap
+	for i := 0; i < 10; i++ {
+		cm := &ConfigMap{
+			TypeMeta: TypeMeta{
+				Kind: "ConfigMap",
+			},
+			ObjectMeta: ObjectMeta{
+				Name: fmt.Sprintf("test-batch-%d", i),
+				Labels: map[string]string{
+					"test": "test-batch",
+				},
+			},
+		}
+		err := cli2.Upsert(ctx, cm, cm, &cm)
+		require.NoError(t, err)
+		cms[i] = cm
+	}
+	wg.Wait()
+
+	// After receiving 2 updates from the Accumulator, we should have 10 ConfigMaps
+	// in our Snapshot due to the Accumulator coalescing changes before sending an update.
+	assert.Equal(t, 10, len(snap.ConfigMaps))
+
+	t.Cleanup(func() {
+		for _, cm := range cms {
+			if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
+				t.Error(err)
+			}
+		}
+	})
+}
diff --git a/pkg/kates/client.go b/pkg/kates/client.go
index b449c9090b2..ed0f1514424 100644
--- a/pkg/kates/client.go
+++ b/pkg/kates/client.go
@@ -77,12 +77,13 @@ import (
 // 2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on
 //    all watches prior to notifying the user that resources are available to process.
 type Client struct {
-	config    *ConfigFlags
-	cli       dynamic.Interface
-	mapper    meta.RESTMapper
-	disco     discovery.CachedDiscoveryInterface
-	mutex     sync.Mutex
-	canonical map[string]*Unstructured
+	config                 *ConfigFlags
+	cli                    dynamic.Interface
+	mapper                 meta.RESTMapper
+	disco                  discovery.CachedDiscoveryInterface
+	mutex                  sync.Mutex
+	canonical              map[string]*Unstructured
+	maxAccumulatorInterval time.Duration
 
 	// This is an internal interface for testing, it lets us deliberately introduce delays into the
 	// implementation, e.g. effectively increasing the latency to the api server in a controllable
@@ -149,14 +150,15 @@ func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
 	}
 
 	return &Client{
-		config:       config,
-		cli:          cli,
-		mapper:       mapper,
-		disco:        disco,
-		canonical:    make(map[string]*Unstructured),
-		watchAdded:   func(oldObj, newObj *Unstructured) {},
-		watchUpdated: func(oldObj, newObj *Unstructured) {},
-		watchDeleted: func(oldObj, newObj *Unstructured) {},
+		config:                 config,
+		cli:                    cli,
+		mapper:                 mapper,
+		disco:                  disco,
+		canonical:              make(map[string]*Unstructured),
+		maxAccumulatorInterval: 0 * time.Second,
+		watchAdded:             func(oldObj, newObj *Unstructured) {},
+		watchUpdated:           func(oldObj, newObj *Unstructured) {},
+		watchDeleted:           func(oldObj, newObj *Unstructured) {},
 	}, nil
 }
 
@@ -211,6 +213,12 @@ func InCluster() bool {
 		err == nil && !fi.IsDir()
 }
 
+func (c *Client) MaxAccumulatorInterval(interval time.Duration) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.maxAccumulatorInterval = interval
+}
+
 // DynamicInterface is an accessor method to the k8s dynamic client
 func (c *Client) DynamicInterface() dynamic.Interface {
 	return c.cli
 }
@@ -371,7 +379,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 	// resource instances of the kind being watched
 	lw := newListWatcher(ctx, cli, query, func(lw *lw) {
 		if lw.hasSynced() {
-			target <- rawUpdate{query.Name, true, nil, nil}
+			target <- rawUpdate{query.Name, true, nil, nil, time.Now()}
 		}
 	})
 	informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)
@@ -410,7 +418,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 				// better/faster tests.
 				c.watchAdded(nil, obj.(*Unstructured))
 				lw.countAddEvent()
-				target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured)}
+				target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured), time.Now()}
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
 				old := oldObj.(*Unstructured)
@@ -421,7 +429,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 				// nicer prettier set of hooks, but for now all we need is this hack for
 				// better/faster tests.
 				c.watchUpdated(old, new)
-				target <- rawUpdate{query.Name, lw.hasSynced(), old, new}
+				target <- rawUpdate{query.Name, lw.hasSynced(), old, new, time.Now()}
 			},
 			DeleteFunc: func(obj interface{}) {
 				var old *Unstructured
@@ -444,7 +452,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
 				c.mutex.Lock()
 				delete(c.canonical, key)
 				c.mutex.Unlock()
-				target <- rawUpdate{query.Name, lw.hasSynced(), old, nil}
+				target <- rawUpdate{query.Name, lw.hasSynced(), old, nil, time.Now()}
 			},
 		},
 	)
@@ -457,7 +465,8 @@ type rawUpdate struct {
 	synced bool
 	old    *unstructured.Unstructured
 	new    *unstructured.Unstructured
-}
+	ts     time.Time
+}
 
 type lw struct {
 	// All these fields are read-only and initialized on construction.
@@ -499,7 +508,7 @@ func (lw *lw) countAddEvent() {
 // dispatched to consider things synced at the dispatch layer.
 //
 // So to track syncedness properly for our users, when we do our first List() we remember how many
-// resourcees there are and we do not consider ourselves synced until we have dispatched at least as
+// resources there are and we do not consider ourselves synced until we have dispatched at least as
 // many Add events as there are resources.
 func (lw *lw) hasSynced() (result bool) {
 	lw.withMutex(func() {
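
A rough usage sketch of the new option, for reference only (it is not part of the diff above). It is written as if it lived in package kates, mirroring the tests; the helper name, the example snapshot type, and the 5-second interval are illustrative assumptions rather than anything added by this patch.

package kates

import (
	"context"
	"time"
)

// exampleSnapshot is illustrative only; the field name must match Query.Name.
type exampleSnapshot struct {
	ConfigMaps []*ConfigMap
}

// watchWithBatching sketches how a consumer opts into batching raw updates.
// The 5-second interval is arbitrary and error handling is kept minimal.
func watchWithBatching(ctx context.Context, cli *Client) error {
	// Must be set before Watch so the new Accumulator picks up the interval.
	cli.MaxAccumulatorInterval(5 * time.Second)

	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap"})
	if err != nil {
		return err
	}

	snap := &exampleSnapshot{}
	for {
		select {
		case <-acc.Changed():
			// Update applies every change coalesced since the last notification.
			updated, err := acc.Update(ctx, snap)
			if err != nil {
				return err
			}
			if updated {
				// ... act on snap.ConfigMaps ...
			}
		case <-ctx.Done():
			return nil
		}
	}
}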