Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic eval insertion with job (de-)registration #8435

Merged
merged 5 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# vi: set ft=ruby :
#

LINUX_BASE_BOX = "bento/ubuntu-16.04"
LINUX_BASE_BOX = "bento/ubuntu-18.04"
FREEBSD_BASE_BOX = "freebsd/FreeBSD-11.3-STABLE"

LINUX_IP_ADDRESS = "10.199.0.200"
Expand Down
31 changes: 29 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,15 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
}
}

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
if req.Eval != nil {
notnoop marked this conversation as resolved.
Show resolved Hide resolved
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}

return nil
}

Expand All @@ -565,14 +574,32 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

return n.state.WithWriteTransaction(func(tx state.Txn) error {
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil {
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)

if err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}

return nil
})

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}
Comment on lines +590 to +596
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that here we attempt to insert the eval even if the job deregistration fails. I find this behavior very odd but it's explicitly tested in

func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
- with it insuring that the resulting eval is populated and have JobModifyIndex set ?! I kept the behavior the same

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this was intended to protect against the previous non-atomic behavior? Ex. if you deregister but the eval wasn't persisted, but then tried to deregister again, the job would be potentially gone but you'd still want an eval to clean it up?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually seems like a bug to me, but I think one that does not impact correctness (only waste some work doing nothing).

From peeking around generic_sched.go and reconcile.go, I'm not seeing us check if the evaluation is a Deregister. We always seem to checked if Job.Stopped is true! This means if we fail to update the statestore with the stopped Job, the Deregister evaluation will be the same as any other evaluation and end up being a noop for the job as it is not stopped and presumably already scheduled/allocated.

I suspect that @tgross is correct and that the test is merely asserting the non-atomic behavior existed: #981

Since we're making Job+Eval submissions atomic, I think we should at least trying to make this section atomic as well. I can't figure out where a Deregister eval for a non-Stopped job would have a desirable affect, but perhaps somebody else can find a case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to let the current behavior stand as-is, as stopping making an eval is a user visible change. Will follow up in another PR and we can discuss this issue further there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to let the current behavior stand as-is, as stopping making an eval is a user visible change.

I don't find this reason sufficient to keep it since the eval in question is a dereg but the effect would be a noop. If that's the behavior I feel like we're emitting a useless and actively misleading eval which should be treated like a bug and removed. There's no benefit to leaving it in place as anyone observing it would only be confused by its lack of affect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it should be changed - but I believe such a user-visible behavior (albeit a small one) change is outside the scope of this PR, so I will follow up in another PR.


if err != nil {
return err
}

return nil
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
Expand Down
180 changes: 107 additions & 73 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,39 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Create a new evaluation
now := time.Now().UnixNano()
submittedEval := false
var eval *structs.Evaluation

// Set the submit time
args.Job.SubmitTime = now

// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = eval.ID
}

// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time
args.Job.SetSubmitTime()

// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
submittedEval = true
}

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
Expand All @@ -330,50 +359,42 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

// Populate the reply with job information
reply.JobModifyIndex = index
reply.Index = index

if submittedEval {
reply.EvalCreateIndex = index
}

} else {
reply.JobModifyIndex = existingJob.JobModifyIndex
}

// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex

// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
if eval == nil {
return nil
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
if eval != nil && !submittedEval {
eval.JobModifyIndex = reply.JobModifyIndex
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}
// Commit this evaluation via Raft
// There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not, before 0.12.1
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

// Kick off a multiregion deployment (enterprise only).
if isRunner {
Expand Down Expand Up @@ -689,7 +710,7 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down Expand Up @@ -766,6 +787,33 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
}

var eval *structs.Evaluation

// The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UnixNano()

// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = eval.ID
}

// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}

// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
Expand All @@ -775,6 +823,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD

// Populate the reply with job information
reply.JobModifyIndex = index
reply.EvalCreateIndex = index
reply.Index = index

// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
Expand All @@ -783,44 +833,28 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
result = multierror.Append(result, err)
}

// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}
// COMPAT(1.1.0) - Remove entire conditional block
// 0.12.1 introduced atomic job deregistration eval
if eval != nil && args.Eval == nil {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return result.ErrorOrNil()
}

Expand Down Expand Up @@ -883,7 +917,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Expand Down Expand Up @@ -976,7 +1010,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName))
}

now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()

// If the count is present, commit the job update via Raft
// for now, we'll do this even if count didn't change
Expand Down Expand Up @@ -1651,7 +1685,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

// Create an eval and mark it as requiring annotations and insert that as well
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down Expand Up @@ -1849,7 +1883,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
// If the job is periodic, we don't create an eval.
if !dispatchJob.IsPeriodic() {
// Create a new evaluation
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down
Loading