pkg/kates: fix bug where Accumulator was not coalescing changes mid-watch

For testing, we add a new test case TestBatchNoNotifyBeforeSync.

Also run make format/go.

Signed-off-by: Hamzah Qudsi <[email protected]>
Hamzah Qudsi committed Sep 6, 2022
1 parent 02e79c3 commit 6087e1f
Showing 3 changed files with 146 additions and 57 deletions.
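
For orientation, here is a minimal sketch of the consumer loop this change protects, written as if it lived inside pkg/kates like the tests. Client, Query, Accumulator, and ConfigMap come from this package; the query, the local snapshot struct, and the error handling are illustrative assumptions modeled on the new test below, not part of the commit.

```go
// Illustrative only: a typical Accumulator consumer loop. With the coalescing fixed
// in this commit, a burst of raw watch events produces one notification on
// acc.Changed(), so the loop processes one batched snapshot instead of many.
package kates

import "context"

func exampleConsumer(ctx context.Context, cli *Client) error {
	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap"})
	if err != nil {
		return err
	}
	// Snapshot target; field name matches the query name, as in the test Snap type.
	type snapshot struct {
		ConfigMaps []*ConfigMap
	}
	snap := &snapshot{}
	for {
		select {
		case <-acc.Changed():
			updated, err := acc.Update(ctx, snap)
			if err != nil {
				return err
			}
			if updated {
				// Run business logic against the batched snapshot here.
				_ = snap.ConfigMaps
			}
		case <-ctx.Done():
			return nil
		}
	}
}
```
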
82 changes: 46 additions & 36 deletions pkg/kates/accumulator.go
@@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
)
@@ -55,6 +56,9 @@ type Accumulator struct {
excluded map[string]bool
synced int
changed chan struct{}
lastUpdateSent time.Time
maxSize int
maxInterval time.Duration
mutex sync.Mutex
}

@@ -165,47 +169,53 @@ func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Acc
client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
}

acc := &Accumulator{client, fields, map[string]bool{}, 0, changed, sync.Mutex{}}

// This coalesces reads from rawUpdateCh to notifications that changes are available to be
// processed. This loop along with the logic in storeField guarantees the 3
// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
// continue to coalesce updates in the background while business logic is executing in order
// to ensure graceful load shedding.
go func() {
canSend := false

for {
var rawUp rawUpdate
if canSend {
select {
case changed <- struct{}{}:
canSend = false
continue
case rawUp = <-rawUpdateCh:
case <-ctx.Done():
return
}
} else {
select {
case rawUp = <-rawUpdateCh:
case <-ctx.Done():
return
}
}
acc := &Accumulator{
client:      client,
fields:      fields,
excluded:    map[string]bool{},
synced:      0,
changed:     changed,
maxInterval: client.maxAccumulatorInterval,
mutex:       sync.Mutex{},
}

// Don't overwrite canSend if storeField returns false. We may not yet have
// had a chance to send a notification down the changed channel.
if acc.storeUpdate(rawUp) {
canSend = true
}
}
}()
go acc.Listen(ctx, rawUpdateCh)

return acc, nil
}

// This coalesces reads from rawUpdateCh to notifications that changes are available to be
// processed. This loop along with the logic in storeField guarantees the 3
// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
// continue to coalesce updates in the background while business logic is executing in order
// to ensure graceful load shedding.
func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh chan rawUpdate) {
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case rawUp := <-rawUpdateCh:
synced := a.storeUpdate(rawUp)
since := rawUp.ts.Sub(a.lastUpdateSent)
a.maybeNotify(synced, since)
case <-ticker.C:
synced := a.synced >= len(a.fields)
since := time.Now().Sub(a.lastUpdateSent)
a.maybeNotify(synced, since)
case <-ctx.Done():
ticker.Stop()
return
}
}
}

func (a *Accumulator) maybeNotify(synced bool, since time.Duration) {
if synced && since >= a.maxInterval {
a.changed <- struct{}{}
a.lastUpdateSent = time.Now()
}
}
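
To illustrate the technique maybeNotify implements (time-bounded coalescing of pending changes), here is a small self-contained sketch, separate from the kates types. The debouncer type and its names are hypothetical, and it uses a non-blocking send where the Accumulator blocks on its changed channel; it is a variant of the pattern, not the library's code.

```go
package main

import (
	"fmt"
	"time"
)

// debouncer collapses many "something changed" signals into at most one
// notification per minInterval.
type debouncer struct {
	dirty       bool
	lastNotify  time.Time
	minInterval time.Duration
	notify      chan struct{}
}

func (d *debouncer) markDirty() { d.dirty = true }

// flush notifies only if there is pending work and enough time has passed
// since the previous notification; otherwise it keeps coalescing.
func (d *debouncer) flush(now time.Time) {
	if d.dirty && now.Sub(d.lastNotify) >= d.minInterval {
		select {
		case d.notify <- struct{}{}:
			d.dirty = false
			d.lastNotify = now
		default:
			// Receiver busy; try again on the next tick.
		}
	}
}

func main() {
	d := &debouncer{minInterval: 100 * time.Millisecond, notify: make(chan struct{}, 1)}
	for i := 0; i < 50; i++ { // a burst of 50 raw changes...
		d.markDirty()
		d.flush(time.Now())
	}
	fmt.Println("notifications queued:", len(d.notify)) // ...yields a single notification
}
```
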

func (a *Accumulator) Changed() <-chan struct{} {
return a.changed
}
72 changes: 71 additions & 1 deletion pkg/kates/accumulator_test.go
@@ -2,6 +2,7 @@ package kates

import (
"fmt"
"sync"
"testing"
"time"

@@ -18,7 +19,7 @@ type Snap struct {
func TestBootstrapNoNotifyBeforeSync(t *testing.T) {
// Create a set of 10 configmaps to give us some resources to watch.
ctx, cli := testClient(t, nil)
var cms [10]*ConfigMap
var cms [10]*ConfigMap
for i := 0; i < 10; i++ {
cm := &ConfigMap{
TypeMeta: TypeMeta{
@@ -92,3 +93,72 @@ func TestBootstrapNotifyEvenOnEmptyWatch(t *testing.T) {
// that satisfy the selector, the ConfigMaps field should be empty.
assert.Equal(t, 0, len(snap.ConfigMaps))
}

// Make sure we coalesce raw changes before sending an update when a batch of resources
// is created/modified in quick succession.
func TestBatchNoNotifyBeforeSync(t *testing.T) {
ctx, cli := testClient(t, nil)
// Set a long interval to make sure all changes are batched before sending.
cli.MaxAccumulatorInterval(10 * time.Second)
acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch"})
require.NoError(t, err)

snap := &Snap{}

// Listen for changes from the Accumulator. Here it will listen for only 2 updates.
// The first update should be the one sent during bootstrap; no resources should have changed
// in this update. The second update should contain resource changes.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
numUpdates := 0
for {
<-acc.Changed()
updated, err := acc.Update(ctx, snap)
require.NoError(t, err)
if updated {
numUpdates++
}
if numUpdates == 2 {
break
}
}
}()

// Use a separate client to create resources to avoid any potential uses of the cache
_, cli2 := testClient(t, nil)

// Create a set of 10 Configmaps after the Accumulator is watching to simulate getting
// a bunch of resources at once mid-watch.
var cms [10]*ConfigMap
for i := 0; i < 10; i++ {
cm := &ConfigMap{
TypeMeta: TypeMeta{
Kind: "ConfigMap",
},
ObjectMeta: ObjectMeta{
Name: fmt.Sprintf("test-batch-%d", i),
Labels: map[string]string{
"test": "test-batch",
},
},
}
err := cli2.Upsert(ctx, cm, cm, &cm)
require.NoError(t, err)
cms[i] = cm
}
wg.Wait()

// After receiving 2 updates from the Accumulator, we should have 10 ConfigMaps
// in our Snapshot due to the Accumulator coalescing changes before sending an update.
assert.Equal(t, 10, len(snap.ConfigMaps))

t.Cleanup(func() {
for _, cm := range cms {
if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
t.Error(err)
}
}
})
}
49 changes: 29 additions & 20 deletions pkg/kates/client.go
@@ -77,12 +77,13 @@ import (
// 2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on
// all watches prior to notifying the user that resources are available to process.
type Client struct {
config *ConfigFlags
cli dynamic.Interface
mapper meta.RESTMapper
disco discovery.CachedDiscoveryInterface
mutex sync.Mutex
canonical map[string]*Unstructured
config *ConfigFlags
cli dynamic.Interface
mapper meta.RESTMapper
disco discovery.CachedDiscoveryInterface
mutex sync.Mutex
canonical map[string]*Unstructured
maxAccumulatorInterval time.Duration

// This is an internal interface for testing, it lets us deliberately introduce delays into the
// implementation, e.g. effectively increasing the latency to the api server in a controllable
@@ -149,14 +150,15 @@ func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
}

return &Client{
config: config,
cli: cli,
mapper: mapper,
disco: disco,
canonical: make(map[string]*Unstructured),
watchAdded: func(oldObj, newObj *Unstructured) {},
watchUpdated: func(oldObj, newObj *Unstructured) {},
watchDeleted: func(oldObj, newObj *Unstructured) {},
config: config,
cli: cli,
mapper: mapper,
disco: disco,
canonical: make(map[string]*Unstructured),
maxAccumulatorInterval: 5 * time.Second,
watchAdded: func(oldObj, newObj *Unstructured) {},
watchUpdated: func(oldObj, newObj *Unstructured) {},
watchDeleted: func(oldObj, newObj *Unstructured) {},
}, nil
}

@@ -211,6 +213,12 @@ func InCluster() bool {
err == nil && !fi.IsDir()
}

func (c *Client) MaxAccumulatorInterval(interval time.Duration) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.maxAccumulatorInterval = interval
}
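
A brief usage note, as a hedged sketch rather than documented API guidance: since newAccumulator copies maxAccumulatorInterval into the Accumulator at construction, the setter should be called before Watch. The fragment below is written as if it lived inside pkg/kates; the 250ms value is an arbitrary example (the default set above is 5s, and the new test uses 10s to force batching).

```go
package kates

import (
	"context"
	"time"
)

// Illustrative: tune coalescing before starting a watch.
func watchWithInterval(ctx context.Context, cli *Client) (*Accumulator, error) {
	cli.MaxAccumulatorInterval(250 * time.Millisecond)
	return cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap"})
}
```
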

// DynamicInterface is an accessor method to the k8s dynamic client
func (c *Client) DynamicInterface() dynamic.Interface {
return c.cli
@@ -371,7 +379,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
// resource instances of the kind being watched
lw := newListWatcher(ctx, cli, query, func(lw *lw) {
if lw.hasSynced() {
target <- rawUpdate{query.Name, true, nil, nil}
target <- rawUpdate{query.Name, true, nil, nil, time.Now()}
}
})
informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)
@@ -410,7 +418,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
// better/faster tests.
c.watchAdded(nil, obj.(*Unstructured))
lw.countAddEvent()
target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured)}
target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured), time.Now()}
},
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*Unstructured)
@@ -421,7 +429,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
// nicer prettier set of hooks, but for now all we need is this hack for
// better/faster tests.
c.watchUpdated(old, new)
target <- rawUpdate{query.Name, lw.hasSynced(), old, new}
target <- rawUpdate{query.Name, lw.hasSynced(), old, new, time.Now()}
},
DeleteFunc: func(obj interface{}) {
var old *Unstructured
@@ -444,7 +452,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
c.mutex.Lock()
delete(c.canonical, key)
c.mutex.Unlock()
target <- rawUpdate{query.Name, lw.hasSynced(), old, nil}
target <- rawUpdate{query.Name, lw.hasSynced(), old, nil, time.Now()}
},
},
)
@@ -457,7 +465,8 @@ type rawUpdate struct {
synced bool
old *unstructured.Unstructured
new *unstructured.Unstructured
}
ts time.Time
}

type lw struct {
// All these fields are read-only and initialized on construction.
@@ -499,7 +508,7 @@ func (lw *lw) countAddEvent() {
// dispatched to consider things synced at the dispatch layer.
//
// So to track syncedness properly for our users, when we do our first List() we remember how many
// resourcees there are and we do not consider ourselves synced until we have dispatched at least as
// resources there are and we do not consider ourselves synced until we have dispatched at least as
// many Add events as there are resources.
func (lw *lw) hasSynced() (result bool) {
lw.withMutex(func() {
