diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 351d8f7fd86..abfb16c8c2e 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -109,6 +109,59 @@ func Job() *structs.Job { return job } +func SystemJob() *structs.Job { + job := &structs.Job{ + Region: "global", + ID: structs.GenerateUUID(), + Name: "my-job", + Type: structs.JobTypeSystem, + Priority: 100, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*structs.Constraint{ + &structs.Constraint{ + Hard: true, + LTarget: "$attr.kernel.name", + RTarget: "linux", + Operand: "=", + }, + }, + TaskGroups: []*structs.TaskGroup{ + &structs.TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*structs.Task{ + &structs.Task{ + Name: "web", + Driver: "exec", + Config: map[string]string{ + "command": "/bin/date", + "args": "+%s", + }, + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + MBits: 50, + DynamicPorts: []string{"http"}, + }, + }, + }, + }, + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + Status: structs.JobStatusPending, + CreateIndex: 42, + ModifyIndex: 99, + } + return job +} + func Eval() *structs.Evaluation { eval := &structs.Evaluation{ ID: structs.GenerateUUID(), diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 09cef210a13..8d58a985784 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -699,6 +699,7 @@ const ( JobTypeCore = "_core" JobTypeService = "service" JobTypeBatch = "batch" + JobTypeSystem = "system" ) const ( @@ -828,6 +829,12 @@ func (j *Job) Validate() error { } else { taskGroups[tg.Name] = idx } + + if j.Type == "system" && tg.Count != 1 { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("Job task group %d has count %d. Only count of 1 is supported with system scheduler", + idx+1, tg.Count)) + } } // Validate the task group diff --git a/scheduler/feasible.go b/scheduler/feasible.go index cbf811b8cca..559252045f1 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "regexp" + "strconv" "strings" "github.com/hashicorp/go-version" @@ -129,10 +130,22 @@ func (iter *DriverIterator) Reset() { func (iter *DriverIterator) hasDrivers(option *structs.Node) bool { for driver := range iter.drivers { driverStr := fmt.Sprintf("driver.%s", driver) - _, ok := option.Attributes[driverStr] + value, ok := option.Attributes[driverStr] if !ok { return false } + + enabled, err := strconv.ParseBool(value) + if err != nil { + iter.ctx.Logger(). 
+ Printf("[WARN] scheduler.DriverIterator: node %v has invalid driver setting %v: %v", + option.ID, driverStr, value) + return false + } + + if !enabled { + return false + } } return true } diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 2f091c2f457..1cf58d1f6b3 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -82,11 +82,14 @@ func TestDriverIterator(t *testing.T) { mock.Node(), mock.Node(), mock.Node(), + mock.Node(), } static := NewStaticIterator(ctx, nodes) - nodes[0].Attributes["driver.foo"] = "2" - nodes[2].Attributes["driver.foo"] = "2" + nodes[0].Attributes["driver.foo"] = "1" + nodes[1].Attributes["driver.foo"] = "0" + nodes[2].Attributes["driver.foo"] = "true" + nodes[3].Attributes["driver.foo"] = "False" drivers := map[string]struct{}{ "exec": struct{}{}, diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bd5bb81e48e..7957f2360ff 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -82,18 +82,6 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul return s } -// setStatus is used to update the status of the evaluation -func (s *GenericScheduler) setStatus(status, desc string) error { - s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status) - newEval := s.eval.Copy() - newEval.Status = status - newEval.StatusDescription = desc - if s.nextEval != nil { - newEval.NextEval = s.nextEval.ID - } - return s.planner.UpdateEval(newEval) -} - // Process is used to handle a single evaluation func (s *GenericScheduler) Process(eval *structs.Evaluation) error { // Store the evaluation @@ -106,7 +94,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) - return s.setStatus(structs.EvalStatusFailed, desc) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc) } // Retry up to the maxScheduleAttempts @@ -116,13 +104,13 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { } if err := retryMax(limit, s.process); err != nil { if statusErr, ok := err.(*SetStatusError); ok { - return s.setStatus(statusErr.EvalStatus, err.Error()) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()) } return err } // Update the status to complete - return s.setStatus(structs.EvalStatusComplete, "") + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "") } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -143,7 +131,7 @@ func (s *GenericScheduler) process() (bool, error) { s.ctx = NewEvalContext(s.state, s.plan, s.logger) // Construct the placement stack - s.stack = NewGenericStack(s.batch, s.ctx, nil) + s.stack = NewGenericStack(s.batch, s.ctx) if s.job != nil { s.stack.SetJob(s.job) } @@ -231,7 +219,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Attempt to do the upgrades in place - diff.update = s.inplaceUpdate(diff.update) + diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) // Check if a rolling upgrade strategy is being used limit := len(diff.update) + len(diff.migrate) @@ -240,10 +228,10 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Treat migrations as an eviction and a new placement. 
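The DriverIterator change in scheduler/feasible.go above now interprets a node's driver.<name> attribute as a boolean instead of only checking that the key exists, which is what the new test values "1", "0", "true" and "False" exercise. A minimal standalone sketch of that parsing behaviour, with made-up attribute values; this is not the iterator itself:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Mirrors how hasDrivers treats the attribute: a value that fails
	// strconv.ParseBool filters the node, and a parsed "false" does too.
	attrs := map[string]string{
		"driver.exec":   "1",     // enabled
		"driver.docker": "False", // disabled
		"driver.qemu":   "maybe", // invalid -> node filtered
	}
	for attr, raw := range attrs {
		enabled, err := strconv.ParseBool(raw)
		if err != nil {
			fmt.Printf("%s: invalid driver setting %q\n", attr, raw)
			continue
		}
		fmt.Printf("%s: enabled=%v\n", attr, enabled)
	}
}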
- s.evictAndPlace(diff, diff.migrate, allocMigrating, &limit) + s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocMigrating, &limit) // Treat non in-place updates as an eviction and new placement. - s.evictAndPlace(diff, diff.update, allocUpdating, &limit) + s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit) // Nothing remaining to do if placement is not required if len(diff.place) == 0 { @@ -254,101 +242,6 @@ func (s *GenericScheduler) computeJobAllocs() error { return s.computePlacements(diff.place) } -// evictAndPlace is used to mark allocations for evicts and add them to the placement queue -func (s *GenericScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) { - n := len(allocs) - for i := 0; i < n && i < *limit; i++ { - a := allocs[i] - s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) - diff.place = append(diff.place, a) - } - if n <= *limit { - *limit -= n - } else { - *limit = 0 - s.limitReached = true - } -} - -// inplaceUpdate attempts to update allocations in-place where possible. -func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { - n := len(updates) - inplace := 0 - for i := 0; i < n; i++ { - // Get the udpate - update := updates[i] - - // Check if the task drivers or config has changed, requires - // a rolling upgrade since that cannot be done in-place. - existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) - if tasksUpdated(update.TaskGroup, existing) { - continue - } - - // Get the existing node - node, err := s.state.NodeByID(update.Alloc.NodeID) - if err != nil { - s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v", - s.eval, update.Alloc.NodeID, err) - continue - } - if node == nil { - continue - } - - // Set the existing node as the base set - s.stack.SetNodes([]*structs.Node{node}) - - // Stage an eviction of the current allocation - s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, - allocInPlace) - - // Attempt to match the task group - option, size := s.stack.Select(update.TaskGroup) - - // Pop the allocation - s.plan.PopUpdate(update.Alloc) - - // Skip if we could not do an in-place update - if option == nil { - continue - } - - // Restore the network offers from the existing allocation. - // We do not allow network resources (reserved/dynamic ports) - // to be updated. This is guarded in taskUpdated, so we can - // safely restore those here. 
- for task, resources := range option.TaskResources { - existing := update.Alloc.TaskResources[task] - resources.Networks = existing.Networks - } - - // Create a shallow copy - newAlloc := new(structs.Allocation) - *newAlloc = *update.Alloc - - // Update the allocation - newAlloc.EvalID = s.eval.ID - newAlloc.Job = s.job - newAlloc.Resources = size - newAlloc.TaskResources = option.TaskResources - newAlloc.Metrics = s.ctx.Metrics() - newAlloc.DesiredStatus = structs.AllocDesiredStatusRun - newAlloc.ClientStatus = structs.AllocClientStatusPending - s.plan.AppendAlloc(newAlloc) - - // Remove this allocation from the slice - updates[i] = updates[n-1] - i-- - n-- - inplace++ - } - if len(updates) > 0 { - s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates)) - } - return updates[:n] -} - // computePlacements computes placements for allocations func (s *GenericScheduler) computePlacements(place []allocTuple) error { // Get the base nodes diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index b6cef783795..dfb35cb3c5e 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -22,7 +22,7 @@ func TestServiceSched_JobRegister(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, @@ -71,7 +71,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, @@ -550,7 +550,7 @@ func TestServiceSched_RetryLimit(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 8a62d7c85f0..baed71e734e 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -13,6 +13,7 @@ import ( var BuiltinSchedulers = map[string]Factory{ "service": NewServiceScheduler, "batch": NewBatchScheduler, + "system": NewSystemScheduler, } // NewScheduler is used to instantiate and return a new scheduler diff --git a/scheduler/stack.go b/scheduler/stack.go index c1468602f72..2cda626c488 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -48,7 +48,7 @@ type GenericStack struct { } // NewGenericStack constructs a stack used for selecting service placements -func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *GenericStack { +func NewGenericStack(batch bool, ctx Context) *GenericStack { // Create a new stack s := &GenericStack{ batch: batch, @@ -58,7 +58,7 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi // Create the source iterator. We randomize the order we visit nodes // to reduce collisions between schedulers and to do a basic load // balancing across eligible nodes. - s.source = NewRandomIterator(ctx, baseNodes) + s.source = NewRandomIterator(ctx, nil) // Attach the job constraints. The job is filled in later. 
s.jobConstraint = NewConstraintIterator(ctx, s.source, nil) @@ -92,11 +92,6 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi // Select the node with the maximum score for placement s.maxScore = NewMaxScoreIterator(ctx, s.limit) - - // Set the nodes if given - if len(baseNodes) != 0 { - s.SetNodes(baseNodes) - } return s } @@ -109,7 +104,7 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { // Apply a limit function. This is to avoid scanning *every* possible node. // For batch jobs we only need to evaluate 2 options and depend on the - // powwer of two choices. For services jobs we need to visit "enough". + // power of two choices. For services jobs we need to visit "enough". // Using a log of the total number of nodes is a good restriction, with // at least 2 as the floor limit := 2 @@ -134,21 +129,12 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.ctx.Reset() start := time.Now() - // Collect the constraints, drivers and resources required by each - // sub-task to aggregate the TaskGroup totals - constr := make([]*structs.Constraint, 0, len(tg.Constraints)) - drivers := make(map[string]struct{}) - size := new(structs.Resources) - constr = append(constr, tg.Constraints...) - for _, task := range tg.Tasks { - drivers[task.Driver] = struct{}{} - constr = append(constr, task.Constraints...) - size.Add(task.Resources) - } + // Get the task groups constraints. + tgConstr := taskGroupConstraints(tg) // Update the parameters of iterators - s.taskGroupDrivers.SetDrivers(drivers) - s.taskGroupConstraint.SetConstraints(constr) + s.taskGroupDrivers.SetDrivers(tgConstr.drivers) + s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.binPack.SetTasks(tg.Tasks) // Find the node with the max score @@ -163,5 +149,83 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso // Store the compute time s.ctx.Metrics().AllocationTime = time.Since(start) - return option, size + return option, tgConstr.size +} + +// SystemStack is the Stack used for the System scheduler. It is designed to +// attempt to make placements on all nodes. +type SystemStack struct { + ctx Context + source *StaticIterator + jobConstraint *ConstraintIterator + taskGroupDrivers *DriverIterator + taskGroupConstraint *ConstraintIterator + binPack *BinPackIterator +} + +// NewSystemStack constructs a stack used for selecting service placements +func NewSystemStack(ctx Context) *SystemStack { + // Create a new stack + s := &SystemStack{ctx: ctx} + + // Create the source iterator. We visit nodes in a linear order because we + // have to evaluate on all nodes. + s.source = NewStaticIterator(ctx, nil) + + // Attach the job constraints. The job is filled in later. + s.jobConstraint = NewConstraintIterator(ctx, s.source, nil) + + // Filter on task group drivers first as they are faster + s.taskGroupDrivers = NewDriverIterator(ctx, s.jobConstraint, nil) + + // Filter on task group constraints second + s.taskGroupConstraint = NewConstraintIterator(ctx, s.taskGroupDrivers, nil) + + // Upgrade from feasible to rank iterator + rankSource := NewFeasibleRankIterator(ctx, s.taskGroupConstraint) + + // Apply the bin packing, this depends on the resources needed + // by a particular task group. Enable eviction as system jobs are high + // priority. 
+ s.binPack = NewBinPackIterator(ctx, rankSource, true, 0) + return s +} + +func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { + // Update the set of base nodes + s.source.SetNodes(baseNodes) +} + +func (s *SystemStack) SetJob(job *structs.Job) { + s.jobConstraint.SetConstraints(job.Constraints) + s.binPack.SetPriority(job.Priority) +} + +func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { + // Reset the binpack selector and context + s.binPack.Reset() + s.ctx.Reset() + start := time.Now() + + // Get the task groups constraints. + tgConstr := taskGroupConstraints(tg) + + // Update the parameters of iterators + s.taskGroupDrivers.SetDrivers(tgConstr.drivers) + s.taskGroupConstraint.SetConstraints(tgConstr.constraints) + s.binPack.SetTasks(tg.Tasks) + + // Get the next option that satisfies the constraints. + option := s.binPack.Next() + + // Ensure that the task resources were specified + if option != nil && len(option.TaskResources) != len(tg.Tasks) { + for _, task := range tg.Tasks { + option.SetTaskResources(task, task.Resources) + } + } + + // Store the compute time + s.ctx.Metrics().AllocationTime = time.Since(start) + return option, tgConstr.size } diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index 07e94531590..7b4b8acb324 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -10,7 +10,7 @@ import ( func TestServiceStack_SetNodes(t *testing.T) { _, ctx := testContext(t) - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) nodes := []*structs.Node{ mock.Node(), @@ -37,7 +37,7 @@ func TestServiceStack_SetNodes(t *testing.T) { func TestServiceStack_SetJob(t *testing.T) { _, ctx := testContext(t) - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) job := mock.Job() stack.SetJob(job) @@ -55,7 +55,8 @@ func TestServiceStack_Select_Size(t *testing.T) { nodes := []*structs.Node{ mock.Node(), } - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -85,7 +86,8 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) { mock.Node(), mock.Node(), } - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -120,7 +122,8 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) { zero := nodes[0] zero.Attributes["driver.foo"] = "1" - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() job.TaskGroups[0].Tasks[0].Driver = "foo" @@ -145,7 +148,8 @@ func TestServiceStack_Select_ConstraintFilter(t *testing.T) { zero := nodes[0] zero.Attributes["kernel.name"] = "freebsd" - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() job.Constraints[0].RTarget = "freebsd" @@ -182,9 +186,179 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { one := nodes[1] one.Reserved = one.Resources - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) + + job := mock.Job() + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != zero { + t.Fatalf("bad") + } + + met := ctx.Metrics() + if met.NodesExhausted != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ClassExhausted["linux-medium-pci"] != 1 { + 
t.Fatalf("bad: %#v", met) + } + if len(met.Scores) != 1 { + t.Fatalf("bad: %#v", met) + } +} + +func TestSystemStack_SetNodes(t *testing.T) { + _, ctx := testContext(t) + stack := NewSystemStack(ctx) + + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + stack.SetNodes(nodes) + + out := collectFeasible(stack.source) + if !reflect.DeepEqual(out, nodes) { + t.Fatalf("bad: %#v", out) + } +} + +func TestSystemStack_SetJob(t *testing.T) { + _, ctx := testContext(t) + stack := NewSystemStack(ctx) + + job := mock.Job() + stack.SetJob(job) + + if stack.binPack.priority != job.Priority { + t.Fatalf("bad") + } + if !reflect.DeepEqual(stack.jobConstraint.constraints, job.Constraints) { + t.Fatalf("bad") + } +} + +func TestSystemStack_Select_Size(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{mock.Node()} + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) + + job := mock.Job() + stack.SetJob(job) + node, size := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + if size == nil { + t.Fatalf("missing size") + } + + if size.CPU != 500 || size.MemoryMB != 256 { + t.Fatalf("bad: %#v", size) + } + + met := ctx.Metrics() + if met.AllocationTime == 0 { + t.Fatalf("missing time") + } +} + +func TestSystemStack_Select_MetricsReset(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) + + job := mock.Job() + stack.SetJob(job) + n1, _ := stack.Select(job.TaskGroups[0]) + m1 := ctx.Metrics() + if n1 == nil { + t.Fatalf("missing node %#v", m1) + } + + if m1.NodesEvaluated != 1 { + t.Fatalf("should only be 1") + } + + n2, _ := stack.Select(job.TaskGroups[0]) + m2 := ctx.Metrics() + if n2 == nil { + t.Fatalf("missing node %#v", m2) + } + + // If we don't reset, this would be 2 + if m2.NodesEvaluated != 1 { + t.Fatalf("should only be 2") + } +} + +func TestSystemStack_Select_DriverFilter(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + zero := nodes[0] + zero.Attributes["driver.foo"] = "1" + + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) + + job := mock.Job() + job.TaskGroups[0].Tasks[0].Driver = "foo" + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != zero { + t.Fatalf("bad") + } + + zero.Attributes["driver.foo"] = "0" + stack = NewSystemStack(ctx) + stack.SetNodes(nodes) + stack.SetJob(job) + node, _ = stack.Select(job.TaskGroups[0]) + if node != nil { + t.Fatalf("node not filtered %#v", node) + } +} + +func TestSystemStack_Select_ConstraintFilter(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + zero := nodes[1] + zero.Attributes["kernel.name"] = "freebsd" + + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) job := mock.Job() + job.Constraints[0].RTarget = "freebsd" stack.SetJob(job) node, _ := stack.Select(job.TaskGroups[0]) @@ -196,6 +370,43 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { t.Fatalf("bad") } + met := ctx.Metrics() + if met.NodesFiltered != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ClassFiltered["linux-medium-pci"] != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ConstraintFiltered["$attr.kernel.name = freebsd"] != 1 { + t.Fatalf("bad: %#v", met) + } +} + 
+func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + zero := nodes[0] + zero.Reserved = zero.Resources + one := nodes[1] + + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) + + job := mock.Job() + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != one { + t.Fatalf("bad") + } + met := ctx.Metrics() if met.NodesExhausted != 1 { t.Fatalf("bad: %#v", met) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go new file mode 100644 index 00000000000..d3f6fb27e60 --- /dev/null +++ b/scheduler/system_sched.go @@ -0,0 +1,265 @@ +package scheduler + +import ( + "fmt" + "log" + + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // maxSystemScheduleAttempts is used to limit the number of times + // we will attempt to schedule if we continue to hit conflicts for system + // jobs. + maxSystemScheduleAttempts = 5 + + // allocNodeTainted is the status used when stopping an alloc because it's + // node is tainted. + allocNodeTainted = "system alloc not needed as node is tainted" +) + +// SystemScheduler is used for 'system' jobs. This scheduler is +// designed for services that should be run on every client. +type SystemScheduler struct { + logger *log.Logger + state State + planner Planner + + eval *structs.Evaluation + job *structs.Job + plan *structs.Plan + ctx *EvalContext + stack *SystemStack + nodes []*structs.Node + + limitReached bool + nextEval *structs.Evaluation +} + +// NewSystemScheduler is a factory function to instantiate a new system +// scheduler. +func NewSystemScheduler(logger *log.Logger, state State, planner Planner) Scheduler { + return &SystemScheduler{ + logger: logger, + state: state, + planner: planner, + } +} + +// Process is used to handle a single evaluation. +func (s *SystemScheduler) Process(eval *structs.Evaluation) error { + // Store the evaluation + s.eval = eval + + // Verify the evaluation trigger reason is understood + switch eval.TriggeredBy { + case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, + structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate: + default: + desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", + eval.TriggeredBy) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc) + } + + // Retry up to the maxSystemScheduleAttempts + if err := retryMax(maxSystemScheduleAttempts, s.process); err != nil { + if statusErr, ok := err.(*SetStatusError); ok { + return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()) + } + return err + } + + // Update the status to complete + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "") +} + +// process is wrapped in retryMax to iteratively run the handler until we have no +// further work or we've made the maximum number of attempts. 
+func (s *SystemScheduler) process() (bool, error) { + // Lookup the Job by ID + var err error + s.job, err = s.state.JobByID(s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get job '%s': %v", + s.eval.JobID, err) + } + + // Get the ready nodes in the required datacenters + if s.job != nil { + s.nodes, err = readyNodesInDCs(s.state, s.job.Datacenters) + if err != nil { + return false, fmt.Errorf("failed to get ready nodes: %v", err) + } + } + + // Create a plan + s.plan = s.eval.MakePlan(s.job) + + // Create an evaluation context + s.ctx = NewEvalContext(s.state, s.plan, s.logger) + + // Construct the placement stack + s.stack = NewSystemStack(s.ctx) + if s.job != nil { + s.stack.SetJob(s.job) + } + + // Compute the target job allocations + if err := s.computeJobAllocs(); err != nil { + s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err) + return false, err + } + + // If the plan is a no-op, we can bail + if s.plan.IsNoOp() { + return true, nil + } + + // If the limit of placements was reached we need to create an evaluation + // to pickup from here after the stagger period. + if s.limitReached && s.nextEval == nil { + s.nextEval = s.eval.NextRollingEval(s.job.Update.Stagger) + if err := s.planner.CreateEval(s.nextEval); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make next eval for rolling update: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) + } + + // Submit the plan + result, newState, err := s.planner.SubmitPlan(s.plan) + if err != nil { + return false, err + } + + // If we got a state refresh, try again since we have stale data + if newState != nil { + s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval) + s.state = newState + return false, nil + } + + // Try again if the plan was not fully committed, potential conflict + fullCommit, expected, actual := result.FullCommit(s.plan) + if !fullCommit { + s.logger.Printf("[DEBUG] sched: %#v: attempted %d placements, %d placed", + s.eval, expected, actual) + return false, nil + } + + // Success! + return true, nil +} + +// computeJobAllocs is used to reconcile differences between the job, +// existing allocations and node status to update the allocations. +func (s *SystemScheduler) computeJobAllocs() error { + // Lookup the allocations by JobID + allocs, err := s.state.AllocsByJob(s.eval.JobID) + if err != nil { + return fmt.Errorf("failed to get allocs for job '%s': %v", + s.eval.JobID, err) + } + + // Filter out the allocations in a terminal state + allocs = structs.FilterTerminalAllocs(allocs) + + // Determine the tainted nodes containing job allocs + tainted, err := taintedNodes(s.state, allocs) + if err != nil { + return fmt.Errorf("failed to get tainted nodes for job '%s': %v", + s.eval.JobID, err) + } + + // Diff the required and existing allocations + diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs) + s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) + + // Add all the allocs to stop + for _, e := range diff.stop { + s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded) + } + + // Attempt to do the upgrades in place + diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) + + // Check if a rolling upgrade strategy is being used + limit := len(diff.update) + if s.job != nil && s.job.Update.Rolling() { + limit = s.job.Update.MaxParallel + } + + // Treat non in-place updates as an eviction and new placement. 
+ s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit) + + // Nothing remaining to do if placement is not required + if len(diff.place) == 0 { + return nil + } + + // Compute the placements + return s.computePlacements(diff.place) +} + +// computePlacements computes placements for allocations +func (s *SystemScheduler) computePlacements(place []allocTuple) error { + nodeByID := make(map[string]*structs.Node, len(s.nodes)) + for _, node := range s.nodes { + nodeByID[node.ID] = node + } + + // Track the failed task groups so that we can coalesce + // the failures together to avoid creating many failed allocs. + failedTG := make(map[*structs.TaskGroup]*structs.Allocation) + + nodes := make([]*structs.Node, 1) + for _, missing := range place { + node, ok := nodeByID[missing.Alloc.NodeID] + if !ok { + return fmt.Errorf("could not find node %q", missing.Alloc.NodeID) + } + + // Update the set of placement ndoes + nodes[0] = node + s.stack.SetNodes(nodes) + + // Attempt to match the task group + option, size := s.stack.Select(missing.TaskGroup) + + if option == nil { + // Check if this task group has already failed + if alloc, ok := failedTG[missing.TaskGroup]; ok { + alloc.Metrics.CoalescedFailures += 1 + continue + } + } + + // Create an allocation for this + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: s.eval.ID, + Name: missing.Name, + JobID: s.job.ID, + Job: s.job, + TaskGroup: missing.TaskGroup.Name, + Resources: size, + Metrics: s.ctx.Metrics(), + } + + // Set fields based on if we found an allocation option + if option != nil { + alloc.NodeID = option.Node.ID + alloc.TaskResources = option.TaskResources + alloc.DesiredStatus = structs.AllocDesiredStatusRun + alloc.ClientStatus = structs.AllocClientStatusPending + s.plan.AppendAlloc(alloc) + } else { + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + alloc.DesiredDescription = "failed to find a node for placement" + alloc.ClientStatus = structs.AllocClientStatusFailed + s.plan.AppendFailed(alloc) + failedTG[missing.TaskGroup] = alloc + } + } + return nil +} diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go new file mode 100644 index 00000000000..ece805057f1 --- /dev/null +++ b/scheduler/system_sched_test.go @@ -0,0 +1,651 @@ +package scheduler + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestSystemSched_JobRegister(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobRegister_AddNode(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Add a new node. + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a mock evaluation to deal with the node update + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan had no node updates + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != 0 { + t.Log(len(update)) + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated on the new node + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure it allocated on the right node + if _, ok := plan.NodeAllocation[node.ID]; !ok { + t.Fatalf("allocated on wrong node: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + out = structs.FilterTerminalAllocs(out) + if len(out) != 11 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobRegister_AllocFail(t *testing.T) { + h := NewHarness(t) + + // Create NO nodes + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan as this should be a no-op. 
+ if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobModify(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Add a few terminal status allocations, these should be ignored + var terminal []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = "my-job.web[0]" + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + terminal = append(terminal, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + + // Update the task, such that it cannot be done in-place + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != len(allocs) { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + out = structs.FilterTerminalAllocs(out) + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobModify_Rolling(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + job2.Update = structs.UpdateStrategy{ + Stagger: 30 * time.Second, + MaxParallel: 5, + } + + // Update the task, such that it cannot be done in-place + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted only MaxParallel + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != job2.Update.MaxParallel { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != job2.Update.MaxParallel { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Ensure a follow up eval was created + eval = h.Evals[0] + if eval.NextEval == "" { + t.Fatalf("missing next eval") + } + + // Check for create + if len(h.CreateEvals) == 0 { + t.Fatalf("missing created eval") + } + create := h.CreateEvals[0] + if eval.NextEval != create.ID { + t.Fatalf("ID mismatch") + } + if create.PreviousEval != eval.ID { + t.Fatalf("missing previous eval") + } + + if create.TriggeredBy != structs.EvalTriggerRollingUpdate { + t.Fatalf("bad: %#v", create) + } +} + +func TestSystemSched_JobModify_InPlace(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan did not evict any allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != 0 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan updated the existing allocs + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + for _, p := range planned { + if p.Job != job2 { + t.Fatalf("should update job") + } + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Verify the network did not change + for _, alloc := range out { + for _, resources := range alloc.TaskResources { + if resources.Networks[0].ReservedPorts[0] != 5000 { + t.Fatalf("bad: %#v", alloc) + } + } + } +} + +func TestSystemSched_JobDeregister(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted the job from all nodes. + for _, node := range nodes { + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no remaining allocations + out = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_NodeDrain(t *testing.T) { + h := NewHarness(t) + + // Register a draining node + node := mock.Node() + node.Drain = true + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Generate a fake job allocated on that node. + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: node.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all allocs + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan updated the allocation. + var planned []*structs.Allocation + for _, allocList := range plan.NodeUpdate { + planned = append(planned, allocList...) 
+ } + if len(planned) != 1 { + t.Log(len(planned)) + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure the allocations is stopped + if planned[0].DesiredStatus != structs.AllocDesiredStatusStop { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_RetryLimit(t *testing.T) { + h := NewHarness(t) + h.Planner = &RejectPlan{h} + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no allocations placed + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + // Should hit the retry limit + h.AssertEvalStatus(t, structs.EvalStatusFailed) +} diff --git a/scheduler/util.go b/scheduler/util.go index cae0043ab74..3525b2f8db1 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "log" "math/rand" "reflect" @@ -19,6 +20,10 @@ type allocTuple struct { // a job requires. This is used to do the count expansion. func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { out := make(map[string]*structs.TaskGroup) + if job == nil { + return out + } + for _, tg := range job.TaskGroups { for i := 0; i < tg.Count; i++ { name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i) @@ -38,6 +43,14 @@ func (d *diffResult) GoString() string { len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore)) } +func (d *diffResult) Append(other *diffResult) { + d.place = append(d.place, other.place...) + d.update = append(d.update, other.update...) + d.migrate = append(d.migrate, other.migrate...) + d.stop = append(d.stop, other.stop...) + d.ignore = append(d.ignore, other.ignore...) +} + // diffAllocs is used to do a set difference between the target allocations // and the existing allocations. This returns 5 sets of results, the list of // named task groups that need to be placed (no existing allocation), the @@ -117,6 +130,48 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, return result } +// diffSystemAllocs is like diffAllocs however, the allocations in the +// diffResult contain the specific nodeID they should be allocated on. +func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]bool, + allocs []*structs.Allocation) *diffResult { + + // Build a mapping of nodes to all their allocs. + nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) + for _, alloc := range allocs { + nallocs := append(nodeAllocs[alloc.NodeID], alloc) + nodeAllocs[alloc.NodeID] = nallocs + } + + for _, node := range nodes { + if _, ok := nodeAllocs[node.ID]; !ok { + nodeAllocs[node.ID] = nil + } + } + + // Create the required task groups. 
+ required := materializeTaskGroups(job) + + result := &diffResult{} + for nodeID, allocs := range nodeAllocs { + diff := diffAllocs(job, taintedNodes, required, allocs) + + // Mark the alloc as being for a specific node. + for i := range diff.place { + alloc := &diff.place[i] + alloc.Alloc = &structs.Allocation{NodeID: nodeID} + } + + // Migrate does not apply to system jobs and instead should be marked as + // stop because if a node is tainted, the job is invalid on that node. + diff.stop = append(diff.stop, diff.migrate...) + diff.migrate = nil + + result.Append(diff) + } + + return result +} + // readyNodesInDCs returns all the ready nodes in the given datacenters func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, error) { // Index the DCs @@ -242,3 +297,148 @@ func tasksUpdated(a, b *structs.TaskGroup) bool { } return false } + +// setStatus is used to update the status of the evaluation +func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Evaluation, status, desc string) error { + logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status) + newEval := eval.Copy() + newEval.Status = status + newEval.StatusDescription = desc + if nextEval != nil { + newEval.NextEval = nextEval.ID + } + return planner.UpdateEval(newEval) +} + +// inplaceUpdate attempts to update allocations in-place where possible. +func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, + stack Stack, updates []allocTuple) []allocTuple { + + n := len(updates) + inplace := 0 + for i := 0; i < n; i++ { + // Get the update + update := updates[i] + + // Check if the task drivers or config has changed, requires + // a rolling upgrade since that cannot be done in-place. + existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) + if tasksUpdated(update.TaskGroup, existing) { + continue + } + + // Get the existing node + node, err := ctx.State().NodeByID(update.Alloc.NodeID) + if err != nil { + ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v", + eval, update.Alloc.NodeID, err) + continue + } + if node == nil { + continue + } + + // Set the existing node as the base set + stack.SetNodes([]*structs.Node{node}) + + // Stage an eviction of the current allocation. This is done so that + // the current allocation is discounted when checking for feasability. + // Otherwise we would be trying to fit the tasks current resources and + // updated resources. After select is called we can remove the evict. + ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, + allocInPlace) + + // Attempt to match the task group + option, size := stack.Select(update.TaskGroup) + + // Pop the allocation + ctx.Plan().PopUpdate(update.Alloc) + + // Skip if we could not do an in-place update + if option == nil { + continue + } + + // Restore the network offers from the existing allocation. + // We do not allow network resources (reserved/dynamic ports) + // to be updated. This is guarded in taskUpdated, so we can + // safely restore those here. 
+ for task, resources := range option.TaskResources { + existing := update.Alloc.TaskResources[task] + resources.Networks = existing.Networks + } + + // Create a shallow copy + newAlloc := new(structs.Allocation) + *newAlloc = *update.Alloc + + // Update the allocation + newAlloc.EvalID = eval.ID + newAlloc.Job = job + newAlloc.Resources = size + newAlloc.TaskResources = option.TaskResources + newAlloc.Metrics = ctx.Metrics() + newAlloc.DesiredStatus = structs.AllocDesiredStatusRun + newAlloc.ClientStatus = structs.AllocClientStatusPending + ctx.Plan().AppendAlloc(newAlloc) + + // Remove this allocation from the slice + updates[i] = updates[n-1] + i-- + n-- + inplace++ + } + if len(updates) > 0 { + ctx.Logger().Printf("[DEBUG] sched: %#v: %d in-place updates of %d", eval, inplace, len(updates)) + } + return updates[:n] +} + +// evictAndPlace is used to mark allocations for evicts and add them to the +// placement queue. evictAndPlace modifies both the the diffResult and the +// limit. It returns true if the limit has been reached. +func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool { + n := len(allocs) + for i := 0; i < n && i < *limit; i++ { + a := allocs[i] + ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) + diff.place = append(diff.place, a) + } + if n <= *limit { + *limit -= n + return false + } + *limit = 0 + return true +} + +// tgConstrainTuple is used to store the total constraints of a task group. +type tgConstrainTuple struct { + // Holds the combined constraints of the task group and all it's sub-tasks. + constraints []*structs.Constraint + + // The set of required drivers within the task group. + drivers map[string]struct{} + + // The combined resources of all tasks within the task group. + size *structs.Resources +} + +// taskGroupConstraints collects the constraints, drivers and resources required by each +// sub-task to aggregate the TaskGroup totals +func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { + c := tgConstrainTuple{ + constraints: make([]*structs.Constraint, 0, len(tg.Constraints)), + drivers: make(map[string]struct{}), + size: new(structs.Resources), + } + + c.constraints = append(c.constraints, tg.Constraints...) + for _, task := range tg.Tasks { + c.drivers[task.Driver] = struct{}{} + c.constraints = append(c.constraints, task.Constraints...) + c.size.Add(task.Resources) + } + + return c +} diff --git a/scheduler/util_test.go b/scheduler/util_test.go index a9d894d8c26..417737dcaad 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "log" "os" "reflect" "testing" @@ -109,6 +110,80 @@ func TestDiffAllocs(t *testing.T) { } } +func TestDiffSystemAllocs(t *testing.T) { + job := mock.SystemJob() + + // Create three alive nodes. + nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}} + + // The "old" job has a previous modify index + oldJob := new(structs.Job) + *oldJob = *job + oldJob.ModifyIndex -= 1 + + tainted := map[string]bool{ + "dead": true, + "baz": false, + } + + allocs := []*structs.Allocation{ + // Update allocation on baz + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "baz", + Name: "my-job.web[0]", + Job: oldJob, + }, + + // Ignore allocation on bar + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "bar", + Name: "my-job.web[0]", + Job: job, + }, + + // Stop allocation on dead. 
+ &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "dead", + Name: "my-job.web[0]", + }, + } + + diff := diffSystemAllocs(job, nodes, tainted, allocs) + place := diff.place + update := diff.update + migrate := diff.migrate + stop := diff.stop + ignore := diff.ignore + + // We should update the first alloc + if len(update) != 1 || update[0].Alloc != allocs[0] { + t.Fatalf("bad: %#v", update) + } + + // We should ignore the second alloc + if len(ignore) != 1 || ignore[0].Alloc != allocs[1] { + t.Fatalf("bad: %#v", ignore) + } + + // We should stop the third alloc + if len(stop) != 1 || stop[0].Alloc != allocs[2] { + t.Fatalf("bad: %#v", stop) + } + + // There should be no migrates. + if len(migrate) != 0 { + t.Fatalf("bad: %#v", migrate) + } + + // We should place 1 + if len(place) != 1 { + t.Fatalf("bad: %#v", place) + } +} + func TestReadyNodesInDCs(t *testing.T) { state, err := state.NewStateStore(os.Stderr) if err != nil { @@ -213,18 +288,25 @@ func TestTaintedNodes(t *testing.T) { } func TestShuffleNodes(t *testing.T) { + // Use a large number of nodes to make the probability of shuffling to the + // original order very low. nodes := []*structs.Node{ mock.Node(), mock.Node(), mock.Node(), mock.Node(), mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), } orig := make([]*structs.Node, len(nodes)) copy(orig, nodes) shuffleNodes(nodes) if reflect.DeepEqual(nodes, orig) { - t.Fatalf("shoudl not match") + t.Fatalf("should not match") } } @@ -266,3 +348,296 @@ func TestTasksUpdated(t *testing.T) { t.Fatalf("bad") } } + +func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) { + _, ctx := testContext(t) + allocs := []allocTuple{ + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + } + diff := &diffResult{} + + limit := 2 + if !evictAndPlace(ctx, diff, allocs, "", &limit) { + t.Fatal("evictAndReplace() should have returned true") + } + + if limit != 0 { + t.Fatal("evictAndReplace() should decremented limit; got %v; want 0", limit) + } + + if len(diff.place) != 2 { + t.Fatal("evictAndReplace() didn't insert into diffResult properly: %v", diff.place) + } +} + +func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) { + _, ctx := testContext(t) + allocs := []allocTuple{ + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + } + diff := &diffResult{} + + limit := 4 + if evictAndPlace(ctx, diff, allocs, "", &limit) { + t.Fatal("evictAndReplace() should have returned false") + } + + if limit != 0 { + t.Fatal("evictAndReplace() should decremented limit; got %v; want 0", limit) + } + + if len(diff.place) != 4 { + t.Fatal("evictAndReplace() didn't insert into diffResult properly: %v", diff.place) + } +} + +func TestSetStatus(t *testing.T) { + h := NewHarness(t) + logger := log.New(os.Stderr, "", log.LstdFlags) + eval := mock.Eval() + status := "a" + desc := "b" + if err := setStatus(logger, h, eval, nil, status, desc); err != nil { + t.Fatalf("setStatus() failed: %v", err) + } + + if len(h.Evals) != 1 { + t.Fatalf("setStatus() didn't update plan: %v", h.Evals) + } + + newEval := h.Evals[0] + if newEval.ID 
+		t.Fatalf("setStatus() submitted invalid eval: %v", newEval)
+	}
+
+	h = NewHarness(t)
+	next := mock.Eval()
+	if err := setStatus(logger, h, eval, next, status, desc); err != nil {
+		t.Fatalf("setStatus() failed: %v", err)
+	}
+
+	if len(h.Evals) != 1 {
+		t.Fatalf("setStatus() didn't update plan: %v", h.Evals)
+	}
+
+	newEval = h.Evals[0]
+	if newEval.NextEval != next.ID {
+		t.Fatalf("setStatus() didn't set nextEval correctly: %v", newEval)
+	}
+}
+
+func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) {
+	state, ctx := testContext(t)
+	eval := mock.Eval()
+	job := mock.Job()
+
+	node := mock.Node()
+	noErr(t, state.UpsertNode(1000, node))
+
+	// Register an alloc
+	alloc := &structs.Allocation{
+		ID: structs.GenerateUUID(),
+		EvalID: eval.ID,
+		NodeID: node.ID,
+		JobID: job.ID,
+		Job: job,
+		Resources: &structs.Resources{
+			CPU: 2048,
+			MemoryMB: 2048,
+		},
+		DesiredStatus: structs.AllocDesiredStatusRun,
+	}
+	alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources}
+	noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc}))
+
+	// Create a new task group that prevents in-place updates.
+	tg := &structs.TaskGroup{}
+	*tg = *job.TaskGroups[0]
+	task := &structs.Task{Name: "FOO"}
+	tg.Tasks = nil
+	tg.Tasks = append(tg.Tasks, task)
+
+	updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}}
+	stack := NewGenericStack(false, ctx)
+
+	// Do the inplace update.
+	unplaced := inplaceUpdate(ctx, eval, job, stack, updates)
+
+	if len(unplaced) != 1 {
+		t.Fatal("inplaceUpdate incorrectly did an inplace update")
+	}
+
+	if len(ctx.plan.NodeAllocation) != 0 {
+		t.Fatal("inplaceUpdate incorrectly did an inplace update")
+	}
+}
+
+func TestInplaceUpdate_NoMatch(t *testing.T) {
+	state, ctx := testContext(t)
+	eval := mock.Eval()
+	job := mock.Job()
+
+	node := mock.Node()
+	noErr(t, state.UpsertNode(1000, node))
+
+	// Register an alloc
+	alloc := &structs.Allocation{
+		ID: structs.GenerateUUID(),
+		EvalID: eval.ID,
+		NodeID: node.ID,
+		JobID: job.ID,
+		Job: job,
+		Resources: &structs.Resources{
+			CPU: 2048,
+			MemoryMB: 2048,
+		},
+		DesiredStatus: structs.AllocDesiredStatusRun,
+	}
+	alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources}
+	noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc}))
+
+	// Create a new task group that requires too many resources.
+	tg := &structs.TaskGroup{}
+	*tg = *job.TaskGroups[0]
+	resource := &structs.Resources{CPU: 9999}
+	tg.Tasks[0].Resources = resource
+
+	updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}}
+	stack := NewGenericStack(false, ctx)
+
+	// Do the inplace update.
+	unplaced := inplaceUpdate(ctx, eval, job, stack, updates)
+
+	if len(unplaced) != 1 {
+		t.Fatal("inplaceUpdate incorrectly did an inplace update")
+	}
+
+	if len(ctx.plan.NodeAllocation) != 0 {
+		t.Fatal("inplaceUpdate incorrectly did an inplace update")
+	}
+}
+
+func TestInplaceUpdate_Success(t *testing.T) {
+	state, ctx := testContext(t)
+	eval := mock.Eval()
+	job := mock.Job()
+
+	node := mock.Node()
+	noErr(t, state.UpsertNode(1000, node))
+
+	// Register an alloc
+	alloc := &structs.Allocation{
+		ID: structs.GenerateUUID(),
+		EvalID: eval.ID,
+		NodeID: node.ID,
+		JobID: job.ID,
+		Job: job,
+		Resources: &structs.Resources{
+			CPU: 2048,
+			MemoryMB: 2048,
+		},
+		DesiredStatus: structs.AllocDesiredStatusRun,
+	}
+	alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources}
+	noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc}))
+
+	// Create a new task group that updates the resources.
+	tg := &structs.TaskGroup{}
+	*tg = *job.TaskGroups[0]
+	resource := &structs.Resources{CPU: 737}
+	tg.Tasks[0].Resources = resource
+
+	updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}}
+	stack := NewGenericStack(false, ctx)
+
+	// Do the inplace update.
+	unplaced := inplaceUpdate(ctx, eval, job, stack, updates)
+
+	if len(unplaced) != 0 {
+		t.Fatal("inplaceUpdate did not do an inplace update")
+	}
+
+	if len(ctx.plan.NodeAllocation) != 1 {
+		t.Fatal("inplaceUpdate did not do an inplace update")
+	}
+}
+
+func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) {
+	_, ctx := testContext(t)
+	allocs := []allocTuple{
+		allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}},
+		allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}},
+		allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}},
+		allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}},
+	}
+	diff := &diffResult{}
+
+	limit := 6
+	if evictAndPlace(ctx, diff, allocs, "", &limit) {
+		t.Fatal("evictAndPlace() should have returned false")
+	}
+
+	if limit != 2 {
+		t.Fatalf("evictAndPlace() should have decremented limit; got %v; want 2", limit)
+	}
+
+	if len(diff.place) != 4 {
+		t.Fatalf("evictAndPlace() didn't insert into diffResult properly: %v", diff.place)
+	}
+}
+
+func TestTaskGroupConstraints(t *testing.T) {
+	constr := &structs.Constraint{Hard: true}
+	constr2 := &structs.Constraint{LTarget: "foo"}
+	constr3 := &structs.Constraint{Weight: 10}
+
+	tg := &structs.TaskGroup{
+		Name: "web",
+		Count: 10,
+		Constraints: []*structs.Constraint{constr},
+		Tasks: []*structs.Task{
+			&structs.Task{
+				Driver: "exec",
+				Resources: &structs.Resources{
+					CPU: 500,
+					MemoryMB: 256,
+				},
+				Constraints: []*structs.Constraint{constr2},
+			},
+			&structs.Task{
+				Driver: "docker",
+				Resources: &structs.Resources{
+					CPU: 500,
+					MemoryMB: 256,
+				},
+				Constraints: []*structs.Constraint{constr3},
+			},
+		},
+	}
+
+	// Build the expected values.
+	expConstr := []*structs.Constraint{constr, constr2, constr3}
+	expDrivers := map[string]struct{}{"exec": struct{}{}, "docker": struct{}{}}
+	expSize := &structs.Resources{
+		CPU: 1000,
+		MemoryMB: 512,
+	}
+
+	actConstraints := taskGroupConstraints(tg)
+	if !reflect.DeepEqual(actConstraints.constraints, expConstr) {
+		t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstraints.constraints, expConstr)
+	}
+	if !reflect.DeepEqual(actConstraints.drivers, expDrivers) {
+		t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstraints.drivers, expDrivers)
+	}
+	if !reflect.DeepEqual(actConstraints.size, expSize) {
+		t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstraints.size, expSize)
+	}
+}
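
Note on the limit semantics exercised by the three TestEvictAndPlace_* cases above: evictAndPlace handles at most *limit allocations, decrements the limit by the amount of work it did, and returns true only when allocations remain after the limit is exhausted. The following standalone sketch is not part of the patch; it mirrors that contract with plain strings instead of Nomad's allocTuple/diffResult types, and all names in it are illustrative only.

package main

import "fmt"

// evictAndPlaceSketch mirrors the helper's contract: consume up to *limit
// items, decrement the limit by however many were consumed, and report true
// only if items were left over once the limit hit zero.
func evictAndPlaceSketch(allocs []string, limit *int) bool {
	n := len(allocs)
	for i := 0; i < n && i < *limit; i++ {
		fmt.Printf("evict and re-place %s\n", allocs[i])
	}
	if n <= *limit {
		*limit -= n
		return false
	}
	*limit = 0
	return true
}

func main() {
	limit := 2
	reached := evictAndPlaceSketch([]string{"a", "b", "c", "d"}, &limit)
	fmt.Println(reached, limit) // true 0: two allocations still need handling on the next pass
}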
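Similarly, TestTaskGroupConstraints checks that taskGroupConstraints folds every task's driver and resources into a single task-group total. This minimal standalone sketch of that aggregation uses simplified local types instead of the structs package; the names here are illustrative only.

package main

import "fmt"

type resources struct{ CPU, MemoryMB int }

type task struct {
	Driver    string
	Resources resources
}

// aggregate collects the set of required drivers and sums the resources of
// all tasks, the same shape of roll-up the patch performs per task group.
func aggregate(tasks []task) (map[string]struct{}, resources) {
	drivers := make(map[string]struct{})
	var size resources
	for _, t := range tasks {
		drivers[t.Driver] = struct{}{}
		size.CPU += t.Resources.CPU
		size.MemoryMB += t.Resources.MemoryMB
	}
	return drivers, size
}

func main() {
	drivers, size := aggregate([]task{
		{Driver: "exec", Resources: resources{CPU: 500, MemoryMB: 256}},
		{Driver: "docker", Resources: resources{CPU: 500, MemoryMB: 256}},
	})
	fmt.Println(drivers, size) // map[docker:{} exec:{}] {1000 512}
}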