Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update eval modify index as part of plan apply. #3669

Merged
merged 5 commits into from
Dec 19, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,73 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
// test enqueueing an eval, updating a plan result for the same eval and de-queueing the eval
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil

state := s1.fsm.State()

if err := state.UpsertJob(999, job); err != nil {
t.Fatalf("err: %v", err)
}

eval := mock.Eval()
eval.JobID = job.ID

// Create an eval
if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}

s1.evalBroker.Enqueue(eval)

// Create a plan result and apply it with a later index
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
Job: job,
},
EvalID: eval.ID,
}
assert := assert.New(t)
err := state.UpsertPlanResults(1000, &res)
assert.Nil(err)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}

if resp.WaitIndex != 1000 {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1000)
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return nil
}

// reconcileSummaries re-calculates the queued allocations for every job that we
// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Get all the jobs
Expand Down Expand Up @@ -1142,7 +1142,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}

snap.UpsertEvals(100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
if err != nil {
Expand Down
61 changes: 33 additions & 28 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,13 +1213,18 @@ func TestFSM_ApplyPlanResults(t *testing.T) {

alloc.DeploymentID = d.ID

eval := mock.Eval()
eval.JobID = job.ID
fsm.State().UpsertEvals(1, []*structs.Evaluation{eval})

fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
},
Deployment: d,
EvalID: eval.ID,
}
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
if err != nil {
Expand All @@ -1233,32 +1238,32 @@ func TestFSM_ApplyPlanResults(t *testing.T) {

// Verify the allocation is registered
ws := memdb.NewWatchSet()
assert := assert.New(t)
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
assert.Nil(err)
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex

// Job should be re-attached
alloc.Job = job
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
assert.Equal(alloc, out)

dout, err := fsm.State().DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if tg, ok := dout.TaskGroups[alloc.TaskGroup]; !ok || tg.PlacedAllocs != 1 {
t.Fatalf("err: %v %v", tg, err)
}
assert.Nil(err)
tg, ok := dout.TaskGroups[alloc.TaskGroup]
assert.True(ok)
assert.NotNil(tg)
assert.Equal(1, tg.PlacedAllocs)

// Ensure that the original job is used
evictAlloc := alloc.Copy()
job = mock.Job()
job.Priority = 123
eval = mock.Eval()
eval.JobID = job.ID

fsm.State().UpsertEvals(2, []*structs.Evaluation{eval})

evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
Expand All @@ -1267,28 +1272,28 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
},
EvalID: eval.ID,
}
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
assert.Nil(err)

resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
log := makeLog(buf)
//set the index to something other than 1
log.Index = 25
resp = fsm.Apply(log)
assert.Nil(resp)

// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("alloc found!")
}
if out.Job == nil || out.Job.Priority == 123 {
t.Fatalf("bad job")
}
assert.Nil(err)
assert.Equal(structs.AllocDesiredStatusEvict, out.DesiredStatus)
assert.NotNil(out.Job)
assert.NotEqual(123, out.Job.Priority)

evalOut, err := fsm.State().EvalByID(ws, eval.ID)
assert.Nil(err)
assert.Equal(log.Index, evalOut.ModifyIndex)

}

func TestFSM_DeploymentStatusUpdate(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
AnnotatePlan: true,
}

snap.UpsertEvals(100, []*structs.Evaluation{eval})

// Create an in-memory Planner that returns no errors and stores the
// submitted plan and created evals.
planner := &scheduler.Harness{
Expand Down
1 change: 1 addition & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
Expand Down
Loading