Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Affinity struct, API and parsing #4512

Merged
merged 8 commits into from
Jul 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestCompose(t *testing.T) {
// Compose a task group
grp := NewTaskGroup("grp1", 2).
Constrain(NewConstraint("kernel.name", "=", "linux")).
AddAffinity(NewAffinity("${node.class}", "=", "large", 50)).
SetMeta("foo", "bar").
AddTask(task)

Expand Down Expand Up @@ -72,6 +73,14 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
Affinities: []*Affinity{
{
LTarget: "${node.class}",
RTarget: "large",
Operand: "=",
Weight: 50,
},
},
Tasks: []*Task{
{
Name: "task1",
Expand Down
7 changes: 7 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ type Job struct {
AllAtOnce *bool `mapstructure:"all_at_once"`
Datacenters []string
Constraints []*Constraint
Affinities []*Affinity
TaskGroups []*TaskGroup
Update *UpdateStrategy
Periodic *PeriodicConfig
Expand Down Expand Up @@ -836,6 +837,12 @@ func (j *Job) Constrain(c *Constraint) *Job {
return j
}

// AddAffinity is used to add an affinity to a job.
func (j *Job) AddAffinity(a *Affinity) *Job {
j.Affinities = append(j.Affinities, a)
return j
}

// AddTaskGroup adds a task group to an existing job.
func (j *Job) AddTaskGroup(grp *TaskGroup) *Job {
j.TaskGroups = append(j.TaskGroups, grp)
Expand Down
36 changes: 36 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,42 @@ func TestJobs_Constrain(t *testing.T) {
}
}

func TestJobs_AddAffinity(t *testing.T) {
t.Parallel()
job := &Job{Affinities: nil}

// Create and add an affinity
out := job.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(job.Affinities); n != 1 {
t.Fatalf("expected 1 affinity, got: %d", n)
}

// Check that the job was returned
if job != out {
t.Fatalf("expect: %#v, got: %#v", job, out)
}

// Adding another affinity preserves the original
job.AddAffinity(NewAffinity("${node.datacenter}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(job.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, job.Affinities)
}
}

func TestJobs_Sort(t *testing.T) {
t.Parallel()
jobs := []*JobListStub{
Expand Down
31 changes: 31 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ func (r *ReschedulePolicy) Canonicalize(jobType string) {
}
}

// Affinity is used to serialize task group affinities
type Affinity struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Constraint operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight float64 // Weight applied to nodes that match the affinity. Can be negative
}

func NewAffinity(LTarget string, Operand string, RTarget string, Weight float64) *Affinity {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to change; just fyi: you can skip repeating the type if it's the same between arguments:

func NewAffinity(LTarget, Operand, RTarget string, Weight float64) *Affinity {

return &Affinity{
LTarget: LTarget,
RTarget: RTarget,
Operand: Operand,
Weight: Weight,
}
}

func NewDefaultReschedulePolicy(jobType string) *ReschedulePolicy {
var dp *ReschedulePolicy
switch jobType {
Expand Down Expand Up @@ -413,6 +430,7 @@ type TaskGroup struct {
Name *string
Count *int
Constraints []*Constraint
Affinities []*Affinity
Tasks []*Task
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
Expand Down Expand Up @@ -543,6 +561,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup {
return g
}

// AddAffinity is used to add a new affinity to a task group.
func (g *TaskGroup) AddAffinity(a *Affinity) *TaskGroup {
g.Affinities = append(g.Affinities, a)
return g
}

// RequireDisk adds a ephemeral disk to the task group
func (g *TaskGroup) RequireDisk(disk *EphemeralDisk) *TaskGroup {
g.EphemeralDisk = disk
Expand Down Expand Up @@ -583,6 +607,7 @@ type Task struct {
User string
Config map[string]interface{}
Constraints []*Constraint
Affinities []*Affinity
Env map[string]string
Services []*Service
Resources *Resources
Expand Down Expand Up @@ -771,6 +796,12 @@ func (t *Task) Constrain(c *Constraint) *Task {
return t
}

// AddAffinity adds a new affinity to a single task.
func (t *Task) AddAffinity(a *Affinity) *Task {
t.Affinities = append(t.Affinities, a)
return t
}

// SetLogConfig sets a log config to a task
func (t *Task) SetLogConfig(l *LogConfig) *Task {
t.LogConfig = l
Expand Down
72 changes: 72 additions & 0 deletions api/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTaskGroup_NewTaskGroup(t *testing.T) {
Expand Down Expand Up @@ -56,6 +57,42 @@ func TestTaskGroup_Constrain(t *testing.T) {
}
}

func TestTaskGroup_AddAffinity(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)

// Add an affinity to the group
out := grp.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(grp.Affinities); n != 1 {
t.Fatalf("expected 1 affinity, got: %d", n)
}

// Check that the group was returned
if out != grp {
t.Fatalf("expected: %#v, got: %#v", grp, out)
}

// Add a second affinity
grp.AddAffinity(NewAffinity("${node.affinity}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.affinity}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(grp.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp.Constraints)
}
}

func TestTaskGroup_SetMeta(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
Expand Down Expand Up @@ -232,6 +269,41 @@ func TestTask_Constrain(t *testing.T) {
}
}

func TestTask_AddAffinity(t *testing.T) {
t.Parallel()
task := NewTask("task1", "exec")

// Add an affinity to the task
out := task.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
require := require.New(t)
require.Len(out, 1)

// Check that the task was returned
if out != task {
t.Fatalf("expected: %#v, got: %#v", task, out)
}

// Add a second affinity
task.AddAffinity(NewAffinity("${node.datacenter}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(task.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, task.Affinities)
}
}

func TestTask_Artifact(t *testing.T) {
t.Parallel()
a := TaskArtifact{
Expand Down
30 changes: 30 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,13 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
}
}

if l := len(job.Affinities); l != 0 {
j.Affinities = make([]*structs.Affinity, l)
for i, a := range job.Affinities {
j.Affinities[i] = ApiAffinityToStructs(a)
}
}

// COMPAT: Remove in 0.7.0. Update has been pushed into the task groups
if job.Update != nil {
j.Update = structs.UpdateStrategy{}
Expand Down Expand Up @@ -675,6 +682,13 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}
}

if l := len(taskGroup.Affinities); l != 0 {
tg.Affinities = make([]*structs.Affinity, l)
for k, affinity := range taskGroup.Affinities {
tg.Affinities[k] = ApiAffinityToStructs(affinity)
}
}

tg.RestartPolicy = &structs.RestartPolicy{
Attempts: *taskGroup.RestartPolicy.Attempts,
Interval: *taskGroup.RestartPolicy.Interval,
Expand Down Expand Up @@ -754,6 +768,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
}
}

if l := len(apiTask.Affinities); l != 0 {
structsTask.Affinities = make([]*structs.Affinity, l)
for i, a := range apiTask.Affinities {
structsTask.Affinities[i] = ApiAffinityToStructs(a)
}
}

if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services {
Expand Down Expand Up @@ -892,3 +913,12 @@ func ApiConstraintToStructs(c1 *api.Constraint, c2 *structs.Constraint) {
c2.RTarget = c1.RTarget
c2.Operand = c1.Operand
}

func ApiAffinityToStructs(a1 *api.Affinity) *structs.Affinity {
return &structs.Affinity{
LTarget: a1.LTarget,
Operand: a1.Operand,
RTarget: a1.RTarget,
Weight: a1.Weight,
}
}
48 changes: 48 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "c",
},
},
Affinities: []*api.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Update: &api.UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
MaxParallel: helper.IntToPtr(5),
Expand Down Expand Up @@ -1248,6 +1256,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*api.Affinity{
{
LTarget: "x",
RTarget: "y",
Operand: "z",
Weight: 100,
},
},
RestartPolicy: &api.RestartPolicy{
Interval: helper.TimeToPtr(1 * time.Second),
Attempts: helper.IntToPtr(5),
Expand Down Expand Up @@ -1303,6 +1319,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*api.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},

Services: []*api.Service{
{
Expand Down Expand Up @@ -1443,6 +1467,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "c",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Update: structs.UpdateStrategy{
Stagger: 1 * time.Second,
MaxParallel: 5,
Expand Down Expand Up @@ -1474,6 +1506,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "x",
RTarget: "y",
Operand: "z",
Weight: 100,
},
},
RestartPolicy: &structs.RestartPolicy{
Interval: 1 * time.Second,
Attempts: 5,
Expand Down Expand Up @@ -1528,6 +1568,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Env: map[string]string{
"hello": "world",
},
Expand Down
Loading