diff --git a/jobspec/parse.go b/jobspec/parse.go
index b7ca31c5459..9a98da5223e 100644
--- a/jobspec/parse.go
+++ b/jobspec/parse.go
@@ -7,6 +7,7 @@ import (
     "os"
     "path/filepath"
     "regexp"
+    "strconv"
     "strings"
     "time"
 
@@ -244,18 +245,32 @@ func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
 
         // If "version" is provided, set the operand
         // to "version" and the value to the "RTarget"
-        if constraint, ok := m["version"]; ok {
-            m["Operand"] = "version"
+        if constraint, ok := m[structs.ConstraintVersion]; ok {
+            m["Operand"] = structs.ConstraintVersion
             m["RTarget"] = constraint
         }
 
         // If "regexp" is provided, set the operand
         // to "regexp" and the value to the "RTarget"
-        if constraint, ok := m["regexp"]; ok {
-            m["Operand"] = "regexp"
+        if constraint, ok := m[structs.ConstraintRegex]; ok {
+            m["Operand"] = structs.ConstraintRegex
             m["RTarget"] = constraint
         }
 
+        if value, ok := m[structs.ConstraintDistinctHosts]; ok {
+            enabled, err := strconv.ParseBool(value.(string))
+            if err != nil {
+                return err
+            }
+
+            // If it is not enabled, skip the constraint.
+            if !enabled {
+                continue
+            }
+
+            m["Operand"] = structs.ConstraintDistinctHosts
+        }
+
         // Build the constraint
         var c structs.Constraint
         if err := mapstructure.WeakDecode(m, &c); err != nil {
diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go
index 60c5f1f0946..ccdc9989967 100644
--- a/jobspec/parse_test.go
+++ b/jobspec/parse_test.go
@@ -165,7 +165,7 @@ func TestParse(t *testing.T) {
                     Hard:    true,
                     LTarget: "$attr.kernel.version",
                     RTarget: "~> 3.2",
-                    Operand: "version",
+                    Operand: structs.ConstraintVersion,
                 },
             },
         },
@@ -185,7 +185,25 @@ func TestParse(t *testing.T) {
                     Hard:    true,
                     LTarget: "$attr.kernel.version",
                     RTarget: "[0-9.]+",
-                    Operand: "regexp",
+                    Operand: structs.ConstraintRegex,
+                },
+            },
+        },
+        false,
+    },
+
+    {
+        "distinctHosts-constraint.hcl",
+        &structs.Job{
+            ID:       "foo",
+            Name:     "foo",
+            Priority: 50,
+            Region:   "global",
+            Type:     "service",
+            Constraints: []*structs.Constraint{
+                &structs.Constraint{
+                    Hard:    true,
+                    Operand: structs.ConstraintDistinctHosts,
                 },
             },
         },
diff --git a/jobspec/test-fixtures/distinctHosts-constraint.hcl b/jobspec/test-fixtures/distinctHosts-constraint.hcl
new file mode 100644
index 00000000000..cf6bc7bfc1f
--- /dev/null
+++ b/jobspec/test-fixtures/distinctHosts-constraint.hcl
@@ -0,0 +1,5 @@
+job "foo" {
+    constraint {
+        distinct_hosts = "true"
+    }
+}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 8d58a985784..c82dc88da08 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1027,6 +1027,12 @@ func (t *Task) Validate() error {
     return mErr.ErrorOrNil()
 }
 
+const (
+    ConstraintDistinctHosts = "distinct_hosts"
+    ConstraintRegex         = "regexp"
+    ConstraintVersion       = "version"
+)
+
 // Constraints are used to restrict placement options in the case of
 // a hard constraint, and used to prefer a placement in the case of
 // a soft constraint.
@@ -1050,11 +1056,11 @@ func (c *Constraint) Validate() error {
 
     // Perform additional validation based on operand
     switch c.Operand {
-    case "regexp":
+    case ConstraintRegex:
         if _, err := regexp.Compile(c.RTarget); err != nil {
             mErr.Errors = append(mErr.Errors, fmt.Errorf("Regular expression failed to compile: %v", err))
         }
-    case "version":
+    case ConstraintVersion:
         if _, err := version.NewConstraint(c.RTarget); err != nil {
             mErr.Errors = append(mErr.Errors, fmt.Errorf("Version constraint is invalid: %v", err))
         }
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index 6df231a677d..cabf83dfa6e 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -144,7 +144,7 @@ func TestConstraint_Validate(t *testing.T) {
     }
 
     // Perform additional regexp validation
-    c.Operand = "regexp"
+    c.Operand = ConstraintRegex
     c.RTarget = "(foo"
     err = c.Validate()
     mErr = err.(*multierror.Error)
@@ -153,7 +153,7 @@ func TestConstraint_Validate(t *testing.T) {
     }
 
     // Perform version validation
-    c.Operand = "version"
+    c.Operand = ConstraintVersion
     c.RTarget = "~> foo"
     err = c.Validate()
     mErr = err.(*multierror.Error)
diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index 559252045f1..782d7c8751d 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -150,6 +150,106 @@ func (iter *DriverIterator) hasDrivers(option *structs.Node) bool {
     return true
 }
 
+// ProposedAllocConstraintIterator is a FeasibleIterator which returns nodes that
+// match constraints that are not static, such as Node attributes, but are
+// affected by proposed alloc placements. Examples are distinct_hosts and
+// tenancy constraints. This is used to filter on job and task group
+// constraints.
+type ProposedAllocConstraintIterator struct {
+    ctx    Context
+    source FeasibleIterator
+    tg     *structs.TaskGroup
+    job    *structs.Job
+
+    // Store whether the Job or TaskGroup has a distinct_hosts constraint so
+    // they don't have to be calculated every time Next() is called.
+    tgDistinctHosts  bool
+    jobDistinctHosts bool
+}
+
+// NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator
+// from a source.
+func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *ProposedAllocConstraintIterator {
+    iter := &ProposedAllocConstraintIterator{
+        ctx:    ctx,
+        source: source,
+    }
+    return iter
+}
+
+func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) {
+    iter.tg = tg
+    iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints)
+}
+
+func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) {
+    iter.job = job
+    iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints)
+}
+
+func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool {
+    for _, con := range constraints {
+        if con.Operand == structs.ConstraintDistinctHosts {
+            return true
+        }
+    }
+    return false
+}
+
+func (iter *ProposedAllocConstraintIterator) Next() *structs.Node {
+    for {
+        // Get the next option from the source
+        option := iter.source.Next()
+
+        // Hot-path if the option is nil or there are no distinct_hosts constraints.
+        if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts) {
+            return option
+        }
+
+        if !iter.satisfiesDistinctHosts(option) {
+            iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts)
+            continue
+        }
+
+        return option
+    }
+}
+
+// satisfiesDistinctHosts checks if the node satisfies a distinct_hosts
+// constraint either specified at the job level or the TaskGroup level.
+func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *structs.Node) bool {
+    // Check if there is no constraint set.
+    if !(iter.jobDistinctHosts || iter.tgDistinctHosts) {
+        return true
+    }
+
+    // Get the proposed allocations
+    proposed, err := iter.ctx.ProposedAllocs(option.ID)
+    if err != nil {
+        iter.ctx.Logger().Printf(
+            "[ERR] scheduler.dynamic-constraint: failed to get proposed allocations: %v", err)
+        return false
+    }
+
+    // Skip the node if the task group has already been allocated on it.
+    for _, alloc := range proposed {
+        // If the job has a distinct_hosts constraint we only need an alloc
+        // collision on the JobID but if the constraint is on the TaskGroup then
+        // we need both a job and TaskGroup collision.
+        jobCollision := alloc.JobID == iter.job.ID
+        taskCollision := alloc.TaskGroup == iter.tg.Name
+        if iter.jobDistinctHosts && jobCollision || jobCollision && taskCollision {
+            return false
+        }
+    }
+
+    return true
+}
+
+func (iter *ProposedAllocConstraintIterator) Reset() {
+    iter.source.Reset()
+}
+
 // ConstraintIterator is a FeasibleIterator which returns nodes
 // that match a given set of constraints. This is used to filter
 // on job, task group, and task constraints.
@@ -257,6 +357,14 @@ func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bo
 
 // checkConstraint checks if a constraint is satisfied
 func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
+    // Check for constraints not handled by this iterator.
+    switch operand {
+    case structs.ConstraintDistinctHosts:
+        return true
+    default:
+        break
+    }
+
     switch operand {
     case "=", "==", "is":
         return reflect.DeepEqual(lVal, rVal)
@@ -264,9 +372,9 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
         return !reflect.DeepEqual(lVal, rVal)
     case "<", "<=", ">", ">=":
         return checkLexicalOrder(operand, lVal, rVal)
-    case "version":
+    case structs.ConstraintVersion:
         return checkVersionConstraint(ctx, lVal, rVal)
-    case "regexp":
+    case structs.ConstraintRegex:
         return checkRegexpConstraint(ctx, lVal, rVal)
     default:
         return false
diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go
index 1cf58d1f6b3..b5aead21f83 100644
--- a/scheduler/feasible_test.go
+++ b/scheduler/feasible_test.go
@@ -248,12 +248,12 @@ func TestCheckConstraint(t *testing.T) {
             result: true,
         },
         {
-            op:     "version",
+            op:     structs.ConstraintVersion,
             lVal:   "1.2.3", rVal: "~> 1.0",
             result: true,
         },
         {
-            op:     "regexp",
+            op:     structs.ConstraintRegex,
             lVal:   "foobarbaz", rVal: "[\\w]+",
             result: true,
         },
@@ -382,6 +382,180 @@ func TestCheckRegexpConstraint(t *testing.T) {
     }
 }
 
+func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) {
+    _, ctx := testContext(t)
+    nodes := []*structs.Node{
+        mock.Node(),
+        mock.Node(),
+        mock.Node(),
+        mock.Node(),
+    }
+    static := NewStaticIterator(ctx, nodes)
+
+    // Create a job with a distinct_hosts constraint and two task groups.
+    tg1 := &structs.TaskGroup{Name: "bar"}
+    tg2 := &structs.TaskGroup{Name: "baz"}
+
+    job := &structs.Job{
+        ID:          "foo",
+        Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}},
+        TaskGroups:  []*structs.TaskGroup{tg1, tg2},
+    }
+
+    proposed := NewProposedAllocConstraintIterator(ctx, static)
+    proposed.SetTaskGroup(tg1)
+    proposed.SetJob(job)
+
+    out := collectFeasible(proposed)
+    if len(out) != 4 {
+        t.Fatalf("Bad: %#v", out)
+    }
+
+    selected := make(map[string]struct{}, 4)
+    for _, option := range out {
+        if _, ok := selected[option.ID]; ok {
+            t.Fatalf("selected node %v for more than one alloc", option)
+        }
+        selected[option.ID] = struct{}{}
+    }
+}
+
+func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) {
+    _, ctx := testContext(t)
+    nodes := []*structs.Node{
+        mock.Node(),
+        mock.Node(),
+    }
+    static := NewStaticIterator(ctx, nodes)
+
+    // Create a job with a distinct_hosts constraint and two task groups.
+    tg1 := &structs.TaskGroup{Name: "bar"}
+    tg2 := &structs.TaskGroup{Name: "baz"}
+
+    job := &structs.Job{
+        ID:          "foo",
+        Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}},
+        TaskGroups:  []*structs.TaskGroup{tg1, tg2},
+    }
+
+    // Add allocs placing tg1 on node1 and tg2 on node2. This should make the
+    // job unsatisfiable.
+    plan := ctx.Plan()
+    plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
+        &structs.Allocation{
+            TaskGroup: tg1.Name,
+            JobID:     job.ID,
+        },
+
+        // Should be ignored as it is a different job.
+        &structs.Allocation{
+            TaskGroup: tg2.Name,
+            JobID:     "ignore 2",
+        },
+    }
+    plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
+        &structs.Allocation{
+            TaskGroup: tg2.Name,
+            JobID:     job.ID,
+        },
+
+        // Should be ignored as it is a different job.
+        &structs.Allocation{
+            TaskGroup: tg1.Name,
+            JobID:     "ignore 2",
+        },
+    }
+
+    proposed := NewProposedAllocConstraintIterator(ctx, static)
+    proposed.SetTaskGroup(tg1)
+    proposed.SetJob(job)
+
+    out := collectFeasible(proposed)
+    if len(out) != 0 {
+        t.Fatalf("Bad: %#v", out)
+    }
+}
+
+func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) {
+    _, ctx := testContext(t)
+    nodes := []*structs.Node{
+        mock.Node(),
+        mock.Node(),
+    }
+    static := NewStaticIterator(ctx, nodes)
+
+    // Create a job with a distinct_hosts constraint and three task groups.
+    tg1 := &structs.TaskGroup{Name: "bar"}
+    tg2 := &structs.TaskGroup{Name: "baz"}
+    tg3 := &structs.TaskGroup{Name: "bam"}
+
+    job := &structs.Job{
+        ID:          "foo",
+        Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}},
+        TaskGroups:  []*structs.TaskGroup{tg1, tg2, tg3},
+    }
+
+    proposed := NewProposedAllocConstraintIterator(ctx, static)
+    proposed.SetTaskGroup(tg1)
+    proposed.SetJob(job)
+
+    // It should not be able to place 3 tasks with only two nodes.
+    out := collectFeasible(proposed)
+    if len(out) != 2 {
+        t.Fatalf("Bad: %#v", out)
+    }
+}
+
+func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) {
+    _, ctx := testContext(t)
+    nodes := []*structs.Node{
+        mock.Node(),
+        mock.Node(),
+    }
+    static := NewStaticIterator(ctx, nodes)
+
+    // Create a task group with a distinct_hosts constraint.
+    taskGroup := &structs.TaskGroup{
+        Name: "example",
+        Constraints: []*structs.Constraint{
+            {Operand: structs.ConstraintDistinctHosts},
+        },
+    }
+
+    // Add a planned alloc to node1.
+    plan := ctx.Plan()
+    plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
+        &structs.Allocation{
+            TaskGroup: taskGroup.Name,
+            JobID:     "foo",
+        },
+    }
+
+    // Add a planned alloc to node2 with the same task group name but a
+    // different job.
+    plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
+        &structs.Allocation{
+            TaskGroup: taskGroup.Name,
+            JobID:     "bar",
+        },
+    }
+
+    proposed := NewProposedAllocConstraintIterator(ctx, static)
+    proposed.SetTaskGroup(taskGroup)
+    proposed.SetJob(&structs.Job{ID: "foo"})
+
+    out := collectFeasible(proposed)
+    if len(out) != 1 {
+        t.Fatalf("Bad: %#v", out)
+    }
+
+    // Expect it to skip the first node as there is a previous alloc on it for
+    // the same task group.
+    if out[0] != nodes[1] {
+        t.Fatalf("Bad: %v", out)
+    }
+}
+
 func collectFeasible(iter FeasibleIterator) (out []*structs.Node) {
     for {
         next := iter.Next()
diff --git a/scheduler/stack.go b/scheduler/stack.go
index 2cda626c488..0a00fe686e1 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -35,16 +35,17 @@ type Stack interface {
 
 // GenericStack is the Stack used for the Generic scheduler. It is
 // designed to make better placement decisions at the cost of performance.
 type GenericStack struct {
-    batch               bool
-    ctx                 Context
-    source              *StaticIterator
-    jobConstraint       *ConstraintIterator
-    taskGroupDrivers    *DriverIterator
-    taskGroupConstraint *ConstraintIterator
-    binPack             *BinPackIterator
-    jobAntiAff          *JobAntiAffinityIterator
-    limit               *LimitIterator
-    maxScore            *MaxScoreIterator
+    batch                   bool
+    ctx                     Context
+    source                  *StaticIterator
+    jobConstraint           *ConstraintIterator
+    taskGroupDrivers        *DriverIterator
+    taskGroupConstraint     *ConstraintIterator
+    proposedAllocConstraint *ProposedAllocConstraintIterator
+    binPack                 *BinPackIterator
+    jobAntiAff              *JobAntiAffinityIterator
+    limit                   *LimitIterator
+    maxScore                *MaxScoreIterator
 }
 
 // NewGenericStack constructs a stack used for selecting service placements
@@ -69,8 +70,11 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
     // Filter on task group constraints second
     s.taskGroupConstraint = NewConstraintIterator(ctx, s.taskGroupDrivers, nil)
 
+    // Filter on constraints that are affected by proposed allocations.
+    s.proposedAllocConstraint = NewProposedAllocConstraintIterator(ctx, s.taskGroupConstraint)
+
     // Upgrade from feasible to rank iterator
-    rankSource := NewFeasibleRankIterator(ctx, s.taskGroupConstraint)
+    rankSource := NewFeasibleRankIterator(ctx, s.proposedAllocConstraint)
 
     // Apply the bin packing, this depends on the resources needed
     // by a particular task group. Only enable eviction for the service
@@ -119,6 +123,7 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
 
 func (s *GenericStack) SetJob(job *structs.Job) {
     s.jobConstraint.SetConstraints(job.Constraints)
+    s.proposedAllocConstraint.SetJob(job)
     s.binPack.SetPriority(job.Priority)
     s.jobAntiAff.SetJob(job.ID)
 }
@@ -135,6 +140,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
     // Update the parameters of iterators
     s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
     s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
+    s.proposedAllocConstraint.SetTaskGroup(tg)
     s.binPack.SetTasks(tg.Tasks)
 
     // Find the node with the max score
diff --git a/scheduler/util_test.go b/scheduler/util_test.go
index 417737dcaad..fc726391336 100644
--- a/scheduler/util_test.go
+++ b/scheduler/util_test.go
@@ -555,6 +555,7 @@ func TestInplaceUpdate_Success(t *testing.T) {
 
     updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}}
     stack := NewGenericStack(false, ctx)
+    stack.SetJob(job)
 
     // Do the inplace update.
     unplaced := inplaceUpdate(ctx, eval, job, stack, updates)
diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md
index c1f463d002a..c16723e7cd4 100644
--- a/website/source/docs/jobspec/index.html.md
+++ b/website/source/docs/jobspec/index.html.md
@@ -237,6 +237,18 @@ The `constraint` object supports the following keys:
   the attribute. This sets the operator to "regexp" and the `value`
   to the regular expression.
 
+* `distinct_hosts` - `distinct_hosts` accepts a boolean `true`. The default is
+  `false`.
+
+  When `distinct_hosts` is `true` at the Job level, each instance of all Task
+  Groups specified in the job is placed on a separate host.
+
+  When `distinct_hosts` is `true` at the Task Group level with count > 1, each
+  instance of a Task Group is placed on a separate host. Different task groups in
+  the same job _may_ be co-scheduled.
+
+  Tasks within a task group are always co-scheduled.
+
 Below is a table documenting the variables that can be interpreted:
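
Two usage notes accompany this change. First, a hypothetical group-level jobspec to go with the documentation text above; the job-level form is already shown by the distinctHosts-constraint.hcl fixture. The job name, group name, and count here are placeholders, not part of this patch:

```hcl
job "web" {
    group "frontend" {
        # With a count greater than one, each "frontend" allocation is placed
        # on a separate host. Other groups in this job may still share hosts
        # with these allocations.
        count = 3

        constraint {
            distinct_hosts = "true"
        }
    }
}
```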
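Second, a small standalone Go sketch (not part of the patch) restating the collision rule from satisfiesDistinctHosts with explicit parentheses. It assumes at least one distinct_hosts flag is set, which the iterator checks before reaching this logic, and all names are illustrative:

```go
package main

import "fmt"

// violatesDistinctHosts reports whether a single proposed allocation on a
// candidate node rules that node out. jobDistinct/groupDistinct mirror the
// iterator's cached flags; sameJob/sameGroup mirror the per-alloc collisions.
func violatesDistinctHosts(jobDistinct, groupDistinct, sameJob, sameGroup bool) bool {
    // Job-level constraint: any proposed alloc of the same job is a collision.
    // Group-level constraint: the alloc must match both the job and the group.
    return (jobDistinct && sameJob) || (groupDistinct && sameJob && sameGroup)
}

func main() {
    // Group-level only: same job but a different task group is still feasible.
    fmt.Println(violatesDistinctHosts(false, true, true, false)) // false

    // Job-level: any alloc of the same job on the node blocks placement.
    fmt.Println(violatesDistinctHosts(true, false, true, false)) // true
}
```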