Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend for spread #4595

Merged
merged 17 commits into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,32 @@ func CopySliceAffinities(s []*Affinity) []*Affinity {
return c
}

func CopySliceSpreads(s []*Spread) []*Spread {
l := len(s)
if l == 0 {
return nil
}

c := make([]*Spread, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}

func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget {
l := len(s)
if l == 0 {
return nil
}

c := make([]*SpreadTarget, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}

// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
Expand Down
130 changes: 130 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,10 @@ type Job struct {
// scheduling preferences that apply to all groups and tasks
Affinities []*Affinity

// Spread can be specified at the job level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread

// TaskGroups are the collections of task groups that this job needs
// to run. Each task group is an atomic unit of scheduling and placement.
TaskGroups []*TaskGroup
Expand Down Expand Up @@ -2185,6 +2189,19 @@ func (j *Job) Validate() error {
}
}

if j.Type == JobTypeSystem {
if j.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range j.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}

// Check for duplicate task groups
taskGroups := make(map[string]int)
for idx, tg := range j.TaskGroups {
Expand Down Expand Up @@ -3336,6 +3353,10 @@ type TaskGroup struct {
// Affinities can be specified at the task group level to express
// scheduling preferences.
Affinities []*Affinity

// Spread can be specified at the task group level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread
}

func (tg *TaskGroup) Copy() *TaskGroup {
Expand All @@ -3349,6 +3370,7 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.RestartPolicy = ntg.RestartPolicy.Copy()
ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy()
ntg.Affinities = CopySliceAffinities(ntg.Affinities)
ntg.Spreads = CopySliceSpreads(ntg.Spreads)

if tg.Tasks != nil {
tasks := make([]*Task, len(ntg.Tasks))
Expand Down Expand Up @@ -3450,6 +3472,19 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}

if j.Type == JobTypeSystem {
if tg.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range tg.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}

if j.Type == JobTypeSystem {
if tg.ReschedulePolicy != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs should not have a reschedule policy"))
Expand Down Expand Up @@ -5384,6 +5419,101 @@ func (a *Affinity) Validate() error {
return mErr.ErrorOrNil()
}

// Spread is used to specify desired distribution of allocations according to weight
type Spread struct {
// Attribute is the node attribute used as the spread criteria
Attribute string

// Weight is the relative weight of this spread, useful when there are multiple
// spread and affinities
Weight int

// SpreadTarget is used to describe desired percentages for each attribute value
SpreadTarget []*SpreadTarget

// Memoized string representation
str string
}

func (s *Spread) Copy() *Spread {
if s == nil {
return nil
}
ns := new(Spread)
*ns = *s

ns.SpreadTarget = CopySliceSpreadTarget(s.SpreadTarget)
return ns
}

func (s *Spread) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.SpreadTarget, s.Weight)
return s.str
}

func (s *Spread) Validate() error {
var mErr multierror.Error
if s.Attribute == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing spread attribute"))
}
if s.Weight <= 0 || s.Weight > 100 {
mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100"))
}
seen := make(map[string]struct{})
sumPercent := uint32(0)

for _, target := range s.SpreadTarget {
// Make sure there are no duplicates
_, ok := seen[target.Value]
if !ok {
seen[target.Value] = struct{}{}
} else {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value)))
}
if target.Percent < 0 || target.Percent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value)))
}
sumPercent += target.Percent
}
if sumPercent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent)))
}
return mErr.ErrorOrNil()
}

// SpreadTarget is used to specify desired percentages for each attribute value
type SpreadTarget struct {
// Value is a single attribute value, like "dc1"
Value string

// Percent is the desired percentage of allocs
Percent uint32

// Memoized string representation
str string
}

func (s *SpreadTarget) Copy() *SpreadTarget {
if s == nil {
return nil
}

ns := new(SpreadTarget)
*ns = *s
return ns
}

func (s *SpreadTarget) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%q %v%%", s.Value, s.Percent)
return s.str
}

// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
// Sticky indicates whether the allocation is sticky to a node
Expand Down
131 changes: 131 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,21 @@ func TestJob_SystemJob_Validate(t *testing.T) {
err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have an affinity stanza")

// Add spread at job and task group level, that should fail validation
j.Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}
j.TaskGroups[0].Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}

err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have a spread stanza")

}

func TestJob_VaultPolicies(t *testing.T) {
Expand Down Expand Up @@ -3926,3 +3941,119 @@ func TestNode_Copy(t *testing.T) {
require.Equal(node.DrainStrategy, node2.DrainStrategy)
require.Equal(node.Drivers, node2.Drivers)
}

func TestSpread_Validate(t *testing.T) {
type tc struct {
spread *Spread
err error
name string
}

testCases := []tc{
{
spread: &Spread{},
err: fmt.Errorf("Missing spread attribute"),
name: "empty spread",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: -1,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 200,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 150,
},
},
},
err: fmt.Errorf("Spread target percentage for value \"dc2\" must be between 0 and 100"),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 75,
},
{
Value: "dc2",
Percent: 75,
},
},
},
err: fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", 150),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc1",
Percent: 50,
},
},
},
err: fmt.Errorf("Spread target value \"dc1\" already defined"),
name: "No spread targets",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 50,
},
},
},
err: nil,
name: "Valid spread",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.spread.Validate()
if tc.err != nil {
require.NotNil(t, err)
require.Contains(t, err.Error(), tc.err.Error())
} else {
require.Nil(t, err)
}
})
}
}
Loading