-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add System Scheduler that runs tasks on every node #287
Merged
Merged
Changes from 4 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
b24f48a
System scheduler and system stack
dadgar 3cd3ac6
Use valid driver values in test
dadgar 5cd9a55
Refactor shared code between schedulers
dadgar 5bfb712
Add diffSystemAlloc which gives richer information which node to plac…
dadgar 0c5ee68
Add negative test to DriverIterator, increase system scheduler attemp…
dadgar 5bbe7f6
diffResult stores values not pointers
dadgar 7feb5f1
Refactor task group constraint logic in generic/system stack
dadgar a0d3eb7
Validate task group count on system scheduler
dadgar 927efaf
Unit tests for the refactor scheduler methods
dadgar 2405101
Remove base nodes from stack constructors
dadgar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,18 +82,6 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul | |
return s | ||
} | ||
|
||
// setStatus is used to update the status of the evaluation | ||
func (s *GenericScheduler) setStatus(status, desc string) error { | ||
s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status) | ||
newEval := s.eval.Copy() | ||
newEval.Status = status | ||
newEval.StatusDescription = desc | ||
if s.nextEval != nil { | ||
newEval.NextEval = s.nextEval.ID | ||
} | ||
return s.planner.UpdateEval(newEval) | ||
} | ||
|
||
// Process is used to handle a single evaluation | ||
func (s *GenericScheduler) Process(eval *structs.Evaluation) error { | ||
// Store the evaluation | ||
|
@@ -106,7 +94,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { | |
default: | ||
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", | ||
eval.TriggeredBy) | ||
return s.setStatus(structs.EvalStatusFailed, desc) | ||
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc) | ||
} | ||
|
||
// Retry up to the maxScheduleAttempts | ||
|
@@ -116,13 +104,13 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { | |
} | ||
if err := retryMax(limit, s.process); err != nil { | ||
if statusErr, ok := err.(*SetStatusError); ok { | ||
return s.setStatus(statusErr.EvalStatus, err.Error()) | ||
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()) | ||
} | ||
return err | ||
} | ||
|
||
// Update the status to complete | ||
return s.setStatus(structs.EvalStatusComplete, "") | ||
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "") | ||
} | ||
|
||
// process is wrapped in retryMax to iteratively run the handler until we have no | ||
|
@@ -231,7 +219,7 @@ func (s *GenericScheduler) computeJobAllocs() error { | |
} | ||
|
||
// Attempt to do the upgrades in place | ||
diff.update = s.inplaceUpdate(diff.update) | ||
diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) | ||
|
||
// Check if a rolling upgrade strategy is being used | ||
limit := len(diff.update) + len(diff.migrate) | ||
|
@@ -240,10 +228,10 @@ func (s *GenericScheduler) computeJobAllocs() error { | |
} | ||
|
||
// Treat migrations as an eviction and a new placement. | ||
s.evictAndPlace(diff, diff.migrate, allocMigrating, &limit) | ||
s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocUpdating, &limit) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be allocMigrating? |
||
|
||
// Treat non in-place updates as an eviction and new placement. | ||
s.evictAndPlace(diff, diff.update, allocUpdating, &limit) | ||
s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit) | ||
|
||
// Nothing remaining to do if placement is not required | ||
if len(diff.place) == 0 { | ||
|
@@ -254,103 +242,8 @@ func (s *GenericScheduler) computeJobAllocs() error { | |
return s.computePlacements(diff.place) | ||
} | ||
|
||
// evictAndPlace is used to mark allocations for evicts and add them to the placement queue | ||
func (s *GenericScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) { | ||
n := len(allocs) | ||
for i := 0; i < n && i < *limit; i++ { | ||
a := allocs[i] | ||
s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) | ||
diff.place = append(diff.place, a) | ||
} | ||
if n <= *limit { | ||
*limit -= n | ||
} else { | ||
*limit = 0 | ||
s.limitReached = true | ||
} | ||
} | ||
|
||
// inplaceUpdate attempts to update allocations in-place where possible. | ||
func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { | ||
n := len(updates) | ||
inplace := 0 | ||
for i := 0; i < n; i++ { | ||
// Get the udpate | ||
update := updates[i] | ||
|
||
// Check if the task drivers or config has changed, requires | ||
// a rolling upgrade since that cannot be done in-place. | ||
existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) | ||
if tasksUpdated(update.TaskGroup, existing) { | ||
continue | ||
} | ||
|
||
// Get the existing node | ||
node, err := s.state.NodeByID(update.Alloc.NodeID) | ||
if err != nil { | ||
s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v", | ||
s.eval, update.Alloc.NodeID, err) | ||
continue | ||
} | ||
if node == nil { | ||
continue | ||
} | ||
|
||
// Set the existing node as the base set | ||
s.stack.SetNodes([]*structs.Node{node}) | ||
|
||
// Stage an eviction of the current allocation | ||
s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, | ||
allocInPlace) | ||
|
||
// Attempt to match the task group | ||
option, size := s.stack.Select(update.TaskGroup) | ||
|
||
// Pop the allocation | ||
s.plan.PopUpdate(update.Alloc) | ||
|
||
// Skip if we could not do an in-place update | ||
if option == nil { | ||
continue | ||
} | ||
|
||
// Restore the network offers from the existing allocation. | ||
// We do not allow network resources (reserved/dynamic ports) | ||
// to be updated. This is guarded in taskUpdated, so we can | ||
// safely restore those here. | ||
for task, resources := range option.TaskResources { | ||
existing := update.Alloc.TaskResources[task] | ||
resources.Networks = existing.Networks | ||
} | ||
|
||
// Create a shallow copy | ||
newAlloc := new(structs.Allocation) | ||
*newAlloc = *update.Alloc | ||
|
||
// Update the allocation | ||
newAlloc.EvalID = s.eval.ID | ||
newAlloc.Job = s.job | ||
newAlloc.Resources = size | ||
newAlloc.TaskResources = option.TaskResources | ||
newAlloc.Metrics = s.ctx.Metrics() | ||
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun | ||
newAlloc.ClientStatus = structs.AllocClientStatusPending | ||
s.plan.AppendAlloc(newAlloc) | ||
|
||
// Remove this allocation from the slice | ||
updates[i] = updates[n-1] | ||
i-- | ||
n-- | ||
inplace++ | ||
} | ||
if len(updates) > 0 { | ||
s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates)) | ||
} | ||
return updates[:n] | ||
} | ||
|
||
// computePlacements computes placements for allocations | ||
func (s *GenericScheduler) computePlacements(place []allocTuple) error { | ||
func (s *GenericScheduler) computePlacements(place []*allocTuple) error { | ||
// Get the base nodes | ||
nodes, err := readyNodesInDCs(s.state, s.job.Datacenters) | ||
if err != nil { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add a negative test in as well to check the
ParseBool
logic