
Fix scoring logic for uneven spread to incorporate current alloc count
Also addressed other small code review comments
Preetha Appan committed Aug 1, 2018
1 parent 746601a commit d8b5ec2
Showing 5 changed files with 154 additions and 46 deletions.
4 changes: 2 additions & 2 deletions nomad/structs/structs.go
@@ -2191,7 +2191,7 @@ func (j *Job) Validate() error {

if j.Type == JobTypeSystem {
if j.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a s stanza"))
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range j.Spreads {
@@ -5479,7 +5479,7 @@ func (s *Spread) Validate() error {
sumPercent += target.Percent
}
if sumPercent > 100 {
mErr.Errors = append(mErr.Errors, errors.New("Sum of spread target percentages must not be greater than 100"))
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent))
}
return mErr.ErrorOrNil()
}
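
To illustrate the sharpened validation message, here is a minimal sketch — not part of this diff, using only the Spread, SpreadTarget, and Validate identifiers shown above — of a spread stanza whose percentages sum past 100:

	s := &structs.Spread{
		Attribute: "${node.datacenter}",
		Weight:    100,
		SpreadTarget: []*structs.SpreadTarget{
			{Value: "dc1", Percent: 80},
			{Value: "dc2", Percent: 70},
		},
	}
	err := s.Validate()
	// err (a wrapped multierror) now names the offending sum:
	// "Sum of spread target percentages must not be greater than 100%; got 150%"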
2 changes: 1 addition & 1 deletion nomad/structs/structs_test.go
@@ -4004,7 +4004,7 @@ func TestSpread_Validate(t *testing.T) {
},
},
},
err: fmt.Errorf("Sum of spread target percentages must not be greater than 100"),
err: fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", 150),
name: "Invalid percentages",
},
{
117 changes: 101 additions & 16 deletions scheduler/generic_sched_test.go
@@ -604,34 +604,117 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T)

// Test job registration with spread configured
func TestServiceSched_Spread(t *testing.T) {
h := NewHarness(t)
assert := assert.New(t)

// Create a job that uses spread over data center
start := uint32(100)
step := uint32(10)

for i := 0; i < 10; i++ {
name := fmt.Sprintf("%d%% in dc1", start)
t.Run(name, func(t *testing.T) {
h := NewHarness(t)
remaining := uint32(100 - start)
// Create a job that uses spread over data center
job := mock.Job()
job.Datacenters = []string{"dc1", "dc2"}
job.TaskGroups[0].Count = 10
job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads,
&structs.Spread{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: start,
},
{
Value: "dc2",
Percent: remaining,
},
},
})
assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")
// Create some nodes, half in dc2
var nodes []*structs.Node
nodeMap := make(map[string]*structs.Node)
for i := 0; i < 10; i++ {
node := mock.Node()
if i%2 == 0 {
node.Datacenter = "dc2"
}
nodes = append(nodes, node)
assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode")
nodeMap[node.ID] = node
}

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
assert.Nil(h.Process(NewServiceScheduler, eval), "Process")

// Ensure a single plan
assert.Len(h.Plans, 1, "Number of plans")
plan := h.Plans[0]

// Ensure the plan doesn't have annotations.
assert.Nil(plan.Annotations, "Plan.Annotations")

// Ensure the eval hasn't spawned a blocked eval
assert.Len(h.CreateEvals, 0, "Created Evals")

// Ensure the plan allocated
var planned []*structs.Allocation
dcAllocsMap := make(map[string]int)
for nodeId, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
dc := nodeMap[nodeId].Datacenter
c := dcAllocsMap[dc]
c += len(allocList)
dcAllocsMap[dc] = c
}
assert.Len(planned, 10, "Planned Allocations")

expectedCounts := make(map[string]int)
expectedCounts["dc1"] = 10 - i
if i > 0 {
expectedCounts["dc2"] = i
}
require.Equal(t, expectedCounts, dcAllocsMap)

h.AssertEvalStatus(t, structs.EvalStatusComplete)
})
start = start - step
}
}
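
Read concretely: on iteration i the dc1 target is (100 − 10·i) percent, so with a group count of 10 the subtest expects 10 − i allocations in dc1 and i in dc2 — the 70% case, for example, asserts the 7/3 split via expectedCounts.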

// Test job registration with even spread across dc
func TestServiceSched_EvenSpread(t *testing.T) {
assert := assert.New(t)

h := NewHarness(t)
// Create a job that uses even spread over data center
job := mock.Job()
job.Datacenters = []string{"dc1", "dc2"}
job.TaskGroups[0].Count = 10
job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads,
&structs.Spread{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 70,
},
{
Value: "dc2",
Percent: 30,
},
},
})
assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")

// Create some nodes, half in dc2
var nodes []*structs.Node
nodeMap := make(map[string]*structs.Node)
for i := 0; i < 6; i++ {
for i := 0; i < 10; i++ {
node := mock.Node()
if i%2 == 0 {
node.Datacenter = "dc2"
@@ -677,9 +760,11 @@ func TestServiceSched_Spread(t *testing.T) {
}
assert.Len(planned, 10, "Planned Allocations")

// Expect an even split of allocs across datacenters
expectedCounts := make(map[string]int)
expectedCounts["dc1"] = 7
expectedCounts["dc2"] = 3
expectedCounts["dc1"] = 5
expectedCounts["dc2"] = 5

require.Equal(t, expectedCounts, dcAllocsMap)

h.AssertEvalStatus(t, structs.EvalStatusComplete)
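
Because the even-spread job carries only an attribute and a weight (the 70/30 SpreadTarget block above is the old test body's, removed here), scoring falls through to evenSpreadScoreBoost, changed in scheduler/spread.go below, and the 10 allocations are expected to split 5/5 across the two datacenters.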
29 changes: 14 additions & 15 deletions scheduler/spread.go
@@ -123,6 +123,8 @@ func (iter *SpreadIterator) Next() *RankedNode {
for _, pset := range propertySets {
nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName)

// Add one to include placement on this node in the scoring calculation
usedCount += 1
// Set score to -1 if there were errors in building this attribute
if errorMsg != "" {
iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg)
Expand Down Expand Up @@ -182,13 +184,12 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
}
// Get the nodes property value
nValue, ok := getProperty(option, pset.targetAttribute)
currentAttributeCount := uint64(0)
if ok {
currentAttributeCount = combinedUseMap[nValue]
} else {
// If the attribute isn't set on the node, it should get the maximum possible penalty

// Maximum possible penalty when the attribute isn't set on the node
if !ok {
return -1.0
}
currentAttributeCount := combinedUseMap[nValue]
minCount := uint64(0)
maxCount := uint64(0)
for _, value := range combinedUseMap {
@@ -211,17 +212,15 @@ func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
if currentAttributeCount != minCount {
// Boost based on delta between current and min
return deltaBoost
} else {
if minCount == maxCount {
// Maximum possible penalty when the distribution is even
return -1.0
} else {
// Penalty based on delta from max value
delta := int(maxCount - minCount)
deltaBoost = float64(delta) / float64(minCount)
return deltaBoost
}
} else if minCount == maxCount {
// Maximum possible penalty when the distribution is even
return -1.0
}
// Penalty based on delta from max value
delta := int(maxCount - minCount)
deltaBoost = float64(delta) / float64(minCount)
return deltaBoost

}
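
To make the restructured control flow above easier to check, here is a self-contained sketch of the same boost computation keyed only by attribute counts — a hypothetical helper, not part of this diff, which assumes nodeValue is present in counts and every count is nonzero:

	func evenBoost(counts map[string]uint64, nodeValue string) float64 {
		current := counts[nodeValue]
		var min, max uint64
		for _, c := range counts {
			if min == 0 || c < min {
				min = c
			}
			if c > max {
				max = c
			}
		}
		if current != min {
			// Over-represented value: penalty proportional to the gap below the minimum
			return (float64(min) - float64(current)) / float64(min)
		}
		if min == max {
			// Distribution is already even: maximum penalty, so other scoring factors dominate
			return -1.0
		}
		// Under-represented value: boost proportional to the max-min gap
		return float64(max-min) / float64(min)
	}

With counts of {dc1: 2, dc2: 1}, a dc1 node scores (1 − 2)/1 = −1.0 while a dc2 node scores (2 − 1)/1 = +1.0 — the push toward even placement that the 5/5 split in TestServiceSched_EvenSpread relies on.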

// computeSpreadInfo computes and stores percentages and total values
48 changes: 36 additions & 12 deletions scheduler/spread_test.go
@@ -30,7 +30,7 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {

job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 5
job.TaskGroups[0].Count = 10
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
@@ -79,11 +79,11 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
out := collectRanked(scoreNorm)

// Expect nodes in dc1 with existing allocs to get a boost
// Boost should be ((desiredCount-actual)/expected)*spreadWeight
// For this test, that becomes dc1 = ((4-2)/4 ) = 0.5, and dc2=(1-0)/1
// Boost should be ((desiredCount-actual)/desired)*spreadWeight
// For this test, that becomes dc1 = ((8-3)/8) = 0.625, and dc2 = ((2-1)/2) = 0.5
expectedScores := map[string]float64{
"dc1": 0.5,
"dc2": 1.0,
"dc1": 0.625,
"dc2": 0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
Expand All @@ -92,6 +92,14 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
// Update the plan to add more allocs to nodes in dc1
// After this step there are enough allocs to meet the desired count in dc1
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
@@ -119,6 +127,22 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}

// Reset the scores
@@ -138,7 +162,7 @@ func TestSpreadIterator_SingleAttribute(t *testing.T) {
// the desired count
expectedScores = map[string]float64{
"dc1": 0,
"dc2": 1.0,
"dc2": 0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
@@ -166,7 +190,7 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) {

job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 5
job.TaskGroups[0].Count = 10
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
@@ -232,13 +256,13 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) {

out := collectRanked(scoreNorm)

// Score come from combining two different spread factors
// Score comes from combining two different spread factors
// Second node should have the highest score because it has no allocs and it's in dc2/r1
expectedScores := map[string]float64{
nodes[0].Node.ID: 0.389,
nodes[1].Node.ID: 0.833,
nodes[2].Node.ID: 0.444,
nodes[3].Node.ID: 0.444,
nodes[0].Node.ID: 0.500,
nodes[1].Node.ID: 0.667,
nodes[2].Node.ID: 0.556,
nodes[3].Node.ID: 0.556,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
