diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 6a52cd49a76..2e4910181fe 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -221,6 +221,32 @@ func CopySliceAffinities(s []*Affinity) []*Affinity { return c } +func CopySliceSpreads(s []*Spread) []*Spread { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*Spread, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + +func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*SpreadTarget, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + // VaultPoliciesSet takes the structure returned by VaultPolicies and returns // the set of required policies func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b7fbb38316f..613478d66da 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2008,6 +2008,10 @@ type Job struct { // scheduling preferences that apply to all groups and tasks Affinities []*Affinity + // Spread can be specified at the job level to express spreading + // allocations across a desired attribute, such as datacenter + Spreads []*Spread + // TaskGroups are the collections of task groups that this job needs // to run. Each task group is an atomic unit of scheduling and placement. TaskGroups []*TaskGroup @@ -2185,6 +2189,19 @@ func (j *Job) Validate() error { } } + if j.Type == JobTypeSystem { + if j.Spreads != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza")) + } + } else { + for idx, spread := range j.Spreads { + if err := spread.Validate(); err != nil { + outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } + } + } + // Check for duplicate task groups taskGroups := make(map[string]int) for idx, tg := range j.TaskGroups { @@ -3336,6 +3353,10 @@ type TaskGroup struct { // Affinities can be specified at the task group level to express // scheduling preferences. 
Affinities []*Affinity + + // Spread can be specified at the task group level to express spreading + // allocations across a desired attribute, such as datacenter + Spreads []*Spread } func (tg *TaskGroup) Copy() *TaskGroup { @@ -3349,6 +3370,7 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.RestartPolicy = ntg.RestartPolicy.Copy() ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy() ntg.Affinities = CopySliceAffinities(ntg.Affinities) + ntg.Spreads = CopySliceSpreads(ntg.Spreads) if tg.Tasks != nil { tasks := make([]*Task, len(ntg.Tasks)) @@ -3450,6 +3472,19 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } + if j.Type == JobTypeSystem { + if tg.Spreads != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza")) + } + } else { + for idx, spread := range tg.Spreads { + if err := spread.Validate(); err != nil { + outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } + } + } + if j.Type == JobTypeSystem { if tg.ReschedulePolicy != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs should not have a reschedule policy")) @@ -5384,6 +5419,101 @@ func (a *Affinity) Validate() error { return mErr.ErrorOrNil() } +// Spread is used to specify desired distribution of allocations according to weight +type Spread struct { + // Attribute is the node attribute used as the spread criteria + Attribute string + + // Weight is the relative weight of this spread, useful when there are multiple + // spread and affinities + Weight int + + // SpreadTarget is used to describe desired percentages for each attribute value + SpreadTarget []*SpreadTarget + + // Memoized string representation + str string +} + +func (s *Spread) Copy() *Spread { + if s == nil { + return nil + } + ns := new(Spread) + *ns = *s + + ns.SpreadTarget = CopySliceSpreadTarget(s.SpreadTarget) + return ns +} + +func (s *Spread) String() string { + if s.str != "" { + return s.str + } + s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.SpreadTarget, s.Weight) + return s.str +} + +func (s *Spread) Validate() error { + var mErr multierror.Error + if s.Attribute == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing spread attribute")) + } + if s.Weight <= 0 || s.Weight > 100 { + mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100")) + } + seen := make(map[string]struct{}) + sumPercent := uint32(0) + + for _, target := range s.SpreadTarget { + // Make sure there are no duplicates + _, ok := seen[target.Value] + if !ok { + seen[target.Value] = struct{}{} + } else { + mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value))) + } + if target.Percent < 0 || target.Percent > 100 { + mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value))) + } + sumPercent += target.Percent + } + if sumPercent > 100 { + mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent))) + } + return mErr.ErrorOrNil() +} + +// SpreadTarget is used to specify desired percentages for each attribute value +type SpreadTarget struct { + // Value is a single attribute value, like "dc1" + Value string + + // Percent is the desired percentage of allocs + Percent uint32 + + // Memoized string 
representation + str string +} + +func (s *SpreadTarget) Copy() *SpreadTarget { + if s == nil { + return nil + } + + ns := new(SpreadTarget) + *ns = *s + return ns +} + +func (s *SpreadTarget) String() string { + if s.str != "" { + return s.str + } + s.str = fmt.Sprintf("%q %v%%", s.Value, s.Percent) + return s.str +} + // EphemeralDisk is an ephemeral disk object type EphemeralDisk struct { // Sticky indicates whether the allocation is sticky to a node diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index f8a97a3bc9d..213ff36d59c 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -405,6 +405,21 @@ func TestJob_SystemJob_Validate(t *testing.T) { err = j.Validate() require.NotNil(t, err) require.Contains(t, err.Error(), "System jobs may not have an affinity stanza") + + // Add spread at job and task group level, that should fail validation + j.Spreads = []*Spread{{ + Attribute: "${node.datacenter}", + Weight: 100, + }} + j.TaskGroups[0].Spreads = []*Spread{{ + Attribute: "${node.datacenter}", + Weight: 100, + }} + + err = j.Validate() + require.NotNil(t, err) + require.Contains(t, err.Error(), "System jobs may not have a spread stanza") + } func TestJob_VaultPolicies(t *testing.T) { @@ -3926,3 +3941,119 @@ func TestNode_Copy(t *testing.T) { require.Equal(node.DrainStrategy, node2.DrainStrategy) require.Equal(node.Drivers, node2.Drivers) } + +func TestSpread_Validate(t *testing.T) { + type tc struct { + spread *Spread + err error + name string + } + + testCases := []tc{ + { + spread: &Spread{}, + err: fmt.Errorf("Missing spread attribute"), + name: "empty spread", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: -1, + }, + err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"), + name: "Invalid weight", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 200, + }, + err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"), + name: "Invalid weight", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 25, + }, + { + Value: "dc2", + Percent: 150, + }, + }, + }, + err: fmt.Errorf("Spread target percentage for value \"dc2\" must be between 0 and 100"), + name: "Invalid percentages", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 75, + }, + { + Value: "dc2", + Percent: 75, + }, + }, + }, + err: fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", 150), + name: "Invalid percentages", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 25, + }, + { + Value: "dc1", + Percent: 50, + }, + }, + }, + err: fmt.Errorf("Spread target value \"dc1\" already defined"), + name: "No spread targets", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 25, + }, + { + Value: "dc2", + Percent: 50, + }, + }, + }, + err: nil, + name: "Valid spread", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.spread.Validate() + if tc.err != nil { + require.NotNil(t, err) + require.Contains(t, err.Error(), tc.err.Error()) + } else { + require.Nil(t, err) + } + }) + } +} diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 
d7224178426..d942d622776 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -602,6 +602,174 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T) h.AssertEvalStatus(t, structs.EvalStatusComplete) } +// Test job registration with spread configured +func TestServiceSched_Spread(t *testing.T) { + assert := assert.New(t) + + start := uint32(100) + step := uint32(10) + + for i := 0; i < 10; i++ { + name := fmt.Sprintf("%d%% in dc1", start) + t.Run(name, func(t *testing.T) { + h := NewHarness(t) + remaining := uint32(100 - start) + // Create a job that uses spread over data center + job := mock.Job() + job.Datacenters = []string{"dc1", "dc2"} + job.TaskGroups[0].Count = 10 + job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads, + &structs.Spread{ + Attribute: "${node.datacenter}", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: start, + }, + { + Value: "dc2", + Percent: remaining, + }, + }, + }) + assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob") + // Create some nodes, half in dc2 + var nodes []*structs.Node + nodeMap := make(map[string]*structs.Node) + for i := 0; i < 10; i++ { + node := mock.Node() + if i%2 == 0 { + node.Datacenter = "dc2" + } + nodes = append(nodes, node) + assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode") + nodeMap[node.ID] = node + } + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + assert.Nil(h.Process(NewServiceScheduler, eval), "Process") + + // Ensure a single plan + assert.Len(h.Plans, 1, "Number of plans") + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + assert.Nil(plan.Annotations, "Plan.Annotations") + + // Ensure the eval hasn't spawned blocked eval + assert.Len(h.CreateEvals, 0, "Created Evals") + + // Ensure the plan allocated + var planned []*structs.Allocation + dcAllocsMap := make(map[string]int) + for nodeId, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ dc := nodeMap[nodeId].Datacenter + c := dcAllocsMap[dc] + c += len(allocList) + dcAllocsMap[dc] = c + } + assert.Len(planned, 10, "Planned Allocations") + + expectedCounts := make(map[string]int) + expectedCounts["dc1"] = 10 - i + if i > 0 { + expectedCounts["dc2"] = i + } + require.Equal(t, expectedCounts, dcAllocsMap) + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + }) + start = start - step + } +} + +// Test job registration with even spread across dc +func TestServiceSched_EvenSpread(t *testing.T) { + assert := assert.New(t) + + h := NewHarness(t) + // Create a job that uses even spread over data center + job := mock.Job() + job.Datacenters = []string{"dc1", "dc2"} + job.TaskGroups[0].Count = 10 + job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads, + &structs.Spread{ + Attribute: "${node.datacenter}", + Weight: 100, + }) + assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob") + // Create some nodes, half in dc2 + var nodes []*structs.Node + nodeMap := make(map[string]*structs.Node) + for i := 0; i < 10; i++ { + node := mock.Node() + if i%2 == 0 { + node.Datacenter = "dc2" + } + nodes = append(nodes, node) + assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode") + nodeMap[node.ID] = node + } + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + assert.Nil(h.Process(NewServiceScheduler, eval), "Process") + + // Ensure a single plan + assert.Len(h.Plans, 1, "Number of plans") + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + assert.Nil(plan.Annotations, "Plan.Annotations") + + // Ensure the eval hasn't spawned blocked eval + assert.Len(h.CreateEvals, 0, "Created Evals") + + // Ensure the plan allocated + var planned []*structs.Allocation + dcAllocsMap := make(map[string]int) + for nodeId, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + dc := nodeMap[nodeId].Datacenter + c := dcAllocsMap[dc] + c += len(allocList) + dcAllocsMap[dc] = c + } + assert.Len(planned, 10, "Planned Allocations") + + // Expect even split allocs across datacenter + expectedCounts := make(map[string]int) + expectedCounts["dc1"] = 5 + expectedCounts["dc2"] = 5 + + require.Equal(t, expectedCounts, dcAllocsMap) + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index c335ec3407d..8ac9b48c6d9 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -23,8 +23,8 @@ type propertySet struct { // taskGroup is optionally set if the constraint is for a task group taskGroup string - // constraint is the constraint this property set is checking - constraint *structs.Constraint + // targetAttribute is the attribute this property set is checking + targetAttribute string // allowedCount is the allowed number of allocations that can have the // distinct property @@ -75,14 +75,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup // setConstraint is a shared helper for setting a job or task group constraint. 
func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) { - // Store that this is for a task group - if taskGroup != "" { - p.taskGroup = taskGroup - } - - // Store the constraint - p.constraint = constraint - + var allowedCount uint64 // Determine the number of allowed allocations with the property. if v := constraint.RTarget; v != "" { c, err := strconv.ParseUint(v, 10, 64) @@ -92,14 +85,35 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st return } - p.allowedCount = c + allowedCount = c } else { - p.allowedCount = 1 + allowedCount = 1 } + p.setTargetAttributeWithCount(constraint.LTarget, allowedCount, taskGroup) +} + +// SetTargetAttribute is used to populate this property set without also storing allowed count +// This is used when evaluating spread stanzas +func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) { + p.setTargetAttributeWithCount(targetAttribute, 0, taskGroup) +} + +// setTargetAttributeWithCount is a shared helper for setting a job or task group attribute and allowedCount +// allowedCount can be zero when this is used in evaluating spread stanzas +func (p *propertySet) setTargetAttributeWithCount(targetAttribute string, allowedCount uint64, taskGroup string) { + // Store that this is for a task group + if taskGroup != "" { + p.taskGroup = taskGroup + } + + // Store the constraint + p.targetAttribute = targetAttribute + + p.allowedCount = allowedCount // Determine the number of existing allocations that are using a property // value - p.populateExisting(constraint) + p.populateExisting() // Populate the proposed when setting the constraint. We do this because // when detecting if we can inplace update an allocation we stage an @@ -110,7 +124,7 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st // populateExisting is a helper shared when setting the constraint to populate // the existing values. -func (p *propertySet) populateExisting(constraint *structs.Constraint) { +func (p *propertySet) populateExisting() { // Retrieve all previously placed allocations ws := memdb.NewWatchSet() allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false) @@ -193,19 +207,42 @@ func (p *propertySet) PopulateProposed() { // placements. If the option does not satisfy the constraints an explanation is // given. func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { + nValue, errorMsg, usedCount := p.UsedCount(option, tg) + if errorMsg != "" { + return false, errorMsg + } + // The property value has been used but within the number of allowed + // allocations. + if usedCount < p.allowedCount { + return true, "" + } + + return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount) +} + +// UsedCount returns the number of times the value of the attribute being tracked by this +// property set is used across current and proposed allocations. 
It also returns the resolved +// attribute value for the node, and an error message if it couldn't be resolved correctly +func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string, uint64) { // Check if there was an error building if p.errorBuilding != nil { - return false, p.errorBuilding.Error() + return "", p.errorBuilding.Error(), 0 } // Get the nodes property value - nValue, ok := getProperty(option, p.constraint.LTarget) + nValue, ok := getProperty(option, p.targetAttribute) if !ok { - return false, fmt.Sprintf("missing property %q", p.constraint.LTarget) + return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0 } + combinedUse := p.GetCombinedUseMap() + usedCount := combinedUse[nValue] + return nValue, "", usedCount +} - // combine the counts of how many times the property has been used by - // existing and proposed allocations +// GetCombinedUseMap counts how many times the property has been used by +// existing and proposed allocations. It also takes into account any stopped +// allocations +func (p *propertySet) GetCombinedUseMap() map[string]uint64 { combinedUse := make(map[string]uint64, helper.IntMax(len(p.existingValues), len(p.proposedValues))) for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} { for propertyValue, usedCount := range usedValues { @@ -228,20 +265,7 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin combinedUse[propertyValue] = 0 } } - - usedCount, used := combinedUse[nValue] - if !used { - // The property value has never been used so we can use it. - return true, "" - } - - // The property value has been used but within the number of allowed - // allocations. - if usedCount < p.allowedCount { - return true, "" - } - - return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.constraint.LTarget, nValue, usedCount) + return combinedUse } // filterAllocs filters a set of allocations to just be those that are running @@ -298,7 +322,7 @@ func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map properties map[string]uint64) { for _, alloc := range allocs { - nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute) if !ok { continue } diff --git a/scheduler/spread.go b/scheduler/spread.go new file mode 100644 index 00000000000..f2d6c3309ea --- /dev/null +++ b/scheduler/spread.go @@ -0,0 +1,253 @@ +package scheduler + +import ( + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // implicitTarget is used to represent any remaining attribute values + // when target percentages don't add up to 100 + implicitTarget = "*" +) + +// SpreadIterator is used to spread allocations across a specified attribute +// according to preset weights +type SpreadIterator struct { + ctx Context + source RankIterator + job *structs.Job + tg *structs.TaskGroup + + // jobSpreads is a slice of spread stored at the job level which apply + // to all task groups + jobSpreads []*structs.Spread + + // tgSpreadInfo is a map per task group with precomputed + // values for desired counts and weight + tgSpreadInfo map[string]spreadAttributeMap + + // sumSpreadWeights tracks the total weight across all spread + // stanzas + sumSpreadWeights int + + // hasSpread is used to early return when the job/task group + // does not have spread configured + hasSpread bool + + // groupProperySets is a memoized map from task group to property sets. 
+ // existing allocs are computed once, and allocs from the plan are updated + // when Reset is called + groupPropertySets map[string][]*propertySet +} + +type spreadAttributeMap map[string]*spreadInfo + +type spreadInfo struct { + weight int + desiredCounts map[string]float64 +} + +func NewSpreadIterator(ctx Context, source RankIterator) *SpreadIterator { + iter := &SpreadIterator{ + ctx: ctx, + source: source, + groupPropertySets: make(map[string][]*propertySet), + tgSpreadInfo: make(map[string]spreadAttributeMap), + } + return iter +} + +func (iter *SpreadIterator) Reset() { + iter.source.Reset() + for _, sets := range iter.groupPropertySets { + for _, ps := range sets { + ps.PopulateProposed() + } + } +} + +func (iter *SpreadIterator) SetJob(job *structs.Job) { + iter.job = job + if job.Spreads != nil { + iter.jobSpreads = job.Spreads + } +} + +func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) { + iter.tg = tg + + // Build the property set at the taskgroup level + if _, ok := iter.groupPropertySets[tg.Name]; !ok { + // First add property sets that are at the job level for this task group + for _, spread := range iter.jobSpreads { + pset := NewPropertySet(iter.ctx, iter.job) + pset.SetTargetAttribute(spread.Attribute, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) + } + + // Include property sets at the task group level + for _, spread := range tg.Spreads { + pset := NewPropertySet(iter.ctx, iter.job) + pset.SetTargetAttribute(spread.Attribute, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) + } + } + + // Check if there are any spreads configured + iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0 + + // Build tgSpreadInfo at the task group level + if _, ok := iter.tgSpreadInfo[tg.Name]; !ok { + iter.computeSpreadInfo(tg) + } + +} + +func (iter *SpreadIterator) hasSpreads() bool { + return iter.hasSpread +} + +func (iter *SpreadIterator) Next() *RankedNode { + for { + option := iter.source.Next() + + // Hot path if there is nothing to check + if option == nil || !iter.hasSpreads() { + return option + } + + tgName := iter.tg.Name + propertySets := iter.groupPropertySets[tgName] + // Iterate over each spread attribute's property set and add a weighted score + totalSpreadScore := 0.0 + for _, pset := range propertySets { + nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName) + + // Add one to include placement on this node in the scoring calculation + usedCount += 1 + // Set score to -1 if there were errors in building this attribute + if errorMsg != "" { + iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg) + totalSpreadScore -= 1.0 + continue + } + spreadAttributeMap := iter.tgSpreadInfo[tgName] + spreadDetails := spreadAttributeMap[pset.targetAttribute] + + if len(spreadDetails.desiredCounts) == 0 { + // When desired counts map is empty the user didn't specify any targets + // Use even spreading scoring algorithm for this scenario + scoreBoost := evenSpreadScoreBoost(pset, option.Node) + totalSpreadScore += scoreBoost + } else { + + // Get the desired count + desiredCount, ok := spreadDetails.desiredCounts[nValue] + if !ok { + // See if there is an implicit target + desiredCount, ok = spreadDetails.desiredCounts[implicitTarget] + if !ok { + // The desired count for this attribute is zero if it gets here + // so use the maximum possible penalty for this node + totalSpreadScore -= 1.0 + continue + } + } + 
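// Worked example (added annotation, not part of the patch), grounded in
// TestSpreadIterator_SingleAttribute later in this diff: the task group has
// Count=10 and a single spread of weight 100 on ${node.datacenter} with
// dc1=80%, so spreadWeight below is 100/100 = 1.0 and the desired counts are
// dc1=8 plus an implicit "*"=2. With two existing dc1 allocations, a dc1 node
// scores ((8-3)/8)*1.0 = 0.625 (usedCount includes the proposed placement),
// and a dc2 node, which falls back to the implicit target, scores
// ((2-1)/2)*1.0 = 0.5 — matching the expected scores in spread_test.go.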
+ // Calculate the relative weight of this specific spread attribute + spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights) + + // Score Boost is proportional the difference between current and desired count + // It is negative when the used count is greater than the desired count + // It is multiplied with the spread weight to account for cases where the job has + // more than one spread attribute + scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight + totalSpreadScore += scoreBoost + } + } + + if totalSpreadScore != 0.0 { + option.Scores = append(option.Scores, totalSpreadScore) + iter.ctx.Metrics().ScoreNode(option.Node, "allocation-spread", totalSpreadScore) + } + return option + } +} + +// evenSpreadScoreBoost is a scoring helper that calculates the score +// for the option when even spread is desired (all attribute values get equal preference) +func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 { + combinedUseMap := pset.GetCombinedUseMap() + if len(combinedUseMap) == 0 { + // Nothing placed yet, so return 0 as the score + return 0.0 + } + // Get the nodes property value + nValue, ok := getProperty(option, pset.targetAttribute) + + // Maximum possible penalty when the attribute isn't set on the node + if !ok { + return -1.0 + } + currentAttributeCount := combinedUseMap[nValue] + minCount := uint64(0) + maxCount := uint64(0) + for _, value := range combinedUseMap { + if minCount == 0 || value < minCount { + minCount = value + } + if maxCount == 0 || value > maxCount { + maxCount = value + } + } + + // calculate boost based on delta between the current and the minimum + var deltaBoost float64 + if minCount == 0 { + deltaBoost = -1.0 + } else { + delta := int(minCount - currentAttributeCount) + deltaBoost = float64(delta) / float64(minCount) + } + if currentAttributeCount != minCount { + // Boost based on delta between current and min + return deltaBoost + } else if minCount == maxCount { + // Maximum possible penalty when the distribution is even + return -1.0 + } + // Penalty based on delta from max value + delta := int(maxCount - minCount) + deltaBoost = float64(delta) / float64(minCount) + return deltaBoost + +} + +// computeSpreadInfo computes and stores percentages and total values +// from all spreads that apply to a specific task group +func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) { + spreadInfos := make(spreadAttributeMap, len(tg.Spreads)) + totalCount := tg.Count + + // Always combine any spread stanzas defined at the job level here + combinedSpreads := make([]*structs.Spread, 0, len(tg.Spreads)+len(iter.jobSpreads)) + combinedSpreads = append(combinedSpreads, tg.Spreads...) + combinedSpreads = append(combinedSpreads, iter.jobSpreads...) 
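// Added annotation (not part of the patch): the loop below converts target
// percentages into absolute desired counts against the task group count. For
// example, Count=10 with a single target of dc1=80% yields
// desiredCounts = {"dc1": 8}, and because 8 < 10 the remaining 2 are recorded
// under the implicit "*" target so unlisted attribute values still receive a
// share. When the targets already sum to 100%, no implicit entry is added and
// values outside the targets receive the maximum penalty in Next().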
+ for _, spread := range combinedSpreads { + si := &spreadInfo{weight: spread.Weight, desiredCounts: make(map[string]float64)} + sumDesiredCounts := 0.0 + for _, st := range spread.SpreadTarget { + desiredCount := (float64(st.Percent) / float64(100)) * float64(totalCount) + si.desiredCounts[st.Value] = desiredCount + sumDesiredCounts += desiredCount + } + // Account for remaining count only if there is any spread targets + if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) { + remainingCount := float64(totalCount) - sumDesiredCounts + si.desiredCounts[implicitTarget] = remainingCount + } + spreadInfos[spread.Attribute] = si + iter.sumSpreadWeights += spread.Weight + } + iter.tgSpreadInfo[tg.Name] = spreadInfos +} diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go new file mode 100644 index 00000000000..83fb2749f94 --- /dev/null +++ b/scheduler/spread_test.go @@ -0,0 +1,545 @@ +package scheduler + +import ( + "testing" + + "fmt" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestSpreadIterator_SingleAttribute(t *testing.T) { + state, ctx := testContext(t) + dcs := []string{"dc1", "dc2", "dc1", "dc1"} + var nodes []*RankedNode + + // Add these nodes to the state store + for i, dc := range dcs { + node := mock.Node() + node.Datacenter = dc + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 10 + // add allocs to nodes in dc1 + upserting := []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[2].Node.ID, + }, + } + + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + // Create spread target of 80% in dc1 + // Implicitly, this means 20% in dc2 + spread := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 80, + }, + }, + } + tg.Spreads = []*structs.Spread{spread} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // Expect nodes in dc1 with existing allocs to get a boost + // Boost should be ((desiredCount-actual)/desired)*spreadWeight + // For this test, that becomes dc1 = ((8-3)/8 ) = 0.5, and dc2=(2-1)/2 + expectedScores := map[string]float64{ + "dc1": 0.625, + "dc2": 0.5, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } + + // Update the plan to add more allocs to nodes in dc1 + // After this step there are enough allocs to meet the desired count in dc1 + ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + { + Namespace: 
structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + // Should be ignored as it is a different job. + { + Namespace: structs.DefaultNamespace, + TaskGroup: "bbb", + JobID: "ignore 2", + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + } + ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[3].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[3].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[3].Node.ID, + }, + } + + // Reset the scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect nodes in dc2 with existing allocs to get a boost + // DC1 nodes are not boosted because there are enough allocs to meet + // the desired count + expectedScores = map[string]float64{ + "dc1": 0, + "dc2": 0.5, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } +} + +func TestSpreadIterator_MultipleAttributes(t *testing.T) { + state, ctx := testContext(t) + dcs := []string{"dc1", "dc2", "dc1", "dc1"} + rack := []string{"r1", "r1", "r2", "r2"} + var nodes []*RankedNode + + // Add these nodes to the state store + for i, dc := range dcs { + node := mock.Node() + node.Datacenter = dc + node.Meta["rack"] = rack[i] + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 10 + // add allocs to nodes in dc1 + upserting := []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[2].Node.ID, + }, + } + + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + spread1 := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 60, + }, + { + Value: "dc2", + Percent: 40, + }, + }, + } + + spread2 := &structs.Spread{ + Weight: 50, + Attribute: "${meta.rack}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 40, + }, + { + Value: "r2", + Percent: 60, + }, + }, + } + + tg.Spreads = []*structs.Spread{spread1, spread2} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // Score comes from combining two different spread factors + // Second node should have the highest score because it has no allocs and its in dc2/r1 + 
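// Added annotation (not part of the patch): with sumSpreadWeights=150 the
// datacenter spread carries weight 100/150 and the rack spread 50/150, and
// the desired counts for Count=10 are dc1=6, dc2=4, r1=4, r2=6. Counting the
// proposed placement, node 1 (dc2/r1) scores
// ((4-1)/4)*(100/150) + ((4-2)/4)*(50/150) ≈ 0.667, while node 0 (dc1/r1,
// where dc1 already has two allocs and r1 one) scores
// ((6-3)/6)*(100/150) + ((4-2)/4)*(50/150) = 0.500, as asserted below.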
expectedScores := map[string]float64{ + nodes[0].Node.ID: 0.500, + nodes[1].Node.ID: 0.667, + nodes[2].Node.ID: 0.556, + nodes[3].Node.ID: 0.556, + } + for _, rn := range out { + require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore)) + } + +} + +func TestSpreadIterator_EvenSpread(t *testing.T) { + state, ctx := testContext(t) + dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"} + var nodes []*RankedNode + + // Add these nodes to the state store + for i, dc := range dcs { + node := mock.Node() + node.Datacenter = dc + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 10 + + // Configure even spread across node.datacenter + spread := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + } + tg.Spreads = []*structs.Spread{spread} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // Nothing placed so both dc nodes get 0 as the score + expectedScores := map[string]float64{ + "dc1": 0, + "dc2": 0, + } + for _, rn := range out { + require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore)) + } + + // Update the plan to add allocs to nodes in dc1 + // After this step dc2 nodes should get boosted + ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + } + ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[2].Node.ID, + }, + } + + // Reset the scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect nodes in dc2 with existing allocs to get a boost + // dc1 nodes are penalized because they have allocs + expectedScores = map[string]float64{ + "dc1": -1, + "dc2": 1, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } + + // Update the plan to add more allocs to nodes in dc2 + // After this step dc1 nodes should get boosted + ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[1].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[1].Node.ID, + }, + } + ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[3].Node.ID, + }, + } + + // Reset the scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = 
NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect nodes in dc2 to be penalized because there are 3 allocs there now + // dc1 nodes are boosted because that has 2 allocs + expectedScores = map[string]float64{ + "dc1": 0.5, + "dc2": -0.5, + } + for _, rn := range out { + require.Equal(t, fmt.Sprintf("%3.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%3.3f", rn.FinalScore)) + } + + // Add another node in dc3 + node := mock.Node() + node.Datacenter = "dc3" + if err := state.UpsertNode(uint64(1111), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + + // Add another alloc to dc1, now its count matches dc2 + ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[4].Node.ID, + }, + } + + // Reset scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect dc1 and dc2 to be penalized because they have 3 allocs + // dc3 should get a boost because it has 0 allocs + expectedScores = map[string]float64{ + "dc1": -1, + "dc2": -1, + "dc3": 1, + } + for _, rn := range out { + require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore)) + } + +} + +// Test scenarios where the spread iterator sets maximum penalty (-1.0) +func TestSpreadIterator_MaxPenalty(t *testing.T) { + state, ctx := testContext(t) + var nodes []*RankedNode + + // Add nodes in dc3 to the state store + for i := 0; i < 5; i++ { + node := mock.Node() + node.Datacenter = "dc3" + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 5 + + // Create spread target of 80% in dc1 + // and 20% in dc2 + spread := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 80, + }, + { + Value: "dc2", + Percent: 20, + }, + }, + } + tg.Spreads = []*structs.Spread{spread} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // All nodes are in dc3 so score should be -1 + for _, rn := range out { + require.Equal(t, -1.0, rn.FinalScore) + } + + // Reset scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + + // Create spread on attribute that doesn't exist on any nodes + spread = &structs.Spread{ + Weight: 100, + Attribute: "${meta.foo}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "bar", + Percent: 80, + }, + { + Value: "baz", + Percent: 20, + }, + }, + } + + tg.Spreads = []*structs.Spread{spread} + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + 
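// Added annotation (not part of the patch): none of these nodes set meta.foo,
// so UsedCount cannot resolve the attribute and Next() applies the maximum
// penalty of -1.0. The dc3-only scenario above hits the other -1.0 path: the
// dc1/dc2 targets sum to 100%, so computeSpreadInfo records no implicit "*"
// entry and dc3 ends up with no desired count.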
spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // All nodes don't have the spread attribute so score should be -1 + for _, rn := range out { + require.Equal(t, -1.0, rn.FinalScore) + } + +} diff --git a/scheduler/stack.go b/scheduler/stack.go index 15b8538e5dc..46191c41d51 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -57,6 +57,7 @@ type GenericStack struct { limit *LimitIterator maxScore *MaxScoreIterator nodeAffinity *NodeAffinityIterator + spread *SpreadIterator scoreNorm *ScoreNormalizationIterator } @@ -117,7 +118,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty) - s.scoreNorm = NewScoreNormalizationIterator(ctx, s.nodeAffinity) + s.spread = NewSpreadIterator(ctx, s.nodeAffinity) + + s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread) // Apply a limit function. This is to avoid scanning *every* possible node. s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip) @@ -156,6 +159,7 @@ func (s *GenericStack) SetJob(job *structs.Job) { s.binPack.SetPriority(job.Priority) s.jobAntiAff.SetJob(job) s.nodeAffinity.SetJob(job) + s.spread.SetJob(job) s.ctx.Eligibility().SetJob(job) if contextual, ok := s.quota.(ContextualIterator); ok { @@ -200,7 +204,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs) } s.nodeAffinity.SetTaskGroup(tg) - if s.nodeAffinity.hasAffinities() { + s.spread.SetTaskGroup(tg) + + if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() { s.limit.SetLimit(math.MaxInt32) }
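A minimal usage sketch to tie the pieces together (added for illustration, not part of this patch): the test name and the 70/30 split are hypothetical, and it reuses the diff's own types plus the repo's existing test helpers (mock.Job, testify's require), in the same style as spread_test.go above.

package scheduler

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

// Hypothetical sketch: declare a 70/30 datacenter spread on a task group.
// With Count = 10, computeSpreadInfo would derive desired counts of dc1=7
// and dc2=3, and the SpreadIterator wired into GenericStack above would
// boost nodes in whichever datacenter is furthest below its desired count.
func TestSpreadUsageSketch(t *testing.T) {
	spread := &structs.Spread{
		Attribute: "${node.datacenter}",
		Weight:    100,
		SpreadTarget: []*structs.SpreadTarget{
			{Value: "dc1", Percent: 70},
			{Value: "dc2", Percent: 30},
		},
	}
	// Weight must be in (0, 100], each Percent within [0, 100], and the
	// target percentages may not sum past 100.
	require.NoError(t, spread.Validate())

	job := mock.Job()
	job.TaskGroups[0].Count = 10
	job.TaskGroups[0].Spreads = []*structs.Spread{spread}
}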