-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
prioritized client updates #17354
prioritized client updates #17354
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
```release-note:improvement | ||
client: prioritize allocation updates to reduce Raft and RPC load | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1420,30 +1420,36 @@ func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) { | |
} | ||
} | ||
|
||
// LastAcknowledgedStateIsCurrent returns true if the current state matches the | ||
// state that was last acknowledged from a server update. This is called from | ||
// the client in the same goroutine that called AcknowledgeState so that we | ||
// can't get a TOCTOU error. | ||
func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool { | ||
// GetUpdatePriority returns the update priority based the difference between | ||
// the current state and the state that was last acknowledged from a server | ||
// update. This is called from the client in the same goroutine that called | ||
// AcknowledgeState so that we can't get a TOCTOU error. | ||
func (ar *allocRunner) GetUpdatePriority(a *structs.Allocation) cstructs.AllocUpdatePriority { | ||
ar.stateLock.RLock() | ||
defer ar.stateLock.RUnlock() | ||
|
||
last := ar.lastAcknowledgedState | ||
if last == nil { | ||
return false | ||
return cstructs.AllocUpdatePriorityTypical | ||
} | ||
|
||
switch { | ||
case last.ClientStatus != a.ClientStatus: | ||
return false | ||
return cstructs.AllocUpdatePriorityUrgent | ||
case last.ClientDescription != a.ClientDescription: | ||
return false | ||
return cstructs.AllocUpdatePriorityTypical | ||
case !last.DeploymentStatus.Equal(a.DeploymentStatus): | ||
return false | ||
return cstructs.AllocUpdatePriorityTypical | ||
Comment on lines
1441
to
+1442
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically deployments are gated by this field, so it could considered critical since it can cause a scheduling decision... ...but nothing about deployments is concerned with sub-second latencies, so I think it's fine to leave this as If you're in this code again maybe add a comment pointing out that while deployment status changes are not urgent, they can affect scheduling but not in a way that sub-second skew is significant. If you really want to tidy things up the PR description misses this too:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Somehow I missed that, so yeah I would've set it to urgent based on the reasoning I had in the PR. I'll keep (for now at least) and I'll add some commentary here around reasoning for things. |
||
case !last.NetworkStatus.Equal(a.NetworkStatus): | ||
return false | ||
return cstructs.AllocUpdatePriorityTypical | ||
} | ||
return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { | ||
|
||
if !maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { | ||
return st.Equal(o) | ||
}) | ||
|
||
}) { | ||
return cstructs.AllocUpdatePriorityTypical | ||
} | ||
|
||
return cstructs.AllocUpdatePriorityNone | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ import ( | |
"github.com/hashicorp/nomad/client/serviceregistration/wrapper" | ||
"github.com/hashicorp/nomad/client/state" | ||
"github.com/hashicorp/nomad/client/stats" | ||
cstructs "github.com/hashicorp/nomad/client/structs" | ||
"github.com/hashicorp/nomad/client/vaultclient" | ||
"github.com/hashicorp/nomad/command/agent/consul" | ||
"github.com/hashicorp/nomad/helper" | ||
|
@@ -213,8 +214,10 @@ type Client struct { | |
invalidAllocs map[string]struct{} | ||
invalidAllocsLock sync.Mutex | ||
|
||
// allocUpdates stores allocations that need to be synced to the server. | ||
allocUpdates chan *structs.Allocation | ||
// allocUpdates stores allocations that need to be synced to the server, and | ||
// allocUpdatesLock guards access to it for concurrent updates. | ||
allocUpdates map[string]*structs.Allocation | ||
allocUpdatesLock sync.Mutex | ||
|
||
// consulService is the Consul handler implementation for managing services | ||
// and checks. | ||
|
@@ -366,7 +369,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie | |
logger: logger, | ||
rpcLogger: logger.Named("rpc"), | ||
allocs: make(map[string]interfaces.AllocRunner), | ||
allocUpdates: make(chan *structs.Allocation, 64), | ||
allocUpdates: make(map[string]*structs.Allocation, 64), | ||
shutdownCh: make(chan struct{}), | ||
triggerDiscoveryCh: make(chan struct{}), | ||
triggerNodeUpdate: make(chan struct{}, 8), | ||
|
@@ -1322,10 +1325,11 @@ func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) { | |
|
||
// Mark alloc as failed so server can handle this | ||
failed := makeFailedAlloc(alloc, err) | ||
select { | ||
case c.allocUpdates <- failed: | ||
case <-c.shutdownCh: | ||
} | ||
|
||
c.allocUpdatesLock.Lock() | ||
defer c.allocUpdatesLock.Unlock() | ||
|
||
c.allocUpdates[alloc.ID] = failed | ||
} | ||
|
||
// saveState is used to snapshot our state into the data dir. | ||
|
@@ -2099,10 +2103,10 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) { | |
stripped.DeploymentStatus = alloc.DeploymentStatus | ||
stripped.NetworkStatus = alloc.NetworkStatus | ||
|
||
select { | ||
case c.allocUpdates <- stripped: | ||
case <-c.shutdownCh: | ||
} | ||
c.allocUpdatesLock.Lock() | ||
defer c.allocUpdatesLock.Unlock() | ||
|
||
c.allocUpdates[stripped.ID] = stripped | ||
tgross marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// PutAllocation stores an allocation or returns an error if it could not be stored. | ||
|
@@ -2114,39 +2118,27 @@ func (c *Client) PutAllocation(alloc *structs.Allocation) error { | |
// server. | ||
func (c *Client) allocSync() { | ||
syncTicker := time.NewTicker(allocSyncIntv) | ||
updates := make(map[string]*structs.Allocation) | ||
updateTicks := 0 | ||
|
||
for { | ||
select { | ||
case <-c.shutdownCh: | ||
syncTicker.Stop() | ||
return | ||
case alloc := <-c.allocUpdates: | ||
// Batch the allocation updates until the timer triggers. | ||
updates[alloc.ID] = alloc | ||
|
||
case <-syncTicker.C: | ||
// Fast path if there are no updates | ||
if len(updates) == 0 { | ||
continue | ||
} | ||
// Ensure we never send an update before we've had at least one sync | ||
// from the server | ||
select { | ||
case <-c.serversContactedCh: | ||
default: | ||
continue | ||
} | ||
|
||
sync := c.filterAcknowledgedUpdates(updates) | ||
if len(sync) == 0 { | ||
// No updates to send | ||
updates = make(map[string]*structs.Allocation, len(updates)) | ||
updateTicks++ | ||
toSync := c.updatesToSync(updateTicks) | ||
|
||
if len(toSync) == 0 { | ||
syncTicker.Reset(allocSyncIntv) | ||
continue | ||
} | ||
|
||
// Send to server. | ||
args := structs.AllocUpdateRequest{ | ||
Alloc: sync, | ||
Alloc: toSync, | ||
WriteRequest: structs.WriteRequest{Region: c.Region()}, | ||
} | ||
|
||
|
@@ -2156,12 +2148,25 @@ func (c *Client) allocSync() { | |
// Error updating allocations, do *not* clear | ||
// updates and retry after backoff | ||
c.logger.Error("error updating allocations", "error", err) | ||
|
||
// refill the updates queue with updates that we failed to make, | ||
// but only if a newer update for that alloc hasn't come in. | ||
c.allocUpdatesLock.Lock() | ||
for _, unsynced := range toSync { | ||
if _, ok := c.allocUpdates[unsynced.ID]; !ok { | ||
c.allocUpdates[unsynced.ID] = unsynced | ||
} | ||
} | ||
c.allocUpdatesLock.Unlock() | ||
|
||
syncTicker.Reset(c.retryIntv(allocSyncRetryIntv)) | ||
continue | ||
} | ||
|
||
// Record that we've successfully synced these updates so that it's | ||
// written to disk | ||
c.allocLock.RLock() | ||
for _, update := range sync { | ||
for _, update := range toSync { | ||
if ar, ok := c.allocs[update.ID]; ok { | ||
ar.AcknowledgeState(&arstate.State{ | ||
ClientStatus: update.ClientStatus, | ||
|
@@ -2174,33 +2179,76 @@ func (c *Client) allocSync() { | |
} | ||
c.allocLock.RUnlock() | ||
|
||
// Successfully updated allocs, reset map and ticker. | ||
// Always reset ticker to give loop time to receive | ||
// alloc updates. If the RPC took the ticker interval | ||
// we may call it in a tight loop before draining | ||
// buffered updates. | ||
updates = make(map[string]*structs.Allocation, len(updates)) | ||
// Successfully updated allocs. Reset ticker to give loop time to | ||
// receive new alloc updates. Otherwise if the RPC took the ticker | ||
// interval we may call it in a tight loop reading empty updates. | ||
updateTicks = 0 | ||
syncTicker.Reset(allocSyncIntv) | ||
} | ||
} | ||
} | ||
|
||
func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation { | ||
// updatesToSync returns a list of client allocation updates we need to make in | ||
// this tick of the allocSync. It returns nil if there's no updates to make | ||
// yet. The caller is responsible for restoring the c.allocUpdates map if it | ||
// can't successfully send the updates. | ||
func (c *Client) updatesToSync(updateTicks int) []*structs.Allocation { | ||
|
||
c.allocUpdatesLock.Lock() | ||
defer c.allocUpdatesLock.Unlock() | ||
|
||
// Fast path if there are no pending updates | ||
if len(c.allocUpdates) == 0 { | ||
return nil | ||
} | ||
|
||
// Ensure we never send an update before we've had at least one sync from | ||
// the server | ||
select { | ||
case <-c.serversContactedCh: | ||
default: | ||
return nil | ||
} | ||
|
||
toSync, urgent := c.filterAcknowledgedUpdates(c.allocUpdates) | ||
|
||
// Only update every 5th tick if there's no priority updates | ||
if updateTicks%5 != 0 && !urgent { | ||
return nil | ||
} | ||
|
||
// Clear here so that allocrunners can queue up the next set of updates | ||
// while we're waiting to hear from the server | ||
c.allocUpdates = make(map[string]*structs.Allocation, len(c.allocUpdates)) | ||
tgross marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return toSync | ||
} | ||
|
||
// filteredAcknowledgedUpdates returns a list of client alloc updates with the | ||
// already-acknowledged updates removed, and the highest priority of any update. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly, the caller must hold at least a read lock on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd go a step further and suggest extracting the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Well it's not a
So in other words, make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @shoenig @jrasell I've refactored this by pulling out the |
||
func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) ([]*structs.Allocation, bool) { | ||
var urgent bool | ||
sync := make([]*structs.Allocation, 0, len(updates)) | ||
tgross marked this conversation as resolved.
Show resolved
Hide resolved
|
||
c.allocLock.RLock() | ||
defer c.allocLock.RUnlock() | ||
for allocID, update := range updates { | ||
if ar, ok := c.allocs[allocID]; ok { | ||
if !ar.LastAcknowledgedStateIsCurrent(update) { | ||
switch ar.GetUpdatePriority(update) { | ||
case cstructs.AllocUpdatePriorityUrgent: | ||
sync = append(sync, update) | ||
urgent = true | ||
case cstructs.AllocUpdatePriorityTypical: | ||
sync = append(sync, update) | ||
case cstructs.AllocUpdatePriorityNone: | ||
// update is dropped | ||
} | ||
} else { | ||
// no allocrunner (typically a failed placement), so we need | ||
// to send update | ||
sync = append(sync, update) | ||
jrasell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
return sync | ||
return sync, urgent | ||
} | ||
|
||
// allocUpdates holds the results of receiving updated allocations from the | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't know what it was before, how can we assume the change is typical? Seems worth a comment especially since all of the other code in this method must check from Highest Priority to Lowest in order to ensure a change to a low priority field doesn't demote an actually high priority update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right that we don't know for sure. In practice an allocation will never become healthy quickly enough that the first update we send is that update. That being said we probably should account for allocations that quickly fail because there's a bunch of things that can go unrecoverably wrong on the client before we ever hit the task runner, and it'd be nice to be able to send those failure states to the server more quickly.