Skip to content

Commit

Permalink
prioritized client updates
Browse files Browse the repository at this point in the history
The allocrunner sends several updates to the server during the early lifecycle
of an allocation and its tasks. Clients batch-up allocation updates every 200ms,
but experiments like the C2M challenge has shown that even with this batching,
servers can be overwhelmed with client updates during high volume
deployments. Benchmarking done in #9451 has shown that client updates can easily
represent ~70% of all Nomad Raft traffic.

Each allocation sends many updates during its lifetime, but only those that
change the `ClientStatus` field are critical for progressing a deployment or
kicking off a reschedule to recover from failures.

Add a priority to the client allocation sync and update the `syncTicker`
receiver so that we only send an update if there's a high priority update
waiting, or on every 5th tick. This means when there are no high priority
updates, the client will send updates at most every 1s instead of
200ms. Benchmarks have shown this can reduce overall Raft traffic by 10%, as
well as reduce client-to-server RPC traffic.

This changeset also switches from a channel-based collection of updates to a
shared buffer, so as to split batching from sending and prevent backpressure
onto the allocrunner when the RPC is slow. This doesn't have a major performance
benefit in the benchmarks but makes the implementation of the prioritized update
simpler.

Fixes: #9451
  • Loading branch information
tgross committed May 30, 2023
1 parent 60e0404 commit 116ad2a
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 89 deletions.
3 changes: 3 additions & 0 deletions .changelog/17354.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
client: prioritize allocation updates to reduce Raft and RPC load
```
30 changes: 18 additions & 12 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,30 +1420,36 @@ func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) {
}
}

// LastAcknowledgedStateIsCurrent returns true if the current state matches the
// state that was last acknowledged from a server update. This is called from
// the client in the same goroutine that called AcknowledgeState so that we
// can't get a TOCTOU error.
func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool {
// GetUpdatePriority returns the update priority based the difference between
// the current state and the state that was last acknowledged from a server
// update. This is called from the client in the same goroutine that called
// AcknowledgeState so that we can't get a TOCTOU error.
func (ar *allocRunner) GetUpdatePriority(a *structs.Allocation) cstructs.AllocUpdatePriority {
ar.stateLock.RLock()
defer ar.stateLock.RUnlock()

last := ar.lastAcknowledgedState
if last == nil {
return false
return cstructs.AllocUpdatePriorityTypical
}

switch {
case last.ClientStatus != a.ClientStatus:
return false
return cstructs.AllocUpdatePriorityUrgent
case last.ClientDescription != a.ClientDescription:
return false
return cstructs.AllocUpdatePriorityTypical
case !last.DeploymentStatus.Equal(a.DeploymentStatus):
return false
return cstructs.AllocUpdatePriorityTypical
case !last.NetworkStatus.Equal(a.NetworkStatus):
return false
return cstructs.AllocUpdatePriorityTypical
}
return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {

if !maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {
return st.Equal(o)
})

}) {
return cstructs.AllocUpdatePriorityTypical
}

return cstructs.AllocUpdatePriorityNone
}
20 changes: 13 additions & 7 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -2458,7 +2459,7 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) {
))
}

func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
func TestAllocRunner_GetUpdatePriority(t *testing.T) {
ci.Parallel(t)

alloc := mock.Alloc()
Expand Down Expand Up @@ -2489,12 +2490,12 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
NetworkStatus: calloc.NetworkStatus,
})

must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc))

// clientAlloc mutates the state, so verify this doesn't break the check
// without state having been updated
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc))

// make a no-op state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
Expand All @@ -2503,14 +2504,19 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc))

// make a state update that should be detected as a change
// make a low priority state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.2.1",
Address: "192.168.1.2",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.GetUpdatePriority(calloc))

// make a state update that should be detected as high priority
ar.SetClientStatus(structs.AllocClientStatusFailed)
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.Eq(t, cstructs.AllocUpdatePriorityUrgent, ar.GetUpdatePriority(calloc))
}
2 changes: 1 addition & 1 deletion client/allocrunner/interfaces/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type AllocRunner interface {
AllocState() *state.State
PersistState() error
AcknowledgeState(*state.State)
LastAcknowledgedStateIsCurrent(*structs.Allocation) bool
GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority
SetClientStatus(string)

Signal(taskName, signal string) error
Expand Down
130 changes: 89 additions & 41 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -213,8 +214,10 @@ type Client struct {
invalidAllocs map[string]struct{}
invalidAllocsLock sync.Mutex

// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
// allocUpdates stores allocations that need to be synced to the server, and
// allocUpdatesLock guards access to it for concurrent updates.
allocUpdates map[string]*structs.Allocation
allocUpdatesLock sync.Mutex

// consulService is the Consul handler implementation for managing services
// and checks.
Expand Down Expand Up @@ -366,7 +369,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
logger: logger,
rpcLogger: logger.Named("rpc"),
allocs: make(map[string]interfaces.AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
allocUpdates: make(map[string]*structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
Expand Down Expand Up @@ -1322,10 +1325,11 @@ func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {

// Mark alloc as failed so server can handle this
failed := makeFailedAlloc(alloc, err)
select {
case c.allocUpdates <- failed:
case <-c.shutdownCh:
}

c.allocUpdatesLock.Lock()
defer c.allocUpdatesLock.Unlock()

c.allocUpdates[alloc.ID] = failed
}

// saveState is used to snapshot our state into the data dir.
Expand Down Expand Up @@ -2099,10 +2103,10 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
stripped.DeploymentStatus = alloc.DeploymentStatus
stripped.NetworkStatus = alloc.NetworkStatus

select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
c.allocUpdatesLock.Lock()
defer c.allocUpdatesLock.Unlock()

c.allocUpdates[stripped.ID] = stripped
}

// PutAllocation stores an allocation or returns an error if it could not be stored.
Expand All @@ -2114,39 +2118,27 @@ func (c *Client) PutAllocation(alloc *structs.Allocation) error {
// server.
func (c *Client) allocSync() {
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
updateTicks := 0

for {
select {
case <-c.shutdownCh:
syncTicker.Stop()
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc

case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
continue
}
// Ensure we never send an update before we've had at least one sync
// from the server
select {
case <-c.serversContactedCh:
default:
continue
}

sync := c.filterAcknowledgedUpdates(updates)
if len(sync) == 0 {
// No updates to send
updates = make(map[string]*structs.Allocation, len(updates))
updateTicks++
toSync := c.updatesToSync(updateTicks)

if len(toSync) == 0 {
syncTicker.Reset(allocSyncIntv)
continue
}

// Send to server.
args := structs.AllocUpdateRequest{
Alloc: sync,
Alloc: toSync,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}

Expand All @@ -2156,12 +2148,25 @@ func (c *Client) allocSync() {
// Error updating allocations, do *not* clear
// updates and retry after backoff
c.logger.Error("error updating allocations", "error", err)

// refill the updates queue with updates that we failed to make,
// but only if a newer update for that alloc hasn't come in.
c.allocUpdatesLock.Lock()
for _, unsynced := range toSync {
if _, ok := c.allocUpdates[unsynced.ID]; !ok {
c.allocUpdates[unsynced.ID] = unsynced
}
}
c.allocUpdatesLock.Unlock()

syncTicker.Reset(c.retryIntv(allocSyncRetryIntv))
continue
}

// Record that we've successfully synced these updates so that it's
// written to disk
c.allocLock.RLock()
for _, update := range sync {
for _, update := range toSync {
if ar, ok := c.allocs[update.ID]; ok {
ar.AcknowledgeState(&arstate.State{
ClientStatus: update.ClientStatus,
Expand All @@ -2174,33 +2179,76 @@ func (c *Client) allocSync() {
}
c.allocLock.RUnlock()

// Successfully updated allocs, reset map and ticker.
// Always reset ticker to give loop time to receive
// alloc updates. If the RPC took the ticker interval
// we may call it in a tight loop before draining
// buffered updates.
updates = make(map[string]*structs.Allocation, len(updates))
// Successfully updated allocs. Reset ticker to give loop time to
// receive new alloc updates. Otherwise if the RPC took the ticker
// interval we may call it in a tight loop reading empty updates.
updateTicks = 0
syncTicker.Reset(allocSyncIntv)
}
}
}

func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation {
// updatesToSync returns a list of client allocation updates we need to make in
// this tick of the allocSync. It returns nil if there's no updates to make
// yet. The caller is responsible for restoring the c.allocUpdates map if it
// can't successfully send the updates.
func (c *Client) updatesToSync(updateTicks int) []*structs.Allocation {

c.allocUpdatesLock.Lock()
defer c.allocUpdatesLock.Unlock()

// Fast path if there are no pending updates
if len(c.allocUpdates) == 0 {
return nil
}

// Ensure we never send an update before we've had at least one sync from
// the server
select {
case <-c.serversContactedCh:
default:
return nil
}

toSync, urgent := c.filterAcknowledgedUpdates(c.allocUpdates)

// Only update every 5th tick if there's no priority updates
if updateTicks%5 != 0 && !urgent {
return nil
}

// Clear here so that allocrunners can queue up the next set of updates
// while we're waiting to hear from the server
c.allocUpdates = make(map[string]*structs.Allocation, len(c.allocUpdates))

return toSync
}

// filteredAcknowledgedUpdates returns a list of client alloc updates with the
// already-acknowledged updates removed, and the highest priority of any update.
func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) ([]*structs.Allocation, bool) {
var urgent bool
sync := make([]*structs.Allocation, 0, len(updates))
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for allocID, update := range updates {
if ar, ok := c.allocs[allocID]; ok {
if !ar.LastAcknowledgedStateIsCurrent(update) {
switch ar.GetUpdatePriority(update) {
case cstructs.AllocUpdatePriorityUrgent:
sync = append(sync, update)
urgent = true
case cstructs.AllocUpdatePriorityTypical:
sync = append(sync, update)
case cstructs.AllocUpdatePriorityNone:
// update is dropped
}
} else {
// no allocrunner (typically a failed placement), so we need
// to send update
sync = append(sync, update)
}
}
return sync
return sync, urgent
}

// allocUpdates holds the results of receiving updated allocations from the
Expand Down
8 changes: 5 additions & 3 deletions client/client_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ func (ar *emptyAllocRunner) AllocState() *state.State {
return ar.allocState.Copy()
}

func (ar *emptyAllocRunner) PersistState() error { return nil }
func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {}
func (ar *emptyAllocRunner) LastAcknowledgedStateIsCurrent(*structs.Allocation) bool { return false }
func (ar *emptyAllocRunner) PersistState() error { return nil }
func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {}
func (ar *emptyAllocRunner) GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority {
return cstructs.AllocUpdatePriorityUrgent
}

func (ar *emptyAllocRunner) SetClientStatus(status string) {
ar.allocLock.Lock()
Expand Down
Loading

0 comments on commit 116ad2a

Please sign in to comment.