Skip to content

Commit

Permalink
Merge pull request #1892 from hashicorp/f-eval-blocking
Browse files Browse the repository at this point in the history
Implement blocking queries for /v1/job/evaluations
  • Loading branch information
dadgar authored Oct 31, 2016
2 parents 40d0a4e + 59b988b commit 21d73a6
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 21 deletions.
46 changes: 28 additions & 18 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,26 +570,36 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())

// Capture the evaluations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{EvalJob: args.JobID}),
run: func() error {
// Capture the evals
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
if err != nil {
return err
}

// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Use the last index that affected the evals table
index, err := snap.Index("evals")
if err != nil {
return err
}
reply.Index = index

// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}

return j.srv.blockingRPC(&opts)
}

// Plan is used to cause a dry-run evaluation of the Job and return the results
Expand Down
53 changes: 53 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,59 @@ func TestJobEndpoint_Evaluations(t *testing.T) {
}
}

func TestJobEndpoint_Evaluations_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = "job1"
state := s1.fsm.State()

// First upsert an unrelated eval
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Upsert an eval for the job we are interested in later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: "job1",
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.JobEvaluationsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Evaluations) != 1 || resp.Evaluations[0].JobID != "job1" {
t.Fatalf("bad: %#v", resp.Evaluations)
}
}

func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
Expand Down
6 changes: 5 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
jobs := make(map[string]string, len(evals))
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
watcher.Add(watch.Item{EvalJob: eval.JobID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}
Expand Down Expand Up @@ -734,8 +735,10 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if err := txn.Delete("evals", existing); err != nil {
return fmt.Errorf("eval delete failed: %v", err)
}
jobID := existing.(*structs.Evaluation).JobID
watcher.Add(watch.Item{Eval: eval})
jobs[existing.(*structs.Evaluation).JobID] = ""
watcher.Add(watch.Item{EvalJob: jobID})
jobs[jobID] = ""
}

for _, alloc := range allocs {
Expand Down Expand Up @@ -1729,6 +1732,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
r.items.Add(watch.Item{Table: "evals"})
r.items.Add(watch.Item{Eval: eval.ID})
r.items.Add(watch.Item{EvalJob: eval.JobID})
if err := r.txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
Expand Down
9 changes: 7 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,8 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
watch.Item{Eval: eval.ID},
watch.Item{EvalJob: eval.JobID})

err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
Expand Down Expand Up @@ -1266,10 +1267,12 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: eval.ID})
watch.Item{Eval: eval.ID},
watch.Item{EvalJob: eval.JobID})

eval2 := mock.Eval()
eval2.ID = eval.ID
eval2.JobID = eval.JobID
err = state.UpsertEvals(1001, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1315,6 +1318,8 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
watch.Item{Table: "allocs"},
watch.Item{Eval: eval1.ID},
watch.Item{Eval: eval2.ID},
watch.Item{EvalJob: eval1.JobID},
watch.Item{EvalJob: eval2.JobID},
watch.Item{Alloc: alloc1.ID},
watch.Item{Alloc: alloc2.ID},
watch.Item{AllocEval: alloc1.EvalID},
Expand Down
1 change: 1 addition & 0 deletions nomad/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Item struct {
AllocJob string
AllocNode string
Eval string
EvalJob string
Job string
JobSummary string
Node string
Expand Down

0 comments on commit 21d73a6

Please sign in to comment.