diff --git a/scheduler/feasible.go b/scheduler/feasible.go index eca90ea07a5..16e5e282f1d 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -267,6 +267,9 @@ func NewDistinctPropertyIterator(ctx Context, source FeasibleIterator) *Distinct func (iter *DistinctPropertyIterator) SetTaskGroup(tg *structs.TaskGroup) { iter.tg = tg + + // Check if there is a distinct property + iter.hasDistinctPropertyConstraints = len(iter.jobPropertySets) != 0 || len(iter.groupPropertySets[tg.Name]) != 0 } func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { @@ -278,7 +281,6 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { continue } - iter.hasDistinctPropertyConstraints = true pset := NewPropertySet(iter.ctx, job) pset.SetJobConstraint(c) iter.jobPropertySets = append(iter.jobPropertySets, pset) @@ -290,7 +292,6 @@ func (iter *DistinctPropertyIterator) SetJob(job *structs.Job) { continue } - iter.hasDistinctPropertyConstraints = true pset := NewPropertySet(iter.ctx, job) pset.SetTGConstraint(c, tg.Name) iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index a4307bc51cc..6df199d6469 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -399,6 +399,83 @@ func TestServiceSched_JobRegister_DistinctProperty(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_DistinctProperty_TaskGroup(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 2; i++ { + node := mock.Node() + node.Meta["ssd"] = "true" + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job that uses distinct property and has count higher than what is + // possible. + job := mock.Job() + job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy()) + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].Constraints = append(job.TaskGroups[0].Constraints, + &structs.Constraint{ + Operand: structs.ConstraintDistinctProperty, + LTarget: "${meta.ssd}", + }) + + job.TaskGroups[1].Name = "tg2" + job.TaskGroups[1].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval hasn't spawned blocked eval + if len(h.CreateEvals) != 0 { + t.Fatalf("bad: %#v", h.CreateEvals[0]) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 2 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index 811e03b8781..5e42b5eb7eb 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -12,8 +12,12 @@ type propertySet struct { // ctx is used to lookup the plan and state ctx Context + // jobID is the job we are operating on jobID string + // taskGroup is optionally set if the constraint is for a task group + taskGroup string + // constraint is the constraint this property set is checking constraint *structs.Constraint @@ -51,37 +55,25 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet { func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) { // Store the constraint p.constraint = constraint - - // Retrieve all previously placed allocations - ws := memdb.NewWatchSet() - allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) - if err != nil { - p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err) - p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding) - return - } - - // Only want running allocs - filtered, _ := structs.FilterTerminalAllocs(allocs) - - // Get all the nodes that have been used by the allocs - nodes, err := p.buildNodeMap(filtered) - if err != nil { - p.errorBuilding = err - p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) - return - } - - p.populateExisting(constraint, filtered, nodes) + p.populateExisting(constraint) } // SetTGConstraint is used to parameterize the property set for a // distinct_property constraint set at the task group level. The inputs are the // constraint and the task group name. func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) { + // Store that this is for a task group + p.taskGroup = taskGroup + // Store the constraint p.constraint = constraint + p.populateExisting(constraint) +} + +// populateExisting is a helper shared when setting the constraint to populate +// the existing values. +func (p *propertySet) populateExisting(constraint *structs.Constraint) { // Retrieve all previously placed allocations ws := memdb.NewWatchSet() allocs, err := p.ctx.State().AllocsByJob(ws, p.jobID, false) @@ -91,16 +83,8 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup return } - // Only want running allocs from the task group - n := len(allocs) - for i := 0; i < n; i++ { - if allocs[i].TaskGroup != taskGroup || allocs[i].TerminalStatus() { - allocs[i], allocs[n-1] = allocs[n-1], nil - i-- - n-- - } - } - allocs = allocs[:n] + // Filter to the correct set of allocs + allocs = p.filterAllocs(allocs) // Get all the nodes that have been used by the allocs nodes, err := p.buildNodeMap(allocs) @@ -110,14 +94,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup return } - p.populateExisting(constraint, allocs, nodes) -} - -// populateExisting is a helper shared when setting the constraint to populate -// the existing values. The allocations should be filtered appropriately prior -// to calling. -func (p *propertySet) populateExisting(constraint *structs.Constraint, jobAllocs []*structs.Allocation, nodes map[string]*structs.Node) { - for _, alloc := range jobAllocs { + for _, alloc := range allocs { nProperty, ok := p.getProperty(nodes[alloc.NodeID], constraint.LTarget) if !ok { continue @@ -141,13 +118,14 @@ func (p *propertySet) PopulateProposed() { for _, updates := range p.ctx.Plan().NodeUpdate { stopping = append(stopping, updates...) } + stopping = p.filterAllocs(stopping) // Gather the proposed allocations var proposed []*structs.Allocation for _, pallocs := range p.ctx.Plan().NodeAllocation { proposed = append(proposed, pallocs...) } - proposed, _ = structs.FilterTerminalAllocs(proposed) + proposed = p.filterAllocs(proposed) // Get the used nodes both := make([]*structs.Allocation, 0, len(stopping)+len(proposed)) @@ -223,6 +201,29 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin return true, "" } +// filterAllocs filters a set of allocations to just be those that are running +// and if the property set is operation at a task group level, for allocations +// for that task group +func (p *propertySet) filterAllocs(allocs []*structs.Allocation) []*structs.Allocation { + n := len(allocs) + for i := 0; i < n; i++ { + remove := allocs[i].TerminalStatus() + + // If the constraint is on the task group filter the allocations to just + // those on the task group + if p.taskGroup != "" { + remove = remove || allocs[i].TaskGroup != p.taskGroup + } + + if remove { + allocs[i], allocs[n-1] = allocs[n-1], nil + i-- + n-- + } + } + return allocs[:n] +} + // buildNodeMap takes a list of allocations and returns a map of the nodes used // by those allocations func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {