Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add resolution exceeds query range warning #2429

Merged
merged 16 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions scripts/docker-integration-tests/query_fanout/warning.sh
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,54 @@ function test_fanout_warning_graphite {
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff find_carbon 9 max_fetch_series_limit_applied
}

function verify_range {
RANGE=$1
PORT=$2
EXPECTED=$3
start=$(( $t - 3000 ))
end=$(( $t + 3000 ))
qs="query=sum_over_time($METRIC_NAME[$RANGE])&start=$start&end=$end&step=1s"
query="http://0.0.0.0:$PORT/prometheus/api/v1/query_range?$qs"
curl -sSLg -D $HEADER_FILE "$query" > /dev/null
warn=$(cat $HEADER_FILE | grep M3-Results-Limited | sed 's/M3-Results-Limited: //g')
warn=$(echo $warn | sed 's/ /_/g')
test $warn=$EXPECTED
}

function test_fanout_warning_range {
t=$(date +%s)
METRIC_NAME="quart_$t"
curl -X POST 0.0.0.0:9003/writetagged -d '{
"namespace": "agg",
"id": "{__name__=\"'$METRIC_NAME'\",cluster=\"coordinator-cluster-a\",val=\"1\"}",
"tags": [
{
"name": "__name__",
"value": "'$METRIC_NAME'"
},
{
"name": "cluster",
"value": "coordinator-cluster-a"
},
{
"name": "val",
"value": "1"
}
],
"datapoint": {
"timestamp":'"$t"',
"value": 1
}
}'


ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 1s 7201 resolution_larger_than_query_range_range:_1s,_resolutions:_5s
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 1s 17201 resolution_larger_than_query_range_range:_1s,_resolutions:_5s

ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 10s 7201
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 10s 17201
}

function test_fanout_warning_missing_zone {
docker-compose -f ${COMPOSE_FILE} stop coordinator-cluster-c

Expand Down Expand Up @@ -390,5 +438,6 @@ function test_fanout_warnings {
test_fanout_warning_fetch_id_mismatch
export GRAPHITE="foo.bar.$t"
test_fanout_warning_graphite
test_fanout_warning_range
test_fanout_warning_missing_zone
}
10 changes: 5 additions & 5 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestParseQueryResults(t *testing.T) {
}

meta := block.NewResultMetadata()
meta.Resolutions = []int64{int64(resolution)}
meta.Resolutions = []time.Duration{resolution}
fr := &storage.FetchResult{
SeriesList: seriesList,
Metadata: meta,
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
}

meta := block.NewResultMetadata()
meta.Resolutions = []int64{int64(resolution)}
meta.Resolutions = []time.Duration{resolution}
fr := &storage.FetchResult{
SeriesList: seriesList,
Metadata: meta,
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
}

meta := block.NewResultMetadata()
meta.Resolutions = []int64{int64(resolution)}
meta.Resolutions = []time.Duration{resolution}
fr := &storage.FetchResult{
SeriesList: seriesList,
Metadata: meta,
Expand Down Expand Up @@ -301,12 +301,12 @@ func TestParseQueryResultsMultiTargetWithLimits(t *testing.T) {
}

meta := block.NewResultMetadata()
meta.Resolutions = []int64{int64(resolution)}
meta.Resolutions = []time.Duration{resolution}
meta.Exhaustive = tt.ex
frOne := &storage.FetchResult{SeriesList: seriesList, Metadata: meta}

metaTwo := block.NewResultMetadata()
metaTwo.Resolutions = []int64{int64(resolution)}
metaTwo.Resolutions = []time.Duration{resolution}
if !tt.ex2 {
metaTwo.AddWarning("foo", "bar")
}
Expand Down
21 changes: 21 additions & 0 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"time"

"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/storage/prometheus"

"github.com/prometheus/prometheus/promql"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Separate internal/external imports?

"github.com/prometheus/prometheus/promql/parser"
)

// NB: since Prometheus engine is not brought up in the usual fashion,
Expand Down Expand Up @@ -60,3 +62,22 @@ func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Hand
})
return newReadInstantHandler(opts, hOpts, queryable)
}

func applyRangeWarnings(
query string, meta *block.ResultMetadata,
) error {
expr, err := parser.ParseExpr(query)
if err != nil {
return err
}

parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
if n, ok := node.(*parser.MatrixSelector); ok {
meta.VerifyTemporalRange(n.Range)
}

return nil
})

return nil
}
14 changes: 11 additions & 3 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,29 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
request.Params.End,
request.Params.Step)
if err != nil {
h.logger.Error("error creating range query", zap.Error(err), zap.String("query", request.Params.Query))
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", request.Params.Query))
respondError(w, err, http.StatusInternalServerError)
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query", zap.Error(res.Err), zap.String("query", request.Params.Query))
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", request.Params.Query))
respondError(w, res.Err, http.StatusInternalServerError)
return
}

handleroptions.AddWarningHeaders(w, resultMetadata)
query := request.Params.Query
err = applyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
}

handleroptions.AddWarningHeaders(w, resultMetadata)
respond(w, &queryData{
Result: res.Value,
ResultType: res.Value.Type(),
Expand Down
11 changes: 9 additions & 2 deletions src/query/api/v1/handler/prom/read_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,26 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
query,
ts)
if err != nil {
h.logger.Error("error creating instant query", zap.Error(err), zap.String("query", query))
h.logger.Error("error creating instant query",
zap.Error(err), zap.String("query", query))
respondError(w, err, http.StatusInternalServerError)
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing instant query", zap.Error(res.Err), zap.String("query", query))
h.logger.Error("error executing instant query",
zap.Error(res.Err), zap.String("query", query))
respondError(w, res.Err, http.StatusInternalServerError)
return
}

err = applyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
}
handleroptions.AddWarningHeaders(w, resultMetadata)

respond(w, &queryData{
Expand Down
34 changes: 31 additions & 3 deletions src/query/block/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package block

import (
"fmt"
"sort"
"strings"
"time"

"github.com/m3db/m3/src/query/models"
)
Expand Down Expand Up @@ -64,7 +67,7 @@ type ResultMetadata struct {
// incomplete results.
Warnings Warnings
// Resolutions is a list of resolutions for series obtained by this query.
Resolutions []int64
Resolutions []time.Duration
}

// NewResultMetadata creates a new result metadata.
Expand All @@ -75,7 +78,7 @@ func NewResultMetadata() ResultMetadata {
}
}

func combineResolutions(a, b []int64) []int64 {
func combineResolutions(a, b []time.Duration) []time.Duration {
if len(a) == 0 {
if len(b) != 0 {
return b
Expand All @@ -85,7 +88,7 @@ func combineResolutions(a, b []int64) []int64 {
return a
}

combined := make([]int64, 0, len(a)+len(b))
combined := make([]time.Duration, 0, len(a)+len(b))
combined = append(combined, a...)
combined = append(combined, b...)
return combined
Expand Down Expand Up @@ -158,6 +161,31 @@ func (m ResultMetadata) IsDefault() bool {
return m.Exhaustive && m.LocalOnly && len(m.Warnings) == 0
}

// VerifyTemporalRange will verify that each resolution seen is below the
// given step size, adding warning headers if it is not.
func (m *ResultMetadata) VerifyTemporalRange(step time.Duration) {
// NB: this map is unlikely to have more than 2 elements in real execution,
// since these correspond to namespace count.
invalidResolutions := make(map[time.Duration]struct{}, 10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a silly question, but why not use make(map[time.Duration]struct{}, 0, 10) here so that you can append below? This way you won't have to worry about indexing out of bounds (even if it is unlikely)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a map not a slice :p

Realistically not expecting this to have more than 2 or at most 3 different values, which is why I'm only initing to 10 here 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, missed that! My bad. SGTM

for _, res := range m.Resolutions {
if res > step {
invalidResolutions[res] = struct{}{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this map and also the sorting? Probably simpler to just compile []string warnings in this loop, and after if len(warnings) > 0 then m.AddWarning

}
}

if len(invalidResolutions) > 0 {
warnings := make([]string, 0, len(invalidResolutions))
for k := range invalidResolutions {
warnings = append(warnings, fmt.Sprintf("%v", time.Duration(k)))
}

sort.Strings(warnings)
warning := fmt.Sprintf("range: %v, resolutions: %s",
step, strings.Join(warnings, ", "))
m.AddWarning("resolution larger than query range", warning)
}
}

// AddWarning adds a warning to the result metadata.
// NB: warnings are expected to be small in general, so it's better to iterate
// over the array rather than introduce a map.
Expand Down
34 changes: 31 additions & 3 deletions src/query/block/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package block

import (
"testing"
"time"

"github.com/m3db/m3/src/query/models"

Expand Down Expand Up @@ -104,7 +105,7 @@ func TestMergeIntoEmptyWarnings(t *testing.T) {
}

func TestMergeResolutions(t *testing.T) {
expected := []int64{1, 2, 3}
expected := []time.Duration{1, 2, 3}
r := ResultMetadata{}
rTwo := ResultMetadata{}
merge := r.CombineMetadata(rTwo)
Expand All @@ -127,10 +128,37 @@ func TestMergeResolutions(t *testing.T) {
require.Equal(t, 3, len(merge.Resolutions))
assert.Equal(t, expected, merge.Resolutions)

rTwo = ResultMetadata{Resolutions: []int64{4, 5, 6}}
rTwo = ResultMetadata{Resolutions: []time.Duration{4, 5, 6}}
merge = r.CombineMetadata(rTwo)
assert.Equal(t, 3, len(r.Resolutions))
assert.Equal(t, 3, len(rTwo.Resolutions))
require.Equal(t, 6, len(merge.Resolutions))
assert.Equal(t, []int64{1, 2, 3, 4, 5, 6}, merge.Resolutions)
assert.Equal(t, []time.Duration{1, 2, 3, 4, 5, 6}, merge.Resolutions)
}

func TestVerifyTemporalRange(t *testing.T) {
r := ResultMetadata{
Exhaustive: true,
Resolutions: []time.Duration{5, 10},
}

ex0 := "resolution larger than query range_range: 1ns, resolutions: 10ns, 5ns"
ex1 := "resolution larger than query range_range: 6ns, resolutions: 10ns"

r.VerifyTemporalRange(11)
assert.Equal(t, 0, len(r.WarningStrings()))

r.VerifyTemporalRange(1)
require.Equal(t, 1, len(r.WarningStrings()))
assert.Equal(t, ex0, r.WarningStrings()[0])

r.VerifyTemporalRange(6)
require.Equal(t, 2, len(r.WarningStrings()))
assert.Equal(t, ex0, r.WarningStrings()[0])
assert.Equal(t, ex1, r.WarningStrings()[1])

r.VerifyTemporalRange(11)
require.Equal(t, 2, len(r.WarningStrings()))
assert.Equal(t, ex0, r.WarningStrings()[0])
assert.Equal(t, ex1, r.WarningStrings()[1])
}
13 changes: 13 additions & 0 deletions src/query/functions/temporal/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ var aggregationTestCases = []testCase{
{5, 5.5, 6, 6.5, 7, 7, 7, 7, 7, 7},
},
},
{
name: "avg_over_time with warning",
opType: AvgType,
vals: [][]float64{
{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
},
expected: [][]float64{
{nan, 1, 1.5, 2, 2.5, 2, 2, 2, 2, 2},
{5, 5.5, 6, 6.5, 7, 7, 7, 7, 7, 7},
},
withWarning: true,
},
{
name: "avg_over_time all NaNs",
opType: AvgType,
Expand Down
7 changes: 7 additions & 0 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (c *baseNode) Process(
sp, ctx := opentracing.StartSpanFromContext(queryCtx.Ctx, c.op.OpType())
defer sp.Finish()

resultMeta := b.Meta().ResultMetadata
resultMeta.VerifyTemporalRange(c.op.duration)

meta := b.Meta()
bounds := meta.Bounds
if bounds.Duration == 0 {
Expand All @@ -129,6 +132,7 @@ func (c *baseNode) Process(
aggDuration: xtime.UnixNano(c.op.duration),
stepSize: xtime.UnixNano(bounds.StepSize),
steps: bounds.Steps(),
resultMeta: resultMeta,
}

concurrency := runtime.NumCPU()
Expand Down Expand Up @@ -162,6 +166,7 @@ type blockMeta struct {
stepSize xtime.UnixNano
queryCtx *models.QueryContext
steps int
resultMeta block.ResultMetadata
}

func (c *baseNode) batchProcess(
Expand All @@ -178,6 +183,7 @@ func (c *baseNode) batchProcess(
)

meta := b.Meta()
meta.ResultMetadata = m.resultMeta
builder, err := c.controller.BlockBuilder(m.queryCtx, meta, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -347,6 +353,7 @@ func (c *baseNode) singleProcess(
}

meta := b.Meta()
meta.ResultMetadata = m.resultMeta
builder, err := c.controller.BlockBuilder(m.queryCtx, meta, resultSeriesMeta)
if err != nil {
return nil, err
Expand Down
Loading