From c535a7d4196e25b6a9080d64ff3f7b06b54b9952 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 20 Mar 2019 15:02:06 -0400 Subject: [PATCH] [query] Add new aggregate endpoint to query --- .../carbon/expected/a.ba.json | 16 ++ .../docker-integration-tests/carbon/test.sh | 3 +- src/query/api/v1/handler/graphite/find.go | 100 +++++-- .../api/v1/handler/graphite/find_parser.go | 88 +++++-- .../api/v1/handler/graphite/find_test.go | 245 ++++++++++++++++++ src/query/api/v1/handler/prometheus/common.go | 131 +++++----- .../api/v1/handler/prometheus/common_test.go | 121 ++++----- .../api/v1/handler/prometheus/remote/match.go | 23 +- .../api/v1/handler/prometheus/remote/read.go | 1 + src/query/api/v1/handler/search.go | 8 +- src/query/generated/mocks/generate.go | 1 + src/query/graphite/graphite/glob.go | 28 +- src/query/graphite/graphite/glob_test.go | 41 ++- src/query/graphite/storage/converter.go | 38 ++- src/query/graphite/storage/converter_test.go | 37 ++- src/query/graphite/storage/m3_wrapper.go | 37 ++- src/query/graphite/storage/m3_wrapper_test.go | 16 +- src/query/models/strconv/checker.go | 42 +++ src/query/models/strconv/checker_test.go | 32 +++ src/query/storage/completed_tags.go | 8 + src/query/storage/fanout/storage.go | 4 +- src/query/storage/fanout/storage_test.go | 8 +- src/query/storage/index.go | 69 +++-- src/query/storage/index_test.go | 45 +++- src/query/storage/m3/storage.go | 114 +++++--- src/query/storage/m3/storage_test.go | 128 ++++++--- src/query/storage/mock/storage.go | 6 +- src/query/storage/remote/storage.go | 4 +- src/query/storage/storage_mock.go | 159 ++++++++++++ src/query/storage/types.go | 6 +- src/query/storage/validator/storage.go | 4 +- src/query/stores/m3db/async_session.go | 46 ++-- src/query/stores/m3db/async_session_test.go | 7 + src/query/test/storage.go | 4 +- src/query/tsdb/remote/client.go | 2 +- 35 files changed, 1186 insertions(+), 436 deletions(-) create mode 100644 scripts/docker-integration-tests/carbon/expected/a.ba.json create mode 100644 src/query/api/v1/handler/graphite/find_test.go create mode 100644 src/query/storage/storage_mock.go diff --git a/scripts/docker-integration-tests/carbon/expected/a.ba.json b/scripts/docker-integration-tests/carbon/expected/a.ba.json new file mode 100644 index 0000000000..d14ff031ce --- /dev/null +++ b/scripts/docker-integration-tests/carbon/expected/a.ba.json @@ -0,0 +1,16 @@ +[ + { + "id": "a.bag", + "text": "bag", + "leaf": 1, + "expandable": 0, + "allowChildren": 0 + }, + { + "id": "a.bar", + "text": "bar", + "leaf": 0, + "expandable": 1, + "allowChildren": 1 + } +] diff --git a/scripts/docker-integration-tests/carbon/test.sh b/scripts/docker-integration-tests/carbon/test.sh index 7341b7ff21..35557b7006 100755 --- a/scripts/docker-integration-tests/carbon/test.sh +++ b/scripts/docker-integration-tests/carbon/test.sh @@ -34,7 +34,7 @@ function read_carbon { function find_carbon { query=$1 expected_file=$2 - RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query") + RESPONSE=$(curl -sSg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query") ACTUAL=$(echo $RESPONSE | jq '. | sort') EXPECTED=$(cat $EXPECTED_PATH/$expected_file | jq '. | sort') if [ "$ACTUAL" == "$EXPECTED" ] @@ -83,6 +83,7 @@ echo "a.bar.caw.daz 0 $t" | nc 0.0.0.0 7204 echo "a.bag 0 $t" | nc 0.0.0.0 7204 ATTEMPTS=5 TIMEOUT=1 retry_with_backoff find_carbon a* a.json ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b* a.b.json +ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.ba[rg] a.ba.json ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b*.c* a.b.c.json ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b*.caw.* a.b.c.d.json ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon x none.json diff --git a/src/query/api/v1/handler/graphite/find.go b/src/query/api/v1/handler/graphite/find.go index 16f5c35cf8..ed9c2fd580 100644 --- a/src/query/api/v1/handler/graphite/find.go +++ b/src/query/api/v1/handler/graphite/find.go @@ -21,15 +21,17 @@ package graphite import ( - "bytes" "context" + "errors" "net/http" + "sync" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/net/http" + xerrors "github.com/m3db/m3x/errors" "go.uber.org/zap" ) @@ -57,6 +59,39 @@ func NewFindHandler( } } +func mergeTags( + terminatedResult *storage.CompleteTagsResult, + childResult *storage.CompleteTagsResult, +) (map[string]bool, error) { + // sanity check the case. + if terminatedResult.CompleteNameOnly { + return nil, errors.New("terminated result is completing name only") + } + + if childResult.CompleteNameOnly { + return nil, errors.New("child result is completing name only") + } + + mapLength := len(terminatedResult.CompletedTags) + len(childResult.CompletedTags) + tagMap := make(map[string]bool, mapLength) + + for _, tag := range terminatedResult.CompletedTags { + for _, value := range tag.Values { + tagMap[string(value)] = false + } + } + + // NB: fine to overwrite any tags which were present in the `terminatedResult` map + // since if they appear in `childResult`, then they exist AND have children. + for _, tag := range childResult.CompletedTags { + for _, value := range tag.Values { + tagMap[string(value)] = true + } + } + + return tagMap, nil +} + func (h *grahiteFindHandler) ServeHTTP( w http.ResponseWriter, r *http.Request, @@ -64,45 +99,54 @@ func (h *grahiteFindHandler) ServeHTTP( ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) logger := logging.WithContext(ctx) w.Header().Set("Content-Type", "application/json") - query, rErr := parseFindParamsToQuery(r) + + // NB: need to run two separate queries, one of which will match only the + // provided matchers, and one which will match the provided matchers with at + // least one more child node. For further information, refer to the comment + // for parseFindParamsToQueries + terminatedQuery, childQuery, raw, rErr := parseFindParamsToQueries(r) if rErr != nil { xhttp.Error(w, rErr.Inner(), rErr.Code()) return } - opts := storage.NewFetchOptions() - result, err := h.storage.FetchTags(ctx, query, opts) - if err != nil { + var ( + terminatedResult *storage.CompleteTagsResult + tErr error + childResult *storage.CompleteTagsResult + cErr error + opts = storage.NewFetchOptions() + + wg sync.WaitGroup + ) + + wg.Add(2) + go func() { + terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts) + wg.Done() + }() + + go func() { + childResult, cErr = h.storage.CompleteTags(ctx, childQuery, opts) + wg.Done() + }() + + wg.Wait() + if err := xerrors.FirstError(tErr, cErr); err != nil { logger.Error("unable to complete tags", zap.Error(err)) xhttp.Error(w, err, http.StatusBadRequest) return } - partCount := graphite.CountMetricParts(query.Raw) - partName := graphite.TagName(partCount - 1) - seenMap := make(map[string]bool, len(result.Metrics)) - for _, m := range result.Metrics { - tags := m.Tags.Tags - index := 0 - // TODO: make this more performant by computing the index for the tag name. - for i, tag := range tags { - if bytes.Equal(partName, tag.Name) { - index = i - break - } - } - - value := tags[index].Value - // If this value has already been encountered, check if - if hadExtra, seen := seenMap[string(value)]; seen && hadExtra { - continue - } - - hasExtraParts := len(tags) > partCount - seenMap[string(value)] = hasExtraParts + // NB: merge results from both queries to specify which series have children + seenMap, err := mergeTags(terminatedResult, childResult) + if err != nil { + logger.Error("unable to complete tags", zap.Error(err)) + xhttp.Error(w, err, http.StatusBadRequest) + return } - prefix := graphite.DropLastMetricPart(query.Raw) + prefix := graphite.DropLastMetricPart(raw) if len(prefix) > 0 { prefix += "." } diff --git a/src/query/api/v1/handler/graphite/find_parser.go b/src/query/api/v1/handler/graphite/find_parser.go index bc2ae371ab..2947c5cd7b 100644 --- a/src/query/api/v1/handler/graphite/find_parser.go +++ b/src/query/api/v1/handler/graphite/find_parser.go @@ -29,16 +29,41 @@ import ( "github.com/m3db/m3/src/query/errors" "github.com/m3db/m3/src/query/graphite/graphite" graphiteStorage "github.com/m3db/m3/src/query/graphite/storage" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/json" "github.com/m3db/m3/src/x/net/http" ) -func parseFindParamsToQuery(r *http.Request) ( - *storage.FetchQuery, - *xhttp.ParseError, +// parseFindParamsToQueries parses an incoming request to two find queries, +// which are then combined to give the final result. +// It returns, in order: +// the given query; this will return all values for exactly that tag which have +// _terminatedQuery, which adds an explicit terminator after the last term in +// no child nodes +// _childQuery, which adds an explicit match all after the last term in the +// given query; this will return all values for exactly that tag which have at +// least one child node. +// _rawQueryString, which is the initial query request (bar final +// matcher), which is used to reconstruct the return values. +// _err, any error encountered during parsing. +// +// As an example, given the query `a.b*`, and metrics `a.bar.c` and `a.biz`, +// terminatedQuery will return only [biz], and childQuery will return only +// [bar]. +func parseFindParamsToQueries(r *http.Request) ( + _terminatedQuery *storage.CompleteTagsQuery, + _childQuery *storage.CompleteTagsQuery, + _rawQueryString string, + _err *xhttp.ParseError, ) { values := r.URL.Query() + query := values.Get("query") + if query == "" { + return nil, nil, "", + xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) + } + now := time.Now() fromString, untilString := r.FormValue("from"), r.FormValue("until") if len(fromString) == 0 { @@ -56,8 +81,9 @@ func parseFindParamsToQuery(r *http.Request) ( ) if err != nil { - return nil, xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString), - http.StatusBadRequest) + return nil, nil, "", + xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString), + http.StatusBadRequest) } until, err := graphite.ParseTime( @@ -67,28 +93,50 @@ func parseFindParamsToQuery(r *http.Request) ( ) if err != nil { - return nil, xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString), - http.StatusBadRequest) + return nil, nil, "", + xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString), + http.StatusBadRequest) } - query := values.Get("query") - if query == "" { - return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) + matchers, err := graphiteStorage.TranslateQueryToMatchersWithTerminator(query) + if err != nil { + return nil, nil, "", + xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), + http.StatusBadRequest) } - matchers, err := graphiteStorage.TranslateQueryToMatchers(query) - if err != nil { - return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), + // NB: Filter will always be the second last term in the matchers, and the + // matchers should always have a length of at least 2 (term + terminator) + // so this is a sanity check and unexpected in actual execution. + if len(matchers) < 2 { + return nil, nil, "", xhttp.NewParseError(fmt.Errorf("unable to parse "+ + "'query': %s", query), http.StatusBadRequest) } - return &storage.FetchQuery{ - Raw: query, - TagMatchers: matchers, - Start: from, - End: until, - Interval: 0, - }, nil + filter := [][]byte{matchers[len(matchers)-2].Name} + terminatedQuery := &storage.CompleteTagsQuery{ + CompleteNameOnly: false, + FilterNameTags: filter, + TagMatchers: matchers, + Start: from, + End: until, + } + + clonedMatchers := make([]models.Matcher, len(matchers)) + copy(clonedMatchers, matchers) + // NB: change terminator from `MatchNotRegexp` to `MatchRegexp` to ensure + // segments with children are matched. + clonedMatchers[len(clonedMatchers)-1].Type = models.MatchRegexp + childQuery := &storage.CompleteTagsQuery{ + CompleteNameOnly: false, + FilterNameTags: filter, + TagMatchers: clonedMatchers, + Start: from, + End: until, + } + + return terminatedQuery, childQuery, query, nil } func findResultsJSON( diff --git a/src/query/api/v1/handler/graphite/find_test.go b/src/query/api/v1/handler/graphite/find_test.go new file mode 100644 index 0000000000..93d4a1902b --- /dev/null +++ b/src/query/api/v1/handler/graphite/find_test.go @@ -0,0 +1,245 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package graphite + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sort" + "strings" + "testing" + "time" + + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/util/logging" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +// dates is a tuple of a date with a valid string representation +type date struct { + t time.Time + s string +} + +var ( + from = date{ + s: "14:38_20150618", + t: time.Date(2015, time.June, 18, 14, 38, 0, 0, time.UTC), + } + until = date{ + s: "1432581620", + t: time.Date(2015, time.May, 25, 19, 20, 20, 0, time.UTC), + } +) + +type completeTagQueryMatcher struct { + matchers []models.Matcher +} + +func (m *completeTagQueryMatcher) String() string { return "complete tag query" } +func (m *completeTagQueryMatcher) Matches(x interface{}) bool { + q, ok := x.(*storage.CompleteTagsQuery) + if !ok { + return false + } + + if !q.Start.Equal(from.t) { + fmt.Println("Not equal start", q.Start, from.t) + return false + } + + if !q.End.Equal(until.t) { + fmt.Println("Not equal end", q.End, until.t) + return false + } + + if q.CompleteNameOnly { + return false + } + + if len(q.FilterNameTags) != 1 { + return false + } + + // both queries should filter on __g1__ + if !bytes.Equal(q.FilterNameTags[0], []byte("__g1__")) { + return false + } + + if len(q.TagMatchers) != len(m.matchers) { + return false + } + + for i, qMatcher := range q.TagMatchers { + if !bytes.Equal(qMatcher.Name, m.matchers[i].Name) { + return false + } + if !bytes.Equal(qMatcher.Value, m.matchers[i].Value) { + return false + } + if qMatcher.Type != m.matchers[i].Type { + return false + } + } + + return true +} + +var _ gomock.Matcher = &completeTagQueryMatcher{} + +func b(s string) []byte { return []byte(s) } +func bs(ss ...string) [][]byte { + bb := make([][]byte, len(ss)) + for i, s := range ss { + bb[i] = b(s) + } + + return bb +} + +func setupStorage(ctrl *gomock.Controller) storage.Storage { + store := storage.NewMockStorage(ctrl) + // set up no children case + noChildrenMatcher := &completeTagQueryMatcher{ + matchers: []models.Matcher{ + {Type: models.MatchEqual, Name: b("__g0__"), Value: b("foo")}, + {Type: models.MatchRegexp, Name: b("__g1__"), Value: b(`b[^\.]*`)}, + {Type: models.MatchNotRegexp, Name: b("__g2__"), Value: b(".*")}, + }, + } + + noChildrenResult := &storage.CompleteTagsResult{ + CompleteNameOnly: false, + CompletedTags: []storage.CompletedTag{ + {Name: b("__g1__"), Values: bs("bug", "bar", "baz")}, + }, + } + + store.EXPECT().CompleteTags(gomock.Any(), noChildrenMatcher, gomock.Any()). + Return(noChildrenResult, nil) + + // set up children case + childrenMatcher := &completeTagQueryMatcher{ + matchers: []models.Matcher{ + {Type: models.MatchEqual, Name: b("__g0__"), Value: b("foo")}, + {Type: models.MatchRegexp, Name: b("__g1__"), Value: b(`b[^\.]*`)}, + {Type: models.MatchRegexp, Name: b("__g2__"), Value: b(".*")}, + }, + } + + childrenResult := &storage.CompleteTagsResult{ + CompleteNameOnly: false, + CompletedTags: []storage.CompletedTag{ + {Name: b("__g1__"), Values: bs("baz", "bix", "bug")}, + }, + } + + store.EXPECT().CompleteTags(gomock.Any(), childrenMatcher, gomock.Any()). + Return(childrenResult, nil) + + return store +} + +type writer struct { + results []string +} + +var _ http.ResponseWriter = &writer{} + +func (w *writer) WriteHeader(_ int) {} +func (w *writer) Header() http.Header { return make(http.Header) } +func (w *writer) Write(b []byte) (int, error) { + if w.results == nil { + w.results = make([]string, 0, 10) + } + + w.results = append(w.results, string(b)) + return len(b), nil +} + +type result struct { + ID string `json:"id"` + Text string `json:"text"` + Leaf int `json:"leaf"` + Expandable int `json:"expandable"` + AllowChildren int `json:"allowChildren"` +} + +type results []result + +func (r results) Len() int { return len(r) } +func (r results) Swap(i, j int) { r[i], r[j] = r[j], r[i] } +func (r results) Less(i, j int) bool { + return strings.Compare(r[i].ID, r[j].ID) == -1 +} + +func TestFind(t *testing.T) { + logging.InitWithCores(nil) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // setup storage and handler + store := setupStorage(ctrl) + handler := NewFindHandler(store) + + // execute the query + w := &writer{} + req := &http.Request{ + URL: &url.URL{ + RawQuery: fmt.Sprintf("query=foo.b*&from=%s&until=%s", from.s, until.s), + }, + } + + handler.ServeHTTP(w, req) + + // convert results to comparable format + require.Equal(t, 1, len(w.results)) + r := make(results, 0) + decoder := json.NewDecoder(bytes.NewBufferString((w.results[0]))) + require.NoError(t, decoder.Decode(&r)) + sort.Sort(r) + + makeNoChildrenResult := func(t string) result { + return result{ID: fmt.Sprintf("foo.%s", t), Text: t, Leaf: 1, + Expandable: 0, AllowChildren: 0} + } + + makeWithChildrenResult := func(t string) result { + return result{ID: fmt.Sprintf("foo.%s", t), Text: t, Leaf: 0, + Expandable: 1, AllowChildren: 1} + } + + expected := results{ + makeNoChildrenResult("bar"), + makeWithChildrenResult("baz"), + makeWithChildrenResult("bix"), + makeWithChildrenResult("bug"), + } + + require.Equal(t, expected, r) +} diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 4c0bcce7a4..696bf02cba 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -21,6 +21,7 @@ package prometheus import ( + "bytes" "fmt" "io" "io/ioutil" @@ -41,7 +42,7 @@ import ( ) const ( - // NameReplace is the parameter that gets replaced + // NameReplace is the parameter that gets replaced. NameReplace = "name" queryParam = "query" filterNameTagsParam = "tag" @@ -52,14 +53,15 @@ const ( var ( matchValues = []byte(".*") + roleName = []byte("role") ) -// TimeoutOpts stores options related to various timeout configurations +// TimeoutOpts stores options related to various timeout configurations. type TimeoutOpts struct { FetchTimeout time.Duration } -// ParsePromCompressedRequest parses a snappy compressed request from Prometheus +// ParsePromCompressedRequest parses a snappy compressed request from Prometheus. func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { body := r.Body if r.Body == nil { @@ -85,7 +87,7 @@ func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { return reqBuf, nil } -// ParseRequestTimeout parses the input request timeout with a default +// ParseRequestTimeout parses the input request timeout with a default. func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (time.Duration, error) { timeout := r.Header.Get("timeout") if timeout == "" { @@ -104,11 +106,24 @@ func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (tim return duration, nil } -// ParseTagCompletionParamsToQuery parses all params from the GET request +// ParseTagCompletionParamsToQuery parses all params from the GET request. func ParseTagCompletionParamsToQuery( r *http.Request, ) (*storage.CompleteTagsQuery, *xhttp.ParseError) { - tagQuery := storage.CompleteTagsQuery{} + start, err := parseTimeWithDefault(r, "start", time.Time{}) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + end, err := parseTimeWithDefault(r, "end", time.Now()) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + tagQuery := storage.CompleteTagsQuery{ + Start: start, + End: end, + } query, err := parseTagCompletionQuery(r) if err != nil { @@ -170,18 +185,18 @@ func parseTimeWithDefault( return defaultTime, nil } -// ParseSeriesMatchQuery parses all params from the GET request +// ParseSeriesMatchQuery parses all params from the GET request. func ParseSeriesMatchQuery( r *http.Request, tagOptions models.TagOptions, -) (*storage.SeriesMatchQuery, *xhttp.ParseError) { +) ([]*storage.FetchQuery, *xhttp.ParseError) { r.ParseForm() matcherValues := r.Form["match[]"] if len(matcherValues) == 0 { return nil, xhttp.NewParseError(errors.ErrInvalidMatchers, http.StatusBadRequest) } - start, err := parseTimeWithDefault(r, "start", time.Now().Add(time.Hour*24*-40)) + start, err := parseTimeWithDefault(r, "start", time.Time{}) if err != nil { return nil, xhttp.NewParseError(err, http.StatusBadRequest) } @@ -191,7 +206,7 @@ func ParseSeriesMatchQuery( return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - tagMatchers := make([]models.Matchers, len(matcherValues)) + queries := make([]*storage.FetchQuery, len(matcherValues)) for i, s := range matcherValues { promMatchers, err := promql.ParseMetricSelector(s) if err != nil { @@ -203,17 +218,18 @@ func ParseSeriesMatchQuery( return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - tagMatchers[i] = matchers + queries[i] = &storage.FetchQuery{ + Raw: fmt.Sprintf("match[]=%s", s), + TagMatchers: matchers, + Start: start, + End: end, + } } - return &storage.SeriesMatchQuery{ - TagMatchers: tagMatchers, - Start: start, - End: end, - }, nil + return queries, nil } -// ParseTagValuesToQuery parses a tag values request to a complete tags query +// ParseTagValuesToQuery parses a tag values request to a complete tags query. func ParseTagValuesToQuery( r *http.Request, ) (*storage.CompleteTagsQuery, error) { @@ -288,7 +304,7 @@ func renderDefaultTagCompletionResultsJSON( return jw.Close() } -// RenderTagCompletionResultsJSON renders tag completion results to json format +// RenderTagCompletionResultsJSON renders tag completion results to json format. func RenderTagCompletionResultsJSON( w io.Writer, result *storage.CompleteTagsResult, @@ -301,7 +317,7 @@ func RenderTagCompletionResultsJSON( return renderDefaultTagCompletionResultsJSON(w, results) } -// RenderTagValuesResultsJSON renders tag values results to json format +// RenderTagValuesResultsJSON renders tag values results to json format. func RenderTagValuesResultsJSON( w io.Writer, result *storage.CompleteTagsResult, @@ -346,56 +362,11 @@ func RenderTagValuesResultsJSON( return jw.Close() } -type tag struct { - name string - value string -} - -func writeTagsHelper( - jw *json.Writer, - completedTags []storage.CompletedTag, - tags []tag, -) { - if len(completedTags) == 0 { - jw.BeginObject() - for _, tag := range tags { - jw.BeginObjectField(tag.name) - jw.WriteString(tag.value) - } - - jw.EndObject() - return - } - - firstResult := completedTags[0] - name := string(firstResult.Name) - - copiedTags := make([]tag, len(tags)+1) - copy(copiedTags, tags) - for _, value := range firstResult.Values { - copiedTags[len(tags)] = tag{name: name, value: string(value)} - writeTagsHelper(jw, completedTags[1:], copiedTags) - } -} - -func writeTags( - jw *json.Writer, - results []*storage.CompleteTagsResult, -) { - for _, result := range results { - jw.BeginArray() - tags := result.CompletedTags - if len(tags) > 0 { - writeTagsHelper(jw, result.CompletedTags, nil) - } - jw.EndArray() - } -} - -// RenderSeriesMatchResultsJSON renders series match results to json format +// RenderSeriesMatchResultsJSON renders series match results to json format. func RenderSeriesMatchResultsJSON( w io.Writer, - results []*storage.CompleteTagsResult, + results []models.Metrics, + dropRole bool, ) error { jw := json.NewWriter(w) jw.BeginObject() @@ -404,13 +375,33 @@ func RenderSeriesMatchResultsJSON( jw.WriteString("success") jw.BeginObjectField("data") - writeTags(jw, results) + jw.BeginArray() + + for _, result := range results { + for _, tags := range result { + jw.BeginObject() + for _, tag := range tags.Tags.Tags { + if bytes.Equal(tag.Name, roleName) && dropRole { + // NB: When data is written from Prometheus remote write, additional + // `"role":"remote"` tag is added, which should not be included in the + // results. + continue + } + jw.BeginObjectField(string(tag.Name)) + jw.WriteString(string(tag.Value)) + } + + jw.EndObject() + } + } + + jw.EndArray() jw.EndObject() return jw.Close() } -// PromResp represents Prometheus's query response +// PromResp represents Prometheus's query response. type PromResp struct { Status string `json:"status"` Data struct { @@ -424,7 +415,7 @@ type PromResp struct { } `json:"data"` } -// PromDebug represents the input and output that are used in the debug endpoint +// PromDebug represents the input and output that are used in the debug endpoint. type PromDebug struct { Input PromResp `json:"input"` Results PromResp `json:"results"` diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index cd02b4d9ac..dc4ec68ff0 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -22,12 +22,13 @@ package prometheus import ( "bytes" + "fmt" "net/http" "strings" "testing" "time" - "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test" "github.com/stretchr/testify/assert" @@ -87,84 +88,68 @@ func (w *writer) Write(p []byte) (n int, err error) { return len(p), nil } -func makeResult() []*storage.CompleteTagsResult { - return []*storage.CompleteTagsResult{ - &storage.CompleteTagsResult{ - CompletedTags: []storage.CompletedTag{ - storage.CompletedTag{ - Name: []byte("a"), - Values: [][]byte{[]byte("1"), []byte("2"), []byte("3")}, - }, - storage.CompletedTag{ - Name: []byte("b"), - Values: [][]byte{[]byte("1"), []byte("2")}, - }, - storage.CompletedTag{ - Name: []byte("c"), - Values: [][]byte{[]byte("1"), []byte("2"), []byte("3")}, - }, - }, - }, - } +type tag struct { + name, value string } -func TestRenderSeriesMatchResults(t *testing.T) { - w := &writer{value: ""} - seriesMatchResult := makeResult() - - expectedWhitespace := `{ - "status":"success", - "data":[ - {"a":"1","b":"1","c":"1"}, - {"a":"1","b":"1","c":"2"}, - {"a":"1","b":"1","c":"3"}, - {"a":"1","b":"2","c":"1"}, - {"a":"1","b":"2","c":"2"}, - {"a":"1","b":"2","c":"3"}, - {"a":"2","b":"1","c":"1"}, - {"a":"2","b":"1","c":"2"}, - {"a":"2","b":"1","c":"3"}, - {"a":"2","b":"2","c":"1"}, - {"a":"2","b":"2","c":"2"}, - {"a":"2","b":"2","c":"3"}, - {"a":"3","b":"1","c":"1"}, - {"a":"3","b":"1","c":"2"}, - {"a":"3","b":"1","c":"3"}, - {"a":"3","b":"2","c":"1"}, - {"a":"3","b":"2","c":"2"}, - {"a":"3","b":"2","c":"3"} - ] - }` - - err := RenderSeriesMatchResultsJSON(w, seriesMatchResult) - assert.NoError(t, err) - fields := strings.Fields(expectedWhitespace) - expected := "" - for _, field := range fields { - expected = expected + field +func toTags(name string, tags ...tag) models.Metric { + tagOpts := models.NewTagOptions() + ts := models.NewTags(len(tags), tagOpts) + ts = ts.SetName([]byte(name)) + for _, tag := range tags { + ts = ts.AddTag(models.Tag{Name: []byte(tag.name), Value: []byte(tag.value)}) } - assert.Equal(t, expected, w.value) + return models.Metric{Tags: ts} } func TestRenderSeriesMatchResultsNoTags(t *testing.T) { w := &writer{value: ""} - seriesMatchResult := []*storage.CompleteTagsResult{ - &storage.CompleteTagsResult{}, + tests := []struct { + dropRole bool + additional string + }{ + { + dropRole: true, + additional: "", + }, + { + dropRole: false, + additional: `,"role":"appears"`, + }, } - expectedWhitespace := `{ + seriesMatchResult := []models.Metrics{ + models.Metrics{ + toTags("name", tag{name: "a", value: "b"}, tag{name: "role", value: "appears"}), + toTags("name2", tag{name: "c", value: "d"}, tag{name: "e", value: "f"}), + }, + } + + for _, tt := range tests { + expectedWhitespace := fmt.Sprintf(`{ "status":"success", - "data":[] - }` + "data":[ + { + "__name__":"name", + "a":"b"%s + }, + { + "__name__":"name2", + "c":"d", + "e":"f" + } + ] + }`, tt.additional) - err := RenderSeriesMatchResultsJSON(w, seriesMatchResult) - assert.NoError(t, err) - fields := strings.Fields(expectedWhitespace) - expected := "" - for _, field := range fields { - expected = expected + field - } + err := RenderSeriesMatchResultsJSON(w, seriesMatchResult, tt.dropRole) + assert.NoError(t, err) + fields := strings.Fields(expectedWhitespace) + expected := "" + for _, field := range fields { + expected = expected + field + } - assert.Equal(t, expected, w.value) + assert.Equal(t, expected, w.value) + } } diff --git a/src/query/api/v1/handler/prometheus/remote/match.go b/src/query/api/v1/handler/prometheus/remote/match.go index a7954e6470..37fde44dda 100644 --- a/src/query/api/v1/handler/prometheus/remote/match.go +++ b/src/query/api/v1/handler/prometheus/remote/match.go @@ -65,7 +65,7 @@ func (h *PromSeriesMatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") - query, err := prometheus.ParseSeriesMatchQuery(r, h.tagOptions) + queries, err := prometheus.ParseSeriesMatchQuery(r, h.tagOptions) if err != nil { logger.Error("unable to parse series match values to query", zap.Error(err)) xhttp.Error(w, err, http.StatusBadRequest) @@ -73,29 +73,22 @@ func (h *PromSeriesMatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } opts := storage.NewFetchOptions() - matchers := query.TagMatchers - results := make([]*storage.CompleteTagsResult, len(matchers)) - // TODO: parallel execution - for i, matcher := range matchers { - completeTagsQuery := &storage.CompleteTagsQuery{ - CompleteNameOnly: false, - TagMatchers: matcher, - } - - result, err := h.storage.CompleteTags(ctx, completeTagsQuery, opts) + results := make([]models.Metrics, len(queries)) + for i, query := range queries { + result, err := h.storage.SearchSeries(ctx, query, opts) if err != nil { logger.Error("unable to get matched series", zap.Error(err)) xhttp.Error(w, err, http.StatusBadRequest) return } - results[i] = result + results[i] = result.Metrics } // TODO: Support multiple result types - if renderErr := prometheus.RenderSeriesMatchResultsJSON(w, results); renderErr != nil { - logger.Error("unable to write matched series", zap.Error(renderErr)) - xhttp.Error(w, renderErr, http.StatusBadRequest) + if err := prometheus.RenderSeriesMatchResultsJSON(w, results, false); err != nil { + logger.Error("unable to write matched series", zap.Error(err)) + xhttp.Error(w, err, http.StatusBadRequest) return } } diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 6f073f0179..d1e2a90d7a 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -80,6 +80,7 @@ func newPromReadMetrics(scope tally.Scope) promReadMetrics { func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) + logger := logging.WithContext(ctx) req, rErr := h.parseRequest(r) diff --git a/src/query/api/v1/handler/search.go b/src/query/api/v1/handler/search.go index bf4fee0278..4229970821 100644 --- a/src/query/api/v1/handler/search.go +++ b/src/query/api/v1/handler/search.go @@ -110,8 +110,12 @@ func (h *SearchHandler) parseURLParams(r *http.Request) *storage.FetchOptions { return &fetchOptions } -func (h *SearchHandler) search(ctx context.Context, query *storage.FetchQuery, opts *storage.FetchOptions) (*storage.SearchResults, error) { - return h.store.FetchTags(ctx, query, opts) +func (h *SearchHandler) search( + ctx context.Context, + query *storage.FetchQuery, + opts *storage.FetchOptions, +) (*storage.SearchResults, error) { + return h.store.SearchSeries(ctx, query, opts) } func newFetchOptions(limit int) storage.FetchOptions { diff --git a/src/query/generated/mocks/generate.go b/src/query/generated/mocks/generate.go index 337f080c57..4aca80e535 100644 --- a/src/query/generated/mocks/generate.go +++ b/src/query/generated/mocks/generate.go @@ -20,6 +20,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode). //go:generate sh -c "mockgen -package=downsample $PACKAGE/src/cmd/services/m3coordinator/downsample Downsampler,MetricsAppender,SamplesAppender | genclean -pkg $PACKAGE/src/cmd/services/m3coordinator/downsample -out $GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/downsample/downsample_mock.go" +//go:generate sh -c "mockgen -package=storage -destination=$GOPATH/src/$PACKAGE/src/query/storage/storage_mock.go $PACKAGE/src/query/storage Storage" //go:generate sh -c "mockgen -package=block -destination=$GOPATH/src/$PACKAGE/src/query/block/block_mock.go $PACKAGE/src/query/block Block,StepIter,SeriesIter,Builder,Step" //go:generate sh -c "mockgen -package=ingest -destination=$GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/ingest/write_mock.go $PACKAGE/src/cmd/services/m3coordinator/ingest DownsamplerAndWriter" //go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/$PACKAGE/src/query/executor/transform/types_mock.go $PACKAGE/src/query/executor/transform OpNode" diff --git a/src/query/graphite/graphite/glob.go b/src/query/graphite/graphite/glob.go index 00853e1a69..20e7c0d264 100644 --- a/src/query/graphite/graphite/glob.go +++ b/src/query/graphite/graphite/glob.go @@ -36,19 +36,15 @@ const ( "$-_'|<>%#/" ) -var ( - // ErrNotPattern signifies that the provided string is not a glob pattern - ErrNotPattern = errors.NewInvalidParamsError(fmt.Errorf("not a pattern")) -) - -// GlobToRegexPattern converts a graphite-style glob into a regex pattern -func GlobToRegexPattern(glob string) (string, error) { +// GlobToRegexPattern converts a graphite-style glob into a regex pattern, with +// a boolean indicating if the glob is regexed or not +func GlobToRegexPattern(glob string) ([]byte, bool, error) { return globToRegexPattern(glob, GlobOptions{}) } // ExtendedGlobToRegexPattern converts a graphite-style glob into a regex pattern // with extended options -func ExtendedGlobToRegexPattern(glob string, opts GlobOptions) (string, error) { +func ExtendedGlobToRegexPattern(glob string, opts GlobOptions) ([]byte, bool, error) { return globToRegexPattern(glob, opts) } @@ -92,7 +88,7 @@ func (p *pattern) UnwriteLast() { p.lastWriteLen = 0 } -func globToRegexPattern(glob string, opts GlobOptions) (string, error) { +func globToRegexPattern(glob string, opts GlobOptions) ([]byte, bool, error) { var ( pattern pattern escaping = false @@ -141,7 +137,7 @@ func globToRegexPattern(glob string, opts GlobOptions) (string, error) { // End non-capturing group priorGroupStart := groupStartStack[len(groupStartStack)-1] if priorGroupStart != '{' { - return "", errors.NewInvalidParamsError(fmt.Errorf("invalid '}' at %d, no prior for '{' in %s", i, glob)) + return nil, false, errors.NewInvalidParamsError(fmt.Errorf("invalid '}' at %d, no prior for '{' in %s", i, glob)) } pattern.WriteRune(')') @@ -155,7 +151,7 @@ func globToRegexPattern(glob string, opts GlobOptions) (string, error) { // End character range priorGroupStart := groupStartStack[len(groupStartStack)-1] if priorGroupStart != '[' { - return "", errors.NewInvalidParamsError(fmt.Errorf("invalid ']' at %d, no prior for '[' in %s", i, glob)) + return nil, false, errors.NewInvalidParamsError(fmt.Errorf("invalid ']' at %d, no prior for '[' in %s", i, glob)) } pattern.WriteRune(']') @@ -168,21 +164,19 @@ func globToRegexPattern(glob string, opts GlobOptions) (string, error) { if groupStartStack[len(groupStartStack)-1] == '{' { pattern.WriteRune('|') } else { - return "", errors.NewInvalidParamsError(fmt.Errorf("invalid ',' outside of matching group at pos %d in %s", i, glob)) + return nil, false, errors.NewInvalidParamsError(fmt.Errorf("invalid ',' outside of matching group at pos %d in %s", i, glob)) } default: if !strings.ContainsRune(ValidIdentifierRunes, r) { - return "", errors.NewInvalidParamsError(fmt.Errorf("invalid character %c at pos %d in %s", r, i, glob)) + return nil, false, errors.NewInvalidParamsError(fmt.Errorf("invalid character %c at pos %d in %s", r, i, glob)) } pattern.WriteRune(r) } } if len(groupStartStack) > 1 { - return "", errors.NewInvalidParamsError(fmt.Errorf("unbalanced '%c' in %s", groupStartStack[len(groupStartStack)-1], glob)) - } else if !regexed { - return "", ErrNotPattern + return nil, false, errors.NewInvalidParamsError(fmt.Errorf("unbalanced '%c' in %s", groupStartStack[len(groupStartStack)-1], glob)) } - return pattern.String(), nil + return pattern.buff.Bytes(), regexed, nil } diff --git a/src/query/graphite/graphite/glob_test.go b/src/query/graphite/graphite/glob_test.go index 195bf05555..2059cac59b 100644 --- a/src/query/graphite/graphite/glob_test.go +++ b/src/query/graphite/graphite/glob_test.go @@ -31,18 +31,41 @@ import ( func TestGlobToRegexPattern(t *testing.T) { tests := []struct { - glob string - regex string + glob string + isRegex bool + regex []byte }{ - {"foo\\+bar.'baz<1001>'.qux", "foo\\+bar\\.+\\'baz\\<1001\\>\\'\\.+qux"}, - {"foo.host.me{1,2,3}.*", "foo\\.+host\\.+me(1|2|3)\\.+[^\\.]*"}, - {"bar.zed.whatever[0-9].*.*.bar", "bar\\.+zed\\.+whatever[0-9]\\.+[^\\.]*\\.+[^\\.]*\\.+bar"}, - {"foo{0[3-9],1[0-9],20}", "foo(0[3-9]|1[0-9]|20)"}, + { + glob: "barbaz", + isRegex: false, + regex: []byte("barbaz"), + }, + { + glob: "foo\\+bar.'baz<1001>'.qux", + isRegex: true, + regex: []byte("foo\\+bar\\.+\\'baz\\<1001\\>\\'\\.+qux"), + }, + { + glob: "foo.host.me{1,2,3}.*", + isRegex: true, + regex: []byte("foo\\.+host\\.+me(1|2|3)\\.+[^\\.]*"), + }, + { + glob: "bar.zed.whatever[0-9].*.*.bar", + isRegex: true, + regex: []byte("bar\\.+zed\\.+whatever[0-9]\\.+[^\\.]*\\.+[^\\.]*\\.+bar"), + }, + { + glob: "foo{0[3-9],1[0-9],20}", + isRegex: true, + regex: []byte("foo(0[3-9]|1[0-9]|20)"), + }, } for _, test := range tests { - pattern, err := GlobToRegexPattern(test.glob) + pattern, isRegex, err := GlobToRegexPattern(test.glob) require.NoError(t, err) + assert.Equal(t, test.isRegex, isRegex) assert.Equal(t, test.regex, pattern, "bad pattern for %s", test.glob) } } @@ -59,7 +82,7 @@ func TestGlobToRegexPatternErrors(t *testing.T) { } for _, test := range tests { - _, err := GlobToRegexPattern(test.glob) + _, _, err := GlobToRegexPattern(test.glob) require.Error(t, err) assert.Equal(t, test.err, err.Error(), "invalid error for %s", test.glob) } @@ -98,7 +121,7 @@ func TestCompileGlob(t *testing.T) { } for _, test := range tests { - rePattern, err := GlobToRegexPattern(test.glob) + rePattern, _, err := GlobToRegexPattern(test.glob) require.NoError(t, err) re := regexp.MustCompile(fmt.Sprintf("^%s$", rePattern)) for _, s := range test.toMatch { diff --git a/src/query/graphite/storage/converter.go b/src/query/graphite/storage/converter.go index 01d4624f11..19fc9cf093 100644 --- a/src/query/graphite/storage/converter.go +++ b/src/query/graphite/storage/converter.go @@ -34,35 +34,27 @@ var ( wildcard = []byte(".*") ) -func glob(metric string) []byte { - globLen := len(metric) - for _, c := range metric { - if c == carbonGlobRune { - globLen++ - } +func convertMetricPartToMatcher( + count int, + metric string, +) (models.Matcher, error) { + var matchType models.MatchType + value, isRegex, err := graphite.GlobToRegexPattern(metric) + if err != nil { + return models.Matcher{}, err } - glob := make([]byte, globLen) - i := 0 - for _, c := range metric { - if c == carbonGlobRune { - glob[i] = carbonSeparatorByte - i++ - } - - glob[i] = byte(c) - i++ + if isRegex { + matchType = models.MatchRegexp + } else { + matchType = models.MatchEqual } - return glob -} - -func convertMetricPartToMatcher(count int, metric string) models.Matcher { return models.Matcher{ - Type: models.MatchRegexp, + Type: matchType, Name: graphite.TagName(count), - Value: glob(metric), - } + Value: value, + }, nil } func matcherTerminator(count int) models.Matcher { diff --git a/src/query/graphite/storage/converter_test.go b/src/query/graphite/storage/converter_test.go index c5f5fb2ae4..7bcffac6c7 100644 --- a/src/query/graphite/storage/converter_test.go +++ b/src/query/graphite/storage/converter_test.go @@ -30,37 +30,34 @@ import ( "github.com/stretchr/testify/require" ) -func TestGlob(t *testing.T) { - noGlob := "some_sort of-string,with~no=globbing" - expected := []byte(noGlob) - actual := glob(noGlob) - require.Equal(t, actual, expected) - - globbed := "foo*bar" - expected = []byte("foo.*bar") - actual = glob(globbed) - require.Equal(t, actual, expected) - - globAndRegex := "foo*bar[rz]*(qux|quail)" - expected = []byte("foo.*bar[rz].*(qux|quail)") - actual = glob(globAndRegex) - require.Equal(t, actual, expected) -} - func TestConvertMetricPartToMatcher(t *testing.T) { for i := 0; i < 100; i++ { - globAndRegex := "foo*bar[rz]*(qux|quail)" + globAndRegex := "foo*bar[rz]*{qux|quail}" expected := models.Matcher{ Type: models.MatchRegexp, Name: graphite.TagName(i), - Value: []byte("foo.*bar[rz].*(qux|quail)"), + Value: []byte(`foo[^\.]*bar[rz][^\.]*(qux|quail)`), } - actual := convertMetricPartToMatcher(i, globAndRegex) + actual, err := convertMetricPartToMatcher(i, globAndRegex) + require.NoError(t, err) assert.Equal(t, expected, actual) } } +func TestConvertAlphanumericMetricPartToMatcher(t *testing.T) { + metric := "abcdefg" + expected := models.Matcher{ + Type: models.MatchEqual, + Name: graphite.TagName(0), + Value: []byte("abcdefg"), + } + + actual, err := convertMetricPartToMatcher(0, metric) + require.NoError(t, err) + assert.Equal(t, expected, actual) +} + func TestMatcherTerminator(t *testing.T) { for i := 0; i < 100; i++ { expected := models.Matcher{ diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index 3c437f9190..82a250eb9e 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -60,42 +60,35 @@ func NewM3WrappedStorage( return &m3WrappedStore{m3: m3storage, enforcer: enforcer} } -// translates a graphite query to tag matcher pairs. -func translateQueryToMatchers( +// TranslateQueryToMatchersWithTerminator converts a graphite query to tag +// matcher pairs, and adds a terminator matcher to the end. +func TranslateQueryToMatchersWithTerminator( query string, - withTerminator bool, ) (models.Matchers, error) { metricLength := graphite.CountMetricParts(query) - matchersLength := metricLength - if withTerminator { - // Add space for a terminator character. - matchersLength++ - } - + // Add space for a terminator character. + matchersLength := metricLength + 1 matchers := make(models.Matchers, matchersLength) for i := 0; i < metricLength; i++ { metric := graphite.ExtractNthMetricPart(query, i) if len(metric) > 0 { - matchers[i] = convertMetricPartToMatcher(i, metric) + m, err := convertMetricPartToMatcher(i, metric) + if err != nil { + return nil, err + } + + matchers[i] = m } else { return nil, fmt.Errorf("invalid matcher format: %s", query) } } - if withTerminator { - // Add a terminator matcher at the end to ensure expansion is terminated at - // the last given metric part. - matchers[metricLength] = matcherTerminator(metricLength) - } - + // Add a terminator matcher at the end to ensure expansion is terminated at + // the last given metric part. + matchers[metricLength] = matcherTerminator(metricLength) return matchers, nil } -// TranslateQueryToMatchers converts a graphite query to tag matcher pairs. -func TranslateQueryToMatchers(query string) (models.Matchers, error) { - return translateQueryToMatchers(query, false) -} - // GetQueryTerminatorTagName will return the name for the terminator matcher in // the given pattern. This is useful for filtering out any additional results. func GetQueryTerminatorTagName(query string) []byte { @@ -104,7 +97,7 @@ func GetQueryTerminatorTagName(query string) []byte { } func translateQuery(query string, opts FetchOptions) (*storage.FetchQuery, error) { - matchers, err := translateQueryToMatchers(query, true) + matchers, err := TranslateQueryToMatchersWithTerminator(query) if err != nil { return nil, err } diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index f6e523d01d..58bbd6368f 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -60,14 +60,14 @@ func TestTranslateQuery(t *testing.T) { assert.Equal(t, query, translated.Raw) matchers := translated.TagMatchers expected := models.Matchers{ - {Type: models.MatchRegexp, Name: graphite.TagName(0), Value: []byte("foo")}, + {Type: models.MatchEqual, Name: graphite.TagName(0), Value: []byte("foo")}, {Type: models.MatchRegexp, Name: graphite.TagName(1), Value: []byte("ba[rz]")}, - {Type: models.MatchRegexp, Name: graphite.TagName(2), Value: []byte("q.*x")}, - {Type: models.MatchRegexp, Name: graphite.TagName(3), Value: []byte("terminator")}, - {Type: models.MatchRegexp, Name: graphite.TagName(4), Value: []byte("will")}, - {Type: models.MatchRegexp, Name: graphite.TagName(5), Value: []byte("be")}, - {Type: models.MatchRegexp, Name: graphite.TagName(6), Value: []byte(".*")}, - {Type: models.MatchRegexp, Name: graphite.TagName(7), Value: []byte("back?")}, + {Type: models.MatchRegexp, Name: graphite.TagName(2), Value: []byte(`q[^\.]*x`)}, + {Type: models.MatchEqual, Name: graphite.TagName(3), Value: []byte("terminator")}, + {Type: models.MatchEqual, Name: graphite.TagName(4), Value: []byte("will")}, + {Type: models.MatchEqual, Name: graphite.TagName(5), Value: []byte("be")}, + {Type: models.MatchRegexp, Name: graphite.TagName(6), Value: []byte(`[^\.]*`)}, + {Type: models.MatchRegexp, Name: graphite.TagName(7), Value: []byte(`back[^\.]`)}, {Type: models.MatchNotRegexp, Name: graphite.TagName(8), Value: []byte(".*")}, } @@ -90,7 +90,7 @@ func TestTranslateQueryTrailingDot(t *testing.T) { assert.Nil(t, translated) assert.Error(t, err) - matchers, err := TranslateQueryToMatchers(query) + matchers, err := TranslateQueryToMatchersWithTerminator(query) assert.Nil(t, matchers) assert.Error(t, err) } diff --git a/src/query/models/strconv/checker.go b/src/query/models/strconv/checker.go index f5bea9e978..a013f4d01d 100644 --- a/src/query/models/strconv/checker.go +++ b/src/query/models/strconv/checker.go @@ -64,3 +64,45 @@ func NeedToEscape(bb []byte) bool { return false } + +// Determines valid alphanumeric characters. +var alphaNumeric = [256]bool{ + // 1 2 3 4 5 6 7 8 9 A B C D E F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x00-0x0F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x10-0x1F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x20-0x2F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, ff, ff, ff, ff, ff, ff, // 0x30-0x3F + ff, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0x40-0x4F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, ff, ff, ff, ff, ff, // 0x50-0x5F + ff, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, // 0x60-0x6F + tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, tt, ff, ff, ff, ff, ff, // 0x70-0x7F + // 1 2 3 4 5 6 7 8 9 A B C D E F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x80-0x8F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0x90-0x9F + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0xA0-0xAF + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0xB0-0xBF + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0xC0-0xCF + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0xD0-0xDF + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0xE0-0xEF + ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, ff, // 0xF0-0xFF +} + +// IsAlphaNumeric returns true if the given string is alpha numeric. +// +// NB: here this means that it contains only characters in [0-9A-Za-z]) +func IsAlphaNumeric(str string) bool { + for _, c := range str { + if !alphaNumeric[c] { + return false + } + } + + return true +} + +// IsRuneAlphaNumeric returns true if the given rune is alpha numeric. +// +// NB: here this means that it contains only characters in [0-9A-Za-z]) +func IsRuneAlphaNumeric(r rune) bool { + return alphaNumeric[r] +} diff --git a/src/query/models/strconv/checker_test.go b/src/query/models/strconv/checker_test.go index cbd08f5823..7966933880 100644 --- a/src/query/models/strconv/checker_test.go +++ b/src/query/models/strconv/checker_test.go @@ -63,3 +63,35 @@ func TestSliceWithControlCharactersNeedsToEscape(t *testing.T) { unescaped = append(unescaped, highByte) assert.True(t, NeedToEscape(unescaped)) } + +func TestIsAlphaNumeric(t *testing.T) { + alphaStr := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + assert.True(t, IsAlphaNumeric(alphaStr)) + + // Ensure each rune is alpha numeric + for _, r := range alphaStr { + assert.True(t, IsRuneAlphaNumeric(r)) + } +} + +func TestEachNonAlphaNumeric(t *testing.T) { + // NB: generate every character then remove any alphanumeric + charMap := make(map[int]string, 256) + for i := 0; i < 256; i++ { + charMap[i] = string(byte(i)) + } + + alphaStr := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + for _, c := range alphaStr { + delete(charMap, int(c)) + } + + for _, invalid := range charMap { + assert.False(t, IsAlphaNumeric(invalid)) + + // Ensure each rune is alpha numeric + for _, r := range invalid { + assert.False(t, IsRuneAlphaNumeric(r)) + } + } +} diff --git a/src/query/storage/completed_tags.go b/src/query/storage/completed_tags.go index ab2630e257..3c9e85bf1b 100644 --- a/src/query/storage/completed_tags.go +++ b/src/query/storage/completed_tags.go @@ -24,9 +24,11 @@ import ( "bytes" "errors" "sort" + "sync" ) type completeTagsResultBuilder struct { + sync.RWMutex nameOnly bool tagBuilders map[string]completedTagBuilder } @@ -41,6 +43,9 @@ func NewCompleteTagsResultBuilder( } func (b *completeTagsResultBuilder) Add(tagResult *CompleteTagsResult) error { + b.Lock() + defer b.Unlock() + nameOnly := b.nameOnly if nameOnly != tagResult.CompleteNameOnly { return errors.New("incoming tag result has mismatched type") @@ -81,6 +86,9 @@ func (s completedTagsByName) Less(i, j int) bool { } func (b *completeTagsResultBuilder) Build() CompleteTagsResult { + b.RLock() + defer b.RUnlock() + result := make([]CompletedTag, 0, len(b.tagBuilders)) if b.nameOnly { for name := range b.tagBuilders { diff --git a/src/query/storage/fanout/storage.go b/src/query/storage/fanout/storage.go index 644d663946..ce8b8a9937 100644 --- a/src/query/storage/fanout/storage.go +++ b/src/query/storage/fanout/storage.go @@ -118,7 +118,7 @@ func handleFetchResponses(requests []execution.Request) (*storage.FetchResult, e return result, nil } -func (s *fanoutStorage) FetchTags( +func (s *fanoutStorage) SearchSeries( ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions, @@ -127,7 +127,7 @@ func (s *fanoutStorage) FetchTags( stores := filterStores(s.stores, s.fetchFilter, query) for _, store := range stores { - results, err := store.FetchTags(ctx, query, options) + results, err := store.SearchSeries(ctx, query, options) if err != nil { return nil, err } diff --git a/src/query/storage/fanout/storage_test.go b/src/query/storage/fanout/storage_test.go index 1840436b57..83243992ec 100644 --- a/src/query/storage/fanout/storage_test.go +++ b/src/query/storage/fanout/storage_test.go @@ -116,6 +116,8 @@ func setupFanoutWrite(t *testing.T, output bool, errs ...error) storage.Storage Return(nil, nil).AnyTimes() session1.EXPECT().FetchTaggedIDs(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, true, errs[0]).AnyTimes() + session1.EXPECT().Aggregate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, true, errs[0]).AnyTimes() session2.EXPECT(). WriteTagged(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). @@ -161,7 +163,7 @@ func TestFanoutReadSuccess(t *testing.T) { func TestFanoutSearchEmpty(t *testing.T) { store := setupFanoutRead(t, false) - res, err := store.FetchTags(context.TODO(), nil, nil) + res, err := store.SearchSeries(context.TODO(), nil, nil) assert.NoError(t, err, "No error") require.NotNil(t, res, "Non empty result") assert.Len(t, res.Metrics, 0, "No series") @@ -170,7 +172,7 @@ func TestFanoutSearchEmpty(t *testing.T) { func TestFanoutSearchError(t *testing.T) { store := setupFanoutRead(t, true) opts := storage.NewFetchOptions() - _, err := store.FetchTags(context.TODO(), &storage.FetchQuery{}, opts) + _, err := store.SearchSeries(context.TODO(), &storage.FetchQuery{}, opts) assert.Error(t, err) } @@ -202,7 +204,7 @@ func TestFanoutWriteSuccess(t *testing.T) { assert.NoError(t, err) } -func TestCompleteTagsFailure(t *testing.T) { +func TestCompleteTagsError(t *testing.T) { store := setupFanoutWrite(t, true, fmt.Errorf("err")) datapoints := make(ts.Datapoints, 1) datapoints[0] = ts.Datapoint{Timestamp: time.Now(), Value: 1} diff --git a/src/query/storage/index.go b/src/query/storage/index.go index 22dcae15ba..83ace4df3b 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -30,14 +30,14 @@ import ( "github.com/m3db/m3x/ident" ) -// QueryConversionCache represents the query conversion LRU cache +// QueryConversionCache represents the query conversion LRU cache. type QueryConversionCache struct { - mu sync.Mutex + sync.RWMutex lru *QueryConversionLRU } -// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache +// NewQueryConversionCache creates a new QueryConversionCache with a provided LRU cache. func NewQueryConversionCache(lru *QueryConversionLRU) *QueryConversionCache { return &QueryConversionCache{ lru: lru, @@ -52,7 +52,7 @@ func (q *QueryConversionCache) get(k []byte) (idx.Query, bool) { return q.lru.Get(k) } -// FromM3IdentToMetric converts an M3 ident metric to a coordinator metric +// FromM3IdentToMetric converts an M3 ident metric to a coordinator metric. func FromM3IdentToMetric( identID ident.ID, iterTags ident.TagIterator, @@ -69,7 +69,7 @@ func FromM3IdentToMetric( }, nil } -// FromIdentTagIteratorToTags converts ident tags to coordinator tags +// FromIdentTagIteratorToTags converts ident tags to coordinator tags. func FromIdentTagIteratorToTags( identTags ident.TagIterator, tagOptions models.TagOptions, @@ -90,9 +90,9 @@ func FromIdentTagIteratorToTags( return tags, nil } -// TagsToIdentTagIterator converts coordinator tags to ident tags +// TagsToIdentTagIterator converts coordinator tags to ident tags. func TagsToIdentTagIterator(tags models.Tags) ident.TagIterator { - //TODO get a tags and tag iterator from an ident.Pool here rather than allocing them here + // TODO: get a tags and tag iterator from an ident.Pool here rather than allocing them here identTags := make([]ident.Tag, 0, tags.Len()) for _, t := range tags.Tags { identTags = append(identTags, ident.Tag{ @@ -104,7 +104,7 @@ func TagsToIdentTagIterator(tags models.Tags) ident.TagIterator { return ident.NewTagsIterator(ident.NewTags(identTags...)) } -// FetchOptionsToM3Options converts a set of coordinator options to M3 options +// FetchOptionsToM3Options converts a set of coordinator options to M3 options. func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) index.QueryOptions { return index.QueryOptions{ Limit: fetchOptions.Limit, @@ -113,6 +113,31 @@ func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) } } +func convertAggregateQueryType(completeNameOnly bool) index.AggregationType { + if completeNameOnly { + return index.AggregateTagNames + } + + return index.AggregateTagNamesAndValues +} + +// FetchOptionsToAggregateOptions converts a set of coordinator options as well +// as complete tags query to an M3 aggregate query option. +func FetchOptionsToAggregateOptions( + fetchOptions *FetchOptions, + tagQuery *CompleteTagsQuery, +) index.AggregationOptions { + return index.AggregationOptions{ + QueryOptions: index.QueryOptions{ + Limit: fetchOptions.Limit, + StartInclusive: tagQuery.Start, + EndExclusive: tagQuery.End, + }, + TermFilter: tagQuery.FilterNameTags, + Type: convertAggregateQueryType(tagQuery.CompleteNameOnly), + } +} + var ( // byte representation for [1,2,3,4] lookup = [4]byte{49, 50, 51, 52} @@ -136,15 +161,24 @@ func queryKey(m models.Matchers) []byte { return key } -// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query -func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (index.Query, error) { +// FetchQueryToM3Query converts an m3coordinator fetch query to an M3 query. +func FetchQueryToM3Query( + fetchQuery *FetchQuery, + cache *QueryConversionCache, +) (index.Query, error) { matchers := fetchQuery.TagMatchers - k := queryKey(matchers) - - cache.mu.Lock() - defer cache.mu.Unlock() + // If no matchers provided, explicitly set this to an AllQuery + if len(matchers) == 0 { + return index.Query{ + Query: idx.NewAllQuery(), + }, nil + } - if val, ok := cache.get(k); ok { + k := queryKey(matchers) + cache.RLock() + val, ok := cache.get(k) + cache.RUnlock() + if ok { return index.Query{Query: val}, nil } @@ -155,7 +189,9 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (i return index.Query{}, err } + cache.Lock() cache.set(k, q) + cache.Unlock() return index.Query{Query: q}, nil } @@ -169,7 +205,10 @@ func FetchQueryToM3Query(fetchQuery *FetchQuery, cache *QueryConversionCache) (i } q := idx.NewConjunctionQuery(idxQueries...) + cache.Lock() cache.set(k, q) + cache.Unlock() + return index.Query{Query: q}, nil } diff --git a/src/query/storage/index_test.go b/src/query/storage/index_test.go index 71541e05d5..fb42d1bf37 100644 --- a/src/query/storage/index_test.go +++ b/src/query/storage/index_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3x/ident" @@ -130,6 +131,11 @@ func TestFetchQueryToM3Query(t *testing.T) { }, }, }, + { + name: "all matchers", + expected: "all()", + matchers: models.Matchers{}, + }, } lru, err := NewQueryConversionLRU(10) @@ -153,10 +159,12 @@ func TestFetchQueryToM3Query(t *testing.T) { require.NoError(t, err) assert.Equal(t, test.expected, m3Query.String()) - k := queryKey(test.matchers) - q, ok := cache.get(k) - require.True(t, ok) - assert.Equal(t, test.expected, q.String()) + if len(test.matchers) > 0 { + k := queryKey(test.matchers) + q, ok := cache.get(k) + require.True(t, ok) + assert.Equal(t, test.expected, q.String()) + } }) } } @@ -225,3 +233,32 @@ func TestQueryKey(t *testing.T) { }) } } + +func TestFetchOptionsToAggregateOptions(t *testing.T) { + fetchOptions := &FetchOptions{ + Limit: 7, + } + + end := time.Now() + start := end.Add(-1 * time.Hour) + filter := [][]byte{[]byte("filter")} + matchers := models.Matchers{ + models.Matcher{Type: models.MatchNotRegexp, + Name: []byte("foo"), Value: []byte("bar")}, + } + + tagQuery := &CompleteTagsQuery{ + Start: start, + End: end, + TagMatchers: matchers, + FilterNameTags: filter, + CompleteNameOnly: true, + } + + aggOpts := FetchOptionsToAggregateOptions(fetchOptions, tagQuery) + assert.Equal(t, end, aggOpts.EndExclusive) + assert.Equal(t, start, aggOpts.StartInclusive) + assert.Equal(t, index.AggregateTagNames, aggOpts.Type) + require.Equal(t, 1, len(aggOpts.TermFilter)) + require.Equal(t, "filter", string(aggOpts.TermFilter[0])) +} diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index cb4ca646e7..f373b96665 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -21,13 +21,13 @@ package m3 import ( - "bytes" "context" goerrors "errors" "fmt" "sync" "time" + "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/cost" @@ -291,7 +291,7 @@ func (s *m3storage) fetchCompressed( return result, err } -func (s *m3storage) FetchTags( +func (s *m3storage) SearchSeries( ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions, @@ -329,58 +329,94 @@ func (s *m3storage) CompleteTags( default: } - // TODO: instead of aggregating locally, have the DB aggregate it before - // sending results back. fetchQuery := &storage.FetchQuery{ TagMatchers: query.TagMatchers, - // NB: complete tags matches every tag from the start of time until now - Start: time.Time{}, - End: time.Now(), } - results, cleanup, err := s.SearchCompressed(ctx, fetchQuery, options) - defer cleanup() + m3query, err := storage.FetchQueryToM3Query(fetchQuery, s.conversionCache) if err != nil { return nil, err } - accumulatedTags := storage.NewCompleteTagsResultBuilder(query.CompleteNameOnly) - // only filter if there are tags to filter on. - filtering := len(query.FilterNameTags) > 0 - for _, elem := range results { - it := elem.Iter - tags := make([]storage.CompletedTag, 0, it.Len()) - for i := 0; it.Next(); i++ { - tag := it.Current() - name := tag.Name.Bytes() - if filtering { - found := false - for _, filterName := range query.FilterNameTags { - if bytes.Equal(filterName, name) { - found = true - break - } + aggOpts := storage.FetchOptionsToAggregateOptions(options, query) + + var ( + namespaces = s.clusters.ClusterNamespaces() + accumulatedTags = storage.NewCompleteTagsResultBuilder(query.CompleteNameOnly) + multiErr syncMultiErrs + wg sync.WaitGroup + ) + + if len(namespaces) == 0 { + return nil, errNoNamespacesConfigured + } + + var mu sync.Mutex + aggIterators := make([]client.AggregatedTagsIterator, 0, len(namespaces)) + defer func() { + mu.Lock() + for _, it := range aggIterators { + it.Finalize() + } + + mu.Unlock() + }() + + wg.Add(len(namespaces)) + for _, namespace := range namespaces { + namespace := namespace // Capture var + go func() { + defer wg.Done() + session := namespace.Session() + namespaceID := namespace.NamespaceID() + aggTagIter, _, err := session.Aggregate(namespaceID, m3query, aggOpts) + if err != nil { + multiErr.add(err) + return + } + + mu.Lock() + aggIterators = append(aggIterators, aggTagIter) + mu.Unlock() + + completedTags := make([]storage.CompletedTag, aggTagIter.Remaining()) + for i := 0; aggTagIter.Next(); i++ { + name, values := aggTagIter.Current() + tagValues := make([][]byte, values.Remaining()) + for j := 0; values.Next(); j++ { + tagValues[j] = values.Current().Bytes() + } + + if err := values.Err(); err != nil { + multiErr.add(err) + return } - if !found { - continue + completedTags[i] = storage.CompletedTag{ + Name: name.Bytes(), + Values: tagValues, } } - tags = append(tags, storage.CompletedTag{ - Name: name, - Values: [][]byte{tag.Value.Bytes()}, - }) - } + if err := aggTagIter.Err(); err != nil { + multiErr.add(err) + return + } - if err := elem.Iter.Err(); err != nil { - return nil, err - } + result := &storage.CompleteTagsResult{ + CompleteNameOnly: query.CompleteNameOnly, + CompletedTags: completedTags, + } - accumulatedTags.Add(&storage.CompleteTagsResult{ - CompleteNameOnly: query.CompleteNameOnly, - CompletedTags: tags, - }) + if err := accumulatedTags.Add(result); err != nil { + multiErr.add(err) + } + }() + } + + wg.Wait() + if err := multiErr.lastError(); err != nil { + return nil, err } built := accumulatedTags.Build() diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index e38f8c7075..1c826a35b5 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/test/seriesiter" "github.com/m3db/m3/src/query/ts" + bytetest "github.com/m3db/m3/src/x/test" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/sync" xtest "github.com/m3db/m3x/test" @@ -443,7 +444,7 @@ func TestLocalSearchError(t *testing.T) { }) searchReq := newFetchReq() - _, err := store.FetchTags(context.TODO(), searchReq, buildFetchOpts()) + _, err := store.SearchSeries(context.TODO(), searchReq, buildFetchOpts()) assert.Error(t, err) } @@ -535,7 +536,7 @@ func TestLocalSearchSuccess(t *testing.T) { Return(nil, nil).AnyTimes() }) searchReq := newFetchReq() - result, err := store.FetchTags(context.TODO(), searchReq, buildFetchOpts()) + result, err := store.SearchSeries(context.TODO(), searchReq, buildFetchOpts()) require.NoError(t, err) require.Equal(t, len(fetches), len(result.Metrics)) @@ -600,36 +601,26 @@ func TestLocalCompleteTagsSuccess(t *testing.T) { store, sessions := setup(t, ctrl) type testFetchTaggedID struct { - id string - namespace string - tagName string - tagValue string + tagName string + tagValue string } fetches := []testFetchTaggedID{ { - id: "foo", - namespace: "metrics_unaggregated", - tagName: "qux", - tagValue: "qaz", + tagName: "qux", + tagValue: "qaz", }, { - id: "bar", - namespace: "metrics_aggregated_1m:30d", - tagName: "qel", - tagValue: "quz", + tagName: "aba", + tagValue: "quz", }, { - id: "baz", - namespace: "metrics_aggregated_5m:90d", - tagName: "qam", - tagValue: "qak", + tagName: "qam", + tagValue: "qak", }, { - id: "qux", - namespace: "metrics_aggregated_10m:365d", - tagName: "qux", - tagValue: "qaz2", + tagName: "qux", + tagValue: "qaz2", }, } @@ -646,40 +637,33 @@ func TestLocalCompleteTagsSuccess(t *testing.T) { f = fetches[3] default: // Not expecting from other (partial) namespaces - iter := client.NewMockTaggedIDsIterator(ctrl) + iter := client.NewMockAggregatedTagsIterator(ctrl) gomock.InOrder( + iter.EXPECT().Remaining().Return(0), iter.EXPECT().Next().Return(false), iter.EXPECT().Err().Return(nil), iter.EXPECT().Finalize(), ) - session.EXPECT().FetchTaggedIDs(gomock.Any(), gomock.Any(), gomock.Any()). + session.EXPECT().Aggregate(gomock.Any(), gomock.Any(), gomock.Any()). Return(iter, true, nil) - session.EXPECT().IteratorPools(). - Return(nil, nil).AnyTimes() return } - iter := client.NewMockTaggedIDsIterator(ctrl) + + iter := client.NewMockAggregatedTagsIterator(ctrl) gomock.InOrder( + iter.EXPECT().Remaining().Return(1), iter.EXPECT().Next().Return(true), iter.EXPECT().Current().Return( - ident.StringID(f.namespace), - ident.StringID(f.id), - ident.NewTagsIterator(ident.NewTags( - ident.Tag{ - Name: ident.StringID(f.tagName), - Value: ident.StringID(f.tagValue), - })), + ident.StringID(f.tagName), + ident.NewIDsIterator(ident.StringID(f.tagValue)), ), iter.EXPECT().Next().Return(false), iter.EXPECT().Err().Return(nil), iter.EXPECT().Finalize(), ) - session.EXPECT().FetchTaggedIDs(gomock.Any(), gomock.Any(), gomock.Any()). + session.EXPECT().Aggregate(gomock.Any(), gomock.Any(), gomock.Any()). Return(iter, true, nil) - - session.EXPECT().IteratorPools(). - Return(nil, nil).AnyTimes() }) req := newCompleteTagsReq() @@ -687,9 +671,17 @@ func TestLocalCompleteTagsSuccess(t *testing.T) { require.NoError(t, err) require.False(t, result.CompleteNameOnly) - - require.Equal(t, 1, len(result.CompletedTags)) + require.Equal(t, 3, len(result.CompletedTags)) + // NB: expected will be sorted alphabetically expected := []storage.CompletedTag{ + { + Name: []byte("aba"), + Values: [][]byte{[]byte("quz")}, + }, + { + Name: []byte("qam"), + Values: [][]byte{[]byte("qak")}, + }, { Name: []byte("qux"), Values: [][]byte{[]byte("qaz"), []byte("qaz2")}, @@ -698,3 +690,59 @@ func TestLocalCompleteTagsSuccess(t *testing.T) { assert.Equal(t, expected, result.CompletedTags) } + +func TestLocalCompleteTagsSuccessFinalize(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + unagg := client.NewMockSession(ctrl) + clusters, err := NewClusters(UnaggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("metrics_unaggregated"), + Session: unagg, + Retention: test1MonthRetention, + }) + + require.NoError(t, err) + store := newTestStorage(t, clusters) + + name, value := ident.StringID("name"), ident.StringID("value") + iter := client.NewMockAggregatedTagsIterator(ctrl) + gomock.InOrder( + iter.EXPECT().Remaining().Return(1), + iter.EXPECT().Next().Return(true), + iter.EXPECT().Current().Return( + name, + ident.NewIDsIterator(value), + ), + iter.EXPECT().Next().Return(false), + iter.EXPECT().Err().Return(nil), + iter.EXPECT().Finalize().Do(func() { + name.Finalize() + value.Finalize() + }), + ) + + unagg.EXPECT().Aggregate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(iter, true, nil) + + req := newCompleteTagsReq() + result, err := store.CompleteTags(context.TODO(), req, buildFetchOpts()) + require.NoError(t, err) + + require.False(t, result.CompleteNameOnly) + require.Equal(t, 1, len(result.CompletedTags)) + // NB: expected will be sorted alphabetically + expected := []storage.CompletedTag{ + { + Name: []byte("name"), + Values: [][]byte{[]byte("value")}, + }, + } + + require.Equal(t, expected, result.CompletedTags) + + // ensure that the tag names and values are not backed by the same data. + n, v := result.CompletedTags[0].Name, result.CompletedTags[0].Values[0] + assert.False(t, bytetest.ByteSlicesBackedBySameData(name.Bytes(), n)) + assert.False(t, bytetest.ByteSlicesBackedBySameData(value.Bytes(), v)) +} diff --git a/src/query/storage/mock/storage.go b/src/query/storage/mock/storage.go index 1ff562c513..7a8f4d2370 100644 --- a/src/query/storage/mock/storage.go +++ b/src/query/storage/mock/storage.go @@ -36,7 +36,7 @@ type Storage interface { SetTypeResult(storage.Type) LastFetchOptions() *storage.FetchOptions SetFetchResult(*storage.FetchResult, error) - SetFetchTagsResult(*storage.SearchResults, error) + SetSearchSeriesResult(*storage.SearchResults, error) SetCompleteTagsResult(*storage.CompleteTagsResult, error) SetWriteResult(error) SetFetchBlocksResult(block.Result, error) @@ -93,7 +93,7 @@ func (s *mockStorage) SetFetchResult(result *storage.FetchResult, err error) { s.fetchResult.err = err } -func (s *mockStorage) SetFetchTagsResult(result *storage.SearchResults, err error) { +func (s *mockStorage) SetSearchSeriesResult(result *storage.SearchResults, err error) { s.Lock() defer s.Unlock() s.fetchTagsResult.result = result @@ -159,7 +159,7 @@ func (s *mockStorage) FetchBlocks( return s.fetchBlocksResult.result, s.fetchBlocksResult.err } -func (s *mockStorage) FetchTags( +func (s *mockStorage) SearchSeries( ctx context.Context, query *storage.FetchQuery, _ *storage.FetchOptions, diff --git a/src/query/storage/remote/storage.go b/src/query/storage/remote/storage.go index 8ff3529d3c..0ac29f7232 100644 --- a/src/query/storage/remote/storage.go +++ b/src/query/storage/remote/storage.go @@ -54,12 +54,12 @@ func (s *remoteStorage) FetchBlocks( return s.client.FetchBlocks(ctx, query, options) } -func (s *remoteStorage) FetchTags( +func (s *remoteStorage) SearchSeries( ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions, ) (*storage.SearchResults, error) { - return s.client.FetchTags(ctx, query, options) + return s.client.SearchSeries(ctx, query, options) } func (s *remoteStorage) CompleteTags( diff --git a/src/query/storage/storage_mock.go b/src/query/storage/storage_mock.go new file mode 100644 index 0000000000..41dc320687 --- /dev/null +++ b/src/query/storage/storage_mock.go @@ -0,0 +1,159 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/query/storage (interfaces: Storage) + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package storage is a generated GoMock package. +package storage + +import ( + "context" + "reflect" + + "github.com/m3db/m3/src/query/block" + + "github.com/golang/mock/gomock" +) + +// MockStorage is a mock of Storage interface +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder +} + +// MockStorageMockRecorder is the mock recorder for MockStorage +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockStorage) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockStorageMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStorage)(nil).Close)) +} + +// CompleteTags mocks base method +func (m *MockStorage) CompleteTags(arg0 context.Context, arg1 *CompleteTagsQuery, arg2 *FetchOptions) (*CompleteTagsResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CompleteTags", arg0, arg1, arg2) + ret0, _ := ret[0].(*CompleteTagsResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CompleteTags indicates an expected call of CompleteTags +func (mr *MockStorageMockRecorder) CompleteTags(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CompleteTags", reflect.TypeOf((*MockStorage)(nil).CompleteTags), arg0, arg1, arg2) +} + +// Fetch mocks base method +func (m *MockStorage) Fetch(arg0 context.Context, arg1 *FetchQuery, arg2 *FetchOptions) (*FetchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Fetch", arg0, arg1, arg2) + ret0, _ := ret[0].(*FetchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Fetch indicates an expected call of Fetch +func (mr *MockStorageMockRecorder) Fetch(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fetch", reflect.TypeOf((*MockStorage)(nil).Fetch), arg0, arg1, arg2) +} + +// FetchBlocks mocks base method +func (m *MockStorage) FetchBlocks(arg0 context.Context, arg1 *FetchQuery, arg2 *FetchOptions) (block.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchBlocks", arg0, arg1, arg2) + ret0, _ := ret[0].(block.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchBlocks indicates an expected call of FetchBlocks +func (mr *MockStorageMockRecorder) FetchBlocks(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBlocks", reflect.TypeOf((*MockStorage)(nil).FetchBlocks), arg0, arg1, arg2) +} + +// SearchSeries mocks base method +func (m *MockStorage) SearchSeries(arg0 context.Context, arg1 *FetchQuery, arg2 *FetchOptions) (*SearchResults, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchSeries", arg0, arg1, arg2) + ret0, _ := ret[0].(*SearchResults) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SearchSeries indicates an expected call of SearchSeries +func (mr *MockStorageMockRecorder) SearchSeries(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchSeries", reflect.TypeOf((*MockStorage)(nil).SearchSeries), arg0, arg1, arg2) +} + +// Type mocks base method +func (m *MockStorage) Type() Type { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Type") + ret0, _ := ret[0].(Type) + return ret0 +} + +// Type indicates an expected call of Type +func (mr *MockStorageMockRecorder) Type() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Type", reflect.TypeOf((*MockStorage)(nil).Type)) +} + +// Write mocks base method +func (m *MockStorage) Write(arg0 context.Context, arg1 *WriteQuery) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Write", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Write indicates an expected call of Write +func (mr *MockStorageMockRecorder) Write(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockStorage)(nil).Write), arg0, arg1) +} diff --git a/src/query/storage/types.go b/src/query/storage/types.go index e117ec96d5..7b7341dab7 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -153,8 +153,8 @@ type Querier interface { options *FetchOptions, ) (block.Result, error) - // FetchTags returns search results for tags - FetchTags( + // SearchSeries returns series IDs matching the current query + SearchSeries( ctx context.Context, query *FetchQuery, options *FetchOptions, @@ -187,6 +187,8 @@ type CompleteTagsQuery struct { CompleteNameOnly bool FilterNameTags [][]byte TagMatchers models.Matchers + Start time.Time + End time.Time } // SeriesMatchQuery represents a query that returns a set of series diff --git a/src/query/storage/validator/storage.go b/src/query/storage/validator/storage.go index 41cb3fb301..be8be43eef 100644 --- a/src/query/storage/validator/storage.go +++ b/src/query/storage/validator/storage.go @@ -158,12 +158,12 @@ func (s *debugStorage) Type() storage.Type { return storage.TypeDebug } -func (s *debugStorage) FetchTags( +func (s *debugStorage) SearchSeries( ctx context.Context, query *storage.FetchQuery, _ *storage.FetchOptions, ) (*storage.SearchResults, error) { - return nil, errors.New("FetchTags not implemented") + return nil, errors.New("SearchSeries not implemented") } func (s *debugStorage) CompleteTags( diff --git a/src/query/stores/m3db/async_session.go b/src/query/stores/m3db/async_session.go index 928aa21b63..6da2ba7b17 100644 --- a/src/query/stores/m3db/async_session.go +++ b/src/query/stores/m3db/async_session.go @@ -41,9 +41,10 @@ var ( errSessionUninitialized = errors.New("M3DB session not yet initialized") ) -// AsyncSession is a thin wrapper around an M3DB session that does not block on initialization. -// Calls to methods while uninitialized will return an uninitialized error. The done channel -// is to notify the caller that the session has finished _attempting_ to get initialized. +// AsyncSession is a thin wrapper around an M3DB session that does not block +// on initialization. Calls to methods while uninitialized will return an +// uninitialized error. The done channel is to notify the caller that the +// session has finished _attempting_ to get initialized. type AsyncSession struct { sync.RWMutex session client.Session @@ -54,7 +55,7 @@ type AsyncSession struct { // NewClientFn provides a DB client. type NewClientFn func() (client.Client, error) -// NewAsyncSession returns a new AsyncSession +// NewAsyncSession returns a new AsyncSession. func NewAsyncSession(fn NewClientFn, done chan<- struct{}) *AsyncSession { asyncSession := &AsyncSession{ done: done, @@ -92,8 +93,9 @@ func NewAsyncSession(fn NewClientFn, done chan<- struct{}) *AsyncSession { return asyncSession } -// Write writes a value to the database for an ID -func (s *AsyncSession) Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error { +// Write writes a value to the database for an ID. +func (s *AsyncSession) Write(namespace, id ident.ID, t time.Time, value float64, + unit xtime.Unit, annotation []byte) error { s.RLock() defer s.RUnlock() if s.err != nil { @@ -103,8 +105,9 @@ func (s *AsyncSession) Write(namespace, id ident.ID, t time.Time, value float64, return s.session.Write(namespace, id, t, value, unit, annotation) } -// WriteTagged writes a value to the database for an ID and given tags -func (s *AsyncSession) WriteTagged(namespace, id ident.ID, tags ident.TagIterator, t time.Time, value float64, unit xtime.Unit, annotation []byte) error { +// WriteTagged writes a value to the database for an ID and given tags. +func (s *AsyncSession) WriteTagged(namespace, id ident.ID, tags ident.TagIterator, + t time.Time, value float64, unit xtime.Unit, annotation []byte) error { s.RLock() defer s.RUnlock() if s.err != nil { @@ -114,8 +117,9 @@ func (s *AsyncSession) WriteTagged(namespace, id ident.ID, tags ident.TagIterato return s.session.WriteTagged(namespace, id, tags, t, value, unit, annotation) } -// Fetch fetches values from the database for an ID -func (s *AsyncSession) Fetch(namespace, id ident.ID, startInclusive, endExclusive time.Time) (encoding.SeriesIterator, error) { +// Fetch fetches values from the database for an ID. +func (s *AsyncSession) Fetch(namespace, id ident.ID, startInclusive, + endExclusive time.Time) (encoding.SeriesIterator, error) { s.RLock() defer s.RUnlock() if s.err != nil { @@ -125,8 +129,9 @@ func (s *AsyncSession) Fetch(namespace, id ident.ID, startInclusive, endExclusiv return s.session.Fetch(namespace, id, startInclusive, endExclusive) } -// FetchIDs fetches values from the database for a set of IDs -func (s *AsyncSession) FetchIDs(namespace ident.ID, ids ident.Iterator, startInclusive, endExclusive time.Time) (encoding.SeriesIterators, error) { +// FetchIDs fetches values from the database for a set of IDs. +func (s *AsyncSession) FetchIDs(namespace ident.ID, ids ident.Iterator, + startInclusive, endExclusive time.Time) (encoding.SeriesIterators, error) { s.RLock() defer s.RUnlock() if s.err != nil { @@ -136,8 +141,11 @@ func (s *AsyncSession) FetchIDs(namespace ident.ID, ids ident.Iterator, startInc return s.session.FetchIDs(namespace, ids, startInclusive, endExclusive) } -// FetchTagged resolves the provided query to known IDs, and fetches the data for them -func (s *AsyncSession) FetchTagged(namespace ident.ID, q index.Query, opts index.QueryOptions) (results encoding.SeriesIterators, exhaustive bool, err error) { +// FetchTagged resolves the provided query to known IDs, and +// fetches the data for them. +func (s *AsyncSession) FetchTagged(namespace ident.ID, q index.Query, + opts index.QueryOptions) (results encoding.SeriesIterators, + exhaustive bool, err error) { s.RLock() defer s.RUnlock() if s.err != nil { @@ -148,7 +156,8 @@ func (s *AsyncSession) FetchTagged(namespace ident.ID, q index.Query, opts index } // FetchTaggedIDs resolves the provided query to known IDs. -func (s *AsyncSession) FetchTaggedIDs(namespace ident.ID, q index.Query, opts index.QueryOptions) (client.TaggedIDsIterator, bool, error) { +func (s *AsyncSession) FetchTaggedIDs(namespace ident.ID, q index.Query, + opts index.QueryOptions) (client.TaggedIDsIterator, bool, error) { s.RLock() defer s.RUnlock() if s.err != nil { @@ -171,7 +180,7 @@ func (s *AsyncSession) Aggregate(namespace ident.ID, q index.Query, opts index.A // ShardID returns the given shard for an ID for callers // to easily discern what shard is failing when operations -// for given IDs begin failing +// for given IDs begin failing. func (s *AsyncSession) ShardID(id ident.ID) (uint32, error) { s.RLock() defer s.RUnlock() @@ -182,7 +191,8 @@ func (s *AsyncSession) ShardID(id ident.ID) (uint32, error) { return s.session.ShardID(id) } -// IteratorPools exposes the internal iterator pools used by the session to clients +// IteratorPools exposes the internal iterator pools used by the session to +// clients. func (s *AsyncSession) IteratorPools() (encoding.IteratorPools, error) { s.RLock() defer s.RUnlock() @@ -192,7 +202,7 @@ func (s *AsyncSession) IteratorPools() (encoding.IteratorPools, error) { return s.session.IteratorPools() } -// Close closes the session +// Close closes the session. func (s *AsyncSession) Close() error { s.RLock() defer s.RUnlock() diff --git a/src/query/stores/m3db/async_session_test.go b/src/query/stores/m3db/async_session_test.go index 80d6a2c8f7..406e4084de 100644 --- a/src/query/stores/m3db/async_session_test.go +++ b/src/query/stores/m3db/async_session_test.go @@ -106,6 +106,9 @@ func TestAsyncSessionUninitialized(t *testing.T) { _, _, err = asyncSession.FetchTaggedIDs(namespace, index.Query{}, index.QueryOptions{}) assert.Equal(t, err, errSessionUninitialized) + _, _, err = asyncSession.Aggregate(namespace, index.Query{}, index.AggregationOptions{}) + assert.Equal(t, err, errSessionUninitialized) + id, err := asyncSession.ShardID(nil) assert.Equal(t, uint32(0), id) assert.Equal(t, err, errSessionUninitialized) @@ -154,6 +157,10 @@ func TestAsyncSessionInitialized(t *testing.T) { _, _, err = asyncSession.FetchTaggedIDs(namespace, index.Query{}, index.QueryOptions{}) assert.NoError(t, err) + mockSession.EXPECT().Aggregate(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false, nil) + _, _, err = asyncSession.Aggregate(namespace, index.Query{}, index.AggregationOptions{}) + assert.NoError(t, err) + mockSession.EXPECT().ShardID(gomock.Any()).Return(uint32(0), nil) _, err = asyncSession.ShardID(nil) assert.NoError(t, err) diff --git a/src/query/test/storage.go b/src/query/test/storage.go index 599fcd6031..09a25cf45c 100644 --- a/src/query/test/storage.go +++ b/src/query/test/storage.go @@ -60,13 +60,13 @@ func (s *slowStorage) FetchBlocks( return s.storage.FetchBlocks(ctx, query, options) } -func (s *slowStorage) FetchTags( +func (s *slowStorage) SearchSeries( ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions, ) (*storage.SearchResults, error) { time.Sleep(s.delay) - return s.storage.FetchTags(ctx, query, options) + return s.storage.SearchSeries(ctx, query, options) } func (s *slowStorage) CompleteTags( diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index 5121aee90a..29688f0129 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -213,7 +213,7 @@ func (c *grpcClient) FetchBlocks( return res, nil } -func (c *grpcClient) FetchTags( +func (c *grpcClient) SearchSeries( ctx context.Context, query *storage.FetchQuery, options *storage.FetchOptions,