Skip to content

Commit

Permalink
[query] Graphite query timeout propagation and add per endpoint statu…
Browse files Browse the repository at this point in the history
…s/latency metrics (#2880)
  • Loading branch information
robskillington committed Nov 20, 2020
1 parent 3faaacf commit 2bcd6fd
Show file tree
Hide file tree
Showing 44 changed files with 1,404 additions and 694 deletions.
16 changes: 12 additions & 4 deletions src/query/api/v1/handler/database/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
clusterclient "github.com/m3db/m3/src/cluster/client"
dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/util/queryhttp"
"github.com/m3db/m3/src/x/instrument"
)

Expand All @@ -39,7 +39,7 @@ type Handler struct {

// RegisterRoutes registers the namespace routes
func RegisterRoutes(
addRoute handler.AddRouteFn,
r *queryhttp.EndpointRegistry,
client clusterclient.Client,
cfg config.Configuration,
embeddedDbCfg *dbconfig.DBConfiguration,
Expand All @@ -54,10 +54,18 @@ func RegisterRoutes(

// Register the same handler under two different endpoints. This just makes explaining things in
// our documentation easier so we can separate out concepts, but share the underlying code.
if err := addRoute(CreateURL, createHandler, CreateHTTPMethod); err != nil {
if err := r.Register(queryhttp.RegisterOptions{
Path: CreateURL,
Handler: createHandler,
Methods: []string{CreateHTTPMethod},
}); err != nil {
return err
}
if err := addRoute(CreateNamespaceURL, createHandler, CreateNamespaceHTTPMethod); err != nil {
if err := r.Register(queryhttp.RegisterOptions{
Path: CreateNamespaceURL,
Handler: createHandler,
Methods: []string{CreateNamespaceHTTPMethod},
}); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (h *grahiteFindHandler) ServeHTTP(
prefix += "."
}

handleroptions.AddWarningHeaders(w, meta)
handleroptions.AddResponseHeaders(w, meta, opts)
// TODO: Support multiple result types
if err = findResultsJSON(w, prefix, seenMap); err != nil {
logger.Error("unable to print find results", zap.Error(err))
Expand Down
7 changes: 5 additions & 2 deletions src/query/api/v1/handler/graphite/find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,11 @@ func testFind(t *testing.T, httpMethod string, ex bool, ex2 bool, header string)
// setup storage and handler
store := setupStorage(ctrl, ex, ex2)

builder := handleroptions.NewFetchOptionsBuilder(
handleroptions.FetchOptionsBuilderOptions{})
builder, err := handleroptions.NewFetchOptionsBuilder(
handleroptions.FetchOptionsBuilderOptions{
Timeout: 15 * time.Second,
})
require.NoError(t, err)
opts := options.EmptyHandlerOptions().
SetFetchOptionsBuilder(builder).
SetStorage(store)
Expand Down
6 changes: 4 additions & 2 deletions src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
// A renderHandler implements the graphite /render endpoint, including full
// support for executing functions. It only works against data in M3.
type renderHandler struct {
opts options.HandlerOptions
engine *native.Engine
queryContextOpts models.QueryContextOptions
graphiteOpts graphite.M3WrappedStorageOptions
Expand All @@ -70,6 +71,7 @@ func NewRenderHandler(opts options.HandlerOptions) http.Handler {
wrappedStore := graphite.NewM3WrappedStorage(opts.Storage(),
opts.M3DBOptions(), opts.InstrumentOpts(), opts.GraphiteStorageOptions())
return &renderHandler{
opts: opts,
engine: native.NewEngine(wrappedStore),
queryContextOpts: opts.QueryContextOptions(),
graphiteOpts: opts.GraphiteStorageOptions(),
Expand All @@ -95,7 +97,7 @@ func (h *renderHandler) serveHTTP(
r *http.Request,
) error {
reqCtx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
p, err := ParseRenderRequest(r)
p, fetchOpts, err := ParseRenderRequest(r, h.opts)
if err != nil {
return xhttp.NewError(err, http.StatusBadRequest)
}
Expand Down Expand Up @@ -211,7 +213,7 @@ func (h *renderHandler) serveHTTP(
SortApplied: true,
}

handleroptions.AddWarningHeaders(w, meta)
handleroptions.AddResponseHeaders(w, meta, fetchOpts)

return WriteRenderResponse(w, response, p.Format, renderResultsJSONOptions{
renderSeriesAllNaNs: h.graphiteOpts.RenderSeriesAllNaNs,
Expand Down
60 changes: 27 additions & 33 deletions src/query/api/v1/handler/graphite/render_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"time"

"github.com/m3db/m3/src/query/api/v1/handler/graphite/pickle"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/graphite/ts"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/json"
xhttp "github.com/m3db/m3/src/x/net/http"
)
Expand Down Expand Up @@ -68,7 +70,6 @@ func WriteRenderResponse(
const (
tzOffsetForAbsoluteTime = time.Duration(0)
maxTimeout = time.Minute
defaultTimeout = time.Second * 5
)

// RenderRequest are the arguments to a render call.
Expand All @@ -83,21 +84,28 @@ type RenderRequest struct {
}

// ParseRenderRequest parses the arguments to a render call from an incoming request.
func ParseRenderRequest(r *http.Request) (RenderRequest, error) {
var (
p RenderRequest
err error
now = time.Now()
)
func ParseRenderRequest(
r *http.Request,
opts options.HandlerOptions,
) (RenderRequest, *storage.FetchOptions, error) {
fetchOpts, err := opts.FetchOptionsBuilder().NewFetchOptions(r)
if err != nil {
return RenderRequest{}, nil, err
}

if err = r.ParseForm(); err != nil {
return p, err
if err := r.ParseForm(); err != nil {
return RenderRequest{}, nil, err
}

var (
p = RenderRequest{
Timeout: fetchOpts.Timeout,
}
now = time.Now()
)
p.Targets = r.Form["target"]

if len(p.Targets) == 0 {
return p, errNoTarget
return p, nil, errNoTarget
}

fromString, untilString := r.FormValue("from"), r.FormValue("until")
Expand All @@ -114,19 +122,19 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) {
now,
tzOffsetForAbsoluteTime,
); err != nil {
return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'from': %s", fromString))
return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'from': %s", fromString))
}

if p.Until, err = graphite.ParseTime(
untilString,
now,
tzOffsetForAbsoluteTime,
); err != nil {
return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'until': %s", untilString))
return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'until': %s", untilString))
}

if !p.From.Before(p.Until) {
return p, errFromNotBeforeUntil
return p, nil, errFromNotBeforeUntil
}

// If this is a real-time query, and the query range is large enough, we shift the query
Expand All @@ -147,7 +155,7 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) {
dur, err := graphite.ParseDuration(offset)
if err != nil {
err = errors.NewInvalidParamsError(err)
return p, errors.NewRenamedError(err, fmt.Errorf("invalid 'offset': %s", err))
return p, nil, errors.NewRenamedError(err, fmt.Errorf("invalid 'offset': %s", err))
}

p.Until = p.Until.Add(dur)
Expand All @@ -159,7 +167,7 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) {
p.MaxDataPoints, err = strconv.ParseInt(maxDataPointsString, 10, 64)

if err != nil || p.MaxDataPoints < 1 {
return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'maxDataPoints': %s", maxDataPointsString))
return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'maxDataPoints': %s", maxDataPointsString))
}
} else {
p.MaxDataPoints = math.MaxInt64
Expand All @@ -172,28 +180,14 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) {
p.From,
tzOffsetForAbsoluteTime,
); err != nil && len(compareString) != 0 {
return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'compare': %s", compareString))
return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'compare': %s", compareString))
} else if p.From.Before(compareFrom) {
return p, errors.NewInvalidParamsError(fmt.Errorf("'compare' must be in the past"))
return p, nil, errors.NewInvalidParamsError(fmt.Errorf("'compare' must be in the past"))
} else {
p.Compare = compareFrom.Sub(p.From)
}

timeout := r.FormValue("timeout")
if timeout != "" {
duration, err := time.ParseDuration(timeout)
if err != nil {
return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'timeout': %v", err))
}
if duration > maxTimeout {
return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'timeout': greater than %v", maxTimeout))
}
p.Timeout = duration
} else {
p.Timeout = defaultTimeout
}

return p, nil
return p, fetchOpts, nil
}

type renderResultsJSONOptions struct {
Expand Down
43 changes: 22 additions & 21 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ package graphite
import (
"fmt"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"math"
"testing"
"time"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/graphite"
Expand All @@ -45,6 +46,18 @@ import (
"github.com/stretchr/testify/require"
)

func testHandlerOptions(t *testing.T) options.HandlerOptions {
fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(
handleroptions.FetchOptionsBuilderOptions{
Timeout: 15 * time.Second,
})
require.NoError(t, err)

return options.EmptyHandlerOptions().
SetQueryContextOptions(models.QueryContextOptions{}).
SetFetchOptionsBuilder(fetchOptsBuilder)
}

func makeBlockResult(
ctrl *gomock.Controller,
results *storage.FetchResult,
Expand Down Expand Up @@ -78,9 +91,7 @@ func makeBlockResult(
func TestParseNoQuery(t *testing.T) {
mockStorage := mock.NewMockStorage()

opts := options.EmptyHandlerOptions().
SetStorage(mockStorage).
SetQueryContextOptions(models.QueryContextOptions{})
opts := testHandlerOptions(t).SetStorage(mockStorage)
handler := NewRenderHandler(opts)

recorder := httptest.NewRecorder()
Expand All @@ -99,9 +110,7 @@ func TestParseQueryNoResults(t *testing.T) {
store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()).
Return(blockResult, nil)

opts := options.EmptyHandlerOptions().
SetStorage(store).
SetQueryContextOptions(models.QueryContextOptions{})
opts := testHandlerOptions(t).SetStorage(store)
handler := NewRenderHandler(opts)

req := newGraphiteReadHTTPRequest(t)
Expand Down Expand Up @@ -144,9 +153,7 @@ func TestParseQueryResults(t *testing.T) {
store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()).
Return(blockResult, nil)

opts := options.EmptyHandlerOptions().
SetStorage(store).
SetQueryContextOptions(models.QueryContextOptions{})
opts := testHandlerOptions(t).SetStorage(store)
handler := NewRenderHandler(opts)

req := newGraphiteReadHTTPRequest(t)
Expand Down Expand Up @@ -198,9 +205,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()).
Return(blockResult, nil)

opts := options.EmptyHandlerOptions().
SetStorage(store).
SetQueryContextOptions(models.QueryContextOptions{})
opts := testHandlerOptions(t).SetStorage(store)
handler := NewRenderHandler(opts)

req := newGraphiteReadHTTPRequest(t)
Expand Down Expand Up @@ -254,9 +259,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()).
Return(makeBlockResult(ctrl, fr), nil)

opts := options.EmptyHandlerOptions().
SetStorage(store).
SetQueryContextOptions(models.QueryContextOptions{})
opts := testHandlerOptions(t).SetStorage(store)
handler := NewRenderHandler(opts)

req := newGraphiteReadHTTPRequest(t)
Expand Down Expand Up @@ -317,9 +320,7 @@ func TestParseQueryResultsMultiTargetWithLimits(t *testing.T) {
store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()).
Return(makeBlockResult(ctrl, frTwo), nil)

opts := options.EmptyHandlerOptions().
SetStorage(store).
SetQueryContextOptions(models.QueryContextOptions{})
opts := testHandlerOptions(t).SetStorage(store)
handler := NewRenderHandler(opts)

req := newGraphiteReadHTTPRequest(t)
Expand Down Expand Up @@ -364,9 +365,9 @@ func TestParseQueryResultsAllNaN(t *testing.T) {
graphiteStorageOpts := graphiteStorage.M3WrappedStorageOptions{
RenderSeriesAllNaNs: true,
}
opts := options.EmptyHandlerOptions().
opts := testHandlerOptions(t).
SetStorage(store).
SetQueryContextOptions(models.QueryContextOptions{}).SetGraphiteStorageOptions(graphiteStorageOpts)
SetGraphiteStorageOptions(graphiteStorageOpts)
handler := NewRenderHandler(opts)

req := newGraphiteReadHTTPRequest(t)
Expand Down
Loading

0 comments on commit 2bcd6fd

Please sign in to comment.