Skip to content

Commit

Permalink
core: allow setting and propagation of eval priority on job de/regist…
Browse files Browse the repository at this point in the history
…ration (#11532)

This change modifies the Nomad job register and deregister RPCs to
accept an updated option set which includes eval priority. This
param is optional and override the use of the job priority to set
the eval priority.

In order to ensure all evaluations as a result of the request use
the same eval priority, the priority is shared to the
allocReconciler and deploymentWatcher. This creates a new
distinction between eval priority and job priority.

The Nomad agent HTTP API has been modified to allow setting the
eval priority on job update and delete. To keep consistency with
the current v1 API, job update accepts this as a payload param;
job delete accepts this as a query param.

Any user supplied value is validated within the agent HTTP handler
removing the need to pass invalid requests to the server.

The register and deregister opts functions now all for setting
the eval priority on requests.

The change includes a small change to the DeregisterOpts function
which handles nil opts. This brings the function inline with the
RegisterOpts.
  • Loading branch information
jrasell authored Nov 23, 2021
1 parent 4b73036 commit 80dcae7
Show file tree
Hide file tree
Showing 24 changed files with 1,071 additions and 168 deletions.
3 changes: 3 additions & 0 deletions .changelog/11532.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: allow setting and propagation of eval priority on job de/registration
```
35 changes: 31 additions & 4 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type RegisterOptions struct {
ModifyIndex uint64
PolicyOverride bool
PreserveCounts bool
EvalPriority int
}

// Register is used to register a new job. It returns the ID
Expand All @@ -105,8 +106,8 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
return j.RegisterOpts(job, &opts, q)
}

// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
// RegisterOpts is used to register a new job with the passed RegisterOpts. It
// returns the ID of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
// Format the request
req := &JobRegisterRequest{
Expand All @@ -119,6 +120,7 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*
}
req.PolicyOverride = opts.PolicyOverride
req.PreserveCounts = opts.PreserveCounts
req.EvalPriority = opts.EvalPriority
}

var resp JobRegisterResponse
Expand Down Expand Up @@ -290,14 +292,31 @@ type DeregisterOptions struct {
// If Global is set to true, all regions of a multiregion job will be
// stopped.
Global bool

// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job deregistration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job deregistration
// in busy clusters with a large evaluation backlog.
EvalPriority int
}

// DeregisterOpts is used to remove an existing job. See DeregisterOptions
// for parameters.
func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t",
url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q)

// The base endpoint to add query params to.
endpoint := "/v1/job/" + url.PathEscape(jobID)

// Protect against nil opts. url.Values expects a string, and so using
// fmt.Sprintf is the best way to do this.
if opts != nil {
endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v",
opts.Purge, opts.Global, opts.EvalPriority)
}

wm, err := j.client.delete(endpoint, &resp, q)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -1170,6 +1189,14 @@ type JobRegisterRequest struct {
PolicyOverride bool `json:",omitempty"`
PreserveCounts bool `json:",omitempty"`

// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job registration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job registration in
// busy clusters with a large evaluation backlog. This avoids needing to
// change the job priority which also impacts preemption.
EvalPriority int `json:",omitempty"`

WriteRequest
}

Expand Down
116 changes: 116 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,60 @@ func TestJobs_Register_NoPreserveCounts(t *testing.T) {
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified
}

func TestJobs_Register_EvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it with an eval priority.
job := testJob()
registerResp, wm, err := c.Jobs().RegisterOpts(job, &RegisterOptions{EvalPriority: 99}, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Check the created job evaluation has a priority that matches our desired
// value.
evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(99, evalInfo.Priority)
}

func TestJobs_Register_NoEvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it with an eval priority.
job := testJob()
registerResp, wm, err := c.Jobs().RegisterOpts(job, nil, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Check the created job evaluation has a priority that matches the job
// priority.
evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(*job.Priority, evalInfo.Priority)
}

func TestJobs_Validate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
Expand Down Expand Up @@ -1628,6 +1682,68 @@ func TestJobs_Deregister(t *testing.T) {
}
}

func TestJobs_Deregister_EvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it.
job := testJob()
registerResp, wm, err := c.Jobs().Register(job, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Deregister the job with an eval priority.
evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{EvalPriority: 97}, nil)
requireAssert.NoError(err)
requireAssert.NotEmpty(t, evalID)

// Lookup the eval and check the priority on it.
evalInfo, _, err := c.Evaluations().Info(evalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(97, evalInfo.Priority)
}

func TestJobs_Deregister_NoEvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()

// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)

// Create a job and register it.
job := testJob()
registerResp, wm, err := c.Jobs().Register(job, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)

// Deregister the job with an eval priority.
evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{}, nil)
requireAssert.NoError(err)
requireAssert.NotEmpty(t, evalID)

// Lookup the eval and check the priority on it.
evalInfo, _, err := c.Evaluations().Info(evalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(*job.Priority, evalInfo.Priority)
}

func TestJobs_ForceEvaluate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
Expand Down
13 changes: 13 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,19 @@ func parseBool(req *http.Request, field string) (*bool, error) {
return nil, nil
}

// parseInt parses a query parameter to a int or returns (nil, nil) if the
// parameter is not present.
func parseInt(req *http.Request, field string) (*int, error) {
if str := req.URL.Query().Get(field); str != "" {
param, err := strconv.Atoi(str)
if err != nil {
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a int: %v", field, str, err)
}
return &param, nil
}
return nil, nil
}

// parseToken is used to parse the X-Nomad-Token param
func (s *HTTPServer) parseToken(req *http.Request, token *string) {
if other := req.Header.Get("X-Nomad-Token"); other != "" {
Expand Down
47 changes: 47 additions & 0 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,53 @@ func TestParseBool(t *testing.T) {
}
}

func Test_parseInt(t *testing.T) {
t.Parallel()

cases := []struct {
Input string
Expected *int
Err bool
}{
{
Input: "",
Expected: nil,
},
{
Input: "13",
Expected: helper.IntToPtr(13),
},
{
Input: "99",
Expected: helper.IntToPtr(99),
},
{
Input: "ten",
Err: true,
},
}

for i := range cases {
tc := cases[i]
t.Run("Input-"+tc.Input, func(t *testing.T) {
testURL, err := url.Parse("http://localhost/foo?eval_priority=" + tc.Input)
require.NoError(t, err)
req := &http.Request{
URL: testURL,
}

result, err := parseInt(req, "eval_priority")
if tc.Err {
require.Error(t, err)
require.Nil(t, result)
} else {
require.NoError(t, err)
require.Equal(t, tc.Expected, result)
}
})
}
}

func TestParsePagination(t *testing.T) {
t.Parallel()
s := makeHTTPServer(t, nil)
Expand Down
44 changes: 40 additions & 4 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,13 +390,23 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
}
}

// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
if args.EvalPriority != 0 {
if err := validateEvalPriorityOpt(args.EvalPriority); err != nil {
return nil, err
}
}

sJob, writeReq := s.apiJobAndRequestToStructs(args.Job, req, args.WriteRequest)
regReq := structs.JobRegisterRequest{
Job: sJob,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
PreserveCounts: args.PreserveCounts,
EvalPriority: args.EvalPriority,
WriteRequest: *writeReq,
}

Expand All @@ -411,6 +421,9 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {

args := structs.JobDeregisterRequest{JobID: jobName}

// Identify the purge query param and parse.
purgeStr := req.URL.Query().Get("purge")
var purgeBool bool
if purgeStr != "" {
Expand All @@ -420,7 +433,9 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err)
}
}
args.Purge = purgeBool

// Identify the global query param and parse.
globalStr := req.URL.Query().Get("global")
var globalBool bool
if globalStr != "" {
Expand All @@ -430,12 +445,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "global", globalStr, err)
}
}
args.Global = globalBool

args := structs.JobDeregisterRequest{
JobID: jobName,
Purge: purgeBool,
Global: globalBool,
// Parse the eval priority from the request URL query if present.
evalPriority, err := parseInt(req, "eval_priority")
if err != nil {
return nil, err
}

// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
if evalPriority != nil && *evalPriority > 0 {
if err := validateEvalPriorityOpt(*evalPriority); err != nil {
return nil, err
}
args.EvalPriority = *evalPriority
}

s.parseWriteRequest(req, &args.WriteRequest)

var out structs.JobDeregisterResponse
Expand Down Expand Up @@ -1661,3 +1688,12 @@ func ApiSpreadToStructs(a1 *api.Spread) *structs.Spread {
}
return ret
}

// validateEvalPriorityOpt ensures the supplied evaluation priority override
// value is within acceptable bounds.
func validateEvalPriorityOpt(priority int) HTTPCodedError {
if priority < 1 || priority > 100 {
return CodedError(400, "Eval priority must be between 1 and 100 inclusively")
}
return nil
}
Loading

0 comments on commit 80dcae7

Please sign in to comment.