diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index ff70661bd2a..2e28341d697 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -570,26 +570,36 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
 	}
 	defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())
 
-	// Capture the evaluations
-	snap, err := j.srv.fsm.State().Snapshot()
-	if err != nil {
-		return err
-	}
-	reply.Evaluations, err = snap.EvalsByJob(args.JobID)
-	if err != nil {
-		return err
-	}
+	// Setup the blocking query
+	opts := blockingOptions{
+		queryOpts: &args.QueryOptions,
+		queryMeta: &reply.QueryMeta,
+		watch:     watch.NewItems(watch.Item{EvalJob: args.JobID}),
+		run: func() error {
+			// Capture the evals
+			snap, err := j.srv.fsm.State().Snapshot()
+			if err != nil {
+				return err
+			}
 
-	// Use the last index that affected the evals table
-	index, err := snap.Index("evals")
-	if err != nil {
-		return err
-	}
-	reply.Index = index
+			reply.Evaluations, err = snap.EvalsByJob(args.JobID)
+			if err != nil {
+				return err
+			}
 
-	// Set the query response
-	j.srv.setQueryMeta(&reply.QueryMeta)
-	return nil
+			// Use the last index that affected the evals table
+			index, err := snap.Index("evals")
+			if err != nil {
+				return err
+			}
+			reply.Index = index
+
+			// Set the query response
+			j.srv.setQueryMeta(&reply.QueryMeta)
+			return nil
+		}}
+
+	return j.srv.blockingRPC(&opts)
 }
 
 // Plan is used to cause a dry-run evaluation of the Job and return the results
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index ad6799fd6b7..3405fa10100 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -1421,6 +1421,59 @@ func TestJobEndpoint_Evaluations(t *testing.T) {
 	}
 }
 
+func TestJobEndpoint_Evaluations_Blocking(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the register request
+	eval1 := mock.Eval()
+	eval2 := mock.Eval()
+	eval2.JobID = "job1"
+	state := s1.fsm.State()
+
+	// First upsert an unrelated eval
+	time.AfterFunc(100*time.Millisecond, func() {
+		err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	})
+
+	// Upsert an eval for the job we are interested in later
+	time.AfterFunc(200*time.Millisecond, func() {
+		err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	})
+
+	// Lookup the jobs
+	get := &structs.JobSpecificRequest{
+		JobID: "job1",
+		QueryOptions: structs.QueryOptions{
+			Region:        "global",
+			MinQueryIndex: 50,
+		},
+	}
+	var resp structs.JobEvaluationsResponse
+	start := time.Now()
+	if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &resp); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
+		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
+	}
+	if resp.Index != 200 {
+		t.Fatalf("Bad index: %d %d", resp.Index, 200)
+	}
+	if len(resp.Evaluations) != 1 || resp.Evaluations[0].JobID != "job1" {
+		t.Fatalf("bad: %#v", resp.Evaluations)
+	}
+}
+
 func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
 	s1 := testServer(t, func(c *Config) {
 		c.NumSchedulers = 0 // Prevent automatic dequeue
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 13b6e00bdc4..c3cb7e3c15b 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -638,6 +638,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
 	jobs := make(map[string]string, len(evals))
 	for _, eval := range evals {
 		watcher.Add(watch.Item{Eval: eval.ID})
+		watcher.Add(watch.Item{EvalJob: eval.JobID})
 		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
 			return err
 		}
@@ -734,8 +735,10 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
 		if err := txn.Delete("evals", existing); err != nil {
 			return fmt.Errorf("eval delete failed: %v", err)
 		}
+		jobID := existing.(*structs.Evaluation).JobID
 		watcher.Add(watch.Item{Eval: eval})
-		jobs[existing.(*structs.Evaluation).JobID] = ""
+		watcher.Add(watch.Item{EvalJob: jobID})
+		jobs[jobID] = ""
 	}
 
 	for _, alloc := range allocs {
@@ -1729,6 +1732,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
 func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
 	r.items.Add(watch.Item{Table: "evals"})
 	r.items.Add(watch.Item{Eval: eval.ID})
+	r.items.Add(watch.Item{EvalJob: eval.JobID})
 	if err := r.txn.Insert("evals", eval); err != nil {
 		return fmt.Errorf("eval insert failed: %v", err)
 	}
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index dcf5775a036..bc61213a16b 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -1227,7 +1227,8 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
 	notify := setupNotifyTest(
 		state,
 		watch.Item{Table: "evals"},
-		watch.Item{Eval: eval.ID})
+		watch.Item{Eval: eval.ID},
+		watch.Item{EvalJob: eval.JobID})
 
 	err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
 	if err != nil {
@@ -1266,10 +1267,12 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
 	notify := setupNotifyTest(
 		state,
 		watch.Item{Table: "evals"},
-		watch.Item{Eval: eval.ID})
+		watch.Item{Eval: eval.ID},
+		watch.Item{EvalJob: eval.JobID})
 	eval2 := mock.Eval()
 	eval2.ID = eval.ID
+	eval2.JobID = eval.JobID
 	err = state.UpsertEvals(1001, []*structs.Evaluation{eval2})
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -1315,6 +1318,8 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
 		watch.Item{Table: "allocs"},
 		watch.Item{Eval: eval1.ID},
 		watch.Item{Eval: eval2.ID},
+		watch.Item{EvalJob: eval1.JobID},
+		watch.Item{EvalJob: eval2.JobID},
 		watch.Item{Alloc: alloc1.ID},
 		watch.Item{Alloc: alloc2.ID},
 		watch.Item{AllocEval: alloc1.EvalID},
diff --git a/nomad/watch/watch.go b/nomad/watch/watch.go
index b6a71749c85..8578df33f85 100644
--- a/nomad/watch/watch.go
+++ b/nomad/watch/watch.go
@@ -14,6 +14,7 @@ type Item struct {
 	AllocJob   string
 	AllocNode  string
 	Eval       string
+	EvalJob    string
 	Job        string
 	JobSummary string
 	Node       string
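
For context, a minimal client-side sketch of how the blocking behavior added above is typically consumed. It assumes the Go API client (`github.com/hashicorp/nomad/api`) and its `Jobs().Evaluations` helper; the polling loop and the `"job1"` job ID are illustrative only and are not part of this change.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to a local Nomad agent with the default address and region.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	var lastIndex uint64
	for {
		// WaitIndex maps to MinQueryIndex on the server. With this change the
		// request blocks until an evaluation for "job1" is written at an index
		// greater than lastIndex (or the wait time elapses), instead of
		// returning immediately.
		evals, meta, err := client.Jobs().Evaluations("job1", &api.QueryOptions{
			WaitIndex: lastIndex,
		})
		if err != nil {
			log.Fatal(err)
		}
		lastIndex = meta.LastIndex
		fmt.Printf("evals for job1 at index %d: %d\n", lastIndex, len(evals))
	}
}
```

On the server side, the `EvalJob` watch item registered in state_store.go is what unblocks such a request whenever an evaluation for the watched job is upserted, deleted, or restored.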