Skip to content

Commit

Permalink
[query] Take bounds into account for list endpoints (#3110)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Jan 21, 2021
1 parent 7ae7b3c commit 9be8ef9
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 52 deletions.
40 changes: 31 additions & 9 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,35 @@ func parseTagCompletionQueries(r *http.Request) ([]string, error) {
return queries, nil
}

// ParseStartAndEnd parses start and end params from the request.
func ParseStartAndEnd(
r *http.Request,
parseOpts xpromql.ParseOptions,
) (time.Time, time.Time, error) {
if err := r.ParseForm(); err != nil {
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
parseOpts.NowFn()())
if err != nil {
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

if start.After(end) {
err := fmt.Errorf("start %v must be after end %v", start, end)
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

return start, end, nil
}

// ParseSeriesMatchQuery parses all params from the GET request.
func ParseSeriesMatchQuery(
r *http.Request,
Expand All @@ -188,16 +217,9 @@ func ParseSeriesMatchQuery(
return nil, xerrors.NewInvalidParamsError(errors.ErrInvalidMatchers)
}

start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return nil, xerrors.NewInvalidParamsError(err)
}

end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
time.Now())
start, end, err := ParseStartAndEnd(r, parseOpts)
if err != nil {
return nil, xerrors.NewInvalidParamsError(err)
return nil, err
}

queries := make([]*storage.FetchQuery, len(matcherValues))
Expand Down
60 changes: 59 additions & 1 deletion src/query/api/v1/handler/prometheus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ package prometheus

import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/test"
xerrors "github.com/m3db/m3/src/x/errors"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPromCompressedReadSuccess(t *testing.T) {
Expand Down Expand Up @@ -102,7 +108,7 @@ func TestRenderSeriesMatchResultsNoTags(t *testing.T) {
}

seriesMatchResult := []models.Metrics{
models.Metrics{
{
toTags("name", tag{name: "a", value: "b"}, tag{name: "role", value: "appears"}),
toTags("name2", tag{name: "c", value: "d"}, tag{name: "e", value: "f"}),
},
Expand Down Expand Up @@ -135,3 +141,55 @@ func TestRenderSeriesMatchResultsNoTags(t *testing.T) {
assert.Equal(t, expected, w.value)
}
}

func TestParseStartAndEnd(t *testing.T) {
endTime := time.Now().Truncate(time.Hour)
opts := promql.NewParseOptions().SetNowFn(func() time.Time { return endTime })

tests := []struct {
querystring string
exStart time.Time
exEnd time.Time
exErr bool
}{
{querystring: "", exStart: time.Unix(0, 0), exEnd: endTime},
{querystring: "start=100", exStart: time.Unix(100, 0), exEnd: endTime},
{querystring: "start=100&end=200", exStart: time.Unix(100, 0), exEnd: time.Unix(200, 0)},
{querystring: "start=200&end=100", exErr: true},
{querystring: "start=foo&end=100", exErr: true},
{querystring: "start=100&end=bar", exErr: true},
}

for _, tt := range tests {
t.Run(fmt.Sprintf("GET_%s", tt.querystring), func(t *testing.T) {
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet,
fmt.Sprintf("/?%s", tt.querystring), nil)
require.NoError(t, err)

start, end, err := ParseStartAndEnd(req, opts)
if tt.exErr {
require.Error(t, err)
} else {
assert.Equal(t, tt.exStart, start)
assert.Equal(t, tt.exEnd, end)
}
})
}

for _, tt := range tests {
t.Run(fmt.Sprintf("POST_%s", tt.querystring), func(t *testing.T) {
b := bytes.NewBuffer([]byte(tt.querystring))
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, "/", b)
require.NoError(t, err)
req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeFormURLEncoded)

start, end, err := ParseStartAndEnd(req, opts)
if tt.exErr {
require.Error(t, err)
} else {
assert.Equal(t, tt.exStart, start)
assert.Equal(t, tt.exEnd, end)
}
})
}
}
19 changes: 11 additions & 8 deletions src/query/api/v1/handler/prometheus/native/list_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ package native
import (
"context"
"net/http"
"time"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"

Expand All @@ -53,7 +52,7 @@ var (
type ListTagsHandler struct {
storage storage.Storage
fetchOptionsBuilder handleroptions.FetchOptionsBuilder
nowFn clock.NowFn
parseOpts promql.ParseOptions
instrumentOpts instrument.Options
}

Expand All @@ -62,7 +61,7 @@ func NewListTagsHandler(opts options.HandlerOptions) http.Handler {
return &ListTagsHandler{
storage: opts.Storage(),
fetchOptionsBuilder: opts.FetchOptionsBuilder(),
nowFn: opts.NowFn(),
parseOpts: promql.NewParseOptions().SetNowFn(opts.NowFn()),
instrumentOpts: opts.InstrumentOpts(),
}
}
Expand All @@ -72,13 +71,17 @@ func (h *ListTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := logging.WithContext(ctx, h.instrumentOpts)
w.Header().Set(xhttp.HeaderContentType, xhttp.ContentTypeJSON)

start, end, err := prometheus.ParseStartAndEnd(r, h.parseOpts)
if err != nil {
xhttp.WriteError(w, err)
return
}

query := &storage.CompleteTagsQuery{
CompleteNameOnly: true,
TagMatchers: models.Matchers{{Type: models.MatchAll}},

// NB: necessarily spans entire possible query range.
Start: time.Time{},
End: h.nowFn(),
Start: start,
End: end,
}

opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r)
Expand Down
20 changes: 7 additions & 13 deletions src/query/api/v1/handler/prometheus/native/list_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

type listTagsMatcher struct {
now time.Time
start, end time.Time
}

func (m *listTagsMatcher) String() string { return "list tags query" }
Expand All @@ -52,12 +52,12 @@ func (m *listTagsMatcher) Matches(x interface{}) bool {
return false
}

if !q.Start.Equal(time.Time{}) {
if !q.Start.Equal(m.start) {
return false
}

// NB: end time for the query should be roughly `Now`
if !q.End.Equal(m.now) {
if !q.End.Equal(m.end) {
return false
}

Expand Down Expand Up @@ -119,7 +119,7 @@ func testListTags(t *testing.T, meta block.ResultMetadata, header string) {
SetNowFn(nowFn)
h := NewListTagsHandler(opts)
for _, method := range []string{"GET", "POST"} {
matcher := &listTagsMatcher{now: now}
matcher := &listTagsMatcher{start: time.Unix(0, 0), end: now}
store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()).
Return(storeResult, nil)

Expand Down Expand Up @@ -150,25 +150,19 @@ func TestListErrorTags(t *testing.T) {

// setup storage and handler
store := storage.NewMockStorage(ctrl)
now := time.Now()
nowFn := func() time.Time {
return now
}

fb, err := handleroptions.NewFetchOptionsBuilder(
handleroptions.FetchOptionsBuilderOptions{Timeout: 15 * time.Second})
require.NoError(t, err)
opts := options.EmptyHandlerOptions().
SetStorage(store).
SetFetchOptionsBuilder(fb).
SetNowFn(nowFn)
SetFetchOptionsBuilder(fb)
handler := NewListTagsHandler(opts)
for _, method := range []string{"GET", "POST"} {
matcher := &listTagsMatcher{now: now}
matcher := &listTagsMatcher{start: time.Unix(100, 0), end: time.Unix(1000, 0)}
store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()).
Return(nil, errors.New("err"))

req := httptest.NewRequest(method, "/labels", nil)
req := httptest.NewRequest(method, "/labels?start=100&end=1000", nil)
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)
Expand Down
21 changes: 12 additions & 9 deletions src/query/api/v1/handler/prometheus/remote/tag_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ package remote
import (
"context"
"net/http"
"time"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"

Expand All @@ -58,7 +57,7 @@ const (
type TagValuesHandler struct {
storage storage.Storage
fetchOptionsBuilder handleroptions.FetchOptionsBuilder
nowFn clock.NowFn
parseOpts promql.ParseOptions
instrumentOpts instrument.Options
}

Expand All @@ -72,7 +71,7 @@ func NewTagValuesHandler(options options.HandlerOptions) http.Handler {
return &TagValuesHandler{
storage: options.Storage(),
fetchOptionsBuilder: options.FetchOptionsBuilder(),
nowFn: options.NowFn(),
parseOpts: promql.NewParseOptions().SetNowFn(options.NowFn()),
instrumentOpts: options.InstrumentOpts(),
}
}
Expand All @@ -85,7 +84,7 @@ func (h *TagValuesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
query, err := h.parseTagValuesToQuery(r)
if err != nil {
logger.Error("unable to parse tag values to query", zap.Error(err))
xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest))
xhttp.WriteError(w, err)
return
}

Expand Down Expand Up @@ -117,14 +116,18 @@ func (h *TagValuesHandler) parseTagValuesToQuery(
vars := mux.Vars(r)
name, ok := vars[NameReplace]
if !ok || len(name) == 0 {
return nil, errors.ErrNoName
return nil, xhttp.NewError(errors.ErrNoName, http.StatusBadRequest)
}

start, end, err := prometheus.ParseStartAndEnd(r, h.parseOpts)
if err != nil {
return nil, err
}

nameBytes := []byte(name)
return &storage.CompleteTagsQuery{
// NB: necessarily spans the entire timerange for the index.
Start: time.Time{},
End: h.nowFn(),
Start: start,
End: end,
CompleteNameOnly: false,
FilterNameTags: [][]byte{nameBytes},
TagMatchers: models.Matchers{
Expand Down
15 changes: 8 additions & 7 deletions src/query/api/v1/handler/prometheus/remote/tag_values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
)

type tagValuesMatcher struct {
now time.Time
filterTag string
start, end time.Time
filterTag string
}

func (m *tagValuesMatcher) String() string { return "tag values query" }
Expand All @@ -55,11 +55,11 @@ func (m *tagValuesMatcher) Matches(x interface{}) bool {
return false
}

if !q.Start.Equal(time.Time{}) {
if !q.Start.Equal(m.start) {
return false
}

if !q.End.Equal(m.now) {
if !q.End.Equal(m.end) {
return false
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func TestTagValues(t *testing.T) {
url := fmt.Sprintf("/label/{%s}/values", NameReplace)

for _, tt := range names {
path := fmt.Sprintf("/label/%s/values", tt.name)
path := fmt.Sprintf("/label/%s/values?start=100", tt.name)
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatal(err)
Expand All @@ -132,7 +132,8 @@ func TestTagValues(t *testing.T) {
rr := httptest.NewRecorder()
router := mux.NewRouter()
matcher := &tagValuesMatcher{
now: now,
start: time.Unix(100, 0),
end: now,
filterTag: tt.name,
}

Expand All @@ -146,7 +147,7 @@ func TestTagValues(t *testing.T) {
},
Metadata: block.ResultMetadata{
Exhaustive: false,
Warnings: []block.Warning{block.Warning{Name: "foo", Message: "bar"}},
Warnings: []block.Warning{{Name: "foo", Message: "bar"}},
},
}

Expand Down
Loading

0 comments on commit 9be8ef9

Please sign in to comment.