Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configurable range of Job priorities #16084

Merged
merged 15 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16084.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: Allow configurable range of Job priorities
```
5 changes: 4 additions & 1 deletion api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// DefaultNamespace is the default namespace.
DefaultNamespace = "default"

// DefaultPriority is the default priority
DefaultPriority = 50

// For Job configuration, GlobalRegion is a sentinel region value
// that users may specify to indicate the job should be run on
// the region of the node that the job was submitted to.
Expand Down Expand Up @@ -938,7 +941,7 @@ func (j *Job) Canonicalize() {
j.Namespace = pointerOf(DefaultNamespace)
}
if j.Priority == nil {
j.Priority = pointerOf(50)
j.Priority = pointerOf(DefaultPriority)
tgross marked this conversation as resolved.
Show resolved Hide resolved
}
if j.Stop == nil {
j.Stop = pointerOf(false)
Expand Down
17 changes: 17 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,23 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
}
}

jobMaxPriority := structs.JobDefaultMaxPriority
if agentConfig.Server.JobMaxPriority != nil && *agentConfig.Server.JobMaxPriority != 0 {
jobMaxPriority = *agentConfig.Server.JobMaxPriority
if jobMaxPriority < structs.JobDefaultMaxPriority || jobMaxPriority > structs.JobMaxPriority {
return nil, fmt.Errorf("job_max_priority cannot be %d. Must be between %d and %d", *agentConfig.Server.JobMaxPriority, structs.JobDefaultMaxPriority, structs.JobMaxPriority)
}
}
jobDefaultPriority := structs.JobDefaultPriority
if agentConfig.Server.JobDefaultPriority != nil && *agentConfig.Server.JobDefaultPriority != 0 {
jobDefaultPriority = *agentConfig.Server.JobDefaultPriority
if jobDefaultPriority < structs.JobDefaultPriority || jobDefaultPriority >= jobMaxPriority {
return nil, fmt.Errorf("job_default_priority cannot be %d. Must be between %d and %d", *agentConfig.Server.JobDefaultPriority, structs.JobDefaultPriority, jobMaxPriority)
}
}
conf.JobMaxPriority = jobMaxPriority
conf.JobDefaultPriority = jobDefaultPriority

// Set up the bind addresses
rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC)
if err != nil {
Expand Down
142 changes: 142 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1569,3 +1570,144 @@ func TestAgent_ProxyRPC_Dev(t *testing.T) {
})

}

func TestAgent_ServerConfig_JobMaxPriority_Ok(t *testing.T) {
ci.Parallel(t)

cases := []struct {
maxPriority *int
jobMaxPriority int
}{
{
maxPriority: nil,
jobMaxPriority: 100,
},

{
maxPriority: pointer.Of(0),
jobMaxPriority: 100,
},
{
maxPriority: pointer.Of(100),
jobMaxPriority: 100,
},
{
maxPriority: pointer.Of(200),
jobMaxPriority: 200,
},
{
maxPriority: pointer.Of(32766),
jobMaxPriority: 32766,
},
}

for _, tc := range cases {
v := "default"
if tc.maxPriority != nil {
v = fmt.Sprintf("%v", *tc.maxPriority)
}
t.Run(v, func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobMaxPriority = tc.maxPriority

serverConf, err := convertServerConfig(conf)
must.NoError(t, err)
must.Eq(t, tc.jobMaxPriority, serverConf.JobMaxPriority)
})
}
}

func TestAgent_ServerConfig_JobMaxPriority_Bad(t *testing.T) {
ci.Parallel(t)

cases := []int{
99,
math.MaxInt16,
}

for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobMaxPriority = &tc

_, err := convertServerConfig(conf)
must.Error(t, err)
must.ErrorContains(t, err, "job_max_priority cannot be")
})
}
}

func TestAgent_ServerConfig_JobDefaultPriority_Ok(t *testing.T) {
ci.Parallel(t)

cases := []struct {
defaultPriority *int
jobDefaultPriority int
}{
{
defaultPriority: nil,
jobDefaultPriority: 50,
},

{
defaultPriority: pointer.Of(0),
jobDefaultPriority: 50,
},
{
defaultPriority: pointer.Of(50),
jobDefaultPriority: 50,
},
{
defaultPriority: pointer.Of(60),
jobDefaultPriority: 60,
},
{
defaultPriority: pointer.Of(99),
jobDefaultPriority: 99,
},
}

for _, tc := range cases {
v := "default"
if tc.defaultPriority != nil {
v = fmt.Sprintf("%v", *tc.defaultPriority)
}
t.Run(v, func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobDefaultPriority = tc.defaultPriority

serverConf, err := convertServerConfig(conf)
must.NoError(t, err)

must.Eq(t, tc.jobDefaultPriority, serverConf.JobDefaultPriority)
})
}
}

func TestAgent_ServerConfig_JobDefaultPriority_Bad(t *testing.T) {
ci.Parallel(t)

cases := []int{
49,
100,
}

for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
conf := DevConfig(nil)
must.NoError(t, conf.normalizeAddrs())

conf.Server.JobDefaultPriority = &tc

_, err := convertServerConfig(conf)
must.Error(t, err)
must.ErrorContains(t, err, "job_default_priority cannot be")
})
}
}
14 changes: 14 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,12 @@ type ServerConfig struct {
// forced to send an entire snapshot. The value passed here is the initial
// setting used. This can be tuned during operation using a hot reload.
RaftTrailingLogs *int `hcl:"raft_trailing_logs"`

// JobDefaultPriority is the default Job priority if not specified.
JobDefaultPriority *int `hcl:"job_default_priority"`

// JobMaxPriority is an upper bound on the Job priority.
JobMaxPriority *int `hcl:"job_max_priority"`
}

func (s *ServerConfig) Copy() *ServerConfig {
Expand All @@ -669,6 +675,8 @@ func (s *ServerConfig) Copy() *ServerConfig {
ns.RaftSnapshotInterval = pointer.Copy(s.RaftSnapshotInterval)
ns.RaftSnapshotThreshold = pointer.Copy(s.RaftSnapshotThreshold)
ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
return &ns
}

Expand Down Expand Up @@ -1867,6 +1875,12 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.JobGCThreshold != "" {
result.JobGCThreshold = b.JobGCThreshold
}
if b.JobDefaultPriority != nil {
result.JobDefaultPriority = pointer.Of(*b.JobDefaultPriority)
}
if b.JobMaxPriority != nil {
result.JobMaxPriority = pointer.Of(*b.JobMaxPriority)
}
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
Expand Down
4 changes: 3 additions & 1 deletion command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ var basicConfig = &Config{
ServiceSchedulerEnabled: true,
},
},
LicensePath: "/tmp/nomad.hclic",
LicensePath: "/tmp/nomad.hclic",
JobDefaultPriority: pointer.Of(100),
JobMaxPriority: pointer.Of(200),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ func TestConfig_Merge(t *testing.T) {
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
JobMaxPriority: pointer.Of(200),
JobDefaultPriority: pointer.Of(100),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down
2 changes: 2 additions & 0 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ server {
raft_multiplier = 4
enable_event_broker = false
event_buffer_size = 200
job_default_priority = 100
job_max_priority = 200

plan_rejection_tracker {
enabled = true
Expand Down
4 changes: 3 additions & 1 deletion command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@
}]
}],
"upgrade_version": "0.8.0",
"license_path": "/tmp/nomad.hclic"
"license_path": "/tmp/nomad.hclic",
"job_default_priority": 100,
"job_max_priority": 200
}
],
"syslog_facility": "LOCAL1",
Expand Down
8 changes: 8 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ type Config struct {
// DeploymentQueryRateLimit is in queries per second and is used by the
// DeploymentWatcher to throttle the amount of simultaneously deployments
DeploymentQueryRateLimit float64

// JobDefaultPriority is the default Job priority if not specified.
JobDefaultPriority int

// JobMaxPriority is an upper bound on the Job priority.
JobMaxPriority int
}

func (c *Config) Copy() *Config {
Expand Down Expand Up @@ -529,6 +535,8 @@ func DefaultConfig() *Config {
},
},
DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,
JobDefaultPriority: structs.JobDefaultPriority,
JobMaxPriority: structs.JobDefaultMaxPriority,
}

// Enable all known schedulers by default
Expand Down
4 changes: 2 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewJobEndpoints(s *Server, ctx *RPCContext) *Job {
jobExposeCheckHook{},
jobVaultHook{srv: s},
jobNamespaceConstraintCheckHook{srv: s},
jobValidate{},
&jobValidate{srv: s},
&memoryOversubscriptionValidate{srv: s},
},
}
Expand Down Expand Up @@ -990,7 +990,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
continue
}

priority := structs.JobDefaultPriority
priority := j.srv.config.JobDefaultPriority
jtype := structs.JobTypeService
if job != nil {
priority = job.Priority
Expand Down
12 changes: 9 additions & 3 deletions nomad/job_endpoint_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,15 @@ func mutateConstraint(matcher constraintMatcher, taskGroup *structs.TaskGroup, c
// jobValidate validates a Job and task drivers and returns an error if there is
// a validation problem or if the Job is of a type a user is not allowed to
// submit.
type jobValidate struct{}
type jobValidate struct {
srv *Server
}

func (jobValidate) Name() string {
func (*jobValidate) Name() string {
return "validate"
}

func (jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
validationErrors := new(multierror.Error)
if err := job.Validate(); err != nil {
multierror.Append(validationErrors, err)
Expand Down Expand Up @@ -287,6 +289,10 @@ func (jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
multierror.Append(validationErrors, fmt.Errorf("job can't be submitted with a payload, only dispatched"))
}

if job.Priority < structs.JobMinPriority || job.Priority > v.srv.config.JobMaxPriority {
multierror.Append(validationErrors, fmt.Errorf("job priority must be between [%d, %d]", structs.JobMinPriority, v.srv.config.JobMaxPriority))
}

return warnings, validationErrors.ErrorOrNil()
}

Expand Down
Loading