From 97c69ee9a7312aaa315fec5850fe0a58e13a238a Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 10 Jul 2020 13:31:55 -0400 Subject: [PATCH 1/5] Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters. --- nomad/fsm.go | 32 ++- nomad/job_endpoint.go | 160 ++++++------ nomad/job_endpoint_test.go | 494 +++++++++++++++++++++++++++++++++++++ nomad/leader.go | 2 + nomad/periodic.go | 65 ++--- nomad/state/state_store.go | 17 ++ nomad/structs/structs.go | 6 + 7 files changed, 676 insertions(+), 100 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 10ca959fb76..28f12da5f10 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -555,6 +555,13 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { } } + if req.Eval != nil { + req.Eval.JobModifyIndex = index + if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + return err + } + } + return nil } @@ -565,14 +572,30 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - return n.state.WithWriteTransaction(func(tx state.Txn) error { - if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil { + err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + + if err != nil { n.logger.Error("deregistering job failed", "error", err) return err } return nil }) + + // always attempt upsert eval even if job deregister fail + if req.Eval != nil { + req.Eval.JobModifyIndex = index + if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + return err + } + } + + if err != nil { + return err + } + + return nil } func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} { @@ -663,7 +686,10 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { } func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error { - if err := n.state.UpsertEvals(index, evals); err != nil { + if err := n.state.UpsertEvals(index, evals); len(evals) == 1 && err == state.ErrDuplicateEval { + // the request is a duplicate, ignore processing it + return nil + } else if err != nil { n.logger.Error("UpsertEvals failed", "error", err) return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index a1b5df3c8f9..424f2d51b68 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -312,10 +312,31 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + // Create a new evaluation + now := time.Now().UTC().UnixNano() + submittedEval := false + + // Set the submit time + args.Job.SubmitTime = time.Now().UTC().UnixNano() + + // If the job is periodic or parameterized, we don't create an eval. + if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { + args.Eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: args.Job.Priority, + Type: args.Job.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: args.Job.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + reply.EvalID = args.Eval.ID + } + // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { - // Set the submit time - args.Job.SetSubmitTime() // Commit this update via Raft fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) @@ -328,8 +349,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + submittedEval = true + // Populate the reply with job information reply.JobModifyIndex = index + reply.Index = index + + if args.Eval != nil { + reply.EvalCreateIndex = index + } + } else { reply.JobModifyIndex = existingJob.JobModifyIndex } @@ -337,43 +366,34 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // used for multiregion start args.Job.JobModifyIndex = reply.JobModifyIndex - // If the job is periodic or parameterized, we don't create an eval. - if args.Job.IsPeriodic() || args.Job.IsParameterized() { + if args.Eval == nil { return nil } - // Create a new evaluation - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: args.Job.Priority, - Type: args.Job.Type, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: args.Job.ID, - JobModifyIndex: reply.JobModifyIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } + // COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check. + // 0.12.1 introduced atomic eval job registration + if args.Eval != nil && + !(submittedEval && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false)) { + args.Eval.JobModifyIndex = reply.JobModifyIndex + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{args.Eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "register") - return err - } + // Commit this evaluation via Raft + // There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not, before 0.12.1 + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + j.logger.Error("eval create failed", "error", err, "method", "register") + return err + } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex + if !submittedEval { + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex + } + } // Kick off a multiregion deployment (enterprise only). if isRunner { @@ -766,6 +786,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD } } + // The job priority / type is strange for this, since it's not a high + // priority even if the job was. + now := time.Now().UTC().UnixNano() + // If the job is periodic or parameterized, we don't create an eval. + if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { + args.Eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: args.JobID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + reply.EvalID = args.Eval.ID + } + // Commit the job update via Raft _, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args) if err != nil { @@ -775,6 +814,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Populate the reply with job information reply.JobModifyIndex = index + reply.EvalCreateIndex = index + reply.Index = index // Make a raft apply to release the CSI volume claims of terminal allocs. var result *multierror.Error @@ -783,44 +824,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD result = multierror.Append(result, err) } - // If the job is periodic or parameterized, we don't create an eval. - if job != nil && (job.IsPeriodic() || job.IsParameterized()) { - return nil - } - - // Create a new evaluation - // XXX: The job priority / type is strange for this, since it's not a high - // priority even if the job was. - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerJobDeregister, - JobID: args.JobID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } + // COMPAT(1.1.0) - 0.12.1 introduced atomic job deregistration eval + if args.Eval != nil && + !ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + // Create a new evaluation + args.Eval.JobModifyIndex = index + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{args.Eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - result = multierror.Append(result, err) - j.logger.Error("eval create failed", "error", err, "method", "deregister") - return result.ErrorOrNil() + // Commit this evaluation via Raft + _, _, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + result = multierror.Append(result, err) + j.logger.Error("eval create failed", "error", err, "method", "deregister") + return result.ErrorOrNil() + } } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex return result.ErrorOrNil() } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e1dc0727ab3..dbc96f1336a 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -10,6 +10,7 @@ import ( memdb "github.com/hashicorp/go-memdb" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/raft" "github.com/kr/pretty" "github.com/stretchr/testify/require" @@ -1571,6 +1572,318 @@ func TestJobEndpoint_Register_SemverConstraint(t *testing.T) { }) } +// TestJobEndpoint_Register_EvalCreation_Modern asserts that job register creates an eval +// atomically with the registration +func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + //// initial registration should create the job and a new eval + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.NotEmpty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + + //// re-registration should create a new eval, but leave the job untouched + var resp2 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2) + require.NoError(t, err) + require.NotZero(t, resp2.Index) + require.NotEmpty(t, resp2.EvalID) + require.NotEqual(t, resp.EvalID, resp2.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp2.JobModifyIndex, out.CreateIndex) + require.Equal(t, out.CreateIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.CreateIndex, raftEval.CreateIndex) + require.Equal(t, eval, raftEval) + + //// an update should update the job and create a new eval + req.Job.TaskGroups[0].Name += "a" + var resp3 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3) + require.NoError(t, err) + require.NotZero(t, resp3.Index) + require.NotEmpty(t, resp3.EvalID) + require.NotEqual(t, resp.EvalID, resp3.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp3.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) + + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.Empty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + }) +} + +// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job register creates an eval +// atomically with the registration, but handle legacy clients by adding a new eval update +func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + + // simulate presense of a server that doesn't handle + // new registration eval + c.Build = "0.12.0" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // keep s1 as the leader + if leader, _ := s1.getLeader(); !leader { + s1, s2 = s2, s1 + } + + codec := rpcClient(t, s1) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + //// initial registration should create the job and a new eval + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.NotEmpty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.ID, raftEval.ID) + require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) + require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + + //// re-registration should create a new eval, but leave the job untouched + var resp2 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2) + require.NoError(t, err) + require.NotZero(t, resp2.Index) + require.NotEmpty(t, resp2.EvalID) + require.NotEqual(t, resp.EvalID, resp2.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp2.JobModifyIndex, out.CreateIndex) + require.Equal(t, out.CreateIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) + + // this raft eval is the one found above + raftEval = evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, raftEval.CreateIndex, eval.CreateIndex) + require.Equal(t, eval, raftEval) + + //// an update should update the job and create a new eval + req.Job.TaskGroups[0].Name += "a" + var resp3 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3) + require.NoError(t, err) + require.NotZero(t, resp3.Index) + require.NotEmpty(t, resp3.EvalID) + require.NotEqual(t, resp.EvalID, resp3.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp3.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) + + raftEval = evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.ID, raftEval.ID) + require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) + require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.Empty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + }) +} + +// evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval +func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation { + var store raft.LogStore = s.raftInmem + if store == nil { + store = s.raftStore + } + require.NotNil(t, store) + + li, _ := store.LastIndex() + for i, _ := store.FirstIndex(); i <= li; i++ { + var log raft.Log + err := store.GetLog(i, &log) + require.NoError(t, err) + + if log.Type != raft.LogCommand { + continue + } + + if structs.MessageType(log.Data[0]) != structs.EvalUpdateRequestType { + continue + } + + var req structs.EvalUpdateRequest + structs.Decode(log.Data[1:], &req) + require.NoError(t, err) + + for _, eval := range req.Evals { + if eval.ID == evalID { + eval.CreateIndex = i + eval.ModifyIndex = i + return eval + } + } + } + + return nil +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -2839,6 +3152,187 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) { } } +// TestJobEndpoint_Deregister_EvalCreation_Modern asserts that job deregister creates an eval +// atomically with the registration +func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + t.Run("job de-registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.NotEmpty(t, resp2.EvalID) + + state := s1.fsm.State() + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(t, err) + require.NotNil(t, eval) + require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) + + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.Empty(t, resp2.EvalID) + }) +} + +// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job deregister creates an eval +// atomically with the registration, but handle legacy clients by adding a new eval update +func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + + // simulate presense of a server that doesn't handle + // new registration eval + c.Build = "0.12.0" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // keep s1 as the leader + if leader, _ := s1.getLeader(); !leader { + s1, s2 = s2, s1 + } + + codec := rpcClient(t, s1) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.NotEmpty(t, resp2.EvalID) + + state := s1.fsm.State() + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(t, err) + require.NotNil(t, eval) + require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.ID, raftEval.ID) + require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) + require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.Empty(t, resp2.EvalID) + }) +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/leader.go b/nomad/leader.go index ca071d2c832..c19b2159b26 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -46,6 +46,8 @@ var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0")) var minClusterIDVersion = version.Must(version.NewVersion("0.10.4")) +var minJobRegisterAtomicEvalVersion = version.Must(version.NewVersion("0.12.1")) + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes diff --git a/nomad/periodic.go b/nomad/periodic.go index 0a64fac7dfa..a2ea2219f3e 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -46,10 +46,24 @@ type JobEvalDispatcher interface { // DispatchJob creates an evaluation for the passed job and commits both the // evaluation and the job to the raft log. It returns the eval. func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { + now := time.Now().UTC().UnixNano() + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: job.Namespace, + Priority: job.Priority, + Type: job.Type, + TriggeredBy: structs.EvalTriggerPeriodicJob, + JobID: job.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + // Commit this update via Raft job.SetSubmitTime() req := structs.JobRegisterRequest{ - Job: job, + Job: job, + Eval: eval, WriteRequest: structs.WriteRequest{ Namespace: job.Namespace, }, @@ -62,35 +76,30 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { return nil, err } - // Create a new evaluation - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: job.Namespace, - Priority: job.Priority, - Type: job.Type, - TriggeredBy: structs.EvalTriggerPeriodicJob, - JobID: job.ID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - } - - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - return nil, err + eval.CreateIndex = index + eval.ModifyIndex = index + + // COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration + if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) { + // Create a new evaluation + eval.JobModifyIndex = index + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + + // Commit this evaluation via Raft + // There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not, before Nomad 0.12.1 + _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + return nil, err + } + + // Update its indexes. + eval.CreateIndex = evalIndex + eval.ModifyIndex = evalIndex } - // Update its indexes. - eval.CreateIndex = evalIndex - eval.ModifyIndex = evalIndex return eval, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index eb57f38144a..058237367a8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2540,6 +2540,19 @@ func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, t return nil } +// ErrDuplicateEval indicates that the eval is already present in state and ought not +// to be reprocessed again. +// +// Such duplication might occur in the background compatibility path in job registration +// or deregistration +var ErrDuplicateEval = fmt.Errorf("eval already exists") + +func isPotentialDuplicateEval(eval *structs.Evaluation) bool { + return eval.Status == structs.EvalStatusPending && + (eval.TriggeredBy == structs.EvalTriggerJobRegister || + eval.TriggeredBy == structs.EvalTriggerPeriodicJob) +} + // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation @@ -2548,6 +2561,10 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("eval lookup failed: %v", err) } + if existing != nil && isPotentialDuplicateEval(eval) { + return ErrDuplicateEval + } + // Update the indexes if existing != nil { eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index cc0550aba56..a5ad2eea259 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -563,6 +563,9 @@ type JobRegisterRequest struct { // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool + // Eval is the evaluation that is associated with the job registration + Eval *Evaluation + WriteRequest } @@ -576,6 +579,9 @@ type JobDeregisterRequest struct { // garbage collector Purge bool + // Eval is the evaluation to create that's associated with job deregister + Eval *Evaluation + WriteRequest } From 6a082ade337a2331513427940d9b7d0b28005058 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 15 Jul 2020 08:49:17 -0400 Subject: [PATCH 2/5] time.Now().UTC().UnixNano() -> time.Now().UnixNano() --- nomad/job_endpoint.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 424f2d51b68..d6e8aaf9cd3 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -313,11 +313,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() submittedEval := false // Set the submit time - args.Job.SubmitTime = time.Now().UTC().UnixNano() + args.Job.SubmitTime = now // If the job is periodic or parameterized, we don't create an eval. if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { @@ -709,7 +709,7 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis } // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), @@ -788,7 +788,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // The job priority / type is strange for this, since it's not a high // priority even if the job was. - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() // If the job is periodic or parameterized, we don't create an eval. if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { args.Eval = &structs.Evaluation{ @@ -905,7 +905,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st } // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: jobNS.Namespace, @@ -998,7 +998,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName)) } - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() // If the count is present, commit the job update via Raft // for now, we'll do this even if count didn't change @@ -1673,7 +1673,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) } // Create an eval and mark it as requiring annotations and insert that as well - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), @@ -1871,7 +1871,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // If the job is periodic, we don't create an eval. if !dispatchJob.IsPeriodic() { // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), From a6a96c47e46391feb68063a6bd5b79d47c074660 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 15 Jul 2020 11:10:57 -0400 Subject: [PATCH 3/5] only set args.Eval after all servers upgrade We set the Eval field on job (de-)registration only after all servers get upgraded, to avoid dealing with duplicate evals. --- Vagrantfile | 2 +- nomad/job_endpoint.go | 60 +++++++++++++++++++++++--------------- nomad/job_endpoint_test.go | 20 +++---------- 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/Vagrantfile b/Vagrantfile index 96d70fab9c3..6f744f6849f 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -2,7 +2,7 @@ # vi: set ft=ruby : # -LINUX_BASE_BOX = "bento/ubuntu-16.04" +LINUX_BASE_BOX = "bento/ubuntu-18.04" FREEBSD_BASE_BOX = "freebsd/FreeBSD-11.3-STABLE" LINUX_IP_ADDRESS = "10.199.0.200" diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index d6e8aaf9cd3..1980dbd01da 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -315,13 +315,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // Create a new evaluation now := time.Now().UnixNano() submittedEval := false + var eval *structs.Evaluation // Set the submit time args.Job.SubmitTime = now // If the job is periodic or parameterized, we don't create an eval. if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { - args.Eval = &structs.Evaluation{ + eval = &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), Priority: args.Job.Priority, @@ -332,12 +333,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis CreateTime: now, ModifyTime: now, } - reply.EvalID = args.Eval.ID + reply.EvalID = eval.ID } // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { + // COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval + // 0.12.1 introduced atomic eval job registration + if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + args.Eval = eval + submittedEval = true + } + // Commit this update via Raft fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) if err, ok := fsmErr.(error); ok && err != nil { @@ -349,13 +357,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } - submittedEval = true - // Populate the reply with job information reply.JobModifyIndex = index reply.Index = index - if args.Eval != nil { + if submittedEval { reply.EvalCreateIndex = index } @@ -366,17 +372,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // used for multiregion start args.Job.JobModifyIndex = reply.JobModifyIndex - if args.Eval == nil { + if eval == nil { return nil } - // COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check. - // 0.12.1 introduced atomic eval job registration - if args.Eval != nil && - !(submittedEval && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false)) { - args.Eval.JobModifyIndex = reply.JobModifyIndex + if eval != nil && !submittedEval { + eval.JobModifyIndex = reply.JobModifyIndex update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{args.Eval}, + Evals: []*structs.Evaluation{eval}, WriteRequest: structs.WriteRequest{Region: args.Region}, } @@ -389,10 +392,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } - if !submittedEval { - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex - } + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex } // Kick off a multiregion deployment (enterprise only). @@ -786,12 +787,15 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD } } + var eval *structs.Evaluation + // The job priority / type is strange for this, since it's not a high // priority even if the job was. now := time.Now().UnixNano() + // If the job is periodic or parameterized, we don't create an eval. if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { - args.Eval = &structs.Evaluation{ + eval = &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), Priority: structs.JobDefaultPriority, @@ -802,7 +806,12 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD CreateTime: now, ModifyTime: now, } - reply.EvalID = args.Eval.ID + reply.EvalID = eval.ID + } + + // COMPAT(1.1.0): remove conditional and always set args.Eval + if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + args.Eval = eval } // Commit the job update via Raft @@ -824,23 +833,26 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD result = multierror.Append(result, err) } - // COMPAT(1.1.0) - 0.12.1 introduced atomic job deregistration eval - if args.Eval != nil && - !ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + // COMPAT(1.1.0) - Remove entire conditional block + // 0.12.1 introduced atomic job deregistration eval + if eval != nil && args.Eval == nil { // Create a new evaluation - args.Eval.JobModifyIndex = index + eval.JobModifyIndex = index update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{args.Eval}, + Evals: []*structs.Evaluation{eval}, WriteRequest: structs.WriteRequest{Region: args.Region}, } // Commit this evaluation via Raft - _, _, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) if err != nil { result = multierror.Append(result, err) j.logger.Error("eval create failed", "error", err, "method", "deregister") return result.ErrorOrNil() } + + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex } return result.ErrorOrNil() diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index dbc96f1336a..9edd97c279c 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -1640,9 +1640,7 @@ func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) { require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) raftEval := evalUpdateFromRaft(t, s1, eval.ID) - require.NotNil(t, raftEval) - require.Equal(t, eval.CreateIndex, raftEval.CreateIndex) - require.Equal(t, eval, raftEval) + require.Equal(t, raftEval, eval) //// an update should update the job and create a new eval req.Job.TaskGroups[0].Name += "a" @@ -1759,10 +1757,7 @@ func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) { require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) raftEval := evalUpdateFromRaft(t, s1, eval.ID) - require.NotNil(t, raftEval) - require.Equal(t, eval.ID, raftEval.ID) - require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) - require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + require.Equal(t, eval, raftEval) //// re-registration should create a new eval, but leave the job untouched var resp2 structs.JobRegisterResponse @@ -1788,7 +1783,6 @@ func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) { // this raft eval is the one found above raftEval = evalUpdateFromRaft(t, s1, eval.ID) - require.Equal(t, raftEval.CreateIndex, eval.CreateIndex) require.Equal(t, eval, raftEval) //// an update should update the job and create a new eval @@ -1814,10 +1808,7 @@ func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) { require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) raftEval = evalUpdateFromRaft(t, s1, eval.ID) - require.NotNil(t, raftEval) - require.Equal(t, eval.ID, raftEval.ID) - require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) - require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + require.Equal(t, eval, raftEval) }) // Registering a parameterized job shouldn't create an eval @@ -3297,10 +3288,7 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) raftEval := evalUpdateFromRaft(t, s1, eval.ID) - require.NotNil(t, raftEval) - require.Equal(t, eval.ID, raftEval.ID) - require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) - require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + require.Equal(t, eval, raftEval) }) // Registering a parameterized job shouldn't create an eval From 921a42b48716106a901633a786dcaf5cceebbd07 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 15 Jul 2020 11:14:49 -0400 Subject: [PATCH 4/5] no need to handle duplicate evals anymore --- nomad/fsm.go | 5 +---- nomad/state/state_store.go | 17 ----------------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 28f12da5f10..ca043d4e837 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -686,10 +686,7 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { } func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error { - if err := n.state.UpsertEvals(index, evals); len(evals) == 1 && err == state.ErrDuplicateEval { - // the request is a duplicate, ignore processing it - return nil - } else if err != nil { + if err := n.state.UpsertEvals(index, evals); err != nil { n.logger.Error("UpsertEvals failed", "error", err) return err } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 058237367a8..eb57f38144a 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2540,19 +2540,6 @@ func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, t return nil } -// ErrDuplicateEval indicates that the eval is already present in state and ought not -// to be reprocessed again. -// -// Such duplication might occur in the background compatibility path in job registration -// or deregistration -var ErrDuplicateEval = fmt.Errorf("eval already exists") - -func isPotentialDuplicateEval(eval *structs.Evaluation) bool { - return eval.Status == structs.EvalStatusPending && - (eval.TriggeredBy == structs.EvalTriggerJobRegister || - eval.TriggeredBy == structs.EvalTriggerPeriodicJob) -} - // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation @@ -2561,10 +2548,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("eval lookup failed: %v", err) } - if existing != nil && isPotentialDuplicateEval(eval) { - return ErrDuplicateEval - } - // Update the indexes if existing != nil { eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex From 37ad9476073d2215fbff06e80d9296ebc253c07f Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 15 Jul 2020 11:23:49 -0400 Subject: [PATCH 5/5] comment compat concern in fsm.go --- nomad/fsm.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index ca043d4e837..98a65590baf 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -555,6 +555,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { } } + // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, + // so this may be nil during server upgrades. if req.Eval != nil { req.Eval.JobModifyIndex = index if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { @@ -583,6 +585,8 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { return nil }) + // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, + // so this may be nil during server upgrades. // always attempt upsert eval even if job deregister fail if req.Eval != nil { req.Eval.JobModifyIndex = index