From b24f48a4edc975073847fd86188ae90e2fc83399 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 14 Oct 2015 16:43:06 -0700 Subject: [PATCH 01/10] System scheduler and system stack --- nomad/mock/mock.go | 52 +++ nomad/structs/structs.go | 1 + scheduler/feasible.go | 15 +- scheduler/generic_sched.go | 2 +- scheduler/generic_sched_test.go | 6 +- scheduler/scheduler.go | 1 + scheduler/stack.go | 94 ++++- scheduler/stack_test.go | 202 ++++++++++ scheduler/system_sched.go | 392 +++++++++++++++++++ scheduler/system_sched_test.go | 651 ++++++++++++++++++++++++++++++++ scheduler/util.go | 32 ++ 11 files changed, 1442 insertions(+), 6 deletions(-) create mode 100644 scheduler/system_sched.go create mode 100644 scheduler/system_sched_test.go diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 351d8f7fd86..5ed5af09709 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -109,6 +109,58 @@ func Job() *structs.Job { return job } +func SystemJob() *structs.Job { + job := &structs.Job{ + Region: "global", + ID: structs.GenerateUUID(), + Name: "my-job", + Type: structs.JobTypeSystem, + Priority: 100, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*structs.Constraint{ + &structs.Constraint{ + Hard: true, + LTarget: "$attr.kernel.name", + RTarget: "linux", + Operand: "=", + }, + }, + TaskGroups: []*structs.TaskGroup{ + &structs.TaskGroup{ + Name: "web", + Tasks: []*structs.Task{ + &structs.Task{ + Name: "web", + Driver: "exec", + Config: map[string]string{ + "command": "/bin/date", + "args": "+%s", + }, + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + MBits: 50, + DynamicPorts: []string{"http"}, + }, + }, + }, + }, + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + Status: structs.JobStatusPending, + CreateIndex: 42, + ModifyIndex: 99, + } + return job +} + func Eval() *structs.Evaluation { eval := &structs.Evaluation{ ID: structs.GenerateUUID(), diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 09cef210a13..e141bdeb386 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -699,6 +699,7 @@ const ( JobTypeCore = "_core" JobTypeService = "service" JobTypeBatch = "batch" + JobTypeSystem = "system" ) const ( diff --git a/scheduler/feasible.go b/scheduler/feasible.go index cbf811b8cca..559252045f1 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "regexp" + "strconv" "strings" "github.com/hashicorp/go-version" @@ -129,10 +130,22 @@ func (iter *DriverIterator) Reset() { func (iter *DriverIterator) hasDrivers(option *structs.Node) bool { for driver := range iter.drivers { driverStr := fmt.Sprintf("driver.%s", driver) - _, ok := option.Attributes[driverStr] + value, ok := option.Attributes[driverStr] if !ok { return false } + + enabled, err := strconv.ParseBool(value) + if err != nil { + iter.ctx.Logger(). + Printf("[WARN] scheduler.DriverIterator: node %v has invalid driver setting %v: %v", + option.ID, driverStr, value) + return false + } + + if !enabled { + return false + } } return true } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bd5bb81e48e..d29736cb63f 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -275,7 +275,7 @@ func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { n := len(updates) inplace := 0 for i := 0; i < n; i++ { - // Get the udpate + // Get the update update := updates[i] // Check if the task drivers or config has changed, requires diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index b6cef783795..dfb35cb3c5e 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -22,7 +22,7 @@ func TestServiceSched_JobRegister(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, @@ -71,7 +71,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, @@ -550,7 +550,7 @@ func TestServiceSched_RetryLimit(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 8a62d7c85f0..baed71e734e 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -13,6 +13,7 @@ import ( var BuiltinSchedulers = map[string]Factory{ "service": NewServiceScheduler, "batch": NewBatchScheduler, + "system": NewSystemScheduler, } // NewScheduler is used to instantiate and return a new scheduler diff --git a/scheduler/stack.go b/scheduler/stack.go index c1468602f72..e16759bd377 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -109,7 +109,7 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { // Apply a limit function. This is to avoid scanning *every* possible node. // For batch jobs we only need to evaluate 2 options and depend on the - // powwer of two choices. For services jobs we need to visit "enough". + // power of two choices. For services jobs we need to visit "enough". // Using a log of the total number of nodes is a good restriction, with // at least 2 as the floor limit := 2 @@ -165,3 +165,95 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.ctx.Metrics().AllocationTime = time.Since(start) return option, size } + +// SystemStack is the Stack used for the System scheduler. It is designed to +// attempt to make placements on all nodes. +type SystemStack struct { + ctx Context + source *StaticIterator + jobConstraint *ConstraintIterator + taskGroupDrivers *DriverIterator + taskGroupConstraint *ConstraintIterator + binPack *BinPackIterator +} + +// NewSystemStack constructs a stack used for selecting service placements +func NewSystemStack(ctx Context, baseNodes []*structs.Node) *SystemStack { + // Create a new stack + s := &SystemStack{ctx: ctx} + + // Create the source iterator. We visit nodes in a linear order because we + // have to evaluate on all nodes. + s.source = NewStaticIterator(ctx, baseNodes) + + // Attach the job constraints. The job is filled in later. + s.jobConstraint = NewConstraintIterator(ctx, s.source, nil) + + // Filter on task group drivers first as they are faster + s.taskGroupDrivers = NewDriverIterator(ctx, s.jobConstraint, nil) + + // Filter on task group constraints second + s.taskGroupConstraint = NewConstraintIterator(ctx, s.taskGroupDrivers, nil) + + // Upgrade from feasible to rank iterator + rankSource := NewFeasibleRankIterator(ctx, s.taskGroupConstraint) + + // Apply the bin packing, this depends on the resources needed + // by a particular task group. Enable eviction as system jobs are high + // priority. + s.binPack = NewBinPackIterator(ctx, rankSource, true, 0) + + // Set the nodes if given + if len(baseNodes) != 0 { + s.SetNodes(baseNodes) + } + return s +} + +func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { + // Update the set of base nodes + s.source.SetNodes(baseNodes) +} + +func (s *SystemStack) SetJob(job *structs.Job) { + s.jobConstraint.SetConstraints(job.Constraints) + s.binPack.SetPriority(job.Priority) +} + +func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { + // Reset the binpack selector and context + s.binPack.Reset() + s.ctx.Reset() + start := time.Now() + + // Collect the constraints, drivers and resources required by each + // sub-task to aggregate the TaskGroup totals + constr := make([]*structs.Constraint, 0, len(tg.Constraints)) + drivers := make(map[string]struct{}) + size := new(structs.Resources) + constr = append(constr, tg.Constraints...) + for _, task := range tg.Tasks { + drivers[task.Driver] = struct{}{} + constr = append(constr, task.Constraints...) + size.Add(task.Resources) + } + + // Update the parameters of iterators + s.taskGroupDrivers.SetDrivers(drivers) + s.taskGroupConstraint.SetConstraints(constr) + s.binPack.SetTasks(tg.Tasks) + + // Get the next option that satisfies the constraints. + option := s.binPack.Next() + + // Ensure that the task resources were specified + if option != nil && len(option.TaskResources) != len(tg.Tasks) { + for _, task := range tg.Tasks { + option.SetTaskResources(task, task.Resources) + } + } + + // Store the compute time + s.ctx.Metrics().AllocationTime = time.Since(start) + return option, size +} diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index 07e94531590..d63306c880c 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -207,3 +207,205 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { t.Fatalf("bad: %#v", met) } } + +func TestSystemStack_SetNodes(t *testing.T) { + _, ctx := testContext(t) + stack := NewSystemStack(ctx, nil) + + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + stack.SetNodes(nodes) + + out := collectFeasible(stack.source) + if !reflect.DeepEqual(out, nodes) { + t.Fatalf("bad: %#v", out) + } +} + +func TestSystemStack_SetJob(t *testing.T) { + _, ctx := testContext(t) + stack := NewSystemStack(ctx, nil) + + job := mock.Job() + stack.SetJob(job) + + if stack.binPack.priority != job.Priority { + t.Fatalf("bad") + } + if !reflect.DeepEqual(stack.jobConstraint.constraints, job.Constraints) { + t.Fatalf("bad") + } +} + +func TestSystemStack_Select_Size(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + stack.SetJob(job) + node, size := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + if size == nil { + t.Fatalf("missing size") + } + + if size.CPU != 500 || size.MemoryMB != 256 { + t.Fatalf("bad: %#v", size) + } + + met := ctx.Metrics() + if met.AllocationTime == 0 { + t.Fatalf("missing time") + } +} + +func TestSystemStack_Select_MetricsReset(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + stack.SetJob(job) + n1, _ := stack.Select(job.TaskGroups[0]) + m1 := ctx.Metrics() + if n1 == nil { + t.Fatalf("missing node %#v", m1) + } + + if m1.NodesEvaluated != 1 { + t.Fatalf("should only be 1") + } + + n2, _ := stack.Select(job.TaskGroups[0]) + m2 := ctx.Metrics() + if n2 == nil { + t.Fatalf("missing node %#v", m2) + } + + // If we don't reset, this would be 2 + if m2.NodesEvaluated != 1 { + t.Fatalf("should only be 2") + } +} + +func TestSystemStack_Select_DriverFilter(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + zero := nodes[0] + zero.Attributes["driver.foo"] = "1" + + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + job.TaskGroups[0].Tasks[0].Driver = "foo" + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != zero { + t.Fatalf("bad") + } + + zero.Attributes["driver.foo"] = "0" + stack = NewSystemStack(ctx, nodes) + stack.SetJob(job) + node, _ = stack.Select(job.TaskGroups[0]) + if node != nil { + t.Fatalf("node not filtered %#v", node) + } +} + +func TestSystemStack_Select_ConstraintFilter(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + zero := nodes[1] + zero.Attributes["kernel.name"] = "freebsd" + + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + job.Constraints[0].RTarget = "freebsd" + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != zero { + t.Fatalf("bad") + } + + met := ctx.Metrics() + if met.NodesFiltered != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ClassFiltered["linux-medium-pci"] != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ConstraintFiltered["$attr.kernel.name = freebsd"] != 1 { + t.Fatalf("bad: %#v", met) + } +} + +func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + zero := nodes[0] + zero.Reserved = zero.Resources + one := nodes[1] + + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != one { + t.Fatalf("bad") + } + + met := ctx.Metrics() + if met.NodesExhausted != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ClassExhausted["linux-medium-pci"] != 1 { + t.Fatalf("bad: %#v", met) + } + if len(met.Scores) != 1 { + t.Fatalf("bad: %#v", met) + } +} diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go new file mode 100644 index 00000000000..b36983f2728 --- /dev/null +++ b/scheduler/system_sched.go @@ -0,0 +1,392 @@ +package scheduler + +import ( + "fmt" + "log" + + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // maxSystemScheduleAttempts is used to limit the number of times + // we will attempt to schedule if we continue to hit conflicts for system + // jobs. + maxSystemScheduleAttempts = 2 + + // allocNodeTainted is the status used when stopping an alloc because it's + // node is tainted. + allocNodeTainted = "system alloc not needed as node is tainted" +) + +// SystemScheduler is used for 'system' jobs. This scheduler is +// designed for services that should be run on every client. +type SystemScheduler struct { + logger *log.Logger + state State + planner Planner + + eval *structs.Evaluation + job *structs.Job + plan *structs.Plan + ctx *EvalContext + stack *SystemStack + nodes []*structs.Node + + limitReached bool + nextEval *structs.Evaluation +} + +// NewSystemScheduler is a factory function to instantiate a new system +// scheduler. +func NewSystemScheduler(logger *log.Logger, state State, planner Planner) Scheduler { + return &SystemScheduler{ + logger: logger, + state: state, + planner: planner, + } +} + +// setStatus is used to update the status of the evaluation +func (s *SystemScheduler) setStatus(status, desc string) error { + s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status) + newEval := s.eval.Copy() + newEval.Status = status + newEval.StatusDescription = desc + if s.nextEval != nil { + newEval.NextEval = s.nextEval.ID + } + return s.planner.UpdateEval(newEval) +} + +// Process is used to handle a single evaluation. +func (s *SystemScheduler) Process(eval *structs.Evaluation) error { + // Store the evaluation + s.eval = eval + + // Verify the evaluation trigger reason is understood + switch eval.TriggeredBy { + case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, + structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate: + default: + desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", + eval.TriggeredBy) + return s.setStatus(structs.EvalStatusFailed, desc) + } + + // Retry up to the maxSystemScheduleAttempts + if err := retryMax(maxSystemScheduleAttempts, s.process); err != nil { + if statusErr, ok := err.(*SetStatusError); ok { + return s.setStatus(statusErr.EvalStatus, err.Error()) + } + return err + } + + // Update the status to complete + return s.setStatus(structs.EvalStatusComplete, "") +} + +// process is wrapped in retryMax to iteratively run the handler until we have no +// further work or we've made the maximum number of attempts. +func (s *SystemScheduler) process() (bool, error) { + // Lookup the Job by ID + var err error + s.job, err = s.state.JobByID(s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get job '%s': %v", + s.eval.JobID, err) + } + + // Get the ready nodes in the required datacenters + if s.job != nil { + s.nodes, err = readyNodesInDCs(s.state, s.job.Datacenters) + if err != nil { + return false, fmt.Errorf("failed to get ready nodes: %v", err) + } + } + + // Create a plan + s.plan = s.eval.MakePlan(s.job) + + // Create an evaluation context + s.ctx = NewEvalContext(s.state, s.plan, s.logger) + + // Construct the placement stack + s.stack = NewSystemStack(s.ctx, nil) + if s.job != nil { + s.stack.SetJob(s.job) + } + + // Compute the target job allocations + if err := s.computeJobAllocs(); err != nil { + s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err) + return false, err + } + + // If the plan is a no-op, we can bail + if s.plan.IsNoOp() { + return true, nil + } + + // If the limit of placements was reached we need to create an evaluation + // to pickup from here after the stagger period. + if s.limitReached && s.nextEval == nil { + s.nextEval = s.eval.NextRollingEval(s.job.Update.Stagger) + if err := s.planner.CreateEval(s.nextEval); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make next eval for rolling update: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) + } + + // Submit the plan + result, newState, err := s.planner.SubmitPlan(s.plan) + if err != nil { + return false, err + } + + // If we got a state refresh, try again since we have stale data + if newState != nil { + s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval) + s.state = newState + return false, nil + } + + // Try again if the plan was not fully committed, potential conflict + fullCommit, expected, actual := result.FullCommit(s.plan) + if !fullCommit { + s.logger.Printf("[DEBUG] sched: %#v: attempted %d placements, %d placed", + s.eval, expected, actual) + return false, nil + } + + // Success! + return true, nil +} + +// computeJobAllocs is used to reconcile differences between the job, +// existing allocations and node status to update the allocations. +func (s *SystemScheduler) computeJobAllocs() error { + // Materialize all the task groups per node. + var groups map[string]*structs.TaskGroup + if s.job != nil { + groups = materializeSystemTaskGroups(s.job, s.nodes) + } + + // Lookup the allocations by JobID + allocs, err := s.state.AllocsByJob(s.eval.JobID) + if err != nil { + return fmt.Errorf("failed to get allocs for job '%s': %v", + s.eval.JobID, err) + } + + // Filter out the allocations in a terminal state + allocs = structs.FilterTerminalAllocs(allocs) + + // Determine the tainted nodes containing job allocs + tainted, err := taintedNodes(s.state, allocs) + if err != nil { + return fmt.Errorf("failed to get tainted nodes for job '%s': %v", + s.eval.JobID, err) + } + + // Diff the required and existing allocations + diff := diffAllocs(s.job, tainted, groups, allocs) + s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) + + // Add all the allocs to stop + for _, e := range diff.stop { + s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded) + } + + // Also stop all the allocs that are marked as needing migrating. This + // allows failed nodes to be properly GC'd. + for _, e := range diff.migrate { + s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted) + } + + // Attempt to do the upgrades in place + diff.update = s.inplaceUpdate(diff.update) + + // Check if a rolling upgrade strategy is being used + limit := len(diff.update) + if s.job != nil && s.job.Update.Rolling() { + limit = s.job.Update.MaxParallel + } + + // Treat non in-place updates as an eviction and new placement. + s.evictAndPlace(diff, diff.update, allocUpdating, &limit) + + // Nothing remaining to do if placement is not required + if len(diff.place) == 0 { + return nil + } + + // Compute the placements + return s.computePlacements(diff.place) +} + +// evictAndPlace is used to mark allocations for evicts and add them to the placement queue +func (s *SystemScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) { + n := len(allocs) + for i := 0; i < n && i < *limit; i++ { + a := allocs[i] + s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) + diff.place = append(diff.place, a) + } + if n <= *limit { + *limit -= n + } else { + *limit = 0 + s.limitReached = true + } +} + +// inplaceUpdate attempts to update allocations in-place where possible. +func (s *SystemScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { + n := len(updates) + inplace := 0 + for i := 0; i < n; i++ { + // Get the update + update := updates[i] + + // Check if the task drivers or config has changed, requires + // a rolling upgrade since that cannot be done in-place. + existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) + if tasksUpdated(update.TaskGroup, existing) { + continue + } + + // Get the existing node + node, err := s.state.NodeByID(update.Alloc.NodeID) + if err != nil { + s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v", + s.eval, update.Alloc.NodeID, err) + continue + } + if node == nil { + continue + } + + // Set the existing node as the base set + s.stack.SetNodes([]*structs.Node{node}) + + // Stage an eviction of the current allocation + s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, + allocInPlace) + + // Attempt to match the task group + option, size := s.stack.Select(update.TaskGroup) + + // Pop the allocation + s.plan.PopUpdate(update.Alloc) + + // Skip if we could not do an in-place update + if option == nil { + continue + } + + // Restore the network offers from the existing allocation. + // We do not allow network resources (reserved/dynamic ports) + // to be updated. This is guarded in taskUpdated, so we can + // safely restore those here. + for task, resources := range option.TaskResources { + existing := update.Alloc.TaskResources[task] + resources.Networks = existing.Networks + } + + // Create a shallow copy + newAlloc := new(structs.Allocation) + *newAlloc = *update.Alloc + + // Update the allocation + newAlloc.EvalID = s.eval.ID + newAlloc.Job = s.job + newAlloc.Resources = size + newAlloc.TaskResources = option.TaskResources + newAlloc.Metrics = s.ctx.Metrics() + newAlloc.DesiredStatus = structs.AllocDesiredStatusRun + newAlloc.ClientStatus = structs.AllocClientStatusPending + s.plan.AppendAlloc(newAlloc) + + // Remove this allocation from the slice + updates[i] = updates[n-1] + i-- + n-- + inplace++ + } + if len(updates) > 0 { + s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates)) + } + return updates[:n] +} + +// computePlacements computes placements for allocations +func (s *SystemScheduler) computePlacements(place []allocTuple) error { + nodeByID := make(map[string]*structs.Node, len(s.nodes)) + for _, node := range s.nodes { + nodeByID[node.ID] = node + } + + // Track the failed task groups so that we can coalesce + // the failures together to avoid creating many failed allocs. + failedTG := make(map[*structs.TaskGroup]*structs.Allocation) + + nodes := make([]*structs.Node, 1) + for _, missing := range place { + // Get the node by looking at the name in the task group. + nodeID, err := extractTaskGroupId(missing.Name) + if err != nil { + s.logger.Printf("[ERR] sched: %#v failed to parse node id from %q: %v", + s.eval, missing.Name, err) + return err + } + + node, ok := nodeByID[nodeID] + if !ok { + return fmt.Errorf("could not find node %q", nodeID) + } + + // Update the set of placement ndoes + nodes[0] = node + s.stack.SetNodes(nodes) + + // Attempt to match the task group + option, size := s.stack.Select(missing.TaskGroup) + + if option == nil { + // Check if this task group has already failed + if alloc, ok := failedTG[missing.TaskGroup]; ok { + alloc.Metrics.CoalescedFailures += 1 + continue + } + } + + // Create an allocation for this + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: s.eval.ID, + Name: missing.Name, + JobID: s.job.ID, + Job: s.job, + TaskGroup: missing.TaskGroup.Name, + Resources: size, + Metrics: s.ctx.Metrics(), + } + + // Set fields based on if we found an allocation option + if option != nil { + alloc.NodeID = option.Node.ID + alloc.TaskResources = option.TaskResources + alloc.DesiredStatus = structs.AllocDesiredStatusRun + alloc.ClientStatus = structs.AllocClientStatusPending + s.plan.AppendAlloc(alloc) + } else { + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + alloc.DesiredDescription = "failed to find a node for placement" + alloc.ClientStatus = structs.AllocClientStatusFailed + s.plan.AppendFailed(alloc) + failedTG[missing.TaskGroup] = alloc + } + } + return nil +} diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go new file mode 100644 index 00000000000..99f612d079d --- /dev/null +++ b/scheduler/system_sched_test.go @@ -0,0 +1,651 @@ +package scheduler + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestSystemSched_JobRegister(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobRegister_AddNode(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Add a new node. + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a mock evaluation to deal with the node update + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan had no node updates + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != 0 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated on the new node + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure it allocated on the right node + if _, ok := plan.NodeAllocation[node.ID]; !ok { + t.Fatalf("allocated on wrong node: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + out = structs.FilterTerminalAllocs(out) + if len(out) != 11 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobRegister_AllocFail(t *testing.T) { + h := NewHarness(t) + + // Create NO nodes + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan as this should be a no-op. + if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobModify(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Add a few terminal status allocations, these should be ignored + var terminal []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", nodes[i].ID) + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + terminal = append(terminal, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + + // Update the task, such that it cannot be done in-place + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != len(allocs) { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + out = structs.FilterTerminalAllocs(out) + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobModify_Rolling(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + job2.Update = structs.UpdateStrategy{ + Stagger: 30 * time.Second, + MaxParallel: 5, + } + + // Update the task, such that it cannot be done in-place + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted only MaxParallel + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != job2.Update.MaxParallel { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != job2.Update.MaxParallel { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Ensure a follow up eval was created + eval = h.Evals[0] + if eval.NextEval == "" { + t.Fatalf("missing next eval") + } + + // Check for create + if len(h.CreateEvals) == 0 { + t.Fatalf("missing created eval") + } + create := h.CreateEvals[0] + if eval.NextEval != create.ID { + t.Fatalf("ID mismatch") + } + if create.PreviousEval != eval.ID { + t.Fatalf("missing previous eval") + } + + if create.TriggeredBy != structs.EvalTriggerRollingUpdate { + t.Fatalf("bad: %#v", create) + } +} + +func TestSystemSched_JobModify_InPlace(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan did not evict any allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != 0 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan updated the existing allocs + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + for _, p := range planned { + if p.Job != job2 { + t.Fatalf("should update job") + } + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Verify the network did not change + for _, alloc := range out { + for _, resources := range alloc.TaskResources { + if resources.Networks[0].ReservedPorts[0] != 5000 { + t.Fatalf("bad: %#v", alloc) + } + } + } +} + +func TestSystemSched_JobDeregister(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted the job from all nodes. + for _, node := range nodes { + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no remaining allocations + out = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_NodeDrain(t *testing.T) { + h := NewHarness(t) + + // Register a draining node + node := mock.Node() + node.Drain = true + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Generate a fake job allocated on that node. + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: node.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all allocs + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan updated the allocation. + var planned []*structs.Allocation + for _, allocList := range plan.NodeUpdate { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Log(len(planned)) + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure the allocations is stopped + if planned[0].DesiredStatus != structs.AllocDesiredStatusStop { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_RetryLimit(t *testing.T) { + h := NewHarness(t) + h.Planner = &RejectPlan{h} + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no allocations placed + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + // Should hit the retry limit + h.AssertEvalStatus(t, structs.EvalStatusFailed) +} diff --git a/scheduler/util.go b/scheduler/util.go index cae0043ab74..606b2ea3205 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -4,10 +4,16 @@ import ( "fmt" "math/rand" "reflect" + "regexp" "github.com/hashicorp/nomad/nomad/structs" ) +var ( + // Regex to capture the identifier of a task group name. + taskGroupID = regexp.MustCompile(`.+\..+\[(.*)\]`) +) + // allocTuple is a tuple of the allocation name and potential alloc ID type allocTuple struct { Name string @@ -28,6 +34,32 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { return out } +// materializeSystemTaskGroups is used to materialize all the task groups +// a system job requires. This is used to do the node expansion. +func materializeSystemTaskGroups(job *structs.Job, nodes []*structs.Node) map[string]*structs.TaskGroup { + out := make(map[string]*structs.TaskGroup) + for _, tg := range job.TaskGroups { + for _, node := range nodes { + name := fmt.Sprintf("%s.%s[%s]", job.Name, tg.Name, node.ID) + out[name] = tg + } + } + return out +} + +// extractTaskGroupIdreturns the unique identifier for the task group +// name. It returns the id that distinguishes multiple instantiations of a task +// group. In the case of the system scheduler they will be the nodes name and +// otherwise it will be the tasks count. +func extractTaskGroupId(name string) (string, error) { + matches := taskGroupID.FindStringSubmatch(name) + if len(matches) != 2 { + return "", fmt.Errorf("could not determine task group id from %v: %#v", name, matches) + } + + return matches[1], nil +} + // diffResult is used to return the sets that result from the diff type diffResult struct { place, update, migrate, stop, ignore []allocTuple From 3cd3ac65d1ca867361a035ce5b5464e8eb41fdde Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 14 Oct 2015 16:45:19 -0700 Subject: [PATCH 02/10] Use valid driver values in test --- scheduler/feasible_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 2f091c2f457..d0d0e0ac1c3 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -85,8 +85,8 @@ func TestDriverIterator(t *testing.T) { } static := NewStaticIterator(ctx, nodes) - nodes[0].Attributes["driver.foo"] = "2" - nodes[2].Attributes["driver.foo"] = "2" + nodes[0].Attributes["driver.foo"] = "1" + nodes[2].Attributes["driver.foo"] = "1" drivers := map[string]struct{}{ "exec": struct{}{}, From 5cd9a55bcde45933738ed6d91802917e4bf282af Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 14 Oct 2015 17:26:20 -0700 Subject: [PATCH 03/10] Refactor shared code between schedulers --- scheduler/generic_sched.go | 119 ++----------------------------------- scheduler/system_sched.go | 117 ++---------------------------------- scheduler/util.go | 112 ++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 225 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index d29736cb63f..e1feb89f196 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -82,18 +82,6 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul return s } -// setStatus is used to update the status of the evaluation -func (s *GenericScheduler) setStatus(status, desc string) error { - s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status) - newEval := s.eval.Copy() - newEval.Status = status - newEval.StatusDescription = desc - if s.nextEval != nil { - newEval.NextEval = s.nextEval.ID - } - return s.planner.UpdateEval(newEval) -} - // Process is used to handle a single evaluation func (s *GenericScheduler) Process(eval *structs.Evaluation) error { // Store the evaluation @@ -106,7 +94,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) - return s.setStatus(structs.EvalStatusFailed, desc) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc) } // Retry up to the maxScheduleAttempts @@ -116,13 +104,13 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { } if err := retryMax(limit, s.process); err != nil { if statusErr, ok := err.(*SetStatusError); ok { - return s.setStatus(statusErr.EvalStatus, err.Error()) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()) } return err } // Update the status to complete - return s.setStatus(structs.EvalStatusComplete, "") + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "") } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -231,7 +219,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Attempt to do the upgrades in place - diff.update = s.inplaceUpdate(diff.update) + diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) // Check if a rolling upgrade strategy is being used limit := len(diff.update) + len(diff.migrate) @@ -240,10 +228,10 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Treat migrations as an eviction and a new placement. - s.evictAndPlace(diff, diff.migrate, allocMigrating, &limit) + s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocUpdating, &limit) // Treat non in-place updates as an eviction and new placement. - s.evictAndPlace(diff, diff.update, allocUpdating, &limit) + s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit) // Nothing remaining to do if placement is not required if len(diff.place) == 0 { @@ -254,101 +242,6 @@ func (s *GenericScheduler) computeJobAllocs() error { return s.computePlacements(diff.place) } -// evictAndPlace is used to mark allocations for evicts and add them to the placement queue -func (s *GenericScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) { - n := len(allocs) - for i := 0; i < n && i < *limit; i++ { - a := allocs[i] - s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) - diff.place = append(diff.place, a) - } - if n <= *limit { - *limit -= n - } else { - *limit = 0 - s.limitReached = true - } -} - -// inplaceUpdate attempts to update allocations in-place where possible. -func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { - n := len(updates) - inplace := 0 - for i := 0; i < n; i++ { - // Get the update - update := updates[i] - - // Check if the task drivers or config has changed, requires - // a rolling upgrade since that cannot be done in-place. - existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) - if tasksUpdated(update.TaskGroup, existing) { - continue - } - - // Get the existing node - node, err := s.state.NodeByID(update.Alloc.NodeID) - if err != nil { - s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v", - s.eval, update.Alloc.NodeID, err) - continue - } - if node == nil { - continue - } - - // Set the existing node as the base set - s.stack.SetNodes([]*structs.Node{node}) - - // Stage an eviction of the current allocation - s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, - allocInPlace) - - // Attempt to match the task group - option, size := s.stack.Select(update.TaskGroup) - - // Pop the allocation - s.plan.PopUpdate(update.Alloc) - - // Skip if we could not do an in-place update - if option == nil { - continue - } - - // Restore the network offers from the existing allocation. - // We do not allow network resources (reserved/dynamic ports) - // to be updated. This is guarded in taskUpdated, so we can - // safely restore those here. - for task, resources := range option.TaskResources { - existing := update.Alloc.TaskResources[task] - resources.Networks = existing.Networks - } - - // Create a shallow copy - newAlloc := new(structs.Allocation) - *newAlloc = *update.Alloc - - // Update the allocation - newAlloc.EvalID = s.eval.ID - newAlloc.Job = s.job - newAlloc.Resources = size - newAlloc.TaskResources = option.TaskResources - newAlloc.Metrics = s.ctx.Metrics() - newAlloc.DesiredStatus = structs.AllocDesiredStatusRun - newAlloc.ClientStatus = structs.AllocClientStatusPending - s.plan.AppendAlloc(newAlloc) - - // Remove this allocation from the slice - updates[i] = updates[n-1] - i-- - n-- - inplace++ - } - if len(updates) > 0 { - s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates)) - } - return updates[:n] -} - // computePlacements computes placements for allocations func (s *GenericScheduler) computePlacements(place []allocTuple) error { // Get the base nodes diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index b36983f2728..dfa0859db6d 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -46,18 +46,6 @@ func NewSystemScheduler(logger *log.Logger, state State, planner Planner) Schedu } } -// setStatus is used to update the status of the evaluation -func (s *SystemScheduler) setStatus(status, desc string) error { - s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status) - newEval := s.eval.Copy() - newEval.Status = status - newEval.StatusDescription = desc - if s.nextEval != nil { - newEval.NextEval = s.nextEval.ID - } - return s.planner.UpdateEval(newEval) -} - // Process is used to handle a single evaluation. func (s *SystemScheduler) Process(eval *structs.Evaluation) error { // Store the evaluation @@ -70,19 +58,19 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) - return s.setStatus(structs.EvalStatusFailed, desc) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc) } // Retry up to the maxSystemScheduleAttempts if err := retryMax(maxSystemScheduleAttempts, s.process); err != nil { if statusErr, ok := err.(*SetStatusError); ok { - return s.setStatus(statusErr.EvalStatus, err.Error()) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()) } return err } // Update the status to complete - return s.setStatus(structs.EvalStatusComplete, "") + return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "") } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -205,7 +193,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // Attempt to do the upgrades in place - diff.update = s.inplaceUpdate(diff.update) + diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) // Check if a rolling upgrade strategy is being used limit := len(diff.update) @@ -214,7 +202,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // Treat non in-place updates as an eviction and new placement. - s.evictAndPlace(diff, diff.update, allocUpdating, &limit) + s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit) // Nothing remaining to do if placement is not required if len(diff.place) == 0 { @@ -225,101 +213,6 @@ func (s *SystemScheduler) computeJobAllocs() error { return s.computePlacements(diff.place) } -// evictAndPlace is used to mark allocations for evicts and add them to the placement queue -func (s *SystemScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) { - n := len(allocs) - for i := 0; i < n && i < *limit; i++ { - a := allocs[i] - s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) - diff.place = append(diff.place, a) - } - if n <= *limit { - *limit -= n - } else { - *limit = 0 - s.limitReached = true - } -} - -// inplaceUpdate attempts to update allocations in-place where possible. -func (s *SystemScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { - n := len(updates) - inplace := 0 - for i := 0; i < n; i++ { - // Get the update - update := updates[i] - - // Check if the task drivers or config has changed, requires - // a rolling upgrade since that cannot be done in-place. - existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) - if tasksUpdated(update.TaskGroup, existing) { - continue - } - - // Get the existing node - node, err := s.state.NodeByID(update.Alloc.NodeID) - if err != nil { - s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v", - s.eval, update.Alloc.NodeID, err) - continue - } - if node == nil { - continue - } - - // Set the existing node as the base set - s.stack.SetNodes([]*structs.Node{node}) - - // Stage an eviction of the current allocation - s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, - allocInPlace) - - // Attempt to match the task group - option, size := s.stack.Select(update.TaskGroup) - - // Pop the allocation - s.plan.PopUpdate(update.Alloc) - - // Skip if we could not do an in-place update - if option == nil { - continue - } - - // Restore the network offers from the existing allocation. - // We do not allow network resources (reserved/dynamic ports) - // to be updated. This is guarded in taskUpdated, so we can - // safely restore those here. - for task, resources := range option.TaskResources { - existing := update.Alloc.TaskResources[task] - resources.Networks = existing.Networks - } - - // Create a shallow copy - newAlloc := new(structs.Allocation) - *newAlloc = *update.Alloc - - // Update the allocation - newAlloc.EvalID = s.eval.ID - newAlloc.Job = s.job - newAlloc.Resources = size - newAlloc.TaskResources = option.TaskResources - newAlloc.Metrics = s.ctx.Metrics() - newAlloc.DesiredStatus = structs.AllocDesiredStatusRun - newAlloc.ClientStatus = structs.AllocClientStatusPending - s.plan.AppendAlloc(newAlloc) - - // Remove this allocation from the slice - updates[i] = updates[n-1] - i-- - n-- - inplace++ - } - if len(updates) > 0 { - s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates)) - } - return updates[:n] -} - // computePlacements computes placements for allocations func (s *SystemScheduler) computePlacements(place []allocTuple) error { nodeByID := make(map[string]*structs.Node, len(s.nodes)) diff --git a/scheduler/util.go b/scheduler/util.go index 606b2ea3205..f883b34c56e 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "log" "math/rand" "reflect" "regexp" @@ -274,3 +275,114 @@ func tasksUpdated(a, b *structs.TaskGroup) bool { } return false } + +// setStatus is used to update the status of the evaluation +func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Evaluation, status, desc string) error { + logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status) + newEval := eval.Copy() + newEval.Status = status + newEval.StatusDescription = desc + if nextEval != nil { + newEval.NextEval = nextEval.ID + } + return planner.UpdateEval(newEval) +} + +// inplaceUpdate attempts to update allocations in-place where possible. +func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, + stack Stack, updates []allocTuple) []allocTuple { + + n := len(updates) + inplace := 0 + for i := 0; i < n; i++ { + // Get the update + update := updates[i] + + // Check if the task drivers or config has changed, requires + // a rolling upgrade since that cannot be done in-place. + existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) + if tasksUpdated(update.TaskGroup, existing) { + continue + } + + // Get the existing node + node, err := ctx.State().NodeByID(update.Alloc.NodeID) + if err != nil { + ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v", + eval, update.Alloc.NodeID, err) + continue + } + if node == nil { + continue + } + + // Set the existing node as the base set + stack.SetNodes([]*structs.Node{node}) + + // Stage an eviction of the current allocation + ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, + allocInPlace) + + // Attempt to match the task group + option, size := stack.Select(update.TaskGroup) + + // Pop the allocation + ctx.Plan().PopUpdate(update.Alloc) + + // Skip if we could not do an in-place update + if option == nil { + continue + } + + // Restore the network offers from the existing allocation. + // We do not allow network resources (reserved/dynamic ports) + // to be updated. This is guarded in taskUpdated, so we can + // safely restore those here. + for task, resources := range option.TaskResources { + existing := update.Alloc.TaskResources[task] + resources.Networks = existing.Networks + } + + // Create a shallow copy + newAlloc := new(structs.Allocation) + *newAlloc = *update.Alloc + + // Update the allocation + newAlloc.EvalID = eval.ID + newAlloc.Job = job + newAlloc.Resources = size + newAlloc.TaskResources = option.TaskResources + newAlloc.Metrics = ctx.Metrics() + newAlloc.DesiredStatus = structs.AllocDesiredStatusRun + newAlloc.ClientStatus = structs.AllocClientStatusPending + ctx.Plan().AppendAlloc(newAlloc) + + // Remove this allocation from the slice + updates[i] = updates[n-1] + i-- + n-- + inplace++ + } + if len(updates) > 0 { + ctx.Logger().Printf("[DEBUG] sched: %#v: %d in-place updates of %d", eval, inplace, len(updates)) + } + return updates[:n] +} + +// evictAndPlace is used to mark allocations for evicts and add them to the +// placement queue. evictAndPlace modifies both the the diffResult and the +// limit. It returns true if the limit has been reached. +func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool { + n := len(allocs) + for i := 0; i < n && i < *limit; i++ { + a := allocs[i] + ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) + diff.place = append(diff.place, a) + } + if n <= *limit { + *limit -= n + return false + } + *limit = 0 + return true +} From 5bfb712a7dc29c6f136a8aabbf91b6425a4d9573 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 15 Oct 2015 13:14:44 -0700 Subject: [PATCH 04/10] Add diffSystemAlloc which gives richer information which node to place a system allocation --- nomad/mock/mock.go | 3 +- scheduler/generic_sched.go | 2 +- scheduler/system_sched.go | 28 ++------- scheduler/system_sched_test.go | 16 +++--- scheduler/util.go | 101 ++++++++++++++++++++------------- scheduler/util_test.go | 74 ++++++++++++++++++++++++ 6 files changed, 150 insertions(+), 74 deletions(-) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 5ed5af09709..abfb16c8c2e 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -128,7 +128,8 @@ func SystemJob() *structs.Job { }, TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "web", + Name: "web", + Count: 1, Tasks: []*structs.Task{ &structs.Task{ Name: "web", diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e1feb89f196..f2a34f6eb6a 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -243,7 +243,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // computePlacements computes placements for allocations -func (s *GenericScheduler) computePlacements(place []allocTuple) error { +func (s *GenericScheduler) computePlacements(place []*allocTuple) error { // Get the base nodes nodes, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index dfa0859db6d..8abed6aee06 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -154,12 +154,6 @@ func (s *SystemScheduler) process() (bool, error) { // computeJobAllocs is used to reconcile differences between the job, // existing allocations and node status to update the allocations. func (s *SystemScheduler) computeJobAllocs() error { - // Materialize all the task groups per node. - var groups map[string]*structs.TaskGroup - if s.job != nil { - groups = materializeSystemTaskGroups(s.job, s.nodes) - } - // Lookup the allocations by JobID allocs, err := s.state.AllocsByJob(s.eval.JobID) if err != nil { @@ -178,7 +172,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // Diff the required and existing allocations - diff := diffAllocs(s.job, tainted, groups, allocs) + diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) // Add all the allocs to stop @@ -186,12 +180,6 @@ func (s *SystemScheduler) computeJobAllocs() error { s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded) } - // Also stop all the allocs that are marked as needing migrating. This - // allows failed nodes to be properly GC'd. - for _, e := range diff.migrate { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted) - } - // Attempt to do the upgrades in place diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) @@ -214,7 +202,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // computePlacements computes placements for allocations -func (s *SystemScheduler) computePlacements(place []allocTuple) error { +func (s *SystemScheduler) computePlacements(place []*allocTuple) error { nodeByID := make(map[string]*structs.Node, len(s.nodes)) for _, node := range s.nodes { nodeByID[node.ID] = node @@ -226,17 +214,9 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { nodes := make([]*structs.Node, 1) for _, missing := range place { - // Get the node by looking at the name in the task group. - nodeID, err := extractTaskGroupId(missing.Name) - if err != nil { - s.logger.Printf("[ERR] sched: %#v failed to parse node id from %q: %v", - s.eval, missing.Name, err) - return err - } - - node, ok := nodeByID[nodeID] + node, ok := nodeByID[missing.Alloc.NodeID] if !ok { - return fmt.Errorf("could not find node %q", nodeID) + return fmt.Errorf("could not find node %q", missing.Alloc.NodeID) } // Update the set of placement ndoes diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 99f612d079d..ece805057f1 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "fmt" "testing" "time" @@ -84,7 +83,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -119,6 +118,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { update = append(update, updateList...) } if len(update) != 0 { + t.Log(len(update)) t.Fatalf("bad: %#v", plan) } @@ -200,7 +200,7 @@ func TestSystemSched_JobModify(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -212,7 +212,7 @@ func TestSystemSched_JobModify(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = nodes[i].ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", nodes[i].ID) + alloc.Name = "my-job.web[0]" alloc.DesiredStatus = structs.AllocDesiredStatusFailed terminal = append(terminal, alloc) } @@ -298,7 +298,7 @@ func TestSystemSched_JobModify_Rolling(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -399,7 +399,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -492,7 +492,7 @@ func TestSystemSched_JobDeregister(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -553,7 +553,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to deal with drain diff --git a/scheduler/util.go b/scheduler/util.go index f883b34c56e..fc85a4e6301 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -5,16 +5,10 @@ import ( "log" "math/rand" "reflect" - "regexp" "github.com/hashicorp/nomad/nomad/structs" ) -var ( - // Regex to capture the identifier of a task group name. - taskGroupID = regexp.MustCompile(`.+\..+\[(.*)\]`) -) - // allocTuple is a tuple of the allocation name and potential alloc ID type allocTuple struct { Name string @@ -26,44 +20,22 @@ type allocTuple struct { // a job requires. This is used to do the count expansion. func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { out := make(map[string]*structs.TaskGroup) - for _, tg := range job.TaskGroups { - for i := 0; i < tg.Count; i++ { - name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i) - out[name] = tg - } + if job == nil { + return out } - return out -} -// materializeSystemTaskGroups is used to materialize all the task groups -// a system job requires. This is used to do the node expansion. -func materializeSystemTaskGroups(job *structs.Job, nodes []*structs.Node) map[string]*structs.TaskGroup { - out := make(map[string]*structs.TaskGroup) for _, tg := range job.TaskGroups { - for _, node := range nodes { - name := fmt.Sprintf("%s.%s[%s]", job.Name, tg.Name, node.ID) + for i := 0; i < tg.Count; i++ { + name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i) out[name] = tg } } return out } -// extractTaskGroupIdreturns the unique identifier for the task group -// name. It returns the id that distinguishes multiple instantiations of a task -// group. In the case of the system scheduler they will be the nodes name and -// otherwise it will be the tasks count. -func extractTaskGroupId(name string) (string, error) { - matches := taskGroupID.FindStringSubmatch(name) - if len(matches) != 2 { - return "", fmt.Errorf("could not determine task group id from %v: %#v", name, matches) - } - - return matches[1], nil -} - // diffResult is used to return the sets that result from the diff type diffResult struct { - place, update, migrate, stop, ignore []allocTuple + place, update, migrate, stop, ignore []*allocTuple } func (d *diffResult) GoString() string { @@ -71,6 +43,14 @@ func (d *diffResult) GoString() string { len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore)) } +func (d *diffResult) Append(other *diffResult) { + d.place = append(d.place, other.place...) + d.update = append(d.update, other.update...) + d.migrate = append(d.migrate, other.migrate...) + d.stop = append(d.stop, other.stop...) + d.ignore = append(d.ignore, other.ignore...) +} + // diffAllocs is used to do a set difference between the target allocations // and the existing allocations. This returns 5 sets of results, the list of // named task groups that need to be placed (no existing allocation), the @@ -93,7 +73,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // If not required, we stop the alloc if !ok { - result.stop = append(result.stop, allocTuple{ + result.stop = append(result.stop, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -103,7 +83,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // If we are on a tainted node, we must migrate if taintedNodes[exist.NodeID] { - result.migrate = append(result.migrate, allocTuple{ + result.migrate = append(result.migrate, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -116,7 +96,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // if the job definition has changed in a way that affects // this allocation and potentially ignore it. if job.ModifyIndex != exist.Job.ModifyIndex { - result.update = append(result.update, allocTuple{ + result.update = append(result.update, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -125,7 +105,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, } // Everything is up-to-date - result.ignore = append(result.ignore, allocTuple{ + result.ignore = append(result.ignore, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -141,7 +121,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // is an existing allocation, we would have checked for a potential // update or ignore above. if !ok { - result.place = append(result.place, allocTuple{ + result.place = append(result.place, &allocTuple{ Name: name, TaskGroup: tg, }) @@ -150,6 +130,47 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, return result } +// diffSystemAllocs is like diffAllocs however, the allocations in the +// diffResult contain the specific nodeID they should be allocated on. +func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]bool, + allocs []*structs.Allocation) *diffResult { + + // Build a mapping of nodes to all their allocs. + nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) + for _, alloc := range allocs { + nallocs := append(nodeAllocs[alloc.NodeID], alloc) + nodeAllocs[alloc.NodeID] = nallocs + } + + for _, node := range nodes { + if _, ok := nodeAllocs[node.ID]; !ok { + nodeAllocs[node.ID] = nil + } + } + + // Create the required task groups. + required := materializeTaskGroups(job) + + result := &diffResult{} + for nodeID, allocs := range nodeAllocs { + diff := diffAllocs(job, taintedNodes, required, allocs) + + // Mark the alloc as being for a specific node. + for _, alloc := range diff.place { + alloc.Alloc = &structs.Allocation{NodeID: nodeID} + } + + // Migrate does not apply to system jobs and instead should be marked as + // stop because if a node is tainted, the job is invalid on that node. + diff.stop = append(diff.stop, diff.migrate...) + diff.migrate = nil + + result.Append(diff) + } + + return result +} + // readyNodesInDCs returns all the ready nodes in the given datacenters func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, error) { // Index the DCs @@ -290,7 +311,7 @@ func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Eval // inplaceUpdate attempts to update allocations in-place where possible. func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, - stack Stack, updates []allocTuple) []allocTuple { + stack Stack, updates []*allocTuple) []*allocTuple { n := len(updates) inplace := 0 @@ -372,7 +393,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // evictAndPlace is used to mark allocations for evicts and add them to the // placement queue. evictAndPlace modifies both the the diffResult and the // limit. It returns true if the limit has been reached. -func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool { +func evictAndPlace(ctx Context, diff *diffResult, allocs []*allocTuple, desc string, limit *int) bool { n := len(allocs) for i := 0; i < n && i < *limit; i++ { a := allocs[i] diff --git a/scheduler/util_test.go b/scheduler/util_test.go index a9d894d8c26..38967c83e96 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -109,6 +109,80 @@ func TestDiffAllocs(t *testing.T) { } } +func TestDiffSystemAllocs(t *testing.T) { + job := mock.SystemJob() + + // Create three alive nodes. + nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}} + + // The "old" job has a previous modify index + oldJob := new(structs.Job) + *oldJob = *job + oldJob.ModifyIndex -= 1 + + tainted := map[string]bool{ + "dead": true, + "baz": false, + } + + allocs := []*structs.Allocation{ + // Update allocation on baz + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "baz", + Name: "my-job.web[0]", + Job: oldJob, + }, + + // Ignore allocation on bar + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "bar", + Name: "my-job.web[0]", + Job: job, + }, + + // Stop allocation on dead. + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "dead", + Name: "my-job.web[0]", + }, + } + + diff := diffSystemAllocs(job, nodes, tainted, allocs) + place := diff.place + update := diff.update + migrate := diff.migrate + stop := diff.stop + ignore := diff.ignore + + // We should update the first alloc + if len(update) != 1 || update[0].Alloc != allocs[0] { + t.Fatalf("bad: %#v", update) + } + + // We should ignore the second alloc + if len(ignore) != 1 || ignore[0].Alloc != allocs[1] { + t.Fatalf("bad: %#v", ignore) + } + + // We should stop the third alloc + if len(stop) != 1 || stop[0].Alloc != allocs[2] { + t.Fatalf("bad: %#v", stop) + } + + // There should be no migrates. + if len(migrate) != 0 { + t.Fatalf("bad: %#v", migrate) + } + + // We should place 1 + if len(place) != 1 { + t.Fatalf("bad: %#v", place) + } +} + func TestReadyNodesInDCs(t *testing.T) { state, err := state.NewStateStore(os.Stderr) if err != nil { From 0c5ee683e7f60097b3dbdd0f584eeaa5eb7ae400 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Oct 2015 11:36:26 -0700 Subject: [PATCH 05/10] Add negative test to DriverIterator, increase system scheduler attempts, and fix evictAndPlace status message --- scheduler/feasible_test.go | 5 ++++- scheduler/generic_sched.go | 2 +- scheduler/system_sched.go | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index d0d0e0ac1c3..1cf58d1f6b3 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -82,11 +82,14 @@ func TestDriverIterator(t *testing.T) { mock.Node(), mock.Node(), mock.Node(), + mock.Node(), } static := NewStaticIterator(ctx, nodes) nodes[0].Attributes["driver.foo"] = "1" - nodes[2].Attributes["driver.foo"] = "1" + nodes[1].Attributes["driver.foo"] = "0" + nodes[2].Attributes["driver.foo"] = "true" + nodes[3].Attributes["driver.foo"] = "False" drivers := map[string]struct{}{ "exec": struct{}{}, diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index f2a34f6eb6a..10e6de75423 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -228,7 +228,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Treat migrations as an eviction and a new placement. - s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocUpdating, &limit) + s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocMigrating, &limit) // Treat non in-place updates as an eviction and new placement. s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 8abed6aee06..07ed5d23d2e 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -11,7 +11,7 @@ const ( // maxSystemScheduleAttempts is used to limit the number of times // we will attempt to schedule if we continue to hit conflicts for system // jobs. - maxSystemScheduleAttempts = 2 + maxSystemScheduleAttempts = 5 // allocNodeTainted is the status used when stopping an alloc because it's // node is tainted. From 5bbe7f67cafcc1ff708466c1315c9423cc3f91bf Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Oct 2015 11:43:09 -0700 Subject: [PATCH 06/10] diffResult stores values not pointers --- scheduler/generic_sched.go | 2 +- scheduler/system_sched.go | 2 +- scheduler/util.go | 19 ++++++++++--------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 10e6de75423..a74e5baca45 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -243,7 +243,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // computePlacements computes placements for allocations -func (s *GenericScheduler) computePlacements(place []*allocTuple) error { +func (s *GenericScheduler) computePlacements(place []allocTuple) error { // Get the base nodes nodes, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 07ed5d23d2e..0dfded98b11 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -202,7 +202,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // computePlacements computes placements for allocations -func (s *SystemScheduler) computePlacements(place []*allocTuple) error { +func (s *SystemScheduler) computePlacements(place []allocTuple) error { nodeByID := make(map[string]*structs.Node, len(s.nodes)) for _, node := range s.nodes { nodeByID[node.ID] = node diff --git a/scheduler/util.go b/scheduler/util.go index fc85a4e6301..48a9d0bc797 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -35,7 +35,7 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { // diffResult is used to return the sets that result from the diff type diffResult struct { - place, update, migrate, stop, ignore []*allocTuple + place, update, migrate, stop, ignore []allocTuple } func (d *diffResult) GoString() string { @@ -73,7 +73,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // If not required, we stop the alloc if !ok { - result.stop = append(result.stop, &allocTuple{ + result.stop = append(result.stop, allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -83,7 +83,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // If we are on a tainted node, we must migrate if taintedNodes[exist.NodeID] { - result.migrate = append(result.migrate, &allocTuple{ + result.migrate = append(result.migrate, allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -96,7 +96,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // if the job definition has changed in a way that affects // this allocation and potentially ignore it. if job.ModifyIndex != exist.Job.ModifyIndex { - result.update = append(result.update, &allocTuple{ + result.update = append(result.update, allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -105,7 +105,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, } // Everything is up-to-date - result.ignore = append(result.ignore, &allocTuple{ + result.ignore = append(result.ignore, allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -121,7 +121,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // is an existing allocation, we would have checked for a potential // update or ignore above. if !ok { - result.place = append(result.place, &allocTuple{ + result.place = append(result.place, allocTuple{ Name: name, TaskGroup: tg, }) @@ -156,7 +156,8 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[ diff := diffAllocs(job, taintedNodes, required, allocs) // Mark the alloc as being for a specific node. - for _, alloc := range diff.place { + for i := range diff.place { + alloc := &diff.place[i] alloc.Alloc = &structs.Allocation{NodeID: nodeID} } @@ -311,7 +312,7 @@ func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Eval // inplaceUpdate attempts to update allocations in-place where possible. func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, - stack Stack, updates []*allocTuple) []*allocTuple { + stack Stack, updates []allocTuple) []allocTuple { n := len(updates) inplace := 0 @@ -393,7 +394,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // evictAndPlace is used to mark allocations for evicts and add them to the // placement queue. evictAndPlace modifies both the the diffResult and the // limit. It returns true if the limit has been reached. -func evictAndPlace(ctx Context, diff *diffResult, allocs []*allocTuple, desc string, limit *int) bool { +func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool { n := len(allocs) for i := 0; i < n && i < *limit; i++ { a := allocs[i] From 7feb5f1978886919e57f7c7778bafefa0f2225cd Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Oct 2015 14:00:51 -0700 Subject: [PATCH 07/10] Refactor task group constraint logic in generic/system stack --- scheduler/stack.go | 38 +++++++++----------------------- scheduler/util.go | 31 ++++++++++++++++++++++++++ scheduler/util_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 28 deletions(-) diff --git a/scheduler/stack.go b/scheduler/stack.go index e16759bd377..9cfde83568e 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -134,21 +134,12 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.ctx.Reset() start := time.Now() - // Collect the constraints, drivers and resources required by each - // sub-task to aggregate the TaskGroup totals - constr := make([]*structs.Constraint, 0, len(tg.Constraints)) - drivers := make(map[string]struct{}) - size := new(structs.Resources) - constr = append(constr, tg.Constraints...) - for _, task := range tg.Tasks { - drivers[task.Driver] = struct{}{} - constr = append(constr, task.Constraints...) - size.Add(task.Resources) - } + // Get the task groups constraints. + tgConstr := taskGroupConstraints(tg) // Update the parameters of iterators - s.taskGroupDrivers.SetDrivers(drivers) - s.taskGroupConstraint.SetConstraints(constr) + s.taskGroupDrivers.SetDrivers(tgConstr.drivers) + s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.binPack.SetTasks(tg.Tasks) // Find the node with the max score @@ -163,7 +154,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso // Store the compute time s.ctx.Metrics().AllocationTime = time.Since(start) - return option, size + return option, tgConstr.size } // SystemStack is the Stack used for the System scheduler. It is designed to @@ -226,21 +217,12 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou s.ctx.Reset() start := time.Now() - // Collect the constraints, drivers and resources required by each - // sub-task to aggregate the TaskGroup totals - constr := make([]*structs.Constraint, 0, len(tg.Constraints)) - drivers := make(map[string]struct{}) - size := new(structs.Resources) - constr = append(constr, tg.Constraints...) - for _, task := range tg.Tasks { - drivers[task.Driver] = struct{}{} - constr = append(constr, task.Constraints...) - size.Add(task.Resources) - } + // Get the task groups constraints. + tgConstr := taskGroupConstraints(tg) // Update the parameters of iterators - s.taskGroupDrivers.SetDrivers(drivers) - s.taskGroupConstraint.SetConstraints(constr) + s.taskGroupDrivers.SetDrivers(tgConstr.drivers) + s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.binPack.SetTasks(tg.Tasks) // Get the next option that satisfies the constraints. @@ -255,5 +237,5 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou // Store the compute time s.ctx.Metrics().AllocationTime = time.Since(start) - return option, size + return option, tgConstr.size } diff --git a/scheduler/util.go b/scheduler/util.go index 48a9d0bc797..96c5eb3f623 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -408,3 +408,34 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri *limit = 0 return true } + +// tgConstrainTuple is used to store the total constraints of a task group. +type tgConstrainTuple struct { + // Holds the combined constraints of the task group and all it's sub-tasks. + constraints []*structs.Constraint + + // The set of required drivers within the task group. + drivers map[string]struct{} + + // The combined resources of all tasks within the task group. + size *structs.Resources +} + +// taskGroupConstraints collects the constraints, drivers and resources required by each +// sub-task to aggregate the TaskGroup totals +func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { + c := tgConstrainTuple{ + constraints: make([]*structs.Constraint, 0, len(tg.Constraints)), + drivers: make(map[string]struct{}), + size: new(structs.Resources), + } + + c.constraints = append(c.constraints, tg.Constraints...) + for _, task := range tg.Tasks { + c.drivers[task.Driver] = struct{}{} + c.constraints = append(c.constraints, task.Constraints...) + c.size.Add(task.Resources) + } + + return c +} diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 38967c83e96..e553a01b861 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -340,3 +340,53 @@ func TestTasksUpdated(t *testing.T) { t.Fatalf("bad") } } + +func TestTaskGroupConstraints(t *testing.T) { + constr := &structs.Constraint{Hard: true} + constr2 := &structs.Constraint{LTarget: "foo"} + constr3 := &structs.Constraint{Weight: 10} + + tg := &structs.TaskGroup{ + Name: "web", + Count: 10, + Constraints: []*structs.Constraint{constr}, + Tasks: []*structs.Task{ + &structs.Task{ + Driver: "exec", + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + Constraints: []*structs.Constraint{constr2}, + }, + &structs.Task{ + Driver: "docker", + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + Constraints: []*structs.Constraint{constr3}, + }, + }, + } + + // Build the expected values. + expConstr := []*structs.Constraint{constr, constr2, constr3} + expDrivers := map[string]struct{}{"exec": struct{}{}, "docker": struct{}{}} + expSize := &structs.Resources{ + CPU: 1000, + MemoryMB: 512, + } + + actConstrains := taskGroupConstraints(tg) + if !reflect.DeepEqual(actConstrains.constraints, expConstr) { + t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstrains.constraints, expConstr) + } + if !reflect.DeepEqual(actConstrains.drivers, expDrivers) { + t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstrains.drivers, expDrivers) + } + if !reflect.DeepEqual(actConstrains.size, expSize) { + t.Fatalf("taskGroupConstraints(%v) returned %v; want %v", tg, actConstrains.size, expSize) + } + +} From a0d3eb779240440f7ae2bbfe3243f315e2606bbb Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Oct 2015 14:15:01 -0700 Subject: [PATCH 08/10] Validate task group count on system scheduler --- nomad/structs/structs.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e141bdeb386..8d58a985784 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -829,6 +829,12 @@ func (j *Job) Validate() error { } else { taskGroups[tg.Name] = idx } + + if j.Type == "system" && tg.Count != 1 { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("Job task group %d has count %d. Only count of 1 is supported with system scheduler", + idx+1, tg.Count)) + } } // Validate the task group From 927efaf4e0522385bc92c1e1684b3ab0fb65f615 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Oct 2015 16:35:55 -0700 Subject: [PATCH 09/10] Unit tests for the refactor scheduler methods --- scheduler/util.go | 5 +- scheduler/util_test.go | 244 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 248 insertions(+), 1 deletion(-) diff --git a/scheduler/util.go b/scheduler/util.go index 96c5eb3f623..3525b2f8db1 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -341,7 +341,10 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // Set the existing node as the base set stack.SetNodes([]*structs.Node{node}) - // Stage an eviction of the current allocation + // Stage an eviction of the current allocation. This is done so that + // the current allocation is discounted when checking for feasability. + // Otherwise we would be trying to fit the tasks current resources and + // updated resources. After select is called we can remove the evict. ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, allocInPlace) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index e553a01b861..9f6c12cad24 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "log" "os" "reflect" "testing" @@ -341,6 +342,249 @@ func TestTasksUpdated(t *testing.T) { } } +func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) { + _, ctx := testContext(t) + allocs := []allocTuple{ + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + } + diff := &diffResult{} + + limit := 2 + if !evictAndPlace(ctx, diff, allocs, "", &limit) { + t.Fatal("evictAndReplace() should have returned true") + } + + if limit != 0 { + t.Fatal("evictAndReplace() should decremented limit; got %v; want 0", limit) + } + + if len(diff.place) != 2 { + t.Fatal("evictAndReplace() didn't insert into diffResult properly: %v", diff.place) + } +} + +func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) { + _, ctx := testContext(t) + allocs := []allocTuple{ + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + } + diff := &diffResult{} + + limit := 4 + if evictAndPlace(ctx, diff, allocs, "", &limit) { + t.Fatal("evictAndReplace() should have returned false") + } + + if limit != 0 { + t.Fatal("evictAndReplace() should decremented limit; got %v; want 0", limit) + } + + if len(diff.place) != 4 { + t.Fatal("evictAndReplace() didn't insert into diffResult properly: %v", diff.place) + } +} + +func TestSetStatus(t *testing.T) { + h := NewHarness(t) + logger := log.New(os.Stderr, "", log.LstdFlags) + eval := mock.Eval() + status := "a" + desc := "b" + if err := setStatus(logger, h, eval, nil, status, desc); err != nil { + t.Fatalf("setStatus() failed: %v", err) + } + + if len(h.Evals) != 1 { + t.Fatalf("setStatus() didn't update plan: %v", h.Evals) + } + + newEval := h.Evals[0] + if newEval.ID != eval.ID || newEval.Status != status || newEval.StatusDescription != desc { + t.Fatalf("setStatus() submited invalid eval: %v", newEval) + } + + h = NewHarness(t) + next := mock.Eval() + if err := setStatus(logger, h, eval, next, status, desc); err != nil { + t.Fatalf("setStatus() failed: %v", err) + } + + if len(h.Evals) != 1 { + t.Fatalf("setStatus() didn't update plan: %v", h.Evals) + } + + newEval = h.Evals[0] + if newEval.NextEval != next.ID { + t.Fatalf("setStatus() didn't set nextEval correctly: %v", newEval) + } +} + +func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) { + state, ctx := testContext(t) + eval := mock.Eval() + job := mock.Job() + + node := mock.Node() + noErr(t, state.UpsertNode(1000, node)) + + // Register an alloc + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: eval.ID, + NodeID: node.ID, + JobID: job.ID, + Job: job, + Resources: &structs.Resources{ + CPU: 2048, + MemoryMB: 2048, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + } + alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) + + // Create a new task group that prevents in-place updates. + tg := &structs.TaskGroup{} + *tg = *job.TaskGroups[0] + task := &structs.Task{Name: "FOO"} + tg.Tasks = nil + tg.Tasks = append(tg.Tasks, task) + + updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} + stack := NewGenericStack(false, ctx, nil) + + // Do the inplace update. + unplaced := inplaceUpdate(ctx, eval, job, stack, updates) + + if len(unplaced) != 1 { + t.Fatal("inplaceUpdate incorrectly did an inplace update") + } + + if len(ctx.plan.NodeAllocation) != 0 { + t.Fatal("inplaceUpdate incorrectly did an inplace update") + } +} + +func TestInplaceUpdate_NoMatch(t *testing.T) { + state, ctx := testContext(t) + eval := mock.Eval() + job := mock.Job() + + node := mock.Node() + noErr(t, state.UpsertNode(1000, node)) + + // Register an alloc + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: eval.ID, + NodeID: node.ID, + JobID: job.ID, + Job: job, + Resources: &structs.Resources{ + CPU: 2048, + MemoryMB: 2048, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + } + alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) + + // Create a new task group that requires too much resources. + tg := &structs.TaskGroup{} + *tg = *job.TaskGroups[0] + resource := &structs.Resources{CPU: 9999} + tg.Tasks[0].Resources = resource + + updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} + stack := NewGenericStack(false, ctx, nil) + + // Do the inplace update. + unplaced := inplaceUpdate(ctx, eval, job, stack, updates) + + if len(unplaced) != 1 { + t.Fatal("inplaceUpdate incorrectly did an inplace update") + } + + if len(ctx.plan.NodeAllocation) != 0 { + t.Fatal("inplaceUpdate incorrectly did an inplace update") + } +} + +func TestInplaceUpdate_Success(t *testing.T) { + state, ctx := testContext(t) + eval := mock.Eval() + job := mock.Job() + + node := mock.Node() + noErr(t, state.UpsertNode(1000, node)) + + // Register an alloc + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: eval.ID, + NodeID: node.ID, + JobID: job.ID, + Job: job, + Resources: &structs.Resources{ + CPU: 2048, + MemoryMB: 2048, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + } + alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) + + // Create a new task group that updates the resources. + tg := &structs.TaskGroup{} + *tg = *job.TaskGroups[0] + resource := &structs.Resources{CPU: 737} + tg.Tasks[0].Resources = resource + + updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} + stack := NewGenericStack(false, ctx, nil) + + // Do the inplace update. + unplaced := inplaceUpdate(ctx, eval, job, stack, updates) + + if len(unplaced) != 0 { + t.Fatal("inplaceUpdate did not do an inplace update") + } + + if len(ctx.plan.NodeAllocation) != 1 { + t.Fatal("inplaceUpdate did not do an inplace update") + } +} + +func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) { + _, ctx := testContext(t) + allocs := []allocTuple{ + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + allocTuple{Alloc: &structs.Allocation{ID: structs.GenerateUUID()}}, + } + diff := &diffResult{} + + limit := 6 + if evictAndPlace(ctx, diff, allocs, "", &limit) { + t.Fatal("evictAndReplace() should have returned false") + } + + if limit != 2 { + t.Fatal("evictAndReplace() should decremented limit; got %v; want 2", limit) + } + + if len(diff.place) != 4 { + t.Fatal("evictAndReplace() didn't insert into diffResult properly: %v", diff.place) + } +} + func TestTaskGroupConstraints(t *testing.T) { constr := &structs.Constraint{Hard: true} constr2 := &structs.Constraint{LTarget: "foo"} From 240510132837ccaf4c026892d88bfbdeb87e6094 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Oct 2015 17:05:23 -0700 Subject: [PATCH 10/10] Remove base nodes from stack constructors --- scheduler/generic_sched.go | 2 +- scheduler/stack.go | 18 ++++----------- scheduler/stack_test.go | 45 +++++++++++++++++++++++--------------- scheduler/system_sched.go | 2 +- scheduler/util_test.go | 15 +++++++++---- 5 files changed, 44 insertions(+), 38 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index a74e5baca45..7957f2360ff 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -131,7 +131,7 @@ func (s *GenericScheduler) process() (bool, error) { s.ctx = NewEvalContext(s.state, s.plan, s.logger) // Construct the placement stack - s.stack = NewGenericStack(s.batch, s.ctx, nil) + s.stack = NewGenericStack(s.batch, s.ctx) if s.job != nil { s.stack.SetJob(s.job) } diff --git a/scheduler/stack.go b/scheduler/stack.go index 9cfde83568e..2cda626c488 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -48,7 +48,7 @@ type GenericStack struct { } // NewGenericStack constructs a stack used for selecting service placements -func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *GenericStack { +func NewGenericStack(batch bool, ctx Context) *GenericStack { // Create a new stack s := &GenericStack{ batch: batch, @@ -58,7 +58,7 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi // Create the source iterator. We randomize the order we visit nodes // to reduce collisions between schedulers and to do a basic load // balancing across eligible nodes. - s.source = NewRandomIterator(ctx, baseNodes) + s.source = NewRandomIterator(ctx, nil) // Attach the job constraints. The job is filled in later. s.jobConstraint = NewConstraintIterator(ctx, s.source, nil) @@ -92,11 +92,6 @@ func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *Generi // Select the node with the maximum score for placement s.maxScore = NewMaxScoreIterator(ctx, s.limit) - - // Set the nodes if given - if len(baseNodes) != 0 { - s.SetNodes(baseNodes) - } return s } @@ -169,13 +164,13 @@ type SystemStack struct { } // NewSystemStack constructs a stack used for selecting service placements -func NewSystemStack(ctx Context, baseNodes []*structs.Node) *SystemStack { +func NewSystemStack(ctx Context) *SystemStack { // Create a new stack s := &SystemStack{ctx: ctx} // Create the source iterator. We visit nodes in a linear order because we // have to evaluate on all nodes. - s.source = NewStaticIterator(ctx, baseNodes) + s.source = NewStaticIterator(ctx, nil) // Attach the job constraints. The job is filled in later. s.jobConstraint = NewConstraintIterator(ctx, s.source, nil) @@ -193,11 +188,6 @@ func NewSystemStack(ctx Context, baseNodes []*structs.Node) *SystemStack { // by a particular task group. Enable eviction as system jobs are high // priority. s.binPack = NewBinPackIterator(ctx, rankSource, true, 0) - - // Set the nodes if given - if len(baseNodes) != 0 { - s.SetNodes(baseNodes) - } return s } diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index d63306c880c..7b4b8acb324 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -10,7 +10,7 @@ import ( func TestServiceStack_SetNodes(t *testing.T) { _, ctx := testContext(t) - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) nodes := []*structs.Node{ mock.Node(), @@ -37,7 +37,7 @@ func TestServiceStack_SetNodes(t *testing.T) { func TestServiceStack_SetJob(t *testing.T) { _, ctx := testContext(t) - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) job := mock.Job() stack.SetJob(job) @@ -55,7 +55,8 @@ func TestServiceStack_Select_Size(t *testing.T) { nodes := []*structs.Node{ mock.Node(), } - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -85,7 +86,8 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) { mock.Node(), mock.Node(), } - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -120,7 +122,8 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) { zero := nodes[0] zero.Attributes["driver.foo"] = "1" - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() job.TaskGroups[0].Tasks[0].Driver = "foo" @@ -145,7 +148,8 @@ func TestServiceStack_Select_ConstraintFilter(t *testing.T) { zero := nodes[0] zero.Attributes["kernel.name"] = "freebsd" - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() job.Constraints[0].RTarget = "freebsd" @@ -182,7 +186,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { one := nodes[1] one.Reserved = one.Resources - stack := NewGenericStack(false, ctx, nodes) + stack := NewGenericStack(false, ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -210,7 +215,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { func TestSystemStack_SetNodes(t *testing.T) { _, ctx := testContext(t) - stack := NewSystemStack(ctx, nil) + stack := NewSystemStack(ctx) nodes := []*structs.Node{ mock.Node(), @@ -232,7 +237,7 @@ func TestSystemStack_SetNodes(t *testing.T) { func TestSystemStack_SetJob(t *testing.T) { _, ctx := testContext(t) - stack := NewSystemStack(ctx, nil) + stack := NewSystemStack(ctx) job := mock.Job() stack.SetJob(job) @@ -247,10 +252,9 @@ func TestSystemStack_SetJob(t *testing.T) { func TestSystemStack_Select_Size(t *testing.T) { _, ctx := testContext(t) - nodes := []*structs.Node{ - mock.Node(), - } - stack := NewSystemStack(ctx, nodes) + nodes := []*structs.Node{mock.Node()} + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -280,7 +284,8 @@ func TestSystemStack_Select_MetricsReset(t *testing.T) { mock.Node(), mock.Node(), } - stack := NewSystemStack(ctx, nodes) + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) @@ -314,7 +319,8 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) { zero := nodes[0] zero.Attributes["driver.foo"] = "1" - stack := NewSystemStack(ctx, nodes) + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) job := mock.Job() job.TaskGroups[0].Tasks[0].Driver = "foo" @@ -330,7 +336,8 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) { } zero.Attributes["driver.foo"] = "0" - stack = NewSystemStack(ctx, nodes) + stack = NewSystemStack(ctx) + stack.SetNodes(nodes) stack.SetJob(job) node, _ = stack.Select(job.TaskGroups[0]) if node != nil { @@ -347,7 +354,8 @@ func TestSystemStack_Select_ConstraintFilter(t *testing.T) { zero := nodes[1] zero.Attributes["kernel.name"] = "freebsd" - stack := NewSystemStack(ctx, nodes) + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) job := mock.Job() job.Constraints[0].RTarget = "freebsd" @@ -384,7 +392,8 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { zero.Reserved = zero.Resources one := nodes[1] - stack := NewSystemStack(ctx, nodes) + stack := NewSystemStack(ctx) + stack.SetNodes(nodes) job := mock.Job() stack.SetJob(job) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 0dfded98b11..d3f6fb27e60 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -99,7 +99,7 @@ func (s *SystemScheduler) process() (bool, error) { s.ctx = NewEvalContext(s.state, s.plan, s.logger) // Construct the placement stack - s.stack = NewSystemStack(s.ctx, nil) + s.stack = NewSystemStack(s.ctx) if s.job != nil { s.stack.SetJob(s.job) } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 9f6c12cad24..417737dcaad 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -288,18 +288,25 @@ func TestTaintedNodes(t *testing.T) { } func TestShuffleNodes(t *testing.T) { + // Use a large number of nodes to make the probability of shuffling to the + // original order very low. nodes := []*structs.Node{ mock.Node(), mock.Node(), mock.Node(), mock.Node(), mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), } orig := make([]*structs.Node, len(nodes)) copy(orig, nodes) shuffleNodes(nodes) if reflect.DeepEqual(nodes, orig) { - t.Fatalf("shoudl not match") + t.Fatalf("should not match") } } @@ -457,7 +464,7 @@ func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) { tg.Tasks = append(tg.Tasks, task) updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) // Do the inplace update. unplaced := inplaceUpdate(ctx, eval, job, stack, updates) @@ -502,7 +509,7 @@ func TestInplaceUpdate_NoMatch(t *testing.T) { tg.Tasks[0].Resources = resource updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) // Do the inplace update. unplaced := inplaceUpdate(ctx, eval, job, stack, updates) @@ -547,7 +554,7 @@ func TestInplaceUpdate_Success(t *testing.T) { tg.Tasks[0].Resources = resource updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}} - stack := NewGenericStack(false, ctx, nil) + stack := NewGenericStack(false, ctx) // Do the inplace update. unplaced := inplaceUpdate(ctx, eval, job, stack, updates)