Skip to content

Commit

Permalink
Allow configurable range of Job priorities (#16084)
Browse files Browse the repository at this point in the history
  • Loading branch information
alessio-perugini authored Feb 17, 2023
1 parent ebe4b51 commit 365ccf4
Show file tree
Hide file tree
Showing 22 changed files with 365 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .changelog/16084.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: Allow configurable range of Job priorities
```
2 changes: 1 addition & 1 deletion api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func (j *Job) Canonicalize() {
j.Namespace = pointerOf(DefaultNamespace)
}
if j.Priority == nil {
j.Priority = pointerOf(50)
j.Priority = pointerOf(0)
}
if j.Stop == nil {
j.Stop = pointerOf(false)
Expand Down
16 changes: 8 additions & 8 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Namespace: pointerOf(DefaultNamespace),
Type: pointerOf("service"),
ParentID: pointerOf(""),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Namespace: pointerOf(DefaultNamespace),
Type: pointerOf("batch"),
ParentID: pointerOf(""),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Region: pointerOf("global"),
Type: pointerOf("service"),
ParentID: pointerOf("lol"),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down Expand Up @@ -621,7 +621,7 @@ func TestJobs_Canonicalize(t *testing.T) {
ID: pointerOf("example_template"),
Name: pointerOf("example_template"),
ParentID: pointerOf(""),
Priority: pointerOf(50),
Priority: pointerOf(0),
Region: pointerOf("global"),
Type: pointerOf("service"),
AllAtOnce: pointerOf(false),
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Name: pointerOf("bar"),
Region: pointerOf("global"),
Type: pointerOf("service"),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down Expand Up @@ -882,7 +882,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Region: pointerOf("global"),
Type: pointerOf("service"),
ParentID: pointerOf("lol"),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down Expand Up @@ -1058,7 +1058,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Region: pointerOf("global"),
Type: pointerOf("service"),
ParentID: pointerOf("lol"),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down Expand Up @@ -1229,7 +1229,7 @@ func TestJobs_Canonicalize(t *testing.T) {
Region: pointerOf("global"),
Type: pointerOf("service"),
ParentID: pointerOf("lol"),
Priority: pointerOf(50),
Priority: pointerOf(0),
AllAtOnce: pointerOf(false),
ConsulToken: pointerOf(""),
ConsulNamespace: pointerOf(""),
Expand Down
17 changes: 17 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,23 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
}
}

jobMaxPriority := structs.JobDefaultMaxPriority
if agentConfig.Server.JobMaxPriority != nil && *agentConfig.Server.JobMaxPriority != 0 {
jobMaxPriority = *agentConfig.Server.JobMaxPriority
if jobMaxPriority < structs.JobDefaultMaxPriority || jobMaxPriority > structs.JobMaxPriority {
return nil, fmt.Errorf("job_max_priority cannot be %d. Must be between %d and %d", *agentConfig.Server.JobMaxPriority, structs.JobDefaultMaxPriority, structs.JobMaxPriority)
}
}
jobDefaultPriority := structs.JobDefaultPriority
if agentConfig.Server.JobDefaultPriority != nil && *agentConfig.Server.JobDefaultPriority != 0 {
jobDefaultPriority = *agentConfig.Server.JobDefaultPriority
if jobDefaultPriority < structs.JobDefaultPriority || jobDefaultPriority >= jobMaxPriority {
return nil, fmt.Errorf("job_default_priority cannot be %d. Must be between %d and %d", *agentConfig.Server.JobDefaultPriority, structs.JobDefaultPriority, jobMaxPriority)
}
}
conf.JobMaxPriority = jobMaxPriority
conf.JobDefaultPriority = jobDefaultPriority

// Set up the bind addresses
rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC)
if err != nil {
Expand Down
142 changes: 142 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1569,3 +1570,144 @@ func TestAgent_ProxyRPC_Dev(t *testing.T) {
})

}

func TestAgent_ServerConfig_JobMaxPriority_Ok(t *testing.T) {
ci.Parallel(t)

cases := []struct {
maxPriority *int
jobMaxPriority int
}{
{
maxPriority: nil,
jobMaxPriority: 100,
},

{
maxPriority: pointer.Of(0),
jobMaxPriority: 100,
},
{
maxPriority: pointer.Of(100),
jobMaxPriority: 100,
},
{
maxPriority: pointer.Of(200),
jobMaxPriority: 200,
},
{
maxPriority: pointer.Of(32766),
jobMaxPriority: 32766,
},
}

for _, tc := range cases {
v := "default"
if tc.maxPriority != nil {
v = fmt.Sprintf("%v", *tc.maxPriority)
}
t.Run(v, func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobMaxPriority = tc.maxPriority

serverConf, err := convertServerConfig(conf)
must.NoError(t, err)
must.Eq(t, tc.jobMaxPriority, serverConf.JobMaxPriority)
})
}
}

func TestAgent_ServerConfig_JobMaxPriority_Bad(t *testing.T) {
ci.Parallel(t)

cases := []int{
99,
math.MaxInt16,
}

for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobMaxPriority = &tc

_, err := convertServerConfig(conf)
must.Error(t, err)
must.ErrorContains(t, err, "job_max_priority cannot be")
})
}
}

func TestAgent_ServerConfig_JobDefaultPriority_Ok(t *testing.T) {
ci.Parallel(t)

cases := []struct {
defaultPriority *int
jobDefaultPriority int
}{
{
defaultPriority: nil,
jobDefaultPriority: 50,
},

{
defaultPriority: pointer.Of(0),
jobDefaultPriority: 50,
},
{
defaultPriority: pointer.Of(50),
jobDefaultPriority: 50,
},
{
defaultPriority: pointer.Of(60),
jobDefaultPriority: 60,
},
{
defaultPriority: pointer.Of(99),
jobDefaultPriority: 99,
},
}

for _, tc := range cases {
v := "default"
if tc.defaultPriority != nil {
v = fmt.Sprintf("%v", *tc.defaultPriority)
}
t.Run(v, func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobDefaultPriority = tc.defaultPriority

serverConf, err := convertServerConfig(conf)
must.NoError(t, err)

must.Eq(t, tc.jobDefaultPriority, serverConf.JobDefaultPriority)
})
}
}

func TestAgent_ServerConfig_JobDefaultPriority_Bad(t *testing.T) {
ci.Parallel(t)

cases := []int{
49,
100,
}

for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobDefaultPriority = &tc

_, err := convertServerConfig(conf)
must.Error(t, err)
must.ErrorContains(t, err, "job_default_priority cannot be")
})
}
}
14 changes: 14 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,12 @@ type ServerConfig struct {
// forced to send an entire snapshot. The value passed here is the initial
// setting used. This can be tuned during operation using a hot reload.
RaftTrailingLogs *int `hcl:"raft_trailing_logs"`

// JobDefaultPriority is the default Job priority if not specified.
JobDefaultPriority *int `hcl:"job_default_priority"`

// JobMaxPriority is an upper bound on the Job priority.
JobMaxPriority *int `hcl:"job_max_priority"`
}

func (s *ServerConfig) Copy() *ServerConfig {
Expand All @@ -673,6 +679,8 @@ func (s *ServerConfig) Copy() *ServerConfig {
ns.RaftSnapshotInterval = pointer.Copy(s.RaftSnapshotInterval)
ns.RaftSnapshotThreshold = pointer.Copy(s.RaftSnapshotThreshold)
ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
return &ns
}

Expand Down Expand Up @@ -1871,6 +1879,12 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.JobGCThreshold != "" {
result.JobGCThreshold = b.JobGCThreshold
}
if b.JobDefaultPriority != nil {
result.JobDefaultPriority = pointer.Of(*b.JobDefaultPriority)
}
if b.JobMaxPriority != nil {
result.JobMaxPriority = pointer.Of(*b.JobMaxPriority)
}
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
Expand Down
4 changes: 3 additions & 1 deletion command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ var basicConfig = &Config{
ServiceSchedulerEnabled: true,
},
},
LicensePath: "/tmp/nomad.hclic",
LicensePath: "/tmp/nomad.hclic",
JobDefaultPriority: pointer.Of(100),
JobMaxPriority: pointer.Of(200),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ func TestConfig_Merge(t *testing.T) {
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
JobMaxPriority: pointer.Of(200),
JobDefaultPriority: pointer.Of(100),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down
1 change: 0 additions & 1 deletion command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ func (s *HTTPServer) ValidateJobRequest(resp http.ResponseWriter, req *http.Requ
}

job := ApiJobToStructJob(validateRequest.Job)

args := structs.JobValidateRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Expand Down
18 changes: 10 additions & 8 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -2066,11 +2067,11 @@ func TestJobs_ParsingWriteRequest(t *testing.T) {
}

sJob, sWriteReq := srv.apiJobAndRequestToStructs(job, req, apiReq)
require.Equal(t, tc.expectedJobRegion, sJob.Region)
require.Equal(t, tc.expectedNamespace, sJob.Namespace)
require.Equal(t, tc.expectedNamespace, sWriteReq.Namespace)
require.Equal(t, tc.expectedRequestRegion, sWriteReq.Region)
require.Equal(t, tc.expectedToken, sWriteReq.AuthToken)
must.Eq(t, tc.expectedJobRegion, sJob.Region)
must.Eq(t, tc.expectedNamespace, sJob.Namespace)
must.Eq(t, tc.expectedNamespace, sWriteReq.Namespace)
must.Eq(t, tc.expectedRequestRegion, sWriteReq.Region)
must.Eq(t, tc.expectedToken, sWriteReq.AuthToken)
})
}
}
Expand Down Expand Up @@ -3546,16 +3547,17 @@ func TestHTTP_JobValidate_SystemMigrate(t *testing.T) {

// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/validate/job", buf)
require.NoError(t, err)
must.NoError(t, err)
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.ValidateJobRequest(respW, req)
require.NoError(t, err)
must.NoError(t, err)

// Check the response
resp := obj.(structs.JobValidateResponse)
require.Contains(t, resp.Error, `Job type "system" does not allow migrate block`)
must.StrContains(t, resp.Error, `Job type "system" does not allow migrate block`)
must.Len(t, 1, resp.ValidationErrors)
})
}

Expand Down
2 changes: 2 additions & 0 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ server {
raft_multiplier = 4
enable_event_broker = false
event_buffer_size = 200
job_default_priority = 100
job_max_priority = 200

plan_rejection_tracker {
enabled = true
Expand Down
4 changes: 3 additions & 1 deletion command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@
}]
}],
"upgrade_version": "0.8.0",
"license_path": "/tmp/nomad.hclic"
"license_path": "/tmp/nomad.hclic",
"job_default_priority": 100,
"job_max_priority": 200
}
],
"syslog_facility": "LOCAL1",
Expand Down
Loading

0 comments on commit 365ccf4

Please sign in to comment.