Skip to content

Commit

Permalink
[query] Add new aggregate endpoint to query
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola committed Mar 29, 2019
1 parent 2427bbb commit c535a7d
Show file tree
Hide file tree
Showing 35 changed files with 1,186 additions and 436 deletions.
16 changes: 16 additions & 0 deletions scripts/docker-integration-tests/carbon/expected/a.ba.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"id": "a.bag",
"text": "bag",
"leaf": 1,
"expandable": 0,
"allowChildren": 0
},
{
"id": "a.bar",
"text": "bar",
"leaf": 0,
"expandable": 1,
"allowChildren": 1
}
]
3 changes: 2 additions & 1 deletion scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function read_carbon {
function find_carbon {
query=$1
expected_file=$2
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query")
RESPONSE=$(curl -sSg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query")
ACTUAL=$(echo $RESPONSE | jq '. | sort')
EXPECTED=$(cat $EXPECTED_PATH/$expected_file | jq '. | sort')
if [ "$ACTUAL" == "$EXPECTED" ]
Expand Down Expand Up @@ -83,6 +83,7 @@ echo "a.bar.caw.daz 0 $t" | nc 0.0.0.0 7204
echo "a.bag 0 $t" | nc 0.0.0.0 7204
ATTEMPTS=5 TIMEOUT=1 retry_with_backoff find_carbon a* a.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b* a.b.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.ba[rg] a.ba.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b*.c* a.b.c.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b*.caw.* a.b.c.d.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon x none.json
Expand Down
100 changes: 72 additions & 28 deletions src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
package graphite

import (
"bytes"
"context"
"errors"
"net/http"
"sync"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
xerrors "github.com/m3db/m3x/errors"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -57,52 +59,94 @@ func NewFindHandler(
}
}

// mergeTags combines the completed tag values of the terminated and child
// results into one map keyed by tag value. A value maps to true when it was
// present in childResult and to false when it was present only in
// terminatedResult.
//
// An error is returned if either result has CompleteNameOnly set, since this
// merge works on tag values.
func mergeTags(
	terminatedResult *storage.CompleteTagsResult,
	childResult *storage.CompleteTagsResult,
) (map[string]bool, error) {
	// Guard against results that were completed by name only.
	switch {
	case terminatedResult.CompleteNameOnly:
		return nil, errors.New("terminated result is completing name only")
	case childResult.CompleteNameOnly:
		return nil, errors.New("child result is completing name only")
	}

	sizeHint := len(terminatedResult.CompletedTags) + len(childResult.CompletedTags)
	merged := make(map[string]bool, sizeHint)

	// NB: the order of the passes matters. The child pass runs last so that it
	// overwrites any value already recorded by the terminated pass: a value
	// that appears in `childResult` exists AND has children.
	passes := []struct {
		result      *storage.CompleteTagsResult
		hasChildren bool
	}{
		{result: terminatedResult, hasChildren: false},
		{result: childResult, hasChildren: true},
	}

	for _, pass := range passes {
		for _, tag := range pass.result.CompletedTags {
			for _, value := range tag.Values {
				merged[string(value)] = pass.hasChildren
			}
		}
	}

	return merged, nil
}

func (h *grahiteFindHandler) ServeHTTP(
w http.ResponseWriter,
r *http.Request,
) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)
w.Header().Set("Content-Type", "application/json")
query, rErr := parseFindParamsToQuery(r)

// NB: need to run two separate queries, one of which will match only the
// provided matchers, and one which will match the provided matchers with at
// least one more child node. For further information, refer to the comment
// for parseFindParamsToQueries
terminatedQuery, childQuery, raw, rErr := parseFindParamsToQueries(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

opts := storage.NewFetchOptions()
result, err := h.storage.FetchTags(ctx, query, opts)
if err != nil {
var (
terminatedResult *storage.CompleteTagsResult
tErr error
childResult *storage.CompleteTagsResult
cErr error
opts = storage.NewFetchOptions()

wg sync.WaitGroup
)

wg.Add(2)
go func() {
terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts)
wg.Done()
}()

go func() {
childResult, cErr = h.storage.CompleteTags(ctx, childQuery, opts)
wg.Done()
}()

wg.Wait()
if err := xerrors.FirstError(tErr, cErr); err != nil {
logger.Error("unable to complete tags", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

partCount := graphite.CountMetricParts(query.Raw)
partName := graphite.TagName(partCount - 1)
seenMap := make(map[string]bool, len(result.Metrics))
for _, m := range result.Metrics {
tags := m.Tags.Tags
index := 0
// TODO: make this more performant by computing the index for the tag name.
for i, tag := range tags {
if bytes.Equal(partName, tag.Name) {
index = i
break
}
}

value := tags[index].Value
// If this value has already been encountered, check if
if hadExtra, seen := seenMap[string(value)]; seen && hadExtra {
continue
}

hasExtraParts := len(tags) > partCount
seenMap[string(value)] = hasExtraParts
// NB: merge results from both queries to specify which series have children
seenMap, err := mergeTags(terminatedResult, childResult)
if err != nil {
logger.Error("unable to complete tags", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

prefix := graphite.DropLastMetricPart(query.Raw)
prefix := graphite.DropLastMetricPart(raw)
if len(prefix) > 0 {
prefix += "."
}
Expand Down
88 changes: 68 additions & 20 deletions src/query/api/v1/handler/graphite/find_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,41 @@ import (
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/graphite/graphite"
graphiteStorage "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/json"
"github.com/m3db/m3/src/x/net/http"
)

func parseFindParamsToQuery(r *http.Request) (
*storage.FetchQuery,
*xhttp.ParseError,
// parseFindParamsToQueries parses an incoming request to two find queries,
// which are then combined to give the final result.
// It returns, in order:
// _terminatedQuery, which adds an explicit terminator after the last term in
// the given query; this will return all values for exactly that tag which have
// no child nodes
// _childQuery, which adds an explicit match all after the last term in the
// given query; this will return all values for exactly that tag which have at
// least one child node.
// _rawQueryString, which is the initial query request (bar final
// matcher), which is used to reconstruct the return values.
// _err, any error encountered during parsing.
//
// As an example, given the query `a.b*`, and metrics `a.bar.c` and `a.biz`,
// terminatedQuery will return only [biz], and childQuery will return only
// [bar].
func parseFindParamsToQueries(r *http.Request) (
_terminatedQuery *storage.CompleteTagsQuery,
_childQuery *storage.CompleteTagsQuery,
_rawQueryString string,
_err *xhttp.ParseError,
) {
values := r.URL.Query()
query := values.Get("query")
if query == "" {
return nil, nil, "",
xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest)
}

now := time.Now()
fromString, untilString := r.FormValue("from"), r.FormValue("until")
if len(fromString) == 0 {
Expand All @@ -56,8 +81,9 @@ func parseFindParamsToQuery(r *http.Request) (
)

if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString),
http.StatusBadRequest)
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString),
http.StatusBadRequest)
}

until, err := graphite.ParseTime(
Expand All @@ -67,28 +93,50 @@ func parseFindParamsToQuery(r *http.Request) (
)

if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString),
http.StatusBadRequest)
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString),
http.StatusBadRequest)
}

query := values.Get("query")
if query == "" {
return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest)
matchers, err := graphiteStorage.TranslateQueryToMatchersWithTerminator(query)
if err != nil {
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
http.StatusBadRequest)
}

matchers, err := graphiteStorage.TranslateQueryToMatchers(query)
if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
// NB: Filter will always be the second last term in the matchers, and the
// matchers should always have a length of at least 2 (term + terminator)
// so this is a sanity check and unexpected in actual execution.
if len(matchers) < 2 {
return nil, nil, "", xhttp.NewParseError(fmt.Errorf("unable to parse "+
"'query': %s", query),
http.StatusBadRequest)
}

return &storage.FetchQuery{
Raw: query,
TagMatchers: matchers,
Start: from,
End: until,
Interval: 0,
}, nil
filter := [][]byte{matchers[len(matchers)-2].Name}
terminatedQuery := &storage.CompleteTagsQuery{
CompleteNameOnly: false,
FilterNameTags: filter,
TagMatchers: matchers,
Start: from,
End: until,
}

clonedMatchers := make([]models.Matcher, len(matchers))
copy(clonedMatchers, matchers)
// NB: change terminator from `MatchNotRegexp` to `MatchRegexp` to ensure
// segments with children are matched.
clonedMatchers[len(clonedMatchers)-1].Type = models.MatchRegexp
childQuery := &storage.CompleteTagsQuery{
CompleteNameOnly: false,
FilterNameTags: filter,
TagMatchers: clonedMatchers,
Start: from,
End: until,
}

return terminatedQuery, childQuery, query, nil
}

func findResultsJSON(
Expand Down
Loading

0 comments on commit c535a7d

Please sign in to comment.