Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add resolution exceeds query range warning #2429

Merged
merged 16 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions scripts/docker-integration-tests/query_fanout/warning.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ function test_range_query {
}

function test_search {
start=$(date -d "$(date +%Y-%m-%dT%H:%M:%SZ) -1 minute" +%Y-%m-%dT%H:%M:%SZ)
end=$(date -d "$(date +%Y-%m-%dT%H:%M:%SZ) +1 minute" +%Y-%m-%dT%H:%M:%SZ)
d=$(date +%s)
start=$(( $d - 60 ))
end=$(( $d + 60 ))

curl -D headers -X POST 0.0.0.0:7201/search -d '{
"start": "'$start'",
Expand Down Expand Up @@ -345,6 +346,54 @@ function test_fanout_warning_graphite {
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff find_carbon 9 max_fetch_series_limit_applied
}

function verify_range {
RANGE=$1
PORT=$2
EXPECTED=$3
start=$(( $t - 3000 ))
end=$(( $t + 3000 ))
qs="query=sum_over_time($METRIC_NAME[$RANGE])&start=$start&end=$end&step=1s"
query="http://0.0.0.0:$PORT/prometheus/api/v1/query_range?$qs"
curl -sSLg -D $HEADER_FILE "$query" > /dev/null
warn=$(cat $HEADER_FILE | grep M3-Results-Limited | sed 's/M3-Results-Limited: //g')
warn=$(echo $warn | sed 's/ /_/g')
test $warn=$EXPECTED
}

function test_fanout_warning_range {
t=$(date +%s)
METRIC_NAME="quart_$t"
curl -X POST 0.0.0.0:9003/writetagged -d '{
"namespace": "agg",
"id": "{__name__=\"'$METRIC_NAME'\",cluster=\"coordinator-cluster-a\",val=\"1\"}",
"tags": [
{
"name": "__name__",
"value": "'$METRIC_NAME'"
},
{
"name": "cluster",
"value": "coordinator-cluster-a"
},
{
"name": "val",
"value": "1"
}
],
"datapoint": {
"timestamp":'"$t"',
"value": 1
}
}'


ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 1s 7201 resolution_larger_than_query_range_range:_1s,_resolutions:_5s
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 1s 17201 resolution_larger_than_query_range_range:_1s,_resolutions:_5s

ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 10s 7201
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff verify_range 10s 17201
}

function test_fanout_warning_missing_zone {
docker-compose -f ${COMPOSE_FILE} stop coordinator-cluster-c

Expand Down Expand Up @@ -389,6 +438,6 @@ function test_fanout_warnings {
test_fanout_warning_fetch_id_mismatch
export GRAPHITE="foo.bar.$t"
test_fanout_warning_graphite
test_fanout_warning_range
test_fanout_warning_missing_zone
}

21 changes: 21 additions & 0 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"net/http"

"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/storage/prometheus"
"github.com/prometheus/prometheus/promql"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Separate internal/external imports?

"github.com/prometheus/prometheus/promql/parser"
)

// Options defines options for PromQL handler.
Expand All @@ -52,3 +54,22 @@ func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Hand
})
return newReadInstantHandler(opts, hOpts, queryable)
}

func applyRangeWarnings(
query string, meta *block.ResultMetadata,
) error {
expr, err := parser.ParseExpr(query)
if err != nil {
return err
}

parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
if n, ok := node.(*parser.MatrixSelector); ok {
meta.VerifyTemporalRange(n.Range)
}

return nil
})

return nil
}
14 changes: 11 additions & 3 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,29 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
request.Params.End,
request.Params.Step)
if err != nil {
h.logger.Error("error creating range query", zap.Error(err), zap.String("query", request.Params.Query))
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", request.Params.Query))
respondError(w, err, http.StatusInternalServerError)
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query", zap.Error(res.Err), zap.String("query", request.Params.Query))
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", request.Params.Query))
respondError(w, res.Err, http.StatusInternalServerError)
return
}

handleroptions.AddWarningHeaders(w, resultMetadata)
query := request.Params.Query
err = applyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
}

handleroptions.AddWarningHeaders(w, resultMetadata)
respond(w, &queryData{
Result: res.Value,
ResultType: res.Value.Type(),
Expand Down
11 changes: 9 additions & 2 deletions src/query/api/v1/handler/prom/read_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,26 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
query,
ts)
if err != nil {
h.logger.Error("error creating instant query", zap.Error(err), zap.String("query", query))
h.logger.Error("error creating instant query",
zap.Error(err), zap.String("query", query))
respondError(w, err, http.StatusInternalServerError)
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing instant query", zap.Error(res.Err), zap.String("query", query))
h.logger.Error("error executing instant query",
zap.Error(res.Err), zap.String("query", query))
respondError(w, res.Err, http.StatusInternalServerError)
return
}

err = applyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
}
handleroptions.AddWarningHeaders(w, resultMetadata)

respond(w, &queryData{
Expand Down
34 changes: 34 additions & 0 deletions src/query/block/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package block

import (
"fmt"
"sort"
"strings"
"time"

"github.com/m3db/m3/src/query/models"
)
Expand Down Expand Up @@ -158,6 +161,37 @@ func (m ResultMetadata) IsDefault() bool {
return m.Exhaustive && m.LocalOnly && len(m.Warnings) == 0
}

// VerifyTemporalRange will verify that each resolution seen is below the
// given step size, adding warning headers if it is not.
func (m *ResultMetadata) VerifyTemporalRange(step time.Duration) {
fmt.Println("Verifying with step size", step, "as int", int64(step), "Resos", m.Resolutions)
stepSize := int64(step)
// NB: this map is unlikely to have more than 2 elements in real execution,
// since these correspond to namespace count.
invalidResolutions := make(map[int64]struct{}, 10)
for _, res := range m.Resolutions {
fmt.Println("here res is", res)
fmt.Println("step res is", stepSize)
fmt.Println("res > stepSize", res > stepSize)
if res > stepSize {
invalidResolutions[res] = struct{}{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this map and also the sorting? Probably simpler to just compile []string warnings in this loop, and after if len(warnings) > 0 then m.AddWarning

}
}

fmt.Println("invalid", invalidResolutions)
robskillington marked this conversation as resolved.
Show resolved Hide resolved
if len(invalidResolutions) > 0 {
warnings := make([]string, 0, len(invalidResolutions))
for k := range invalidResolutions {
warnings = append(warnings, fmt.Sprintf("%v", time.Duration(k)))
}

sort.Strings(warnings)
warning := fmt.Sprintf("range: %v, resolutions: %s",
step, strings.Join(warnings, ", "))
m.AddWarning("resolution larger than query range", warning)
}
}

// AddWarning adds a warning to the result metadata.
// NB: warnings are expected to be small in general, so it's better to iterate
// over the array rather than introduce a map.
Expand Down
27 changes: 27 additions & 0 deletions src/query/block/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,30 @@ func TestMergeResolutions(t *testing.T) {
require.Equal(t, 6, len(merge.Resolutions))
assert.Equal(t, []int64{1, 2, 3, 4, 5, 6}, merge.Resolutions)
}

func TestVerifyTemporalRange(t *testing.T) {
r := ResultMetadata{
Exhaustive: true,
Resolutions: []int64{5, 10},
}

ex0 := "resolution larger than query range_range: 1ns, resolutions: 10ns, 5ns"
ex1 := "resolution larger than query range_range: 6ns, resolutions: 10ns"

r.VerifyTemporalRange(11)
assert.Equal(t, 0, len(r.WarningStrings()))

r.VerifyTemporalRange(1)
require.Equal(t, 1, len(r.WarningStrings()))
assert.Equal(t, ex0, r.WarningStrings()[0])

r.VerifyTemporalRange(6)
require.Equal(t, 2, len(r.WarningStrings()))
assert.Equal(t, ex0, r.WarningStrings()[0])
assert.Equal(t, ex1, r.WarningStrings()[1])

r.VerifyTemporalRange(11)
require.Equal(t, 2, len(r.WarningStrings()))
assert.Equal(t, ex0, r.WarningStrings()[0])
assert.Equal(t, ex1, r.WarningStrings()[1])
}
13 changes: 13 additions & 0 deletions src/query/functions/temporal/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ var aggregationTestCases = []testCase{
{5, 5.5, 6, 6.5, 7, 7, 7, 7, 7, 7},
},
},
{
name: "avg_over_time with warning",
opType: AvgType,
vals: [][]float64{
{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
},
expected: [][]float64{
{nan, 1, 1.5, 2, 2.5, 2, 2, 2, 2, 2},
{5, 5.5, 6, 6.5, 7, 7, 7, 7, 7, 7},
},
withWarning: true,
},
{
name: "avg_over_time all NaNs",
opType: AvgType,
Expand Down
7 changes: 7 additions & 0 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (c *baseNode) Process(
sp, ctx := opentracing.StartSpanFromContext(queryCtx.Ctx, c.op.OpType())
defer sp.Finish()

resultMeta := b.Meta().ResultMetadata
resultMeta.VerifyTemporalRange(c.op.duration)

meta := b.Meta()
bounds := meta.Bounds
if bounds.Duration == 0 {
Expand All @@ -129,6 +132,7 @@ func (c *baseNode) Process(
aggDuration: xtime.UnixNano(c.op.duration),
stepSize: xtime.UnixNano(bounds.StepSize),
steps: bounds.Steps(),
resultMeta: resultMeta,
}

concurrency := runtime.NumCPU()
Expand Down Expand Up @@ -162,6 +166,7 @@ type blockMeta struct {
stepSize xtime.UnixNano
queryCtx *models.QueryContext
steps int
resultMeta block.ResultMetadata
}

func (c *baseNode) batchProcess(
Expand All @@ -178,6 +183,7 @@ func (c *baseNode) batchProcess(
)

meta := b.Meta()
meta.ResultMetadata = m.resultMeta
builder, err := c.controller.BlockBuilder(m.queryCtx, meta, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -347,6 +353,7 @@ func (c *baseNode) singleProcess(
}

meta := b.Meta()
meta.ResultMetadata = m.resultMeta
builder, err := c.controller.BlockBuilder(m.queryCtx, meta, resultSeriesMeta)
if err != nil {
return nil, err
Expand Down
Loading