-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reuse blocked evaluations and handle unblock events that occurred during scheduling #1199
Merged
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
08b2291
Reuse the same evaluation and reblock it until there is no more work …
dadgar 580a363
Worker annotates evals with their snapshot index
dadgar 1fc3fec
Track unblock indexes and check evals on block to see if they missed …
dadgar 8456f77
Periodically unblock failed evaluations
dadgar bd079a2
test fixes and delete
dadgar 230b663
address comment
dadgar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,15 +35,21 @@ type BlockedEvals struct { | |
escaped map[string]*structs.Evaluation | ||
|
||
// unblockCh is used to buffer unblocking of evaluations. | ||
capacityChangeCh chan string | ||
capacityChangeCh chan *capacityUpdate | ||
|
||
// jobs is the map of blocked job and is used to ensure that only one | ||
// blocked eval exists for each job. | ||
jobs map[string]struct{} | ||
|
||
// unblockIndexes maps computed node classes to the index in which they were | ||
// unblocked. This is used to check if an evaluation could have been | ||
// unblocked between the time they were in the scheduler and the time they | ||
// are being blocked. | ||
unblockIndexes map[string]uint64 | ||
|
||
// duplicates is the set of evaluations for jobs that had pre-existing | ||
// blocked evaluations. These should be marked as cancelled since only one | ||
// blocked eval is neeeded bper job. | ||
// blocked eval is neeeded per job. | ||
duplicates []*structs.Evaluation | ||
|
||
// duplicateCh is used to signal that a duplicate eval was added to the | ||
|
@@ -55,6 +61,12 @@ type BlockedEvals struct { | |
stopCh chan struct{} | ||
} | ||
|
||
// capacityUpdate stores unblock data. | ||
type capacityUpdate struct { | ||
computedClass string | ||
index uint64 | ||
} | ||
|
||
// BlockedStats returns all the stats about the blocked eval tracker. | ||
type BlockedStats struct { | ||
// TotalEscaped is the total number of blocked evaluations that have escaped | ||
|
@@ -73,7 +85,8 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { | |
captured: make(map[string]*structs.Evaluation), | ||
escaped: make(map[string]*structs.Evaluation), | ||
jobs: make(map[string]struct{}), | ||
capacityChangeCh: make(chan string, unblockBuffer), | ||
unblockIndexes: make(map[string]uint64), | ||
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), | ||
duplicateCh: make(chan struct{}, 1), | ||
stopCh: make(chan struct{}), | ||
stats: new(BlockedStats), | ||
|
@@ -133,6 +146,14 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { | |
return | ||
} | ||
|
||
// Check if the eval missed an unblock while it was in the scheduler at an | ||
// older index. | ||
if b.missedUnblock(eval) { | ||
// Just re-enqueue the eval immediately | ||
b.evalBroker.Enqueue(eval) | ||
return | ||
} | ||
|
||
// Mark the job as tracked. | ||
b.stats.TotalBlocked++ | ||
b.jobs[eval.JobID] = struct{}{} | ||
|
@@ -152,16 +173,65 @@ func (b *BlockedEvals) Block(eval *structs.Evaluation) { | |
b.captured[eval.ID] = eval | ||
} | ||
|
||
// missedUnblock returns whether an evaluation missed an unblock while it was in | ||
// the scheduler. Since the scheduler can operate at an index in the past, the | ||
// evaluation may have been processed missing data that would allow it to | ||
// complete. This method returns if that is the case and should be called with | ||
// the lock held. | ||
func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { | ||
var max uint64 = 0 | ||
for class, index := range b.unblockIndexes { | ||
// Calculate the max unblock index | ||
if max < index { | ||
max = index | ||
} | ||
|
||
elig, ok := eval.ClassEligibility[class] | ||
if !ok { | ||
// The evaluation was processed and did not encounter this class. | ||
// Thus for correctness we need to unblock it. | ||
return true | ||
} | ||
|
||
// The evaluation could use the computed node class and the eval was | ||
// processed before the last unblock. | ||
if elig && eval.SnapshotIndex < index { | ||
return true | ||
} | ||
} | ||
|
||
// If the evaluation has escaped, and the map contains an index older than | ||
// the evaluations, it should be unblocked. | ||
if eval.EscapedComputedClass && eval.SnapshotIndex < max { | ||
return true | ||
} | ||
|
||
// The evaluation is ahead of all recent unblocks. | ||
return false | ||
} | ||
|
||
// Unblock causes any evaluation that could potentially make progress on a | ||
// capacity change on the passed computed node class to be enqueued into the | ||
// eval broker. | ||
func (b *BlockedEvals) Unblock(computedClass string) { | ||
func (b *BlockedEvals) Unblock(computedClass string, index uint64) { | ||
b.l.Lock() | ||
|
||
// Do nothing if not enabled | ||
if !b.enabled { | ||
b.l.Unlock() | ||
return | ||
} | ||
|
||
b.capacityChangeCh <- computedClass | ||
// Store the index in which the unblock happened. We use this on subsequent | ||
// block calls in case the evaluation was in the scheduler when a trigger | ||
// occured. | ||
b.unblockIndexes[computedClass] = index | ||
b.l.Unlock() | ||
|
||
b.capacityChangeCh <- &capacityUpdate{ | ||
computedClass: computedClass, | ||
index: index, | ||
} | ||
} | ||
|
||
// watchCapacity is a long lived function that watches for capacity changes in | ||
|
@@ -171,15 +241,15 @@ func (b *BlockedEvals) watchCapacity() { | |
select { | ||
case <-b.stopCh: | ||
return | ||
case computedClass := <-b.capacityChangeCh: | ||
b.unblock(computedClass) | ||
case update := <-b.capacityChangeCh: | ||
b.unblock(update.computedClass, update.index) | ||
} | ||
} | ||
} | ||
|
||
// unblock unblocks all blocked evals that could run on the passed computed node | ||
// class. | ||
func (b *BlockedEvals) unblock(computedClass string) { | ||
func (b *BlockedEvals) unblock(computedClass string, index uint64) { | ||
b.l.Lock() | ||
defer b.l.Unlock() | ||
|
||
|
@@ -188,6 +258,11 @@ func (b *BlockedEvals) unblock(computedClass string) { | |
return | ||
} | ||
|
||
// Store the index in which the unblock happened. We use this on subsequent | ||
// block calls in case the evaluation was in the scheduler when a trigger | ||
// occured. | ||
b.unblockIndexes[computedClass] = index | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are doing this in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't copy paste fail |
||
|
||
// Every eval that has escaped computed node class has to be unblocked | ||
// because any node could potentially be feasible. | ||
var unblocked []*structs.Evaluation | ||
|
@@ -229,6 +304,35 @@ func (b *BlockedEvals) unblock(computedClass string) { | |
} | ||
} | ||
|
||
// UnblockFailed unblocks all blocked evaluation that were due to scheduler | ||
// failure. | ||
func (b *BlockedEvals) UnblockFailed() { | ||
b.l.Lock() | ||
defer b.l.Unlock() | ||
|
||
// Do nothing if not enabled | ||
if !b.enabled { | ||
return | ||
} | ||
|
||
var unblock []*structs.Evaluation | ||
for id, eval := range b.captured { | ||
if eval.TriggeredBy == structs.EvalTriggerMaxPlans { | ||
unblock = append(unblock, eval) | ||
delete(b.captured, id) | ||
} | ||
} | ||
|
||
for id, eval := range b.escaped { | ||
if eval.TriggeredBy == structs.EvalTriggerMaxPlans { | ||
unblock = append(unblock, eval) | ||
delete(b.escaped, id) | ||
} | ||
} | ||
|
||
b.evalBroker.EnqueueAll(unblock) | ||
} | ||
|
||
// GetDuplicates returns all the duplicate evaluations and blocks until the | ||
// passed timeout. | ||
func (b *BlockedEvals) GetDuplicates(timeout time.Duration) []*structs.Evaluation { | ||
|
@@ -273,7 +377,7 @@ func (b *BlockedEvals) Flush() { | |
b.escaped = make(map[string]*structs.Evaluation) | ||
b.jobs = make(map[string]struct{}) | ||
b.duplicates = nil | ||
b.capacityChangeCh = make(chan string, unblockBuffer) | ||
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) | ||
b.stopCh = make(chan struct{}) | ||
b.duplicateCh = make(chan struct{}, 1) | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add some more comment as to how/why this is possible. "The scheduler could have been invoked with a snapshot of state that was prior to additional capacity being added or allocations becoming terminal."