diff --git a/api/jobs.go b/api/jobs.go index a248f4519ca..cc303d5230a 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -159,6 +159,15 @@ func (j *Jobs) Plan(job *Job, diff bool, q *WriteOptions) (*JobPlanResponse, *Wr return &resp, wm, nil } +func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta, error) { + var resp JobSummary + qm, err := j.client.query("/v1/job/"+jobID+"/summary", &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + // periodicForceResponse is used to deserialize a force response type periodicForceResponse struct { EvalID string @@ -199,6 +208,27 @@ type Job struct { JobModifyIndex uint64 } +// JobSummary summarizes the state of the allocations of a job +type JobSummary struct { + JobID string + Summary map[string]TaskGroupSummary + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 +} + +// TaskGroup summarizes the state of all the allocations of a particular +// TaskGroup +type TaskGroupSummary struct { + Queued int + Complete int + Failed int + Running int + Starting int + Lost int +} + // JobListStub is used to return a subset of information about // jobs during list operations. type JobListStub struct { @@ -209,6 +239,7 @@ type JobListStub struct { Priority int Status string StatusDescription string + JobSummary *JobSummary CreateIndex uint64 ModifyIndex uint64 JobModifyIndex uint64 diff --git a/api/jobs_test.go b/api/jobs_test.go index 8bda1708bdd..11efc7a3661 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -488,6 +488,48 @@ func TestJobs_Plan(t *testing.T) { } } +func TestJobs_JobSummary(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Trying to retrieve a job summary before the job exists + // returns an error + _, _, err := jobs.Summary("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Register the job + job := testJob() + _, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Query the job summary again and ensure it exists + result, qm, err := jobs.Summary("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + expectedJobSummary := JobSummary{ + JobID: job.ID, + Summary: map[string]TaskGroupSummary{ + job.TaskGroups[0].Name: {}, + }, + CreateIndex: result.CreateIndex, + ModifyIndex: result.ModifyIndex, + } + + // Check that the result is what we expect + if !reflect.DeepEqual(&expectedJobSummary, result) { + t.Fatalf("expect: %#v, got: %#v", expectedJobSummary, result) + } +} + func TestJobs_NewBatchJob(t *testing.T) { job := NewBatchJob("job1", "myjob", "region1", 5) expect := &Job{ diff --git a/client/client_test.go b/client/client_test.go index 60af15badf8..105b089dab3 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -357,6 +357,9 @@ func TestClient_UpdateAllocStatus(t *testing.T) { alloc.ClientStatus = originalStatus state := s1.State() + if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } state.UpsertAllocs(100, []*structs.Allocation{alloc}) testutil.WaitForResult(func() (bool, error) { @@ -394,6 +397,12 @@ func TestClient_WatchAllocs(t *testing.T) { alloc2.NodeID = c1.Node().ID state := s1.State() + if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2}) if err != nil { @@ -469,8 +478,10 @@ func TestClient_SaveRestoreState(t *testing.T) { task.Config["args"] = []string{"10"} state := s1.State() - err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) - if err != nil { + if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil { t.Fatalf("err: %v", err) } @@ -485,8 +496,7 @@ func TestClient_SaveRestoreState(t *testing.T) { }) // Shutdown the client, saves state - err = c1.Shutdown() - if err != nil { + if err := c1.Shutdown(); err != nil { t.Fatalf("err: %v", err) } diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index e2a4efd8330..5b0ce1ca80d 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -16,6 +16,8 @@ func TestHTTP_AllocsList(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc2 := mock.Alloc() + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}) if err != nil { @@ -58,13 +60,21 @@ func TestHTTP_AllocsPrefixList(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Directly manipulate the state state := s.Agent.server.State() + alloc1 := mock.Alloc() alloc1.ID = "aaaaaaaa-e8f7-fd38-c855-ab94ceb89706" alloc2 := mock.Alloc() alloc2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706" - err := state.UpsertAllocs(1000, - []*structs.Allocation{alloc1, alloc2}) - if err != nil { + summary1 := mock.JobSummary(alloc1.JobID) + summary2 := mock.JobSummary(alloc2.JobID) + if err := state.UpsertJobSummary(998, summary1); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(999, summary2); err != nil { + t.Fatal(err) + } + if err := state.UpsertAllocs(1000, + []*structs.Allocation{alloc1, alloc2}); err != nil { t.Fatalf("err: %v", err) } @@ -110,6 +120,9 @@ func TestHTTP_AllocQuery(t *testing.T) { // Directly manipulate the state state := s.Agent.server.State() alloc := mock.Alloc() + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { diff --git a/command/agent/eval_endpoint_test.go b/command/agent/eval_endpoint_test.go index 102c3d7c266..4cf28443863 100644 --- a/command/agent/eval_endpoint_test.go +++ b/command/agent/eval_endpoint_test.go @@ -111,6 +111,8 @@ func TestHTTP_EvalAllocations(t *testing.T) { alloc1 := mock.Alloc() alloc2 := mock.Alloc() alloc2.EvalID = alloc1.EvalID + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}) if err != nil { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 12e40e54207..6fd0814a577 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -54,6 +54,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/plan"): jobName := strings.TrimSuffix(path, "/plan") return s.jobPlan(resp, req, jobName) + case strings.HasSuffix(path, "/summary"): + jobName := strings.TrimSuffix(path, "/summary") + return s.jobSummaryRequest(resp, req, jobName) default: return s.jobCRUD(resp, req, path) } @@ -241,3 +244,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, setIndex(resp, out.Index) return out, nil } + +func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) { + args := structs.JobSummaryRequest{ + JobID: name, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.JobSummaryResponse + if err := s.agent.RPC("Job.Summary", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.JobSummary == nil { + return nil, CodedError(404, "job not found") + } + setIndex(resp, out.Index) + return out.JobSummary, nil +} diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index a63739a18bf..a9fe426bf2a 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -129,6 +129,9 @@ func TestHTTP_NodeForceEval(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc1.NodeID = node.ID + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -177,6 +180,9 @@ func TestHTTP_NodeAllocations(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc1.NodeID = node.ID + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -231,6 +237,9 @@ func TestHTTP_NodeDrain(t *testing.T) { state := s.Agent.server.State() alloc1 := mock.Alloc() alloc1.NodeID = node.ID + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) diff --git a/command/status.go b/command/status.go index 9d22afcb584..825d805c4db 100644 --- a/command/status.go +++ b/command/status.go @@ -246,6 +246,29 @@ func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error { return fmt.Errorf("Error querying job evaluations: %s", err) } + // Query the summary + summary, _, err := client.Jobs().Summary(job.ID, nil) + if err != nil { + return fmt.Errorf("Error querying job summary: %s", err) + } + + // Format the summary + c.Ui.Output(c.Colorize().Color("\n[bold]Summary[reset]")) + if summary != nil { + summaries := make([]string, len(summary.Summary)+1) + summaries[0] = "Task Group|Queued|Starting|Running|Failed|Complete|Lost" + idx := 1 + for tg, tgs := range summary.Summary { + summaries[idx] = fmt.Sprintf("%s|%d|%d|%d|%d|%d|%d", + tg, tgs.Queued, tgs.Starting, + tgs.Running, tgs.Failed, + tgs.Complete, tgs.Lost, + ) + idx += 1 + } + c.Ui.Output(formatList(summaries)) + } + // Determine latest evaluation with failures whose follow up hasn't // completed, this is done while formatting var latestFailedPlacement *api.Evaluation diff --git a/command/status_test.go b/command/status_test.go index c4288bcaf1d..270d913571b 100644 --- a/command/status_test.go +++ b/command/status_test.go @@ -63,6 +63,9 @@ func TestStatusCommand_Run(t *testing.T) { if !strings.Contains(out, "Allocations") { t.Fatalf("should dump allocations") } + if !strings.Contains(out, "Summary") { + t.Fatalf("should dump summary") + } ui.OutputWriter.Reset() // Query a single job showing evals diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 263dc7d4ca6..3af23f322e3 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -19,9 +19,13 @@ func TestAllocEndpoint_List(t *testing.T) { // Create the register request alloc := mock.Alloc() + summary := mock.JobSummary(alloc.JobID) state := s1.fsm.State() - err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) - if err != nil { + + if err := state.UpsertJobSummary(999, summary); err != nil { + t.Fatalf("err: %v", err) + } + if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -75,6 +79,10 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { // Create the alloc alloc := mock.Alloc() + summary := mock.JobSummary(alloc.JobID) + if err := state.UpsertJobSummary(1, summary); err != nil { + t.Fatalf("err: %v", err) + } // Upsert alloc triggers watches time.AfterFunc(100*time.Millisecond, func() { if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil { @@ -109,12 +117,13 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { alloc2.ID = alloc.ID alloc2.ClientStatus = structs.AllocClientStatusRunning time.AfterFunc(100*time.Millisecond, func() { - if err := state.UpdateAllocsFromClient(3, []*structs.Allocation{alloc2}); err != nil { + state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID)) + if err := state.UpdateAllocsFromClient(4, []*structs.Allocation{alloc2}); err != nil { t.Fatalf("err: %v", err) } }) - req.MinQueryIndex = 2 + req.MinQueryIndex = 3 start = time.Now() var resp2 structs.AllocListResponse if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil { @@ -124,8 +133,8 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { if elapsed := time.Since(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) } - if resp2.Index != 3 { - t.Fatalf("Bad index: %d %d", resp2.Index, 3) + if resp2.Index != 4 { + t.Fatalf("Bad index: %d %d", resp2.Index, 4) } if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID || resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning { @@ -142,6 +151,7 @@ func TestAllocEndpoint_GetAlloc(t *testing.T) { // Create the register request alloc := mock.Alloc() state := s1.fsm.State() + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -178,6 +188,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { // First create an unrelated alloc time.AfterFunc(100*time.Millisecond, func() { + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -186,6 +197,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { // Create the alloc we are watching later time.AfterFunc(200*time.Millisecond, func() { + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}) if err != nil { t.Fatalf("err: %v", err) @@ -227,6 +239,8 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) { alloc := mock.Alloc() alloc2 := mock.Alloc() state := s1.fsm.State() + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}) if err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index bf1db2eabb8..6ba4a8816bb 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -18,6 +18,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { state := s1.fsm.State() eval := mock.Eval() eval.Status = structs.EvalStatusFailed + state.UpsertJobSummary(999, mock.JobSummary(eval.JobID)) err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) if err != nil { t.Fatalf("err: %v", err) @@ -27,6 +28,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { alloc := mock.Alloc() alloc.EvalID = eval.ID alloc.DesiredStatus = structs.AllocDesiredStatusStop + alloc.JobID = eval.JobID err = state.UpsertAllocs(1001, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -157,6 +159,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { state := s1.fsm.State() eval := mock.Eval() eval.Status = structs.EvalStatusComplete + state.UpsertJobSummary(999, mock.JobSummary(eval.JobID)) err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) if err != nil { t.Fatalf("err: %v", err) @@ -166,7 +169,8 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { alloc := mock.Alloc() alloc.EvalID = eval.ID alloc.DesiredStatus = structs.AllocDesiredStatusStop - err = state.UpsertAllocs(1001, []*structs.Allocation{alloc}) + state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID)) + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) } @@ -174,7 +178,8 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) { // Insert "running" alloc alloc2 := mock.Alloc() alloc2.EvalID = eval.ID - err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2}) + state.UpsertJobSummary(1003, mock.JobSummary(alloc2.JobID)) + err = state.UpsertAllocs(1004, []*structs.Allocation{alloc2}) if err != nil { t.Fatalf("err: %v", err) } @@ -233,6 +238,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { state := s1.fsm.State() eval := mock.Eval() eval.Status = structs.EvalStatusFailed + state.UpsertJobSummary(999, mock.JobSummary(eval.JobID)) err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) if err != nil { t.Fatalf("err: %v", err) @@ -242,7 +248,8 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { alloc := mock.Alloc() alloc.EvalID = eval.ID alloc.DesiredStatus = structs.AllocDesiredStatusStop - err = state.UpsertAllocs(1001, []*structs.Allocation{alloc}) + state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID)) + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) } @@ -255,7 +262,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) { core := NewCoreScheduler(s1, snap) // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobForceGC, 1001) + gc := s1.coreJobEval(structs.CoreJobForceGC, 1002) err = core.Process(gc) if err != nil { t.Fatalf("err: %v", err) @@ -338,7 +345,8 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) { // Insert a terminal alloc on that node alloc := mock.Alloc() alloc.DesiredStatus = structs.AllocDesiredStatusStop - if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil { + state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID)) + if err := state.UpsertAllocs(1002, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -389,7 +397,8 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) { alloc.NodeID = node.ID alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusRunning - if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil { + state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID)) + if err := state.UpsertAllocs(1002, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 668c4465bab..6524180b5f1 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -500,6 +500,8 @@ func TestEvalEndpoint_Allocations(t *testing.T) { alloc2 := mock.Alloc() alloc2.EvalID = alloc1.EvalID state := s1.fsm.State() + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}) if err != nil { @@ -537,6 +539,7 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) { // Upsert an unrelated alloc first time.AfterFunc(100*time.Millisecond, func() { + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -545,6 +548,7 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) { // Upsert an alloc which will trigger the watch later time.AfterFunc(200*time.Millisecond, func() { + state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}) if err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/fsm.go b/nomad/fsm.go index f6af44004ba..9a0884d07f1 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -9,6 +9,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" "github.com/ugorji/go/codec" ) @@ -410,6 +411,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } + // Updating the allocs with the job id and task group name + for _, alloc := range req.Alloc { + if existing, _ := n.state.AllocByID(alloc.ID); existing != nil { + alloc.JobID = existing.JobID + alloc.TaskGroup = existing.TaskGroup + } + } + // Update all the client allocations if err := n.state.UpdateAllocsFromClient(index, req.Alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateAllocFromClient failed: %v", err) @@ -569,7 +578,83 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } - // Commit the state restore + // Create Job Summaries + // The entire snapshot has to be restored first before we create the missing + // job summaries so that the indexes are updated and we know the highest + // index + // COMPAT 0.4 -> 0.4.1 + jobs, err := restore.JobsWithoutSummary() + if err != nil { + fmt.Errorf("error retreiving jobs during restore: %v", err) + } + if err := restore.CreateJobSummaries(jobs); err != nil { + return fmt.Errorf("error creating job summaries: %v", err) + } + + restore.Commit() + + // Reconciling the queued allocations + return n.reconcileSummaries(jobs) +} + +// reconcileSummaries re-calculates the queued allocations for every job that we +// created a Job Summary during the snap shot restore +func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error { + // Start the state restore + restore, err := n.state.Restore() + if err != nil { + return err + } + defer restore.Abort() + snap, err := n.state.Snapshot() + if err != nil { + return fmt.Errorf("unable to create snapshot: %v", err) + } + for _, job := range jobs { + planner := &scheduler.Harness{ + State: &snap.StateStore, + } + // Create an eval and mark it as requiring annotations and insert that as well + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + Type: job.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + JobModifyIndex: job.JobModifyIndex + 1, + Status: structs.EvalStatusPending, + AnnotatePlan: true, + } + + // Create the scheduler and run it + sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner) + if err != nil { + return err + } + + if err := sched.Process(eval); err != nil { + return err + } + summary, err := snap.JobSummaryByID(job.ID) + if err != nil { + return err + } + if l := len(planner.Evals); l != 1 { + return fmt.Errorf("unexpected number of evals during restore %d. Please file an issue including the logs", l) + } + for tg, queued := range planner.Evals[0].QueuedAllocations { + tgSummary, ok := summary.Summary[tg] + if !ok { + return fmt.Errorf("task group %q not found while updating queued count", tg) + } + tgSummary.Queued = queued + summary.Summary[tg] = tgSummary + } + + if err := restore.JobSummaryRestore(summary); err != nil { + return err + } + } restore.Commit() return nil } @@ -801,7 +886,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink, break } - jobSummary := raw.(*structs.JobSummary) + jobSummary := raw.(structs.JobSummary) sink.Write([]byte{byte(JobSummarySnapshot)}) if err := encoder.Encode(jobSummary); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 897a975debc..ddf92dd7724 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -489,6 +489,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { fsm := testFSM(t) alloc := mock.Alloc() + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) req := structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{alloc}, } @@ -544,6 +545,7 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) { fsm := testFSM(t) alloc := mock.Alloc() + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) job := alloc.Job alloc.Job = nil req := structs.AllocUpdateRequest{ @@ -611,6 +613,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { fsm := testFSM(t) alloc := mock.Alloc() + fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) job := alloc.Job resources := alloc.Resources alloc.Resources = nil @@ -667,7 +670,9 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { alloc.NodeID = node.ID alloc2 := mock.Alloc() alloc2.NodeID = node.ID - state.UpsertAllocs(1, []*structs.Allocation{alloc, alloc2}) + state.UpsertJobSummary(8, mock.JobSummary(alloc.JobID)) + state.UpsertJobSummary(9, mock.JobSummary(alloc2.JobID)) + state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2}) clientAlloc := new(structs.Allocation) *clientAlloc = *alloc @@ -730,7 +735,8 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { state := fsm.State() alloc := mock.Alloc() - state.UpsertAllocs(1, []*structs.Allocation{alloc}) + state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID)) + state.UpsertAllocs(10, []*structs.Allocation{alloc}) clientAlloc := new(structs.Allocation) *clientAlloc = *alloc @@ -757,7 +763,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { clientAlloc.CreateIndex = out.CreateIndex clientAlloc.ModifyIndex = out.ModifyIndex if !reflect.DeepEqual(clientAlloc, out) { - t.Fatalf("bad: %#v %#v", clientAlloc, out) + t.Fatalf("err: %#v,%#v", clientAlloc, out) } } @@ -857,8 +863,10 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) { fsm := testFSM(t) state := fsm.State() alloc1 := mock.Alloc() - state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) alloc2 := mock.Alloc() + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) + state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) // Verify the contents @@ -958,9 +966,38 @@ func TestFSM_SnapshotRestore_JobSummary(t *testing.T) { out1, _ := state2.JobSummaryByID(job1.ID) out2, _ := state2.JobSummaryByID(job2.ID) if !reflect.DeepEqual(js1, out1) { - t.Fatalf("bad: \n%#v\n%#v", js1, job1) + t.Fatalf("bad: \n%#v\n%#v", js1, out1) } if !reflect.DeepEqual(js2, out2) { - t.Fatalf("bad: \n%#v\n%#v", js2, job2) + t.Fatalf("bad: \n%#v\n%#v", js2, out2) + } +} + +func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + + job1 := mock.Job() + state.UpsertJob(1000, job1) + state.DeleteJobSummary(1010, job1.ID) + + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + latestIndex, _ := state.LatestIndex() + out1, _ := state2.JobSummaryByID(job1.ID) + expected := structs.JobSummary{ + JobID: job1.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Queued: 10, + }, + }, + CreateIndex: latestIndex, + ModifyIndex: latestIndex, + } + + if !reflect.DeepEqual(&expected, out1) { + t.Fatalf("expected: %#v, actual: %#v", &expected, out1) } } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 455d1448736..849e6e42698 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -113,6 +113,50 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } +// Summary retreives the summary of a job +func (j *Job) Summary(args *structs.JobSummaryRequest, + reply *structs.JobSummaryResponse) error { + if done, err := j.srv.forward("Job.Summary", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now()) + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(watch.Item{JobSummary: args.JobID}), + run: func() error { + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + // Look for job summary + out, err := snap.JobSummaryByID(args.JobID) + if err != nil { + return err + } + + // Setup the output + reply.JobSummary = out + if out != nil { + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the job_summary table + index, err := snap.Index("job_summary") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) +} + // Evaluate is used to force a job for re-evaluation func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error { if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done { @@ -322,7 +366,11 @@ func (j *Job) List(args *structs.JobListRequest, break } job := raw.(*structs.Job) - jobs = append(jobs, job.Stub()) + summary, err := snap.JobSummaryByID(job.ID) + if err != nil { + return fmt.Errorf("unable to look up summary for job: %v", job.ID) + } + jobs = append(jobs, job.Stub(summary)) } reply.Jobs = jobs diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index da69a684e7d..d05785438ab 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -713,6 +713,146 @@ func TestJobEndpoint_GetJob(t *testing.T) { } } +func TestJobEndpoint_GetJobSummary(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + job.CreateIndex = resp.JobModifyIndex + job.ModifyIndex = resp.JobModifyIndex + job.JobModifyIndex = resp.JobModifyIndex + + // Lookup the job summary + get := &structs.JobSummaryRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp2 structs.JobSummaryResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Summary", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != resp.JobModifyIndex { + t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) + } + + expectedJobSummary := structs.JobSummary{ + JobID: job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{}, + }, + CreateIndex: job.CreateIndex, + ModifyIndex: job.CreateIndex, + } + + if !reflect.DeepEqual(resp2.JobSummary, &expectedJobSummary) { + t.Fatalf("exptected: %v, actual: %v", expectedJobSummary, resp2.JobSummary) + } +} + +func TestJobEndpoint_GetJobSummary_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create a job and insert it + job1 := mock.Job() + time.AfterFunc(200*time.Millisecond, func() { + if err := state.UpsertJob(100, job1); err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Ensure the job summary request gets fired + req := &structs.JobSummaryRequest{ + JobID: job1.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 50, + }, + } + var resp structs.JobSummaryResponse + start := time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Upsert an allocation for the job which should trigger the watch. + time.AfterFunc(200*time.Millisecond, func() { + alloc := mock.Alloc() + alloc.JobID = job1.ID + alloc.Job = job1 + if err := state.UpsertAllocs(200, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + }) + req = &structs.JobSummaryRequest{ + JobID: job1.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 199, + }, + } + start = time.Now() + var resp1 structs.JobSummaryResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &resp1); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp1.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if resp1.JobSummary == nil { + t.Fatalf("bad: %#v", resp) + } + + // Job delete fires watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.DeleteJob(300, job1.ID); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.QueryOptions.MinQueryIndex = 250 + start = time.Now() + + var resp2 structs.SingleJobResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + if resp2.Index != 300 { + t.Fatalf("Bad index: %d %d", resp2.Index, 300) + } + if resp2.Job != nil { + t.Fatalf("bad: %#v", resp2.Job) + } +} + func TestJobEndpoint_GetJob_Blocking(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() @@ -915,6 +1055,8 @@ func TestJobEndpoint_Allocations(t *testing.T) { alloc2 := mock.Alloc() alloc2.JobID = alloc1.JobID state := s1.fsm.State() + state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}) if err != nil { @@ -953,6 +1095,7 @@ func TestJobEndpoint_Allocations_Blocking(t *testing.T) { // First upsert an unrelated alloc time.AfterFunc(100*time.Millisecond, func() { + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) if err != nil { t.Fatalf("err: %v", err) @@ -961,6 +1104,7 @@ func TestJobEndpoint_Allocations_Blocking(t *testing.T) { // Upsert an alloc for the job we are interested in later time.AfterFunc(200*time.Millisecond, func() { + state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}) if err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 7c9b4fb46e4..8a449197e8b 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -231,6 +231,19 @@ func Eval() *structs.Evaluation { return eval } +func JobSummary(jobID string) *structs.JobSummary { + js := &structs.JobSummary{ + JobID: jobID, + Summary: map[string]structs.TaskGroupSummary{ + "web": { + Queued: 0, + Starting: 0, + }, + }, + } + return js +} + func Alloc() *structs.Allocation { alloc := &structs.Allocation{ ID: structs.GenerateUUID(), diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index e8ad81a6aa8..9f43c020968 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -482,8 +482,10 @@ func TestClientEndpoint_GetNode(t *testing.T) { t.Fatalf("bad ComputedClass: %#v", resp2.Node) } + // Update the status updated at value + node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt if !reflect.DeepEqual(node, resp2.Node) { - t.Fatalf("bad: %#v %#v", node, resp2.Node) + t.Fatalf("bad: %#v \n %#v", node, resp2.Node) } // Lookup non-existing node @@ -625,6 +627,7 @@ func TestClientEndpoint_GetAllocs(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -685,6 +688,7 @@ func TestClientEndpoint_GetClientAllocs(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -746,6 +750,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) start := time.Now() time.AfterFunc(100*time.Millisecond, func() { err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) @@ -787,6 +792,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { allocUpdate.NodeID = alloc.NodeID allocUpdate.ID = alloc.ID allocUpdate.ClientStatus = structs.AllocClientStatusRunning + state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID)) err := state.UpsertAllocs(200, []*structs.Allocation{allocUpdate}) if err != nil { t.Fatalf("err: %v", err) @@ -835,6 +841,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) start := time.Now() time.AfterFunc(100*time.Millisecond, func() { err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) @@ -876,6 +883,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { allocUpdate.NodeID = alloc.NodeID allocUpdate.ID = alloc.ID allocUpdate.ClientStatus = structs.AllocClientStatusRunning + state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID)) err := state.UpdateAllocsFromClient(200, []*structs.Allocation{allocUpdate}) if err != nil { t.Fatalf("err: %v", err) @@ -922,6 +930,7 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -982,6 +991,7 @@ func TestClientEndpoint_BatchUpdate(t *testing.T) { alloc := mock.Alloc() alloc.NodeID = node.ID state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -1021,13 +1031,14 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) { // Inject fake evaluations alloc := mock.Alloc() state := s1.fsm.State() - if err := state.UpsertAllocs(1, []*structs.Allocation{alloc}); err != nil { + state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) + if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } // Inject a fake system job. job := mock.SystemJob() - if err := state.UpsertJob(1, job); err != nil { + if err := state.UpsertJob(3, job); err != nil { t.Fatalf("err: %v", err) } @@ -1115,7 +1126,8 @@ func TestClientEndpoint_Evaluate(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - err = state.UpsertAllocs(2, []*structs.Allocation{alloc}) + state.UpsertJobSummary(2, mock.JobSummary(alloc.JobID)) + err = state.UpsertAllocs(3, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 8fde59f9e5a..2584556b0b7 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -40,6 +40,23 @@ func testRegisterNode(t *testing.T, s *Server, n *structs.Node) { } } +func testRegisterJob(t *testing.T, s *Server, j *structs.Job) { + // Create the register request + req := &structs.JobRegisterRequest{ + Job: j, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := s.RPC("Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } +} + func TestPlanApply_applyPlan(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() @@ -51,6 +68,7 @@ func TestPlanApply_applyPlan(t *testing.T) { // Register alloc alloc := mock.Alloc() + s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID)) plan := &structs.PlanResult{ NodeAllocation: map[string][]*structs.Allocation{ node.ID: []*structs.Allocation{alloc}, @@ -100,6 +118,7 @@ func TestPlanApply_applyPlan(t *testing.T) { allocEvict.Job = nil alloc2 := mock.Alloc() alloc2.Job = nil + s1.State().UpsertJobSummary(1500, mock.JobSummary(alloc2.JobID)) plan = &structs.PlanResult{ NodeUpdate: map[string][]*structs.Allocation{ node.ID: []*structs.Allocation{allocEvict}, @@ -362,12 +381,15 @@ func TestPlanApply_EvalNodePlan_NodeFull(t *testing.T) { alloc.NodeID = node.ID node.Resources = alloc.Resources node.Reserved = nil + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) state.UpsertNode(1000, node) state.UpsertAllocs(1001, []*structs.Allocation{alloc}) - snap, _ := state.Snapshot() alloc2 := mock.Alloc() alloc2.NodeID = node.ID + state.UpsertJobSummary(1200, mock.JobSummary(alloc2.JobID)) + + snap, _ := state.Snapshot() plan := &structs.Plan{ NodeAllocation: map[string][]*structs.Allocation{ node.ID: []*structs.Allocation{alloc2}, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index cb808803ff4..521519300b7 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -85,6 +85,33 @@ func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) { s.watch.stopWatch(items, notify) } +// UpsertJobSummary upserts a job summary into the state store. This is for +// testing purposes +func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error { + txn := s.db.Txn(true) + defer txn.Abort() + + if err := txn.Insert("job_summary", *jobSummary); err != nil { + return err + } + txn.Commit() + return nil +} + +// DeleteJobSummary deletes the job summary with the given ID. This is for +// testing purposes only. +func (s *StateStore) DeleteJobSummary(index uint64, id string) error { + txn := s.db.Txn(true) + defer txn.Abort() + + // Delete the job summary + if _, err := txn.DeleteAll("job_summary", "id", id); err != nil { + return fmt.Errorf("deleting job summary failed: %v", err) + } + txn.Commit() + return nil +} + // UpsertNode is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain which is set by the scheduler. @@ -204,6 +231,9 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error if alloc.ClientStatus == structs.AllocClientStatusPending || alloc.ClientStatus == structs.AllocClientStatusRunning { copyAlloc.ClientStatus = structs.AllocClientStatusLost + if err := s.updateSummaryWithAlloc(index, copyAlloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } if err := txn.Insert("allocs", copyAlloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -336,6 +366,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } } + if err := s.updateSummaryWithJob(index, job, watcher, txn); err != nil { + return fmt.Errorf("unable to create job summary: %v", err) + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -344,11 +378,6 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("index update failed: %v", err) } - // Update the job summary - if err := s.updateSummaryWithJob(job, txn); err != nil { - return fmt.Errorf("job summary update failed: %v", err) - } - txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -371,6 +400,8 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { watcher := watch.NewItems() watcher.Add(watch.Item{Table: "jobs"}) watcher.Add(watch.Item{Job: jobID}) + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: jobID}) // Delete the node if err := txn.Delete("jobs", existing); err != nil { @@ -384,6 +415,9 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { if _, err = txn.DeleteAll("job_summary", "id", jobID); err != nil { return fmt.Errorf("deleing job summary failed: %v", err) } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() @@ -474,7 +508,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) { return nil, err } if existing != nil { - return existing.(*structs.JobSummary), nil + summary := existing.(structs.JobSummary) + return summary.Copy(), nil } return nil, nil @@ -646,6 +681,36 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct eval.ModifyIndex = index } + // Update the job summary + summaryRaw, err := txn.First("job_summary", "id", eval.JobID) + if err != nil { + return fmt.Errorf("job summary lookup failed: %v", err) + } + if summaryRaw != nil { + js := summaryRaw.(structs.JobSummary) + var hasSummaryChanged bool + for tg, num := range eval.QueuedAllocations { + if summary, ok := js.Summary[tg]; ok && summary.Queued != num { + summary.Queued = num + js.Summary[tg] = summary + hasSummaryChanged = true + } else { + s.logger.Printf("[ERR] state_store: unable to update queued for job %q and task group %q", eval.JobID, tg) + } + } + + // Insert the job summary + if hasSummaryChanged { + js.ModifyIndex = index + if err := txn.Insert("job_summary", js); err != nil { + return fmt.Errorf("job summary insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + } + } + // Insert the eval if err := txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) @@ -792,6 +857,9 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo // Handle each of the updated allocations for _, alloc := range allocs { + if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { return err } @@ -821,11 +889,6 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I } exist := existing.(*structs.Allocation) - // Update the job summary - if err := s.updateSummaryWithAlloc(alloc, exist, txn); err != nil { - return fmt.Errorf("unable to update job summary: %v", err) - } - // Trigger the watcher watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{AllocEval: exist.EvalID}) @@ -873,15 +936,15 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er // Handle the allocations jobs := make(map[string]string, 1) for _, alloc := range allocs { + if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { return fmt.Errorf("alloc lookup failed: %v", err) } exist, _ := existing.(*structs.Allocation) - if err := s.updateSummaryWithAlloc(alloc, exist, txn); err != nil { - return fmt.Errorf("updating job summary failed: %v", err) - } if exist == nil { alloc.CreateIndex = index alloc.ModifyIndex = index @@ -1215,16 +1278,21 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated -func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, + watcher watch.Items, txn *memdb.Txn) error { + existing, err := s.JobSummaryByID(job.ID) if err != nil { return fmt.Errorf("unable to retrieve summary for job: %v", err) } + var hasSummaryChanged bool if existing == nil { existing = &structs.JobSummary{ - JobID: job.ID, - Summary: make(map[string]structs.TaskGroupSummary), + JobID: job.ID, + Summary: make(map[string]structs.TaskGroupSummary), + CreateIndex: index, } + hasSummaryChanged = true } for _, tg := range job.TaskGroups { if _, ok := existing.Summary[tg.Name]; !ok { @@ -1235,53 +1303,74 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) erro Starting: 0, } existing.Summary[tg.Name] = newSummary + hasSummaryChanged = true } } - if err := txn.Insert("job_summary", existing); err != nil { - return err + // The job summary has changed, so add to watcher and update the modify + // index. + if hasSummaryChanged { + existing.ModifyIndex = index + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: job.ID}) + + // Update the indexes table for job summary + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + if err := txn.Insert("job_summary", *existing); err != nil { + return err + } } + return nil } // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted -func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, - existingAlloc *structs.Allocation, txn *memdb.Txn) error { - - existing, err := s.JobSummaryByID(newAlloc.JobID) +func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, + watcher watch.Items, txn *memdb.Txn) error { + summaryRaw, err := txn.First("job_summary", "id", alloc.JobID) if err != nil { - return fmt.Errorf("lookup of job summary failed: %v", err) + return fmt.Errorf("unable to lookup job summary for job id %q: %v", err) + } + jobSummary, ok := summaryRaw.(structs.JobSummary) + if !ok { + return fmt.Errorf("job summary for job %q is not present", alloc.JobID) } - // If we can't find an existing job summary entry then we are not going to create a - // new job summary entry for an allocation with that job id since we don't - // know the task group counts for that job - // TODO May be we can query the job and scan all the allocations for that - // job and create the summary before applying the change of summary state - // that this allocation would cause. - if existing == nil { - return nil + currentJSModifyIndex := jobSummary.ModifyIndex + // Look for existing alloc + existing, err := s.AllocByID(alloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) } - tgSummary, ok := existing.Summary[newAlloc.TaskGroup] + tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] if !ok { - return nil + return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) } - if existingAlloc == nil { - switch newAlloc.DesiredStatus { + if existing == nil { + switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: - s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.DesiredStatus) + s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", + alloc.ID, alloc.DesiredStatus) } - switch newAlloc.ClientStatus { + switch alloc.ClientStatus { case structs.AllocClientStatusPending: tgSummary.Starting += 1 - case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: - s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ClientStatus) + if tgSummary.Queued > 0 { + tgSummary.Queued -= 1 + } + jobSummary.ModifyIndex = index + case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, + structs.AllocClientStatusComplete: + s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", + alloc.ID, alloc.ClientStatus) } - } else if existingAlloc.ClientStatus != newAlloc.ClientStatus { + } else if existing.ClientStatus != alloc.ClientStatus { // Incrementing the client of the bin of the current state - switch newAlloc.ClientStatus { + switch alloc.ClientStatus { case structs.AllocClientStatusRunning: tgSummary.Running += 1 case structs.AllocClientStatusFailed: @@ -1295,24 +1384,35 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, } // Decrementing the count of the bin of the last state - switch existingAlloc.ClientStatus { + switch existing.ClientStatus { case structs.AllocClientStatusRunning: tgSummary.Running -= 1 - case structs.AllocClientStatusFailed: - tgSummary.Failed -= 1 case structs.AllocClientStatusPending: tgSummary.Starting -= 1 - case structs.AllocClientStatusComplete: - tgSummary.Complete -= 1 case structs.AllocClientStatusLost: tgSummary.Lost -= 1 + case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: + s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v", + existing.ID, existing.ClientStatus) } + jobSummary.ModifyIndex = index } + jobSummary.Summary[alloc.TaskGroup] = tgSummary + + if currentJSModifyIndex < jobSummary.ModifyIndex { + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: alloc.JobID}) - existing.Summary[newAlloc.TaskGroup] = tgSummary - if err := txn.Insert("job_summary", existing); err != nil { - return fmt.Errorf("inserting job summary failed: %v", err) + // Update the indexes table for job summary + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + if err := txn.Insert("job_summary", jobSummary); err != nil { + return fmt.Errorf("updating job summary failed: %v", err) + } } + return nil } @@ -1325,9 +1425,10 @@ type StateSnapshot struct { // restoring state by only using a single large transaction // instead of thousands of sub transactions type StateRestore struct { - txn *memdb.Txn - watch *stateWatch - items watch.Items + txn *memdb.Txn + watch *stateWatch + items watch.Items + latestIndex uint64 } // Abort is used to abort the restore operation @@ -1389,6 +1490,10 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error { if err := r.txn.Insert("index", idx); err != nil { return fmt.Errorf("index insert failed: %v", err) } + + if idx.Value > r.latestIndex { + r.latestIndex = idx.Value + } return nil } @@ -1404,12 +1509,98 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err // JobSummaryRestore is used to restore a job summary func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { - if err := r.txn.Insert("job_summary", jobSummary); err != nil { + if err := r.txn.Insert("job_summary", *jobSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } return nil } +// JobsWithoutSummary returns the list of jobs which don't have any summary +func (r *StateRestore) JobsWithoutSummary() ([]*structs.Job, error) { + // Get all the jobs + var jobs []*structs.Job + iter, err := r.txn.Get("jobs", "id") + if err != nil { + return nil, fmt.Errorf("couldn't retrieve jobs: %v", err) + } + for { + raw := iter.Next() + if raw == nil { + break + } + + // Filter the jobs which have summaries + job := raw.(*structs.Job) + jobSummary, err := r.txn.First("job_summary", "id", job.ID) + if err != nil { + return nil, fmt.Errorf("unable to get job summary: %v", err) + } + if jobSummary != nil { + continue + } + + jobs = append(jobs, job) + } + return jobs, nil +} + +// CreateJobSummaries computes the job summaries for all the jobs +func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error { + for _, job := range jobs { + // Get all the allocations for the job + iter, err := r.txn.Get("allocs", "job", job.ID) + if err != nil { + return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err) + } + var allocs []*structs.Allocation + for { + raw := iter.Next() + if raw == nil { + break + } + allocs = append(allocs, raw.(*structs.Allocation)) + } + + // Create a job summary for the job + summary := structs.JobSummary{ + JobID: job.ID, + Summary: make(map[string]structs.TaskGroupSummary), + } + for _, tg := range job.TaskGroups { + summary.Summary[tg.Name] = structs.TaskGroupSummary{} + } + // Calculate the summary for the job + for _, alloc := range allocs { + if _, ok := summary.Summary[alloc.TaskGroup]; !ok { + summary.Summary[alloc.TaskGroup] = structs.TaskGroupSummary{} + } + tg := summary.Summary[alloc.TaskGroup] + switch alloc.ClientStatus { + case structs.AllocClientStatusFailed: + tg.Failed += 1 + case structs.AllocClientStatusLost: + tg.Lost += 1 + case structs.AllocClientStatusComplete: + tg.Complete += 1 + case structs.AllocClientStatusRunning: + tg.Running += 1 + case structs.AllocClientStatusPending: + tg.Starting += 1 + } + summary.Summary[alloc.TaskGroup] = tg + } + // Insert the job summary + + summary.CreateIndex = r.latestIndex + summary.ModifyIndex = r.latestIndex + if err := r.txn.Insert("job_summary", summary); err != nil { + return fmt.Errorf("error inserting job summary: %v", err) + } + } + + return nil +} + // stateWatch holds shared state for watching updates. This is // outside of StateStore so it can be shared with snapshots. type stateWatch struct { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1dc7f22b2f0..893045a7da4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -142,14 +142,34 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { alloc.NodeID = node.ID alloc1.NodeID = node.ID alloc2.NodeID = node.ID - alloc.ClientStatus = structs.AllocClientStatusRunning - alloc1.ClientStatus = structs.AllocClientStatusFailed + alloc.ClientStatus = structs.AllocClientStatusPending + alloc1.ClientStatus = structs.AllocClientStatusPending alloc2.ClientStatus = structs.AllocClientStatusPending - + if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(990, mock.JobSummary(alloc1.JobID)); err != nil { + t.Fatal(err) + } + if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil { + t.Fatal(err) + } if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil { t.Fatalf("err: %v", err) } - if err = state.UpdateNodeStatus(1003, node.ID, structs.NodeStatusDown); err != nil { + + // Change the state of the allocs to running and failed + newAlloc := new(structs.Allocation) + *newAlloc = *alloc + newAlloc.ClientStatus = structs.AllocClientStatusRunning + newAlloc1 := new(structs.Allocation) + *newAlloc1 = *alloc1 + newAlloc1.ClientStatus = structs.AllocClientStatusFailed + if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil { + t.Fatalf("err: %v", err) + } + + if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil { t.Fatalf("err: %v", err) } @@ -177,6 +197,20 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, alloc2Out.ClientStatus) } + js1, _ := state.JobSummaryByID(alloc.JobID) + js2, _ := state.JobSummaryByID(alloc1.JobID) + js3, _ := state.JobSummaryByID(alloc2.JobID) + + if js1.Summary["web"].Lost != 1 { + t.Fatalf("expected: %v, got: %v", 1, js1.Summary["web"].Lost) + } + if js2.Summary["web"].Failed != 1 { + t.Fatalf("expected: %v, got: %v", 1, js2.Summary["web"].Failed) + } + if js3.Summary["web"].Lost != 1 { + t.Fatalf("expected: %v, got: %v", 1, js3.Summary["web"].Lost) + } + notify.verify(t) } @@ -1102,6 +1136,86 @@ func TestStateStore_RestoreJobSummary(t *testing.T) { } } +func TestStateStore_CreateJobSummaries(t *testing.T) { + state := testStateStore(t) + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + // Restore a job + job := mock.Job() + if err := restore.JobRestore(job); err != nil { + t.Fatalf("err: %v", err) + } + + // Restore an Index + index := IndexEntry{ + Key: "Foo", + Value: 100, + } + + if err := restore.IndexRestore(&index); err != nil { + t.Fatalf("err: %v", err) + } + + // Restore an allocation + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.Job = job + if err := restore.AllocRestore(alloc); err != nil { + t.Fatalf("err: %v", err) + } + + // Create the job summaries + if err := restore.CreateJobSummaries([]*structs.Job{job}); err != nil { + t.Fatalf("err: %v", err) + } + restore.Commit() + + summary, err := state.JobSummaryByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + expected := structs.JobSummary{ + JobID: job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": { + Starting: 1, + }, + }, + CreateIndex: 100, + ModifyIndex: 100, + } + + if !reflect.DeepEqual(summary, &expected) { + t.Fatalf("Bad: %#v %#v", summary, expected) + } +} + +func TestStateRestore_JobsWithoutSummaries(t *testing.T) { + state := testStateStore(t) + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + // Restore a job + job := mock.Job() + if err := restore.JobRestore(job); err != nil { + t.Fatalf("err: %v", err) + } + + jobs, err := restore.JobsWithoutSummary() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(jobs) != 1 { + t.Fatalf("expected: %v, actual: %v", 1, len(jobs)) + } + if !reflect.DeepEqual(job, jobs[0]) { + t.Fatalf("Bad: %#v %#v", job, jobs[0]) + } +} + func TestStateStore_Indexes(t *testing.T) { state := testStateStore(t) node := mock.Node() @@ -1286,6 +1400,10 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { watch.Item{AllocNode: alloc1.NodeID}, watch.Item{AllocNode: alloc2.NodeID}) + state.UpsertJobSummary(900, mock.JobSummary(eval1.JobID)) + state.UpsertJobSummary(901, mock.JobSummary(eval2.JobID)) + state.UpsertJobSummary(902, mock.JobSummary(alloc1.JobID)) + state.UpsertJobSummary(903, mock.JobSummary(alloc2.JobID)) err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}) if err != nil { t.Fatalf("err: %v", err) @@ -1540,6 +1658,8 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { watch.Item{AllocJob: alloc2.JobID}, watch.Item{AllocNode: alloc2.NodeID}) + state.UpsertJobSummary(900, mock.JobSummary(alloc.JobID)) + state.UpsertJobSummary(901, mock.JobSummary(alloc2.JobID)) if err := state.UpsertJob(999, alloc.Job); err != nil { t.Fatalf("err: %v", err) } @@ -1624,7 +1744,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { } tgSummary2 := summary2.Summary["web"] if tgSummary2.Running != 1 { - t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Failed) + t.Fatalf("expected running: %v, actual: %v", 1, tgSummary2.Running) } notify.verify(t) @@ -1687,6 +1807,7 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) if err := state.UpsertJob(999, alloc.Job); err != nil { t.Fatalf("err: %v", err) @@ -1709,6 +1830,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { alloc2 := mock.Alloc() alloc2.ID = alloc.ID alloc2.NodeID = alloc.NodeID + ".new" + state.UpsertJobSummary(1001, mock.JobSummary(alloc2.JobID)) notify := setupNotifyTest( state, @@ -1718,7 +1840,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { watch.Item{AllocJob: alloc2.JobID}, watch.Item{AllocNode: alloc2.NodeID}) - err = state.UpsertAllocs(1001, []*structs.Allocation{alloc2}) + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc2}) if err != nil { t.Fatalf("err: %v", err) } @@ -1735,7 +1857,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { if out.CreateIndex != 1000 { t.Fatalf("bad: %#v", out) } - if out.ModifyIndex != 1001 { + if out.ModifyIndex != 1002 { t.Fatalf("bad: %#v", out) } @@ -1743,7 +1865,7 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - if index != 1001 { + if index != 1002 { t.Fatalf("bad: %d", index) } @@ -1764,6 +1886,7 @@ func TestStateStore_EvictAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) if err != nil { t.Fatalf("err: %v", err) @@ -1805,6 +1928,10 @@ func TestStateStore_AllocsByNode(t *testing.T) { allocs = append(allocs, alloc) } + for idx, alloc := range allocs { + state.UpsertJobSummary(uint64(900+idx), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1839,6 +1966,10 @@ func TestStateStore_AllocsByNodeTerminal(t *testing.T) { allocs = append(allocs, alloc) } + for idx, alloc := range allocs { + state.UpsertJobSummary(uint64(900+idx), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1881,6 +2012,10 @@ func TestStateStore_AllocsByJob(t *testing.T) { allocs = append(allocs, alloc) } + for i, alloc := range allocs { + state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1920,6 +2055,10 @@ func TestStateStore_AllocsByIDPrefix(t *testing.T) { allocs = append(allocs, alloc) } + for i, alloc := range allocs { + state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID)) + } + err := state.UpsertAllocs(1000, allocs) if err != nil { t.Fatalf("err: %v", err) @@ -1974,6 +2113,9 @@ func TestStateStore_Allocs(t *testing.T) { alloc := mock.Alloc() allocs = append(allocs, alloc) } + for i, alloc := range allocs { + state.UpsertJobSummary(uint64(900+i), mock.JobSummary(alloc.JobID)) + } err := state.UpsertAllocs(1000, allocs) if err != nil { @@ -2185,6 +2327,7 @@ func TestStateStore_GetJobStatus_DeadEvalsAndAllocs(t *testing.T) { alloc := mock.Alloc() alloc.JobID = job.ID alloc.DesiredStatus = structs.AllocDesiredStatusStop + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -2216,6 +2359,7 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { alloc := mock.Alloc() alloc.JobID = job.ID alloc.DesiredStatus = structs.AllocDesiredStatusRun + state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -2333,11 +2477,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { t.Fatalf("err: %v", err) } - job.TaskGroups[0].Count = 1 - err = state.UpsertJob(1003, job) - if err != nil { - t.Fatalf("err: %v", err) - } + outA, _ := state.AllocByID(alloc3.ID) + summary, _ = state.JobSummaryByID(job.ID) expectedSummary := structs.JobSummary{ JobID: job.ID, @@ -2346,6 +2487,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { Starting: 3, }, }, + CreateIndex: job.CreateIndex, + ModifyIndex: outA.ModifyIndex, } if !reflect.DeepEqual(summary, &expectedSummary) { t.Fatalf("expected summary: %v, actual: %v", expectedSummary, summary) @@ -2368,6 +2511,7 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { if err := state.UpsertAllocs(1004, []*structs.Allocation{alloc4, alloc5}); err != nil { t.Fatalf("err: %v", err) } + outA, _ = state.AllocByID(alloc5.ID) summary, _ = state.JobSummaryByID(job.ID) expectedSummary = structs.JobSummary{ JobID: job.ID, @@ -2377,6 +2521,8 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { Starting: 1, }, }, + CreateIndex: job.CreateIndex, + ModifyIndex: outA.ModifyIndex, } if !reflect.DeepEqual(summary, &expectedSummary) { t.Fatalf("expected: %v, actual: %v", expectedSummary, summary) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6e162c297d8..f6a9cf13d2f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -258,6 +258,12 @@ type JobPlanRequest struct { WriteRequest } +// JobSummaryRequest is used when we just need to get a specific job summary +type JobSummaryRequest struct { + JobID string + QueryOptions +} + // NodeListRequest is used to parameterize a list request type NodeListRequest struct { QueryOptions @@ -441,6 +447,12 @@ type SingleJobResponse struct { QueryMeta } +// JobSummaryResponse is used to return a single job summary +type JobSummaryResponse struct { + JobSummary *JobSummary + QueryMeta +} + // JobListResponse is used for a list request type JobListResponse struct { Jobs []*JobListStub @@ -964,11 +976,28 @@ const ( type JobSummary struct { JobID string Summary map[string]TaskGroupSummary + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 +} + +// Copy returns a new copy of JobSummary +func (js *JobSummary) Copy() *JobSummary { + newJobSummary := new(JobSummary) + *newJobSummary = *js + newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary)) + for k, v := range js.Summary { + newTGSummary[k] = v + } + newJobSummary.Summary = newTGSummary + return newJobSummary } // TaskGroup summarizes the state of all the allocations of a particular // TaskGroup type TaskGroupSummary struct { + Queued int Complete int Failed int Running int @@ -1166,7 +1195,7 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup { } // Stub is used to return a summary of the job -func (j *Job) Stub() *JobListStub { +func (j *Job) Stub(summary *JobSummary) *JobListStub { return &JobListStub{ ID: j.ID, ParentID: j.ParentID, @@ -1178,6 +1207,7 @@ func (j *Job) Stub() *JobListStub { CreateIndex: j.CreateIndex, ModifyIndex: j.ModifyIndex, JobModifyIndex: j.JobModifyIndex, + JobSummary: summary, } } @@ -1196,6 +1226,7 @@ type JobListStub struct { Priority int Status string StatusDescription string + JobSummary *JobSummary CreateIndex uint64 ModifyIndex uint64 JobModifyIndex uint64 @@ -2813,6 +2844,10 @@ type Evaluation struct { // scheduler. SnapshotIndex uint64 + // QueuedAllocations is the number of unplaced allocations at the time the + // evaluation was processed. The map is keyed by Task Group names. + QueuedAllocations map[string]int + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -2858,6 +2893,15 @@ func (e *Evaluation) Copy() *Evaluation { ne.FailedTGAllocs = failedTGs } + // Copy queued allocations + if e.QueuedAllocations != nil { + queuedAllocations := make(map[string]int, len(e.QueuedAllocations)) + for tg, num := range e.QueuedAllocations { + queuedAllocations[tg] = num + } + ne.QueuedAllocations = queuedAllocations + } + return ne } diff --git a/nomad/watch/watch.go b/nomad/watch/watch.go index 4e9bafbc903..b6a71749c85 100644 --- a/nomad/watch/watch.go +++ b/nomad/watch/watch.go @@ -9,14 +9,15 @@ package watch // multiple fields does not place a watch on multiple items. Each Item // describes exactly one scoped watch. type Item struct { - Alloc string - AllocEval string - AllocJob string - AllocNode string - Eval string - Job string - Node string - Table string + Alloc string + AllocEval string + AllocJob string + AllocNode string + Eval string + Job string + JobSummary string + Node string + Table string } // Items is a helper used to construct a set of watchItems. It deduplicates diff --git a/nomad/worker.go b/nomad/worker.go index 775c4b5c2fd..b6a23ce1f9f 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -430,6 +430,29 @@ func (w *Worker) ReblockEval(eval *structs.Evaluation) error { } defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now()) + // Update the evaluation if the queued jobs is not same as what is + // recorded in the job summary + summary, err := w.srv.fsm.state.JobSummaryByID(eval.JobID) + if err != nil { + return fmt.Errorf("couldn't retreive job summary: %v", err) + } + if summary != nil { + var hasChanged bool + for tg, summary := range summary.Summary { + if queued, ok := eval.QueuedAllocations[tg]; ok { + if queued != summary.Queued { + hasChanged = true + break + } + } + } + if hasChanged { + if err := w.UpdateEval(eval); err != nil { + return err + } + } + } + // Store the snapshot index in the eval eval.SnapshotIndex = w.snapshotIndex diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 0a04a92cbf5..fea703cebb4 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -252,8 +252,10 @@ func TestWorker_SubmitPlan(t *testing.T) { node := mock.Node() testRegisterNode(t, s1, node) - // Create the register request eval1 := mock.Eval() + s1.fsm.State().UpsertJobSummary(1000, mock.JobSummary(eval1.JobID)) + + // Create the register request s1.evalBroker.Enqueue(eval1) evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) @@ -266,6 +268,7 @@ func TestWorker_SubmitPlan(t *testing.T) { // Create an allocation plan alloc := mock.Alloc() + s1.fsm.State().UpsertJobSummary(1200, mock.JobSummary(alloc.JobID)) plan := &structs.Plan{ EvalID: eval1.ID, NodeAllocation: map[string][]*structs.Allocation{ @@ -463,12 +466,22 @@ func TestWorker_ReblockEval(t *testing.T) { // Create the blocked eval eval1 := mock.Eval() eval1.Status = structs.EvalStatusBlocked + eval1.QueuedAllocations = map[string]int{"cache": 100} // Insert it into the state store if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil { t.Fatal(err) } + // Create the job summary + js := mock.JobSummary(eval1.JobID) + tg := js.Summary["web"] + tg.Queued = 100 + js.Summary["web"] = tg + if err := s1.fsm.State().UpsertJobSummary(1001, js); err != nil { + t.Fatal(err) + } + // Enqueue the eval and then dequeue s1.evalBroker.Enqueue(eval1) evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) @@ -480,6 +493,7 @@ func TestWorker_ReblockEval(t *testing.T) { } eval2 := evalOut.Copy() + eval2.QueuedAllocations = map[string]int{"web": 50} // Attempt to reblock eval w := &Worker{srv: s1, logger: s1.logger, evalToken: token} @@ -497,6 +511,15 @@ func TestWorker_ReblockEval(t *testing.T) { t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker: %#v", bStats) } + // Check that the eval was updated + eval, err := s1.fsm.State().EvalByID(eval2.ID) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(eval.QueuedAllocations, eval2.QueuedAllocations) { + t.Fatalf("expected: %#v, actual: %#v", eval2.QueuedAllocations, eval.QueuedAllocations) + } + // Check that the snapshot index was set properly by unblocking the eval and // then dequeuing. s1.blockedEvals.Unblock("foobar", 1000) diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 4b0e4011f91..00027c77900 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -64,6 +64,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { }, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -76,7 +77,10 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { }, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", } + noErr(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))) + noErr(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) // Add a planned eviction to alloc1 diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e6cf2465c7c..acafac9c5b5 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -72,6 +72,7 @@ type GenericScheduler struct { blocked *structs.Evaluation failedTGAllocs map[string]*structs.AllocMetric + queuedAllocs map[string]int } // NewServiceScheduler is a factory function to instantiate a new service scheduler @@ -110,7 +111,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, - s.failedTGAllocs, structs.EvalStatusFailed, desc) + s.failedTGAllocs, structs.EvalStatusFailed, desc, s.queuedAllocs) } // Retry up to the maxScheduleAttempts and reset if progress is made. @@ -128,7 +129,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { mErr.Errors = append(mErr.Errors, err) } if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, - s.failedTGAllocs, statusErr.EvalStatus, err.Error()); err != nil { + s.failedTGAllocs, statusErr.EvalStatus, err.Error(), + s.queuedAllocs); err != nil { mErr.Errors = append(mErr.Errors, err) } return mErr.ErrorOrNil() @@ -148,7 +150,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { // Update the status to complete return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, - s.failedTGAllocs, structs.EvalStatusComplete, "") + s.failedTGAllocs, structs.EvalStatusComplete, "", s.queuedAllocs) } // createBlockedEval creates a blocked eval and submits it to the planner. If @@ -184,6 +186,11 @@ func (s *GenericScheduler) process() (bool, error) { return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err) } + numTaskGroups := 0 + if s.job != nil { + numTaskGroups = len(s.job.TaskGroups) + } + s.queuedAllocs = make(map[string]int, numTaskGroups) // Create a plan s.plan = s.eval.MakePlan(s.job) @@ -241,6 +248,10 @@ func (s *GenericScheduler) process() (bool, error) { return false, err } + // Decrement the number of allocations pending per task group based on the + // number of allocations successfully placed + adjustQueuedAllocations(s.logger, result, s.queuedAllocs) + // If we got a state refresh, try again since we have stale data if newState != nil { s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval) @@ -384,6 +395,11 @@ func (s *GenericScheduler) computeJobAllocs() error { return nil } + // Record the number of allocations that needs to be placed per Task Group + for _, allocTuple := range diff.place { + s.queuedAllocs[allocTuple.TaskGroup.Name] += 1 + } + // Compute the placements return s.computePlacements(diff.place) } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index daf85869eb4..da199aeba78 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -272,6 +272,11 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { t.Fatalf("bad: %#v", metrics) } + // Check queued allocations + queued := outEval.QueuedAllocations["web"] + if queued != 10 { + t.Fatalf("expected queued: %v, actual: %v", 10, queued) + } h.AssertEvalStatus(t, structs.EvalStatusComplete) } @@ -490,6 +495,71 @@ func TestServiceSched_EvaluateMaxPlanEval(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_Plan_Partial_Progress(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job with a high resource ask so that all the allocations can't + // be placed on a single node. + job := mock.Job() + job.TaskGroups[0].Count = 3 + job.TaskGroups[0].Tasks[0].Resources.CPU = 3600 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure only one allocations placed + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + + queued := h.Evals[0].QueuedAllocations["web"] + if queued != 2 { + t.Fatalf("expected: %v, actual: %v", 2, queued) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_EvaluateBlockedEval(t *testing.T) { h := NewHarness(t) @@ -608,6 +678,12 @@ func TestServiceSched_EvaluateBlockedEval_Finished(t *testing.T) { } h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Ensure queued allocations is zero + queued := h.Evals[0].QueuedAllocations["web"] + if queued != 0 { + t.Fatalf("expected queued: %v, actual: %v", 0, queued) + } } func TestServiceSched_JobModify(t *testing.T) { @@ -1107,6 +1183,9 @@ func TestServiceSched_JobDeregister(t *testing.T) { alloc.JobID = job.ID allocs = append(allocs, alloc) } + for _, alloc := range allocs { + h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID)) + } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) // Create a mock evaluation to deregister the job @@ -1231,6 +1310,53 @@ func TestServiceSched_NodeDrain(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { + h := NewHarness(t) + + // Register a draining node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Generate a fake job with allocations and an update policy. + job := mock.Job() + job.TaskGroups[0].Count = 2 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + node.Drain = true + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: node.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + queued := h.Evals[0].QueuedAllocations["web"] + if queued != 2 { + t.Fatalf("expected: %v, actual: %v", 2, queued) + } +} + func TestServiceSched_NodeDrain_UpdateStrategy(t *testing.T) { h := NewHarness(t) @@ -1509,9 +1635,59 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { t.Fatalf("bad: %#v", out) } + // Ensure that the scheduler is recording the correct number of queued + // allocations + queued := h.Evals[0].QueuedAllocations["web"] + if queued != 0 { + t.Fatalf("expected: %v, actual: %v", 1, queued) + } + h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { + h := NewHarness(t) + + node := mock.Node() + node.Drain = true + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.Job() + job.TaskGroups[0].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a failed alloc + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + alloc.ClientStatus = structs.AllocClientStatusFailed + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure that the scheduler is recording the correct number of queued + // allocations + queued := h.Evals[0].QueuedAllocations["web"] + if queued != 1 { + t.Fatalf("expected: %v, actual: %v", 1, queued) + } +} + func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 63af1154195..f33cd55212e 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -204,6 +204,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { }, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -216,7 +217,10 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { }, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", } + noErr(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))) + noErr(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) task := &structs.Task{ @@ -280,6 +284,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { }, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -292,7 +297,10 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { }, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", } + noErr(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))) + noErr(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) // Add a planned eviction to alloc1 diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index ab135c63fb3..812a8b16e6b 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -38,6 +38,7 @@ type SystemScheduler struct { nextEval *structs.Evaluation failedTGAllocs map[string]*structs.AllocMetric + queuedAllocs map[string]int } // NewSystemScheduler is a factory function to instantiate a new system @@ -62,20 +63,23 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) - return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, structs.EvalStatusFailed, desc) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, structs.EvalStatusFailed, desc, + s.queuedAllocs) } // Retry up to the maxSystemScheduleAttempts and reset if progress is made. progress := func() bool { return progressMade(s.planResult) } if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil { if statusErr, ok := err.(*SetStatusError); ok { - return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error()) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(), + s.queuedAllocs) } return err } // Update the status to complete - return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, structs.EvalStatusComplete, "") + return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, structs.EvalStatusComplete, "", + s.queuedAllocs) } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -88,6 +92,11 @@ func (s *SystemScheduler) process() (bool, error) { return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err) } + numTaskGroups := 0 + if s.job != nil { + numTaskGroups = len(s.job.TaskGroups) + } + s.queuedAllocs = make(map[string]int, numTaskGroups) // Get the ready nodes in the required datacenters if s.job != nil { @@ -142,6 +151,10 @@ func (s *SystemScheduler) process() (bool, error) { return false, err } + // Decrement the number of allocations pending per task group based on the + // number of allocations successfully placed + adjustQueuedAllocations(s.logger, result, s.queuedAllocs) + // If we got a state refresh, try again since we have stale data if newState != nil { s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval) @@ -214,6 +227,11 @@ func (s *SystemScheduler) computeJobAllocs() error { return nil } + // Record the number of allocations that needs to be placed per Task Group + for _, allocTuple := range diff.place { + s.queuedAllocs[allocTuple.TaskGroup.Name] += 1 + } + // Compute the placements return s.computePlacements(diff.place) } diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 1e300982635..46865cccfe9 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -70,9 +70,66 @@ func TestSystemSched_JobRegister(t *testing.T) { t.Fatalf("bad: %#v", out[0].Metrics) } + // Ensure no allocations are queued + queued := h.Evals[0].QueuedAllocations["web"] + if queued != 0 { + t.Fatalf("expected queued allocations: %v, actual: %v", 0, queued) + } + h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_ExhaustResources(t *testing.T) { + h := NewHarness(t) + + // Create a nodes + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a service job which consumes most of the system resources + svcJob := mock.Job() + svcJob.TaskGroups[0].Count = 1 + svcJob.TaskGroups[0].Tasks[0].Resources.CPU = 3600 + noErr(t, h.State.UpsertJob(h.NextIndex(), svcJob)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: svcJob.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: svcJob.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a system job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval1 := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + if err := h.Process(NewSystemScheduler, eval1); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure that we have one allocation queued from the system job eval + queued := h.Evals[1].QueuedAllocations["web"] + if queued != 1 { + t.Fatalf("expected: %v, actual: %v", 1, queued) + } +} + func TestSystemSched_JobRegister_Annotate(t *testing.T) { h := NewHarness(t) @@ -587,6 +644,9 @@ func TestSystemSched_JobDeregister(t *testing.T) { alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } + for _, alloc := range allocs { + noErr(t, h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID))) + } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) // Create a mock evaluation to deregister the job diff --git a/scheduler/util.go b/scheduler/util.go index 5e12e119e5f..fbed1ea3bcd 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -368,7 +368,8 @@ func networkPortMap(n *structs.NetworkResource) map[string]int { // setStatus is used to update the status of the evaluation func setStatus(logger *log.Logger, planner Planner, eval, nextEval, spawnedBlocked *structs.Evaluation, - tgMetrics map[string]*structs.AllocMetric, status, desc string) error { + tgMetrics map[string]*structs.AllocMetric, status, desc string, + queuedAllocs map[string]int) error { logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status) newEval := eval.Copy() @@ -381,6 +382,10 @@ func setStatus(logger *log.Logger, planner Planner, if spawnedBlocked != nil { newEval.BlockedEval = spawnedBlocked.ID } + if queuedAllocs != nil { + newEval.QueuedAllocations = queuedAllocs + } + return planner.UpdateEval(newEval) } @@ -453,8 +458,6 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, newAlloc.Resources = nil // Computed in Plan Apply newAlloc.TaskResources = option.TaskResources newAlloc.Metrics = ctx.Metrics() - newAlloc.DesiredStatus = structs.AllocDesiredStatusRun - newAlloc.ClientStatus = structs.AllocClientStatusPending ctx.Plan().AppendAlloc(newAlloc) // Remove this allocation from the slice @@ -593,3 +596,24 @@ func desiredUpdates(diff *diffResult, inplaceUpdates, return desiredTgs } + +// adjustQueuedAllocations decrements the number of allocations pending per task +// group based on the number of allocations successfully placed +func adjustQueuedAllocations(logger *log.Logger, result *structs.PlanResult, queuedAllocs map[string]int) { + if result != nil { + for _, allocations := range result.NodeAllocation { + for _, allocation := range allocations { + // Ensure that the allocation is newly created + if allocation.CreateIndex != result.AllocIndex { + continue + } + + if _, ok := queuedAllocs[allocation.TaskGroup]; ok { + queuedAllocs[allocation.TaskGroup] -= 1 + } else { + logger.Printf("[ERR] sched: allocation %q placed but not in list of unplaced allocations", allocation.TaskGroup) + } + } + } + } +} diff --git a/scheduler/util_test.go b/scheduler/util_test.go index df8c342dd84..adbf6db49da 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -488,7 +488,7 @@ func TestSetStatus(t *testing.T) { eval := mock.Eval() status := "a" desc := "b" - if err := setStatus(logger, h, eval, nil, nil, nil, status, desc); err != nil { + if err := setStatus(logger, h, eval, nil, nil, nil, status, desc, nil); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -504,7 +504,7 @@ func TestSetStatus(t *testing.T) { // Test next evals h = NewHarness(t) next := mock.Eval() - if err := setStatus(logger, h, eval, next, nil, nil, status, desc); err != nil { + if err := setStatus(logger, h, eval, next, nil, nil, status, desc, nil); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -520,7 +520,7 @@ func TestSetStatus(t *testing.T) { // Test blocked evals h = NewHarness(t) blocked := mock.Eval() - if err := setStatus(logger, h, eval, nil, blocked, nil, status, desc); err != nil { + if err := setStatus(logger, h, eval, nil, blocked, nil, status, desc, nil); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -536,7 +536,7 @@ func TestSetStatus(t *testing.T) { // Test metrics h = NewHarness(t) metrics := map[string]*structs.AllocMetric{"foo": nil} - if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc); err != nil { + if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc, nil); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -548,6 +548,23 @@ func TestSetStatus(t *testing.T) { if !reflect.DeepEqual(newEval.FailedTGAllocs, metrics) { t.Fatalf("setStatus() didn't set failed task group metrics correctly: %v", newEval) } + + // Test queued allocations + h = NewHarness(t) + queuedAllocs := map[string]int{"web": 1} + + if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc, queuedAllocs); err != nil { + t.Fatalf("setStatus() failed: %v", err) + } + + if len(h.Evals) != 1 { + t.Fatalf("setStatus() didn't update plan: %v", h.Evals) + } + + newEval = h.Evals[0] + if !reflect.DeepEqual(newEval.QueuedAllocations, queuedAllocs) { + t.Fatalf("setStatus() didn't set failed task group metrics correctly: %v", newEval) + } } func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) { @@ -556,7 +573,7 @@ func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) { job := mock.Job() node := mock.Node() - noErr(t, state.UpsertNode(1000, node)) + noErr(t, state.UpsertNode(900, node)) // Register an alloc alloc := &structs.Allocation{ @@ -570,8 +587,10 @@ func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskGroup: "web", } alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + noErr(t, state.UpsertJobSummary(1000, mock.JobSummary(alloc.JobID))) noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) // Create a new task group that prevents in-place updates. @@ -602,7 +621,7 @@ func TestInplaceUpdate_NoMatch(t *testing.T) { job := mock.Job() node := mock.Node() - noErr(t, state.UpsertNode(1000, node)) + noErr(t, state.UpsertNode(900, node)) // Register an alloc alloc := &structs.Allocation{ @@ -616,8 +635,10 @@ func TestInplaceUpdate_NoMatch(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskGroup: "web", } alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + noErr(t, state.UpsertJobSummary(1000, mock.JobSummary(alloc.JobID))) noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) // Create a new task group that requires too much resources. @@ -647,7 +668,7 @@ func TestInplaceUpdate_Success(t *testing.T) { job := mock.Job() node := mock.Node() - noErr(t, state.UpsertNode(1000, node)) + noErr(t, state.UpsertNode(900, node)) // Register an alloc alloc := &structs.Allocation{ @@ -664,6 +685,7 @@ func TestInplaceUpdate_Success(t *testing.T) { DesiredStatus: structs.AllocDesiredStatusRun, } alloc.TaskResources = map[string]*structs.Resources{"web": alloc.Resources} + noErr(t, state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))) noErr(t, state.UpsertAllocs(1001, []*structs.Allocation{alloc})) // Create a new task group that updates the resources. @@ -891,3 +913,37 @@ func TestDesiredUpdates(t *testing.T) { t.Fatalf("desiredUpdates() returned %#v; want %#v", desired, expected) } } + +func TestUtil_AdjustQueuedAllocations(t *testing.T) { + logger := log.New(os.Stderr, "", log.LstdFlags) + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + alloc2.CreateIndex = 4 + alloc3 := mock.Alloc() + alloc3.CreateIndex = 3 + alloc4 := mock.Alloc() + alloc4.CreateIndex = 6 + + planResult := structs.PlanResult{ + NodeUpdate: map[string][]*structs.Allocation{ + "node-1": []*structs.Allocation{alloc1}, + }, + NodeAllocation: map[string][]*structs.Allocation{ + "node-1": []*structs.Allocation{ + alloc2, + }, + "node-2": []*structs.Allocation{ + alloc3, alloc4, + }, + }, + RefreshIndex: 3, + AllocIndex: 4, + } + + queuedAllocs := map[string]int{"web": 2} + adjustQueuedAllocations(logger, &planResult, queuedAllocs) + + if queuedAllocs["web"] != 1 { + t.Fatalf("expected: %v, actual: %v", 1, queuedAllocs["web"]) + } +} diff --git a/scripts/test.sh b/scripts/test.sh index 5aaf27772bd..b9cde8b9a52 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -15,4 +15,4 @@ go list ./... | grep -v '^github.com/hashicorp/nomad/vendor/' | \ sudo \ -E PATH=$TEMPDIR:$PATH \ -E GOPATH=$GOPATH \ - xargs $GOBIN test ${GOTEST_FLAGS:--cover -timeout=900s} + xargs $GOBIN test -v ${GOTEST_FLAGS:--cover -timeout=900s}