From 3eeded7d5393ec7f5724ada558848caa42b5cb0c Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 18 Nov 2021 09:22:55 +0100 Subject: [PATCH 1/8] core: allow setting and propagation of eval priority. This change modifies the Nomad job register and deregister RPCs to accept an updated option set which includes eval priority. This param is optional and overrides the use of the job priority to set the eval priority. In order to ensure all evaluations created as a result of the request use the same eval priority, the priority is passed to the allocReconciler and deploymentWatcher. This creates a new distinction between eval priority and job priority. --- nomad/deploymentwatcher/deployment_watcher.go | 2 +- nomad/job_endpoint.go | 39 ++- nomad/job_endpoint_test.go | 74 ++++ nomad/state/state_store_test.go | 2 +- nomad/structs/structs.go | 24 +- scheduler/generic_sched.go | 2 +- scheduler/reconcile.go | 16 +- scheduler/reconcile_test.go | 323 +++++++++++------- 8 files changed, 333 insertions(+), 149 deletions(-) diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 896b48e4463..b70548a3e3d 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -827,7 +827,7 @@ func (w *deploymentWatcher) getEval() *structs.Evaluation { return &structs.Evaluation{ ID: uuid.Generate(), Namespace: w.j.Namespace, - Priority: w.j.Priority, + Priority: w.d.EvalPriority, Type: w.j.Type, TriggeredBy: structs.EvalTriggerDeploymentWatcher, JobID: w.j.ID, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index f98a6e4f270..a2e4d7508c1 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -357,10 +357,18 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { + + // Initially set the eval priority to that of the job priority. If the + // user supplied an eval priority override, we subsequently use this. + evalPriority := args.Job.Priority + if args.EvalPriority > 0 { + evalPriority = args.EvalPriority + } + eval = &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), - Priority: args.Job.Priority, + Priority: evalPriority, Type: args.Job.Type, TriggeredBy: structs.EvalTriggerJobRegister, JobID: args.Job.ID, @@ -829,22 +837,23 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // priority even if the job was. now := time.Now().UnixNano() - // Set our default priority initially, but update this to that configured - // within the job if possible. It is reasonable from a user perspective - // that jobs with a higher priority have their deregister evaluated before - // those of a lower priority. - // - // Alternatively, the previous behaviour was to set the eval priority to - // the default value. Jobs with a lower than default register priority - // would therefore have their deregister eval priorities higher than - // expected. - priority := structs.JobDefaultPriority - if job != nil { - priority = job.Priority - } - // If the job is periodic or parameterized, we don't create an eval. if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { + + // The evaluation priority is determined by several factors. It + // defaults to the job default priority and is overridden by the + // priority set on the job specification. + // + // If the user supplied an eval priority override, we subsequently + // use this. 
+ priority := structs.JobDefaultPriority + if job != nil { + priority = job.Priority + } + if args.EvalPriority > 0 { + priority = args.EvalPriority + } + eval = &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3c3c30281d6..460ebd780cd 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -169,6 +169,38 @@ func TestJobEndpoint_Register_PreserveCounts(t *testing.T) { require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec } +func TestJobEndpoint_Register_EvalPriority(t *testing.T) { + t.Parallel() + requireAssert := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.TaskGroups[0].Name = "group1" + job.Canonicalize() + + // Register the job. + requireAssert.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + EvalPriority: 99, + }, &structs.JobRegisterResponse{})) + + // Grab the eval from the state, and check its priority is as expected. 
+ state := s1.fsm.State() + out, err := state.EvalsByJob(nil, job.Namespace, job.ID) + requireAssert.NoError(err) + requireAssert.Len(out, 1) + requireAssert.Equal(99, out[0].Priority) +} + func TestJobEndpoint_Register_Connect(t *testing.T) { t.Parallel() require := require.New(t) @@ -3365,6 +3397,48 @@ func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) { } } +func TestJobEndpoint_Deregister_EvalPriority(t *testing.T) { + t.Parallel() + requireAssert := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.Canonicalize() + + // Register the job. + requireAssert.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{})) + + // Create the deregister request. + deregReq := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + EvalPriority: 99, + } + var deregResp structs.JobDeregisterResponse + requireAssert.NoError(msgpackrpc.CallWithCodec(codec, "Job.Deregister", deregReq, &deregResp)) + + // Grab the eval from the state, and check its priority is as expected. 
+ out, err := s1.fsm.State().EvalByID(nil, deregResp.EvalID) + requireAssert.NoError(err) + requireAssert.Equal(99, out.Priority) +} + func TestJobEndpoint_Deregister_Periodic(t *testing.T) { t.Parallel() diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 3d71a1dd66b..4942574a82e 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6781,7 +6781,7 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) { } // Insert a deployment - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) if err := state.UpsertDeployment(2, d); err != nil { t.Fatalf("bad: %v", err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1c3ed824cdc..e7b1aaea1b6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -586,6 +586,14 @@ type JobRegisterRequest struct { // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool + // EvalPriority is an optional priority to use on any evaluation created as + // a result of this job registration. This value must be between 1 and 100 + // inclusive, where a larger value corresponds to a higher priority. This + // is useful when an operator wishes to push through a job registration in + // busy clusters with a large evaluation backlog. This avoids needing to + // change the job priority which also impacts preemption. + EvalPriority int + // Eval is the evaluation that is associated with the job registration Eval *Evaluation @@ -606,6 +614,13 @@ type JobDeregisterRequest struct { // deregistered. It is ignored for single-region jobs. Global bool + // EvalPriority is an optional priority to use on any evaluation created as + // a result of this job deregistration. This value must be between 1 and 100 + // inclusive, where a larger value corresponds to a higher priority.
This + // is useful when an operator wishes to push through a job deregistration + // in busy clusters with a large evaluation backlog. + EvalPriority int + // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation @@ -8847,12 +8862,18 @@ type Deployment struct { // status. StatusDescription string + // EvalPriority tracks the priority of the evaluation which led to the + // creation of this Deployment object. Any additional evaluations created + // as a result of this deployment can therefore inherit this value, which + // is not guaranteed to be that of the job priority parameter. + EvalPriority int + CreateIndex uint64 ModifyIndex uint64 } // NewDeployment creates a new deployment given the job. -func NewDeployment(job *Job) *Deployment { +func NewDeployment(job *Job, evalPriority int) *Deployment { return &Deployment{ ID: uuid.Generate(), Namespace: job.Namespace, @@ -8865,6 +8886,7 @@ func NewDeployment(job *Job) *Deployment { Status: DeploymentStatusRunning, StatusDescription: DeploymentStatusDescriptionRunning, TaskGroups: make(map[string]*DeploymentState, len(job.TaskGroups)), + EvalPriority: evalPriority, } } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 69fcfbddb21..fd19e259bd6 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -351,7 +351,7 @@ func (s *GenericScheduler) computeJobAllocs() error { reconciler := NewAllocReconciler(s.logger, genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID), - s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID) + s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID, s.eval.Priority) results := reconciler.Compute() s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", results)) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index ba9e20e819d..b08274198c3 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -73,8 +73,10 @@ type
allocReconciler struct { // existingAllocs is non-terminal existing allocations existingAllocs []*structs.Allocation - // evalID is the ID of the evaluation that triggered the reconciler - evalID string + // evalID and evalPriority are the ID and Priority of the evaluation that + // triggered the reconciler. + evalID string + evalPriority int // now is the time used when determining rescheduling eligibility // defaults to time.Now, and overidden in unit tests @@ -160,7 +162,8 @@ func (r *reconcileResults) Changes() int { // the changes required to bring the cluster state inline with the declared jobspec func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool, jobID string, job *structs.Job, deployment *structs.Deployment, - existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler { + existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string, + evalPriority int) *allocReconciler { return &allocReconciler{ logger: logger.Named("reconciler"), allocUpdateFn: allocUpdateFn, @@ -171,6 +174,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch existingAllocs: existingAllocs, taintedNodes: taintedNodes, evalID: evalID, + evalPriority: evalPriority, now: time.Now(), result: &reconcileResults{ desiredTGUpdates: make(map[string]*structs.DesiredUpdates), @@ -555,7 +559,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) { // A previous group may have made the deployment already if a.deployment == nil { - a.deployment = structs.NewDeployment(a.job) + a.deployment = structs.NewDeployment(a.job, a.evalPriority) // in multiregion jobs, most deployments start in a pending state if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) { a.deployment.Status = structs.DeploymentStatusPending
@@ -942,7 +946,7 @@ func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedReschedule eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: a.job.Namespace, - Priority: a.job.Priority, + Priority: a.evalPriority, Type: a.job.Type, TriggeredBy: structs.EvalTriggerRetryFailedAlloc, JobID: a.job.ID, @@ -963,7 +967,7 @@ func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedReschedule eval = &structs.Evaluation{ ID: uuid.Generate(), Namespace: a.job.Namespace, - Priority: a.job.Priority, + Priority: a.evalPriority, Type: a.job.Type, TriggeredBy: structs.EvalTriggerRetryFailedAlloc, JobID: a.job.ID, diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index d698b2d8744..c80252d3be5 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -262,7 +262,7 @@ type resultExpectation struct { func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { t.Helper() - assert := assert.New(t) + assertion := assert.New(t) if exp.createDeployment != nil && r.deployment == nil { t.Errorf("Expect a created deployment got none") @@ -277,20 +277,22 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { } } - assert.EqualValues(exp.deploymentUpdates, r.deploymentUpdates, "Expected Deployment Updates") - assert.Len(r.place, exp.place, "Expected Placements") - assert.Len(r.destructiveUpdate, exp.destructive, "Expected Destructive") - assert.Len(r.inplaceUpdate, exp.inplace, "Expected Inplace Updates") - assert.Len(r.attributeUpdates, exp.attributeUpdates, "Expected Attribute Updates") - assert.Len(r.stop, exp.stop, "Expected Stops") - assert.EqualValues(exp.desiredTGUpdates, r.desiredTGUpdates, "Expected Desired TG Update Annotations") + assertion.EqualValues(exp.deploymentUpdates, r.deploymentUpdates, "Expected Deployment Updates") + assertion.Len(r.place, exp.place, "Expected Placements") + assertion.Len(r.destructiveUpdate, exp.destructive, "Expected Destructive") + 
assertion.Len(r.inplaceUpdate, exp.inplace, "Expected Inplace Updates") + assertion.Len(r.attributeUpdates, exp.attributeUpdates, "Expected Attribute Updates") + assertion.Len(r.stop, exp.stop, "Expected Stops") + assertion.EqualValues(exp.desiredTGUpdates, r.desiredTGUpdates, "Expected Desired TG Update Annotations") } // Tests the reconciler properly handles placements for a job that has no // existing allocations func TestReconciler_Place_NoExisting(t *testing.T) { job := mock.Job() - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, nil, nil, "") + reconciler := NewAllocReconciler( + testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, nil, nil, "", job.Priority) r := reconciler.Compute() // Assert the correct results @@ -326,7 +328,8 @@ func TestReconciler_Place_Existing(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -364,7 +367,8 @@ func TestReconciler_ScaleDown_Partial(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -403,7 +407,8 @@ func TestReconciler_ScaleDown_Zero(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the 
correct results @@ -443,7 +448,8 @@ func TestReconciler_ScaleDown_Zero_DuplicateNames(t *testing.T) { expectedStopped = append(expectedStopped, i%2) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -478,7 +484,8 @@ func TestReconciler_Inplace(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -516,7 +523,8 @@ func TestReconciler_Inplace_ScaleUp(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -556,7 +564,8 @@ func TestReconciler_Inplace_ScaleDown(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -618,7 +627,7 @@ func TestReconciler_Inplace_Rollback(t *testing.T) { }, allocUpdateFnDestructive) reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFn, - false, job.ID, job, nil, allocs, nil, uuid.Generate()) + false, job.ID, job, nil, allocs, nil, uuid.Generate(), 50) r := reconciler.Compute() // Assert the 
correct results @@ -661,7 +670,8 @@ func TestReconciler_Destructive(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -694,7 +704,8 @@ func TestReconciler_DestructiveMaxParallel(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -730,7 +741,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -769,7 +781,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -814,7 +827,8 @@ func TestReconciler_LostNode(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + 
nil, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -864,7 +878,8 @@ func TestReconciler_LostNode_ScaleUp(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -914,7 +929,8 @@ func TestReconciler_LostNode_ScaleDown(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -959,7 +975,8 @@ func TestReconciler_DrainNode(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1011,7 +1028,8 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1064,7 +1082,8 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, 
job.ID, job, + nil, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1109,7 +1128,8 @@ func TestReconciler_RemovedTG(t *testing.T) { newName := "different" job.TaskGroups[0].Name = newName - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1171,7 +1191,8 @@ func TestReconciler_JobStopped(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, c.jobID, c.job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1237,7 +1258,8 @@ func TestReconciler_JobStopped_TerminalAllocs(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, c.jobID, c.job, + nil, allocs, nil, "", 50) r := reconciler.Compute() require.Len(t, r.stop, 0) // Assert the correct results @@ -1273,7 +1295,8 @@ func TestReconciler_MultiTG(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1320,12 +1343,13 @@ func TestReconciler_MultiTG_SingleUpdateStanza(t *testing.T) { } } - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = 
&structs.DeploymentState{ DesiredTotal: 10, } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -1401,7 +1425,8 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) { // Mark one as complete allocs[5].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, uuid.Generate()) + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, uuid.Generate(), 50) r := reconciler.Compute() // Two reschedule attempts were already made, one more can be made at a future time @@ -1481,7 +1506,8 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { FinishedAt: now.Add(10 * time.Second)}} } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, uuid.Generate()) + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, uuid.Generate(), 50) r := reconciler.Compute() // Verify that two follow up evals were created @@ -1575,7 +1601,8 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) { // Mark one as complete allocs[5].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50) reconciler.now = now r := reconciler.Compute() @@ -1650,7 +1677,8 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler 
:= NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, uuid.Generate()) + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, uuid.Generate(), 50) r := reconciler.Compute() // Should place a new placement and create a follow up eval for the delayed reschedule @@ -1719,7 +1747,8 @@ func TestReconciler_Service_ClientStatusComplete(t *testing.T) { // Mark one as client status complete allocs[4].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Should place a new placement for the alloc that was marked complete @@ -1775,7 +1804,8 @@ func TestReconciler_Service_DesiredStop_ClientStatusComplete(t *testing.T) { allocs[4].ClientStatus = structs.AllocClientStatusFailed allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Should place a new placement for the alloc that was marked stopped @@ -1852,7 +1882,8 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Verify that no follow up evals were created @@ -1930,7 +1961,8 @@ func 
TestReconciler_RescheduleNow_WithinAllowedTimeWindow(t *testing.T) { FinishedAt: now.Add(-4 * time.Second)}} allocs[1].ClientStatus = structs.AllocClientStatusFailed - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) reconciler.now = now r := reconciler.Compute() @@ -2011,7 +2043,8 @@ func TestReconciler_RescheduleNow_EvalIDMatch(t *testing.T) { allocs[1].ClientStatus = structs.AllocClientStatusFailed allocs[1].FollowupEvalID = evalID - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, evalID) + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, evalID, 50) reconciler.now = now.Add(-30 * time.Second) r := reconciler.Compute() @@ -2065,7 +2098,7 @@ func TestReconciler_RescheduleNow_Service_WithCanaries(t *testing.T) { job2 := job.Copy() job2.Version++ - d := structs.NewDeployment(job2) + d := structs.NewDeployment(job2, 50) d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion s := &structs.DeploymentState{ DesiredCanaries: 2, @@ -2120,7 +2153,8 @@ func TestReconciler_RescheduleNow_Service_WithCanaries(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job2, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job2, + d, allocs, nil, "", 50) r := reconciler.Compute() // Verify that no follow up evals were created @@ -2171,7 +2205,7 @@ func TestReconciler_RescheduleNow_Service_Canaries(t *testing.T) { job2 := job.Copy() job2.Version++ - d := structs.NewDeployment(job2) + d := structs.NewDeployment(job2, 50) d.StatusDescription = 
structs.DeploymentStatusDescriptionRunningNeedsPromotion s := &structs.DeploymentState{ DesiredCanaries: 2, @@ -2243,7 +2277,8 @@ func TestReconciler_RescheduleNow_Service_Canaries(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job2, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job2, + d, allocs, nil, "", 50) reconciler.now = now r := reconciler.Compute() @@ -2298,7 +2333,7 @@ func TestReconciler_RescheduleNow_Service_Canaries_Limit(t *testing.T) { job2 := job.Copy() job2.Version++ - d := structs.NewDeployment(job2) + d := structs.NewDeployment(job2, 50) d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion s := &structs.DeploymentState{ DesiredCanaries: 2, @@ -2370,7 +2405,8 @@ func TestReconciler_RescheduleNow_Service_Canaries_Limit(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job2, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job2, + d, allocs, nil, "", 50) reconciler.now = now r := reconciler.Compute() @@ -2435,7 +2471,8 @@ func TestReconciler_DontReschedule_PreviouslyRescheduled(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Should place 1 - one is a new placement to make up the desired count of 5 @@ -2463,8 +2500,8 @@ func TestReconciler_CancelDeployment_JobStop(t *testing.T) { job := mock.Job() job.Stop = true - running := structs.NewDeployment(job) - failed := structs.NewDeployment(job) + 
running := structs.NewDeployment(job, 50) + failed := structs.NewDeployment(job, 50) failed.Status = structs.DeploymentStatusFailed cases := []struct { @@ -2522,7 +2559,8 @@ func TestReconciler_CancelDeployment_JobStop(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, c.jobID, c.job, c.deployment, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, c.jobID, c.job, + c.deployment, allocs, nil, "", 50) r := reconciler.Compute() var updates []*structs.DeploymentStatusUpdate @@ -2561,8 +2599,8 @@ func TestReconciler_CancelDeployment_JobUpdate(t *testing.T) { job := mock.Job() // Create two deployments - running := structs.NewDeployment(job) - failed := structs.NewDeployment(job) + running := structs.NewDeployment(job, 50) + failed := structs.NewDeployment(job, 50) failed.Status = structs.DeploymentStatusFailed // Make the job newer than the deployment @@ -2599,7 +2637,8 @@ func TestReconciler_CancelDeployment_JobUpdate(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, c.deployment, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + c.deployment, allocs, nil, "", 50) r := reconciler.Compute() var updates []*structs.DeploymentStatusUpdate @@ -2648,10 +2687,11 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ 
DesiredTotal: 10, } @@ -2691,10 +2731,11 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Inplace(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnInplace, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredTotal: 10, } @@ -2733,10 +2774,11 @@ func TestReconciler_CreateDeployment_NewerCreateIndex(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredTotal: 5, } @@ -2777,7 +2819,8 @@ func TestReconciler_DontCreateDeployment_NoChanges(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -2822,7 +2865,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { // Create a deployment that is paused/failed and has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = c.deploymentStatus d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: false, @@ -2855,7 +2898,8 @@ 
func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) { d.TaskGroups[canary.TaskGroup].PlacedCanaries = []string{canary.ID} mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{canary.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -2900,7 +2944,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMorePlacements(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { // Create a deployment that is paused and has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = c.deploymentStatus d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: false, @@ -2920,7 +2964,8 @@ func TestReconciler_PausedOrFailedDeployment_NoMorePlacements(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -2963,7 +3008,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreDestructiveUpdates(t *testing for _, c := range cases { t.Run(c.name, func(t *testing.T) { // Create a deployment that is paused and has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = c.deploymentStatus d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: false, @@ -2994,7 +3039,8 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreDestructiveUpdates(t *testing allocs = append(allocs, newAlloc) mockUpdateFn := 
allocUpdateFnMock(map[string]allocUpdateType{newAlloc.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3020,7 +3066,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { job.TaskGroups[0].Update = canaryUpdate // Create a deployment that is paused and has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, @@ -3065,7 +3111,8 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3092,7 +3139,7 @@ func TestReconciler_LostNode_Canary(t *testing.T) { job.TaskGroups[0].Update = canaryUpdate // Create a deployment that is paused and has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, @@ -3137,7 +3184,8 @@ func TestReconciler_LostNode_Canary(t *testing.T) { tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3165,7 +3213,7 @@ func TestReconciler_StopOldCanaries(t 
*testing.T) { job.TaskGroups[0].Update = canaryUpdate // Create an old deployment that has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, @@ -3203,10 +3251,11 @@ func TestReconciler_StopOldCanaries(t *testing.T) { allocs = append(allocs, canary) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, d, + allocs, nil, "", 50) r := reconciler.Compute() - newD := structs.NewDeployment(job) + newD := structs.NewDeployment(job, 50) newD.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion newD.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredCanaries: 2, @@ -3256,10 +3305,11 @@ func TestReconciler_NewCanaries(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - newD := structs.NewDeployment(job) + newD := structs.NewDeployment(job, 50) newD.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion newD.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredCanaries: 2, @@ -3304,10 +3354,11 @@ func TestReconciler_NewCanaries_CountGreater(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - newD := structs.NewDeployment(job) + newD := structs.NewDeployment(job, 50) 
newD.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion state := &structs.DeploymentState{ DesiredCanaries: 7, @@ -3355,10 +3406,11 @@ func TestReconciler_NewCanaries_MultiTG(t *testing.T) { } } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - newD := structs.NewDeployment(job) + newD := structs.NewDeployment(job, 50) newD.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion state := &structs.DeploymentState{ DesiredCanaries: 2, @@ -3408,10 +3460,11 @@ func TestReconciler_NewCanaries_ScaleUp(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - newD := structs.NewDeployment(job) + newD := structs.NewDeployment(job, 50) newD.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion newD.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredCanaries: 2, @@ -3456,10 +3509,11 @@ func TestReconciler_NewCanaries_ScaleDown(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() - newD := structs.NewDeployment(job) + newD := structs.NewDeployment(job, 50) newD.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion newD.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredCanaries: 2, 
@@ -3498,7 +3552,7 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) { } // Create an existing deployment that has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, @@ -3533,7 +3587,8 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) { allocs = append(allocs, canary) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3561,7 +3616,7 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { // Create an existing deployment that has placed some canaries and mark them // promoted - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) s := &structs.DeploymentState{ Promoted: true, DesiredTotal: 10, @@ -3602,7 +3657,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3634,7 +3690,7 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) { // Create an existing deployment that has placed some canaries and mark them // promoted - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) s := &structs.DeploymentState{ Promoted: true, DesiredTotal: 2, @@ -3676,7 +3732,8 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, 
false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ @@ -3736,7 +3793,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) { t.Run(fmt.Sprintf("%d healthy", c.healthy), func(t *testing.T) { // Create an existing deployment that has placed some canaries and mark them // promoted - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, DesiredTotal: 10, @@ -3775,7 +3832,8 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3805,7 +3863,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { job.TaskGroups[0].Update = noCanaryUpdate // Create an existing deployment that has some placed allocs - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, DesiredTotal: 10, @@ -3856,7 +3914,8 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3889,7 +3948,7 @@ func TestReconciler_FailedDeployment_TaintedNodes(t *testing.T) { 
job.TaskGroups[0].Update = noCanaryUpdate // Create an existing failed deployment that has some placed allocs - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusFailed d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, @@ -3941,7 +4000,8 @@ func TestReconciler_FailedDeployment_TaintedNodes(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, tainted, "", 50) r := reconciler.Compute() // Assert the correct results @@ -3971,7 +4031,7 @@ func TestReconciler_CompleteDeployment(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = canaryUpdate - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusSuccessful d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, @@ -3997,7 +4057,8 @@ func TestReconciler_CompleteDeployment(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -4022,7 +4083,7 @@ func TestReconciler_MarkDeploymentComplete_FailedAllocations(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = noCanaryUpdate - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredTotal: 10, PlacedAllocs: 20, @@ -4052,7 +4113,8 @@ func TestReconciler_MarkDeploymentComplete_FailedAllocations(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := 
NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, + job, d, allocs, nil, "", 50) r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ @@ -4087,7 +4149,7 @@ func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) { job.TaskGroups[1].Name = "two" // Create an existing failed deployment that has promoted one task group - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusFailed s0 := &structs.DeploymentState{ Promoted: true, @@ -4147,7 +4209,8 @@ func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -4177,7 +4240,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) { job.TaskGroups[0].Update = noCanaryUpdate // Create an existing failed deployment that has some placed allocs - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusFailed d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, @@ -4216,10 +4279,11 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) { jobNew := job.Copy() jobNew.Version += 100 - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, jobNew, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, jobNew, + d, allocs, nil, "", 50) r := reconciler.Compute() - dnew := structs.NewDeployment(jobNew) + dnew := structs.NewDeployment(jobNew, 50) 
dnew.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredTotal: 10, } @@ -4245,7 +4309,7 @@ func TestReconciler_MarkDeploymentComplete(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = noCanaryUpdate - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, DesiredTotal: 10, @@ -4269,7 +4333,8 @@ func TestReconciler_MarkDeploymentComplete(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ @@ -4304,7 +4369,7 @@ func TestReconciler_JobChange_ScaleUp_SecondEval(t *testing.T) { job.TaskGroups[0].Count = 30 // Create a deployment that is paused and has placed some canaries - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: false, DesiredTotal: 30, @@ -4338,7 +4403,8 @@ func TestReconciler_JobChange_ScaleUp_SecondEval(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), mockUpdateFn, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -4373,10 +4439,11 @@ func TestReconciler_RollingUpgrade_MissingAllocs(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + nil, allocs, nil, "", 50) r 
:= reconciler.Compute() - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ DesiredTotal: 10, } @@ -4425,7 +4492,8 @@ func TestReconciler_Batch_Rerun(t *testing.T) { job2 := job.Copy() job2.CreateIndex++ - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job2.ID, job2, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job2.ID, job2, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Assert the correct results @@ -4454,7 +4522,7 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { tgName := job.TaskGroups[0].Name now := time.Now() // Create an existing failed deployment that has some placed allocs - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusFailed d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: true, @@ -4486,7 +4554,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { StartedAt: now.Add(-1 * time.Hour), FinishedAt: now.Add(-10 * time.Second)}} - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert that no rescheduled placements were created @@ -4511,7 +4580,7 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { now := time.Now() // Mock deployment with failed allocs, but deployment watcher hasn't marked it as failed yet - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusRunning d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: false, @@ -4541,7 +4610,8 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { 
allocs[i].DesiredTransition.Reschedule = helper.BoolToPtr(true) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // Assert that no rescheduled placements were created @@ -4584,7 +4654,7 @@ func TestReconciler_FailedDeployment_AutoRevert_CancelCanaries(t *testing.T) { jobv2.Version = 2 jobv2.TaskGroups[0].Meta = map[string]string{"version": "2"} - d := structs.NewDeployment(jobv2) + d := structs.NewDeployment(jobv2, 50) state := &structs.DeploymentState{ Promoted: true, DesiredTotal: 3, @@ -4626,7 +4696,8 @@ func TestReconciler_FailedDeployment_AutoRevert_CancelCanaries(t *testing.T) { allocs = append(allocs, new) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, jobv2, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, jobv2, + d, allocs, nil, "", 50) r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ @@ -4663,7 +4734,7 @@ func TestReconciler_SuccessfulDeploymentWithFailedAllocs_Reschedule(t *testing.T now := time.Now() // Mock deployment with failed allocs, but deployment watcher hasn't marked it as failed yet - d := structs.NewDeployment(job) + d := structs.NewDeployment(job, 50) d.Status = structs.DeploymentStatusSuccessful d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ Promoted: false, @@ -4688,7 +4759,8 @@ func TestReconciler_SuccessfulDeploymentWithFailedAllocs_Reschedule(t *testing.T allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnDestructive, false, job.ID, job, + d, allocs, nil, "", 50) r := reconciler.Compute() // 
Assert that rescheduled placements were created @@ -4752,7 +4824,8 @@ func TestReconciler_ForceReschedule_Service(t *testing.T) { // Mark DesiredTransition ForceReschedule allocs[0].DesiredTransition = structs.DesiredTransition{ForceReschedule: helper.BoolToPtr(true)} - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Verify that no follow up evals were created @@ -4834,7 +4907,8 @@ func TestReconciler_RescheduleNot_Service(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nil, "", 50) r := reconciler.Compute() // Verify that no follow up evals were created @@ -4919,7 +4993,8 @@ func TestReconciler_RescheduleNot_Batch(t *testing.T) { // Mark one as complete allocs[5].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "") + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50) reconciler.now = now r := reconciler.Compute() From db84a1f38b2db0d9dffc44283ecf5b2a8d5b1c4a Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 18 Nov 2021 11:40:28 +0100 Subject: [PATCH 2/8] http: allow setting eval priority in job update and delete. The Nomad agent HTTP API has been modified to allow setting the eval priority on job update and delete. To keep consistency with the current v1 API, job update accepts this as a payload param; job delete accepts this as a query param. 
Any user supplied value is validated within the agent HTTP handler removing the need to pass invalid requests to the server. --- command/agent/http.go | 13 ++ command/agent/http_test.go | 47 +++++++ command/agent/job_endpoint.go | 44 +++++- command/agent/job_endpoint_test.go | 207 +++++++++++++++++++++++++++++ 4 files changed, 307 insertions(+), 4 deletions(-) diff --git a/command/agent/http.go b/command/agent/http.go index 4a93ae74f76..7124737fe14 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -687,6 +687,19 @@ func parseBool(req *http.Request, field string) (*bool, error) { return nil, nil } +// parseInt parses a query parameter to a int or returns (nil, nil) if the +// parameter is not present. +func parseInt(req *http.Request, field string) (*int, error) { + if str := req.URL.Query().Get(field); str != "" { + param, err := strconv.Atoi(str) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %q (%v) as a int: %v", field, str, err) + } + return &param, nil + } + return nil, nil +} + // parseToken is used to parse the X-Nomad-Token param func (s *HTTPServer) parseToken(req *http.Request, token *string) { if other := req.Header.Get("X-Nomad-Token"); other != "" { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index fd8c8f539a0..3871ac1a986 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -574,6 +574,53 @@ func TestParseBool(t *testing.T) { } } +func Test_parseInt(t *testing.T) { + t.Parallel() + + cases := []struct { + Input string + Expected *int + Err bool + }{ + { + Input: "", + Expected: nil, + }, + { + Input: "13", + Expected: helper.IntToPtr(13), + }, + { + Input: "99", + Expected: helper.IntToPtr(99), + }, + { + Input: "ten", + Err: true, + }, + } + + for i := range cases { + tc := cases[i] + t.Run("Input-"+tc.Input, func(t *testing.T) { + testURL, err := url.Parse("http://localhost/foo?eval_priority=" + tc.Input) + require.NoError(t, err) + req := &http.Request{ + URL: 
testURL, + } + + result, err := parseInt(req, "eval_priority") + if tc.Err { + require.Error(t, err) + require.Nil(t, result) + } else { + require.NoError(t, err) + require.Equal(t, tc.Expected, result) + } + }) + } +} + func TestParsePagination(t *testing.T) { t.Parallel() s := makeHTTPServer(t, nil) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 512331e4ab0..8a9da76feba 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -390,6 +390,15 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, } } + // Validate the evaluation priority if the user supplied a non-default + // value. It's more efficient to do it here, within the agent rather than + // sending a bad request for the server to reject. + if args.EvalPriority != 0 { + if err := validateEvalPriorityOpt(args.EvalPriority); err != nil { + return nil, err + } + } + sJob, writeReq := s.apiJobAndRequestToStructs(args.Job, req, args.WriteRequest) regReq := structs.JobRegisterRequest{ Job: sJob, @@ -397,6 +406,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, JobModifyIndex: args.JobModifyIndex, PolicyOverride: args.PolicyOverride, PreserveCounts: args.PreserveCounts, + EvalPriority: args.EvalPriority, WriteRequest: *writeReq, } @@ -411,6 +421,9 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, jobName string) (interface{}, error) { + args := structs.JobDeregisterRequest{JobID: jobName} + + // Identify the purge query param and parse. purgeStr := req.URL.Query().Get("purge") var purgeBool bool if purgeStr != "" { @@ -420,7 +433,9 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err) } } + args.Purge = purgeBool + // Identify the global query param and parse. 
globalStr := req.URL.Query().Get("global") var globalBool bool if globalStr != "" { @@ -430,12 +445,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "global", globalStr, err) } } + args.Global = globalBool - args := structs.JobDeregisterRequest{ - JobID: jobName, - Purge: purgeBool, - Global: globalBool, + // Parse the eval priority from the request URL query if present. + evalPriority, err := parseInt(req, "eval_priority") + if err != nil { + return nil, err } + + // Validate the evaluation priority if the user supplied a non-default + // value. It's more efficient to do it here, within the agent rather than + // sending a bad request for the server to reject. + if evalPriority != nil && *evalPriority > 0 { + if err := validateEvalPriorityOpt(*evalPriority); err != nil { + return nil, err + } + args.EvalPriority = *evalPriority + } + s.parseWriteRequest(req, &args.WriteRequest) var out structs.JobDeregisterResponse @@ -1661,3 +1688,12 @@ func ApiSpreadToStructs(a1 *api.Spread) *structs.Spread { } return ret } + +// validateEvalPriorityOpt ensures the supplied evaluation priority override +// value is within acceptable bounds. 
+func validateEvalPriorityOpt(priority int) HTTPCodedError { + if priority < 1 || priority > 100 { + return CodedError(400, "Eval priority must be between 1 and 100 inclusively") + } + return nil +} diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 470c488f07c..841090014b2 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -4,6 +4,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "strconv" "testing" "time" @@ -596,6 +597,101 @@ func TestHTTP_JobUpdate(t *testing.T) { }) } +func TestHTTP_JobUpdate_EvalPriority(t *testing.T) { + t.Parallel() + + testCases := []struct { + inputEvalPriority int + expectedError bool + name string + }{ + { + inputEvalPriority: 95, + expectedError: false, + name: "valid input eval priority", + }, + { + inputEvalPriority: 99999999999, + expectedError: true, + name: "invalid input eval priority", + }, + { + inputEvalPriority: 0, + expectedError: false, + name: "no input eval priority", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + httpTest(t, nil, func(s *TestAgent) { + // Create the job + job := MockJob() + args := api.JobRegisterRequest{ + Job: job, + WriteRequest: api.WriteRequest{ + Region: "global", + Namespace: api.DefaultNamespace, + }, + } + + // Add our eval priority query param if set. 
+ if tc.inputEvalPriority > 0 { + args.EvalPriority = tc.inputEvalPriority + } + buf := encodeReq(args) + + // Make the HTTP request + req, err := http.NewRequest("PUT", "/v1/job/"+*job.ID, buf) + assert.Nil(t, err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req) + if tc.expectedError { + assert.NotNil(t, err) + return + } else { + assert.Nil(t, err) + } + + // Check the response + regResp := obj.(structs.JobRegisterResponse) + assert.NotEmpty(t, regResp.EvalID) + assert.NotEmpty(t, respW.Result().Header.Get("X-Nomad-Index")) + + // Check the job is registered + getReq := structs.JobSpecificRequest{ + JobID: *job.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var getResp structs.SingleJobResponse + assert.Nil(t, s.Agent.RPC("Job.GetJob", &getReq, &getResp)) + assert.NotNil(t, getResp.Job) + + // Check the evaluation that resulted from the job register. + evalInfoReq, err := http.NewRequest("GET", "/v1/evaluation/"+regResp.EvalID, nil) + assert.Nil(t, err) + respW.Flush() + + evalRaw, err := s.Server.EvalSpecificRequest(respW, evalInfoReq) + assert.Nil(t, err) + evalRespObj := evalRaw.(*structs.Evaluation) + + if tc.inputEvalPriority > 0 { + assert.Equal(t, tc.inputEvalPriority, evalRespObj.Priority) + } else { + assert.Equal(t, *job.Priority, evalRespObj.Priority) + } + }) + }) + } +} + func TestHTTP_JobUpdateRegion(t *testing.T) { t.Parallel() @@ -797,6 +893,117 @@ func TestHTTP_JobDelete(t *testing.T) { }) } +func TestHTTP_JobDelete_EvalPriority(t *testing.T) { + t.Parallel() + + testCases := []struct { + inputEvalPriority int + expectedError bool + name string + }{ + { + inputEvalPriority: 95, + expectedError: false, + name: "valid input eval priority", + }, + { + inputEvalPriority: 99999999999, + expectedError: true, + name: "invalid input eval priority", + }, + { + inputEvalPriority: 0, + expectedError: false, + name: "no input eval 
priority", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + httpTest(t, nil, func(s *TestAgent) { + // Create the job + job := MockJob() + args := api.JobRegisterRequest{ + Job: job, + WriteRequest: api.WriteRequest{ + Region: "global", + Namespace: api.DefaultNamespace, + }, + } + buf := encodeReq(args) + + // Make the HTTP request + regReq, err := http.NewRequest("PUT", "/v1/job/"+*job.ID, buf) + assert.Nil(t, err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, regReq) + assert.Nil(t, err) + + // Check the response + regResp := obj.(structs.JobRegisterResponse) + assert.NotEmpty(t, regResp.EvalID) + assert.NotEmpty(t, respW.Result().Header.Get("X-Nomad-Index")) + + // Check the job is registered + getReq := structs.JobSpecificRequest{ + JobID: *job.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var getResp structs.SingleJobResponse + assert.Nil(t, s.Agent.RPC("Job.GetJob", &getReq, &getResp)) + assert.NotNil(t, getResp.Job) + + // Delete the job. + deleteReq, err := http.NewRequest("DELETE", "/v1/job/"+*job.ID+"?purge=true", nil) + assert.Nil(t, err) + respW.Flush() + + // Add our eval priority query param if set. + if tc.inputEvalPriority > 0 { + q := deleteReq.URL.Query() + q.Add("eval_priority", strconv.Itoa(tc.inputEvalPriority)) + deleteReq.URL.RawQuery = q.Encode() + } + + // Make the request + obj, err = s.Server.JobSpecificRequest(respW, deleteReq) + if tc.expectedError { + assert.NotNil(t, err) + return + } else { + assert.Nil(t, err) + } + + // Check the response + dereg := obj.(structs.JobDeregisterResponse) + assert.NotEmpty(t, dereg.EvalID) + assert.NotEmpty(t, respW.Result().Header.Get("X-Nomad-Index")) + + // Check the evaluation that resulted from the job register. 
+ evalInfoReq, err := http.NewRequest("GET", "/v1/evaluation/"+dereg.EvalID, nil) + assert.Nil(t, err) + respW.Flush() + + evalRaw, err := s.Server.EvalSpecificRequest(respW, evalInfoReq) + assert.Nil(t, err) + evalRespObj := evalRaw.(*structs.Evaluation) + + if tc.inputEvalPriority > 0 { + assert.Equal(t, tc.inputEvalPriority, evalRespObj.Priority) + } else { + assert.Equal(t, *job.Priority, evalRespObj.Priority) + } + }) + }) + } +} + func TestHTTP_Job_ScaleTaskGroup(t *testing.T) { t.Parallel() From b0dd99dccbe463fc343a90278f0c0a33e0d725c1 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 18 Nov 2021 11:40:48 +0100 Subject: [PATCH 3/8] api: allow setting eval priority on job update and delete. The register and deregister opts functions now allow for setting the eval priority on requests. The change includes a small change to the DeregisterOpts function which handles nil opts. This brings the function in line with RegisterOpts. --- api/jobs.go | 34 ++++++++++++-- api/jobs_test.go | 116 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 4 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 46a3fff7491..52240e5b68a 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -91,6 +91,7 @@ type RegisterOptions struct { ModifyIndex uint64 PolicyOverride bool PreserveCounts bool + EvalPriority int } // Register is used to register a new job. It returns the ID @@ -105,8 +106,8 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (* return j.RegisterOpts(job, &opts, q) } -// Register is used to register a new job. It returns the ID -// of the evaluation, along with any errors encountered. +// RegisterOpts is used to register a new job with the passed RegisterOpts. It +// returns the ID of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { // Format the request req := &JobRegisterRequest{ @@ -119,6 +120,7 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (* } req.PolicyOverride = opts.PolicyOverride req.PreserveCounts = opts.PreserveCounts + req.EvalPriority = opts.EvalPriority } var resp JobRegisterResponse @@ -290,14 +292,30 @@ type DeregisterOptions struct { // If Global is set to true, all regions of a multiregion job will be // stopped. Global bool + + // EvalPriority is an optional priority to use on any evaluation created as + // a result on this job deregistration. This value must be between 1-100 + // inclusively, where a larger value corresponds to a higher priority. This + // is useful when an operator wishes to push through a job deregistration + // in busy clusters with a large evaluation backlog. + EvalPriority int } // DeregisterOpts is used to remove an existing job. See DeregisterOptions // for parameters. func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) { var resp JobDeregisterResponse - wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t", - url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q) + + // The base endpoint to add query params to. + endpoint := "/v1/job/" + url.PathEscape(jobID) + + // Protect against nil opts. + if opts != nil { + endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v", + opts.Purge, opts.Global, opts.EvalPriority) + } + + wm, err := j.client.delete(endpoint, &resp, q) if err != nil { return "", nil, err } @@ -1170,6 +1188,14 @@ type JobRegisterRequest struct { PolicyOverride bool `json:",omitempty"` PreserveCounts bool `json:",omitempty"` + // EvalPriority is an optional priority to use on any evaluation created as + // a result on this job registration. 
This value must be between 1-100 + // inclusively, where a larger value corresponds to a higher priority. This + // is useful when an operator wishes to push through a job registration in + // busy clusters with a large evaluation backlog. This avoids needing to + // change the job priority which also impacts preemption. + EvalPriority int `json:",omitempty"` + WriteRequest } diff --git a/api/jobs_test.go b/api/jobs_test.go index 56ecaa20fbe..f21f7cab248 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -187,6 +187,60 @@ func TestJobs_Register_NoPreserveCounts(t *testing.T) { require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified } +func TestJobs_Register_EvalPriority(t *testing.T) { + t.Parallel() + requireAssert := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // Listing jobs before registering returns nothing + listResp, _, err := c.Jobs().List(nil) + requireAssert.Nil(err) + requireAssert.Len(listResp, 0) + + // Create a job and register it with an eval priority. + job := testJob() + registerResp, wm, err := c.Jobs().RegisterOpts(job, &RegisterOptions{EvalPriority: 99}, nil) + requireAssert.Nil(err) + requireAssert.NotNil(registerResp) + requireAssert.NotEmpty(registerResp.EvalID) + assertWriteMeta(t, wm) + + // Check the created job evaluation has a priority that matches our desired + // value. + evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil) + requireAssert.NoError(err) + requireAssert.Equal(99, evalInfo.Priority) +} + +func TestJobs_Register_NoEvalPriority(t *testing.T) { + t.Parallel() + requireAssert := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // Listing jobs before registering returns nothing + listResp, _, err := c.Jobs().List(nil) + requireAssert.Nil(err) + requireAssert.Len(listResp, 0) + + // Create a job and register it with an eval priority. 
+ job := testJob() + registerResp, wm, err := c.Jobs().RegisterOpts(job, nil, nil) + requireAssert.Nil(err) + requireAssert.NotNil(registerResp) + requireAssert.NotEmpty(registerResp.EvalID) + assertWriteMeta(t, wm) + + // Check the created job evaluation has a priority that matches the job + // priority. + evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil) + requireAssert.NoError(err) + requireAssert.Equal(*job.Priority, evalInfo.Priority) +} + func TestJobs_Validate(t *testing.T) { t.Parallel() c, s := makeClient(t, nil, nil) @@ -1628,6 +1682,68 @@ func TestJobs_Deregister(t *testing.T) { } } +func TestJobs_Deregister_EvalPriority(t *testing.T) { + t.Parallel() + requireAssert := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // Listing jobs before registering returns nothing + listResp, _, err := c.Jobs().List(nil) + requireAssert.Nil(err) + requireAssert.Len(listResp, 0) + + // Create a job and register it. + job := testJob() + registerResp, wm, err := c.Jobs().Register(job, nil) + requireAssert.Nil(err) + requireAssert.NotNil(registerResp) + requireAssert.NotEmpty(registerResp.EvalID) + assertWriteMeta(t, wm) + + // Deregister the job with an eval priority. + evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{EvalPriority: 97}, nil) + requireAssert.NoError(err) + requireAssert.NotEmpty(t, evalID) + + // Lookup the eval and check the priority on it. + evalInfo, _, err := c.Evaluations().Info(evalID, nil) + requireAssert.NoError(err) + requireAssert.Equal(97, evalInfo.Priority) +} + +func TestJobs_Deregister_NoEvalPriority(t *testing.T) { + t.Parallel() + requireAssert := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // Listing jobs before registering returns nothing + listResp, _, err := c.Jobs().List(nil) + requireAssert.Nil(err) + requireAssert.Len(listResp, 0) + + // Create a job and register it. 
+ job := testJob() + registerResp, wm, err := c.Jobs().Register(job, nil) + requireAssert.Nil(err) + requireAssert.NotNil(registerResp) + requireAssert.NotEmpty(registerResp.EvalID) + assertWriteMeta(t, wm) + + // Deregister the job with an eval priority. + evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{}, nil) + requireAssert.NoError(err) + requireAssert.NotEmpty(t, evalID) + + // Lookup the eval and check the priority on it. + evalInfo, _, err := c.Evaluations().Info(evalID, nil) + requireAssert.NoError(err) + requireAssert.Equal(*job.Priority, evalInfo.Priority) +} + func TestJobs_ForceEvaluate(t *testing.T) { t.Parallel() c, s := makeClient(t, nil, nil) From 6463de5cc969368e9866b19bb53db4d2ca4a06ea Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 18 Nov 2021 11:42:14 +0100 Subject: [PATCH 4/8] cli: add eval-priority flag to job run and job stop. --- command/job_run.go | 15 ++++++++++++--- command/job_stop.go | 19 +++++++++++++------ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/command/job_run.go b/command/job_run.go index a2ef4cb40ca..ab1fde96204 100644 --- a/command/job_run.go +++ b/command/job_run.go @@ -86,6 +86,10 @@ Run Options: the evaluation ID will be printed to the screen, which can be used to examine the evaluation using the eval-status command. + -eval-priority + Override the priority of the evaluations produced as a result of this job + submission. By default, this is set to the priority of the job. + -hcl1 Parses the job file as HCLv1. 
@@ -152,6 +156,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags { "-hcl2-strict": complete.PredictNothing, "-var": complete.PredictAnything, "-var-file": complete.PredictFiles("*.var"), + "-eval-priority": complete.PredictNothing, }) } @@ -165,6 +170,7 @@ func (c *JobRunCommand) Run(args []string) int { var detach, verbose, output, override, preserveCounts, hcl2Strict bool var checkIndexStr, consulToken, consulNamespace, vaultToken, vaultNamespace string var varArgs, varFiles flaghelper.StringFlag + var evalPriority int flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient) flagSet.Usage = func() { c.Ui.Output(c.Help()) } @@ -182,6 +188,7 @@ func (c *JobRunCommand) Run(args []string) int { flagSet.StringVar(&vaultNamespace, "vault-namespace", "", "") flagSet.Var(&varArgs, "var", "") flagSet.Var(&varFiles, "var-file", "") + flagSet.IntVar(&evalPriority, "eval-priority", 0, "") if err := flagSet.Parse(args); err != nil { return 1 @@ -282,13 +289,15 @@ func (c *JobRunCommand) Run(args []string) int { } // Set the register options - opts := &api.RegisterOptions{} + opts := &api.RegisterOptions{ + PolicyOverride: override, + PreserveCounts: preserveCounts, + EvalPriority: evalPriority, + } if enforce { opts.EnforceIndex = true opts.ModifyIndex = checkIndex } - opts.PolicyOverride = override - opts.PreserveCounts = preserveCounts // Submit the job resp, _, err := client.Jobs().RegisterOpts(job, opts, nil) diff --git a/command/job_stop.go b/command/job_stop.go index 06aff8087a2..8dd5d8a1197 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -39,6 +39,10 @@ Stop Options: screen, which can be used to examine the evaluation using the eval-status command. + -eval-priority + Override the priority of the evaluations produced as a result of this job + deregistration. By default, this is set to the priority of the job. + -purge Purge is used to stop the job and purge it from the system. 
If not set, the job will still be queryable and will be purged by the garbage collector. @@ -63,11 +67,12 @@ func (c *JobStopCommand) Synopsis() string { func (c *JobStopCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-detach": complete.PredictNothing, - "-purge": complete.PredictNothing, - "-global": complete.PredictNothing, - "-yes": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-eval-priority": complete.PredictNothing, + "-purge": complete.PredictNothing, + "-global": complete.PredictNothing, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, }) } @@ -90,6 +95,7 @@ func (c *JobStopCommand) Name() string { return "job stop" } func (c *JobStopCommand) Run(args []string) int { var detach, purge, verbose, global, autoYes bool + var evalPriority int flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } @@ -98,6 +104,7 @@ func (c *JobStopCommand) Run(args []string) int { flags.BoolVar(&global, "global", false, "") flags.BoolVar(&autoYes, "yes", false, "") flags.BoolVar(&purge, "purge", false, "") + flags.IntVar(&evalPriority, "eval-priority", 0, "") if err := flags.Parse(args); err != nil { return 1 @@ -192,7 +199,7 @@ func (c *JobStopCommand) Run(args []string) int { } // Invoke the stop - opts := &api.DeregisterOptions{Purge: purge, Global: global} + opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority} wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) if err != nil { From c0989ada76c6dae906194059489f3c78f20f428d Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 18 Nov 2021 11:43:07 +0100 Subject: [PATCH 5/8] e2e: add EvalPriority test suite to test setting eval priorities. 
--- e2e/e2e_test.go | 2 +- e2e/e2eutil/job.go | 19 +- e2e/eval_priority/eval_priority.go | 214 ++++++++++++++++++ .../inputs/default_job_priority.nomad | 17 ++ 4 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 e2e/eval_priority/eval_priority.go create mode 100644 e2e/eval_priority/inputs/default_job_priority.nomad diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index b3c1f27ee8c..332ece3eac0 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -10,12 +10,12 @@ import ( _ "github.com/hashicorp/nomad/e2e/affinities" _ "github.com/hashicorp/nomad/e2e/clientstate" - _ "github.com/hashicorp/nomad/e2e/connect" _ "github.com/hashicorp/nomad/e2e/consul" _ "github.com/hashicorp/nomad/e2e/consultemplate" _ "github.com/hashicorp/nomad/e2e/csi" _ "github.com/hashicorp/nomad/e2e/deployment" + _ "github.com/hashicorp/nomad/e2e/eval_priority" _ "github.com/hashicorp/nomad/e2e/events" _ "github.com/hashicorp/nomad/e2e/example" _ "github.com/hashicorp/nomad/e2e/isolation" diff --git a/e2e/e2eutil/job.go b/e2e/e2eutil/job.go index 61408f4fb3c..518e725799c 100644 --- a/e2e/e2eutil/job.go +++ b/e2e/e2eutil/job.go @@ -12,7 +12,24 @@ import ( // Register registers a jobspec from a file but with a unique ID. // The caller is responsible for recording that ID for later cleanup. func Register(jobID, jobFilePath string) error { - cmd := exec.Command("nomad", "job", "run", "-detach", "-") + return register(jobID, jobFilePath, exec.Command("nomad", "job", "run", "-detach", "-")) +} + +// RegisterWithArgs registers a jobspec from a file but with a unique ID. The +// optional args are added to the run command. The caller is responsible for +// recording that ID for later cleanup. 
+func RegisterWithArgs(jobID, jobFilePath string, args ...string) error { + + baseArgs := []string{"job", "run", "-detach"} + for i := range args { + baseArgs = append(baseArgs, args[i]) + } + baseArgs = append(baseArgs, "-") + + return register(jobID, jobFilePath, exec.Command("nomad", baseArgs...)) +} + +func register(jobID, jobFilePath string, cmd *exec.Cmd) error { stdin, err := cmd.StdinPipe() if err != nil { return fmt.Errorf("could not open stdin?: %w", err) diff --git a/e2e/eval_priority/eval_priority.go b/e2e/eval_priority/eval_priority.go new file mode 100644 index 00000000000..abf6bf03d8b --- /dev/null +++ b/e2e/eval_priority/eval_priority.go @@ -0,0 +1,214 @@ +package eval_priority + +import ( + "fmt" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" +) + +type EvalPriorityTest struct { + framework.TC + jobIDs []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "EvalPriority", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(EvalPriorityTest), + }, + }) +} + +func (tc *EvalPriorityTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) + e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1) +} + +func (tc *EvalPriorityTest) AfterEach(f *framework.F) { + for _, id := range tc.jobIDs { + _, _, err := tc.Nomad().Jobs().Deregister(id, true, nil) + f.NoError(err) + } + tc.jobIDs = []string{} + + _, err := e2eutil.Command("nomad", "system", "gc") + f.NoError(err) +} + +// TestJobRegisterWithoutEvalPriority makes sure that when not specifying an +// eval priority on job register, the job priority is used. +func (tc *EvalPriorityTest) TestJobRegisterWithoutEvalPriority(f *framework.F) { + + // Generate a jobID and attempt to register the job using the eval + // priority. In case there is a problem found here and the job registers, + // we need to ensure it gets cleaned up. 
+ jobID := "test-eval-priority-" + uuid.Generate()[0:8] + f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/default_job_priority.nomad")) + tc.jobIDs = append(tc.jobIDs, jobID) + + // Wait for the deployment to finish. + f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) + + // Pull the job evaluation list from the API and ensure that this didn't + // error and contains two evals. + // + // Eval 1: the job registration eval. + // Eval 2: the deployment watcher eval. + evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + f.NoError(err) + f.Len(evals, 2, "job expected to have one eval") + + // All evaluations should have the higher priority as they are as a result + // of an operator request. + for _, eval := range evals { + f.Equal(50, eval.Priority) + } +} + +// TestJobRegisterWithEvalPriority tests registering a job with a specified +// evaluation priority. It checks whether the created evaluation has a priority +// that matches our supplied value. +func (tc *EvalPriorityTest) TestJobRegisterWithEvalPriority(f *framework.F) { + + // Pick an eval priority. + evalPriority := 93 + + // Generate a jobID and register the job using the eval priority. + jobID := "test-eval-priority-" + uuid.Generate()[0:8] + f.NoError(e2eutil.RegisterWithArgs(jobID, + "eval_priority/inputs/default_job_priority.nomad", + fmt.Sprintf("-eval-priority=%v", evalPriority))) + tc.jobIDs = append(tc.jobIDs, jobID) + + // Wait for the deployment to finish. + f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) + + // Pull the job evaluation list from the API and ensure that this didn't + // error and contains two evals. + // + // Eval 1: the job registration eval. + // Eval 2: the deployment watcher eval. 
+ evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + f.NoError(err) + f.Len(evals, 2, "job expected to have one eval") + + // All evaluations should have the higher priority as they are as a result + // of an operator request. + for _, eval := range evals { + f.Equal(evalPriority, eval.Priority) + } +} + +// TestJobRegisterWithInvalidEvalPriority tests registering a job with a +// specified evaluation priority that is not valid. +func (tc *EvalPriorityTest) TestJobRegisterWithInvalidEvalPriority(f *framework.F) { + + // Pick an eval priority that is outside the supported bounds of 1-100. + evalPriority := 999 + + // Generate a jobID and attempt to register the job using the eval + // priority. In case there is a problem found here and the job registers, + // we need to ensure it gets cleaned up. + jobID := "test-eval-priority-" + uuid.Generate()[0:8] + f.Error(e2eutil.RegisterWithArgs(jobID, + "eval_priority/inputs/default_job_priority.nomad", + fmt.Sprintf("-eval-priority=%v", evalPriority))) +} + +// TestJobDeregisterWithoutEvalPriority makes sure that when not specifying an +// eval priority on job deregister, the job priority is used. +func (tc *EvalPriorityTest) TestJobDeregisterWithoutEvalPriority(f *framework.F) { + + // Generate a jobID and attempt to register the job using the eval + // priority. In case there is a problem found here and the job registers, + // we need to ensure it gets cleaned up. + jobID := "test-eval-priority-" + uuid.Generate()[0:8] + f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/default_job_priority.nomad")) + tc.jobIDs = append(tc.jobIDs, jobID) + + // Wait for the deployment to finish. + f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) + + // Deregister the job. + _, _, err := tc.Nomad().Jobs().Deregister(jobID, true, nil) + f.NoError(err) + + // Grab the evals from the server and ensure we actually got some. 
+ evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + f.NoError(err) + f.NotZero(len(evals)) + + // Identify the evaluation which was a result of the deregsiter and check + // the priority. + var deregEval *api.Evaluation + for _, eval := range evals { + if eval.TriggeredBy == "job-deregister" { + deregEval = eval + break + } + } + f.NotNil(deregEval) + f.Equal(50, deregEval.Priority) + tc.jobIDs = []string{} +} + +// TestJobDeregisterWithEvalPriority tests deregistering a job with a specified +// evaluation priority. It checks whether the created evaluation has a priority +// that matches our supplied value. +func (tc *EvalPriorityTest) TestJobDeregisterWithEvalPriority(f *framework.F) { + + // Generate a jobID and register the job using the eval priority. + jobID := "test-eval-priority-" + uuid.Generate()[0:8] + f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/default_job_priority.nomad")) + tc.jobIDs = append(tc.jobIDs, jobID) + + // Wait for the deployment to finish. + f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) + + // Pick an eval priority. + evalPriority := 91 + + // Deregister the job. + _, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &api.DeregisterOptions{EvalPriority: evalPriority}, nil) + f.NoError(err) + + // Grab the evals from the server and ensure we actually got some. + evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + f.NoError(err) + f.NotZero(len(evals)) + + // Identify the evaluation which was a result of the deregsiter and check + // the priority. + var deregEval *api.Evaluation + for _, eval := range evals { + if eval.TriggeredBy == "job-deregister" { + deregEval = eval + break + } + } + f.NotNil(deregEval) + f.Equal(evalPriority, deregEval.Priority) + tc.jobIDs = []string{} +} + +// TestJobDeregisterWithInvalidEvalPriority tests deregistering a job with a +// specified evaluation priority that is not valid. 
There is no need to +// register a job first as the validation is done within the agent HTTP handler +// before sending a server RPC request. +func (tc *EvalPriorityTest) TestJobDeregisterWithInvalidEvalPriority(f *framework.F) { + + // Pick an eval priority that is outside the supported bounds of 1-100. + evalPriority := 999 + + // Generate a jobID and attempt to register the job using the eval + // priority. In case there is a problem found here and the job registers, + // we need to ensure it gets cleaned up. + jobID := "test-eval-priority-" + uuid.Generate()[0:8] + _, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &api.DeregisterOptions{EvalPriority: evalPriority}, nil) + f.Error(err, "an error was expected due to invalid eval priority") +} diff --git a/e2e/eval_priority/inputs/default_job_priority.nomad b/e2e/eval_priority/inputs/default_job_priority.nomad new file mode 100644 index 00000000000..9224889d0a2 --- /dev/null +++ b/e2e/eval_priority/inputs/default_job_priority.nomad @@ -0,0 +1,17 @@ +job "networking" { + datacenters = ["dc1", "dc2"] + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + group "bridged" { + task "sleep" { + driver = "docker" + config { + image = "busybox:1" + command = "/bin/sleep" + args = ["300"] + } + } + } +} From 75809b02e48e27d46a8615ad2b966ea792513dd5 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 18 Nov 2021 11:43:32 +0100 Subject: [PATCH 6/8] docs: update website to detail eval priority CLI and API param. 
--- website/content/api-docs/jobs.mdx | 12 ++++++++++++ website/content/docs/commands/job/run.mdx | 3 +++ website/content/docs/commands/job/stop.mdx | 3 +++ 3 files changed, 18 insertions(+) diff --git a/website/content/api-docs/jobs.mdx b/website/content/api-docs/jobs.mdx index 3fffaf83fe9..bc05b8a53e2 100644 --- a/website/content/api-docs/jobs.mdx +++ b/website/content/api-docs/jobs.mdx @@ -111,6 +111,10 @@ The table below shows this endpoint's support for the register only occurs if the job is new. This paradigm allows check-and-set style job updating. +- `EvalPriority` `(int: 0)` - Override the priority of the evaluations produced + as a result of this job registration. By default, this is set to the priority + of the job. + - `JobModifyIndex` `(int: 0)` - Specifies the `JobModifyIndex` to enforce the current job is at. @@ -1558,6 +1562,10 @@ The table below shows this endpoint's support for the register only occurs if the job is new. This paradigm allows check-and-set style job updating. +- `EvalPriority` `(int: 0)` - Override the priority of the evaluations produced + as a result of this job update. By default, this is set to the priority of + the job. + - `JobModifyIndex` `(int: 0)` - Specifies the `JobModifyIndex` to enforce the current job is at. @@ -2099,6 +2107,10 @@ The table below shows this endpoint's support for - `:job_id` `(string: )` - Specifies the ID of the job (as specified in the job file during submission). This is specified as part of the path. +- `eval_priority` `(int: 0)` - Override the priority of the evaluations produced + as a result of this job deregistration. By default, this is set to the priority + of the job. + - `purge` `(bool: false)` - Specifies that the job should stopped and purged immediately. This means the job will not be queryable after being stopped. If not set, the job will be purged by the garbage collector. 
diff --git a/website/content/docs/commands/job/run.mdx b/website/content/docs/commands/job/run.mdx index 1f032a84398..73f50f15ccd 100644 --- a/website/content/docs/commands/job/run.mdx +++ b/website/content/docs/commands/job/run.mdx @@ -70,6 +70,9 @@ that volume. will be output, which can be used to examine the evaluation using the [eval status] command. +- `-eval-priority`: Override the priority of the evaluations produced as a result + of this job submission. By default, this is set to the priority of the job. + - `-hcl1`: If set, HCL1 parser is used for parsing the job spec. - `-hcl2-strict`: Whether an error should be produced from the HCL2 parser where diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index 5255e838a0b..004520b978e 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -40,6 +40,9 @@ When ACLs are enabled, this command requires a token with the `submit-job`, deregister command is submitted, a new evaluation ID is printed to the screen, which can be used to examine the evaluation using the [eval status] command. +- `-eval-priority`: Override the priority of the evaluations produced as a result + of this job deregistration. By default, this is set to the priority of the job. + - `-verbose`: Show full information. - `-yes`: Automatic yes to prompts. From e606ca6cddec34644fc2d1e3e2e1cc6375ba3977 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 19 Nov 2021 12:16:17 +0100 Subject: [PATCH 7/8] review: e2e and deployment watcher modification from review. 
--- api/jobs.go | 3 +- e2e/eval_priority/eval_priority.go | 225 ++++++++---------- ...rity.nomad => thirteen_job_priority.nomad} | 5 +- nomad/deploymentwatcher/deployment_watcher.go | 12 +- 4 files changed, 117 insertions(+), 128 deletions(-) rename e2e/eval_priority/inputs/{default_job_priority.nomad => thirteen_job_priority.nomad} (75%) diff --git a/api/jobs.go b/api/jobs.go index 52240e5b68a..146f65cf713 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -309,7 +309,8 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt // The base endpoint to add query params to. endpoint := "/v1/job/" + url.PathEscape(jobID) - // Protect against nil opts. + // Protect against nil opts. url.Values expects a string, and so using + // fmt.Sprintf is the best way to do this. if opts != nil { endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v", opts.Purge, opts.Global, opts.EvalPriority) diff --git a/e2e/eval_priority/eval_priority.go b/e2e/eval_priority/eval_priority.go index abf6bf03d8b..73d208463a3 100644 --- a/e2e/eval_priority/eval_priority.go +++ b/e2e/eval_priority/eval_priority.go @@ -1,8 +1,6 @@ package eval_priority import ( - "fmt" - "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/e2e/e2eutil" "github.com/hashicorp/nomad/e2e/framework" @@ -40,15 +38,16 @@ func (tc *EvalPriorityTest) AfterEach(f *framework.F) { f.NoError(err) } -// TestJobRegisterWithoutEvalPriority makes sure that when not specifying an -// eval priority on job register, the job priority is used. -func (tc *EvalPriorityTest) TestJobRegisterWithoutEvalPriority(f *framework.F) { +// TestEvalPrioritySet performs a test which registers, updates, and +// deregisters a job setting the eval priority on every call. +func (tc *EvalPriorityTest) TestEvalPrioritySet(f *framework.F) { // Generate a jobID and attempt to register the job using the eval // priority. In case there is a problem found here and the job registers, // we need to ensure it gets cleaned up.
jobID := "test-eval-priority-" + uuid.Generate()[0:8] - f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/default_job_priority.nomad")) + f.NoError(e2eutil.RegisterWithArgs(jobID, "eval_priority/inputs/thirteen_job_priority.nomad", + "-eval-priority=80")) tc.jobIDs = append(tc.jobIDs, jobID) // Wait for the deployment to finish. @@ -59,156 +58,132 @@ func (tc *EvalPriorityTest) TestJobRegisterWithoutEvalPriority(f *framework.F) { // // Eval 1: the job registration eval. // Eval 2: the deployment watcher eval. - evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + registerEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) f.NoError(err) - f.Len(evals, 2, "job expected to have one eval") - - // All evaluations should have the higher priority as they are as a result - // of an operator request. - for _, eval := range evals { - f.Equal(50, eval.Priority) - } -} + f.Len(registerEvals, 2, "job expected to have two evals") -// TestJobRegisterWithEvalPriority tests registering a job with a specified -// evaluation priority. It checks whether the created evaluation has a priority -// that matches our supplied value. -func (tc *EvalPriorityTest) TestJobRegisterWithEvalPriority(f *framework.F) { + // seenEvals tracks the evaluations we have tested for priority equality + // against our expected value. This allows us to easily perform multiple + // checks with confidence. + seenEvals := map[string]bool{} - // Pick an eval priority. - evalPriority := 93 - - // Generate a jobID and register the job using the eval priority. - jobID := "test-eval-priority-" + uuid.Generate()[0:8] - f.NoError(e2eutil.RegisterWithArgs(jobID, - "eval_priority/inputs/default_job_priority.nomad", - fmt.Sprintf("-eval-priority=%v", evalPriority))) - tc.jobIDs = append(tc.jobIDs, jobID) + // All evaluations should have the priority set to the overridden priority.
+ for _, eval := range registerEvals { + f.Equal(80, eval.Priority) + seenEvals[eval.ID] = true + } - // Wait for the deployment to finish. - f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) + // Update the job image and set an eval priority lower than the job + // priority. + f.NoError(e2eutil.RegisterWithArgs(jobID, "eval_priority/inputs/thirteen_job_priority.nomad", + "-eval-priority=7", "-var", "image=busybox:1.34")) + f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", + &e2eutil.WaitConfig{Retries: 200})) - // Pull the job evaluation list from the API and ensure that this didn't - // error and contains two evals. - // - // Eval 1: the job registration eval. - // Eval 2: the deployment watcher eval. - evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + // Pull the latest list of evaluations for the job which will include those + // as a result of the job update. + updateEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) f.NoError(err) - f.Len(evals, 2, "job expected to have one eval") - - // All evaluations should have the higher priority as they are as a result - // of an operator request. - for _, eval := range evals { - f.Equal(evalPriority, eval.Priority) + f.NotNil(updateEvals, "expected non-nil evaluation list response") + f.NotEmpty(updateEvals, "expected non-empty evaluation list response") + + // Iterate the evals, ignoring those we have already seen and check their + // priority is as expected. + for _, eval := range updateEvals { + if ok := seenEvals[eval.ID]; ok { + continue + } + f.Equal(7, eval.Priority) + seenEvals[eval.ID] = true } -} -// TestJobRegisterWithInvalidEvalPriority tests registering a job with a -// specified evaluation priority that is not valid. -func (tc *EvalPriorityTest) TestJobRegisterWithInvalidEvalPriority(f *framework.F) { + // Deregister the job using an increased priority.
+ deregOpts := api.DeregisterOptions{EvalPriority: 100, Purge: true} + deregEvalID, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &deregOpts, nil) + f.NoError(err) + f.NotEmpty(deregEvalID, "expected non-empty evaluation ID") - // Pick an eval priority that is outside the supported bounds of 1-100. - evalPriority := 999 + // Detail the deregistration evaluation and check its priority. + evalInfo, _, err := tc.Nomad().Evaluations().Info(deregEvalID, nil) + f.NoError(err) + f.Equal(100, evalInfo.Priority) - // Generate a jobID and attempt to register the job using the eval - // priority. In case there is a problem found here and the job registers, - // we need to ensure it gets cleaned up. - jobID := "test-eval-priority-" + uuid.Generate()[0:8] - f.Error(e2eutil.RegisterWithArgs(jobID, - "eval_priority/inputs/default_job_priority.nomad", - fmt.Sprintf("-eval-priority=%v", evalPriority))) + // If the job was successfully purged, clear the test suite state. + if err == nil { + tc.jobIDs = []string{} + } } -// TestJobDeregisterWithoutEvalPriority makes sure that when not specifying an -// eval priority on job deregister, the job priority is used. -func (tc *EvalPriorityTest) TestJobDeregisterWithoutEvalPriority(f *framework.F) { +// TestEvalPriorityNotSet performs a test which registers, updates, and +// deregisters a job never setting the eval priority. +func (tc *EvalPriorityTest) TestEvalPriorityNotSet(f *framework.F) { // Generate a jobID and attempt to register the job using the eval // priority. In case there is a problem found here and the job registers, // we need to ensure it gets cleaned up. jobID := "test-eval-priority-" + uuid.Generate()[0:8] - f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/default_job_priority.nomad")) + f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/thirteen_job_priority.nomad")) tc.jobIDs = append(tc.jobIDs, jobID) // Wait for the deployment to finish.
f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) - // Deregister the job. - _, _, err := tc.Nomad().Jobs().Deregister(jobID, true, nil) - f.NoError(err) - - // Grab the evals from the server and ensure we actually got some. - evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + // Pull the job evaluation list from the API and ensure that this didn't + // error and contains two evals. + // + // Eval 1: the job registration eval. + // Eval 2: the deployment watcher eval. + registerEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) f.NoError(err) - f.NotZero(len(evals)) - - // Identify the evaluation which was a result of the deregsiter and check - // the priority. - var deregEval *api.Evaluation - for _, eval := range evals { - if eval.TriggeredBy == "job-deregister" { - deregEval = eval - break - } - } - f.NotNil(deregEval) - f.Equal(50, deregEval.Priority) - tc.jobIDs = []string{} -} - -// TestJobDeregisterWithEvalPriority tests deregistering a job with a specified -// evaluation priority. It checks whether the created evaluation has a priority -// that matches our supplied value. -func (tc *EvalPriorityTest) TestJobDeregisterWithEvalPriority(f *framework.F) { - - // Generate a jobID and register the job using the eval priority. - jobID := "test-eval-priority-" + uuid.Generate()[0:8] - f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/default_job_priority.nomad")) - tc.jobIDs = append(tc.jobIDs, jobID) + f.Len(registerEvals, 2, "job expected to have two evals") - // Wait for the deployment to finish. - f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil)) + // seenEvals tracks the evaluations we have tested for priority equality + // against our expected value. This allows us to easily perform multiple + // checks with confidence. + seenEvals := map[string]bool{} - // Pick an eval priority. - evalPriority := 91 + // All evaluations should have the priority set to the job priority.
+ for _, eval := range registerEvals { + f.Equal(13, eval.Priority) + seenEvals[eval.ID] = true + } - // Deregister the job. - _, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &api.DeregisterOptions{EvalPriority: evalPriority}, nil) - f.NoError(err) + // Update the job image without setting an eval priority. + f.NoError(e2eutil.RegisterWithArgs(jobID, "eval_priority/inputs/thirteen_job_priority.nomad", + "-var", "image=busybox:1.34")) + f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", + &e2eutil.WaitConfig{Retries: 200})) - // Grab the evals from the server and ensure we actually got some. - evals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) + // Pull the latest list of evaluations for the job which will include those + // as a result of the job update. + updateEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil) f.NoError(err) - f.NotZero(len(evals)) - - // Identify the evaluation which was a result of the deregsiter and check - // the priority. - var deregEval *api.Evaluation - for _, eval := range evals { - if eval.TriggeredBy == "job-deregister" { - deregEval = eval - break + f.NotNil(updateEvals, "expected non-nil evaluation list response") + f.NotEmpty(updateEvals, "expected non-empty evaluation list response") + + // Iterate the evals, ignoring those we have already seen and check their + // priority is as expected. + for _, eval := range updateEvals { + if ok := seenEvals[eval.ID]; ok { + continue } + f.Equal(13, eval.Priority) + seenEvals[eval.ID] = true } - f.NotNil(deregEval) - f.Equal(evalPriority, deregEval.Priority) - tc.jobIDs = []string{} -} -// TestJobDeregisterWithInvalidEvalPriority tests deregistering a job with a -// specified evaluation priority that is not valid. There is no need to -// register a job first as the validation is done within the agent HTTP handler -// before sending a server RPC request. 
-func (tc *EvalPriorityTest) TestJobDeregisterWithInvalidEvalPriority(f *framework.F) { + // Deregister the job without setting an eval priority. + deregOpts := api.DeregisterOptions{Purge: true} + deregEvalID, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &deregOpts, nil) + f.NoError(err) + f.NotEmpty(deregEvalID, "expected non-empty evaluation ID") - // Pick an eval priority that is outside the supported bounds of 1-100. - evalPriority := 999 + // Detail the deregistration evaluation and check its priority. + evalInfo, _, err := tc.Nomad().Evaluations().Info(deregEvalID, nil) + f.NoError(err) + f.Equal(13, evalInfo.Priority) - // Generate a jobID and attempt to register the job using the eval - // priority. In case there is a problem found here and the job registers, - // we need to ensure it gets cleaned up. - jobID := "test-eval-priority-" + uuid.Generate()[0:8] - _, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &api.DeregisterOptions{EvalPriority: evalPriority}, nil) - f.Error(err, "an error was expected due to invalid eval priority") + // If the job was successfully purged, clear the test suite state. 
+ if err == nil { + tc.jobIDs = []string{} + } } diff --git a/e2e/eval_priority/inputs/default_job_priority.nomad b/e2e/eval_priority/inputs/thirteen_job_priority.nomad similarity index 75% rename from e2e/eval_priority/inputs/default_job_priority.nomad rename to e2e/eval_priority/inputs/thirteen_job_priority.nomad index 9224889d0a2..c21d6fedc03 100644 --- a/e2e/eval_priority/inputs/default_job_priority.nomad +++ b/e2e/eval_priority/inputs/thirteen_job_priority.nomad @@ -1,5 +1,8 @@ +variable "image" { default = "busybox:1" } + job "networking" { datacenters = ["dc1", "dc2"] + priority = 13 constraint { attribute = "${attr.kernel.name}" value = "linux" @@ -8,7 +11,7 @@ job "networking" { task "sleep" { driver = "docker" config { - image = "busybox:1" + image = var.image command = "/bin/sleep" args = ["300"] } diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index b70548a3e3d..f12357d1551 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -824,10 +824,20 @@ func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forI // getEval returns an evaluation suitable for the deployment func (w *deploymentWatcher) getEval() *structs.Evaluation { now := time.Now().UTC().UnixNano() + + // During a server upgrade it's possible we end up with deployments created + // on the previous version that are then "watched" on a leader that's on + // the new version. This would result in an eval with its priority set to + // zero which would be bad. This therefore protects against that. 
+ priority := w.d.EvalPriority + if priority == 0 { + priority = w.j.Priority + } + return &structs.Evaluation{ ID: uuid.Generate(), Namespace: w.j.Namespace, - Priority: w.d.EvalPriority, + Priority: priority, Type: w.j.Type, TriggeredBy: structs.EvalTriggerDeploymentWatcher, JobID: w.j.ID, From b041cdae7d0dc592b10e88e8680ec009c5411f8d Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 19 Nov 2021 12:17:05 +0100 Subject: [PATCH 8/8] changelog: add entry for #11532 --- .changelog/11532.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/11532.txt diff --git a/.changelog/11532.txt b/.changelog/11532.txt new file mode 100644 index 00000000000..eba4fdec760 --- /dev/null +++ b/.changelog/11532.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core: allow setting and propagation of eval priority on job de/registration +```