From b5e95845e9cbdb60419bda04921244d66587cc46 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 16 Jul 2018 12:52:24 -0500 Subject: [PATCH 01/17] Structs and validation for spread --- nomad/structs/funcs.go | 26 ++++++++++++++++++ nomad/structs/structs.go | 59 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 6a52cd49a76..2e4910181fe 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -221,6 +221,32 @@ func CopySliceAffinities(s []*Affinity) []*Affinity { return c } +func CopySliceSpreads(s []*Spread) []*Spread { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*Spread, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + +func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget { + l := len(s) + if l == 0 { + return nil + } + + c := make([]*SpreadTarget, l) + for i, v := range s { + c[i] = v.Copy() + } + return c +} + // VaultPoliciesSet takes the structure returned by VaultPolicies and returns // the set of required policies func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b7fbb38316f..c4b33cb9ac7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2008,6 +2008,10 @@ type Job struct { // scheduling preferences that apply to all groups and tasks Affinities []*Affinity + // Spread can be specified at the job level to express spreading + // allocations across a desired attribute, such as datacenter + Spreads []*Spread + // TaskGroups are the collections of task groups that this job needs // to run. Each task group is an atomic unit of scheduling and placement. TaskGroups []*TaskGroup @@ -3336,6 +3340,10 @@ type TaskGroup struct { // Affinities can be specified at the task group level to express // scheduling preferences. 
Affinities []*Affinity + + // Spread can be specified at the task group level to express spreading + // allocations across a desired attribute, such as datacenter + Spreads []*Spread } func (tg *TaskGroup) Copy() *TaskGroup { @@ -3349,6 +3357,7 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.RestartPolicy = ntg.RestartPolicy.Copy() ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy() ntg.Affinities = CopySliceAffinities(ntg.Affinities) + ntg.Spreads = CopySliceSpreads(ntg.Spreads) if tg.Tasks != nil { tasks := make([]*Task, len(ntg.Tasks)) @@ -5384,6 +5393,56 @@ func (a *Affinity) Validate() error { return mErr.ErrorOrNil() } +type Spread struct { + Attribute string + Weight int + SpreadTarget []*SpreadTarget + str string +} + +type SpreadTarget struct { + Value string + Ratio uint32 + str string +} + +func (s *Spread) Copy() *Spread { + if s == nil { + return nil + } + ns := new(Spread) + *ns = *s + + ns.SpreadTarget = CopySliceSpreadTarget(s.SpreadTarget) + return ns +} + +func (s *SpreadTarget) Copy() *SpreadTarget { + if s == nil { + return nil + } + + ns := new(SpreadTarget) + *ns = *s + return ns +} + +func (s *Spread) String() string { + if s.str != "" { + return s.str + } + s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.Weight, s.SpreadTarget) + return s.str +} + +func (s *SpreadTarget) String() string { + if s.str != "" { + return s.str + } + s.str = fmt.Sprintf("%s %v", s.Value, s.Ratio) + return s.str +} + // EphemeralDisk is an ephemeral disk object type EphemeralDisk struct { // Sticky indicates whether the allocation is sticky to a node From 8e2697de962b6b156b6f40d280365cb6b9c42a71 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 17 Jul 2018 17:21:00 -0500 Subject: [PATCH 02/17] Validate method, and rename ratio field to percent --- nomad/structs/structs.go | 34 ++++++++++++-- nomad/structs/structs_test.go | 88 +++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 4 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c4b33cb9ac7..6539d4b9ac5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5401,9 +5401,9 @@ type Spread struct { } type SpreadTarget struct { - Value string - Ratio uint32 - str string + Value string + Percent uint32 + str string } func (s *Spread) Copy() *Spread { @@ -5439,10 +5439,36 @@ func (s *SpreadTarget) String() string { if s.str != "" { return s.str } - s.str = fmt.Sprintf("%s %v", s.Value, s.Ratio) + s.str = fmt.Sprintf("%s %v", s.Value, s.Percent) return s.str } +func (s *Spread) Validate() error { + var mErr multierror.Error + if s.Attribute == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing spread attribute")) + } + if s.Weight <= 0 || s.Weight > 100 { + mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100")) + } + if len(s.SpreadTarget) == 0 { + // TODO(preetha): This should go away if we can assume even spread if there are no targets + // In that case, the target percentages should be calculated at schedule time + mErr.Errors = append(mErr.Errors, errors.New("Atleast one spread target value must be specified")) + } + seen := make(map[string]struct{}) + for _, target := range s.SpreadTarget { + // Make sure there are no duplicates + _, ok := seen[target.Value] + if !ok { + seen[target.Value] = struct{}{} + } else { + mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value))) + } + } + return mErr.ErrorOrNil() +} + // EphemeralDisk is an ephemeral disk 
object type EphemeralDisk struct { // Sticky indicates whether the allocation is sticky to a node diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index f8a97a3bc9d..51c8d5b98f2 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -3926,3 +3926,91 @@ func TestNode_Copy(t *testing.T) { require.Equal(node.DrainStrategy, node2.DrainStrategy) require.Equal(node.Drivers, node2.Drivers) } + +func TestSpread_Validate(t *testing.T) { + type tc struct { + spread *Spread + err error + name string + } + + testCases := []tc{ + { + spread: &Spread{}, + err: fmt.Errorf("Missing spread attribute"), + name: "empty spread", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: -1, + }, + err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"), + name: "Invalid weight", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 200, + }, + err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"), + name: "Invalid weight", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + }, + err: fmt.Errorf("Atleast one spread target value must be specified"), + name: "No spread targets", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 25, + }, + { + Value: "dc1", + Percent: 50, + }, + }, + }, + err: fmt.Errorf("Spread target value \"dc1\" already defined"), + name: "No spread targets", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 25, + }, + { + Value: "dc2", + Percent: 50, + }, + }, + }, + err: nil, + name: "Valid spread", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.spread.Validate() + if tc.err != nil { + require.NotNil(t, err) + require.Contains(t, err.Error(), tc.err.Error()) + } else { + require.Nil(t, err) + } + }) + } +} From df59fb49f70099970ffe9d1962eb981b48fb1445 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 17 Jul 2018 17:25:38 -0500 Subject: [PATCH 03/17] Implement spread iterator that scores according to percentage of desired count in each target. 
Added this as a new step in the stack and some unit tests --- scheduler/generic_sched_test.go | 83 +++++++++++ scheduler/propertyset.go | 84 +++++++---- scheduler/spread.go | 155 ++++++++++++++++++++ scheduler/spread_test.go | 249 ++++++++++++++++++++++++++++++++ scheduler/stack.go | 10 +- 5 files changed, 547 insertions(+), 34 deletions(-) create mode 100644 scheduler/spread.go create mode 100644 scheduler/spread_test.go diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index d7224178426..fd8d81ec953 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -602,6 +602,89 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T) h.AssertEvalStatus(t, structs.EvalStatusComplete) } +// Test job registration with spread configured +func TestServiceSched_Spread(t *testing.T) { + h := NewHarness(t) + assert := assert.New(t) + + // Create a job that uses spread over data center + job := mock.Job() + job.Datacenters = []string{"dc1", "dc2"} + job.TaskGroups[0].Count = 10 + job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads, + &structs.Spread{ + Attribute: "${node.datacenter}", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 70, + }, + { + Value: "dc2", + Percent: 30, + }, + }, + }) + assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob") + + // Create some nodes, half in dc2 + var nodes []*structs.Node + nodeMap := make(map[string]*structs.Node) + for i := 0; i < 6; i++ { + node := mock.Node() + if i%2 == 0 { + node.Datacenter = "dc2" + } + nodes = append(nodes, node) + assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode") + nodeMap[node.ID] = node + } + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + assert.Nil(h.Process(NewServiceScheduler, eval), "Process") + + // Ensure a single plan + assert.Len(h.Plans, 1, "Number of plans") + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + assert.Nil(plan.Annotations, "Plan.Annotations") + + // Ensure the eval hasn't spawned blocked eval + assert.Len(h.CreateEvals, 0, "Created Evals") + + // Ensure the plan allocated + var planned []*structs.Allocation + dcAllocsMap := make(map[string]int) + for nodeId, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
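+		// Tally planned allocations per datacenter; with a count of 10 and 70/30 spread targets, the assertions below expect 7 allocations in dc1 and 3 in dc2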
+ dc := nodeMap[nodeId].Datacenter + c := dcAllocsMap[dc] + c += len(allocList) + dcAllocsMap[dc] = c + } + assert.Len(planned, 10, "Planned Allocations") + + expectedCounts := make(map[string]int) + expectedCounts["dc1"] = 7 + expectedCounts["dc2"] = 3 + require.Equal(t, expectedCounts, dcAllocsMap) + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index c335ec3407d..8e664254e41 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -23,8 +23,8 @@ type propertySet struct { // taskGroup is optionally set if the constraint is for a task group taskGroup string - // constraint is the constraint this property set is checking - constraint *structs.Constraint + // targetAttribute is the attribute this property set is checking + targetAttribute string // allowedCount is the allowed number of allocations that can have the // distinct property @@ -60,7 +60,7 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet { return p } -// SetJobConstraint is used to parameterize the property set for a +// SetJobConstraintAttribute is used to parameterize the property set for a // distinct_property constraint set at the job level. func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) { p.setConstraint(constraint, "") @@ -75,14 +75,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup // setConstraint is a shared helper for setting a job or task group constraint. func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) { - // Store that this is for a task group - if taskGroup != "" { - p.taskGroup = taskGroup - } - - // Store the constraint - p.constraint = constraint - + allowedCount := uint64(0) // Determine the number of allowed allocations with the property. if v := constraint.RTarget; v != "" { c, err := strconv.ParseUint(v, 10, 64) @@ -92,14 +85,35 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st return } - p.allowedCount = c + allowedCount = c } else { - p.allowedCount = 1 + allowedCount = 1 + } + p.setPropertySetInner(constraint.LTarget, allowedCount, taskGroup) +} + +// SetTargetAttribute is used to populate this property set without also storing allowed count +// This is used when evaluating spread stanzas +func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) { + p.setPropertySetInner(targetAttribute, 0, taskGroup) +} + +// setConstraint is a shared helper for setting a job or task group attribute and allowedCount +// allowedCount can be zero when this is used in evaluating spread stanzas +func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount uint64, taskGroup string) { + // Store that this is for a task group + if taskGroup != "" { + p.taskGroup = taskGroup } + // Store the constraint + p.targetAttribute = targetAttribute + + p.allowedCount = allowedCount + // Determine the number of existing allocations that are using a property // value - p.populateExisting(constraint) + p.populateExisting(targetAttribute) // Populate the proposed when setting the constraint. We do this because // when detecting if we can inplace update an allocation we stage an @@ -110,7 +124,7 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st // populateExisting is a helper shared when setting the constraint to populate // the existing values. 
-func (p *propertySet) populateExisting(constraint *structs.Constraint) { +func (p *propertySet) populateExisting(targetAttribute string) { // Retrieve all previously placed allocations ws := memdb.NewWatchSet() allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false) @@ -193,15 +207,32 @@ func (p *propertySet) PopulateProposed() { // placements. If the option does not satisfy the constraints an explanation is // given. func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { + nValue, errorMsg, usedCount := p.UsedCount(option, tg) + if errorMsg != "" { + return false, errorMsg + } + // The property value has been used but within the number of allowed + // allocations. + if usedCount < p.allowedCount { + return true, "" + } + + return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount) +} + +// UsedCount returns the number of times the value of the attribute being tracked by this +// property set is used across current and proposed allocations. It also returns the resolved +// attribute value for the node, and an error message if it couldn't be resolved correctly +func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string, uint64) { // Check if there was an error building if p.errorBuilding != nil { - return false, p.errorBuilding.Error() + return "", p.errorBuilding.Error(), 0 } // Get the nodes property value - nValue, ok := getProperty(option, p.constraint.LTarget) + nValue, ok := getProperty(option, p.targetAttribute) if !ok { - return false, fmt.Sprintf("missing property %q", p.constraint.LTarget) + return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0 } // combine the counts of how many times the property has been used by @@ -229,19 +260,8 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin } } - usedCount, used := combinedUse[nValue] - if !used { - // The property value has never been used so we can use it. - return true, "" - } - - // The property value has been used but within the number of allowed - // allocations. 
- if usedCount < p.allowedCount { - return true, "" - } - - return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.constraint.LTarget, nValue, usedCount) + usedCount := combinedUse[nValue] + return nValue, "", usedCount } // filterAllocs filters a set of allocations to just be those that are running @@ -298,7 +318,7 @@ func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map properties map[string]uint64) { for _, alloc := range allocs { - nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute) if !ok { continue } diff --git a/scheduler/spread.go b/scheduler/spread.go new file mode 100644 index 00000000000..db59b332292 --- /dev/null +++ b/scheduler/spread.go @@ -0,0 +1,155 @@ +package scheduler + +import ( + "github.com/hashicorp/nomad/nomad/structs" +) + +// SpreadIterator is used to spread allocations across a specified attribute +// according to preset weights +type SpreadIterator struct { + ctx Context + source RankIterator + job *structs.Job + tg *structs.TaskGroup + jobSpreads []*structs.Spread + tgSpreadInfo map[string]spreadAttributeMap + sumSpreadWeights int + hasSpread bool + groupPropertySets map[string][]*propertySet +} + +type spreadAttributeMap map[string]*spreadInfo + +type spreadInfo struct { + sumWeight uint32 + weight int + desiredCounts map[string]float64 +} + +func NewSpreadIterator(ctx Context, source RankIterator) *SpreadIterator { + iter := &SpreadIterator{ + ctx: ctx, + source: source, + groupPropertySets: make(map[string][]*propertySet), + tgSpreadInfo: make(map[string]spreadAttributeMap), + } + return iter +} + +func (iter *SpreadIterator) Reset() { + iter.source.Reset() + for _, sets := range iter.groupPropertySets { + for _, ps := range sets { + ps.PopulateProposed() + } + } +} + +func (iter *SpreadIterator) SetJob(job *structs.Job) { + iter.job = job + if job.Spreads != nil { + iter.jobSpreads = job.Spreads + } +} + +func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) { + iter.tg = tg + + // Build the property set at the taskgroup level + if _, ok := iter.groupPropertySets[tg.Name]; !ok { + // First add property sets that are at the job level for this task group + for _, spread := range iter.jobSpreads { + pset := NewPropertySet(iter.ctx, iter.job) + pset.SetTargetAttribute(spread.Attribute, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) + } + + // Include property sets at the task group level + for _, spread := range tg.Spreads { + pset := NewPropertySet(iter.ctx, iter.job) + pset.SetTargetAttribute(spread.Attribute, tg.Name) + iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset) + } + } + + // Check if there is a distinct property + iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0 + + // Build tgSpreadInfo at the task group level + if _, ok := iter.tgSpreadInfo[tg.Name]; !ok { + iter.computeSpreadInfo(tg) + } + +} + +func (iter *SpreadIterator) hasSpreads() bool { + return iter.hasSpread +} + +func (iter *SpreadIterator) Next() *RankedNode { + for { + option := iter.source.Next() + + // Hot path if there is nothing to check + if option == nil || !iter.hasSpreads() { + return option + } + + tgName := iter.tg.Name + propertySets := iter.groupPropertySets[tgName] + // Iterate over each spread attribute's property set and add a weighted score + totalSpreadScore := 0.0 + for _, pset := range propertySets { + nValue, errorMsg, usedCount := 
pset.UsedCount(option.Node, tgName) + if errorMsg != "" { + // Skip if there was errors in resolving this attribute to compute used counts + continue + } + spreadAttributeMap := iter.tgSpreadInfo[tgName] + spreadDetails := spreadAttributeMap[pset.targetAttribute] + // Get the desired count + desiredCount, ok := spreadDetails.desiredCounts[nValue] + if !ok { + // Warn about missing ratio + iter.ctx.Logger().Printf("[WARN] sched: missing desired distribution percentage for attribute value %v in spread stanza for job %v", nValue, iter.job.ID) + continue + } + if float64(usedCount) < desiredCount { + // Calculate the relative weight of this specific spread attribute + spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights) + // Score Boost is proportional the difference between current and desired count + // It is multiplied with the spread weight to account for cases where the job has + // more than one spread attribute + scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight + totalSpreadScore += scoreBoost + } + } + + if totalSpreadScore != 0.0 { + option.Scores = append(option.Scores, totalSpreadScore) + iter.ctx.Metrics().ScoreNode(option.Node, "allocation-spread", totalSpreadScore) + } + return option + } +} + +// computeSpreadInfo computes and stores percentages and total values +// from all spreads that apply to a specific task group +func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) { + spreadInfos := make(spreadAttributeMap, len(tg.Spreads)) + totalCount := tg.Count + for _, spread := range tg.Spreads { + sumWeight := uint32(0) + for _, st := range spread.SpreadTarget { + sumWeight += st.Percent + } + si := &spreadInfo{sumWeight: sumWeight, weight: spread.Weight, desiredCounts: make(map[string]float64)} + for _, st := range spread.SpreadTarget { + desiredCount := (float64(st.Percent) / float64(sumWeight)) * float64(totalCount) + si.desiredCounts[st.Value] = desiredCount + } + spreadInfos[spread.Attribute] = si + iter.sumSpreadWeights += spread.Weight + } + iter.tgSpreadInfo[tg.Name] = spreadInfos +} diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go new file mode 100644 index 00000000000..3ec6cc8c244 --- /dev/null +++ b/scheduler/spread_test.go @@ -0,0 +1,249 @@ +package scheduler + +import ( + "testing" + + "fmt" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestSpreadIterator_SingleAttribute(t *testing.T) { + state, ctx := testContext(t) + dcs := []string{"dc1", "dc2", "dc1", "dc1"} + var nodes []*RankedNode + + // Add these nodes to the state store + for i, dc := range dcs { + node := mock.Node() + node.Datacenter = dc + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 5 + // add allocs to nodes in dc1 + upserting := []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[2].Node.ID, + }, + } + + if 
err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to UpsertAllocs: %v", err) + } + + spread := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 80, + }, + { + Value: "dc2", + Percent: 20, + }, + }, + } + tg.Spreads = []*structs.Spread{spread} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // Expect nodes in dc1 with existing allocs to get a boost + // Boost should be ((desiredCount-actual)/expected)*spreadWeight + // For this test, that becomes dc1 = ((4-2)/4 ) = 0.5, and dc2=(1-0)/1 + expectedScores := map[string]float64{ + "dc1": 0.5, + "dc2": 1.0, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } + + // Update the plan to add more allocs to nodes in dc1 + // After this step there are enough allocs to meet the desired count in dc1 + ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + // Should be ignored as it is a different job. + { + Namespace: structs.DefaultNamespace, + TaskGroup: "bbb", + JobID: "ignore 2", + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + } + ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[3].Node.ID, + }, + } + + // Reset the scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect nodes in dc2 with existing allocs to get a boost + // DC1 nodes are not boosted because there are enough allocs to meet + // the desired count + expectedScores = map[string]float64{ + "dc1": 0, + "dc2": 1.0, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } +} + +func TestSpreadIterator_MultipleAttributes(t *testing.T) { + state, ctx := testContext(t) + dcs := []string{"dc1", "dc2", "dc1", "dc1"} + rack := []string{"r1", "r1", "r2", "r2"} + var nodes []*RankedNode + + // Add these nodes to the state store + for i, dc := range dcs { + node := mock.Node() + node.Datacenter = dc + node.Meta["rack"] = rack[i] + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 5 + // add allocs to nodes in dc1 + upserting := []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[2].Node.ID, + }, + } + + if err := state.UpsertAllocs(1000, upserting); err != nil { + t.Fatalf("failed to 
UpsertAllocs: %v", err) + } + + spread1 := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 60, + }, + { + Value: "dc2", + Percent: 40, + }, + }, + } + + spread2 := &structs.Spread{ + Weight: 50, + Attribute: "${meta.rack}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "r1", + Percent: 40, + }, + { + Value: "r2", + Percent: 60, + }, + }, + } + + tg.Spreads = []*structs.Spread{spread1, spread2} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // Score come from combining two different spread factors + // Second node should have the highest score because it has no allocs and its in dc2/r1 + expectedScores := map[string]float64{ + nodes[0].Node.ID: 0.389, + nodes[1].Node.ID: 0.833, + nodes[2].Node.ID: 0.444, + nodes[3].Node.ID: 0.444, + } + for _, rn := range out { + require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore)) + } + +} diff --git a/scheduler/stack.go b/scheduler/stack.go index 15b8538e5dc..46191c41d51 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -57,6 +57,7 @@ type GenericStack struct { limit *LimitIterator maxScore *MaxScoreIterator nodeAffinity *NodeAffinityIterator + spread *SpreadIterator scoreNorm *ScoreNormalizationIterator } @@ -117,7 +118,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty) - s.scoreNorm = NewScoreNormalizationIterator(ctx, s.nodeAffinity) + s.spread = NewSpreadIterator(ctx, s.nodeAffinity) + + s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread) // Apply a limit function. This is to avoid scanning *every* possible node. 
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip) @@ -156,6 +159,7 @@ func (s *GenericStack) SetJob(job *structs.Job) { s.binPack.SetPriority(job.Priority) s.jobAntiAff.SetJob(job) s.nodeAffinity.SetJob(job) + s.spread.SetJob(job) s.ctx.Eligibility().SetJob(job) if contextual, ok := s.quota.(ContextualIterator); ok { @@ -200,7 +204,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs) } s.nodeAffinity.SetTaskGroup(tg) - if s.nodeAffinity.hasAffinities() { + s.spread.SetTaskGroup(tg) + + if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() { s.limit.SetLimit(math.MaxInt32) } From ed96122ec3061fecb17ab2af5a1a0628e92ca046 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 17 Jul 2018 18:10:54 -0500 Subject: [PATCH 04/17] Fix warnings --- nomad/structs/structs.go | 2 +- scheduler/propertyset.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6539d4b9ac5..60be7e20c83 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5431,7 +5431,7 @@ func (s *Spread) String() string { if s.str != "" { return s.str } - s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.Weight, s.SpreadTarget) + s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.SpreadTarget, s.Weight) return s.str } diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index 8e664254e41..0218b032616 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -75,7 +75,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup // setConstraint is a shared helper for setting a job or task group constraint. func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) { - allowedCount := uint64(0) + var allowedCount uint64 // Determine the number of allowed allocations with the property. if v := constraint.RTarget; v != "" { c, err := strconv.ParseUint(v, 10, 64) From a884077f8ad82a1141b29c3e2caee95660a78020 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 18 Jul 2018 10:53:37 -0500 Subject: [PATCH 05/17] Include spreads configured at job level when precomputing weights/desired counts. --- scheduler/spread.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/scheduler/spread.go b/scheduler/spread.go index db59b332292..d3030519b97 100644 --- a/scheduler/spread.go +++ b/scheduler/spread.go @@ -101,8 +101,8 @@ func (iter *SpreadIterator) Next() *RankedNode { totalSpreadScore := 0.0 for _, pset := range propertySets { nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName) + // Skip if there was errors in resolving this attribute to compute used counts if errorMsg != "" { - // Skip if there was errors in resolving this attribute to compute used counts continue } spreadAttributeMap := iter.tgSpreadInfo[tgName] @@ -138,7 +138,12 @@ func (iter *SpreadIterator) Next() *RankedNode { func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) { spreadInfos := make(spreadAttributeMap, len(tg.Spreads)) totalCount := tg.Count - for _, spread := range tg.Spreads { + + // Always combine any spread stanzas defined at the job level here + combinedSpreads := make([]*structs.Spread, 0, len(tg.Spreads)+len(iter.jobSpreads)) + combinedSpreads = append(combinedSpreads, tg.Spreads...) + combinedSpreads = append(combinedSpreads, iter.jobSpreads...) 
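+	// Each target's desired count is its percentage share of the summed target percentages, scaled by the task group's total count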
+ for _, spread := range combinedSpreads { sumWeight := uint32(0) for _, st := range spread.SpreadTarget { sumWeight += st.Percent } si := &spreadInfo{sumWeight: sumWeight, weight: spread.Weight, desiredCounts: make(map[string]float64)} for _, st := range spread.SpreadTarget { desiredCount := (float64(st.Percent) / float64(sumWeight)) * float64(totalCount) si.desiredCounts[st.Value] = desiredCount From 4a3890c25a30e74b4bd8845d1752abb59eeea673 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 18 Jul 2018 11:04:59 -0500 Subject: [PATCH 06/17] Validate spreads from job/task group Validate methods --- nomad/structs/structs.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 60be7e20c83..6d2eb986e07 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2189,6 +2189,13 @@ func (j *Job) Validate() error { } } + for idx, spread := range j.Spreads { + if err := spread.Validate(); err != nil { + outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } + } + // Check for duplicate task groups taskGroups := make(map[string]int) for idx, tg := range j.TaskGroups { @@ -3459,6 +3466,13 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } + for idx, spread := range tg.Spreads { + if err := spread.Validate(); err != nil { + outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } + } + if j.Type == JobTypeSystem { if tg.ReschedulePolicy != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs should not have a reschedule policy")) From 2b6b20912f43adc7855af9d5e4711dd0f9a35eff Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 25 Jul 2018 19:08:25 -0500 Subject: [PATCH 07/17] Allow empty spread targets and validate target percentages. --- nomad/structs/structs.go | 103 +++++++++++++++++++++------------- nomad/structs/structs_test.go | 47 +++++++++++++++- scheduler/propertyset.go | 14 ++--- 3 files changed, 116 insertions(+), 48 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6d2eb986e07..d95c6c83e36 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2189,10 +2189,16 @@ func (j *Job) Validate() error { } } - for idx, spread := range j.Spreads { - if err := spread.Validate(); err != nil { - outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) - mErr.Errors = append(mErr.Errors, outer) + if j.Type == JobTypeSystem { + if j.Spreads != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza")) + } + } else { + for idx, spread := range j.Spreads { + if err := spread.Validate(); err != nil { + outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } } } @@ -3466,10 +3472,16 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name)) } + if j.Type == JobTypeSystem { + if tg.Spreads != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza")) + } + } else { + for idx, spread := range tg.Spreads { + if err := spread.Validate(); err != nil { + outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } } } @@ -5407,19 +5419,18 @@ func (a *Affinity) Validate() error { return mErr.ErrorOrNil() } +// Spread is used to specify
desired distribution of allocations according to weight type Spread struct { - Attribute string - Weight int + // Attribute is the node attribute used as the spread criteria + Attribute string + // Weight is the relative weight of this spread, useful when there are multiple + // spread and affinities + Weight int + // SpreadTarget is used to describe desired percentages for each attribute value SpreadTarget []*SpreadTarget str string } -type SpreadTarget struct { - Value string - Percent uint32 - str string -} - func (s *Spread) Copy() *Spread { if s == nil { return nil @@ -5431,16 +5442,6 @@ func (s *Spread) Copy() *Spread { return ns } -func (s *SpreadTarget) Copy() *SpreadTarget { - if s == nil { - return nil - } - - ns := new(SpreadTarget) - *ns = *s - return ns -} - func (s *Spread) String() string { if s.str != "" { return s.str @@ -5449,14 +5450,6 @@ func (s *Spread) String() string { return s.str } -func (s *SpreadTarget) String() string { - if s.str != "" { - return s.str - } - s.str = fmt.Sprintf("%s %v", s.Value, s.Percent) - return s.str -} - func (s *Spread) Validate() error { var mErr multierror.Error if s.Attribute == "" { @@ -5465,12 +5458,9 @@ func (s *Spread) Validate() error { if s.Weight <= 0 || s.Weight > 100 { mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100")) } - if len(s.SpreadTarget) == 0 { - // TODO(preetha): This should go away if we can assume even spread if there are no targets - // In that case, the target percentages should be calculated at schedule time - mErr.Errors = append(mErr.Errors, errors.New("Atleast one spread target value must be specified")) - } seen := make(map[string]struct{}) + sumPercent := uint32(0) + for _, target := range s.SpreadTarget { // Make sure there are no duplicates _, ok := seen[target.Value] @@ -5479,10 +5469,45 @@ func (s *Spread) Validate() error { } else { mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value))) } + if target.Percent < 0 || target.Percent > 100 { + mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value))) + } + sumPercent += target.Percent + } + if sumPercent > 100 { + mErr.Errors = append(mErr.Errors, errors.New("Sum of spread target percentages must not be greater than 100")) } return mErr.ErrorOrNil() } +// SpreadTarget is used to specify desired percentages +// for each attribute value +type SpreadTarget struct { + // Value is a single attribute value, like "dc1" + Value string + // Percent is the desired percentage of allocs + Percent uint32 + str string +} + +func (s *SpreadTarget) Copy() *SpreadTarget { + if s == nil { + return nil + } + + ns := new(SpreadTarget) + *ns = *s + return ns +} + +func (s *SpreadTarget) String() string { + if s.str != "" { + return s.str + } + s.str = fmt.Sprintf("%q %v%%", s.Value, s.Percent) + return s.str +} + // EphemeralDisk is an ephemeral disk object type EphemeralDisk struct { // Sticky indicates whether the allocation is sticky to a node diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 51c8d5b98f2..8f2a60504f3 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -405,6 +405,21 @@ func TestJob_SystemJob_Validate(t *testing.T) { err = j.Validate() require.NotNil(t, err) require.Contains(t, err.Error(), "System jobs may not have an affinity stanza") + + // Add spread at job and task group level, that should fail 
validation + j.Spreads = []*Spread{{ + Attribute: "${node.datacenter}", + Weight: 100, + }} + j.TaskGroups[0].Spreads = []*Spread{{ + Attribute: "${node.datacenter}", + Weight: 100, + }} + + err = j.Validate() + require.NotNil(t, err) + require.Contains(t, err.Error(), "System jobs may not have a spread stanza") + } func TestJob_VaultPolicies(t *testing.T) { @@ -3960,9 +3975,37 @@ func TestSpread_Validate(t *testing.T) { spread: &Spread{ Attribute: "${node.datacenter}", Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 25, + }, + { + Value: "dc2", + Percent: 150, + }, + }, }, - err: fmt.Errorf("Atleast one spread target value must be specified"), - name: "No spread targets", + err: fmt.Errorf("Spread target percentage for value \"dc2\" must be between 0 and 100"), + name: "Invalid percentages", + }, + { + spread: &Spread{ + Attribute: "${node.datacenter}", + Weight: 50, + SpreadTarget: []*SpreadTarget{ + { + Value: "dc1", + Percent: 75, + }, + { + Value: "dc2", + Percent: 75, + }, + }, + }, + err: fmt.Errorf("Sum of spread target percentages must not be greater than 100"), + name: "Invalid percentages", }, { spread: &Spread{ diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index 0218b032616..85fe45d62dc 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -60,7 +60,7 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet { return p } -// SetJobConstraintAttribute is used to parameterize the property set for a +// SetJobConstraint is used to parameterize the property set for a // distinct_property constraint set at the job level. func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) { p.setConstraint(constraint, "") @@ -89,18 +89,18 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st } else { allowedCount = 1 } - p.setPropertySetInner(constraint.LTarget, allowedCount, taskGroup) + p.setTargetAttributeWithCount(constraint.LTarget, allowedCount, taskGroup) } // SetTargetAttribute is used to populate this property set without also storing allowed count // This is used when evaluating spread stanzas func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) { - p.setPropertySetInner(targetAttribute, 0, taskGroup) + p.setTargetAttributeWithCount(targetAttribute, 0, taskGroup) } -// setConstraint is a shared helper for setting a job or task group attribute and allowedCount +// setTargetAttributeWithCount is a shared helper for setting a job or task group attribute and allowedCount // allowedCount can be zero when this is used in evaluating spread stanzas -func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount uint64, taskGroup string) { +func (p *propertySet) setTargetAttributeWithCount(targetAttribute string, allowedCount uint64, taskGroup string) { // Store that this is for a task group if taskGroup != "" { p.taskGroup = taskGroup @@ -113,7 +113,7 @@ func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount u // Determine the number of existing allocations that are using a property // value - p.populateExisting(targetAttribute) + p.populateExisting() // Populate the proposed when setting the constraint. We do this because // when detecting if we can inplace update an allocation we stage an @@ -124,7 +124,7 @@ func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount u // populateExisting is a helper shared when setting the constraint to populate // the existing values. 
-func (p *propertySet) populateExisting(targetAttribute string) { +func (p *propertySet) populateExisting() { // Retrieve all previously placed allocations ws := memdb.NewWatchSet() allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false) From 4ac954413bc2a943b6d8f8cbe9bb1537c490f0af Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 25 Jul 2018 19:28:36 -0500 Subject: [PATCH 08/17] fix comments --- scheduler/spread.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/scheduler/spread.go b/scheduler/spread.go index d3030519b97..04d59635e07 100644 --- a/scheduler/spread.go +++ b/scheduler/spread.go @@ -7,14 +7,21 @@ import ( // SpreadIterator is used to spread allocations across a specified attribute // according to preset weights type SpreadIterator struct { - ctx Context - source RankIterator - job *structs.Job - tg *structs.TaskGroup - jobSpreads []*structs.Spread - tgSpreadInfo map[string]spreadAttributeMap - sumSpreadWeights int - hasSpread bool + ctx Context + source RankIterator + job *structs.Job + tg *structs.TaskGroup + jobSpreads []*structs.Spread + // tgSpreadInfo is a map per task group with precomputed + // values for desired counts and weight + tgSpreadInfo map[string]spreadAttributeMap + // sumSpreadWeights tracks the total weight across all spread + // stanzas + sumSpreadWeights int + hasSpread bool + // groupProperySets is a memoized map from task group to property sets. + // existing allocs are computed once, and allocs from the plan are updated + // when Reset is called groupPropertySets map[string][]*propertySet } @@ -72,7 +79,7 @@ func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) { } } - // Check if there is a distinct property + // Check if there are any spreads configured iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0 // Build tgSpreadInfo at the task group level From 81743cd02220782976bff75708ba18d667542005 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 26 Jul 2018 10:32:06 -0500 Subject: [PATCH 09/17] Support implicit spread target to account for remaining desired counts --- scheduler/spread.go | 33 ++++++++++++++++++++++----------- scheduler/spread_test.go | 6 ++---- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/scheduler/spread.go b/scheduler/spread.go index 04d59635e07..f9acede39f6 100644 --- a/scheduler/spread.go +++ b/scheduler/spread.go @@ -4,6 +4,12 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // ImplicitTarget is used to represent any remaining attribute values + // when target percentages don't add up to 100 + ImplicitTarget = "*" +) + // SpreadIterator is used to spread allocations across a specified attribute // according to preset weights type SpreadIterator struct { @@ -28,7 +34,6 @@ type SpreadIterator struct { type spreadAttributeMap map[string]*spreadInfo type spreadInfo struct { - sumWeight uint32 weight int desiredCounts map[string]float64 } @@ -110,16 +115,20 @@ func (iter *SpreadIterator) Next() *RankedNode { nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName) // Skip if there was errors in resolving this attribute to compute used counts if errorMsg != "" { - continue + iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg) } spreadAttributeMap := iter.tgSpreadInfo[tgName] spreadDetails := spreadAttributeMap[pset.targetAttribute] // Get the desired count desiredCount, ok := spreadDetails.desiredCounts[nValue] if !ok { - // 
Warn about missing ratio - iter.ctx.Logger().Printf("[WARN] sched: missing desired distribution percentage for attribute value %v in spread stanza for job %v", nValue, iter.job.ID) - continue + // See if there is an implicit target + desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget] + if !ok { + // The desired count for this attribute is zero if it gets here + // don't boost the score + continue + } } if float64(usedCount) < desiredCount { // Calculate the relative weight of this specific spread attribute @@ -151,14 +160,16 @@ func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) { combinedSpreads = append(combinedSpreads, tg.Spreads...) combinedSpreads = append(combinedSpreads, iter.jobSpreads...) for _, spread := range combinedSpreads { - sumWeight := uint32(0) + si := &spreadInfo{weight: spread.Weight, desiredCounts: make(map[string]float64)} + sumDesiredCounts := 0.0 for _, st := range spread.SpreadTarget { - sumWeight += st.Percent - } - si := &spreadInfo{sumWeight: sumWeight, weight: spread.Weight, desiredCounts: make(map[string]float64)} - for _, st := range spread.SpreadTarget { - desiredCount := (float64(st.Percent) / float64(sumWeight)) * float64(totalCount) + desiredCount := (float64(st.Percent) / float64(100)) * float64(totalCount) si.desiredCounts[st.Value] = desiredCount + sumDesiredCounts += desiredCount + } + if sumDesiredCounts < float64(totalCount) { + remainingCount := float64(totalCount) - sumDesiredCounts + si.desiredCounts[ImplicitTarget] = remainingCount } spreadInfos[spread.Attribute] = si iter.sumSpreadWeights += spread.Weight diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go index 3ec6cc8c244..e546a2e6dde 100644 --- a/scheduler/spread_test.go +++ b/scheduler/spread_test.go @@ -57,6 +57,8 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) { t.Fatalf("failed to UpsertAllocs: %v", err) } + // Create spread target of 80% in dc1 + // Implicitly, this means 20% in dc2 spread := &structs.Spread{ Weight: 100, Attribute: "${node.datacenter}", @@ -65,10 +67,6 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) { Value: "dc1", Percent: 80, }, - { - Value: "dc2", - Percent: 20, - }, }, } tg.Spreads = []*structs.Spread{spread} From 55c513186baa024071c802a5cb6785d0cbabf4bc Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 26 Jul 2018 17:14:27 -0500 Subject: [PATCH 10/17] Implement support for even spread across datacenters, with unit test --- scheduler/propertyset.go | 14 +-- scheduler/spread.go | 99 +++++++++++++++++---- scheduler/spread_test.go | 186 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 23 deletions(-) diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index 85fe45d62dc..8ac9b48c6d9 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -234,9 +234,15 @@ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string if !ok { return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0 } + combinedUse := p.GetCombinedUseMap() + usedCount := combinedUse[nValue] + return nValue, "", usedCount +} - // combine the counts of how many times the property has been used by - // existing and proposed allocations +// GetCombinedUseMap counts how many times the property has been used by +// existing and proposed allocations. 
It also takes into account any stopped +// allocations +func (p *propertySet) GetCombinedUseMap() map[string]uint64 { combinedUse := make(map[string]uint64, helper.IntMax(len(p.existingValues), len(p.proposedValues))) for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} { for propertyValue, usedCount := range usedValues { @@ -259,9 +265,7 @@ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string combinedUse[propertyValue] = 0 } } - - usedCount := combinedUse[nValue] - return nValue, "", usedCount + return combinedUse } // filterAllocs filters a set of allocations to just be those that are running diff --git a/scheduler/spread.go b/scheduler/spread.go index f9acede39f6..2035ccf591c 100644 --- a/scheduler/spread.go +++ b/scheduler/spread.go @@ -8,6 +8,18 @@ const ( // ImplicitTarget is used to represent any remaining attribute values // when target percentages don't add up to 100 ImplicitTarget = "*" + + // evenSpreadBoost is a positive boost used when the job has + // even spread over an attribute. Any nodes whose attribute value is + // equal to the minimum count over all possible attributes gets boosted + evenSpreadBoost = 0.01 + + // evenSpreadPenality is a penalty used when the job has + // even spread over an attribute. Any nodes whose attribute value is + // greater than the minimum count over all possible attributes gets this penalty. + // This is to ensure that other nodes are preferred when one of the values + // has a larger number of allocations + evenSpreadPenalty = -0.5 ) // SpreadIterator is used to spread allocations across a specified attribute @@ -119,25 +131,33 @@ func (iter *SpreadIterator) Next() *RankedNode { } spreadAttributeMap := iter.tgSpreadInfo[tgName] spreadDetails := spreadAttributeMap[pset.targetAttribute] - // Get the desired count - desiredCount, ok := spreadDetails.desiredCounts[nValue] - if !ok { - // See if there is an implicit target - desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget] + + if len(spreadDetails.desiredCounts) == 0 { + // When desired counts map is empty the user didn't specify any targets + // Treat this as a special case + scoreBoost := evenSpreadScoreBoost(pset, option.Node) + totalSpreadScore += scoreBoost + } else { + // Get the desired count + desiredCount, ok := spreadDetails.desiredCounts[nValue] if !ok { - // The desired count for this attribute is zero if it gets here - // don't boost the score - continue + // See if there is an implicit target + desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget] + if !ok { + // The desired count for this attribute is zero if it gets here + // don't boost the score + continue + } + } + if float64(usedCount) < desiredCount { + // Calculate the relative weight of this specific spread attribute + spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights) + // Score Boost is proportional the difference between current and desired count + // It is multiplied with the spread weight to account for cases where the job has + // more than one spread attribute + scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight + totalSpreadScore += scoreBoost } - } - if float64(usedCount) < desiredCount { - // Calculate the relative weight of this specific spread attribute - spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights) - // Score Boost is proportional the difference between current and desired count - // It is multiplied with the spread weight to account for 
cases where the job has - // more than one spread attribute - scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight - totalSpreadScore += scoreBoost } } @@ -149,6 +169,48 @@ func (iter *SpreadIterator) Next() *RankedNode { } } +// evenSpreadScoreBoost is a scoring helper that calculates the score +// for the option when even spread is desired (all attribute values get equal preference) +func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 { + combinedUseMap := pset.GetCombinedUseMap() + if len(combinedUseMap) == 0 { + // Nothing placed yet, so return 0 as the score + return 0.0 + } + // Get the nodes property value + nValue, ok := getProperty(option, pset.targetAttribute) + currentAttributeCount := uint64(0) + if ok { + currentAttributeCount = combinedUseMap[nValue] + } + minCount := uint64(0) + maxCount := uint64(0) + for _, value := range combinedUseMap { + if minCount == 0 || value < minCount { + minCount = value + } + if maxCount == 0 || value > maxCount { + maxCount = value + } + } + if currentAttributeCount < minCount { + // Small positive boost for attributes with min count + return evenSpreadBoost + } else if currentAttributeCount > minCount { + // Negative boost if attribute count is greater than minimum + // This is so that other nodes will get a preference over this one + return evenSpreadPenalty + } else { + // When min and max are same the current distribution is even + // so we penalize + if minCount == maxCount { + return evenSpreadPenalty + } else { + return evenSpreadBoost + } + } +} + // computeSpreadInfo computes and stores percentages and total values // from all spreads that apply to a specific task group func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) { @@ -167,7 +229,8 @@ func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) { si.desiredCounts[st.Value] = desiredCount sumDesiredCounts += desiredCount } - if sumDesiredCounts < float64(totalCount) { + // Account for remaining count only if there is any spread targets + if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) { remainingCount := float64(totalCount) - sumDesiredCounts si.desiredCounts[ImplicitTarget] = remainingCount } diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go index e546a2e6dde..e6457d56ead 100644 --- a/scheduler/spread_test.go +++ b/scheduler/spread_test.go @@ -245,3 +245,189 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) { } } + +func TestSpreadIterator_EvenSpread(t *testing.T) { + state, ctx := testContext(t) + dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"} + var nodes []*RankedNode + + // Add these nodes to the state store + for i, dc := range dcs { + node := mock.Node() + node.Datacenter = dc + if err := state.UpsertNode(uint64(100+i), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + } + + static := NewStaticRankIterator(ctx, nodes) + job := mock.Job() + tg := job.TaskGroups[0] + job.TaskGroups[0].Count = 10 + + // Configure even spread across node.datacenter + spread := &structs.Spread{ + Weight: 100, + Attribute: "${node.datacenter}", + } + tg.Spreads = []*structs.Spread{spread} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // Nothing placed so both dc nodes get 0 as the score + expectedScores := 
map[string]float64{ + "dc1": 0, + "dc2": 0, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } + + // Update the plan to add allocs to nodes in dc1 + // After this step dc2 nodes should get boosted + ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + }, + } + ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[2].Node.ID, + }, + } + + // Reset the scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect nodes in dc2 with existing allocs to get a boost + // dc1 nodes are penalized because they have allocs + expectedScores = map[string]float64{ + "dc1": -0.5, + "dc2": 0.01, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } + + // Update the plan to add more allocs to nodes in dc2 + // After this step dc1 nodes should get boosted + ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[1].Node.ID, + }, + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[1].Node.ID, + }, + } + ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[3].Node.ID, + }, + } + + // Reset the scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect nodes in dc2 to be penalized because there are 3 allocs there now + // dc1 nodes are boosted because that has 2 allocs + expectedScores = map[string]float64{ + "dc1": 0.01, + "dc2": -0.5, + } + for _, rn := range out { + require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore) + } + + // Add another node in dc3 + node := mock.Node() + node.Datacenter = "dc3" + if err := state.UpsertNode(uint64(1111), node); err != nil { + t.Fatalf("failed to upsert node: %v", err) + } + nodes = append(nodes, &RankedNode{Node: node}) + + // Add another alloc to dc1, now its count matches dc2 + ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{ + { + Namespace: structs.DefaultNamespace, + TaskGroup: tg.Name, + JobID: job.ID, + Job: job, + ID: uuid.Generate(), + NodeID: nodes[4].Node.ID, + }, + } + + // Reset scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // Expect 
From 81934afa863343f37cd845b1e5ff0bee216510f8 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Fri, 27 Jul 2018 16:15:32 -0500
Subject: [PATCH 11/17] Remove hardcoded boosts for even spread. Instead,
 calculate them based on the delta between current and minimum values

---
 scheduler/spread.go      | 62 ++++++++++++++++++++--------------------
 scheduler/spread_test.go | 12 ++++----
 2 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/scheduler/spread.go b/scheduler/spread.go
index 2035ccf591c..490f58654bd 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -8,18 +8,6 @@ const (
 // ImplicitTarget is used to represent any remaining attribute values
 // when target percentages don't add up to 100
 ImplicitTarget = "*"
-
- // evenSpreadBoost is a positive boost used when the job has
- // even spread over an attribute. Any nodes whose attribute value is
- // equal to the minimum count over all possible attributes gets boosted
- evenSpreadBoost = 0.01
-
- // evenSpreadPenality is a penalty used when the job has
- // even spread over an attribute. Any nodes whose attribute value is
- // greater than the minimum count over all possible attributes gets this penalty.
- // This is to ensure that other nodes are preferred when one of the values
- // has a larger number of allocations
- evenSpreadPenalty = -0.5
 )

 // SpreadIterator is used to spread allocations across a specified attribute
@@ -145,19 +133,20 @@ func (iter *SpreadIterator) Next() *RankedNode {
 desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
 if !ok {
 // The desired count for this attribute is zero if it gets here
- // don't boost the score
- continue
+ // so use the maximum possible penalty for this node
+ totalSpreadScore += -1.0
 }
 }
- if float64(usedCount) < desiredCount {
- // Calculate the relative weight of this specific spread attribute
- spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
- // Score Boost is proportional the difference between current and desired count
- // It is multiplied with the spread weight to account for cases where the job has
- // more than one spread attribute
- scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
- totalSpreadScore += scoreBoost
- }
+
+ // Calculate the relative weight of this specific spread attribute
+ spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
+
+ // Score Boost is proportional to the difference between current and desired count
+ // It is negative when the used count is greater than the desired count
+ // It is multiplied by the spread weight to account for cases where the job has
+ // more than one spread attribute
+ scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
+ totalSpreadScore += scoreBoost
 }
 }
@@ -193,20 +182,31 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
 maxCount = value
 }
 }
+
+ // calculate boost based on delta between the current and the minimum
+ deltaBoost := 1.0
+ if minCount == 0 {
+ deltaBoost = -1.0
+ } else {
+ delta := int(minCount - currentAttributeCount)
+ deltaBoost = float64(delta) / float64(minCount)
+ }
 if currentAttributeCount < minCount {
- // Small positive boost for attributes with min count
- return evenSpreadBoost
+ // positive boost for attributes with min count
+ return deltaBoost
 } else if currentAttributeCount > minCount {
 // Negative boost if attribute count is greater than minimum
- // This is so that other nodes will get a preference over this one
- return evenSpreadPenalty
+ return deltaBoost
 } else {
- // When min and max are same the current distribution is even
- // so we penalize
+ // When min and current value are the same we return the maximum
+ // possible boost or penalty
 if minCount == maxCount {
- return evenSpreadPenalty
+ // Maximum possible penalty when the distribution is even
+ return -1.0
 } else {
- return evenSpreadBoost
+ // Maximum possible boost when there is another attribute with
+ // more allocations
+ return 1.0
 }
 }
 }
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
index e6457d56ead..94fd6ea3768 100644
--- a/scheduler/spread_test.go
+++ b/scheduler/spread_test.go
@@ -327,8 +327,8 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 // Expect nodes in dc2 to get a boost since they have no allocs yet
 // dc1 nodes are penalized because they have allocs
 expectedScores = map[string]float64{
- "dc1": -0.5,
- "dc2": 0.01,
+ "dc1": -1,
+ "dc2": 1,
 }
 for _, rn := range out {
 require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
@@ -380,7 +380,7 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 // Expect nodes in dc2 to be penalized because there are 3 allocs there now
 // dc1 nodes are boosted because dc1 has only 2 allocs
 expectedScores = map[string]float64{
- "dc1": 0.01,
+ "dc1": 1,
 "dc2": -0.5,
 }
 for _, rn := range out {
@@ -422,9 +422,9 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 // Expect dc1 and dc2 to be penalized because they have 3 allocs
 // dc3 should get a boost because it has 0 allocs
 expectedScores = map[string]float64{
- "dc1": -0.5,
- "dc2": -0.5,
- "dc3": 0.01,
+ "dc1": -1,
+ "dc2": -1,
+ "dc3": 1,
 }
 for _, rn := range out {
 require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
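
For reference, the proportional scoring this patch introduces is easy to check by hand. The following is a minimal, self-contained sketch of the math, not the scheduler's code; the function name and sample numbers are illustrative:

package main

import "fmt"

// spreadBoost mirrors the idea above: the boost is the relative shortfall
// against the desired count (negative when over target), scaled by this
// spread's share of the total spread weight.
func spreadBoost(used, desired, weight, sumWeights float64) float64 {
	return ((desired - used) / desired) * (weight / sumWeights)
}

func main() {
	// One spread stanza with weight 100: a datacenter that wants 8 allocs
	// and has 3 gets a boost; one that wants 2 and has 3 gets a penalty.
	fmt.Println(spreadBoost(3, 8, 100, 100)) // 0.625
	fmt.Println(spreadBoost(3, 2, 100, 100)) // -0.5
}
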
From 18ad037931b7a4577709ba43b9480ed26496c309 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Sat, 28 Jul 2018 19:35:28 -0500
Subject: [PATCH 12/17] Fix scoring algorithm when min count == current count

---
 scheduler/spread.go      | 17 +++++++----------
 scheduler/spread_test.go |  8 ++++----
 2 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/scheduler/spread.go b/scheduler/spread.go
index 490f58654bd..452a90ce4c9 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -135,6 +135,7 @@ func (iter *SpreadIterator) Next() *RankedNode {
 // The desired count for this attribute is zero if it gets here
 // so use the maximum possible penalty for this node
 totalSpreadScore += -1.0
+ continue
 }
 }
@@ -191,22 +192,18 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
 delta := int(minCount - currentAttributeCount)
 deltaBoost = float64(delta) / float64(minCount)
 }
- if currentAttributeCount < minCount {
- // positive boost for attributes with min count
- return deltaBoost
- } else if currentAttributeCount > minCount {
- // Negative boost if attribute count is greater than minimum
+ if currentAttributeCount != minCount {
+ // Boost based on delta between current and min
 return deltaBoost
 } else {
- // When min and current value are the same we return the maximum
- // possible boost or penalty
 if minCount == maxCount {
 // Maximum possible penalty when the distribution is even
 return -1.0
 } else {
- // Maximum possible boost when there is another attribute with
- // more allocations
- return 1.0
+ // Boost based on delta from the max value
+ delta := int(maxCount - minCount)
+ deltaBoost = float64(delta) / float64(minCount)
+ return deltaBoost
 }
 }
 }
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
index 94fd6ea3768..659044b295c 100644
--- a/scheduler/spread_test.go
+++ b/scheduler/spread_test.go
@@ -286,7 +286,7 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 "dc2": 0,
 }
 for _, rn := range out {
- require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+ require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
 }

 // Update the plan to add allocs to nodes in dc1
@@ -380,11 +380,11 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 // Expect nodes in dc2 to be penalized because there are 3 allocs there now
 // dc1 nodes are boosted because dc1 has only 2 allocs
 expectedScores = map[string]float64{
- "dc1": 1,
+ "dc1": 0.5,
 "dc2": -0.5,
 }
 for _, rn := range out {
- require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+ require.Equal(t, fmt.Sprintf("%3.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%3.3f", rn.FinalScore))
 }

 // Add another node in dc3
@@ -427,7 +427,7 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 "dc3": 1,
 }
 for _, rn := range out {
- require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+ require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
 }

 }
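
After this patch the even-spread score reduces to a small function of the node's current count and the min/max counts over all attribute values. A standalone, hedged re-derivation of that behavior (illustrative names; it mirrors the intended logic rather than the exact scheduler code):

package main

import "fmt"

// evenBoost sketches the even-spread scoring after this patch: nodes away
// from the minimum count score by their delta from the minimum; nodes sitting
// at the minimum score by the delta from the maximum; an already-even
// distribution takes the maximum penalty.
func evenBoost(current, min, max uint64) float64 {
	if min == 0 {
		return -1.0
	}
	if current != min {
		return float64(int(min-current)) / float64(min)
	}
	if min == max {
		return -1.0
	}
	return float64(int(max-min)) / float64(min)
}

func main() {
	fmt.Println(evenBoost(3, 2, 3)) // -0.5: above the minimum, penalized
	fmt.Println(evenBoost(2, 2, 3)) //  0.5: at the minimum, boosted
	fmt.Println(evenBoost(3, 3, 3)) // -1.0: already even, max penalty
}
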
From 02364db282f355a822904c7bb793b832224bb532 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Sat, 28 Jul 2018 19:49:09 -0500
Subject: [PATCH 13/17] Comment and formatting cleanup

---
 nomad/structs/structs.go | 11 +++++++++--
 scheduler/spread.go      | 22 +++++++++++++++++-----
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index d95c6c83e36..dd6a3f119e7 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -5423,12 +5423,16 @@ func (a *Affinity) Validate() error {
 type Spread struct {
 // Attribute is the node attribute used as the spread criteria
 Attribute string
+
 // Weight is the relative weight of this spread, useful when there are multiple
 // spreads and affinities
 Weight int
+
 // SpreadTarget is used to describe desired percentages for each attribute value
 SpreadTarget []*SpreadTarget
- str string
+
+ // Memoized string representation
+ str string
 }

 func (s *Spread) Copy() *Spread {
@@ -5485,9 +5489,12 @@ func (s *Spread) Validate() error {
 type SpreadTarget struct {
 // Value is a single attribute value, like "dc1"
 Value string
+
 // Percent is the desired percentage of allocs
 Percent uint32
- str string
+
+ // Memoized string representation
+ str string
 }

 func (s *SpreadTarget) Copy() *SpreadTarget {
diff --git a/scheduler/spread.go b/scheduler/spread.go
index 452a90ce4c9..e6c3297a985 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -13,18 +13,27 @@ const (
 // SpreadIterator is used to spread allocations across a specified attribute
 // according to preset weights
 type SpreadIterator struct {
- ctx Context
- source RankIterator
- job *structs.Job
- tg *structs.TaskGroup
+ ctx    Context
+ source RankIterator
+ job    *structs.Job
+ tg     *structs.TaskGroup
+
+ // jobSpreads is a slice of spreads stored at the job level that apply
+ // to all task groups
 jobSpreads []*structs.Spread
+
 // tgSpreadInfo is a map per task group with precomputed
 // values for desired counts and weight
 tgSpreadInfo map[string]spreadAttributeMap
+
+ // sumSpreadWeights tracks the total weight across all spread
 // stanzas
 sumSpreadWeights int
- hasSpread bool
+
+ // hasSpread is used to return early when the job/task group
+ // does not have spread configured
+ hasSpread bool
+
 // groupPropertySets is a memoized map from task group to property sets.
 // existing allocs are computed once, and allocs from the plan are updated
 // when Reset is called
@@ -172,6 +181,9 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
 currentAttributeCount := uint64(0)
 if ok {
 currentAttributeCount = combinedUseMap[nValue]
+ } else {
+ // If the attribute isn't set on the node, it should get the maximum possible penalty
+ return -1.0
 }
 minCount := uint64(0)
 maxCount := uint64(0)
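
The else branch added above covers nodes that cannot resolve the spread attribute at all. A toy sketch of that guard, using a plain map lookup as a stand-in for the scheduler's actual attribute resolution (names here are illustrative only):

package main

import "fmt"

// resolveOrPenalize sketches the guard added above: a node that has no value
// for the spread attribute takes the maximum penalty (-1.0) rather than being
// scored against the per-value counts.
func resolveOrPenalize(attrs map[string]string, attr string) (string, float64, bool) {
	value, ok := attrs[attr]
	if !ok {
		return "", -1.0, false
	}
	return value, 0, true
}

func main() {
	node := map[string]string{"node.datacenter": "dc1"}
	if v, _, ok := resolveOrPenalize(node, "node.datacenter"); ok {
		fmt.Println("resolved:", v)
	}
	if _, penalty, ok := resolveOrPenalize(node, "meta.foo"); !ok {
		fmt.Println("unresolved, score:", penalty) // -1
	}
}
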
From 6124c2cc8489f6ef6530606547e2f60e2852af9b Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Sun, 29 Jul 2018 19:49:56 -0500
Subject: [PATCH 14/17] Add unit tests for -1 spread score

---
 scheduler/spread.go      |  8 ++--
 scheduler/spread_test.go | 88 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 92 insertions(+), 4 deletions(-)

diff --git a/scheduler/spread.go b/scheduler/spread.go
index e6c3297a985..22c30ccc117 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -5,9 +5,9 @@ import (
 )

 const (
- // ImplicitTarget is used to represent any remaining attribute values
+ // implicitTarget is used to represent any remaining attribute values
 // when target percentages don't add up to 100
- ImplicitTarget = "*"
+ implicitTarget = "*"
 )
@@ -139,7 +139,7 @@ func (iter *SpreadIterator) Next() *RankedNode {
 desiredCount, ok := spreadDetails.desiredCounts[nValue]
 if !ok {
 // See if there is an implicit target
- desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
+ desiredCount, ok = spreadDetails.desiredCounts[implicitTarget]
 if !ok {
 // The desired count for this attribute is zero if it gets here
 // so use the maximum possible penalty for this node
@@ -241,7 +241,7 @@ func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
 // Account for remaining count only if there are any spread targets
 if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) {
 remainingCount := float64(totalCount) - sumDesiredCounts
- si.desiredCounts[ImplicitTarget] = remainingCount
+ si.desiredCounts[implicitTarget] = remainingCount
 }
 spreadInfos[spread.Attribute] = si
 iter.sumSpreadWeights += spread.Weight
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
index 659044b295c..f7e7222f73a 100644
--- a/scheduler/spread_test.go
+++ b/scheduler/spread_test.go
@@ -431,3 +431,91 @@ func TestSpreadIterator_EvenSpread(t *testing.T) {
 }

 }
+
+// Test scenarios where the spread iterator sets maximum penalty (-1.0)
+func TestSpreadIterator_MaxPenalty(t *testing.T) {
+ state, ctx := testContext(t)
+ var nodes []*RankedNode
+
+ // Add nodes in dc3 to the state store
+ for i := 0; i < 5; i++ {
+ node := mock.Node()
+ node.Datacenter = "dc3"
+ if err := state.UpsertNode(uint64(100+i), node); err != nil {
+ t.Fatalf("failed to upsert node: %v", err)
+ }
+ nodes = append(nodes, &RankedNode{Node: node})
+ }
+
+ static := NewStaticRankIterator(ctx, nodes)
+
+ job := mock.Job()
+ tg := job.TaskGroups[0]
+ job.TaskGroups[0].Count = 5
+
+ // Create spread target of 80% in dc1
+ // and 20% in dc2
+ spread := &structs.Spread{
+ Weight: 100,
+ Attribute: "${node.datacenter}",
+ SpreadTarget: []*structs.SpreadTarget{
+ {
+ Value: "dc1",
+ Percent: 80,
+ },
+ {
+ Value: "dc2",
+ Percent: 20,
+ },
+ },
+ }
+ tg.Spreads = []*structs.Spread{spread}
+ spreadIter := NewSpreadIterator(ctx, static)
+ spreadIter.SetJob(job)
+ spreadIter.SetTaskGroup(tg)
+
+ scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
+
+ out := collectRanked(scoreNorm)
+
+ // All nodes are in dc3 so score should be -1
+ for _, rn := range out {
+ require.Equal(t, -1.0, rn.FinalScore)
+ }
+
+ // Reset scores
+ for _, node := range nodes {
+ node.Scores = nil
+ node.FinalScore = 0
+ }
+
+ // Create spread on attribute that doesn't exist on any nodes
+ spread = &structs.Spread{
+ Weight: 100,
+ Attribute: "${meta.foo}",
+ SpreadTarget: []*structs.SpreadTarget{
+ {
+ Value: "bar",
+ Percent: 80,
+ },
+ {
+ Value: "baz",
+ Percent: 20,
+ },
+ },
+ }
+
+ tg.Spreads = []*structs.Spread{spread}
+ static = NewStaticRankIterator(ctx, nodes)
+ spreadIter = NewSpreadIterator(ctx, static)
+ spreadIter.SetJob(job)
+ spreadIter.SetTaskGroup(tg)
+ scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
+ out = collectRanked(scoreNorm)
+
+ // None of the nodes have the spread attribute so score should be -1
+ for _, rn := range out {
+ require.Equal(t, -1.0, rn.FinalScore)
+ }
+
+}
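
This patch also exercises the implicit target: when explicit target percentages sum to less than 100, the remainder is assigned to "*". A self-contained sketch of the desired-count computation with illustrative names (this is not computeSpreadInfo itself):

package main

import "fmt"

// desiredCounts sketches how target percentages translate into desired alloc
// counts, including the implicit "*" bucket for any percentage left over when
// the explicit targets sum to less than 100.
func desiredCounts(total int, percents map[string]uint32) map[string]float64 {
	counts := make(map[string]float64)
	sum := 0.0
	for value, pct := range percents {
		c := float64(pct) / 100 * float64(total)
		counts[value] = c
		sum += c
	}
	if sum > 0 && sum < float64(total) {
		counts["*"] = float64(total) - sum
	}
	return counts
}

func main() {
	// 10 allocs, 80%/20% split: dc1 wants 8, dc2 wants 2, no remainder.
	fmt.Println(desiredCounts(10, map[string]uint32{"dc1": 80, "dc2": 20}))
	// 10 allocs, only 70% specified: the remaining 3 go to the implicit target.
	fmt.Println(desiredCounts(10, map[string]uint32{"dc1": 70}))
}
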
"dc2", + Percent: 20, + }, + }, + } + tg.Spreads = []*structs.Spread{spread} + spreadIter := NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter) + + out := collectRanked(scoreNorm) + + // All nodes are in dc3 so score should be -1 + for _, rn := range out { + require.Equal(t, -1.0, rn.FinalScore) + } + + // Reset scores + for _, node := range nodes { + node.Scores = nil + node.FinalScore = 0 + } + + // Create spread on attribute that doesn't exist on any nodes + spread = &structs.Spread{ + Weight: 100, + Attribute: "${meta.foo}", + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "bar", + Percent: 80, + }, + { + Value: "baz", + Percent: 20, + }, + }, + } + + tg.Spreads = []*structs.Spread{spread} + static = NewStaticRankIterator(ctx, nodes) + spreadIter = NewSpreadIterator(ctx, static) + spreadIter.SetJob(job) + spreadIter.SetTaskGroup(tg) + scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter) + out = collectRanked(scoreNorm) + + // All nodes don't have the spread attribute so score should be -1 + for _, rn := range out { + require.Equal(t, -1.0, rn.FinalScore) + } + +} From e642c7382f92227b7f0ef6f54bd43388865397f7 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 30 Jul 2018 08:35:26 -0500 Subject: [PATCH 15/17] more cleanup --- nomad/structs/structs.go | 3 +-- scheduler/spread.go | 10 +++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dd6a3f119e7..16fbde126b7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5484,8 +5484,7 @@ func (s *Spread) Validate() error { return mErr.ErrorOrNil() } -// SpreadTarget is used to specify desired percentages -// for each attribute value +// SpreadTarget is used to specify desired percentages for each attribute value type SpreadTarget struct { // Value is a single attribute value, like "dc1" Value string diff --git a/scheduler/spread.go b/scheduler/spread.go index 22c30ccc117..a1acd97be98 100644 --- a/scheduler/spread.go +++ b/scheduler/spread.go @@ -122,19 +122,23 @@ func (iter *SpreadIterator) Next() *RankedNode { totalSpreadScore := 0.0 for _, pset := range propertySets { nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName) - // Skip if there was errors in resolving this attribute to compute used counts + + // Set score to -1 if there were errors in building this attribute if errorMsg != "" { iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg) + totalSpreadScore -= 1.0 + continue } spreadAttributeMap := iter.tgSpreadInfo[tgName] spreadDetails := spreadAttributeMap[pset.targetAttribute] if len(spreadDetails.desiredCounts) == 0 { // When desired counts map is empty the user didn't specify any targets - // Treat this as a special case + // Use even spreading scoring algorithm for this scenario scoreBoost := evenSpreadScoreBoost(pset, option.Node) totalSpreadScore += scoreBoost } else { + // Get the desired count desiredCount, ok := spreadDetails.desiredCounts[nValue] if !ok { @@ -143,7 +147,7 @@ func (iter *SpreadIterator) Next() *RankedNode { if !ok { // The desired count for this attribute is zero if it gets here // so use the maximum possible penalty for this node - totalSpreadScore += -1.0 + totalSpreadScore -= 1.0 continue } } From 3c529f66be777767328af40f50fa2759a21432e0 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 30 Jul 2018 21:59:35 -0500 Subject: 
From 3c529f66be777767328af40f50fa2759a21432e0 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Mon, 30 Jul 2018 21:59:35 -0500
Subject: [PATCH 16/17] Fix scoring logic for uneven spread to incorporate
 current alloc count

Also addressed other small code review comments
---
 nomad/structs/structs.go        |   4 +-
 nomad/structs/structs_test.go   |   2 +-
 scheduler/generic_sched_test.go | 117 +++++++++++++++++++++++++++-----
 scheduler/spread.go             |  29 ++++----
 scheduler/spread_test.go        |  48 +++++++++----
 5 files changed, 154 insertions(+), 46 deletions(-)

diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 16fbde126b7..613478d66da 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2191,7 +2191,7 @@ func (j *Job) Validate() error {
 if j.Type == JobTypeSystem {
 if j.Spreads != nil {
- mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a s stanza"))
+ mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
 }
 } else {
 for idx, spread := range j.Spreads {
@@ -5479,7 +5479,7 @@ func (s *Spread) Validate() error {
 sumPercent += target.Percent
 }
 if sumPercent > 100 {
- mErr.Errors = append(mErr.Errors, errors.New("Sum of spread target percentages must not be greater than 100"))
+ mErr.Errors = append(mErr.Errors, fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent))
 }
 return mErr.ErrorOrNil()
 }
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index 8f2a60504f3..213ff36d59c 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -4004,7 +4004,7 @@ func TestSpread_Validate(t *testing.T) {
 },
 },
 },
- err: fmt.Errorf("Sum of spread target percentages must not be greater than 100"),
+ err: fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", 150),
 name: "Invalid percentages",
 },
 {
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index fd8d81ec953..d942d622776 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -604,10 +604,104 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T)

 // Test job registration with spread configured
 func TestServiceSched_Spread(t *testing.T) {
- h := NewHarness(t)
 assert := assert.New(t)

- // Create a job that uses spread over data center
+ start := uint32(100)
+ step := uint32(10)
+
+ for i := 0; i < 10; i++ {
+ name := fmt.Sprintf("%d%% in dc1", start)
+ t.Run(name, func(t *testing.T) {
+ h := NewHarness(t)
+ remaining := uint32(100 - start)
+ // Create a job that uses spread over data centers
+ job := mock.Job()
+ job.Datacenters = []string{"dc1", "dc2"}
+ job.TaskGroups[0].Count = 10
+ job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads,
+ &structs.Spread{
+ Attribute: "${node.datacenter}",
+ Weight: 100,
+ SpreadTarget: []*structs.SpreadTarget{
+ {
+ Value: "dc1",
+ Percent: start,
+ },
+ {
+ Value: "dc2",
+ Percent: remaining,
+ },
+ },
+ })
+ assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")
+ // Create some nodes, half in dc2
+ var nodes []*structs.Node
+ nodeMap := make(map[string]*structs.Node)
+ for i := 0; i < 10; i++ {
+ node := mock.Node()
+ if i%2 == 0 {
+ node.Datacenter = "dc2"
+ }
+ nodes = append(nodes, node)
+ assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode")
+ nodeMap[node.ID] = node
+ }
+
+ // Create a mock evaluation to register the job
+ eval := &structs.Evaluation{
+ Namespace: structs.DefaultNamespace,
+ ID: uuid.Generate(),
+ Priority: job.Priority,
+ TriggeredBy: structs.EvalTriggerJobRegister,
+ JobID: job.ID,
+ Status: structs.EvalStatusPending,
+ }
+ noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
+
+ // Process the evaluation
+ assert.Nil(h.Process(NewServiceScheduler, eval), "Process")
+
+ // Ensure a single plan
+ assert.Len(h.Plans, 1, "Number of plans")
+ plan := h.Plans[0]
+
+ // Ensure the plan doesn't have annotations.
+ assert.Nil(plan.Annotations, "Plan.Annotations")
+
+ // Ensure the eval hasn't spawned a blocked eval
+ assert.Len(h.CreateEvals, 0, "Created Evals")
+
+ // Ensure the plan allocated
+ var planned []*structs.Allocation
+ dcAllocsMap := make(map[string]int)
+ for nodeId, allocList := range plan.NodeAllocation {
+ planned = append(planned, allocList...)
+ dc := nodeMap[nodeId].Datacenter
+ c := dcAllocsMap[dc]
+ c += len(allocList)
+ dcAllocsMap[dc] = c
+ }
+ assert.Len(planned, 10, "Planned Allocations")
+
+ expectedCounts := make(map[string]int)
+ expectedCounts["dc1"] = 10 - i
+ if i > 0 {
+ expectedCounts["dc2"] = i
+ }
+ require.Equal(t, expectedCounts, dcAllocsMap)
+
+ h.AssertEvalStatus(t, structs.EvalStatusComplete)
+ })
+ start = start - step
+ }
+}
+
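The table-driven sweep above walks the dc1 target from 100% down in steps of 10. For a 10-alloc task group the asserted split per iteration follows directly from the percentages; this standalone sketch reproduces the expectation table (illustrative only):

package main

import "fmt"

// For each iteration i, the dc1 target is 100-10*i percent, so a 10-alloc
// group should place 10-i allocs in dc1 and i in dc2, matching the test.
func main() {
	for i := 0; i < 10; i++ {
		dc1Pct := 100 - 10*i
		fmt.Printf("%d%% in dc1 -> dc1: %d allocs, dc2: %d allocs\n",
			dc1Pct, 10-i, i)
	}
}
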
+// Test job registration with even spread across dc
+func TestServiceSched_EvenSpread(t *testing.T) {
+ assert := assert.New(t)
+
+ h := NewHarness(t)
+ // Create a job that uses even spread over data centers
 job := mock.Job()
 job.Datacenters = []string{"dc1", "dc2"}
 job.TaskGroups[0].Count = 10
@@ -615,23 +709,12 @@ func TestServiceSched_Spread(t *testing.T) {
 &structs.Spread{
 Attribute: "${node.datacenter}",
 Weight: 100,
- SpreadTarget: []*structs.SpreadTarget{
- {
- Value: "dc1",
- Percent: 70,
- },
- {
- Value: "dc2",
- Percent: 30,
- },
- },
 })
 assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")
- // Create some nodes, half in dc2
 var nodes []*structs.Node
 nodeMap := make(map[string]*structs.Node)
- for i := 0; i < 6; i++ {
+ for i := 0; i < 10; i++ {
 node := mock.Node()
 if i%2 == 0 {
 node.Datacenter = "dc2"
 }
@@ -677,9 +760,11 @@ func TestServiceSched_Spread(t *testing.T) {
 }
 assert.Len(planned, 10, "Planned Allocations")

+ // Expect an even split of allocs across datacenters
 expectedCounts := make(map[string]int)
- expectedCounts["dc1"] = 7
- expectedCounts["dc2"] = 3
+ expectedCounts["dc1"] = 5
+ expectedCounts["dc2"] = 5
+
 require.Equal(t, expectedCounts, dcAllocsMap)

 h.AssertEvalStatus(t, structs.EvalStatusComplete)
diff --git a/scheduler/spread.go b/scheduler/spread.go
index a1acd97be98..ef03db20418 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -123,6 +123,8 @@ func (iter *SpreadIterator) Next() *RankedNode {
 for _, pset := range propertySets {
 nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName)

+ // Add one to include placement on this node in the scoring calculation
+ usedCount += 1
 // Set score to -1 if there were errors in building this attribute
 if errorMsg != "" {
 iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg)
@@ -182,13 +184,12 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
 }
 // Get the node's property value
 nValue, ok := getProperty(option, pset.targetAttribute)
- currentAttributeCount := uint64(0)
- if ok {
- currentAttributeCount = combinedUseMap[nValue]
- } else {
- // If the attribute isn't set on the node, it should get the maximum possible penalty
+
+ // Maximum possible penalty when the attribute isn't set on the node
+ if !ok {
 return -1.0
 }
+ currentAttributeCount := combinedUseMap[nValue]
 minCount := uint64(0)
 maxCount := uint64(0)
 for _, value := range combinedUseMap {
@@ -211,17 +212,15 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
 if currentAttributeCount != minCount {
 // Boost based on delta between current and min
 return deltaBoost
- } else {
- if minCount == maxCount {
+ } else if minCount == maxCount {
 // Maximum possible penalty when the distribution is even
 return -1.0
- } else {
- // Boost based on delta from the max value
- delta := int(maxCount - minCount)
- deltaBoost = float64(delta) / float64(minCount)
- return deltaBoost
- }
+ }
+ // Boost based on delta from the max value
+ delta := int(maxCount - minCount)
+ deltaBoost = float64(delta) / float64(minCount)
+ return deltaBoost
+
 }
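
The usedCount += 1 change above counts the prospective placement itself, which is why the expected scores in the test updates below shift. A quick, self-contained recomputation under that rule, using the single-attribute test's setup (2 existing allocs in dc1, none in dc2, desired counts 8 and 2):

package main

import "fmt"

// score applies the boost formula with the placement itself included in the
// used count, reproducing the updated expectations in the test below.
func score(existing, desired float64) float64 {
	used := existing + 1 // include this prospective placement
	return (desired - used) / desired
}

func main() {
	fmt.Println(score(2, 8)) // dc1: (8-3)/8 = 0.625
	fmt.Println(score(0, 2)) // dc2: (2-1)/2 = 0.5
}
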
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
index f7e7222f73a..83fb2749f94 100644
--- a/scheduler/spread_test.go
+++ b/scheduler/spread_test.go
@@ -30,7 +30,7 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {

 job := mock.Job()
 tg := job.TaskGroups[0]
- job.TaskGroups[0].Count = 5
+ job.TaskGroups[0].Count = 10
 // add allocs to nodes in dc1
 upserting := []*structs.Allocation{
 {
@@ -79,11 +79,11 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
 out := collectRanked(scoreNorm)

 // Expect nodes in dc1 with existing allocs to get a boost
- // Boost should be ((desiredCount-actual)/expected)*spreadWeight
- // For this test, that becomes dc1 = ((4-2)/4 ) = 0.5, and dc2=(1-0)/1
+ // Boost should be ((desiredCount-actual)/desired)*spreadWeight
+ // For this test, that becomes dc1 = (8-3)/8 = 0.625, and dc2 = (2-1)/2 = 0.5
 expectedScores := map[string]float64{
- "dc1": 0.5,
- "dc2": 1.0,
+ "dc1": 0.625,
+ "dc2": 0.5,
 }
 for _, rn := range out {
 require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
@@ -92,6 +92,14 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
 // Update the plan to add more allocs to nodes in dc1
 // After this step there are enough allocs to meet the desired count in dc1
 ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
+ {
+ Namespace: structs.DefaultNamespace,
+ TaskGroup: tg.Name,
+ JobID: job.ID,
+ Job: job,
+ ID: uuid.Generate(),
+ NodeID: nodes[0].Node.ID,
+ },
 {
 Namespace: structs.DefaultNamespace,
 TaskGroup: tg.Name,
@@ -119,6 +127,22 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
 ID: uuid.Generate(),
 NodeID: nodes[3].Node.ID,
 },
+ {
+ Namespace: structs.DefaultNamespace,
+ TaskGroup: tg.Name,
+ JobID: job.ID,
+ Job: job,
+ ID: uuid.Generate(),
+ NodeID: nodes[3].Node.ID,
+ },
+ {
+ Namespace: structs.DefaultNamespace,
+ TaskGroup: tg.Name,
+ JobID: job.ID,
+ Job: job,
+ ID: uuid.Generate(),
+ NodeID: nodes[3].Node.ID,
+ },
 }

 // Reset the scores
@@ -138,7 +162,7 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
 // the desired count
 expectedScores = map[string]float64{
 "dc1": 0,
- "dc2": 1.0,
+ "dc2": 0.5,
 }
 for _, rn := range out {
 require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
@@ -166,7 +190,7 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) {

 job := mock.Job()
 tg := job.TaskGroups[0]
- job.TaskGroups[0].Count = 5
+ job.TaskGroups[0].Count = 10
 // add allocs to nodes in dc1
 upserting := []*structs.Allocation{
 {
@@ -232,13 +256,13 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) {

 out := collectRanked(scoreNorm)

- // Score come from combining two different spread factors
+ // Score comes from combining two different spread factors
 // Second node should have the highest score because it has no allocs and it's in dc2/r1
 expectedScores := map[string]float64{
- nodes[0].Node.ID: 0.389,
- nodes[1].Node.ID: 0.833,
- nodes[2].Node.ID: 0.444,
- nodes[3].Node.ID: 0.444,
+ nodes[0].Node.ID: 0.500,
+ nodes[1].Node.ID: 0.667,
+ nodes[2].Node.ID: 0.556,
+ nodes[3].Node.ID: 0.556,
 }
 for _, rn := range out {
 require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
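
When more than one spread stanza is in play, each attribute's boost is scaled by its share of the summed weights before the contributions are added, which is how the multi-attribute expectations above combine into one score. A standalone sketch with made-up weights and boosts (not the values from the test):

package main

import "fmt"

func main() {
	type spread struct {
		weight float64
		boost  float64 // (desired - used) / desired for this attribute
	}
	spreads := []spread{
		{weight: 70, boost: 0.5},  // e.g. a datacenter spread
		{weight: 30, boost: -0.5}, // e.g. a rack spread
	}
	sumWeights := 0.0
	for _, s := range spreads {
		sumWeights += s.weight
	}
	// Each boost is weighted by its stanza's share of the total weight.
	total := 0.0
	for _, s := range spreads {
		total += s.boost * (s.weight / sumWeights)
	}
	fmt.Println(total) // 0.2
}
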
From 485ff03f68a23550a38ad6295b68cc755bfda3f4 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Tue, 31 Jul 2018 20:39:33 -0500
Subject: [PATCH 17/17] Fix linting error

---
 scheduler/spread.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scheduler/spread.go b/scheduler/spread.go
index ef03db20418..f2d6c3309ea 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -202,7 +202,7 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
 }

 // calculate boost based on delta between the current and the minimum
- deltaBoost := 1.0
+ var deltaBoost float64
 if minCount == 0 {
 deltaBoost = -1.0
 } else {