Skip to content

Commit

Permalink
evaluations list pagination and filtering (#11648)
Browse files Browse the repository at this point in the history
API queries can request pagination using the `NextToken` and `PerPage`
fields of `QueryOptions`, when supported by the underlying API.

Add a `NextToken` field to the `structs.QueryMeta` so that we have a
common field across RPCs to tell the caller where to resume paging
from on their next API call. Include this field on the `api.QueryMeta`
as well so that it's available for future versions of List HTTP APIs
that wrap the response with `QueryMeta` rather than returning a simple
list of structs. In the meantime callers can get the `X-Nomad-NextToken`.

Add pagination to the `Eval.List` RPC by checking for pagination token
and page size in `QueryOptions`. This will allow resuming from the
last ID seen so long as the query parameters and the state store
itself are unchanged between requests.

Add filtering by job ID or evaluation status over the results we get
out of the state store.

Parse the query parameters of the `Eval.List` API into the arguments
expected for filtering in the RPC call.
  • Loading branch information
tgross authored Dec 10, 2021
1 parent ddca508 commit 972708a
Show file tree
Hide file tree
Showing 12 changed files with 584 additions and 47 deletions.
3 changes: 3 additions & 0 deletions .changelog/11648.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: Add pagination and filtering to Evaluations List API
```
18 changes: 16 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ type QueryOptions struct {
// paginated lists.
PerPage int32

// NextToken is the token used indicate where to start paging for queries
// that support paginated lists.
// NextToken is the token used to indicate where to start paging
// for queries that support paginated lists. This token should be
// the ID of the next object after the last one seen in the
// previous response.
NextToken string

// ctx is an optional context pass through to the underlying HTTP
Expand Down Expand Up @@ -113,6 +115,11 @@ type QueryMeta struct {

// How long did the request take
RequestTime time.Duration

// NextToken is the token used to indicate where to start paging
// for queries that support paginated lists. To resume paging from
// this point, pass this token in the next request's QueryOptions
NextToken string
}

// WriteMeta is used to return meta data about a write
Expand Down Expand Up @@ -574,6 +581,12 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Prefix != "" {
r.params.Set("prefix", q.Prefix)
}
if q.PerPage != 0 {
r.params.Set("per_page", fmt.Sprint(q.PerPage))
}
if q.NextToken != "" {
r.params.Set("next_token", q.NextToken)
}
for k, v := range q.Params {
r.params.Set(k, v)
}
Expand Down Expand Up @@ -958,6 +971,7 @@ func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
return fmt.Errorf("Failed to parse X-Nomad-LastContact: %v", err)
}
q.LastContact = time.Duration(last) * time.Millisecond
q.NextToken = header.Get("X-Nomad-NextToken")

// Parse the X-Nomad-KnownLeader
switch header.Get("X-Nomad-KnownLeader") {
Expand Down
32 changes: 31 additions & 1 deletion api/evaluations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"strings"
"testing"

"github.com/hashicorp/nomad/api/internal/testutil"
)

func TestEvaluations_List(t *testing.T) {
Expand Down Expand Up @@ -43,10 +45,38 @@ func TestEvaluations_List(t *testing.T) {

// if the eval fails fast there can be more than 1
// but they are in order of most recent first, so look at the last one
if len(result) == 0 {
t.Fatalf("expected eval (%s), got none", resp.EvalID)
}
idx := len(result) - 1
if len(result) == 0 || result[idx].ID != resp.EvalID {
if result[idx].ID != resp.EvalID {
t.Fatalf("expected eval (%s), got: %#v", resp.EvalID, result[idx])
}

// wait until the 2nd eval shows up before we try paging
results := []*Evaluation{}
testutil.WaitForResult(func() (bool, error) {
results, _, err = e.List(nil)
if len(results) < 2 || err != nil {
return false, err
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})

// Check the evaluations again with paging; note that while this
// package sorts by timestamp, the actual HTTP API sorts by ID
// so we need to use that for the NextToken
ids := []string{results[0].ID, results[1].ID}
sort.Strings(ids)
result, qm, err = e.List(&QueryOptions{PerPage: int32(1), NextToken: ids[1]})
if err != nil {
t.Fatalf("err: %s", err)
}
if len(result) != 1 {
t.Fatalf("expected no evals after last one but got %v", result[0])
}
}

func TestEvaluations_PrefixList(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions command/agent/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
return nil, nil
}

query := req.URL.Query()
args.FilterEvalStatus = query.Get("status")
args.FilterJobID = query.Get("job")

var out structs.EvalListResponse
if err := s.agent.RPC("Eval.List", &args, &out); err != nil {
return nil, err
Expand Down
62 changes: 34 additions & 28 deletions command/agent/eval_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package agent

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -17,39 +20,42 @@ func TestHTTP_EvalList(t *testing.T) {
eval1 := mock.Eval()
eval2 := mock.Eval()
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(t, err)

// Make the HTTP request
// simple list request
req, err := http.NewRequest("GET", "/v1/evaluations", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(t, err)
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.EvalsRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.HeaderMap.Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}
require.NoError(t, err)

// check headers and response body
require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"), "missing index")
require.Equal(t, "true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader")
require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact")
require.Len(t, obj.([]*structs.Evaluation), 2, "expected 2 evals")

// paginated list request
req, err = http.NewRequest("GET", "/v1/evaluations?per_page=1", nil)
require.NoError(t, err)
respW = httptest.NewRecorder()
obj, err = s.Server.EvalsRequest(respW, req)
require.NoError(t, err)

// check response body
require.Len(t, obj.([]*structs.Evaluation), 1, "expected 1 eval")

// filtered list request
req, err = http.NewRequest("GET",
fmt.Sprintf("/v1/evaluations?per_page=10&job=%s", eval2.JobID), nil)
require.NoError(t, err)
respW = httptest.NewRecorder()
obj, err = s.Server.EvalsRequest(respW, req)
require.NoError(t, err)

// check response body
require.Len(t, obj.([]*structs.Evaluation), 1, "expected 1 eval")

// Check the eval
e := obj.([]*structs.Evaluation)
if len(e) != 2 {
t.Fatalf("bad: %#v", e)
}
})
}

Expand Down
11 changes: 9 additions & 2 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,19 @@ func setLastContact(resp http.ResponseWriter, last time.Duration) {
resp.Header().Set("X-Nomad-LastContact", strconv.FormatUint(lastMsec, 10))
}

// setNextToken is used to set the next token header for pagination
func setNextToken(resp http.ResponseWriter, nextToken string) {
if nextToken != "" {
resp.Header().Set("X-Nomad-NextToken", nextToken)
}
}

// setMeta is used to set the query response meta data
func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
setIndex(resp, m.Index)
setLastContact(resp, m.LastContact)
setKnownLeader(resp, m.KnownLeader)
setNextToken(resp, m.NextToken)
}

// setHeaders is used to set canonical response header fields
Expand Down Expand Up @@ -746,8 +754,7 @@ func parsePagination(req *http.Request, b *structs.QueryOptions) {
}
}

nextToken := query.Get("next_token")
b.NextToken = nextToken
b.NextToken = query.Get("next_token")
}

// parseWriteRequest is a convenience method for endpoints that need to parse a
Expand Down
31 changes: 19 additions & 12 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,32 +349,39 @@ func (e *Eval) List(args *structs.EvalListRequest,
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
run: func(ws memdb.WatchSet, store *state.StateStore) error {
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)
iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else {
iter, err = state.EvalsByNamespace(ws, args.RequestNamespace())
iter, err = store.EvalsByNamespace(ws, args.RequestNamespace())
}
if err != nil {
return err
}

var evals []*structs.Evaluation
for {
raw := iter.Next()
if raw == nil {
break
iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
if eval := raw.(*structs.Evaluation); eval != nil {
return args.ShouldBeFiltered(eval)
}
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
}
return false
})

var evals []*structs.Evaluation
paginator := state.NewPaginator(iter, args.QueryOptions,
func(raw interface{}) {
eval := raw.(*structs.Evaluation)
evals = append(evals, eval)
})

nextToken := paginator.Page()
reply.QueryMeta.NextToken = nextToken
reply.Evaluations = evals

// Use the last index that affected the jobs table
index, err := state.Index("evals")
index, err := store.Index("evals")
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 972708a

Please sign in to comment.