From 86b0304fc29564904d05ad2628c762dd1add0aa7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 7 Mar 2017 14:20:02 -0800 Subject: [PATCH 1/8] Unoptimized implementation + testing --- jobspec/parse.go | 12 +- jobspec/parse_test.go | 15 + .../distinctProperty-constraint.hcl | 5 + nomad/structs/structs.go | 9 +- scheduler/feasible.go | 144 ++++++- scheduler/feasible_test.go | 401 +++++++++++++++--- scheduler/generic_sched_test.go | 170 ++++++++ 7 files changed, 688 insertions(+), 68 deletions(-) create mode 100644 jobspec/test-fixtures/distinctProperty-constraint.hcl diff --git a/jobspec/parse.go b/jobspec/parse.go index 7c98c22aae3..8a208f9864d 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -412,12 +412,13 @@ func parseConstraints(result *[]*api.Constraint, list *ast.ObjectList) error { // Check for invalid keys valid := []string{ "attribute", + "distinct_hosts", + "distinct_property", "operator", - "value", - "version", "regexp", - "distinct_hosts", "set_contains", + "value", + "version", } if err := checkHCLKeys(o.Val, valid); err != nil { return err @@ -467,6 +468,11 @@ func parseConstraints(result *[]*api.Constraint, list *ast.ObjectList) error { m["Operand"] = structs.ConstraintDistinctHosts } + if property, ok := m[structs.ConstraintDistinctProperty]; ok { + m["Operand"] = structs.ConstraintDistinctProperty + m["LTarget"] = property + } + // Build the constraint var c api.Constraint if err := mapstructure.WeakDecode(m, &c); err != nil { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 58d9225c155..6090dee5402 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -303,6 +303,21 @@ func TestParse(t *testing.T) { false, }, + { + "distinctProperty-constraint.hcl", + &api.Job{ + ID: helper.StringToPtr("foo"), + Name: helper.StringToPtr("foo"), + Constraints: []*api.Constraint{ + &api.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + }, + false, + }, + { "periodic-cron.hcl", &api.Job{ diff --git a/jobspec/test-fixtures/distinctProperty-constraint.hcl b/jobspec/test-fixtures/distinctProperty-constraint.hcl new file mode 100644 index 00000000000..d7cc1ababfc --- /dev/null +++ b/jobspec/test-fixtures/distinctProperty-constraint.hcl @@ -0,0 +1,5 @@ +job "foo" { + constraint { + distinct_property = "${meta.rack}" + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 56009051053..e96ea290214 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3189,10 +3189,11 @@ func (ta *TaskArtifact) Validate() error { } const ( - ConstraintDistinctHosts = "distinct_hosts" - ConstraintRegex = "regexp" - ConstraintVersion = "version" - ConstraintSetContains = "set_contains" + ConstraintDistinctProperty = "distinct_property" + ConstraintDistinctHosts = "distinct_hosts" + ConstraintRegex = "regexp" + ConstraintVersion = "version" + ConstraintSetContains = "set_contains" ) // Constraints are used to restrict placement options. diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 2ca75ee17d4..5f6a9ba3f47 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" ) @@ -157,6 +158,9 @@ type ProposedAllocConstraintIterator struct { // they don't have to be calculated every time Next() is called. 
tgDistinctHosts bool jobDistinctHosts bool + + tgDistinctPropertyConstraints []*structs.Constraint + jobDistinctPropertyConstraints []*structs.Constraint } // NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator @@ -171,11 +175,13 @@ func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *P func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints) + iter.tgDistinctPropertyConstraints = getDistinctPropertyConstraints(tg.Constraints) } func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) { iter.job = job iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints) + iter.jobDistinctPropertyConstraints = getDistinctPropertyConstraints(job.Constraints) } func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { @@ -184,22 +190,59 @@ func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constrai return true } } + return false } +// getDistinctPropertyConstraints filters the input constraints returning a list +// of distinct_property constraints. +func getDistinctPropertyConstraints(constraints []*structs.Constraint) []*structs.Constraint { + var distinctProperties []*structs.Constraint + for _, con := range constraints { + if con.Operand == structs.ConstraintDistinctProperty { + distinctProperties = append(distinctProperties, con) + } + } + return distinctProperties +} + func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { +OUTER: for { // Get the next option from the source option := iter.source.Next() - // Hot-path if the option is nil or there are no distinct_hosts constraints. - if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts) { + // Hot-path if the option is nil or there are no distinct_hosts or + // distinct_property constraints. + hosts := iter.jobDistinctHosts || iter.tgDistinctHosts + properties := len(iter.jobDistinctPropertyConstraints) != 0 || len(iter.tgDistinctPropertyConstraints) != 0 + if option == nil || !(hosts || properties) { return option } - if !iter.satisfiesDistinctHosts(option) { - iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts) - continue + // Check if the host constraints are satisfied + if hosts { + if !iter.satisfiesDistinctHosts(option) { + iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts) + continue + } + } + + // Check if the property constraints are satisfied + if properties { + for _, con := range iter.jobDistinctPropertyConstraints { + if !iter.satisfiesDistinctProperty(option, con, true) { + iter.ctx.Metrics().FilterNode(option, con.String()) + continue OUTER + } + } + + for _, con := range iter.tgDistinctPropertyConstraints { + if !iter.satisfiesDistinctProperty(option, con, true) { + iter.ctx.Metrics().FilterNode(option, con.String()) + continue OUTER + } + } } return option @@ -237,6 +280,95 @@ func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *stru return true } +// satisfiesDistinctProperty checks if the node satisfies the +// distinct_property constraint for the passed constraint. +// XXX it is insane to do all this work per constraint. 
Pass in all constraints +func (iter *ProposedAllocConstraintIterator) satisfiesDistinctProperty( + option *structs.Node, con *structs.Constraint, jobConstraint bool) bool { + + // Check if this node has the property + val, ok := resolveConstraintTarget(con.LTarget, option) + if !ok { + return false + } + optionValue, ok := val.(string) + if !ok { + return false + } + + // Retrieve all previously placed allocations + // XXX this should be cached + ws := memdb.NewWatchSet() + allocs, err := iter.ctx.State().AllocsByJob(ws, iter.job.ID, false) + if err != nil { + iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: failed to get job's allocations: %v", err) + return false + } + + // Add the proposed allocations + for _, pallocs := range iter.ctx.Plan().NodeAllocation { + allocs = append(allocs, pallocs...) + } + + // Filter to relevant allocations + var filteredAllocs []*structs.Allocation + for _, alloc := range allocs { + // Ensure the allocation is for the same job + if alloc.JobID != iter.job.ID { + continue + } + + // Ensure the allocation is non-terminal + if alloc.TerminalStatus() { + continue + } + + // We are working on a particular task group so filter allocs not from + // it. + if !jobConstraint && alloc.TaskGroup != iter.tg.Name { + continue + } + + filteredAllocs = append(filteredAllocs, alloc) + } + + // Get all the nodes that have already been used + nodes := make(map[string]*structs.Node) + for _, alloc := range filteredAllocs { + if _, ok := nodes[alloc.NodeID]; ok { + continue + } + + node, err := iter.ctx.State().NodeByID(ws, alloc.NodeID) + if err != nil { + iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: failed to lookup node ID %q: %v", alloc.NodeID, err) + return false + } + + nodes[alloc.NodeID] = node + } + + // Check if this options value for the target propery collides with a + // previously selected property value + for _, n := range nodes { + val, ok := resolveConstraintTarget(con.LTarget, n) + if !ok { + continue + } + nodeValue, ok := val.(string) + if !ok { + continue + } + + // We have a duplicate so we aren't valid + if nodeValue == optionValue { + return false + } + } + + return true +} + func (iter *ProposedAllocConstraintIterator) Reset() { iter.source.Reset() } @@ -327,7 +459,7 @@ func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bo func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { // Check for constraints not handled by this checker. switch operand { - case structs.ConstraintDistinctHosts: + case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: return true default: break diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index e6fe430e47d..9f6bcc80e22 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "fmt" "reflect" "testing" @@ -434,49 +435,14 @@ func TestCheckRegexpConstraint(t *testing.T) { } } +// This test puts allocations on the node to test if it detects infeasibility of +// nodes correctly and picks the only feasible one func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), mock.Node(), mock.Node(), - mock.Node(), - } - static := NewStaticIterator(ctx, nodes) - - // Create a job with a distinct_hosts constraint and two task groups. 
- tg1 := &structs.TaskGroup{Name: "bar"} - tg2 := &structs.TaskGroup{Name: "baz"} - - job := &structs.Job{ - ID: "foo", - Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}}, - TaskGroups: []*structs.TaskGroup{tg1, tg2}, - } - - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(tg1) - propsed.SetJob(job) - - out := collectFeasible(propsed) - if len(out) != 4 { - t.Fatalf("Bad: %#v", out) - } - - selected := make(map[string]struct{}, 4) - for _, option := range out { - if _, ok := selected[option.ID]; ok { - t.Fatalf("selected node %v for more than one alloc", option) - } - selected[option.ID] = struct{}{} - } -} - -func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { - _, ctx := testContext(t) - nodes := []*structs.Node{ - mock.Node(), - mock.Node(), } static := NewStaticIterator(ctx, nodes) @@ -491,7 +457,7 @@ func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { } // Add allocs placing tg1 on node1 and tg2 on node2. This should make the - // job unsatisfiable. + // job unsatisfiable on all nodes but node3 plan := ctx.Plan() plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ &structs.Allocation{ @@ -522,14 +488,18 @@ func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) { }, } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(tg1) - propsed.SetJob(job) + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetTaskGroup(tg1) + proposed.SetJob(job) - out := collectFeasible(propsed) - if len(out) != 0 { + out := collectFeasible(proposed) + if len(out) != 1 { t.Fatalf("Bad: %#v", out) } + + if out[0].ID != nodes[2].ID { + t.Fatalf("wrong node picked") + } } func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) { @@ -551,13 +521,31 @@ func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3}, } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(tg1) - propsed.SetJob(job) + // Add allocs placing tg1 on node1 and tg2 on node2. This should make the + // job unsatisfiable for tg3 + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + }, + } + plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + }, + } + + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetTaskGroup(tg3) + proposed.SetJob(job) // It should not be able to place 3 tasks with only two nodes. - out := collectFeasible(propsed) - if len(out) != 2 { + out := collectFeasible(proposed) + if len(out) != 0 { t.Fatalf("Bad: %#v", out) } } @@ -571,18 +559,19 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { static := NewStaticIterator(ctx, nodes) // Create a task group with a distinct_hosts constraint. - taskGroup := &structs.TaskGroup{ + tg1 := &structs.TaskGroup{ Name: "example", Constraints: []*structs.Constraint{ {Operand: structs.ConstraintDistinctHosts}, }, } + tg2 := &structs.TaskGroup{Name: "baz"} // Add a planned alloc to node1. 
plan := ctx.Plan() plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ &structs.Allocation{ - TaskGroup: taskGroup.Name, + TaskGroup: tg1.Name, JobID: "foo", }, } @@ -591,16 +580,16 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { // different job. plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{ &structs.Allocation{ - TaskGroup: taskGroup.Name, + TaskGroup: tg1.Name, JobID: "bar", }, } - propsed := NewProposedAllocConstraintIterator(ctx, static) - propsed.SetTaskGroup(taskGroup) - propsed.SetJob(&structs.Job{ID: "foo"}) + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetTaskGroup(tg1) + proposed.SetJob(&structs.Job{ID: "foo"}) - out := collectFeasible(propsed) + out := collectFeasible(proposed) if len(out) != 1 { t.Fatalf("Bad: %#v", out) } @@ -610,6 +599,308 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { if out[0] != nodes[1] { t.Fatalf("Bad: %v", out) } + + // Since the other task group doesn't have the constraint, both nodes should + // be feasible. + proposed.Reset() + proposed.SetTaskGroup(tg2) + out = collectFeasible(proposed) + if len(out) != 2 { + t.Fatalf("Bad: %#v", out) + } +} + +// This test puts creates allocations across task groups that use a property +// value to detect if the constraint at the job level properly considers all +// task groups. +func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { + state, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + + for i, n := range nodes { + n.Meta["rack"] = fmt.Sprintf("%d", i) + + // Add to state store + if err := state.UpsertNode(uint64(100+i), n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + } + + static := NewStaticIterator(ctx, nodes) + + // Create a job with a distinct_property constraint and a task groups. + tg1 := &structs.TaskGroup{Name: "bar"} + tg2 := &structs.TaskGroup{Name: "baz"} + + job := &structs.Job{ + ID: "foo", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + TaskGroups: []*structs.TaskGroup{tg1, tg2}, + } + + // Add allocs placing tg1 on node1 and 2 and tg2 on node3 and 4. This should make the + // job unsatisfiable on all nodes but node5. Also mix the allocations + // existing in the plan and the state store. + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + plan.NodeAllocation[nodes[2].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + } + + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, + + // Should be ignored as it is a different job. 
+ &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[3].ID, + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[3].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetTaskGroup(tg2) + proposed.SetJob(job) + + out := collectFeasible(proposed) + if len(out) != 1 { + t.Fatalf("Bad: %#v", out) + } + if out[0].ID != nodes[4].ID { + t.Fatalf("wrong node picked") + } +} + +// This test creates previous allocations selecting certain property values to +// test if it detects infeasibility of property values correctly and picks the +// only feasible one +func TestProposedAllocConstraint_JobDistinctProperty_Infeasible(t *testing.T) { + state, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + + for i, n := range nodes { + n.Meta["rack"] = fmt.Sprintf("%d", i) + + // Add to state store + if err := state.UpsertNode(uint64(100+i), n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + } + + static := NewStaticIterator(ctx, nodes) + + // Create a job with a distinct_property constraint and a task groups. + tg1 := &structs.TaskGroup{Name: "bar"} + tg2 := &structs.TaskGroup{Name: "baz"} + tg3 := &structs.TaskGroup{Name: "bam"} + + job := &structs.Job{ + ID: "foo", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3}, + } + + // Add allocs placing tg1 on node1 and tg2 on node2. This should make the + // job unsatisfiable for tg3. + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetTaskGroup(tg3) + proposed.SetJob(job) + + out := collectFeasible(proposed) + if len(out) != 0 { + t.Fatalf("Bad: %#v", out) + } +} + +// This test creates previous allocations selecting certain property values to +// test if it detects infeasibility of property values correctly and picks the +// only feasible one when the constraint is at the task group. 
+func TestProposedAllocConstraint_TaskGroupDistinctProperty(t *testing.T) { + state, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + } + + for i, n := range nodes { + n.Meta["rack"] = fmt.Sprintf("%d", i) + + // Add to state store + if err := state.UpsertNode(uint64(100+i), n); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + } + + static := NewStaticIterator(ctx, nodes) + + // Create a job with a task group with the distinct_property constraint + tg1 := &structs.TaskGroup{ + Name: "example", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + } + tg2 := &structs.TaskGroup{Name: "baz"} + + job := &structs.Job{ + ID: "foo", + TaskGroups: []*structs.TaskGroup{tg1, tg2}, + } + + // Add allocs placing tg1 on node1 and 2. This should make the + // job unsatisfiable on all nodes but node3. Also mix the allocations + // existing in the plan and the state store. + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[1].ID, + }, + + // Should be ignored as it is a different job. + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: "ignore 2", + ID: structs.GenerateUUID(), + EvalID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetTaskGroup(tg1) + proposed.SetJob(job) + + out := collectFeasible(proposed) + if len(out) != 1 { + t.Fatalf("Bad: %#v", out) + } + if out[0].ID != nodes[2].ID { + t.Fatalf("wrong node picked") + } + + // Since the other task group doesn't have the constraint, both nodes should + // be feasible. + proposed.Reset() + proposed.SetTaskGroup(tg2) + out = collectFeasible(proposed) + if len(out) != 3 { + t.Fatalf("Bad: %#v", out) + } } func collectFeasible(iter FeasibleIterator) (out []*structs.Node) { diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 36ec50b0fc1..7be10e40ab1 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -229,6 +229,176 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DistinctHosts_Job(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct host and has count 1 higher than what is + // possible. 
+ job := mock.Job() + job.TaskGroups[0].Count = 11 + job.Constraints = append(job.Constraints, &structs.Constraint{Operand: structs.ConstraintDistinctHosts}) + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the eval has spawned blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan failed to alloc + outEval := h.Evals[0] + if len(outEval.FailedTGAllocs) != 1 { + t.Fatalf("bad: %+v", outEval) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + // Ensure different node was used per. + used := make(map[string]struct{}) + for _, alloc := range out { + if _, ok := used[alloc.NodeID]; ok { + t.Fatalf("Node collision %v", alloc.NodeID) + } + used[alloc.NodeID] = struct{}{} + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestServiceSched_JobRegister_DistinctProperty(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + rack := "rack2" + if i < 5 { + rack = "rack1" + } + node.Meta["rack"] = rack + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. + job := mock.Job() + job.TaskGroups[0].Count = 4 + job.Constraints = append(job.Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }) + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval has spawned blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan failed to alloc + outEval := h.Evals[0] + if len(outEval.FailedTGAllocs) != 1 { + t.Fatalf("bad: %+v", outEval) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 2 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + + // Ensure different node was used per. + used := make(map[string]struct{}) + for _, alloc := range out { + if _, ok := used[alloc.NodeID]; ok { + t.Fatalf("Node collision %v", alloc.NodeID) + } + used[alloc.NodeID] = struct{}{} + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) From fdc430937494b8b442837d0627b13d30c9e0cbf8 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 8 Mar 2017 11:47:55 -0800 Subject: [PATCH 2/8] Property Set --- scheduler/feasible.go | 421 +++++++++++++++++++++++++------- scheduler/feasible_test.go | 138 ++++++++++- scheduler/generic_sched_test.go | 108 +++++++- scheduler/util.go | 5 - 4 files changed, 572 insertions(+), 100 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 5f6a9ba3f47..29ded1d9bd4 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -154,34 +154,39 @@ type ProposedAllocConstraintIterator struct { tg *structs.TaskGroup job *structs.Job + // distinctProperties is used to track the distinct properties of the job + // and to check if node options satisfy these constraints. + distinctProperties *propertySet + // Store whether the Job or TaskGroup has a distinct_hosts constraints so // they don't have to be calculated every time Next() is called. tgDistinctHosts bool jobDistinctHosts bool - - tgDistinctPropertyConstraints []*structs.Constraint - jobDistinctPropertyConstraints []*structs.Constraint } // NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator // from a source. func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *ProposedAllocConstraintIterator { return &ProposedAllocConstraintIterator{ - ctx: ctx, - source: source, + ctx: ctx, + source: source, + distinctProperties: NewPropertySet(ctx), } } func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints) - iter.tgDistinctPropertyConstraints = getDistinctPropertyConstraints(tg.Constraints) } func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) { iter.job = job iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints) - iter.jobDistinctPropertyConstraints = getDistinctPropertyConstraints(job.Constraints) + + if err := iter.distinctProperties.SetJob(job); err != nil { + iter.ctx.Logger().Printf( + "[ERR] scheduler.dynamic-constraint: failed to build property set: %v", err) + } } func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { @@ -194,20 +199,7 @@ func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constrai return false } -// getDistinctPropertyConstraints filters the input constraints returning a list -// of distinct_property constraints. 
-func getDistinctPropertyConstraints(constraints []*structs.Constraint) []*structs.Constraint { - var distinctProperties []*structs.Constraint - for _, con := range constraints { - if con.Operand == structs.ConstraintDistinctProperty { - distinctProperties = append(distinctProperties, con) - } - } - return distinctProperties -} - func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { -OUTER: for { // Get the next option from the source option := iter.source.Next() @@ -215,7 +207,7 @@ OUTER: // Hot-path if the option is nil or there are no distinct_hosts or // distinct_property constraints. hosts := iter.jobDistinctHosts || iter.tgDistinctHosts - properties := len(iter.jobDistinctPropertyConstraints) != 0 || len(iter.tgDistinctPropertyConstraints) != 0 + properties := iter.distinctProperties.HasDistinctPropertyConstraints() if option == nil || !(hosts || properties) { return option } @@ -230,18 +222,10 @@ OUTER: // Check if the property constraints are satisfied if properties { - for _, con := range iter.jobDistinctPropertyConstraints { - if !iter.satisfiesDistinctProperty(option, con, true) { - iter.ctx.Metrics().FilterNode(option, con.String()) - continue OUTER - } - } - - for _, con := range iter.tgDistinctPropertyConstraints { - if !iter.satisfiesDistinctProperty(option, con, true) { - iter.ctx.Metrics().FilterNode(option, con.String()) - continue OUTER - } + satisfied, reason := iter.distinctProperties.SatisfiesDistinctProperties(option, iter.tg.Name) + if !satisfied { + iter.ctx.Metrics().FilterNode(option, reason) + continue } } @@ -280,97 +264,356 @@ func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *stru return true } -// satisfiesDistinctProperty checks if the node satisfies the -// distinct_property constraint for the passed constraint. -// XXX it is insane to do all this work per constraint. Pass in all constraints -func (iter *ProposedAllocConstraintIterator) satisfiesDistinctProperty( - option *structs.Node, con *structs.Constraint, jobConstraint bool) bool { +func (iter *ProposedAllocConstraintIterator) Reset() { + iter.source.Reset() - // Check if this node has the property - val, ok := resolveConstraintTarget(con.LTarget, option) - if !ok { - return false + // Repopulate the proposed set every time we are reset because an + // additional allocation may have been added + if err := iter.distinctProperties.PopulateProposed(); err != nil { + iter.ctx.Logger().Printf( + "[ERR] scheduler.dynamic-constraint: failed to populate proposed properties: %v", err) } - optionValue, ok := val.(string) - if !ok { - return false +} + +// propertySet is used to track used values for a particular node property. +type propertySet struct { + + // ctx is used to lookup the plan and state + ctx Context + + // job stores the job the property set is tracking + job *structs.Job + + // jobConstrainedProperties stores the set of LTargets that we are + // constrained on. The outer key is the task group name. The values stored + // in key "" are those constrained at the job level. + jobConstrainedProperties map[string]map[string]struct{} + + // existingProperties is a mapping of task group/job to properties (LTarget) + // to a string set of used values for pre-placed allocation. + existingProperties map[string]map[string]map[string]struct{} + + // proposedCreateProperties is a mapping of task group/job to properties (LTarget) + // to a string set of used values for proposed allocations. 
+ proposedCreateProperties map[string]map[string]map[string]struct{} + + // clearedProposedProperties is a mapping of task group/job to properties (LTarget) + // to a string set of values that have been cleared and are no longer used + clearedProposedProperties map[string]map[string]map[string]struct{} +} + +// NewPropertySet returns a new property set used to guarantee unique property +// values for new allocation placements. +func NewPropertySet(ctx Context) *propertySet { + p := &propertySet{ + ctx: ctx, + jobConstrainedProperties: make(map[string]map[string]struct{}), + existingProperties: make(map[string]map[string]map[string]struct{}), + proposedCreateProperties: make(map[string]map[string]map[string]struct{}), + clearedProposedProperties: make(map[string]map[string]map[string]struct{}), } - // Retrieve all previously placed allocations - // XXX this should be cached - ws := memdb.NewWatchSet() - allocs, err := iter.ctx.State().AllocsByJob(ws, iter.job.ID, false) - if err != nil { - iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: failed to get job's allocations: %v", err) - return false + return p +} + +// setJob sets the job the property set is tracking. The distinct property +// constraints and property values already used by existing allocations are +// calculated. +func (p *propertySet) SetJob(j *structs.Job) error { + p.job = j + + p.buildDistinctProperties() + if err := p.populateExisting(); err != nil { + return err } - // Add the proposed allocations - for _, pallocs := range iter.ctx.Plan().NodeAllocation { - allocs = append(allocs, pallocs...) + return nil +} + +// HasDistinctPropertyConstraints returns whether there are distinct_property +// constraints on the job. +func (p *propertySet) HasDistinctPropertyConstraints() bool { + return len(p.jobConstrainedProperties) != 0 +} + +// PopulateProposed populates the set of property values used by the proposed +// allocations for the job. This should be called on every reset +func (p *propertySet) PopulateProposed() error { + // Hot path since there is nothing to do + if !p.HasDistinctPropertyConstraints() { + return nil } - // Filter to relevant allocations - var filteredAllocs []*structs.Allocation - for _, alloc := range allocs { - // Ensure the allocation is for the same job - if alloc.JobID != iter.job.ID { - continue + // Reset the proposed properties + p.proposedCreateProperties = make(map[string]map[string]map[string]struct{}) + p.clearedProposedProperties = make(map[string]map[string]map[string]struct{}) + + // Gather the set of proposed stops. + var stopping []*structs.Allocation + for _, updates := range p.ctx.Plan().NodeUpdate { + stopping = append(stopping, updates...) + } + + // build the property set for the proposed stopped allocations + // This should be called before building the property set for the created + // allocs. + if err := p.buildProperySet(stopping, false, true); err != nil { + return err + } + + // Gather the proposed allocations + var proposed []*structs.Allocation + for _, pallocs := range p.ctx.Plan().NodeAllocation { + proposed = append(proposed, pallocs...) + } + + // build the property set for the proposed new allocations + return p.buildProperySet(proposed, false, false) +} + +// satisfiesDistinctProperties checks if the option satisfies all +// distinct_property constraints given the existing placements and proposed +// placements. If the option does not satisfy the constraints an explanation is +// given. 
+func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { + // Hot path if there is nothing to do + jobConstrainedProperties := p.jobConstrainedProperties[""] + tgConstrainedProperties := p.jobConstrainedProperties[tg] + if len(jobConstrainedProperties) == 0 && len(tgConstrainedProperties) == 0 { + return true, "" + } + + // both is used to iterate over both the proposed and existing used + // properties + bothAll := []map[string]map[string]map[string]struct{}{p.existingProperties, p.proposedCreateProperties} + + // Check if the option is unique for all the job properties + for constrainedProperty := range jobConstrainedProperties { + // Get the nodes property value + nValue, ok := p.getProperty(option, constrainedProperty) + if !ok { + return false, fmt.Sprintf("missing property %q", constrainedProperty) } - // Ensure the allocation is non-terminal - if alloc.TerminalStatus() { - continue + // Check if the nodes value has already been used. + for _, usedProperties := range bothAll { + // Since we are checking at the job level, check all task groups + for group, properties := range usedProperties { + setValues, ok := properties[constrainedProperty] + if !ok { + continue + } + + // Check if the nodes value has been used + _, used := setValues[nValue] + if !used { + continue + } + + // The last check is to ensure that the value hasn't been + // removed in the proposed removals. + if _, cleared := p.clearedProposedProperties[group][constrainedProperty][nValue]; cleared { + continue + } + + return false, fmt.Sprintf("distinct_property: %s=%s already used", constrainedProperty, nValue) + } + } + } + + // bothTG is both filtered at by the task group + bothTG := []map[string]map[string]struct{}{p.existingProperties[tg], p.proposedCreateProperties[tg]} + + // Check if the option is unique for all the task group properties + for constrainedProperty := range tgConstrainedProperties { + // Get the nodes property value + nValue, ok := p.getProperty(option, constrainedProperty) + if !ok { + return false, fmt.Sprintf("missing property %q", constrainedProperty) + } + + // Check if the nodes value has already been used. + for _, properties := range bothTG { + setValues, ok := properties[constrainedProperty] + if !ok { + continue + } + + // Check if the nodes value has been used + if _, used := setValues[nValue]; !used { + continue + } + + // The last check is to ensure that the value hasn't been + // removed in the proposed removals. + if _, cleared := p.clearedProposedProperties[tg][constrainedProperty][nValue]; cleared { + continue + } + + return false, fmt.Sprintf("distinct_property: %s=%s already used", constrainedProperty, nValue) } + } - // We are working on a particular task group so filter allocs not from - // it. - if !jobConstraint && alloc.TaskGroup != iter.tg.Name { + return true, "" +} + +// buildDistinctProperties takes the job and populates the map of distinct +// properties that are constrained on for the job. +func (p *propertySet) buildDistinctProperties() { + for _, c := range p.job.Constraints { + if c.Operand != structs.ConstraintDistinctProperty { continue } - filteredAllocs = append(filteredAllocs, alloc) + // Store job properties in the magic empty string since it can't be used + // by any task group. 
+ if _, ok := p.jobConstrainedProperties[""]; !ok { + p.jobConstrainedProperties[""] = make(map[string]struct{}) + } + + p.jobConstrainedProperties[""][c.LTarget] = struct{}{} + } + + for _, tg := range p.job.TaskGroups { + for _, c := range tg.Constraints { + if c.Operand != structs.ConstraintDistinctProperty { + continue + } + + if _, ok := p.jobConstrainedProperties[tg.Name]; !ok { + p.jobConstrainedProperties[tg.Name] = make(map[string]struct{}) + } + + p.jobConstrainedProperties[tg.Name][c.LTarget] = struct{}{} + } + } +} + +// populateExisting populates the set of property values used by existing +// allocations for the job. +func (p *propertySet) populateExisting() error { + // Hot path since there is nothing to do + if !p.HasDistinctPropertyConstraints() { + return nil + } + + // Retrieve all previously placed allocations + ws := memdb.NewWatchSet() + allocs, err := p.ctx.State().AllocsByJob(ws, p.job.ID, false) + if err != nil { + return fmt.Errorf("failed to get job's allocations: %v", err) } - // Get all the nodes that have already been used + return p.buildProperySet(allocs, true, false) +} + +// buildProperySet takes a set of allocations and determines what property +// values have been used by them. The existing boolean marks whether these are +// existing allocations or proposed allocations. Stopping marks whether the +// allocations are being stopped. +func (p *propertySet) buildProperySet(allocs []*structs.Allocation, existing, stopping bool) error { + // Only want running allocs + filtered := allocs + if !stopping { + filtered, _ = structs.FilterTerminalAllocs(allocs) + } + + // Get all the nodes that have been used by the allocs + ws := memdb.NewWatchSet() nodes := make(map[string]*structs.Node) - for _, alloc := range filteredAllocs { + for _, alloc := range filtered { if _, ok := nodes[alloc.NodeID]; ok { continue } - node, err := iter.ctx.State().NodeByID(ws, alloc.NodeID) + node, err := p.ctx.State().NodeByID(ws, alloc.NodeID) if err != nil { - iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: failed to lookup node ID %q: %v", alloc.NodeID, err) - return false + return fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err) } nodes[alloc.NodeID] = node } - // Check if this options value for the target propery collides with a - // previously selected property value - for _, n := range nodes { - val, ok := resolveConstraintTarget(con.LTarget, n) - if !ok { - continue - } - nodeValue, ok := val.(string) - if !ok { - continue + // propertySet is the set we are operating on + propertySet := p.existingProperties + if !existing && !stopping { + propertySet = p.proposedCreateProperties + } else if stopping { + propertySet = p.clearedProposedProperties + } + + // Go through each allocation and build the set of property values that have + // been used + for _, alloc := range filtered { + // Gather job related constrained properties + jobProperties := p.jobConstrainedProperties[""] + for constrainedProperty := range jobProperties { + nProperty, ok := p.getProperty(nodes[alloc.NodeID], constrainedProperty) + if !ok { + continue + } + + if _, exists := propertySet[""]; !exists { + propertySet[""] = make(map[string]map[string]struct{}) + } + + if _, exists := propertySet[""][constrainedProperty]; !exists { + propertySet[""][constrainedProperty] = make(map[string]struct{}) + } + + propertySet[""][constrainedProperty][nProperty] = struct{}{} + + // This is a newly created allocation so clear out the fact that + // proposed property is not being used anymore + if 
!existing && !stopping { + delete(p.clearedProposedProperties[""][constrainedProperty], nProperty) + } } - // We have a duplicate so we aren't valid - if nodeValue == optionValue { - return false + // Gather task group related constrained properties + groupProperties := p.jobConstrainedProperties[alloc.TaskGroup] + for constrainedProperty := range groupProperties { + nProperty, ok := p.getProperty(nodes[alloc.NodeID], constrainedProperty) + if !ok { + continue + } + + if _, exists := propertySet[alloc.TaskGroup]; !exists { + propertySet[alloc.TaskGroup] = make(map[string]map[string]struct{}) + } + if _, exists := propertySet[alloc.TaskGroup][constrainedProperty]; !exists { + propertySet[alloc.TaskGroup][constrainedProperty] = make(map[string]struct{}) + } + + propertySet[alloc.TaskGroup][constrainedProperty][nProperty] = struct{}{} + + // This is a newly created allocation so clear out the fact that + // proposed property is not being used anymore + if !existing && !stopping { + delete(p.clearedProposedProperties[alloc.TaskGroup][constrainedProperty], nProperty) + } } } - return true + return nil } -func (iter *ProposedAllocConstraintIterator) Reset() { - iter.source.Reset() +// getProperty is used to lookup the property value on the node +func (p *propertySet) getProperty(n *structs.Node, property string) (string, bool) { + if n == nil || property == "" { + return "", false + } + + val, ok := resolveConstraintTarget(property, n) + if !ok { + return "", false + } + nodeValue, ok := val.(string) + if !ok { + return "", false + } + + return nodeValue, true } // ConstraintChecker is a FeasibilityChecker which returns nodes that match a diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 9f6bcc80e22..eb065e5a654 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -653,11 +653,12 @@ func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { // job unsatisfiable on all nodes but node5. Also mix the allocations // existing in the plan and the state store. plan := ctx.Plan() + alloc1ID := structs.GenerateUUID() plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ &structs.Allocation{ TaskGroup: tg1.Name, JobID: job.ID, - ID: structs.GenerateUUID(), + ID: alloc1ID, NodeID: nodes[0].ID, }, @@ -686,7 +687,28 @@ func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { }, } + // Put an allocation on Node 5 but make it stopped in the plan + stoppingAllocID := structs.GenerateUUID() + plan.NodeUpdate[nodes[4].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: stoppingAllocID, + NodeID: nodes[4].ID, + }, + } + upserting := []*structs.Allocation{ + // Have one of the allocations exist in both the plan and the state + // store. 
This resembles an allocation update + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: alloc1ID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + &structs.Allocation{ TaskGroup: tg1.Name, JobID: job.ID, @@ -719,14 +741,22 @@ func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { EvalID: structs.GenerateUUID(), NodeID: nodes[3].ID, }, + &structs.Allocation{ + TaskGroup: tg2.Name, + JobID: job.ID, + ID: stoppingAllocID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[4].ID, + }, } if err := state.UpsertAllocs(1000, upserting); err != nil { t.Fatalf("failed to UpsertAllocs: %v", err) } proposed := NewProposedAllocConstraintIterator(ctx, static) - proposed.SetTaskGroup(tg2) proposed.SetJob(job) + proposed.SetTaskGroup(tg2) + proposed.Reset() out := collectFeasible(proposed) if len(out) != 1 { @@ -737,6 +767,81 @@ func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { } } +// This test checks that if a node has an allocation on it that gets stopped, +// there is a plan to re-use that for a new allocation, that the next select +// won't select that node. +func TestProposedAllocConstraint_JobDistinctProperty_RemoveAndReplace(t *testing.T) { + state, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + + nodes[0].Meta["rack"] = "1" + + // Add to state store + if err := state.UpsertNode(uint64(100), nodes[0]); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + + static := NewStaticIterator(ctx, nodes) + + // Create a job with a distinct_property constraint and a task groups. + tg1 := &structs.TaskGroup{Name: "bar"} + job := &structs.Job{ + ID: "foo", + Constraints: []*structs.Constraint{ + { + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }, + }, + TaskGroups: []*structs.TaskGroup{tg1}, + } + + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + + stoppingAllocID := structs.GenerateUUID() + plan.NodeUpdate[nodes[0].ID] = []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + NodeID: nodes[0].ID, + }, + } + + upserting := []*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[0].ID, + }, + } + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed.SetJob(job) + proposed.SetTaskGroup(tg1) + proposed.Reset() + + out := collectFeasible(proposed) + if len(out) != 0 { + t.Fatalf("Bad: %#v", out) + } +} + // This test creates previous allocations selecting certain property values to // test if it detects infeasibility of property values correctly and picks the // only feasible one @@ -799,8 +904,9 @@ func TestProposedAllocConstraint_JobDistinctProperty_Infeasible(t *testing.T) { } proposed := NewProposedAllocConstraintIterator(ctx, static) - proposed.SetTaskGroup(tg3) proposed.SetJob(job) + proposed.SetTaskGroup(tg3) + proposed.Reset() out := collectFeasible(proposed) if len(out) != 0 { @@ -859,6 +965,18 @@ func TestProposedAllocConstraint_TaskGroupDistinctProperty(t *testing.T) { NodeID: nodes[0].ID, }, } + + // Put an allocation on Node 3 but make it stopped in the plan + stoppingAllocID := structs.GenerateUUID() + plan.NodeUpdate[nodes[2].ID] = 
[]*structs.Allocation{ + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + NodeID: nodes[2].ID, + }, + } + upserting := []*structs.Allocation{ &structs.Allocation{ TaskGroup: tg1.Name, @@ -876,14 +994,23 @@ func TestProposedAllocConstraint_TaskGroupDistinctProperty(t *testing.T) { EvalID: structs.GenerateUUID(), NodeID: nodes[2].ID, }, + + &structs.Allocation{ + TaskGroup: tg1.Name, + JobID: job.ID, + ID: stoppingAllocID, + EvalID: structs.GenerateUUID(), + NodeID: nodes[2].ID, + }, } if err := state.UpsertAllocs(1000, upserting); err != nil { t.Fatalf("failed to UpsertAllocs: %v", err) } proposed := NewProposedAllocConstraintIterator(ctx, static) - proposed.SetTaskGroup(tg1) proposed.SetJob(job) + proposed.SetTaskGroup(tg1) + proposed.Reset() out := collectFeasible(proposed) if len(out) != 1 { @@ -895,8 +1022,9 @@ func TestProposedAllocConstraint_TaskGroupDistinctProperty(t *testing.T) { // Since the other task group doesn't have the constraint, both nodes should // be feasible. - proposed.Reset() proposed.SetTaskGroup(tg2) + proposed.Reset() + out = collectFeasible(proposed) if len(out) != 3 { t.Fatalf("Bad: %#v", out) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 7be10e40ab1..a4307bc51cc 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -229,7 +229,7 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestServiceSched_JobRegister_DistinctHosts_Job(t *testing.T) { +func TestServiceSched_JobRegister_DistinctHosts(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -1488,6 +1488,112 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { } } +func TestServiceSched_JobModify_DistinctProperty(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + node.Meta["rack"] = fmt.Sprintf("rack%d", i) + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. + job := mock.Job() + job.TaskGroups[0].Count = 11 + job.Constraints = append(job.Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.rack}", + }) + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + oldJob := job.Copy() + oldJob.JobModifyIndex -= 1 + oldJob.TaskGroups[0].Count = 4 + + // Place 4 of 10 + var allocs []*structs.Allocation + for i := 0; i < 4; i++ { + alloc := mock.Alloc() + alloc.Job = oldJob + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. 
+ if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval hasn't spawned blocked eval + if len(h.CreateEvals) != 1 { + t.Fatalf("bad: %#v", h.CreateEvals) + } + + // Ensure the plan failed to alloc + outEval := h.Evals[0] + if len(outEval.FailedTGAllocs) != 1 { + t.Fatalf("bad: %+v", outEval) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", planned) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + // Ensure different node was used per. + used := make(map[string]struct{}) + for _, alloc := range out { + if _, ok := used[alloc.NodeID]; ok { + t.Fatalf("Node collision %v", alloc.NodeID) + } + used[alloc.NodeID] = struct{}{} + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobDeregister(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/util.go b/scheduler/util.go index f305134cdec..c660e122ef2 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -464,11 +464,6 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // Check if the task drivers or config has changed, requires // a rolling upgrade since that cannot be done in-place. - //existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) - //if tasksUpdated(update.TaskGroup, existing) { - //continue - //} - existing := update.Alloc.Job if tasksUpdated(job, existing, update.TaskGroup.Name) { continue From 868cbe13a21c3346f7fab3b5d62cc6475255e651 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 8 Mar 2017 17:57:31 -0800 Subject: [PATCH 3/8] cleanup --- scheduler/feasible.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 29ded1d9bd4..55aa92477f7 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -307,10 +307,8 @@ type propertySet struct { func NewPropertySet(ctx Context) *propertySet { p := &propertySet{ ctx: ctx, - jobConstrainedProperties: make(map[string]map[string]struct{}), - existingProperties: make(map[string]map[string]map[string]struct{}), - proposedCreateProperties: make(map[string]map[string]map[string]struct{}), - clearedProposedProperties: make(map[string]map[string]map[string]struct{}), + jobConstrainedProperties: make(map[string]map[string]struct{}), + existingProperties: make(map[string]map[string]map[string]struct{}), } return p @@ -320,9 +318,13 @@ func NewPropertySet(ctx Context) *propertySet { // constraints and property values already used by existing allocations are // calculated. 
func (p *propertySet) SetJob(j *structs.Job) error { + // Store the job p.job = j + // Capture all the distinct property constraints in the job p.buildDistinctProperties() + + // Capture all the used values for those properties if err := p.populateExisting(); err != nil { return err } From 653a1c37f6e19b34611e59a1beebc4b471ee244d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 8 Mar 2017 19:00:10 -0800 Subject: [PATCH 4/8] Split distinct property and host iterator and add iterator to system stack --- scheduler/feasible.go | 137 +++++++++++++++++++++++++------------ scheduler/feasible_test.go | 28 ++++---- scheduler/stack.go | 50 +++++++++----- 3 files changed, 138 insertions(+), 77 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 55aa92477f7..70f3899b4d9 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -143,53 +143,40 @@ func (c *DriverChecker) hasDrivers(option *structs.Node) bool { return true } -// ProposedAllocConstraintIterator is a FeasibleIterator which returns nodes that -// match constraints that are not static such as Node attributes but are -// effected by proposed alloc placements. Examples are distinct_hosts and -// tenancy constraints. This is used to filter on job and task group -// constraints. -type ProposedAllocConstraintIterator struct { +// DistinctHostsIterator is a FeasibleIterator which returns nodes that pass the +// distinct_hosts constraint. The constraint ensures that multiple allocations +// do not exist on the same node. +type DistinctHostsIterator struct { ctx Context source FeasibleIterator tg *structs.TaskGroup job *structs.Job - // distinctProperties is used to track the distinct properties of the job - // and to check if node options satisfy these constraints. - distinctProperties *propertySet - // Store whether the Job or TaskGroup has a distinct_hosts constraints so // they don't have to be calculated every time Next() is called. tgDistinctHosts bool jobDistinctHosts bool } -// NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator -// from a source. -func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *ProposedAllocConstraintIterator { - return &ProposedAllocConstraintIterator{ - ctx: ctx, - source: source, - distinctProperties: NewPropertySet(ctx), +// NewDistinctHostsIterator creates a DistinctHostsIterator from a source. 
+func NewDistinctHostsIterator(ctx Context, source FeasibleIterator) *DistinctHostsIterator { + return &DistinctHostsIterator{ + ctx: ctx, + source: source, } } -func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) { +func (iter *DistinctHostsIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints) } -func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) { +func (iter *DistinctHostsIterator) SetJob(job *structs.Job) { iter.job = job iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints) - - if err := iter.distinctProperties.SetJob(job); err != nil { - iter.ctx.Logger().Printf( - "[ERR] scheduler.dynamic-constraint: failed to build property set: %v", err) - } } -func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { +func (iter *DistinctHostsIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { for _, con := range constraints { if con.Operand == structs.ConstraintDistinctHosts { return true @@ -199,7 +186,7 @@ func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constrai return false } -func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { +func (iter *DistinctHostsIterator) Next() *structs.Node { for { // Get the next option from the source option := iter.source.Next() @@ -207,26 +194,14 @@ func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { // Hot-path if the option is nil or there are no distinct_hosts or // distinct_property constraints. hosts := iter.jobDistinctHosts || iter.tgDistinctHosts - properties := iter.distinctProperties.HasDistinctPropertyConstraints() - if option == nil || !(hosts || properties) { + if option == nil || !hosts { return option } // Check if the host constraints are satisfied - if hosts { - if !iter.satisfiesDistinctHosts(option) { - iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts) - continue - } - } - - // Check if the property constraints are satisfied - if properties { - satisfied, reason := iter.distinctProperties.SatisfiesDistinctProperties(option, iter.tg.Name) - if !satisfied { - iter.ctx.Metrics().FilterNode(option, reason) - continue - } + if !iter.satisfiesDistinctHosts(option) { + iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts) + continue } return option @@ -235,7 +210,7 @@ func (iter *ProposedAllocConstraintIterator) Next() *structs.Node { // satisfiesDistinctHosts checks if the node satisfies a distinct_hosts // constraint either specified at the job level or the TaskGroup level. -func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *structs.Node) bool { +func (iter *DistinctHostsIterator) satisfiesDistinctHosts(option *structs.Node) bool { // Check if there is no constraint set. if !(iter.jobDistinctHosts || iter.tgDistinctHosts) { return true @@ -264,7 +239,81 @@ func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *stru return true } -func (iter *ProposedAllocConstraintIterator) Reset() { +func (iter *DistinctHostsIterator) Reset() { + iter.source.Reset() +} + +// DistinctPropertyIterator is a FeasibleIterator which returns nodes that pass the +// distinct_property constraint. The constraint ensures that multiple allocations +// do not use the same value of the given property. 
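For illustration only, a minimal self-contained sketch of the distinct_hosts rule the iterator above enforces: a node is rejected when a non-terminal allocation of the same job (or, for a group-level constraint, the same task group) already occupies it. The `alloc` type and `violatesDistinctHosts` helper below are invented stand-ins, not the scheduler's real types.

package main

import "fmt"

type alloc struct {
	nodeID    string
	taskGroup string
}

// violatesDistinctHosts reports whether placing another instance of the given
// task group on nodeID would collide with an existing placement. A job-level
// distinct_hosts constraint treats every task group as conflicting.
func violatesDistinctHosts(existing []alloc, nodeID, taskGroup string, jobLevel bool) bool {
	for _, a := range existing {
		if a.nodeID != nodeID {
			continue
		}
		if jobLevel || a.taskGroup == taskGroup {
			return true
		}
	}
	return false
}

func main() {
	placed := []alloc{{nodeID: "node-1", taskGroup: "web"}}
	fmt.Println(violatesDistinctHosts(placed, "node-1", "web", false)) // true: node already used
	fmt.Println(violatesDistinctHosts(placed, "node-2", "web", false)) // false: fresh node
}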
+type DistinctPropertyIterator struct { + ctx Context + source FeasibleIterator + tg *structs.TaskGroup + job *structs.Job + + // distinctProperties is used to track the distinct properties of the job + // and to check if node options satisfy these constraints. + distinctProperties *propertySet +} + +// NewDistinctPropertyIterator creates a DistinctPropertyIterator from a source. +func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *DistinctPropertyIterator { + return &DistinctPropertyIterator{ + ctx: ctx, + source: source, + distinctProperties: NewPropertySet(ctx), + } +} + +func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { + iter.tg = tg +} + +func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { + iter.job = job + if err := iter.distinctProperties.SetJob(job); err != nil { + iter.ctx.Logger().Printf( + "[ERR] scheduler.dynamic-constraint: failed to build property set: %v", err) + } +} + +func (iter *DistinctPropertyIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { + for _, con := range constraints { + if con.Operand == structs.ConstraintDistinctHosts { + return true + } + } + + return false +} + +func (iter *DistinctPropertyIterator) Next() *structs.Node { + for { + // Get the next option from the source + option := iter.source.Next() + + // Hot-path if the option is nil or there are no distinct_hosts or + // distinct_property constraints. + properties := iter.distinctProperties.HasDistinctPropertyConstraints() + if option == nil || !properties { + return option + } + + // Check if the property constraints are satisfied + if properties { + satisfied, reason := iter.distinctProperties.SatisfiesDistinctProperties(option, iter.tg.Name) + if !satisfied { + iter.ctx.Metrics().FilterNode(option, reason) + continue + } + } + + return option + } +} + +func (iter *DistinctPropertyIterator) Reset() { iter.source.Reset() // Repopulate the proposed set every time we are reset because an diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index eb065e5a654..097bfc175bf 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -437,7 +437,7 @@ func TestCheckRegexpConstraint(t *testing.T) { // This test puts allocations on the node to test if it detects infeasibility of // nodes correctly and picks the only feasible one -func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) { +func TestDistinctHostsIterator_JobDistinctHosts(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -488,7 +488,7 @@ func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) { }, } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctHostsIterator(ctx, static) proposed.SetTaskGroup(tg1) proposed.SetJob(job) @@ -502,7 +502,7 @@ func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) { } } -func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) { +func TestDistinctHostsIterator_JobDistinctHosts_InfeasibleCount(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -539,7 +539,7 @@ func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) }, } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctHostsIterator(ctx, static) proposed.SetTaskGroup(tg3) proposed.SetJob(job) @@ -550,7 +550,7 @@ func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) } } -func 
TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { +func TestDistinctHostsIterator_TaskGroupDistinctHosts(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -585,7 +585,7 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { }, } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctHostsIterator(ctx, static) proposed.SetTaskGroup(tg1) proposed.SetJob(&structs.Job{ID: "foo"}) @@ -613,7 +613,7 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) { // This test puts creates allocations across task groups that use a property // value to detect if the constraint at the job level properly considers all // task groups. -func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { +func TestDistinctPropertyIterator_JobDistinctProperty(t *testing.T) { state, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -753,7 +753,7 @@ func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { t.Fatalf("failed to UpsertAllocs: %v", err) } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctPropertyIterator(ctx, static) proposed.SetJob(job) proposed.SetTaskGroup(tg2) proposed.Reset() @@ -770,7 +770,7 @@ func TestProposedAllocConstraint_JobDistinctProperty(t *testing.T) { // This test checks that if a node has an allocation on it that gets stopped, // there is a plan to re-use that for a new allocation, that the next select // won't select that node. -func TestProposedAllocConstraint_JobDistinctProperty_RemoveAndReplace(t *testing.T) { +func TestDistinctPropertyIterator_JobDistinctProperty_RemoveAndReplace(t *testing.T) { state, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -831,7 +831,7 @@ func TestProposedAllocConstraint_JobDistinctProperty_RemoveAndReplace(t *testing t.Fatalf("failed to UpsertAllocs: %v", err) } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctPropertyIterator(ctx, static) proposed.SetJob(job) proposed.SetTaskGroup(tg1) proposed.Reset() @@ -845,7 +845,7 @@ func TestProposedAllocConstraint_JobDistinctProperty_RemoveAndReplace(t *testing // This test creates previous allocations selecting certain property values to // test if it detects infeasibility of property values correctly and picks the // only feasible one -func TestProposedAllocConstraint_JobDistinctProperty_Infeasible(t *testing.T) { +func TestDistinctPropertyIterator_JobDistinctProperty_Infeasible(t *testing.T) { state, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -903,7 +903,7 @@ func TestProposedAllocConstraint_JobDistinctProperty_Infeasible(t *testing.T) { t.Fatalf("failed to UpsertAllocs: %v", err) } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctPropertyIterator(ctx, static) proposed.SetJob(job) proposed.SetTaskGroup(tg3) proposed.Reset() @@ -917,7 +917,7 @@ func TestProposedAllocConstraint_JobDistinctProperty_Infeasible(t *testing.T) { // This test creates previous allocations selecting certain property values to // test if it detects infeasibility of property values correctly and picks the // only feasible one when the constraint is at the task group. 
-func TestProposedAllocConstraint_TaskGroupDistinctProperty(t *testing.T) { +func TestDistinctPropertyIterator_TaskGroupDistinctProperty(t *testing.T) { state, ctx := testContext(t) nodes := []*structs.Node{ mock.Node(), @@ -1007,7 +1007,7 @@ func TestProposedAllocConstraint_TaskGroupDistinctProperty(t *testing.T) { t.Fatalf("failed to UpsertAllocs: %v", err) } - proposed := NewProposedAllocConstraintIterator(ctx, static) + proposed := NewDistinctPropertyIterator(ctx, static) proposed.SetJob(job) proposed.SetTaskGroup(tg1) proposed.Reset() diff --git a/scheduler/stack.go b/scheduler/stack.go index d685be1c76e..ef4c3d2d447 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -44,11 +44,12 @@ type GenericStack struct { taskGroupDrivers *DriverChecker taskGroupConstraint *ConstraintChecker - proposedAllocConstraint *ProposedAllocConstraintIterator - binPack *BinPackIterator - jobAntiAff *JobAntiAffinityIterator - limit *LimitIterator - maxScore *MaxScoreIterator + distinctHostsConstraint *DistinctHostsIterator + distinctPropertyConstraint *DistinctPropertyIterator + binPack *BinPackIterator + jobAntiAff *JobAntiAffinityIterator + limit *LimitIterator + maxScore *MaxScoreIterator } // NewGenericStack constructs a stack used for selecting service placements @@ -81,11 +82,14 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs) - // Filter on constraints that are affected by propsed allocations. - s.proposedAllocConstraint = NewProposedAllocConstraintIterator(ctx, s.wrappedChecks) + // Filter on distinct host constraints. + s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks) + + // Filter on distinct property constraints. + s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint) // Upgrade from feasible to rank iterator - rankSource := NewFeasibleRankIterator(ctx, s.proposedAllocConstraint) + rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) // Apply the bin packing, this depends on the resources needed // by a particular task group. Only enable eviction for the service @@ -134,7 +138,8 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { func (s *GenericStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) - s.proposedAllocConstraint.SetJob(job) + s.distinctHostsConstraint.SetJob(job) + s.distinctPropertyConstraint.SetJob(job) s.binPack.SetPriority(job.Priority) s.jobAntiAff.SetJob(job.ID) s.ctx.Eligibility().SetJob(job) @@ -152,7 +157,8 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) - s.proposedAllocConstraint.SetTaskGroup(tg) + s.distinctHostsConstraint.SetTaskGroup(tg) + s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) s.binPack.SetTaskGroup(tg) @@ -187,13 +193,14 @@ func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*str // SystemStack is the Stack used for the System scheduler. It is designed to // attempt to make placements on all nodes. 
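For context on the stack wiring above: the feasibility stack is a chain of iterators, each wrapping its source and dropping nodes that fail a check before ranking runs, which is why the distinct-hosts and distinct-property iterators can simply be chained between the wrapped checks and the rank source. A rough, self-contained sketch of that pattern follows; the `feasibleIterator`, `sliceSource`, and `filterIter` names are toy stand-ins, not the real scheduler interfaces.

package main

import "fmt"

type node struct{ id string }

// feasibleIterator yields candidate nodes; nil means the source is exhausted.
type feasibleIterator interface {
	next() *node
}

// sliceSource is a trivial source backed by a slice of nodes.
type sliceSource struct {
	nodes []*node
	i     int
}

func (s *sliceSource) next() *node {
	if s.i >= len(s.nodes) {
		return nil
	}
	n := s.nodes[s.i]
	s.i++
	return n
}

// filterIter wraps a source and drops nodes that fail a predicate, the same
// shape as DistinctHostsIterator and DistinctPropertyIterator wrapping theirs.
type filterIter struct {
	source feasibleIterator
	keep   func(*node) bool
}

func (f *filterIter) next() *node {
	for {
		n := f.source.next()
		if n == nil || f.keep(n) {
			return n
		}
	}
}

func main() {
	src := &sliceSource{nodes: []*node{{"a"}, {"b"}, {"c"}}}
	hosts := &filterIter{source: src, keep: func(n *node) bool { return n.id != "b" }}
	props := &filterIter{source: hosts, keep: func(n *node) bool { return n.id != "c" }}
	for n := props.next(); n != nil; n = props.next() {
		fmt.Println(n.id) // only "a" survives both filters
	}
}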
type SystemStack struct { - ctx Context - source *StaticIterator - wrappedChecks *FeasibilityWrapper - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker - binPack *BinPackIterator + ctx Context + source *StaticIterator + wrappedChecks *FeasibilityWrapper + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + distinctPropertyConstraint *DistinctPropertyIterator + binPack *BinPackIterator } // NewSystemStack constructs a stack used for selecting service placements @@ -222,8 +229,11 @@ func NewSystemStack(ctx Context) *SystemStack { tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs) + // Filter on distinct property constraints. + s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.wrappedChecks) + // Upgrade from feasible to rank iterator - rankSource := NewFeasibleRankIterator(ctx, s.wrappedChecks) + rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) // Apply the bin packing, this depends on the resources needed // by a particular task group. Enable eviction as system jobs are high @@ -239,6 +249,7 @@ func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { func (s *SystemStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) + s.distinctPropertyConstraint.SetJob(job) s.binPack.SetPriority(job.Priority) s.ctx.Eligibility().SetJob(job) } @@ -255,8 +266,9 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) - s.binPack.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) + s.distinctPropertyConstraint.SetTaskGroup(tg) + s.binPack.SetTaskGroup(tg) // Get the next option that satisfies the constraints. option := s.binPack.Next() From e4954a48c214fef4759cd6e9d50359f4cea5b562 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 9 Mar 2017 15:20:53 -0800 Subject: [PATCH 5/8] Refactor --- scheduler/feasible.go | 415 +++++---------------------------------- scheduler/propertyset.go | 264 +++++++++++++++++++++++++ 2 files changed, 309 insertions(+), 370 deletions(-) create mode 100644 scheduler/propertyset.go diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 70f3899b4d9..eca90ea07a5 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" - memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" ) @@ -252,17 +251,17 @@ type DistinctPropertyIterator struct { tg *structs.TaskGroup job *structs.Job - // distinctProperties is used to track the distinct properties of the job - // and to check if node options satisfy these constraints. - distinctProperties *propertySet + hasDistinctPropertyConstraints bool + jobPropertySets []*propertySet + groupPropertySets map[string][]*propertySet } // NewDistinctPropertyIterator creates a DistinctPropertyIterator from a source. 
func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *DistinctPropertyIterator { return &DistinctPropertyIterator{ - ctx: ctx, - source: source, - distinctProperties: NewPropertySet(ctx), + ctx: ctx, + source: source, + groupPropertySets: make(map[string][]*propertySet), } } @@ -272,399 +271,75 @@ func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { iter.job = job - if err := iter.distinctProperties.SetJob(job); err != nil { - iter.ctx.Logger().Printf( - "[ERR] scheduler.dynamic-constraint: failed to build property set: %v", err) - } -} - -func (iter *DistinctPropertyIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool { - for _, con := range constraints { - if con.Operand == structs.ConstraintDistinctHosts { - return true - } - } - - return false -} - -func (iter *DistinctPropertyIterator) Next() *structs.Node { - for { - // Get the next option from the source - option := iter.source.Next() - - // Hot-path if the option is nil or there are no distinct_hosts or - // distinct_property constraints. - properties := iter.distinctProperties.HasDistinctPropertyConstraints() - if option == nil || !properties { - return option - } - - // Check if the property constraints are satisfied - if properties { - satisfied, reason := iter.distinctProperties.SatisfiesDistinctProperties(option, iter.tg.Name) - if !satisfied { - iter.ctx.Metrics().FilterNode(option, reason) - continue - } - } - - return option - } -} - -func (iter *DistinctPropertyIterator) Reset() { - iter.source.Reset() - - // Repopulate the proposed set every time we are reset because an - // additional allocation may have been added - if err := iter.distinctProperties.PopulateProposed(); err != nil { - iter.ctx.Logger().Printf( - "[ERR] scheduler.dynamic-constraint: failed to populate proposed properties: %v", err) - } -} - -// propertySet is used to track used values for a particular node property. -type propertySet struct { - - // ctx is used to lookup the plan and state - ctx Context - // job stores the job the property set is tracking - job *structs.Job - - // jobConstrainedProperties stores the set of LTargets that we are - // constrained on. The outer key is the task group name. The values stored - // in key "" are those constrained at the job level. - jobConstrainedProperties map[string]map[string]struct{} - - // existingProperties is a mapping of task group/job to properties (LTarget) - // to a string set of used values for pre-placed allocation. - existingProperties map[string]map[string]map[string]struct{} - - // proposedCreateProperties is a mapping of task group/job to properties (LTarget) - // to a string set of used values for proposed allocations. - proposedCreateProperties map[string]map[string]map[string]struct{} - - // clearedProposedProperties is a mapping of task group/job to properties (LTarget) - // to a string set of values that have been cleared and are no longer used - clearedProposedProperties map[string]map[string]map[string]struct{} -} - -// NewPropertySet returns a new property set used to guarantee unique property -// values for new allocation placements. -func NewPropertySet(ctx Context) *propertySet { - p := &propertySet{ - ctx: ctx, - jobConstrainedProperties: make(map[string]map[string]struct{}), - existingProperties: make(map[string]map[string]map[string]struct{}), - } - - return p -} - -// setJob sets the job the property set is tracking. 
The distinct property -// constraints and property values already used by existing allocations are -// calculated. -func (p *propertySet) SetJob(j *structs.Job) error { - // Store the job - p.job = j - - // Capture all the distinct property constraints in the job - p.buildDistinctProperties() - - // Capture all the used values for those properties - if err := p.populateExisting(); err != nil { - return err - } - - return nil -} - -// HasDistinctPropertyConstraints returns whether there are distinct_property -// constraints on the job. -func (p *propertySet) HasDistinctPropertyConstraints() bool { - return len(p.jobConstrainedProperties) != 0 -} - -// PopulateProposed populates the set of property values used by the proposed -// allocations for the job. This should be called on every reset -func (p *propertySet) PopulateProposed() error { - // Hot path since there is nothing to do - if !p.HasDistinctPropertyConstraints() { - return nil - } - - // Reset the proposed properties - p.proposedCreateProperties = make(map[string]map[string]map[string]struct{}) - p.clearedProposedProperties = make(map[string]map[string]map[string]struct{}) - - // Gather the set of proposed stops. - var stopping []*structs.Allocation - for _, updates := range p.ctx.Plan().NodeUpdate { - stopping = append(stopping, updates...) - } - - // build the property set for the proposed stopped allocations - // This should be called before building the property set for the created - // allocs. - if err := p.buildProperySet(stopping, false, true); err != nil { - return err - } - - // Gather the proposed allocations - var proposed []*structs.Allocation - for _, pallocs := range p.ctx.Plan().NodeAllocation { - proposed = append(proposed, pallocs...) - } - - // build the property set for the proposed new allocations - return p.buildProperySet(proposed, false, false) -} - -// satisfiesDistinctProperties checks if the option satisfies all -// distinct_property constraints given the existing placements and proposed -// placements. If the option does not satisfy the constraints an explanation is -// given. -func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { - // Hot path if there is nothing to do - jobConstrainedProperties := p.jobConstrainedProperties[""] - tgConstrainedProperties := p.jobConstrainedProperties[tg] - if len(jobConstrainedProperties) == 0 && len(tgConstrainedProperties) == 0 { - return true, "" - } - - // both is used to iterate over both the proposed and existing used - // properties - bothAll := []map[string]map[string]map[string]struct{}{p.existingProperties, p.proposedCreateProperties} - - // Check if the option is unique for all the job properties - for constrainedProperty := range jobConstrainedProperties { - // Get the nodes property value - nValue, ok := p.getProperty(option, constrainedProperty) - if !ok { - return false, fmt.Sprintf("missing property %q", constrainedProperty) - } - - // Check if the nodes value has already been used. - for _, usedProperties := range bothAll { - // Since we are checking at the job level, check all task groups - for group, properties := range usedProperties { - setValues, ok := properties[constrainedProperty] - if !ok { - continue - } - - // Check if the nodes value has been used - _, used := setValues[nValue] - if !used { - continue - } - - // The last check is to ensure that the value hasn't been - // removed in the proposed removals. 
- if _, cleared := p.clearedProposedProperties[group][constrainedProperty][nValue]; cleared { - continue - } - - return false, fmt.Sprintf("distinct_property: %s=%s already used", constrainedProperty, nValue) - } - } - } - - // bothTG is both filtered at by the task group - bothTG := []map[string]map[string]struct{}{p.existingProperties[tg], p.proposedCreateProperties[tg]} - - // Check if the option is unique for all the task group properties - for constrainedProperty := range tgConstrainedProperties { - // Get the nodes property value - nValue, ok := p.getProperty(option, constrainedProperty) - if !ok { - return false, fmt.Sprintf("missing property %q", constrainedProperty) - } - - // Check if the nodes value has already been used. - for _, properties := range bothTG { - setValues, ok := properties[constrainedProperty] - if !ok { - continue - } - - // Check if the nodes value has been used - if _, used := setValues[nValue]; !used { - continue - } - - // The last check is to ensure that the value hasn't been - // removed in the proposed removals. - if _, cleared := p.clearedProposedProperties[tg][constrainedProperty][nValue]; cleared { - continue - } - - return false, fmt.Sprintf("distinct_property: %s=%s already used", constrainedProperty, nValue) - } - } - - return true, "" -} - -// buildDistinctProperties takes the job and populates the map of distinct -// properties that are constrained on for the job. -func (p *propertySet) buildDistinctProperties() { - for _, c := range p.job.Constraints { + // Build the property sets + for _, c := range job.Constraints { if c.Operand != structs.ConstraintDistinctProperty { continue } - // Store job properties in the magic empty string since it can't be used - // by any task group. - if _, ok := p.jobConstrainedProperties[""]; !ok { - p.jobConstrainedProperties[""] = make(map[string]struct{}) - } - - p.jobConstrainedProperties[""][c.LTarget] = struct{}{} + iter.hasDistinctPropertyConstraints = true + pset := NewPropertySet(iter.ctx, job) + pset.SetJobConstraint(c) + iter.jobPropertySets = append(iter.jobPropertySets, pset) } - for _, tg := range p.job.TaskGroups { + for _, tg := range job.TaskGroups { for _, c := range tg.Constraints { if c.Operand != structs.ConstraintDistinctProperty { continue } - if _, ok := p.jobConstrainedProperties[tg.Name]; !ok { - p.jobConstrainedProperties[tg.Name] = make(map[string]struct{}) - } - - p.jobConstrainedProperties[tg.Name][c.LTarget] = struct{}{} + iter.hasDistinctPropertyConstraints = true + pset := NewPropertySet(iter.ctx, job) + pset.SetTGConstraint(c, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) } } } -// populateExisting populates the set of property values used by existing -// allocations for the job. -func (p *propertySet) populateExisting() error { - // Hot path since there is nothing to do - if !p.HasDistinctPropertyConstraints() { - return nil - } - - // Retrieve all previously placed allocations - ws := memdb.NewWatchSet() - allocs, err := p.ctx.State().AllocsByJob(ws, p.job.ID, false) - if err != nil { - return fmt.Errorf("failed to get job's allocations: %v", err) - } - - return p.buildProperySet(allocs, true, false) -} - -// buildProperySet takes a set of allocations and determines what property -// values have been used by them. The existing boolean marks whether these are -// existing allocations or proposed allocations. Stopping marks whether the -// allocations are being stopped. 
-func (p *propertySet) buildProperySet(allocs []*structs.Allocation, existing, stopping bool) error { - // Only want running allocs - filtered := allocs - if !stopping { - filtered, _ = structs.FilterTerminalAllocs(allocs) - } - - // Get all the nodes that have been used by the allocs - ws := memdb.NewWatchSet() - nodes := make(map[string]*structs.Node) - for _, alloc := range filtered { - if _, ok := nodes[alloc.NodeID]; ok { - continue - } +func (iter *DistinctPropertyIterator) Next() *structs.Node { +OUTER: + for { + // Get the next option from the source + option := iter.source.Next() - node, err := p.ctx.State().NodeByID(ws, alloc.NodeID) - if err != nil { - return fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err) + // Hot path if there is nothing to check + if option == nil || !iter.hasDistinctPropertyConstraints { + return option } - nodes[alloc.NodeID] = node - } - - // propertySet is the set we are operating on - propertySet := p.existingProperties - if !existing && !stopping { - propertySet = p.proposedCreateProperties - } else if stopping { - propertySet = p.clearedProposedProperties - } - - // Go through each allocation and build the set of property values that have - // been used - for _, alloc := range filtered { - // Gather job related constrained properties - jobProperties := p.jobConstrainedProperties[""] - for constrainedProperty := range jobProperties { - nProperty, ok := p.getProperty(nodes[alloc.NodeID], constrainedProperty) - if !ok { - continue - } - - if _, exists := propertySet[""]; !exists { - propertySet[""] = make(map[string]map[string]struct{}) - } - - if _, exists := propertySet[""][constrainedProperty]; !exists { - propertySet[""][constrainedProperty] = make(map[string]struct{}) - } - - propertySet[""][constrainedProperty][nProperty] = struct{}{} - - // This is a newly created allocation so clear out the fact that - // proposed property is not being used anymore - if !existing && !stopping { - delete(p.clearedProposedProperties[""][constrainedProperty], nProperty) + // Check if the constraints are met + for _, ps := range iter.jobPropertySets { + if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { + iter.ctx.Metrics().FilterNode(option, reason) + continue OUTER } } - // Gather task group related constrained properties - groupProperties := p.jobConstrainedProperties[alloc.TaskGroup] - for constrainedProperty := range groupProperties { - nProperty, ok := p.getProperty(nodes[alloc.NodeID], constrainedProperty) - if !ok { - continue - } - - if _, exists := propertySet[alloc.TaskGroup]; !exists { - propertySet[alloc.TaskGroup] = make(map[string]map[string]struct{}) - } - if _, exists := propertySet[alloc.TaskGroup][constrainedProperty]; !exists { - propertySet[alloc.TaskGroup][constrainedProperty] = make(map[string]struct{}) - } - - propertySet[alloc.TaskGroup][constrainedProperty][nProperty] = struct{}{} - - // This is a newly created allocation so clear out the fact that - // proposed property is not being used anymore - if !existing && !stopping { - delete(p.clearedProposedProperties[alloc.TaskGroup][constrainedProperty], nProperty) + for _, ps := range iter.groupPropertySets[iter.tg.Name] { + if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { + iter.ctx.Metrics().FilterNode(option, reason) + continue OUTER } } - } - return nil + return option + } } -// getProperty is used to lookup the property value on the node -func (p *propertySet) getProperty(n *structs.Node, 
property string) (string, bool) { - if n == nil || property == "" { - return "", false - } +func (iter *DistinctPropertyIterator) Reset() { + iter.source.Reset() - val, ok := resolveConstraintTarget(property, n) - if !ok { - return "", false - } - nodeValue, ok := val.(string) - if !ok { - return "", false + for _, ps := range iter.jobPropertySets { + ps.PopulateProposed() } - return nodeValue, true + for _, sets := range iter.groupPropertySets { + for _, ps := range sets { + ps.PopulateProposed() + } + } } // ConstraintChecker is a FeasibilityChecker which returns nodes that match a diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go new file mode 100644 index 00000000000..811e03b8781 --- /dev/null +++ b/scheduler/propertyset.go @@ -0,0 +1,264 @@ +package scheduler + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/structs" +) + +// propertySet is used to track the values used for a particular property. +type propertySet struct { + // ctx is used to lookup the plan and state + ctx Context + + jobID string + + // constraint is the constraint this property set is checking + constraint *structs.Constraint + + // errorBuilding marks whether there was an error when building the property + // set + errorBuilding error + + // existingValues is the set of values for the given property that have been + // used by pre-existing allocations. + existingValues map[string]struct{} + + // proposedValues is the set of values for the given property that are used + // from proposed allocations. + proposedValues map[string]struct{} + + // clearedValues is the set of values that are no longer being used by + // existingValues because of proposed stops. + clearedValues map[string]struct{} +} + +// NewPropertySet returns a new property set used to guarantee unique property +// values for new allocation placements. +func NewPropertySet(ctx Context, job *structs.Job) *propertySet { + p := &propertySet{ + ctx: ctx, + jobID: job.ID, + existingValues: make(map[string]struct{}), + } + + return p +} + +// SetJobConstraint is used to parameterize the property set for a +// distinct_property constraint set at the job level. +func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) { + // Store the constraint + p.constraint = constraint + + // Retrieve all previously placed allocations + ws := memdb.NewWatchSet() + allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) + if err != nil { + p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err) + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding) + return + } + + // Only want running allocs + filtered, _ := structs.FilterTerminalAllocs(allocs) + + // Get all the nodes that have been used by the allocs + nodes, err := p.buildNodeMap(filtered) + if err != nil { + p.errorBuilding = err + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) + return + } + + p.populateExisting(constraint, filtered, nodes) +} + +// SetTGConstraint is used to parameterize the property set for a +// distinct_property constraint set at the task group level. The inputs are the +// constraint and the task group name. 
+func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) { + // Store the constraint + p.constraint = constraint + + // Retrieve all previously placed allocations + ws := memdb.NewWatchSet() + allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) + if err != nil { + p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err) + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding) + return + } + + // Only want running allocs from the task group + n := len(allocs) + for i := 0; i < n; i++ { + if allocs[i].TaskGroup != taskGroup || allocs[i].TerminalStatus() { + allocs[i], allocs[n-1] = allocs[n-1], nil + i-- + n-- + } + } + allocs = allocs[:n] + + // Get all the nodes that have been used by the allocs + nodes, err := p.buildNodeMap(allocs) + if err != nil { + p.errorBuilding = err + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) + return + } + + p.populateExisting(constraint, allocs, nodes) +} + +// populateExisting is a helper shared when setting the constraint to populate +// the existing values. The allocations should be filtered appropriately prior +// to calling. +func (p *propertySet) populateExisting(constraint *structs.Constraint, jobAllocs []*structs.Allocation, nodes map[string]*structs.Node) { + for _, alloc := range jobAllocs { + nProperty, ok := p.getProperty(nodes[alloc.NodeID], constraint.LTarget) + if !ok { + continue + } + + p.existingValues[nProperty] = struct{}{} + } +} + +// PopulateProposed populates the proposed values and recomputes any cleared +// value. It should be called whenever the plan is updated to ensure correct +// results when checking an option. +func (p *propertySet) PopulateProposed() { + + // Reset the proposed properties + p.proposedValues = make(map[string]struct{}) + p.clearedValues = make(map[string]struct{}) + + // Gather the set of proposed stops. + var stopping []*structs.Allocation + for _, updates := range p.ctx.Plan().NodeUpdate { + stopping = append(stopping, updates...) + } + + // Gather the proposed allocations + var proposed []*structs.Allocation + for _, pallocs := range p.ctx.Plan().NodeAllocation { + proposed = append(proposed, pallocs...) + } + proposed, _ = structs.FilterTerminalAllocs(proposed) + + // Get the used nodes + both := make([]*structs.Allocation, 0, len(stopping)+len(proposed)) + both = append(both, stopping...) + both = append(both, proposed...) + nodes, err := p.buildNodeMap(both) + if err != nil { + p.errorBuilding = err + p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) + return + } + + // Populate the cleared values + for _, alloc := range stopping { + nProperty, ok := p.getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + if !ok { + continue + } + + p.clearedValues[nProperty] = struct{}{} + } + + // Populate the proposed values + for _, alloc := range proposed { + nProperty, ok := p.getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + if !ok { + continue + } + + p.proposedValues[nProperty] = struct{}{} + + // If it was cleared, it is now being used + delete(p.clearedValues, nProperty) + } +} + +// SatisfiesDistinctProperties checks if the option satisfies the +// distinct_property constraints given the existing placements and proposed +// placements. If the option does not satisfy the constraints an explanation is +// given. 
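A compact way to picture the bookkeeping above: a property value is infeasible when an existing or newly proposed allocation already uses it, unless that value has been freed by an allocation the plan is stopping. The following self-contained toy captures that rule; the `propertyTracker` type and `feasible` method are invented names for illustration, not the patch's `propertySet`.

package main

import "fmt"

type propertyTracker struct {
	existing map[string]struct{} // values used by already-placed allocations
	proposed map[string]struct{} // values used by allocations the current plan adds
	cleared  map[string]struct{} // values freed by allocations the current plan stops
}

// feasible reports whether a node exposing value can still be used without
// violating a distinct_property constraint.
func (t *propertyTracker) feasible(value string) bool {
	for _, used := range []map[string]struct{}{t.existing, t.proposed} {
		if _, ok := used[value]; !ok {
			continue
		}
		if _, ok := t.cleared[value]; ok {
			// The only user of this value is being stopped by the plan.
			continue
		}
		return false
	}
	return true
}

func main() {
	t := &propertyTracker{
		existing: map[string]struct{}{"rack-1": {}},
		proposed: map[string]struct{}{"rack-2": {}},
		cleared:  map[string]struct{}{"rack-1": {}},
	}
	fmt.Println(t.feasible("rack-1")) // true: its only user is being stopped
	fmt.Println(t.feasible("rack-2")) // false: the plan already proposes it
	fmt.Println(t.feasible("rack-3")) // true: unused value
}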
+func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { + // Check if there was an error building + if p.errorBuilding != nil { + return false, p.errorBuilding.Error() + } + + // Get the nodes property value + nValue, ok := p.getProperty(option, p.constraint.LTarget) + if !ok { + return false, fmt.Sprintf("missing property %q", p.constraint.LTarget) + } + + // both is used to iterate over both the proposed and existing used + // properties + bothAll := []map[string]struct{}{p.existingValues, p.proposedValues} + + // Check if the nodes value has already been used. + for _, usedProperties := range bothAll { + // Check if the nodes value has been used + _, used := usedProperties[nValue] + if !used { + continue + } + + // Check if the value has been cleared from a proposed stop + if _, cleared := p.clearedValues[nValue]; cleared { + continue + } + + return false, fmt.Sprintf("distinct_property: %s=%s already used", p.constraint.LTarget, nValue) + } + + return true, "" +} + +// buildNodeMap takes a list of allocations and returns a map of the nodes used +// by those allocations +func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) { + // Get all the nodes that have been used by the allocs + nodes := make(map[string]*structs.Node) + ws := memdb.NewWatchSet() + for _, alloc := range allocs { + if _, ok := nodes[alloc.NodeID]; ok { + continue + } + + node, err := p.ctx.State().NodeByID(ws, alloc.NodeID) + if err != nil { + return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err) + } + + nodes[alloc.NodeID] = node + } + + return nodes, nil +} + +// getProperty is used to lookup the property value on the node +func (p *propertySet) getProperty(n *structs.Node, property string) (string, bool) { + if n == nil || property == "" { + return "", false + } + + val, ok := resolveConstraintTarget(property, n) + if !ok { + return "", false + } + nodeValue, ok := val.(string) + if !ok { + return "", false + } + + return nodeValue, true +} From ddb9292424dcd47211dc88a571e758b91d0be39b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 9 Mar 2017 16:12:43 -0800 Subject: [PATCH 6/8] Fix filtering issue and add a test that would catch it --- scheduler/feasible.go | 5 +- scheduler/generic_sched_test.go | 77 ++++++++++++++++++++++++++++++ scheduler/propertyset.go | 83 +++++++++++++++++---------------- 3 files changed, 122 insertions(+), 43 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index eca90ea07a5..16e5e282f1d 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -267,6 +267,9 @@ func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *Distinct func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg + + // Check if there is a distinct property + iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0 } func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { @@ -278,7 +281,6 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { continue } - iter.hasDistinctPropertyConstraints = true pset := NewPropertySet(iter.ctx, job) pset.SetJobConstraint(c) iter.jobPropertySets = append(iter.jobPropertySets, pset) @@ -290,7 +292,6 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { continue } - iter.hasDistinctPropertyConstraints = true pset := NewPropertySet(iter.ctx, job) pset.SetTGConstraint(c, tg.Name) 
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index a4307bc51cc..6df199d6469 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -399,6 +399,83 @@ func TestServiceSched_JobRegister_DistinctProperty(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DistinctProperty_TaskGroup(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 2; i++ { + node := mock.Node() + node.Meta["ssd"] = "true" + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. + job := mock.Job() + job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy()) + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Constraints = append(job.TaskGroups[0].Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.ssd}", + }) + + job.TaskGroups[1].Name = "tg2" + job.TaskGroups[1].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval hasn't spawned blocked eval + if len(h.CreateEvals) != 0 { + t.Fatalf("bad: %#v", h.CreateEvals[0]) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 2 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index 811e03b8781..5e42b5eb7eb 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -12,8 +12,12 @@ type propertySet struct { // ctx is used to lookup the plan and state ctx Context + // jobID is the job we are operating on jobID string + // taskGroup is optionally set if the constraint is for a task group + taskGroup string + // constraint is the constraint this property set is checking constraint *structs.Constraint @@ -51,37 +55,25 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet { func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) { // Store the constraint p.constraint = constraint - - // Retrieve all previously placed allocations - ws := memdb.NewWatchSet() - allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) - if err != nil { - p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err) - p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding) - return - } - - // Only want running allocs - filtered, _ := structs.FilterTerminalAllocs(allocs) - - // Get all the nodes that have been used by the allocs - nodes, err := p.buildNodeMap(filtered) - if err != nil { - p.errorBuilding = err - p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) - return - } - - p.populateExisting(constraint, filtered, nodes) + p.populateExisting(constraint) } // SetTGConstraint is used to parameterize the property set for a // distinct_property constraint set at the task group level. The inputs are the // constraint and the task group name. func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) { + // Store that this is for a task group + p.taskGroup = taskGroup + // Store the constraint p.constraint = constraint + p.populateExisting(constraint) +} + +// populateExisting is a helper shared when setting the constraint to populate +// the existing values. +func (p *propertySet) populateExisting(constraint *structs.Constraint) { // Retrieve all previously placed allocations ws := memdb.NewWatchSet() allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) @@ -91,16 +83,8 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup return } - // Only want running allocs from the task group - n := len(allocs) - for i := 0; i < n; i++ { - if allocs[i].TaskGroup != taskGroup || allocs[i].TerminalStatus() { - allocs[i], allocs[n-1] = allocs[n-1], nil - i-- - n-- - } - } - allocs = allocs[:n] + // Filter to the correct set of allocs + allocs = p.filterAllocs(allocs) // Get all the nodes that have been used by the allocs nodes, err := p.buildNodeMap(allocs) @@ -110,14 +94,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup return } - p.populateExisting(constraint, allocs, nodes) -} - -// populateExisting is a helper shared when setting the constraint to populate -// the existing values. The allocations should be filtered appropriately prior -// to calling. 
-func (p *propertySet) populateExisting(constraint *structs.Constraint, jobAllocs []*structs.Allocation, nodes map[string]*structs.Node) { - for _, alloc := range jobAllocs { + for _, alloc := range allocs { nProperty, ok := p.getProperty(nodes[alloc.NodeID], constraint.LTarget) if !ok { continue @@ -141,13 +118,14 @@ func (p *propertySet) PopulateProposed() { for _, updates := range p.ctx.Plan().NodeUpdate { stopping = append(stopping, updates...) } + stopping = p.filterAllocs(stopping) // Gather the proposed allocations var proposed []*structs.Allocation for _, pallocs := range p.ctx.Plan().NodeAllocation { proposed = append(proposed, pallocs...) } - proposed, _ = structs.FilterTerminalAllocs(proposed) + proposed = p.filterAllocs(proposed) // Get the used nodes both := make([]*structs.Allocation, 0, len(stopping)+len(proposed)) @@ -223,6 +201,29 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin return true, "" } +// filterAllocs filters a set of allocations to just be those that are running +// and if the property set is operation at a task group level, for allocations +// for that task group +func (p *propertySet) filterAllocs(allocs []*structs.Allocation) []*structs.Allocation { + n := len(allocs) + for i := 0; i < n; i++ { + remove := allocs[i].TerminalStatus() + + // If the constraint is on the task group filter the allocations to just + // those on the task group + if p.taskGroup != "" { + remove = remove || allocs[i].TaskGroup != p.taskGroup + } + + if remove { + allocs[i], allocs[n-1] = allocs[n-1], nil + i-- + n-- + } + } + return allocs[:n] +} + // buildNodeMap takes a list of allocations and returns a map of the nodes used // by those allocations func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) { From cab1a8da95fd9aa55a9cde091e6e32864da4e3a1 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 9 Mar 2017 21:36:27 -0800 Subject: [PATCH 7/8] Feedback addressed --- scheduler/feasible.go | 56 +++++++++++++++++++++------------------- scheduler/propertyset.go | 51 +++++++++++++++++------------------- 2 files changed, 54 insertions(+), 53 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 16e5e282f1d..bd60fc2eaea 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -268,6 +268,19 @@ func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *Distinct func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg + // Build the property set at the taskgroup level + if _, ok := iter.groupPropertySets[tg.Name]; !ok { + for _, c := range tg.Constraints { + if c.Operand != structs.ConstraintDistinctProperty { + continue + } + + pset := NewPropertySet(iter.ctx, iter.job) + pset.SetTGConstraint(c, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) + } + } + // Check if there is a distinct property iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0 } @@ -275,7 +288,7 @@ func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { iter.job = job - // Build the property sets + // Build the property set at the job level for _, c := range job.Constraints { if c.Operand != structs.ConstraintDistinctProperty { continue @@ -285,22 +298,9 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { pset.SetJobConstraint(c) iter.jobPropertySets = 
append(iter.jobPropertySets, pset) } - - for _, tg := range job.TaskGroups { - for _, c := range tg.Constraints { - if c.Operand != structs.ConstraintDistinctProperty { - continue - } - - pset := NewPropertySet(iter.ctx, job) - pset.SetTGConstraint(c, tg.Name) - iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) - } - } } func (iter *DistinctPropertyIterator) Next() *structs.Node { -OUTER: for { // Get the next option from the source option := iter.source.Next() @@ -311,24 +311,28 @@ OUTER: } // Check if the constraints are met - for _, ps := range iter.jobPropertySets { - if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { - iter.ctx.Metrics().FilterNode(option, reason) - continue OUTER - } - } - - for _, ps := range iter.groupPropertySets[iter.tg.Name] { - if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { - iter.ctx.Metrics().FilterNode(option, reason) - continue OUTER - } + if !iter.satisfiesProperties(option, iter.jobPropertySets) || + !iter.satisfiesProperties(option, iter.groupPropertySets[iter.tg.Name]) { + continue } return option } } +// satisfiesProperties returns whether the option satisfies the set of +// properties. If not it will be filtered. +func (iter *DistinctPropertyIterator) satisfiesProperties(option *structs.Node, set []*propertySet) bool { + for _, ps := range set { + if satisfies, reason := ps.SatisfiesDistinctProperties(option, iter.tg.Name); !satisfies { + iter.ctx.Metrics().FilterNode(option, reason) + return false + } + } + + return true +} + func (iter *DistinctPropertyIterator) Reset() { iter.source.Reset() diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index 5e42b5eb7eb..aa4a0eab17a 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -94,14 +94,8 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) { return } - for _, alloc := range allocs { - nProperty, ok := p.getProperty(nodes[alloc.NodeID], constraint.LTarget) - if !ok { - continue - } - - p.existingValues[nProperty] = struct{}{} - } + // Build existing properties map + p.populateProperties(allocs, nodes, p.existingValues) } // PopulateProposed populates the proposed values and recomputes any cleared @@ -139,26 +133,14 @@ func (p *propertySet) PopulateProposed() { } // Populate the cleared values - for _, alloc := range stopping { - nProperty, ok := p.getProperty(nodes[alloc.NodeID], p.constraint.LTarget) - if !ok { - continue - } - - p.clearedValues[nProperty] = struct{}{} - } + p.populateProperties(stopping, nodes, p.clearedValues) // Populate the proposed values - for _, alloc := range proposed { - nProperty, ok := p.getProperty(nodes[alloc.NodeID], p.constraint.LTarget) - if !ok { - continue - } - - p.proposedValues[nProperty] = struct{}{} + p.populateProperties(proposed, nodes, p.proposedValues) - // If it was cleared, it is now being used - delete(p.clearedValues, nProperty) + // Remove any cleared value that is now being used by the proposed allocs + for value := range p.proposedValues { + delete(p.clearedValues, value) } } @@ -173,7 +155,7 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin } // Get the nodes property value - nValue, ok := p.getProperty(option, p.constraint.LTarget) + nValue, ok := getProperty(option, p.constraint.LTarget) if !ok { return false, fmt.Sprintf("missing property %q", p.constraint.LTarget) } @@ -246,8 +228,23 @@ func (p *propertySet) buildNodeMap(allocs 
[]*structs.Allocation) (map[string]*st return nodes, nil } +// populateProperties goes through all allocations and builds up the used +// properties from the nodes storing the results in the passed properties map. +func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node, + properties map[string]struct{}) { + + for _, alloc := range allocs { + nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + if !ok { + continue + } + + properties[nProperty] = struct{}{} + } +} + // getProperty is used to lookup the property value on the node -func (p *propertySet) getProperty(n *structs.Node, property string) (string, bool) { +func getProperty(n *structs.Node, property string) (string, bool) { if n == nil || property == "" { return "", false } From 336a976205f3ceee3c650625c6dc23b5a41fe69f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 9 Mar 2017 22:03:10 -0800 Subject: [PATCH 8/8] Fix in-place update --- scheduler/propertyset.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index aa4a0eab17a..95ed7f9c706 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -84,7 +84,7 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) { } // Filter to the correct set of allocs - allocs = p.filterAllocs(allocs) + allocs = p.filterAllocs(allocs, true) // Get all the nodes that have been used by the allocs nodes, err := p.buildNodeMap(allocs) @@ -112,14 +112,14 @@ func (p *propertySet) PopulateProposed() { for _, updates := range p.ctx.Plan().NodeUpdate { stopping = append(stopping, updates...) } - stopping = p.filterAllocs(stopping) + stopping = p.filterAllocs(stopping, false) // Gather the proposed allocations var proposed []*structs.Allocation for _, pallocs := range p.ctx.Plan().NodeAllocation { proposed = append(proposed, pallocs...) } - proposed = p.filterAllocs(proposed) + proposed = p.filterAllocs(proposed, true) // Get the used nodes both := make([]*structs.Allocation, 0, len(stopping)+len(proposed)) @@ -186,10 +186,13 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin // filterAllocs filters a set of allocations to just be those that are running // and if the property set is operation at a task group level, for allocations // for that task group -func (p *propertySet) filterAllocs(allocs []*structs.Allocation) []*structs.Allocation { +func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation { n := len(allocs) for i := 0; i < n; i++ { - remove := allocs[i].TerminalStatus() + remove := false + if filterTerminal { + remove = allocs[i].TerminalStatus() + } // If the constraint is on the task group filter the allocations to just // those on the task group
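A standalone sketch of the filtering behavior after this last change, using a toy `alloc` value type rather than Nomad's allocation pointers (the real code nils out the vacated slot when it swaps): terminal allocations are only dropped when `filterTerminal` is set, which the patch passes as false for the plan's stopping allocations, and task-group scoping still applies either way.

package main

import "fmt"

type alloc struct {
	taskGroup string
	terminal  bool
}

// filterAllocs keeps allocations relevant to the property set, optionally
// dropping terminal ones and dropping those from other task groups when a
// task group is given. It compacts the slice in place by swapping removed
// entries to the end and truncating.
func filterAllocs(allocs []alloc, taskGroup string, filterTerminal bool) []alloc {
	n := len(allocs)
	for i := 0; i < n; i++ {
		remove := false
		if filterTerminal {
			remove = allocs[i].terminal
		}
		if taskGroup != "" {
			remove = remove || allocs[i].taskGroup != taskGroup
		}
		if remove {
			allocs[i], allocs[n-1] = allocs[n-1], allocs[i]
			i--
			n--
		}
	}
	return allocs[:n]
}

func main() {
	in := []alloc{{"web", false}, {"web", true}, {"cache", false}}
	// Keep only running "web" allocations.
	fmt.Println(filterAllocs(in, "web", true)) // [{web false}]

	// Without terminal filtering, the terminal "web" allocation is kept too.
	in2 := []alloc{{"web", false}, {"web", true}, {"cache", false}}
	fmt.Println(filterAllocs(in2, "web", false)) // [{web false} {web true}]
}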