Merge pull request #4514 from emissary-ingress/hqudsi/kates-cherrypick
[v2.4] Port commits from PR #4488
Hamzah Qudsi authored Sep 15, 2022
2 parents 72574f6 + 091be77 commit 469f83a
Showing 6 changed files with 351 additions and 54 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -100,6 +100,10 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
config. This has been fixed and the `alt_stats_name` field in the cluster config is now set
correctly. (Thanks to <a href="https://github.com/psalaberria002">Paul</a>!)

- Feature: The `AMBASSADOR_RECONFIG_MAX_DELAY` env var can optionally be set to batch changes for
  the specified non-negative window period, in seconds, before doing an Envoy reconfiguration.
  The default is "1" if not set.

## [1.14.5] TBD
[1.14.5]: https://github.com/emissary-ingress/emissary/compare/v2.3.2...v1.14.5

12 changes: 12 additions & 0 deletions cmd/entrypoint/watcher.go
@@ -5,8 +5,10 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

gw "sigs.k8s.io/gateway-api/apis/v1alpha1"

@@ -33,6 +35,16 @@ func WatchAllTheThings(
	if err != nil {
		return err
	}
	intv, err := strconv.Atoi(env("AMBASSADOR_RECONFIG_MAX_DELAY", "1"))
	if err != nil {
		return err
	}
	maxInterval := time.Duration(intv) * time.Second
	err = client.MaxAccumulatorInterval(maxInterval)
	if err != nil {
		return err
	}
	dlog.Infof(ctx, "AMBASSADOR_RECONFIG_MAX_DELAY set to %d", intv)

	serverTypeList, err := client.ServerResources()
	if err != nil {
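The MaxAccumulatorInterval method called above is defined in pkg/kates, but its diff is not among the hunks shown on this page. Purely as a hedged sketch — the Client stub, the validation rule, and the error message below are assumptions, though the maxAccumulatorInterval field name does appear in the pkg/kates/accumulator.go hunk further down — it might look roughly like this:

package kates

import (
	"fmt"
	"time"
)

// Client is a stand-in stub for this sketch; the real Client lives in
// pkg/kates and carries many more fields.
type Client struct {
	maxAccumulatorInterval time.Duration
}

// MaxAccumulatorInterval records the batching window later passed to Listen
// by newAccumulator. Rejecting negative values mirrors the "non-negative
// window period" wording in the changelog entry above; the exact validation
// in the real method may differ.
func (c *Client) MaxAccumulatorInterval(interval time.Duration) error {
	if interval < 0 {
		return fmt.Errorf("max accumulator interval must be non-negative, got %v", interval)
	}
	c.maxAccumulatorInterval = interval
	return nil
}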
6 changes: 6 additions & 0 deletions docs/releaseNotes.yml
@@ -58,6 +58,12 @@ items:
config. This has been fixed and the <code>alt_stats_name</code> field in the cluster config is now set correctly.
(Thanks to <a href="https://github.com/psalaberria002">Paul</a>!)
- title: Add support for config change batch window before reconfiguring Envoy
type: feature
body: >-
The <code>AMBASSADOR_RECONFIG_MAX_DELAY</code> env var can optionally be set to batch changes for the specified
non-negative window period, in seconds, before doing an Envoy reconfiguration. The default is "1" if not set.
- version: 1.14.5
date: 'TBD'
notes:
95 changes: 60 additions & 35 deletions pkg/kates/accumulator.go
@@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
)
@@ -80,6 +81,13 @@ const (
	ObjectDelete
)

type changeStatus int

const (
	awaitingDispatch changeStatus = iota
	dispatched
)

func (dt DeltaType) MarshalJSON() ([]byte, error) {
	switch dt {
	case ObjectAdd:
@@ -165,45 +173,62 @@ func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Acc
		client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
	}

-	acc := &Accumulator{client, fields, map[string]bool{}, 0, changed, sync.Mutex{}}
-
-	// This coalesces reads from rawUpdateCh to notifications that changes are available to be
-	// processed. This loop along with the logic in storeField guarantees the 3
-	// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
-	// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
-	// continue to coalesce updates in the background while business logic is executing in order
-	// to ensure graceful load shedding.
-	go func() {
-		canSend := false
-
-		for {
-			var rawUp rawUpdate
-			if canSend {
-				select {
-				case changed <- struct{}{}:
-					canSend = false
-					continue
-				case rawUp = <-rawUpdateCh:
-				case <-ctx.Done():
-					return
-				}
-			} else {
-				select {
-				case rawUp = <-rawUpdateCh:
-				case <-ctx.Done():
-					return
-				}
-			}
-
-			// Don't overwrite canSend if storeField returns false. We may not yet have
-			// had a chance to send a notification down the changed channel.
-			if acc.storeUpdate(rawUp) {
-				canSend = true
-			}
-		}
-	}()
-
-	return acc, nil
+	acc := &Accumulator{
+		client:   client,
+		fields:   fields,
+		excluded: map[string]bool{},
+		synced:   0,
+		changed:  changed,
+		mutex:    sync.Mutex{},
+	}
+
+	go acc.Listen(ctx, rawUpdateCh, client.maxAccumulatorInterval)
+
+	return acc, nil
+}
+
+// Listen listens for updates from rawUpdateCh and sends notifications, coalescing reads as necessary.
+// This loop along with the logic in storeField is used to satisfy the 3 Goals/Requirements listed in
+// the documentation for the Accumulator struct, i.e. Ensuring all Kinds are bootstrapped before any
+// notification occurs, as well as ensuring that we continue to coalesce updates in the background while
+// business logic is executing in order to ensure graceful load shedding.
+func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh <-chan rawUpdate, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	var changeStatus changeStatus
+	var lastChangeSent time.Time
+	var synced bool
+
+	sendUpdate := func() {
+		a.changed <- struct{}{}
+		changeStatus = dispatched
+		lastChangeSent = time.Now()
+	}
+
+	for {
+		select {
+		// We have two paths here:
+		// 1. If we get new data and it has been past our set interval since we last updated anything,
+		//    we go ahead and immediately send that.
+		//
+		// 2. If we get new data but we just recently sent a change within our interval, we'll
+		//    wait until we get our next Tick before sending a change.
+		case rawUp := <-rawUpdateCh:
+			synced = a.storeUpdate(rawUp)
+			since := rawUp.ts.Sub(lastChangeSent)
+			if synced && since >= interval {
+				sendUpdate()
+			} else {
+				changeStatus = awaitingDispatch
+			}
+		case <-ticker.C:
+			if synced && changeStatus == awaitingDispatch {
+				sendUpdate()
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
}

func (a *Accumulator) Changed() <-chan struct{} {
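The practical effect of the new Listen loop is that a burst of raw Kubernetes watch updates inside the configured window is coalesced into a single notification on the Accumulator's changed channel, instead of one notification per update. The following standalone program is only an illustration — it is not emissary code, and the batch function, channel types, and timings are invented for the demo — but it reproduces the same ticker-plus-pending-flag pattern so the batching behaviour can be run in isolation:

package main

import (
	"context"
	"fmt"
	"time"
)

// batch forwards at most one "changed" notification per interval, mirroring
// the ticker/awaitingDispatch logic in Listen above.
func batch(ctx context.Context, updates <-chan int, changed chan<- struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	pending := false // an update arrived but has not been announced yet
	var lastSent time.Time

	for {
		select {
		case <-updates:
			// Announce immediately if we are outside the batching window,
			// otherwise wait for the next tick.
			if time.Since(lastSent) >= interval {
				changed <- struct{}{}
				lastSent = time.Now()
				pending = false
			} else {
				pending = true
			}
		case <-ticker.C:
			if pending {
				changed <- struct{}{}
				lastSent = time.Now()
				pending = false
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	updates := make(chan int)
	changed := make(chan struct{})
	go batch(ctx, updates, changed, time.Second) // window comparable to AMBASSADOR_RECONFIG_MAX_DELAY=1

	// Simulate a burst of 10 config changes arriving within ~100ms.
	go func() {
		for i := 0; i < 10; i++ {
			updates <- i
			time.Sleep(10 * time.Millisecond)
		}
	}()

	notifications := 0
	for {
		select {
		case <-changed:
			notifications++ // the "reconfigure Envoy" signal
		case <-ctx.Done():
			fmt.Printf("10 raw updates produced %d notification(s)\n", notifications)
			return
		}
	}
}

With the one-second window used here, the ten-update burst typically collapses into two notifications — one sent immediately for the first update and one on the following tick covering the rest — rather than ten separate reconfigurations.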
