pkg/kates: batch raw updates for a certain interval before sending changes

The Accumulator struct attempts to coalesce changes into a single snapshot
update as a way to do graceful load shedding. However, while this was the behavior
during bootstrap, it didn't always happen mid-watch: each event that was received
turned into its own snapshot update, which defeats the coalescing requirement.

We add a new option to batch changes for a specified interval before sending a snapshot
update. This provides a way to configure how aggressively load shedding takes place. The
default interval is 0, which preserves the previous behavior.

A snapshot update won't be sent until all resources are fully bootstrapped,
regardless of the configured interval. This ensures that the other requirements for
the Accumulator are still satisfied.
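As a rough usage sketch (not part of this commit), a consumer could opt into batching along these lines. The import path, the Snap struct shape, the watchConfigMaps helper, and the 250ms interval are assumptions for illustration only:

package example

import (
	"context"
	"time"

	// Import path assumed; adjust to the module that vendors pkg/kates.
	"github.com/emissary-ingress/emissary/v3/pkg/kates"
)

// Snap mirrors the shape used in the accumulator tests: Update fills each
// field whose name matches a Query's Name.
type Snap struct {
	ConfigMaps []*kates.ConfigMap
}

func watchConfigMaps(ctx context.Context, cli *kates.Client) error {
	// Coalesce raw updates for up to 250ms before each notification
	// (the interval here is an arbitrary example, not a recommendation).
	cli.MaxAccumulatorInterval(250 * time.Millisecond)

	acc, err := cli.Watch(ctx, kates.Query{Name: "ConfigMaps", Kind: "ConfigMap"})
	if err != nil {
		return err
	}

	snap := &Snap{}
	for {
		select {
		case <-acc.Changed():
			updated, err := acc.Update(ctx, snap)
			if err != nil {
				return err
			}
			if updated {
				// snap.ConfigMaps now reflects a batch of coalesced changes.
				_ = snap.ConfigMaps
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

With the default interval of 0 the notification behavior is unchanged; a non-zero interval trades snapshot freshness for fewer, larger coalesced updates.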

For testing, we add a new test case TestBatchNoNotifyBeforeSync.

Signed-off-by: Hamzah Qudsi <[email protected]>
Hamzah Qudsi committed Sep 8, 2022
1 parent f810b88 commit 5271ca9
Showing 3 changed files with 157 additions and 57 deletions.
93 changes: 57 additions & 36 deletions pkg/kates/accumulator.go
@@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
)
@@ -80,6 +81,12 @@ const (
ObjectDelete
)

type changeStatus int
const (
awaitingDispatch changeStatus = iota
dispatched
)

func (dt DeltaType) MarshalJSON() ([]byte, error) {
switch dt {
case ObjectAdd:
@@ -165,47 +172,61 @@ func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Acc
client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
}

acc := &Accumulator{client, fields, map[string]bool{}, 0, changed, sync.Mutex{}}

// This coalesces reads from rawUpdateCh to notifications that changes are available to be
// processed. This loop along with the logic in storeField guarantees the 3
// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
// continue to coalesce updates in the background while business logic is executing in order
// to ensure graceful load shedding.
go func() {
canSend := false

for {
var rawUp rawUpdate
if canSend {
select {
case changed <- struct{}{}:
canSend = false
continue
case rawUp = <-rawUpdateCh:
case <-ctx.Done():
return
}
} else {
select {
case rawUp = <-rawUpdateCh:
case <-ctx.Done():
return
}
}
acc := &Accumulator{
	client:   client,
	fields:   fields,
	excluded: map[string]bool{},
	synced:   0,
	changed:  changed,
	mutex:    sync.Mutex{},
}

// Don't overwrite canSend if storeField returns false. We may not yet have
// had a chance to send a notification down the changed channel.
if acc.storeUpdate(rawUp) {
canSend = true
}
}
}()
go acc.Listen(ctx, rawUpdateCh, client.maxAccumulatorInterval)

return acc, nil
}

// This coalesces reads from rawUpdateCh to notifications that changes are available to be
// processed. This loop along with the logic in storeField guarantees the 3
// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
// continue to coalesce updates in the background while business logic is executing in order
// to ensure graceful load shedding.
func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh chan rawUpdate, interval time.Duration) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
var changeStatus changeStatus
var lastChangeSent time.Time
var since time.Duration
var synced bool

maybeNotify := func() {
if (synced && changeStatus == awaitingDispatch && since >= interval) {
a.changed <- struct{}{}
lastChangeSent = time.Now()
changeStatus = dispatched
}
}

for {
select {
case rawUp := <-rawUpdateCh:
synced = a.storeUpdate(rawUp)
if synced {
changeStatus = awaitingDispatch
}
since = rawUp.ts.Sub(lastChangeSent)
maybeNotify()
case <-ticker.C:
synced = a.synced >= len(a.fields)
since = time.Now().Sub(lastChangeSent)
maybeNotify()
case <-ctx.Done():
return
}
}
}

func (a *Accumulator) Changed() <-chan struct{} {
return a.changed
}
72 changes: 71 additions & 1 deletion pkg/kates/accumulator_test.go
@@ -2,6 +2,7 @@ package kates

import (
"fmt"
"sync"
"testing"
"time"

@@ -18,7 +19,7 @@ type Snap struct {
func TestBootstrapNoNotifyBeforeSync(t *testing.T) {
// Create a set of 10 configmaps to give us some resources to watch.
ctx, cli := testClient(t, nil)
var cms [10]*ConfigMap
var cms [10]*ConfigMap
for i := 0; i < 10; i++ {
cm := &ConfigMap{
TypeMeta: TypeMeta{
@@ -92,3 +93,72 @@ func TestBootstrapNotifyEvenOnEmptyWatch(t *testing.T) {
// that satisfy the selector, the ConfigMaps field should be empty.
assert.Equal(t, 0, len(snap.ConfigMaps))
}

// Make sure we coalesce raw changes before sending an update when a batch of resources
// is created/modified in quick succession.
func TestBatchNoNotifyBeforeSync(t *testing.T) {
ctx, cli := testClient(t, nil)
// Set a long interval to make sure all changes are batched before sending.
cli.MaxAccumulatorInterval(10 * time.Second)
acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch"})
require.NoError(t, err)

snap := &Snap{}

// Listen for changes from the Accumulator. Here it will listen for only 2 updates
// The first update should be the one sent during bootstrap. No resources should have changed
// in this update. The second update should contain resource changes.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
numUpdates := 0
for {
<-acc.Changed()
updated, err := acc.Update(ctx, snap)
require.NoError(t, err)
if updated {
numUpdates++
}
if numUpdates == 2 {
break
}
}
}()

// Use a separate client to create resources to avoid any potential uses of the cache
_, cli2 := testClient(t, nil)

// Create a set of 10 Configmaps after the Accumulator is watching to simulate getting
// a bunch of resources at once mid-watch.
var cms [10]*ConfigMap
for i := 0; i < 10; i++ {
cm := &ConfigMap{
TypeMeta: TypeMeta{
Kind: "ConfigMap",
},
ObjectMeta: ObjectMeta{
Name: fmt.Sprintf("test-batch-%d", i),
Labels: map[string]string{
"test": "test-batch",
},
},
}
err := cli2.Upsert(ctx, cm, cm, &cm)
require.NoError(t, err)
cms[i] = cm
}
wg.Wait()

// After receiving 2 updates from the Accumulator, we should have 10 ConfigMaps
// in our Snapshot due to the Accumulator coalescing changes before sending an update.
assert.Equal(t, 10, len(snap.ConfigMaps))

t.Cleanup(func() {
for _, cm := range cms {
if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
t.Error(err)
}
}
})
}
49 changes: 29 additions & 20 deletions pkg/kates/client.go
@@ -77,12 +77,13 @@ import (
// 2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on
// all watches prior to notifying the user that resources are available to process.
type Client struct {
config *ConfigFlags
cli dynamic.Interface
mapper meta.RESTMapper
disco discovery.CachedDiscoveryInterface
mutex sync.Mutex
canonical map[string]*Unstructured
config *ConfigFlags
cli dynamic.Interface
mapper meta.RESTMapper
disco discovery.CachedDiscoveryInterface
mutex sync.Mutex
canonical map[string]*Unstructured
maxAccumulatorInterval time.Duration

// This is an internal interface for testing, it lets us deliberately introduce delays into the
// implementation, e.g. effectively increasing the latency to the api server in a controllable
@@ -149,14 +150,15 @@ func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
}

return &Client{
config: config,
cli: cli,
mapper: mapper,
disco: disco,
canonical: make(map[string]*Unstructured),
watchAdded: func(oldObj, newObj *Unstructured) {},
watchUpdated: func(oldObj, newObj *Unstructured) {},
watchDeleted: func(oldObj, newObj *Unstructured) {},
config: config,
cli: cli,
mapper: mapper,
disco: disco,
canonical: make(map[string]*Unstructured),
maxAccumulatorInterval: 0 * time.Second,
watchAdded: func(oldObj, newObj *Unstructured) {},
watchUpdated: func(oldObj, newObj *Unstructured) {},
watchDeleted: func(oldObj, newObj *Unstructured) {},
}, nil
}

@@ -211,6 +213,12 @@ func InCluster() bool {
err == nil && !fi.IsDir()
}

// MaxAccumulatorInterval sets the interval over which Accumulators created by this
// client batch raw updates before sending a change notification.
func (c *Client) MaxAccumulatorInterval(interval time.Duration) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.maxAccumulatorInterval = interval
}

// DynamicInterface is an accessor method to the k8s dynamic client
func (c *Client) DynamicInterface() dynamic.Interface {
return c.cli
@@ -371,7 +379,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
// resource instances of the kind being watched
lw := newListWatcher(ctx, cli, query, func(lw *lw) {
if lw.hasSynced() {
target <- rawUpdate{query.Name, true, nil, nil}
target <- rawUpdate{query.Name, true, nil, nil, time.Now()}
}
})
informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)
@@ -410,7 +418,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
// better/faster tests.
c.watchAdded(nil, obj.(*Unstructured))
lw.countAddEvent()
target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured)}
target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured), time.Now()}
},
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*Unstructured)
@@ -421,7 +429,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
// nicer prettier set of hooks, but for now all we need is this hack for
// better/faster tests.
c.watchUpdated(old, new)
target <- rawUpdate{query.Name, lw.hasSynced(), old, new}
target <- rawUpdate{query.Name, lw.hasSynced(), old, new, time.Now()}
},
DeleteFunc: func(obj interface{}) {
var old *Unstructured
@@ -444,7 +452,7 @@ func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdat
c.mutex.Lock()
delete(c.canonical, key)
c.mutex.Unlock()
target <- rawUpdate{query.Name, lw.hasSynced(), old, nil}
target <- rawUpdate{query.Name, lw.hasSynced(), old, nil, time.Now()}
},
},
)
@@ -457,7 +465,8 @@ type rawUpdate struct {
synced bool
old *unstructured.Unstructured
new *unstructured.Unstructured
}
ts time.Time
}

type lw struct {
// All these fields are read-only and initialized on construction.
@@ -499,7 +508,7 @@ func (lw *lw) countAddEvent() {
// dispatched to consider things synced at the dispatch layer.
//
// So to track syncedness properly for our users, when we do our first List() we remember how many
// resourcees there are and we do not consider ourselves synced until we have dispatched at least as
// resources there are and we do not consider ourselves synced until we have dispatched at least as
// many Add events as there are resources.
func (lw *lw) hasSynced() (result bool) {
lw.withMutex(func() {
