Skip to content

Commit

Permalink
Merge pull request #1455 from hashicorp/f-job-summary
Browse files Browse the repository at this point in the history
Job Summary - Part 2
  • Loading branch information
diptanu authored Jul 26, 2016
2 parents 6349cd3 + 2892224 commit 5081b94
Show file tree
Hide file tree
Showing 34 changed files with 1,462 additions and 127 deletions.
31 changes: 31 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ func (j *Jobs) Plan(job *Job, diff bool, q *WriteOptions) (*JobPlanResponse, *Wr
return &resp, wm, nil
}

func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta, error) {
var resp JobSummary
qm, err := j.client.query("/v1/job/"+jobID+"/summary", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
Expand Down Expand Up @@ -199,6 +208,27 @@ type Job struct {
JobModifyIndex uint64
}

// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
Summary map[string]TaskGroupSummary

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}

// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}

// JobListStub is used to return a subset of information about
// jobs during list operations.
type JobListStub struct {
Expand All @@ -209,6 +239,7 @@ type JobListStub struct {
Priority int
Status string
StatusDescription string
JobSummary *JobSummary
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
Expand Down
42 changes: 42 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,48 @@ func TestJobs_Plan(t *testing.T) {
}
}

func TestJobs_JobSummary(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Trying to retrieve a job summary before the job exists
// returns an error
_, _, err := jobs.Summary("job1", nil)
if err == nil || !strings.Contains(err.Error(), "not found") {
t.Fatalf("expected not found error, got: %#v", err)
}

// Register the job
job := testJob()
_, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm)

// Query the job summary again and ensure it exists
result, qm, err := jobs.Summary("job1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertQueryMeta(t, qm)

expectedJobSummary := JobSummary{
JobID: job.ID,
Summary: map[string]TaskGroupSummary{
job.TaskGroups[0].Name: {},
},
CreateIndex: result.CreateIndex,
ModifyIndex: result.ModifyIndex,
}

// Check that the result is what we expect
if !reflect.DeepEqual(&expectedJobSummary, result) {
t.Fatalf("expect: %#v, got: %#v", expectedJobSummary, result)
}
}

func TestJobs_NewBatchJob(t *testing.T) {
job := NewBatchJob("job1", "myjob", "region1", 5)
expect := &Job{
Expand Down
18 changes: 14 additions & 4 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
alloc.ClientStatus = originalStatus

state := s1.State()
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
state.UpsertAllocs(100, []*structs.Allocation{alloc})

testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -394,6 +397,12 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.NodeID = c1.Node().ID

state := s1.State()
if err := state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
Expand Down Expand Up @@ -469,8 +478,10 @@ func TestClient_SaveRestoreState(t *testing.T) {
task.Config["args"] = []string{"10"}

state := s1.State()
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
if err := state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -485,8 +496,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
})

// Shutdown the client, saves state
err = c1.Shutdown()
if err != nil {
if err := c1.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
19 changes: 16 additions & 3 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func TestHTTP_AllocsList(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
Expand Down Expand Up @@ -58,13 +60,21 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Directly manipulate the state
state := s.Agent.server.State()

alloc1 := mock.Alloc()
alloc1.ID = "aaaaaaaa-e8f7-fd38-c855-ab94ceb89706"
alloc2 := mock.Alloc()
alloc2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
summary1 := mock.JobSummary(alloc1.JobID)
summary2 := mock.JobSummary(alloc2.JobID)
if err := state.UpsertJobSummary(998, summary1); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(999, summary2); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -110,6 +120,9 @@ func TestHTTP_AllocQuery(t *testing.T) {
// Directly manipulate the state
state := s.Agent.server.State()
alloc := mock.Alloc()
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions command/agent/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func TestHTTP_EvalAllocations(t *testing.T) {
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.EvalID = alloc1.EvalID
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/plan"):
jobName := strings.TrimSuffix(path, "/plan")
return s.jobPlan(resp, req, jobName)
case strings.HasSuffix(path, "/summary"):
jobName := strings.TrimSuffix(path, "/summary")
return s.jobSummaryRequest(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -241,3 +244,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
setIndex(resp, out.Index)
return out, nil
}

func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
args := structs.JobSummaryRequest{
JobID: name,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.JobSummaryResponse
if err := s.agent.RPC("Job.Summary", &args, &out); err != nil {
return nil, err
}

setMeta(resp, &out.QueryMeta)
if out.JobSummary == nil {
return nil, CodedError(404, "job not found")
}
setIndex(resp, out.Index)
return out.JobSummary, nil
}
9 changes: 9 additions & 0 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func TestHTTP_NodeForceEval(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -177,6 +180,9 @@ func TestHTTP_NodeAllocations(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -231,6 +237,9 @@ func TestHTTP_NodeDrain(t *testing.T) {
state := s.Agent.server.State()
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
23 changes: 23 additions & 0 deletions command/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,29 @@ func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error {
return fmt.Errorf("Error querying job evaluations: %s", err)
}

// Query the summary
summary, _, err := client.Jobs().Summary(job.ID, nil)
if err != nil {
return fmt.Errorf("Error querying job summary: %s", err)
}

// Format the summary
c.Ui.Output(c.Colorize().Color("\n[bold]Summary[reset]"))
if summary != nil {
summaries := make([]string, len(summary.Summary)+1)
summaries[0] = "Task Group|Queued|Starting|Running|Failed|Complete|Lost"
idx := 1
for tg, tgs := range summary.Summary {
summaries[idx] = fmt.Sprintf("%s|%d|%d|%d|%d|%d|%d",
tg, tgs.Queued, tgs.Starting,
tgs.Running, tgs.Failed,
tgs.Complete, tgs.Lost,
)
idx += 1
}
c.Ui.Output(formatList(summaries))
}

// Determine latest evaluation with failures whose follow up hasn't
// completed, this is done while formatting
var latestFailedPlacement *api.Evaluation
Expand Down
3 changes: 3 additions & 0 deletions command/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func TestStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, "Allocations") {
t.Fatalf("should dump allocations")
}
if !strings.Contains(out, "Summary") {
t.Fatalf("should dump summary")
}
ui.OutputWriter.Reset()

// Query a single job showing evals
Expand Down
26 changes: 20 additions & 6 deletions nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ func TestAllocEndpoint_List(t *testing.T) {

// Create the register request
alloc := mock.Alloc()
summary := mock.JobSummary(alloc.JobID)
state := s1.fsm.State()
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {

if err := state.UpsertJobSummary(999, summary); err != nil {
t.Fatalf("err: %v", err)
}
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -75,6 +79,10 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
// Create the alloc
alloc := mock.Alloc()

summary := mock.JobSummary(alloc.JobID)
if err := state.UpsertJobSummary(1, summary); err != nil {
t.Fatalf("err: %v", err)
}
// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil {
Expand Down Expand Up @@ -109,12 +117,13 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
alloc2.ID = alloc.ID
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateAllocsFromClient(3, []*structs.Allocation{alloc2}); err != nil {
state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID))
if err := state.UpdateAllocsFromClient(4, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
})

req.MinQueryIndex = 2
req.MinQueryIndex = 3
start = time.Now()
var resp2 structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil {
Expand All @@ -124,8 +133,8 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
if resp2.Index != 4 {
t.Fatalf("Bad index: %d %d", resp2.Index, 4)
}
if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID ||
resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning {
Expand All @@ -142,6 +151,7 @@ func TestAllocEndpoint_GetAlloc(t *testing.T) {
// Create the register request
alloc := mock.Alloc()
state := s1.fsm.State()
state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -178,6 +188,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {

// First create an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -186,6 +197,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {

// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -227,6 +239,8 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {
alloc := mock.Alloc()
alloc2 := mock.Alloc()
state := s1.fsm.State()
state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
Loading

0 comments on commit 5081b94

Please sign in to comment.