Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Take bounds into account for list endpoints #3110

Merged
merged 2 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,35 @@ func parseTagCompletionQueries(r *http.Request) ([]string, error) {
return queries, nil
}

// ParseStartAndEnd parses start and end params from the request.
func ParseStartAndEnd(
r *http.Request,
parseOpts xpromql.ParseOptions,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need the whole parseOpts instead of only the NowFn()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figured this was cleaner than building a new parseOpts on each query; can go that route if preferred

) (time.Time, time.Time, error) {
if err := r.ParseForm(); err != nil {
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this propagate as a 400?

}

start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
parseOpts.NowFn()())
if err != nil {
return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

if start.After(end) {
err := fmt.Errorf("start %v must be after end %v", start, end)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before end

return time.Time{}, time.Time{}, xerrors.NewInvalidParamsError(err)
}

return start, end, nil
}

// ParseSeriesMatchQuery parses all params from the GET request.
func ParseSeriesMatchQuery(
r *http.Request,
Expand All @@ -188,16 +217,9 @@ func ParseSeriesMatchQuery(
return nil, xerrors.NewInvalidParamsError(errors.ErrInvalidMatchers)
}

start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return nil, xerrors.NewInvalidParamsError(err)
}

end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
time.Now())
start, end, err := ParseStartAndEnd(r, parseOpts)
if err != nil {
return nil, xerrors.NewInvalidParamsError(err)
return nil, err
}

queries := make([]*storage.FetchQuery, len(matcherValues))
Expand Down
60 changes: 59 additions & 1 deletion src/query/api/v1/handler/prometheus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ package prometheus

import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/test"
xerrors "github.com/m3db/m3/src/x/errors"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPromCompressedReadSuccess(t *testing.T) {
Expand Down Expand Up @@ -102,7 +108,7 @@ func TestRenderSeriesMatchResultsNoTags(t *testing.T) {
}

seriesMatchResult := []models.Metrics{
models.Metrics{
{
toTags("name", tag{name: "a", value: "b"}, tag{name: "role", value: "appears"}),
toTags("name2", tag{name: "c", value: "d"}, tag{name: "e", value: "f"}),
},
Expand Down Expand Up @@ -135,3 +141,55 @@ func TestRenderSeriesMatchResultsNoTags(t *testing.T) {
assert.Equal(t, expected, w.value)
}
}

func TestParseStartAndEnd(t *testing.T) {
endTime := time.Now().Truncate(time.Hour)
opts := promql.NewParseOptions().SetNowFn(func() time.Time { return endTime })

tests := []struct {
querystring string
exStart time.Time
exEnd time.Time
exErr bool
}{
{querystring: "", exStart: time.Unix(0, 0), exEnd: endTime},
{querystring: "start=100", exStart: time.Unix(100, 0), exEnd: endTime},
{querystring: "start=100&end=200", exStart: time.Unix(100, 0), exEnd: time.Unix(200, 0)},
{querystring: "start=200&end=100", exErr: true},
{querystring: "start=foo&end=100", exErr: true},
{querystring: "start=100&end=bar", exErr: true},
}

for _, tt := range tests {
t.Run(fmt.Sprintf("GET_%s", tt.querystring), func(t *testing.T) {
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet,
fmt.Sprintf("/?%s", tt.querystring), nil)
require.NoError(t, err)

start, end, err := ParseStartAndEnd(req, opts)
if tt.exErr {
require.Error(t, err)
} else {
assert.Equal(t, tt.exStart, start)
assert.Equal(t, tt.exEnd, end)
}
})
}

for _, tt := range tests {
t.Run(fmt.Sprintf("POST_%s", tt.querystring), func(t *testing.T) {
b := bytes.NewBuffer([]byte(tt.querystring))
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, "/", b)
require.NoError(t, err)
req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeFormURLEncoded)

start, end, err := ParseStartAndEnd(req, opts)
if tt.exErr {
require.Error(t, err)
} else {
assert.Equal(t, tt.exStart, start)
assert.Equal(t, tt.exEnd, end)
}
})
}
}
19 changes: 11 additions & 8 deletions src/query/api/v1/handler/prometheus/native/list_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ package native
import (
"context"
"net/http"
"time"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"

Expand All @@ -53,7 +52,7 @@ var (
type ListTagsHandler struct {
storage storage.Storage
fetchOptionsBuilder handleroptions.FetchOptionsBuilder
nowFn clock.NowFn
parseOpts promql.ParseOptions
instrumentOpts instrument.Options
}

Expand All @@ -62,7 +61,7 @@ func NewListTagsHandler(opts options.HandlerOptions) http.Handler {
return &ListTagsHandler{
storage: opts.Storage(),
fetchOptionsBuilder: opts.FetchOptionsBuilder(),
nowFn: opts.NowFn(),
parseOpts: promql.NewParseOptions().SetNowFn(opts.NowFn()),
instrumentOpts: opts.InstrumentOpts(),
}
}
Expand All @@ -72,13 +71,17 @@ func (h *ListTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := logging.WithContext(ctx, h.instrumentOpts)
w.Header().Set(xhttp.HeaderContentType, xhttp.ContentTypeJSON)

start, end, err := prometheus.ParseStartAndEnd(r, h.parseOpts)
if err != nil {
xhttp.WriteError(w, err)
return
}

query := &storage.CompleteTagsQuery{
CompleteNameOnly: true,
TagMatchers: models.Matchers{{Type: models.MatchAll}},

// NB: necessarily spans entire possible query range.
Start: time.Time{},
End: h.nowFn(),
Start: start,
End: end,
}

opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r)
Expand Down
20 changes: 7 additions & 13 deletions src/query/api/v1/handler/prometheus/native/list_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

type listTagsMatcher struct {
now time.Time
start, end time.Time
}

func (m *listTagsMatcher) String() string { return "list tags query" }
Expand All @@ -52,12 +52,12 @@ func (m *listTagsMatcher) Matches(x interface{}) bool {
return false
}

if !q.Start.Equal(time.Time{}) {
if !q.Start.Equal(m.start) {
return false
}

// NB: end time for the query should be roughly `Now`
if !q.End.Equal(m.now) {
if !q.End.Equal(m.end) {
return false
}

Expand Down Expand Up @@ -119,7 +119,7 @@ func testListTags(t *testing.T, meta block.ResultMetadata, header string) {
SetNowFn(nowFn)
h := NewListTagsHandler(opts)
for _, method := range []string{"GET", "POST"} {
matcher := &listTagsMatcher{now: now}
matcher := &listTagsMatcher{start: time.Unix(0, 0), end: now}
store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()).
Return(storeResult, nil)

Expand Down Expand Up @@ -150,25 +150,19 @@ func TestListErrorTags(t *testing.T) {

// setup storage and handler
store := storage.NewMockStorage(ctrl)
now := time.Now()
nowFn := func() time.Time {
return now
}

fb, err := handleroptions.NewFetchOptionsBuilder(
handleroptions.FetchOptionsBuilderOptions{Timeout: 15 * time.Second})
require.NoError(t, err)
opts := options.EmptyHandlerOptions().
SetStorage(store).
SetFetchOptionsBuilder(fb).
SetNowFn(nowFn)
SetFetchOptionsBuilder(fb)
handler := NewListTagsHandler(opts)
for _, method := range []string{"GET", "POST"} {
matcher := &listTagsMatcher{now: now}
matcher := &listTagsMatcher{start: time.Unix(100, 0), end: time.Unix(1000, 0)}
store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()).
Return(nil, errors.New("err"))

req := httptest.NewRequest(method, "/labels", nil)
req := httptest.NewRequest(method, "/labels?start=100&end=1000", nil)
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)
Expand Down
21 changes: 12 additions & 9 deletions src/query/api/v1/handler/prometheus/remote/tag_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ package remote
import (
"context"
"net/http"
"time"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xhttp "github.com/m3db/m3/src/x/net/http"

Expand All @@ -58,7 +57,7 @@ const (
type TagValuesHandler struct {
storage storage.Storage
fetchOptionsBuilder handleroptions.FetchOptionsBuilder
nowFn clock.NowFn
parseOpts promql.ParseOptions
instrumentOpts instrument.Options
}

Expand All @@ -72,7 +71,7 @@ func NewTagValuesHandler(options options.HandlerOptions) http.Handler {
return &TagValuesHandler{
storage: options.Storage(),
fetchOptionsBuilder: options.FetchOptionsBuilder(),
nowFn: options.NowFn(),
parseOpts: promql.NewParseOptions().SetNowFn(options.NowFn()),
instrumentOpts: options.InstrumentOpts(),
}
}
Expand All @@ -85,7 +84,7 @@ func (h *TagValuesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
query, err := h.parseTagValuesToQuery(r)
if err != nil {
logger.Error("unable to parse tag values to query", zap.Error(err))
xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest))
xhttp.WriteError(w, err)
return
}

Expand Down Expand Up @@ -117,14 +116,18 @@ func (h *TagValuesHandler) parseTagValuesToQuery(
vars := mux.Vars(r)
name, ok := vars[NameReplace]
if !ok || len(name) == 0 {
return nil, errors.ErrNoName
return nil, xhttp.NewError(errors.ErrNoName, http.StatusBadRequest)
}

start, end, err := prometheus.ParseStartAndEnd(r, h.parseOpts)
if err != nil {
return nil, err
}

nameBytes := []byte(name)
return &storage.CompleteTagsQuery{
// NB: necessarily spans the entire timerange for the index.
Start: time.Time{},
End: h.nowFn(),
Start: start,
End: end,
CompleteNameOnly: false,
FilterNameTags: [][]byte{nameBytes},
TagMatchers: models.Matchers{
Expand Down
15 changes: 8 additions & 7 deletions src/query/api/v1/handler/prometheus/remote/tag_values_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
)

type tagValuesMatcher struct {
now time.Time
filterTag string
start, end time.Time
filterTag string
}

func (m *tagValuesMatcher) String() string { return "tag values query" }
Expand All @@ -55,11 +55,11 @@ func (m *tagValuesMatcher) Matches(x interface{}) bool {
return false
}

if !q.Start.Equal(time.Time{}) {
if !q.Start.Equal(m.start) {
return false
}

if !q.End.Equal(m.now) {
if !q.End.Equal(m.end) {
return false
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func TestTagValues(t *testing.T) {
url := fmt.Sprintf("/label/{%s}/values", NameReplace)

for _, tt := range names {
path := fmt.Sprintf("/label/%s/values", tt.name)
path := fmt.Sprintf("/label/%s/values?start=100", tt.name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a case with an end too?

req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatal(err)
Expand All @@ -132,7 +132,8 @@ func TestTagValues(t *testing.T) {
rr := httptest.NewRecorder()
router := mux.NewRouter()
matcher := &tagValuesMatcher{
now: now,
start: time.Unix(100, 0),
end: now,
filterTag: tt.name,
}

Expand All @@ -146,7 +147,7 @@ func TestTagValues(t *testing.T) {
},
Metadata: block.ResultMetadata{
Exhaustive: false,
Warnings: []block.Warning{block.Warning{Name: "foo", Message: "bar"}},
Warnings: []block.Warning{{Name: "foo", Message: "bar"}},
},
}

Expand Down
Loading