Skip to content

Commit

Permalink
Merge pull request #287 from hashicorp/f-system-scheduler
Browse files Browse the repository at this point in the history
Add System Scheduler that runs tasks on every node
  • Loading branch information
dadgar committed Oct 17, 2015
2 parents bacb89a + 2405101 commit b6c826e
Show file tree
Hide file tree
Showing 13 changed files with 1,886 additions and 150 deletions.
53 changes: 53 additions & 0 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,59 @@ func Job() *structs.Job {
return job
}

func SystemJob() *structs.Job {
job := &structs.Job{
Region: "global",
ID: structs.GenerateUUID(),
Name: "my-job",
Type: structs.JobTypeSystem,
Priority: 100,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
&structs.Constraint{
Hard: true,
LTarget: "$attr.kernel.name",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "web",
Count: 1,
Tasks: []*structs.Task{
&structs.Task{
Name: "web",
Driver: "exec",
Config: map[string]string{
"command": "/bin/date",
"args": "+%s",
},
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []string{"http"},
},
},
},
},
},
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
}
return job
}

func Eval() *structs.Evaluation {
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ const (
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
)

const (
Expand Down Expand Up @@ -828,6 +829,12 @@ func (j *Job) Validate() error {
} else {
taskGroups[tg.Name] = idx
}

if j.Type == "system" && tg.Count != 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %d has count %d. Only count of 1 is supported with system scheduler",
idx+1, tg.Count))
}
}

// Validate the task group
Expand Down
15 changes: 14 additions & 1 deletion scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"

"github.com/hashicorp/go-version"
Expand Down Expand Up @@ -129,10 +130,22 @@ func (iter *DriverIterator) Reset() {
func (iter *DriverIterator) hasDrivers(option *structs.Node) bool {
for driver := range iter.drivers {
driverStr := fmt.Sprintf("driver.%s", driver)
_, ok := option.Attributes[driverStr]
value, ok := option.Attributes[driverStr]
if !ok {
return false
}

enabled, err := strconv.ParseBool(value)
if err != nil {
iter.ctx.Logger().
Printf("[WARN] scheduler.DriverIterator: node %v has invalid driver setting %v: %v",
option.ID, driverStr, value)
return false
}

if !enabled {
return false
}
}
return true
}
Expand Down
7 changes: 5 additions & 2 deletions scheduler/feasible_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ func TestDriverIterator(t *testing.T) {
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

nodes[0].Attributes["driver.foo"] = "2"
nodes[2].Attributes["driver.foo"] = "2"
nodes[0].Attributes["driver.foo"] = "1"
nodes[1].Attributes["driver.foo"] = "0"
nodes[2].Attributes["driver.foo"] = "true"
nodes[3].Attributes["driver.foo"] = "False"

drivers := map[string]struct{}{
"exec": struct{}{},
Expand Down
121 changes: 7 additions & 114 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,6 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul
return s
}

// setStatus is used to update the status of the evaluation
func (s *GenericScheduler) setStatus(status, desc string) error {
s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status)
newEval := s.eval.Copy()
newEval.Status = status
newEval.StatusDescription = desc
if s.nextEval != nil {
newEval.NextEval = s.nextEval.ID
}
return s.planner.UpdateEval(newEval)
}

// Process is used to handle a single evaluation
func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
// Store the evaluation
Expand All @@ -106,7 +94,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
return s.setStatus(structs.EvalStatusFailed, desc)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
}

// Retry up to the maxScheduleAttempts
Expand All @@ -116,13 +104,13 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
}
if err := retryMax(limit, s.process); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return s.setStatus(statusErr.EvalStatus, err.Error())
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
}
return err
}

// Update the status to complete
return s.setStatus(structs.EvalStatusComplete, "")
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "")
}

// process is wrapped in retryMax to iteratively run the handler until we have no
Expand All @@ -143,7 +131,7 @@ func (s *GenericScheduler) process() (bool, error) {
s.ctx = NewEvalContext(s.state, s.plan, s.logger)

// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx, nil)
s.stack = NewGenericStack(s.batch, s.ctx)
if s.job != nil {
s.stack.SetJob(s.job)
}
Expand Down Expand Up @@ -231,7 +219,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Attempt to do the upgrades in place
diff.update = s.inplaceUpdate(diff.update)
diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)

// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate)
Expand All @@ -240,10 +228,10 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Treat migrations as an eviction and a new placement.
s.evictAndPlace(diff, diff.migrate, allocMigrating, &limit)
s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocMigrating, &limit)

// Treat non in-place updates as an eviction and new placement.
s.evictAndPlace(diff, diff.update, allocUpdating, &limit)
s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit)

// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
Expand All @@ -254,101 +242,6 @@ func (s *GenericScheduler) computeJobAllocs() error {
return s.computePlacements(diff.place)
}

// evictAndPlace is used to mark allocations for evicts and add them to the placement queue
func (s *GenericScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) {
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc)
diff.place = append(diff.place, a)
}
if n <= *limit {
*limit -= n
} else {
*limit = 0
s.limitReached = true
}
}

// inplaceUpdate attempts to update allocations in-place where possible.
func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
n := len(updates)
inplace := 0
for i := 0; i < n; i++ {
// Get the udpate
update := updates[i]

// Check if the task drivers or config has changed, requires
// a rolling upgrade since that cannot be done in-place.
existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name)
if tasksUpdated(update.TaskGroup, existing) {
continue
}

// Get the existing node
node, err := s.state.NodeByID(update.Alloc.NodeID)
if err != nil {
s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v",
s.eval, update.Alloc.NodeID, err)
continue
}
if node == nil {
continue
}

// Set the existing node as the base set
s.stack.SetNodes([]*structs.Node{node})

// Stage an eviction of the current allocation
s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
allocInPlace)

// Attempt to match the task group
option, size := s.stack.Select(update.TaskGroup)

// Pop the allocation
s.plan.PopUpdate(update.Alloc)

// Skip if we could not do an in-place update
if option == nil {
continue
}

// Restore the network offers from the existing allocation.
// We do not allow network resources (reserved/dynamic ports)
// to be updated. This is guarded in taskUpdated, so we can
// safely restore those here.
for task, resources := range option.TaskResources {
existing := update.Alloc.TaskResources[task]
resources.Networks = existing.Networks
}

// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *update.Alloc

// Update the allocation
newAlloc.EvalID = s.eval.ID
newAlloc.Job = s.job
newAlloc.Resources = size
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = s.ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
newAlloc.ClientStatus = structs.AllocClientStatusPending
s.plan.AppendAlloc(newAlloc)

// Remove this allocation from the slice
updates[i] = updates[n-1]
i--
n--
inplace++
}
if len(updates) > 0 {
s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates))
}
return updates[:n]
}

// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Get the base nodes
Expand Down
6 changes: 3 additions & 3 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestServiceSched_JobRegister(t *testing.T) {
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to deregister the job
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to deregister the job
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Expand Down Expand Up @@ -550,7 +550,7 @@ func TestServiceSched_RetryLimit(t *testing.T) {
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to deregister the job
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Expand Down
1 change: 1 addition & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
var BuiltinSchedulers = map[string]Factory{
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
}

// NewScheduler is used to instantiate and return a new scheduler
Expand Down
Loading

0 comments on commit b6c826e

Please sign in to comment.