Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Restrict query by header tag #2053

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions src/query/api/v1/handler/fetch_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package handler

import (
"encoding/json"
"fmt"
"math"
"net/http"
Expand Down Expand Up @@ -103,6 +104,7 @@ func (b fetchOptionsBuilder) NewFetchOptions(
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.Limit = limit
if str := req.Header.Get(MetricsTypeHeader); str != "" {
mt, err := storage.ParseMetricsType(str)
Expand All @@ -111,20 +113,43 @@ func (b fetchOptionsBuilder) NewFetchOptions(
"could not parse metrics type: input=%s, err=%v", str, err)
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
fetchOpts.RestrictFetchOptions.MetricsType = mt

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType =
newOrExistingRestrictQueryOptionsRestrictByType(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType.MetricsType = mt
}

if str := req.Header.Get(MetricsStoragePolicyHeader); str != "" {
sp, err := policy.ParseStoragePolicy(str)
if err != nil {
err = fmt.Errorf(
"could not parse storage policy: input=%s, err=%v", str, err)
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
fetchOpts.RestrictFetchOptions.StoragePolicy = sp

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType =
newOrExistingRestrictQueryOptionsRestrictByType(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType.StoragePolicy = sp
}
if restrict := fetchOpts.RestrictFetchOptions; restrict != nil {

if str := req.Header.Get(QueryOptionsJSONHeader); str != "" {
var opts stringTagOptions
if err := json.Unmarshal([]byte(str), &opts); err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

tagOpts, err := opts.toOptions()
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByTag = tagOpts
}

if restrict := fetchOpts.RestrictQueryOptions; restrict != nil {
if err := restrict.Validate(); err != nil {
err = fmt.Errorf(
"could not validate restrict options: err=%v", err)
Expand Down Expand Up @@ -152,13 +177,22 @@ func (b fetchOptionsBuilder) NewFetchOptions(
return fetchOpts, nil
}

func newOrExistingRestrictFetchOptions(
func newOrExistingRestrictQueryOptions(
fetchOpts *storage.FetchOptions,
) *storage.RestrictQueryOptions {
if v := fetchOpts.RestrictQueryOptions; v != nil {
return v
}
return &storage.RestrictQueryOptions{}
}

func newOrExistingRestrictQueryOptionsRestrictByType(
fetchOpts *storage.FetchOptions,
) *storage.RestrictFetchOptions {
if v := fetchOpts.RestrictFetchOptions; v != nil {
) *storage.RestrictByType {
if v := fetchOpts.RestrictQueryOptions.RestrictByType; v != nil {
return v
}
return &storage.RestrictFetchOptions{}
return &storage.RestrictByType{}
}

// ParseStep parses the step duration for an HTTP request.
Expand Down
74 changes: 65 additions & 9 deletions src/query/api/v1/handler/fetch_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"

"github.com/stretchr/testify/assert"
Expand All @@ -47,7 +48,7 @@ func TestFetchOptionsBuilder(t *testing.T) {
headers map[string]string
query string
expectedLimit int
expectedRestrict *storage.RestrictFetchOptions
expectedRestrict *storage.RestrictQueryOptions
expectedLookback *expectedLookback
expectedErr bool
}{
Expand Down Expand Up @@ -78,8 +79,10 @@ func TestFetchOptionsBuilder(t *testing.T) {
headers: map[string]string{
MetricsTypeHeader: storage.UnaggregatedMetricsType.String(),
},
expectedRestrict: &storage.RestrictFetchOptions{
MetricsType: storage.UnaggregatedMetricsType,
expectedRestrict: &storage.RestrictQueryOptions{
RestrictByType: &storage.RestrictByType{
MetricsType: storage.UnaggregatedMetricsType,
},
},
},
{
Expand All @@ -88,9 +91,11 @@ func TestFetchOptionsBuilder(t *testing.T) {
MetricsTypeHeader: storage.AggregatedMetricsType.String(),
MetricsStoragePolicyHeader: "1m:14d",
},
expectedRestrict: &storage.RestrictFetchOptions{
MetricsType: storage.AggregatedMetricsType,
StoragePolicy: policy.MustParseStoragePolicy("1m:14d"),
expectedRestrict: &storage.RestrictQueryOptions{
RestrictByType: &storage.RestrictByType{
MetricsType: storage.AggregatedMetricsType,
StoragePolicy: policy.MustParseStoragePolicy("1m:14d"),
},
},
},
{
Expand Down Expand Up @@ -167,10 +172,10 @@ func TestFetchOptionsBuilder(t *testing.T) {
require.NoError(t, err)
require.Equal(t, test.expectedLimit, opts.Limit)
if test.expectedRestrict == nil {
require.Nil(t, opts.RestrictFetchOptions)
require.Nil(t, opts.RestrictQueryOptions)
} else {
require.NotNil(t, opts.RestrictFetchOptions)
require.Equal(t, *test.expectedRestrict, *opts.RestrictFetchOptions)
require.NotNil(t, opts.RestrictQueryOptions)
require.Equal(t, *test.expectedRestrict, *opts.RestrictQueryOptions)
}
if test.expectedLookback == nil {
require.Nil(t, opts.LookbackDuration)
Expand Down Expand Up @@ -269,3 +274,54 @@ func TestParseDurationOverflowError(t *testing.T) {
_, err = ParseDuration(r, StepParam)
assert.Error(t, err)
}

func TestFetchOptionsWithHeader(t *testing.T) {
type expectedLookback struct {
value time.Duration
}

headers := map[string]string{
MetricsTypeHeader: storage.AggregatedMetricsType.String(),
MetricsStoragePolicyHeader: "1m:14d",
QueryOptionsJSONHeader: `{
"match":[
{"name":"a", "value":"b", "type":"EQUAL"},
{"name":"c", "value":"d", "type":"NOTEQUAL"},
{"name":"e", "value":"f", "type":"REGEXP"},
{"name":"g", "value":"h", "type":"NOTREGEXP"},
{"name":"i", "value":"j", "type":"EXISTS"},
{"name":"k", "value":"l", "type":"NOTEXISTS"}
],
"strip":["foo"]
}`,
}

builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{Limit: 5})
req := httptest.NewRequest("GET", "/", nil)
for k, v := range headers {
req.Header.Add(k, v)
}

opts, err := builder.NewFetchOptions(req)
require.NoError(t, err)
require.NotNil(t, opts.RestrictQueryOptions)
ex := &storage.RestrictQueryOptions{
RestrictByType: &storage.RestrictByType{
MetricsType: storage.AggregatedMetricsType,
StoragePolicy: policy.MustParseStoragePolicy("1m:14d"),
},
RestrictByTag: &storage.RestrictByTag{
Restrict: models.Matchers{
mustMatcher("a", "b", models.MatchEqual),
mustMatcher("c", "d", models.MatchNotEqual),
mustMatcher("e", "f", models.MatchRegexp),
mustMatcher("g", "h", models.MatchNotRegexp),
mustMatcher("i", "j", models.MatchField),
mustMatcher("k", "l", models.MatchNotField),
},
Strip: toStrip("foo"),
},
}

require.Equal(t, ex, opts.RestrictQueryOptions)
}
3 changes: 3 additions & 0 deletions src/query/api/v1/handler/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const (
// metrics type.
MetricsStoragePolicyHeader = "M3-Storage-Policy"

// QueryOptionsJSONHeader provides tag options to enforces on queries.
QueryOptionsJSONHeader = "M3-Restrict-By-Tags-JSON"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably update the header name to match how the header reads (to be consistent with other headers defined here).

So QueryOptionsJSONHeader -> RestrictByTagsJSONHeader?


// UnaggregatedStoragePolicy specifies the unaggregated storage policy.
UnaggregatedStoragePolicy = "unaggregated"

Expand Down
20 changes: 20 additions & 0 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/query/models"
xpromql "github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util"
"github.com/m3db/m3/src/query/util/json"
xhttp "github.com/m3db/m3/src/x/net/http"
Expand Down Expand Up @@ -632,3 +633,22 @@ type PromDebug struct {
Input Response `json:"input"`
Results Response `json:"results"`
}

// FilterSeriesByOptions removes series tags based on options.
func FilterSeriesByOptions(
series []*ts.Series,
opts *storage.FetchOptions,
) []*ts.Series {
if opts == nil {
return series
}

keys := opts.RestrictQueryOptions.GetRestrictByTag().GetFilterByNames()
if len(keys) > 0 {
for i, s := range series {
series[i].Tags = s.Tags.TagsWithoutKeys(keys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we ever push these filters down to the dbnode?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We push them down through the FetchOptions, e.g. in the WithAppliedOptions function (here)[https://sourcegraph.com/github.com/m3db/m3/-/blob/src/query/storage/index.go#L128], where the additional matchers are added to the query as it's translated to an index.Query (the format used by the db)

This is actually on the path back from dbnode, and it's removing the unneeded tags. For full queries (as opposed to tag completion or search queries), we actually have to complete all processing steps first before we can strip the tags out, to ensure any functions that rely on matching tags to work correctly. If you remove them too early, a case where you add a matcher that looks like foo~=".*" is likely to break as you may have multiple different series with the same "ID", and any aggregations will likely not match series correctly

}
}

return series
}
4 changes: 3 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
QueryContextOptions: models.QueryContextOptions{
LimitMaxTimeseries: fetchOpts.Limit,
}}
if restrictOpts := fetchOpts.RestrictFetchOptions; restrictOpts != nil {

restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType()
if restrictOpts != nil {
restrict := &models.RestrictFetchTypeQueryContextOptions{
MetricsType: uint(restrictOpts.MetricsType),
StoragePolicy: restrictOpts.StoragePolicy,
Expand Down
2 changes: 2 additions & 0 deletions src/query/api/v1/handler/prometheus/native/read_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sort"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
Expand Down Expand Up @@ -150,6 +151,7 @@ func read(
return emptyResult, err
}

series = prometheus.FilterSeriesByOptions(series, fetchOpts)
return readResult{
series: series,
meta: meta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
QueryContextOptions: models.QueryContextOptions{
LimitMaxTimeseries: fetchOpts.Limit,
}}
if restrictOpts := fetchOpts.RestrictFetchOptions; restrictOpts != nil {

Copy link
Collaborator Author

@robskillington robskillington Dec 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the result from read(...) not need to be filtered out too for instant queries with prometheus.FilterSeriesByOptions(result.series, fetchOpts)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to a common file.

restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType()
if restrictOpts != nil {
restrict := &models.RestrictFetchTypeQueryContextOptions{
MetricsType: uint(restrictOpts.MetricsType),
StoragePolicy: restrictOpts.StoragePolicy,
Expand Down
4 changes: 4 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (h *PromReadHandler) read(
mu.Lock()
meta = meta.CombineMetadata(result.Metadata)
mu.Unlock()
result.SeriesList = prometheus.FilterSeriesByOptions(
result.SeriesList,
fetchOpts,
)
promRes := storage.FetchResultToPromResult(result, h.keepEmpty)
promResults[i] = promRes
}()
Expand Down
2 changes: 0 additions & 2 deletions src/query/api/v1/handler/prometheus/validator/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package validator

import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -335,7 +334,6 @@ func TestValidateEndpoint(t *testing.T) {
recorder := httptest.NewRecorder()
debugHandler.ServeHTTP(recorder, req)

fmt.Println(recorder.Body.String())
var mismatches MismatchesJSON
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &mismatches))
assert.False(t, mismatches.Correct)
Expand Down
Loading