diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index 950bed6396..525268e4d8 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -24,8 +24,8 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" - "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/util/queryhttp" "github.com/m3db/m3/src/x/instrument" ) @@ -39,7 +39,7 @@ type Handler struct { // RegisterRoutes registers the namespace routes func RegisterRoutes( - addRoute handler.AddRouteFn, + r *queryhttp.EndpointRegistry, client clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, @@ -54,10 +54,18 @@ func RegisterRoutes( // Register the same handler under two different endpoints. This just makes explaining things in // our documentation easier so we can separate out concepts, but share the underlying code. - if err := addRoute(CreateURL, createHandler, CreateHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: CreateURL, + Handler: createHandler, + Methods: []string{CreateHTTPMethod}, + }); err != nil { return err } - if err := addRoute(CreateNamespaceURL, createHandler, CreateNamespaceHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: CreateNamespaceURL, + Handler: createHandler, + Methods: []string{CreateNamespaceHTTPMethod}, + }); err != nil { return err } diff --git a/src/query/api/v1/handler/graphite/find.go b/src/query/api/v1/handler/graphite/find.go index 81b2aca985..a919031f18 100644 --- a/src/query/api/v1/handler/graphite/find.go +++ b/src/query/api/v1/handler/graphite/find.go @@ -168,7 +168,7 @@ func (h *grahiteFindHandler) ServeHTTP( prefix += "." } - handleroptions.AddWarningHeaders(w, meta) + handleroptions.AddResponseHeaders(w, meta, opts) // TODO: Support multiple result types if err = findResultsJSON(w, prefix, seenMap); err != nil { logger.Error("unable to print find results", zap.Error(err)) diff --git a/src/query/api/v1/handler/graphite/find_test.go b/src/query/api/v1/handler/graphite/find_test.go index f7651ff976..bc352f65a0 100644 --- a/src/query/api/v1/handler/graphite/find_test.go +++ b/src/query/api/v1/handler/graphite/find_test.go @@ -227,8 +227,11 @@ func testFind(t *testing.T, httpMethod string, ex bool, ex2 bool, header string) // setup storage and handler store := setupStorage(ctrl, ex, ex2) - builder := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + builder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetFetchOptionsBuilder(builder). SetStorage(store) diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index 489531f7ad..2ebe317b21 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -55,6 +55,7 @@ var ( // A renderHandler implements the graphite /render endpoint, including full // support for executing functions. It only works against data in M3. type renderHandler struct { + opts options.HandlerOptions engine *native.Engine queryContextOpts models.QueryContextOptions graphiteOpts graphite.M3WrappedStorageOptions @@ -70,6 +71,7 @@ func NewRenderHandler(opts options.HandlerOptions) http.Handler { wrappedStore := graphite.NewM3WrappedStorage(opts.Storage(), opts.M3DBOptions(), opts.InstrumentOpts(), opts.GraphiteStorageOptions()) return &renderHandler{ + opts: opts, engine: native.NewEngine(wrappedStore), queryContextOpts: opts.QueryContextOptions(), graphiteOpts: opts.GraphiteStorageOptions(), @@ -95,7 +97,7 @@ func (h *renderHandler) serveHTTP( r *http.Request, ) error { reqCtx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) - p, err := ParseRenderRequest(r) + p, fetchOpts, err := ParseRenderRequest(r, h.opts) if err != nil { return xhttp.NewError(err, http.StatusBadRequest) } @@ -211,7 +213,7 @@ func (h *renderHandler) serveHTTP( SortApplied: true, } - handleroptions.AddWarningHeaders(w, meta) + handleroptions.AddResponseHeaders(w, meta, fetchOpts) return WriteRenderResponse(w, response, p.Format, renderResultsJSONOptions{ renderSeriesAllNaNs: h.graphiteOpts.RenderSeriesAllNaNs, diff --git a/src/query/api/v1/handler/graphite/render_parser.go b/src/query/api/v1/handler/graphite/render_parser.go index 2b8124e74c..2659fd4969 100644 --- a/src/query/api/v1/handler/graphite/render_parser.go +++ b/src/query/api/v1/handler/graphite/render_parser.go @@ -29,9 +29,11 @@ import ( "time" "github.com/m3db/m3/src/query/api/v1/handler/graphite/pickle" + "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/graphite/ts" + "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/json" xhttp "github.com/m3db/m3/src/x/net/http" ) @@ -68,7 +70,6 @@ func WriteRenderResponse( const ( tzOffsetForAbsoluteTime = time.Duration(0) maxTimeout = time.Minute - defaultTimeout = time.Second * 5 ) // RenderRequest are the arguments to a render call. @@ -83,21 +84,28 @@ type RenderRequest struct { } // ParseRenderRequest parses the arguments to a render call from an incoming request. -func ParseRenderRequest(r *http.Request) (RenderRequest, error) { - var ( - p RenderRequest - err error - now = time.Now() - ) +func ParseRenderRequest( + r *http.Request, + opts options.HandlerOptions, +) (RenderRequest, *storage.FetchOptions, error) { + fetchOpts, err := opts.FetchOptionsBuilder().NewFetchOptions(r) + if err != nil { + return RenderRequest{}, nil, err + } - if err = r.ParseForm(); err != nil { - return p, err + if err := r.ParseForm(); err != nil { + return RenderRequest{}, nil, err } + var ( + p = RenderRequest{ + Timeout: fetchOpts.Timeout, + } + now = time.Now() + ) p.Targets = r.Form["target"] - if len(p.Targets) == 0 { - return p, errNoTarget + return p, nil, errNoTarget } fromString, untilString := r.FormValue("from"), r.FormValue("until") @@ -114,7 +122,7 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) { now, tzOffsetForAbsoluteTime, ); err != nil { - return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'from': %s", fromString)) + return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'from': %s", fromString)) } if p.Until, err = graphite.ParseTime( @@ -122,11 +130,11 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) { now, tzOffsetForAbsoluteTime, ); err != nil { - return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'until': %s", untilString)) + return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'until': %s", untilString)) } if !p.From.Before(p.Until) { - return p, errFromNotBeforeUntil + return p, nil, errFromNotBeforeUntil } // If this is a real-time query, and the query range is large enough, we shift the query @@ -147,7 +155,7 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) { dur, err := graphite.ParseDuration(offset) if err != nil { err = errors.NewInvalidParamsError(err) - return p, errors.NewRenamedError(err, fmt.Errorf("invalid 'offset': %s", err)) + return p, nil, errors.NewRenamedError(err, fmt.Errorf("invalid 'offset': %s", err)) } p.Until = p.Until.Add(dur) @@ -159,7 +167,7 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) { p.MaxDataPoints, err = strconv.ParseInt(maxDataPointsString, 10, 64) if err != nil || p.MaxDataPoints < 1 { - return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'maxDataPoints': %s", maxDataPointsString)) + return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'maxDataPoints': %s", maxDataPointsString)) } } else { p.MaxDataPoints = math.MaxInt64 @@ -172,28 +180,14 @@ func ParseRenderRequest(r *http.Request) (RenderRequest, error) { p.From, tzOffsetForAbsoluteTime, ); err != nil && len(compareString) != 0 { - return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'compare': %s", compareString)) + return p, nil, errors.NewInvalidParamsError(fmt.Errorf("invalid 'compare': %s", compareString)) } else if p.From.Before(compareFrom) { - return p, errors.NewInvalidParamsError(fmt.Errorf("'compare' must be in the past")) + return p, nil, errors.NewInvalidParamsError(fmt.Errorf("'compare' must be in the past")) } else { p.Compare = compareFrom.Sub(p.From) } - timeout := r.FormValue("timeout") - if timeout != "" { - duration, err := time.ParseDuration(timeout) - if err != nil { - return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'timeout': %v", err)) - } - if duration > maxTimeout { - return p, errors.NewInvalidParamsError(fmt.Errorf("invalid 'timeout': greater than %v", maxTimeout)) - } - p.Timeout = duration - } else { - p.Timeout = defaultTimeout - } - - return p, nil + return p, fetchOpts, nil } type renderResultsJSONOptions struct { diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index eb1ebc2cfa..de764d2539 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -23,12 +23,13 @@ package graphite import ( "fmt" "io/ioutil" + "math" "net/http" "net/http/httptest" - "math" "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/graphite/graphite" @@ -45,6 +46,18 @@ import ( "github.com/stretchr/testify/require" ) +func testHandlerOptions(t *testing.T) options.HandlerOptions { + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + require.NoError(t, err) + + return options.EmptyHandlerOptions(). + SetQueryContextOptions(models.QueryContextOptions{}). + SetFetchOptionsBuilder(fetchOptsBuilder) +} + func makeBlockResult( ctrl *gomock.Controller, results *storage.FetchResult, @@ -78,9 +91,7 @@ func makeBlockResult( func TestParseNoQuery(t *testing.T) { mockStorage := mock.NewMockStorage() - opts := options.EmptyHandlerOptions(). - SetStorage(mockStorage). - SetQueryContextOptions(models.QueryContextOptions{}) + opts := testHandlerOptions(t).SetStorage(mockStorage) handler := NewRenderHandler(opts) recorder := httptest.NewRecorder() @@ -99,9 +110,7 @@ func TestParseQueryNoResults(t *testing.T) { store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()). Return(blockResult, nil) - opts := options.EmptyHandlerOptions(). - SetStorage(store). - SetQueryContextOptions(models.QueryContextOptions{}) + opts := testHandlerOptions(t).SetStorage(store) handler := NewRenderHandler(opts) req := newGraphiteReadHTTPRequest(t) @@ -144,9 +153,7 @@ func TestParseQueryResults(t *testing.T) { store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()). Return(blockResult, nil) - opts := options.EmptyHandlerOptions(). - SetStorage(store). - SetQueryContextOptions(models.QueryContextOptions{}) + opts := testHandlerOptions(t).SetStorage(store) handler := NewRenderHandler(opts) req := newGraphiteReadHTTPRequest(t) @@ -198,9 +205,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()). Return(blockResult, nil) - opts := options.EmptyHandlerOptions(). - SetStorage(store). - SetQueryContextOptions(models.QueryContextOptions{}) + opts := testHandlerOptions(t).SetStorage(store) handler := NewRenderHandler(opts) req := newGraphiteReadHTTPRequest(t) @@ -254,9 +259,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) { store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()). Return(makeBlockResult(ctrl, fr), nil) - opts := options.EmptyHandlerOptions(). - SetStorage(store). - SetQueryContextOptions(models.QueryContextOptions{}) + opts := testHandlerOptions(t).SetStorage(store) handler := NewRenderHandler(opts) req := newGraphiteReadHTTPRequest(t) @@ -317,9 +320,7 @@ func TestParseQueryResultsMultiTargetWithLimits(t *testing.T) { store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()). Return(makeBlockResult(ctrl, frTwo), nil) - opts := options.EmptyHandlerOptions(). - SetStorage(store). - SetQueryContextOptions(models.QueryContextOptions{}) + opts := testHandlerOptions(t).SetStorage(store) handler := NewRenderHandler(opts) req := newGraphiteReadHTTPRequest(t) @@ -364,9 +365,9 @@ func TestParseQueryResultsAllNaN(t *testing.T) { graphiteStorageOpts := graphiteStorage.M3WrappedStorageOptions{ RenderSeriesAllNaNs: true, } - opts := options.EmptyHandlerOptions(). + opts := testHandlerOptions(t). SetStorage(store). - SetQueryContextOptions(models.QueryContextOptions{}).SetGraphiteStorageOptions(graphiteStorageOpts) + SetGraphiteStorageOptions(graphiteStorageOpts) handler := NewRenderHandler(opts) req := newGraphiteReadHTTPRequest(t) diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index 8290490395..f58ccf05d2 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -31,9 +31,9 @@ import ( "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/query/util/queryhttp" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" ) @@ -106,89 +106,94 @@ type applyMiddlewareFn func( r *http.Request, ) -type addRouteFn func( - path string, - applyMiddlewareFn applyMiddlewareFn, - methods ...string, -) error - // RegisterRoutes registers the namespace routes. func RegisterRoutes( - addRouteFn handler.AddRouteFn, + r *queryhttp.EndpointRegistry, client clusterclient.Client, clusters m3.Clusters, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, ) error { - addRoute := applyMiddlewareToRoute(addRouteFn, defaults) + applyMiddleware := func( + f func(svc handleroptions.ServiceNameAndDefaults, + w http.ResponseWriter, r *http.Request), + defaults []handleroptions.ServiceOptionsDefault, + ) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + svc := handleroptions.ServiceNameAndDefaults{ + ServiceName: handleroptions.M3DBServiceName, + Defaults: defaults, + } + f(svc, w, r) + }) + } // Get M3DB namespaces. - getHandler := NewGetHandler(client, instrumentOpts).ServeHTTP - if err := addRoute(M3DBGetURL, getHandler, GetHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBGetURL, + Handler: applyMiddleware(NewGetHandler(client, instrumentOpts).ServeHTTP, defaults), + Methods: []string{GetHTTPMethod}, + }); err != nil { return err } // Add M3DB namespaces. - addHandler := NewAddHandler(client, instrumentOpts).ServeHTTP - if err := addRoute(M3DBAddURL, addHandler, AddHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBAddURL, + Handler: applyMiddleware(NewAddHandler(client, instrumentOpts).ServeHTTP, defaults), + Methods: []string{AddHTTPMethod}, + }); err != nil { return err } // Update M3DB namespaces. - updateHandler := NewUpdateHandler(client, instrumentOpts).ServeHTTP - if err := addRoute(M3DBUpdateURL, updateHandler, UpdateHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBUpdateURL, + Handler: applyMiddleware(NewUpdateHandler(client, instrumentOpts).ServeHTTP, defaults), + Methods: []string{UpdateHTTPMethod}, + }); err != nil { return err } // Delete M3DB namespaces. - deleteHandler := NewDeleteHandler(client, instrumentOpts).ServeHTTP - if err := addRoute(M3DBDeleteURL, deleteHandler, DeleteHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBDeleteURL, + Handler: applyMiddleware(NewDeleteHandler(client, instrumentOpts).ServeHTTP, defaults), + Methods: []string{DeleteHTTPMethod}, + }); err != nil { return err } // Deploy M3DB schemas. - schemaHandler := NewSchemaHandler(client, instrumentOpts).ServeHTTP - if err := addRoute(M3DBSchemaURL, schemaHandler, SchemaDeployHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBSchemaURL, + Handler: applyMiddleware(NewSchemaHandler(client, instrumentOpts).ServeHTTP, defaults), + Methods: []string{SchemaDeployHTTPMethod}, + }); err != nil { return err } // Reset M3DB schemas. - schemaResetHandler := NewSchemaResetHandler(client, instrumentOpts).ServeHTTP - if err := addRoute(M3DBSchemaURL, schemaResetHandler, DeleteHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBSchemaURL, + Handler: applyMiddleware(NewSchemaResetHandler(client, instrumentOpts).ServeHTTP, defaults), + Methods: []string{DeleteHTTPMethod}, + }); err != nil { return err } // Mark M3DB namespace as ready. - readyHandler := NewReadyHandler(client, clusters, instrumentOpts).ServeHTTP - if err := addRoute(M3DBReadyURL, readyHandler, ReadyHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: M3DBReadyURL, + Handler: applyMiddleware(NewReadyHandler(client, clusters, instrumentOpts).ServeHTTP, defaults), + Methods: []string{ReadyHTTPMethod}, + }); err != nil { return err } return nil } -func applyMiddlewareToRoute( - addRouteFn handler.AddRouteFn, - defaults []handleroptions.ServiceOptionsDefault, -) addRouteFn { - applyMiddleware := func( - applyMiddlewareFn applyMiddlewareFn, - defaults []handleroptions.ServiceOptionsDefault, - ) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - svc := handleroptions.ServiceNameAndDefaults{ - ServiceName: handleroptions.M3DBServiceName, - Defaults: defaults, - } - applyMiddlewareFn(svc, w, r) - }) - } - - return func(path string, f applyMiddlewareFn, methods ...string) error { - return addRouteFn(path, applyMiddleware(f, defaults), methods...) - } -} - func validateNamespaceAggregationOptions(mds []namespace.Metadata) error { resolutionRetentionMap := make(map[resolutionRetentionKey]bool, len(mds)) diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index 4afba97187..d28ec4096e 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -37,9 +37,8 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/cmd/services/m3query/config" - "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/query/util/queryhttp" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -221,113 +220,119 @@ func ConvertInstancesProto(instancesProto []*placementpb.Instance) ([]placement. // RegisterRoutes registers the placement routes func RegisterRoutes( - addRoute handler.AddRouteFn, + r *queryhttp.EndpointRegistry, defaults []handleroptions.ServiceOptionsDefault, opts HandlerOptions, ) error { // Init var ( initHandler = NewInitHandler(opts) - initFn = applyMiddleware(initHandler.ServeHTTP, defaults, opts.instrumentOptions) + initFn = applyMiddleware(initHandler.ServeHTTP, defaults) ) - - if err := addRoute(M3DBInitURL, initFn, InitHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggInitURL, initFn, InitHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorInitURL, initFn, InitHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBInitURL, + M3AggInitURL, + M3CoordinatorInitURL, + }, queryhttp.RegisterPathsOptions{ + Handler: initFn, + Methods: []string{InitHTTPMethod}, + }); err != nil { return err } // Get var ( getHandler = NewGetHandler(opts) - getFn = applyMiddleware(getHandler.ServeHTTP, defaults, opts.instrumentOptions) + getFn = applyMiddleware(getHandler.ServeHTTP, defaults) ) - if err := addRoute(M3DBGetURL, getFn, GetHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggGetURL, getFn, GetHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorGetURL, getFn, GetHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBGetURL, + M3AggGetURL, + M3CoordinatorGetURL, + }, queryhttp.RegisterPathsOptions{ + Handler: getFn, + Methods: []string{GetHTTPMethod}, + }); err != nil { return err } // Delete all var ( deleteAllHandler = NewDeleteAllHandler(opts) - deleteAllFn = applyMiddleware(deleteAllHandler.ServeHTTP, defaults, opts.instrumentOptions) + deleteAllFn = applyMiddleware(deleteAllHandler.ServeHTTP, defaults) ) - if err := addRoute(M3DBDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBDeleteAllURL, + M3AggDeleteAllURL, + M3CoordinatorDeleteAllURL, + }, queryhttp.RegisterPathsOptions{ + Handler: deleteAllFn, + Methods: []string{DeleteAllHTTPMethod}, + }); err != nil { return err } // Add var ( addHandler = NewAddHandler(opts) - addFn = applyMiddleware(addHandler.ServeHTTP, defaults, opts.instrumentOptions) + addFn = applyMiddleware(addHandler.ServeHTTP, defaults) ) - if err := addRoute(M3DBAddURL, addFn, AddHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggAddURL, addFn, AddHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorAddURL, addFn, AddHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBAddURL, + M3AggAddURL, + M3CoordinatorAddURL, + }, queryhttp.RegisterPathsOptions{ + Handler: addFn, + Methods: []string{AddHTTPMethod}, + }); err != nil { return err } // Delete var ( deleteHandler = NewDeleteHandler(opts) - deleteFn = applyMiddleware(deleteHandler.ServeHTTP, defaults, opts.instrumentOptions) + deleteFn = applyMiddleware(deleteHandler.ServeHTTP, defaults) ) - if err := addRoute(M3DBDeleteURL, deleteFn, DeleteHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggDeleteURL, deleteFn, DeleteHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorDeleteURL, deleteFn, DeleteHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBDeleteURL, + M3AggDeleteURL, + M3CoordinatorDeleteURL, + }, queryhttp.RegisterPathsOptions{ + Handler: deleteFn, + Methods: []string{DeleteHTTPMethod}, + }); err != nil { return err } // Replace var ( replaceHandler = NewReplaceHandler(opts) - replaceFn = applyMiddleware(replaceHandler.ServeHTTP, defaults, opts.instrumentOptions) + replaceFn = applyMiddleware(replaceHandler.ServeHTTP, defaults) ) - if err := addRoute(M3DBReplaceURL, replaceFn, ReplaceHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggReplaceURL, replaceFn, ReplaceHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorReplaceURL, replaceFn, ReplaceHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBReplaceURL, + M3AggReplaceURL, + M3CoordinatorReplaceURL, + }, queryhttp.RegisterPathsOptions{ + Handler: replaceFn, + Methods: []string{ReplaceHTTPMethod}, + }); err != nil { return err } // Set var ( setHandler = NewSetHandler(opts) - setFn = applyMiddleware(setHandler.ServeHTTP, defaults, opts.instrumentOptions) + setFn = applyMiddleware(setHandler.ServeHTTP, defaults) ) - if err := addRoute(M3DBSetURL, setFn, SetHTTPMethod); err != nil { - return err - } - if err := addRoute(M3AggSetURL, setFn, SetHTTPMethod); err != nil { - return err - } - if err := addRoute(M3CoordinatorSetURL, setFn, SetHTTPMethod); err != nil { + if err := r.RegisterPaths([]string{ + M3DBSetURL, + M3AggSetURL, + M3CoordinatorSetURL, + }, queryhttp.RegisterPathsOptions{ + Handler: setFn, + Methods: []string{SetHTTPMethod}, + }); err != nil { return err } @@ -427,21 +432,10 @@ func validateAllAvailable(p placement.Placement) error { } func applyMiddleware( - f func(svc handleroptions.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), - defaults []handleroptions.ServiceOptionsDefault, - instrumentOpts instrument.Options, -) http.Handler { - return logging.WithResponseTimeAndPanicErrorLoggingFunc( - parseServiceMiddleware(f, defaults), - instrumentOpts, - ) -} - -func parseServiceMiddleware( next func(svc handleroptions.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), defaults []handleroptions.ServiceOptionsDefault, -) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { +) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var ( svc = handleroptions.ServiceNameAndDefaults{Defaults: defaults} err error @@ -453,7 +447,7 @@ func parseServiceMiddleware( } next(svc, w, r) - } + }) } func parseServiceFromRequest(r *http.Request) (string, error) { diff --git a/src/query/api/v1/handler/prom/read.go b/src/query/api/v1/handler/prom/read.go index 25845907f7..3d379e22fb 100644 --- a/src/query/api/v1/handler/prom/read.go +++ b/src/query/api/v1/handler/prom/read.go @@ -126,7 +126,7 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { zap.Error(err), zap.String("query", query)) } - handleroptions.AddWarningHeaders(w, resultMetadata) + handleroptions.AddResponseHeaders(w, resultMetadata, fetchOptions) respond(w, &queryData{ Result: res.Value, ResultType: res.Value.Type(), diff --git a/src/query/api/v1/handler/prom/read_instant.go b/src/query/api/v1/handler/prom/read_instant.go index c74d376cc7..96a0d98b25 100644 --- a/src/query/api/v1/handler/prom/read_instant.go +++ b/src/query/api/v1/handler/prom/read_instant.go @@ -129,7 +129,7 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.logger.Warn("error applying range warnings", zap.Error(err), zap.String("query", query)) } - handleroptions.AddWarningHeaders(w, resultMetadata) + handleroptions.AddResponseHeaders(w, resultMetadata, fetchOptions) respond(w, &queryData{ Result: res.Value, diff --git a/src/query/api/v1/handler/prom/read_test.go b/src/query/api/v1/handler/prom/read_test.go index bdda76aa2f..f18815c60f 100644 --- a/src/query/api/v1/handler/prom/read_test.go +++ b/src/query/api/v1/handler/prom/read_test.go @@ -29,7 +29,6 @@ import ( "testing" "time" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" "github.com/m3db/m3/src/query/api/v1/options" @@ -61,12 +60,12 @@ func setupTest(t *testing.T) testHandlers { opts := Options{ PromQLEngine: testPromQLEngine, } - timeoutOpts := &prometheus.TimeoutOpts{ - FetchTimeout: 15 * time.Second, - } - fetchOptsBuilderCfg := handleroptions.FetchOptionsBuilderOptions{} - fetchOptsBuilder := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg) + fetchOptsBuilderCfg := handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + } + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg) + require.NoError(t, err) instrumentOpts := instrument.NewOptions() engineOpts := executor.NewEngineOptions(). SetLookbackDuration(time.Minute). @@ -74,8 +73,7 @@ func setupTest(t *testing.T) testHandlers { engine := executor.NewEngine(engineOpts) hOpts := options.EmptyHandlerOptions(). SetFetchOptionsBuilder(fetchOptsBuilder). - SetEngine(engine). - SetTimeoutOpts(timeoutOpts) + SetEngine(engine) queryable := &mockQueryable{} readHandler := newReadHandler(opts, hOpts, queryable) readInstantHandler := newReadInstantHandler(opts, hOpts, queryable) diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 5192b82a11..ded4732681 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -53,11 +53,6 @@ var ( roleName = []byte("role") ) -// TimeoutOpts stores options related to various timeout configurations. -type TimeoutOpts struct { - FetchTimeout time.Duration -} - // ParsePromCompressedRequestResult is the result of a // ParsePromCompressedRequest call. type ParsePromCompressedRequestResult struct { @@ -95,38 +90,6 @@ func ParsePromCompressedRequest( }, nil } -// ParseRequestTimeout parses the input request timeout with a default. -func ParseRequestTimeout( - r *http.Request, - configFetchTimeout time.Duration, -) (time.Duration, error) { - var timeout string - if v := r.FormValue("timeout"); v != "" { - timeout = v - } - // Note: Header should take precedence. - if v := r.Header.Get("timeout"); v != "" { - timeout = v - } - - if timeout == "" { - return configFetchTimeout, nil - } - - duration, err := time.ParseDuration(timeout) - if err != nil { - return 0, xerrors.NewInvalidParamsError( - fmt.Errorf("invalid 'timeout': %v", err)) - } - - if duration > maxTimeout { - return 0, xerrors.NewInvalidParamsError( - fmt.Errorf("invalid 'timeout': greater than %v", maxTimeout)) - } - - return duration, nil -} - // TagCompletionQueries are tag completion queries. type TagCompletionQueries struct { // Queries are the tag completion queries. diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index cacbfbfc44..7e7e0d091d 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -23,20 +23,15 @@ package prometheus import ( "bytes" "fmt" - "mime/multipart" "net/http/httptest" - "net/url" "strings" "testing" - "time" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test" xerrors "github.com/m3db/m3/src/x/errors" - xhttp "github.com/m3db/m3/src/x/net/http" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestPromCompressedReadSuccess(t *testing.T) { @@ -66,53 +61,6 @@ func TestPromCompressedReadInvalidEncoding(t *testing.T) { assert.True(t, xerrors.IsInvalidParams(err)) } -func TestTimeoutParseWithHeader(t *testing.T) { - req := httptest.NewRequest("POST", "/dummy", nil) - req.Header.Add("timeout", "1ms") - - timeout, err := ParseRequestTimeout(req, time.Second) - assert.NoError(t, err) - assert.Equal(t, timeout, time.Millisecond) - - req.Header.Del("timeout") - timeout, err = ParseRequestTimeout(req, 2*time.Minute) - assert.NoError(t, err) - assert.Equal(t, timeout, 2*time.Minute) - - req.Header.Add("timeout", "invalid") - _, err = ParseRequestTimeout(req, 15*time.Second) - assert.Error(t, err) - assert.True(t, xerrors.IsInvalidParams(err)) -} - -func TestTimeoutParseWithPostRequestParam(t *testing.T) { - params := url.Values{} - params.Add("timeout", "1ms") - - buff := bytes.NewBuffer(nil) - form := multipart.NewWriter(buff) - form.WriteField("timeout", "1ms") - require.NoError(t, form.Close()) - - req := httptest.NewRequest("POST", "/dummy", buff) - req.Header.Set(xhttp.HeaderContentType, form.FormDataContentType()) - - timeout, err := ParseRequestTimeout(req, time.Second) - assert.NoError(t, err) - assert.Equal(t, timeout, time.Millisecond) -} - -func TestTimeoutParseWithGetRequestParam(t *testing.T) { - params := url.Values{} - params.Add("timeout", "1ms") - - req := httptest.NewRequest("GET", "/dummy?"+params.Encode(), nil) - - timeout, err := ParseRequestTimeout(req, time.Second) - assert.NoError(t, err) - assert.Equal(t, timeout, time.Millisecond) -} - type writer struct { value string } diff --git a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go index ac34a43f05..1e75b9f9c1 100644 --- a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go +++ b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go @@ -42,8 +42,11 @@ const ( StepParam = "step" // LookbackParam is the lookback parameter. LookbackParam = "lookback" - maxInt64 = float64(math.MaxInt64) - minInt64 = float64(math.MinInt64) + // TimeoutParam is the timeout parameter. + TimeoutParam = "timeout" + maxInt64 = float64(math.MaxInt64) + minInt64 = float64(math.MinInt64) + maxTimeout = 10 * time.Minute ) // FetchOptionsBuilder builds fetch options based on a request and default @@ -58,6 +61,12 @@ type FetchOptionsBuilder interface { type FetchOptionsBuilderOptions struct { Limits FetchOptionsBuilderLimitsOptions RestrictByTag *storage.RestrictByTag + Timeout time.Duration +} + +// Validate validates the fetch options builder options. +func (o FetchOptionsBuilderOptions) Validate() error { + return validateTimeout(o.Timeout) } // FetchOptionsBuilderLimitsOptions provides limits options to use when @@ -75,8 +84,11 @@ type fetchOptionsBuilder struct { // NewFetchOptionsBuilder returns a new fetch options builder. func NewFetchOptionsBuilder( opts FetchOptionsBuilderOptions, -) FetchOptionsBuilder { - return fetchOptionsBuilder{opts: opts} +) (FetchOptionsBuilder, error) { + if err := opts.Validate(); err != nil { + return nil, err + } + return fetchOptionsBuilder{opts: opts}, nil } // ParseLimit parses request limit from either header or query string. @@ -247,6 +259,11 @@ func (b fetchOptionsBuilder) newFetchOptions( fetchOpts.LookbackDuration = &lookback } + fetchOpts.Timeout, err = ParseRequestTimeout(req, b.opts.Timeout) + if err != nil { + return nil, fmt.Errorf("could not parse timeout: err=%v", err) + } + return fetchOpts, nil } @@ -345,3 +362,46 @@ func ParseDuration(r *http.Request, key string) (time.Duration, error) { return 0, fmt.Errorf("cannot parse duration='%s': as_duration_err=%s, as_float_err=%s", str, durationErr, floatErr) } + +// ParseRequestTimeout parses the input request timeout with a default. +func ParseRequestTimeout( + r *http.Request, + configFetchTimeout time.Duration, +) (time.Duration, error) { + var timeout string + if v := r.FormValue(TimeoutParam); v != "" { + timeout = v + } + // Note: Header should take precedence. + if v := r.Header.Get(TimeoutParam); v != "" { + timeout = v + } + + if timeout == "" { + return configFetchTimeout, nil + } + + duration, err := time.ParseDuration(timeout) + if err != nil { + return 0, xerrors.NewInvalidParamsError( + fmt.Errorf("invalid 'timeout': %v", err)) + } + + if err := validateTimeout(duration); err != nil { + return 0, err + } + + return duration, nil +} + +func validateTimeout(v time.Duration) error { + if v <= 0 { + return xerrors.NewInvalidParamsError( + fmt.Errorf("invalid 'timeout': less than or equal to zero %v", v)) + } + if v > maxTimeout { + return xerrors.NewInvalidParamsError( + fmt.Errorf("invalid 'timeout': %v greater than max %v", v, maxTimeout)) + } + return nil +} diff --git a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options_test.go b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options_test.go index 427c78ad1a..4ccbb1490d 100644 --- a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options_test.go +++ b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options_test.go @@ -21,8 +21,10 @@ package handleroptions import ( + "bytes" "fmt" "math" + "mime/multipart" "net/http" "net/http/httptest" "net/url" @@ -34,7 +36,9 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3/storagemetadata" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/headers" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -214,12 +218,14 @@ func TestFetchOptionsBuilder(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{ + builder, err := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{ Limits: FetchOptionsBuilderLimitsOptions{ SeriesLimit: test.defaultLimit, }, RestrictByTag: test.defaultRestrictByTag, + Timeout: 10 * time.Second, }) + require.NoError(t, err) url := "/foo" if test.query != "" { @@ -231,7 +237,6 @@ func TestFetchOptionsBuilder(t *testing.T) { } opts, err := builder.NewFetchOptions(req) - if !test.expectedErr { require.NoError(t, err) require.Equal(t, test.expectedLimit, opts.SeriesLimit) @@ -247,6 +252,7 @@ func TestFetchOptionsBuilder(t *testing.T) { require.NotNil(t, opts.LookbackDuration) require.Equal(t, test.expectedLookback.value, *opts.LookbackDuration) } + require.Equal(t, 10*time.Second, opts.Timeout) } else { require.Error(t, err) } @@ -360,11 +366,14 @@ func TestFetchOptionsWithHeader(t *testing.T) { }`, } - builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{ + builder, err := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{ Limits: FetchOptionsBuilderLimitsOptions{ SeriesLimit: 5, }, + Timeout: 10 * time.Second, }) + require.NoError(t, err) + req := httptest.NewRequest("GET", "/", nil) for k, v := range headers { req.Header.Add(k, v) @@ -397,3 +406,57 @@ func TestFetchOptionsWithHeader(t *testing.T) { func stripSpace(str string) string { return regexp.MustCompile(`\s+`).ReplaceAllString(str, "") } + +func TestParseRequestTimeout(t *testing.T) { + req := httptest.NewRequest("GET", "/read?timeout=2m", nil) + dur, err := ParseRequestTimeout(req, time.Second) + require.NoError(t, err) + assert.Equal(t, 2*time.Minute, dur) +} + +func TestTimeoutParseWithHeader(t *testing.T) { + req := httptest.NewRequest("POST", "/dummy", nil) + req.Header.Add("timeout", "1ms") + + timeout, err := ParseRequestTimeout(req, time.Second) + assert.NoError(t, err) + assert.Equal(t, timeout, time.Millisecond) + + req.Header.Del("timeout") + timeout, err = ParseRequestTimeout(req, 2*time.Minute) + assert.NoError(t, err) + assert.Equal(t, timeout, 2*time.Minute) + + req.Header.Add("timeout", "invalid") + _, err = ParseRequestTimeout(req, 15*time.Second) + assert.Error(t, err) + assert.True(t, xerrors.IsInvalidParams(err)) +} + +func TestTimeoutParseWithPostRequestParam(t *testing.T) { + params := url.Values{} + params.Add("timeout", "1ms") + + buff := bytes.NewBuffer(nil) + form := multipart.NewWriter(buff) + form.WriteField("timeout", "1ms") + require.NoError(t, form.Close()) + + req := httptest.NewRequest("POST", "/dummy", buff) + req.Header.Set(xhttp.HeaderContentType, form.FormDataContentType()) + + timeout, err := ParseRequestTimeout(req, time.Second) + assert.NoError(t, err) + assert.Equal(t, timeout, time.Millisecond) +} + +func TestTimeoutParseWithGetRequestParam(t *testing.T) { + params := url.Values{} + params.Add("timeout", "1ms") + + req := httptest.NewRequest("GET", "/dummy?"+params.Encode(), nil) + + timeout, err := ParseRequestTimeout(req, time.Second) + assert.NoError(t, err) + assert.Equal(t, timeout, time.Millisecond) +} diff --git a/src/query/api/v1/handler/prometheus/handleroptions/header_test.go b/src/query/api/v1/handler/prometheus/handleroptions/header_test.go index 89d90a5778..4961d47727 100644 --- a/src/query/api/v1/handler/prometheus/handleroptions/header_test.go +++ b/src/query/api/v1/handler/prometheus/handleroptions/header_test.go @@ -24,37 +24,45 @@ import ( "fmt" "net/http/httptest" "testing" + "time" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/x/headers" "github.com/stretchr/testify/assert" ) -func TestAddWarningHeaders(t *testing.T) { +func TestAddResponseHeaders(t *testing.T) { recorder := httptest.NewRecorder() meta := block.NewResultMetadata() - AddWarningHeaders(recorder, meta) + AddResponseHeaders(recorder, meta, nil) assert.Equal(t, 0, len(recorder.Header())) recorder = httptest.NewRecorder() meta.Exhaustive = false ex := headers.LimitHeaderSeriesLimitApplied - AddWarningHeaders(recorder, meta) + AddResponseHeaders(recorder, meta, nil) assert.Equal(t, 1, len(recorder.Header())) assert.Equal(t, ex, recorder.Header().Get(headers.LimitHeader)) recorder = httptest.NewRecorder() meta.AddWarning("foo", "bar") ex = fmt.Sprintf("%s,%s_%s", headers.LimitHeaderSeriesLimitApplied, "foo", "bar") - AddWarningHeaders(recorder, meta) + AddResponseHeaders(recorder, meta, nil) assert.Equal(t, 1, len(recorder.Header())) assert.Equal(t, ex, recorder.Header().Get(headers.LimitHeader)) recorder = httptest.NewRecorder() meta.Exhaustive = true ex = "foo_bar" - AddWarningHeaders(recorder, meta) + AddResponseHeaders(recorder, meta, nil) assert.Equal(t, 1, len(recorder.Header())) assert.Equal(t, ex, recorder.Header().Get(headers.LimitHeader)) + + recorder = httptest.NewRecorder() + meta = block.NewResultMetadata() + AddResponseHeaders(recorder, meta, &storage.FetchOptions{Timeout: 5 * time.Second}) + assert.Equal(t, 1, len(recorder.Header())) + assert.Equal(t, "5s", recorder.Header().Get(headers.TimeoutHeader)) } diff --git a/src/query/api/v1/handler/prometheus/handleroptions/headers.go b/src/query/api/v1/handler/prometheus/handleroptions/headers.go index 0694db73ab..96de80ea81 100644 --- a/src/query/api/v1/handler/prometheus/handleroptions/headers.go +++ b/src/query/api/v1/handler/prometheus/handleroptions/headers.go @@ -25,12 +25,22 @@ import ( "strings" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/x/headers" ) -// AddWarningHeaders adds any warning headers present in the result's metadata. -// No-op if no warnings encountered. -func AddWarningHeaders(w http.ResponseWriter, meta block.ResultMetadata) { +// AddResponseHeaders adds any warning headers present in the result's metadata, +// and also effective parameters relative to the request such as effective +// timeout in use. +func AddResponseHeaders( + w http.ResponseWriter, + meta block.ResultMetadata, + fetchOpts *storage.FetchOptions, +) { + if fetchOpts != nil { + w.Header().Set(headers.TimeoutHeader, fetchOpts.Timeout.String()) + } + ex := meta.Exhaustive warns := len(meta.Warnings) if !ex { diff --git a/src/query/api/v1/handler/prometheus/native/common.go b/src/query/api/v1/handler/prometheus/native/common.go index 59ec8630e0..ce3e7db828 100644 --- a/src/query/api/v1/handler/prometheus/native/common.go +++ b/src/query/api/v1/handler/prometheus/native/common.go @@ -28,7 +28,6 @@ import ( "strconv" "time" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/errors" @@ -69,7 +68,6 @@ func parseTime(r *http.Request, key string, now time.Time) (time.Time, error) { func parseParams( r *http.Request, engineOpts executor.EngineOptions, - timeoutOpts *prometheus.TimeoutOpts, fetchOpts *storage.FetchOptions, ) (models.RequestParams, error) { var params models.RequestParams @@ -89,12 +87,6 @@ func parseParams( } } - t, err := prometheus.ParseRequestTimeout(r, timeoutOpts.FetchTimeout) - if err != nil { - return params, xerrors.NewInvalidParamsError(err) - } - params.Timeout = t - start, err := parseTime(r, startParam, params.Now) if err != nil { err = fmt.Errorf(formatErrStr, startParam, err) @@ -113,6 +105,14 @@ func parseParams( } params.End = end + timeout := fetchOpts.Timeout + if timeout <= 0 { + err := fmt.Errorf("expected positive timeout, instead got: %d", timeout) + return params, xerrors.NewInvalidParamsError( + fmt.Errorf(formatErrStr, handleroptions.TimeoutParam, err)) + } + params.Timeout = timeout + step := fetchOpts.Step if step <= 0 { err := fmt.Errorf("expected positive step size, instead got: %d", step) @@ -180,7 +180,6 @@ func parseParams( func parseInstantaneousParams( r *http.Request, engineOpts executor.EngineOptions, - timeoutOpts *prometheus.TimeoutOpts, fetchOpts *storage.FetchOptions, ) (models.RequestParams, error) { if err := r.ParseForm(); err != nil { @@ -193,7 +192,7 @@ func parseInstantaneousParams( r.Form.Set(startParam, nowTimeValue) r.Form.Set(endParam, nowTimeValue) - params, err := parseParams(r, engineOpts, timeoutOpts, fetchOpts) + params, err := parseParams(r, engineOpts, fetchOpts) if err != nil { return params, err } diff --git a/src/query/api/v1/handler/prometheus/native/common_test.go b/src/query/api/v1/handler/prometheus/native/common_test.go index 8b1a1888fc..3d675559c4 100644 --- a/src/query/api/v1/handler/prometheus/native/common_test.go +++ b/src/query/api/v1/handler/prometheus/native/common_test.go @@ -30,12 +30,10 @@ import ( "testing" "time" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/ts" xerrors "github.com/m3db/m3/src/x/errors" @@ -52,12 +50,6 @@ const ( promQuery = `http_requests_total{job="prometheus",group="canary"}` ) -var ( - timeoutOpts = &prometheus.TimeoutOpts{ - FetchTimeout: 15 * time.Second, - } -) - func defaultParams() url.Values { vals := url.Values{} now := time.Now() @@ -69,14 +61,20 @@ func defaultParams() url.Values { } func testParseParams(req *http.Request) (models.RequestParams, error) { - fetchOpts, err := handleroptions. - NewFetchOptionsBuilder(handleroptions.FetchOptionsBuilderOptions{}). - NewFetchOptions(req) + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) if err != nil { return models.RequestParams{}, err } - return parseParams(req, executor.NewEngineOptions(), timeoutOpts, fetchOpts) + fetchOpts, err := fetchOptsBuilder.NewFetchOptions(req) + if err != nil { + return models.RequestParams{}, err + } + + return parseParams(req, executor.NewEngineOptions(), fetchOpts) } func TestParamParsing(t *testing.T) { @@ -105,10 +103,17 @@ func TestInstantaneousParamParsing(t *testing.T) { params.Add(queryParam, promQuery) params.Add(timeParam, now.Format(time.RFC3339)) req.URL.RawQuery = params.Encode() + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 10 * time.Second, + }) + require.NoError(t, err) + fetchOpts, err := fetchOptsBuilder.NewFetchOptions(req) + require.NoError(t, err) r, err := parseInstantaneousParams(req, executor.NewEngineOptions(), - timeoutOpts, storage.NewFetchOptions()) - require.Nil(t, err, "unable to parse request") + fetchOpts) + require.NoError(t, err, "unable to parse request") require.Equal(t, promQuery, r.Query) } diff --git a/src/query/api/v1/handler/prometheus/native/complete_tags.go b/src/query/api/v1/handler/prometheus/native/complete_tags.go index 85815900f6..afc4289985 100644 --- a/src/query/api/v1/handler/prometheus/native/complete_tags.go +++ b/src/query/api/v1/handler/prometheus/native/complete_tags.go @@ -125,7 +125,7 @@ func (h *CompleteTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) return } - handleroptions.AddWarningHeaders(w, meta) + handleroptions.AddResponseHeaders(w, meta, opts) result := resultBuilder.Build() if err := prometheus.RenderTagCompletionResultsJSON(w, result); err != nil { logger.Error("unable to render results", zap.Error(err)) diff --git a/src/query/api/v1/handler/prometheus/native/complete_tags_test.go b/src/query/api/v1/handler/prometheus/native/complete_tags_test.go index 843bd86bff..c5ba68c5b5 100644 --- a/src/query/api/v1/handler/prometheus/native/complete_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/complete_tags_test.go @@ -25,6 +25,7 @@ import ( "io/ioutil" "net/http/httptest" "testing" + "time" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" @@ -88,8 +89,9 @@ func testCompleteTags(t *testing.T, meta block.ResultMetadata, header string) { Metadata: meta, } - fb := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + fb, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{Timeout: 15 * time.Second}) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(store). SetFetchOptionsBuilder(fb) @@ -169,8 +171,9 @@ func TestMultiCompleteTags(t *testing.T) { Metadata: barMeta, } - fb := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + fb, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{Timeout: 15 * time.Second}) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(store). SetFetchOptionsBuilder(fb) diff --git a/src/query/api/v1/handler/prometheus/native/list_tags.go b/src/query/api/v1/handler/prometheus/native/list_tags.go index 8f100d4a06..7841a0dca7 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags.go @@ -94,7 +94,7 @@ func (h *ListTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - handleroptions.AddWarningHeaders(w, result.Metadata) + handleroptions.AddResponseHeaders(w, result.Metadata, opts) if err = prometheus.RenderListTagResultsJSON(w, result); err != nil { logger.Error("unable to render results", zap.Error(err)) xhttp.WriteError(w, err) diff --git a/src/query/api/v1/handler/prometheus/native/list_tags_test.go b/src/query/api/v1/handler/prometheus/native/list_tags_test.go index 8ef8a9c32f..0ecc6dfa04 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags_test.go @@ -111,8 +111,9 @@ func testListTags(t *testing.T, meta block.ResultMetadata, header string) { return now } - fb := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + fb, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{Timeout: 15 * time.Second}) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(store). SetFetchOptionsBuilder(fb). @@ -155,8 +156,9 @@ func TestListErrorTags(t *testing.T) { return now } - fb := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + fb, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{Timeout: 15 * time.Second}) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(store). SetFetchOptionsBuilder(fb). diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index be153d0d4c..255f6dd69c 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -150,7 +150,7 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.Header().Set(xhttp.HeaderContentType, xhttp.ContentTypeJSON) - handleroptions.AddWarningHeaders(w, result.Meta) + handleroptions.AddResponseHeaders(w, result.Meta, parsedOptions.FetchOpts) h.promReadMetrics.fetchSuccess.Inc(1) keepNaNs := h.opts.Config().ResultOptions.KeepNaNs diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index b2c1760125..5420693b7b 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -102,7 +102,8 @@ func parseRequest( LimitMaxTimeseries: fetchOpts.SeriesLimit, LimitMaxDocs: fetchOpts.DocsLimit, Instantaneous: instantaneous, - }} + }, + } restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType() if restrictOpts != nil { @@ -119,11 +120,9 @@ func parseRequest( params models.RequestParams ) if instantaneous { - params, err = parseInstantaneousParams(r, engine.Options(), - opts.TimeoutOpts(), fetchOpts) + params, err = parseInstantaneousParams(r, engine.Options(), fetchOpts) } else { - params, err = parseParams(r, engine.Options(), - opts.TimeoutOpts(), fetchOpts) + params, err = parseParams(r, engine.Options(), fetchOpts) } if err != nil { return ParsedOptions{}, err diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go index a1352be788..a4c65f0e51 100644 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go @@ -95,7 +95,7 @@ func testPromReadInstantHandler( ) { values, bounds := test.GenerateValuesAndBounds(nil, nil) - setup := newTestSetup(timeoutOpts, nil) + setup := newTestSetup(t, nil) promReadInstant := setup.Handlers.instantRead seriesMeta := test.NewSeriesMeta("dummy", len(values)) @@ -172,7 +172,7 @@ func testPromReadInstantHandler( } func TestPromReadInstantHandlerStorageError(t *testing.T) { - setup := newTestSetup(timeoutOpts, nil) + setup := newTestSetup(t, nil) promReadInstant := setup.Handlers.instantRead storageErr := fmt.Errorf("storage err") diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index c1b3bffca9..7808dd914c 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -30,7 +30,6 @@ import ( "github.com/golang/mock/gomock" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" @@ -49,16 +48,14 @@ import ( ) func TestParseRequest(t *testing.T) { - setup := newTestSetup(&prometheus.TimeoutOpts{ - FetchTimeout: 10 * time.Second, - }, nil) + setup := newTestSetup(t, nil) req, _ := http.NewRequest("GET", PromReadURL, nil) req.URL.RawQuery = defaultParams().Encode() parsed, err := ParseRequest(req.Context(), req, false, setup.options) require.NoError(t, err) - require.Equal(t, time.Second*10, parsed.Params.Timeout) - require.Equal(t, time.Second*0, parsed.FetchOpts.Timeout) + require.Equal(t, 15*time.Second, parsed.Params.Timeout) + require.Equal(t, 15*time.Second, parsed.FetchOpts.Timeout) require.Equal(t, 0, parsed.FetchOpts.DocsLimit) require.Equal(t, 0, parsed.FetchOpts.SeriesLimit) require.Equal(t, false, parsed.FetchOpts.RequireExhaustive) @@ -95,7 +92,7 @@ func TestPromReadHandlerWithTimeout(t *testing.T) { return nil, nil }) - setup := newTestSetup(nil, engine) + setup := newTestSetup(t, engine) promRead := setup.Handlers.read req, _ := http.NewRequest("GET", PromReadURL, nil) @@ -129,7 +126,7 @@ func testPromReadHandlerRead( ) { values, bounds := test.GenerateValuesAndBounds(nil, nil) - setup := newTestSetup(timeoutOpts, nil) + setup := newTestSetup(t, nil) promRead := setup.Handlers.read seriesMeta := test.NewSeriesMeta("dummy", len(values)) @@ -176,12 +173,11 @@ func newReadRequest(t *testing.T, params url.Values) *http.Request { } type testSetup struct { - Storage mock.Storage - Handlers testSetupHandlers - QueryOpts *executor.QueryOptions - FetchOpts *storage.FetchOptions - TimeoutOpts *prometheus.TimeoutOpts - options options.HandlerOptions + Storage mock.Storage + Handlers testSetupHandlers + QueryOpts *executor.QueryOptions + FetchOpts *storage.FetchOptions + options options.HandlerOptions } type testSetupHandlers struct { @@ -190,7 +186,7 @@ type testSetupHandlers struct { } func newTestSetup( - timeout *prometheus.TimeoutOpts, + t *testing.T, mockEngine *executor.MockEngine, ) *testSetup { mockStorage := mock.NewMockStorage() @@ -204,8 +200,11 @@ func newTestSetup( if mockEngine != nil { engine = mockEngine } - fetchOptsBuilderCfg := handleroptions.FetchOptionsBuilderOptions{} - fetchOptsBuilder := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg) + fetchOptsBuilderCfg := handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + } + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg) + require.NoError(t, err) tagOpts := models.NewTagOptions() limitsConfig := config.LimitsConfiguration{} keepNaNs := false @@ -214,7 +213,6 @@ func newTestSetup( SetEngine(engine). SetFetchOptionsBuilder(fetchOptsBuilder). SetTagOptions(tagOpts). - SetTimeoutOpts(timeout). SetInstrumentOpts(instrumentOpts). SetConfig(config.Configuration{ Limits: limitsConfig, @@ -232,10 +230,9 @@ func newTestSetup( read: read, instantRead: instantRead, }, - QueryOpts: &executor.QueryOptions{}, - FetchOpts: storage.NewFetchOptions(), - TimeoutOpts: timeoutOpts, - options: opts, + QueryOpts: &executor.QueryOptions{}, + FetchOpts: storage.NewFetchOptions(), + options: opts, } } diff --git a/src/query/api/v1/handler/prometheus/remote/match.go b/src/query/api/v1/handler/prometheus/remote/match.go index 7244b0e552..7359bc743e 100644 --- a/src/query/api/v1/handler/prometheus/remote/match.go +++ b/src/query/api/v1/handler/prometheus/remote/match.go @@ -103,7 +103,7 @@ func (h *PromSeriesMatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques meta = meta.CombineMetadata(result.Metadata) } - handleroptions.AddWarningHeaders(w, meta) + handleroptions.AddResponseHeaders(w, meta, opts) // TODO: Support multiple result types if err := prometheus.RenderSeriesMatchResultsJSON(w, results, false); err != nil { logger.Error("unable to write matched series", zap.Error(err)) diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 8c7f050beb..83f32b8f27 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -130,6 +130,9 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // Write headers before response. + handleroptions.AddResponseHeaders(w, readResult.Meta, fetchOpts) + // NB: if this errors, all relevant headers and information should already // be sent to the writer; so it is not necessary to do anything here other // than increment success/failure metrics. @@ -174,8 +177,6 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.Header().Set(xhttp.HeaderContentType, xhttp.ContentTypeJSON) - handleroptions.AddWarningHeaders(w, readResult.Meta) - err = json.NewEncoder(w).Encode(result) default: err = WriteSnappyCompressed(w, readResult, logger) @@ -228,7 +229,6 @@ func WriteSnappyCompressed( w.Header().Set(xhttp.HeaderContentType, xhttp.ContentTypeProtobuf) w.Header().Set("Content-Encoding", "snappy") - handleroptions.AddWarningHeaders(w, readResult.Meta) compressed := snappy.Encode(nil, data) if _, err := w.Write(compressed); err != nil { @@ -404,18 +404,11 @@ func parseRequest( return nil, nil, err } - timeout := opts.TimeoutOpts().FetchTimeout - timeout, err = prometheus.ParseRequestTimeout(r, timeout) - if err != nil { - return nil, nil, err - } - fetchOpts, rErr := opts.FetchOptionsBuilder().NewFetchOptions(r) if rErr != nil { return nil, nil, rErr } - fetchOpts.Timeout = timeout return req, fetchOpts, nil } diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index 9ded0bdc53..7d723518cb 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -37,7 +37,6 @@ import ( "github.com/m3db/m3/src/dbnode/client" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3/src/query/api/v1/handler" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" @@ -62,10 +61,6 @@ import ( var ( promReadTestMetrics = newPromReadMetrics(tally.NewTestScope("", nil)) defaultLookbackDuration = time.Minute - - timeoutOpts = &prometheus.TimeoutOpts{ - FetchTimeout: 15 * time.Second, - } ) type testVals struct { @@ -151,25 +146,26 @@ func setupServer(t *testing.T) *httptest.Server { Return(nil, client.FetchResponseMetadata{Exhaustive: false}, fmt.Errorf("not initialized")).MaxTimes(1) storage := test.NewSlowStorage(lstore, 10*time.Millisecond) - promRead := readHandler(storage, timeoutOpts) + promRead := readHandler(t, storage) server := httptest.NewServer(test.NewSlowHandler(promRead, 10*time.Millisecond)) return server } -func readHandler(store storage.Storage, - timeoutOpts *prometheus.TimeoutOpts) http.Handler { +func readHandler(t *testing.T, store storage.Storage) http.Handler { fetchOpts := handleroptions.FetchOptionsBuilderOptions{ Limits: handleroptions.FetchOptionsBuilderLimitsOptions{ SeriesLimit: 100, }, + Timeout: 15 * time.Second, } + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(fetchOpts) + require.NoError(t, err) iOpts := instrument.NewOptions() engine := newEngine(store, defaultLookbackDuration, iOpts) opts := options.EmptyHandlerOptions(). SetEngine(engine). SetInstrumentOpts(iOpts). - SetFetchOptionsBuilder(handleroptions.NewFetchOptionsBuilder(fetchOpts)). - SetTimeoutOpts(timeoutOpts) + SetFetchOptionsBuilder(fetchOptsBuilder) return NewPromReadHandler(opts) } @@ -181,14 +177,16 @@ func TestPromReadParsing(t *testing.T) { Limits: handleroptions.FetchOptionsBuilderLimitsOptions{ SeriesLimit: 100, }, + Timeout: 15 * time.Second, } + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(builderOpts) + require.NoError(t, err) engine := newEngine(storage, defaultLookbackDuration, instrument.NewOptions()) opts := options.EmptyHandlerOptions(). SetEngine(engine). - SetFetchOptionsBuilder(handleroptions.NewFetchOptionsBuilder(builderOpts)). - SetTimeoutOpts(timeoutOpts) + SetFetchOptionsBuilder(fetchOptsBuilder) req := httptest.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) r, fetchOpts, err := ParseRequest(context.TODO(), req, opts) @@ -197,14 +195,6 @@ func TestPromReadParsing(t *testing.T) { fmt.Println(fetchOpts) } -func TestPromFetchTimeoutParsing(t *testing.T) { - url := fmt.Sprintf("%s?timeout=2m", PromReadURL) - req := httptest.NewRequest("POST", url, test.GeneratePromReadBody(t)) - dur, err := prometheus.ParseRequestTimeout(req, time.Second) - require.NoError(t, err) - assert.Equal(t, 2*time.Minute, dur) -} - func TestPromReadParsingBad(t *testing.T) { req := httptest.NewRequest("POST", PromReadURL, strings.NewReader("bad body")) _, _, err := ParseRequest(context.TODO(), req, options.EmptyHandlerOptions()) @@ -293,13 +283,15 @@ func TestReadErrorMetricsCount(t *testing.T) { Limits: handleroptions.FetchOptionsBuilderLimitsOptions{ SeriesLimit: 100, }, + Timeout: 15 * time.Second, } + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder(buildOpts) + require.NoError(t, err) engine := newEngine(storage, defaultLookbackDuration, instrument.NewOptions()) opts := options.EmptyHandlerOptions(). SetEngine(engine). - SetTimeoutOpts(&prometheus.TimeoutOpts{FetchTimeout: time.Minute}). - SetFetchOptionsBuilder(handleroptions.NewFetchOptionsBuilder(buildOpts)) + SetFetchOptionsBuilder(fetchOptsBuilder) promRead := &promReadHandler{ promReadMetrics: readMetrics, opts: opts, diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values.go b/src/query/api/v1/handler/prometheus/remote/tag_values.go index a8aa873a35..f146ffb649 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values.go @@ -102,7 +102,7 @@ func (h *TagValuesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - handleroptions.AddWarningHeaders(w, result.Metadata) + handleroptions.AddResponseHeaders(w, result.Metadata, opts) // TODO: Support multiple result types err = prometheus.RenderTagValuesResultsJSON(w, result) if err != nil { diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go index 23309b5f89..89a799f39c 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go @@ -103,8 +103,11 @@ func TestTagValues(t *testing.T) { return now } - fb := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + fb, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(store). SetNowFn(nowFn). @@ -176,8 +179,11 @@ func TestTagValueErrors(t *testing.T) { return now } - fb := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + fb, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(store). SetNowFn(nowFn). diff --git a/src/query/api/v1/handler/search_test.go b/src/query/api/v1/handler/search_test.go index f6d38cb080..87201665f9 100644 --- a/src/query/api/v1/handler/search_test.go +++ b/src/query/api/v1/handler/search_test.go @@ -101,8 +101,11 @@ func searchServer(t *testing.T) *SearchHandler { session.EXPECT().FetchTaggedIDs(gomock.Any(), gomock.Any(), gomock.Any()). Return(mockTaggedIDsIter, client.FetchResponseMetadata{Exhaustive: false}, nil).AnyTimes() - builder := handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{}) + builder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + require.NoError(t, err) opts := options.EmptyHandlerOptions(). SetStorage(storage).SetFetchOptionsBuilder(builder) search := NewSearchHandler(opts) diff --git a/src/query/api/v1/handler/topic/common.go b/src/query/api/v1/handler/topic/common.go index dc8f461e36..9e2b369149 100644 --- a/src/query/api/v1/handler/topic/common.go +++ b/src/query/api/v1/handler/topic/common.go @@ -28,8 +28,8 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" - "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/util/queryhttp" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" @@ -70,32 +70,46 @@ func Service(clusterClient clusterclient.Client, opts handleroptions.ServiceOpti // RegisterRoutes registers the topic routes func RegisterRoutes( - addRoute handler.AddRouteFn, + r *queryhttp.EndpointRegistry, client clusterclient.Client, cfg config.Configuration, instrumentOpts instrument.Options, ) error { - if err := addRoute(InitURL, newInitHandler(client, cfg, instrumentOpts), - InitHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: InitURL, + Handler: newInitHandler(client, cfg, instrumentOpts), + Methods: []string{InitHTTPMethod}, + }); err != nil { return err } - if err := addRoute(GetURL, newGetHandler(client, cfg, instrumentOpts), - GetHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: GetURL, + Handler: newGetHandler(client, cfg, instrumentOpts), + Methods: []string{GetHTTPMethod}, + }); err != nil { return err } - if err := addRoute(AddURL, newAddHandler(client, cfg, instrumentOpts), - AddHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: AddURL, + Handler: newAddHandler(client, cfg, instrumentOpts), + Methods: []string{AddHTTPMethod}, + }); err != nil { return err } - if err := addRoute(UpdateURL, newUpdateHandler(client, cfg, instrumentOpts), - UpdateHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: UpdateURL, + Handler: newUpdateHandler(client, cfg, instrumentOpts), + Methods: []string{UpdateHTTPMethod}, + }); err != nil { return err } - if err := addRoute(DeleteURL, newDeleteHandler(client, cfg, instrumentOpts), - DeleteHTTPMethod); err != nil { + if err := r.Register(queryhttp.RegisterOptions{ + Path: DeleteURL, + Handler: newDeleteHandler(client, cfg, instrumentOpts), + Methods: []string{DeleteHTTPMethod}, + }); err != nil { return err } - return nil } diff --git a/src/query/api/v1/handler/types.go b/src/query/api/v1/handler/types.go index 5b0f523987..54a6a605ae 100644 --- a/src/query/api/v1/handler/types.go +++ b/src/query/api/v1/handler/types.go @@ -20,14 +20,9 @@ package handler -import "net/http" - // HeaderKeyType is the type for the header key. type HeaderKeyType int -// AddRouteFn is the function type for adding new HTTP route. -type AddRouteFn func(path string, handler http.Handler, methods ...string) error - const ( // HeaderKey is the key which headers will be added to in the request context. HeaderKey HeaderKeyType = iota diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 8e188b6a74..2f8d07a9b2 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -43,6 +43,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/topic" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/query/util/queryhttp" xdebug "github.com/m3db/m3/src/x/debug" "github.com/m3db/m3/src/x/headers" xhttp "github.com/m3db/m3/src/x/net/http" @@ -76,7 +77,7 @@ var ( // Handler represents the top-level HTTP handler. type Handler struct { - router *mux.Router + registry *queryhttp.EndpointRegistry handler http.Handler options options.HandlerOptions customHandlers []options.CustomHandler @@ -97,8 +98,10 @@ func NewHandler( handlerWithMiddleware := applyMiddleware(r, opentracing.GlobalTracer()) logger := handlerOptions.InstrumentOpts().Logger() + instrumentOpts := handlerOptions.InstrumentOpts().SetMetricsScope( + handlerOptions.InstrumentOpts().MetricsScope().SubScope("http_handler")) return &Handler{ - router: r, + registry: queryhttp.NewEndpointRegistry(r, instrumentOpts), handler: handlerWithMiddleware, options: handlerOptions, customHandlers: customHandlers, @@ -131,36 +134,22 @@ func applyMiddleware(base *mux.Router, tracer opentracing.Tracer) http.Handler { // RegisterRoutes registers all http routes. func (h *Handler) RegisterRoutes() error { - var ( - instrumentOpts = h.options.InstrumentOpts() - - // Wrap requests with response time logging as well as panic recovery. - wrapped = func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) - } - - panicOnly = func(n http.Handler) http.Handler { - return logging.WithPanicErrorResponder(n, instrumentOpts) - } - - wrappedRouteFn = func(path string, handler http.Handler, methods ...string) error { - return h.addRouteHandlerFn(h.router, path, wrapped(handler).ServeHTTP, methods...) - } - - routeFn = func(path string, handler http.Handler, methods ...string) error { - return h.addRouteHandlerFn(h.router, path, handler.ServeHTTP, methods...) - } - ) - - if err := wrappedRouteFn(openapi.URL, openapi.NewDocHandler(instrumentOpts), - openapi.HTTPMethod); err != nil { + instrumentOpts := h.options.InstrumentOpts() + + // OpenAPI. + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: openapi.URL, + Handler: openapi.NewDocHandler(instrumentOpts), + Methods: methods(openapi.HTTPMethod), + }); err != nil { + return err + } + if err := h.registry.Register(queryhttp.RegisterOptions{ + PathPrefix: openapi.StaticURLPrefix, + Handler: openapi.StaticHandler(), + }); err != nil { return err } - - h.router.PathPrefix(openapi.StaticURLPrefix). - Handler(wrapped(openapi.StaticHandler())). - Name(openapi.StaticURLPrefix) - // Prometheus remote read/write endpoints. remoteSourceOpts := h.options.SetInstrumentOpts(instrumentOpts. @@ -201,99 +190,160 @@ func (h *Handler) RegisterRoutes() error { M3QueryHandler: nativePromReadInstantHandler.ServeHTTP, }) - if err := wrappedRouteFn(native.PromReadURL, h.options.QueryRouter(), - native.PromReadHTTPMethods...); err != nil { + // Query routable endpoints. + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: native.PromReadURL, + Handler: h.options.QueryRouter(), + Methods: native.PromReadHTTPMethods, + }); err != nil { return err } - if err := wrappedRouteFn(native.PromReadInstantURL, h.options.InstantQueryRouter(), - native.PromReadInstantHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: native.PromReadInstantURL, + Handler: h.options.InstantQueryRouter(), + Methods: native.PromReadInstantHTTPMethods, + }); err != nil { return err } - if err := wrappedRouteFn(native.PrometheusReadURL, promqlQueryHandler, - native.PromReadHTTPMethods...); err != nil { + // Prometheus endpoints. + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: "/prometheus" + native.PromReadURL, + Handler: promqlQueryHandler, + Methods: native.PromReadHTTPMethods, + }); err != nil { return err } - - if err := wrappedRouteFn(native.PrometheusReadInstantURL, promqlInstantQueryHandler, - native.PromReadInstantHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: "/prometheus" + native.PromReadInstantURL, + Handler: promqlInstantQueryHandler, + Methods: native.PromReadInstantHTTPMethods, + }); err != nil { return err } - if err := wrappedRouteFn(remote.PromReadURL, promRemoteReadHandler, - remote.PromReadHTTPMethods...); err != nil { + // M3Query endpoints. + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: "/m3query" + native.PromReadURL, + Handler: nativePromReadHandler, + Methods: native.PromReadHTTPMethods, + }); err != nil { return err } - if err := routeFn(remote.PromWriteURL, panicOnly(promRemoteWriteHandler), - remote.PromWriteHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: "/m3query" + native.PromReadInstantURL, + Handler: nativePromReadInstantHandler, + Methods: native.PromReadInstantHTTPMethods, + }); err != nil { return err } - if err := wrappedRouteFn(native.M3QueryReadURL, nativePromReadHandler, - native.PromReadHTTPMethods...); err != nil { + // Prometheus remote read and write endpoints. + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: remote.PromReadURL, + Handler: promRemoteReadHandler, + Methods: remote.PromReadHTTPMethods, + }); err != nil { return err } - if err := wrappedRouteFn(native.M3QueryReadInstantURL, nativePromReadInstantHandler, - native.PromReadInstantHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: remote.PromWriteURL, + Handler: promRemoteWriteHandler, + Methods: methods(remote.PromWriteHTTPMethod), + // Register with no response logging for write calls since so frequent. + }, logging.WithNoResponseLog()); err != nil { return err } // InfluxDB write endpoint. - if err := wrappedRouteFn(influxdb.InfluxWriteURL, influxdb.NewInfluxWriterHandler(h.options), - influxdb.InfluxWriteHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: influxdb.InfluxWriteURL, + Handler: influxdb.NewInfluxWriterHandler(h.options), + Methods: methods(influxdb.InfluxWriteHTTPMethod), + // Register with no response logging for write calls since so frequent. + }, logging.WithNoResponseLog()); err != nil { return err } // Native M3 search and write endpoints. - if err := wrappedRouteFn(handler.SearchURL, handler.NewSearchHandler(h.options), - handler.SearchHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: handler.SearchURL, + Handler: handler.NewSearchHandler(h.options), + Methods: methods(handler.SearchHTTPMethod), + }); err != nil { return err } - if err := wrappedRouteFn(m3json.WriteJSONURL, m3json.NewWriteJSONHandler(h.options), - m3json.JSONWriteHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: m3json.WriteJSONURL, + Handler: m3json.NewWriteJSONHandler(h.options), + Methods: methods(m3json.JSONWriteHTTPMethod), + }); err != nil { return err } // Tag completion endpoints. - if err := wrappedRouteFn(native.CompleteTagsURL, native.NewCompleteTagsHandler(h.options), - native.CompleteTagsHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: native.CompleteTagsURL, + Handler: native.NewCompleteTagsHandler(h.options), + Methods: methods(native.CompleteTagsHTTPMethod), + }); err != nil { return err } - if err := wrappedRouteFn(remote.TagValuesURL, remote.NewTagValuesHandler(h.options), - remote.TagValuesHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: remote.TagValuesURL, + Handler: remote.NewTagValuesHandler(h.options), + Methods: methods(remote.TagValuesHTTPMethod), + }); err != nil { return err } // List tag endpoints. - if err := wrappedRouteFn(native.ListTagsURL, native.NewListTagsHandler(h.options), - native.ListTagsHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: native.ListTagsURL, + Handler: native.NewListTagsHandler(h.options), + Methods: native.ListTagsHTTPMethods, + }); err != nil { return err } // Query parse endpoints. - if err := wrappedRouteFn(native.PromParseURL, native.NewPromParseHandler(h.options), - native.PromParseHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: native.PromParseURL, + Handler: native.NewPromParseHandler(h.options), + Methods: methods(native.PromParseHTTPMethod), + }); err != nil { return err } - if err := wrappedRouteFn(native.PromThresholdURL, native.NewPromThresholdHandler(h.options), - native.PromThresholdHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: native.PromThresholdURL, + Handler: native.NewPromThresholdHandler(h.options), + Methods: methods(native.PromThresholdHTTPMethod), + }); err != nil { return err } // Series match endpoints. - if err := wrappedRouteFn(remote.PromSeriesMatchURL, - remote.NewPromSeriesMatchHandler(h.options), - remote.PromSeriesMatchHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: remote.PromSeriesMatchURL, + Handler: remote.NewPromSeriesMatchHandler(h.options), + Methods: remote.PromSeriesMatchHTTPMethods, + }); err != nil { return err } // Graphite endpoints. - if err := wrappedRouteFn(graphite.ReadURL, graphite.NewRenderHandler(h.options), - graphite.ReadHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: graphite.ReadURL, + Handler: graphite.NewRenderHandler(h.options), + Methods: graphite.ReadHTTPMethods, + }); err != nil { return err } - if err := wrappedRouteFn(graphite.FindURL, graphite.NewFindHandler(h.options), - graphite.FindHTTPMethods...); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: graphite.FindURL, + Handler: graphite.NewFindHandler(h.options), + Methods: graphite.FindHTTPMethods, + }); err != nil { return err } @@ -329,26 +379,36 @@ func (h *Handler) RegisterRoutes() error { } // Register debug dump handler. - if err := wrappedRouteFn(xdebug.DebugURL, debugWriter.HTTPHandler()); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: xdebug.DebugURL, + Handler: debugWriter.HTTPHandler(), + Methods: methods(xdebug.DebugMethod), + }); err != nil { return err } if clusterClient != nil { - if err := database.RegisterRoutes(wrappedRouteFn, clusterClient, + err = database.RegisterRoutes(h.registry, clusterClient, h.options.Config(), h.options.EmbeddedDbCfg(), - serviceOptionDefaults, instrumentOpts); err != nil { + serviceOptionDefaults, instrumentOpts) + if err != nil { return err } - if err := placement.RegisterRoutes(routeFn, serviceOptionDefaults, - placementOpts); err != nil { + + err = placement.RegisterRoutes(h.registry, + serviceOptionDefaults, placementOpts) + if err != nil { return err } - if err := namespace.RegisterRoutes(wrappedRouteFn, clusterClient, - h.options.Clusters(), serviceOptionDefaults, instrumentOpts); err != nil { + + err = namespace.RegisterRoutes(h.registry, clusterClient, + h.options.Clusters(), serviceOptionDefaults, instrumentOpts) + if err != nil { return err } - if err := topic.RegisterRoutes(wrappedRouteFn, clusterClient, config, - instrumentOpts); err != nil { + + err = topic.RegisterRoutes(h.registry, clusterClient, config, instrumentOpts) + if err != nil { return err } @@ -361,8 +421,11 @@ func (h *Handler) RegisterRoutes() error { Tagged(remoteSource). Tagged(experimentalAPIGroup), ) - if err := wrappedRouteFn(annotated.WriteURL, experimentalAnnotatedWriteHandler, - annotated.WriteHTTPMethod); err != nil { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: annotated.WriteURL, + Handler: experimentalAnnotatedWriteHandler, + Methods: methods(annotated.WriteHTTPMethod), + }); err != nil { return err } } @@ -371,32 +434,40 @@ func (h *Handler) RegisterRoutes() error { if err := h.registerHealthEndpoints(); err != nil { return err } - h.registerProfileEndpoints() + if err := h.registerProfileEndpoints(); err != nil { + return err + } if err := h.registerRoutesEndpoint(); err != nil { return err } - // Register custom endpoints. + // Register custom endpoints last to have these conflict with + // any existing routes. for _, custom := range h.customHandlers { for _, method := range custom.Methods() { - routeName := routeName(custom.Route(), method) - route := h.router.Get(routeName) var prevHandler http.Handler - if route != nil { + route, prevRoute := h.registry.PathRoute(custom.Route(), method) + if prevRoute { prevHandler = route.GetHandler() } - customHandler, err := custom.Handler(nativeSourceOpts, prevHandler) + + handler, err := custom.Handler(nativeSourceOpts, prevHandler) if err != nil { - return fmt.Errorf("failed to register custom handler with path %s: %w", - routeName, err) + return err } - if route == nil { - if err := wrappedRouteFn(custom.Route(), customHandler, method); err != nil { + if !prevRoute { + if err := h.registry.Register(queryhttp.RegisterOptions{ + Path: custom.Route(), + Handler: handler, + Methods: methods(method), + }); err != nil { return err } } else { - route.Handler(wrapped(customHandler)) + // Do not re-instrument this route since the prev handler + // is already instrumented. + route.Handler(handler) } } } @@ -404,34 +475,6 @@ func (h *Handler) RegisterRoutes() error { return nil } -func (h *Handler) addRouteHandlerFn( - router *mux.Router, - path string, - handlerFn http.HandlerFunc, - methods ...string, -) error { - for _, method := range methods { - routeName := routeName(path, method) - if previousRoute := router.Get(routeName); previousRoute != nil { - return fmt.Errorf("route already exists: %s", routeName) - } - - router. - HandleFunc(path, handlerFn). - Name(routeName). - Methods(method) - } - - return nil -} - -func routeName(p string, method string) string { - if method == "" { - return p - } - return fmt.Sprintf("%s %s", p, method) -} - func (h *Handler) placementOpts() (placement.HandlerOptions, error) { return placement.NewHandlerOptions( h.options.ClusterClient(), @@ -466,44 +509,56 @@ func (h *Handler) m3AggServiceOptions() *handleroptions.M3AggServiceOptions { // Endpoints useful for profiling the service. func (h *Handler) registerHealthEndpoints() error { - return h.addRouteHandlerFn(h.router, healthURL, func(w http.ResponseWriter, r *http.Request) { - json.NewEncoder(w).Encode(struct { - Uptime string `json:"uptime"` - }{ - Uptime: time.Since(h.options.CreatedAt()).String(), - }) - }, http.MethodGet) + return h.registry.Register(queryhttp.RegisterOptions{ + Path: healthURL, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode(struct { + Uptime string `json:"uptime"` + }{ + Uptime: time.Since(h.options.CreatedAt()).String(), + }) + }), + Methods: methods(http.MethodGet), + }) } // Endpoints useful for profiling the service. -func (h *Handler) registerProfileEndpoints() { - h.router. - PathPrefix("/debug/pprof/"). - Handler(http.DefaultServeMux). - Name("/debug/pprof/") +func (h *Handler) registerProfileEndpoints() error { + return h.registry.Register(queryhttp.RegisterOptions{ + PathPrefix: "/debug/pprof", + Handler: http.DefaultServeMux, + }) } // Endpoints useful for viewing routes directory. func (h *Handler) registerRoutesEndpoint() error { - return h.addRouteHandlerFn(h.router, routesURL, func(w http.ResponseWriter, r *http.Request) { - var routes []string - err := h.router.Walk( - func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { - str, err := route.GetPathTemplate() - if err != nil { - return err - } - routes = append(routes, str) - return nil + return h.registry.Register(queryhttp.RegisterOptions{ + Path: routesURL, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var routes []string + err := h.registry.Walk( + func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { + str, err := route.GetPathTemplate() + if err != nil { + return err + } + routes = append(routes, str) + return nil + }) + if err != nil { + xhttp.WriteError(w, err) + return + } + json.NewEncoder(w).Encode(struct { + Routes []string `json:"routes"` + }{ + Routes: routes, }) - if err != nil { - xhttp.WriteError(w, err) - return - } - json.NewEncoder(w).Encode(struct { - Routes []string `json:"routes"` - }{ - Routes: routes, - }) - }, http.MethodGet) + }), + Methods: methods(http.MethodGet), + }) +} + +func methods(str ...string) []string { + return str } diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 69ad5ba9ae..51b0b52e1f 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -29,9 +29,7 @@ import ( "time" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" - dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" - "github.com/m3db/m3/src/dbnode/client" m3json "github.com/m3db/m3/src/query/api/v1/handler/json" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" @@ -43,6 +41,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/test/m3" "github.com/m3db/m3/src/query/ts/m3db" + "github.com/m3db/m3/src/query/util/queryhttp" "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" @@ -91,6 +90,13 @@ func setupHandler( instrumentOpts := instrument.NewOptions() downsamplerAndWriter := ingest.NewDownsamplerAndWriter(store, nil, testWorkerPool, instrument.NewOptions()) engine := newEngine(store, time.Minute, instrumentOpts) + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + if err != nil { + return nil, err + } opts, err := options.NewHandlerOptions( downsamplerAndWriter, makeTagOptions(), @@ -100,7 +106,7 @@ func setupHandler( nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, - handleroptions.NewFetchOptionsBuilder(handleroptions.FetchOptionsBuilderOptions{}), + fetchOptsBuilder, models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration, @@ -111,7 +117,6 @@ func setupHandler( graphite.M3WrappedStorageOptions{}, testM3DBOpts, ) - if err != nil { return nil, err } @@ -119,41 +124,6 @@ func setupHandler( return NewHandler(opts, customHandlers...), nil } -func TestHandlerFetchTimeout(t *testing.T) { - ctrl := gomock.NewController(t) - storage, _ := m3.NewStorageAndSession(t, ctrl) - downsamplerAndWriter := ingest.NewDownsamplerAndWriter(storage, nil, testWorkerPool, instrument.NewOptions()) - - fourMin := 4 * time.Minute - dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &fourMin}} - engine := newEngine(storage, time.Minute, instrument.NewOptions()) - cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} - opts, err := options.NewHandlerOptions( - downsamplerAndWriter, - makeTagOptions(), - engine, - nil, - nil, - nil, - cfg, - dbconfig, - handleroptions.NewFetchOptionsBuilder(handleroptions.FetchOptionsBuilderOptions{}), - models.QueryContextOptions{}, - instrument.NewOptions(), - defaultCPUProfileduration, - defaultPlacementServices, - svcDefaultOptions, - nil, - nil, - graphite.M3WrappedStorageOptions{}, - testM3DBOpts, - ) - require.NoError(t, err) - - h := NewHandler(opts) - assert.Equal(t, 4*time.Minute, h.options.TimeoutOpts().FetchTimeout) -} - func TestPromRemoteReadGet(t *testing.T) { req := httptest.NewRequest("GET", remote.PromReadURL, nil) res := httptest.NewRecorder() @@ -162,7 +132,6 @@ func TestPromRemoteReadGet(t *testing.T) { h, err := setupHandler(storage) require.NoError(t, err, "unable to setup handler") - assert.Equal(t, 30*time.Second, h.options.TimeoutOpts().FetchTimeout) err = h.RegisterRoutes() require.NoError(t, err, "unable to register routes") h.Router().ServeHTTP(res, req) @@ -311,7 +280,7 @@ func TestCORSMiddleware(t *testing.T) { h, err := setupHandler(s) require.NoError(t, err, "unable to setup handler") - setupTestRoute(h.router) + setupTestRouteRegistry(h.registry) res := doTestRequest(h.Router()) assert.Equal(t, "hello!", res.Body.String()) @@ -328,7 +297,7 @@ func doTestRequest(handler http.Handler) *httptest.ResponseRecorder { func TestTracingMiddleware(t *testing.T) { mtr := mocktracer.New() router := mux.NewRouter() - setupTestRoute(router) + setupTestRouteRouter(router) handler := applyMiddleware(router, mtr) doTestRequest(handler) @@ -339,7 +308,7 @@ func TestTracingMiddleware(t *testing.T) { func TestCompressionMiddleware(t *testing.T) { mtr := mocktracer.New() router := mux.NewRouter() - setupTestRoute(router) + setupTestRouteRouter(router) handler := applyMiddleware(router, mtr) req := httptest.NewRequest("GET", testRoute, nil) @@ -355,7 +324,18 @@ func TestCompressionMiddleware(t *testing.T) { const testRoute = "/foobar" -func setupTestRoute(r *mux.Router) { +func setupTestRouteRegistry(r *queryhttp.EndpointRegistry) { + r.Register(queryhttp.RegisterOptions{ + Path: testRoute, + Handler: http.HandlerFunc(func(writer http.ResponseWriter, r *http.Request) { + writer.WriteHeader(http.StatusOK) + writer.Write([]byte("hello!")) + }), + Methods: methods(http.MethodGet), + }) +} + +func setupTestRouteRouter(r *mux.Router) { r.HandleFunc(testRoute, func(writer http.ResponseWriter, r *http.Request) { writer.WriteHeader(http.StatusOK) writer.Write([]byte("hello!")) @@ -380,10 +360,10 @@ func init() { type assertFn func(t *testing.T, prev http.Handler, r *http.Request) type customHandler struct { - t *testing.T + t *testing.T routeName string - methods []string - assertFn assertFn + methods []string + assertFn assertFn } func (h *customHandler) Route() string { return h.routeName } @@ -408,43 +388,46 @@ func TestCustomRoutes(t *testing.T) { instrumentOpts := instrument.NewOptions() downsamplerAndWriter := ingest.NewDownsamplerAndWriter(store, nil, testWorkerPool, instrument.NewOptions()) engine := newEngine(store, time.Minute, instrumentOpts) + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Timeout: 15 * time.Second, + }) + require.NoError(t, err) opts, err := options.NewHandlerOptions( downsamplerAndWriter, makeTagOptions().SetMetricName([]byte("z")), engine, nil, nil, nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, - handleroptions.NewFetchOptionsBuilder(handleroptions.FetchOptionsBuilderOptions{}), - models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration, + fetchOptsBuilder, models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration, defaultPlacementServices, svcDefaultOptions, NewQueryRouter(), NewQueryRouter(), graphite.M3WrappedStorageOptions{}, testM3DBOpts) - require.NoError(t, err) custom := &customHandler{ - t: t, + t: t, routeName: "/custom", - methods: []string{http.MethodGet, http.MethodHead}, + methods: []string{http.MethodGet, http.MethodHead}, assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { assert.Nil(t, prev, "Should not shadow already existing handler") }, } customShadowGet := &customHandler{ - t: t, + t: t, routeName: "/custom", - methods: []string{http.MethodGet}, + methods: []string{http.MethodGet}, assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { assert.NotNil(t, prev, "Should shadow already existing handler") }, } customShadowHead := &customHandler{ - t: t, + t: t, routeName: "/custom", - methods: []string{http.MethodHead}, + methods: []string{http.MethodHead}, assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { assert.NotNil(t, prev, "Should shadow already existing handler") }, } customNew := &customHandler{ - t: t, + t: t, routeName: "/custom/new", - methods: []string{http.MethodGet, http.MethodHead}, + methods: []string{http.MethodGet, http.MethodHead}, assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { assert.Nil(t, prev, "Should not shadow already existing handler") }, @@ -472,23 +455,3 @@ func assertRoute(t *testing.T, routeName string, method string, handler *Handler handler.Router().ServeHTTP(res, req) require.Equal(t, expectedStatusCode, res.Code) } - -func TestRouteName(t *testing.T) { - assert.Equal( - t, - "/api/v1/test GET", - routeName("/api/v1/test", "GET"), - ) - - assert.Equal( - t, - "/api/v1/test", - routeName("/api/v1/test", ""), - ) - - assert.Equal( - t, - "/api/v1/test POST", - routeName("/api/v1/test", "POST"), - ) -} diff --git a/src/query/api/v1/options/handler.go b/src/query/api/v1/options/handler.go index a178b1bd33..db69f58611 100644 --- a/src/query/api/v1/options/handler.go +++ b/src/query/api/v1/options/handler.go @@ -30,7 +30,6 @@ import ( "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/executor" graphite "github.com/m3db/m3/src/query/graphite/storage" @@ -143,11 +142,6 @@ type HandlerOptions interface { // SetTagOptions sets the tag options. SetTagOptions(opts models.TagOptions) HandlerOptions - // TimeoutOpts returns the timeout options. - TimeoutOpts() *prometheus.TimeoutOpts - // SetTimeoutOpts sets the timeout options. - SetTimeoutOpts(t *prometheus.TimeoutOpts) HandlerOptions - // FetchOptionsBuilder returns the fetch options builder. FetchOptionsBuilder() handleroptions.FetchOptionsBuilder // SetFetchOptionsBuilder sets the fetch options builder. @@ -230,7 +224,6 @@ type handlerOptions struct { embeddedDbCfg *dbconfig.DBConfiguration createdAt time.Time tagOptions models.TagOptions - timeoutOpts *prometheus.TimeoutOpts fetchOptionsBuilder handleroptions.FetchOptionsBuilder queryContextOptions models.QueryContextOptions instrumentOpts instrument.Options @@ -275,13 +268,6 @@ func NewHandlerOptions( graphiteStorageOpts graphite.M3WrappedStorageOptions, m3dbOpts m3db.Options, ) (HandlerOptions, error) { - timeout := cfg.Query.TimeoutOrDefault() - if embeddedDbCfg != nil && - embeddedDbCfg.Client.FetchTimeout != nil && - *embeddedDbCfg.Client.FetchTimeout > timeout { - timeout = *embeddedDbCfg.Client.FetchTimeout - } - storeMetricsType := false if cfg.StoreMetricsType != nil { storeMetricsType = *cfg.StoreMetricsType @@ -306,14 +292,11 @@ func NewHandlerOptions( placementServiceNames: placementServiceNames, serviceOptionDefaults: serviceOptionDefaults, nowFn: time.Now, - timeoutOpts: &prometheus.TimeoutOpts{ - FetchTimeout: timeout, - }, - queryRouter: queryRouter, - instantQueryRouter: instantQueryRouter, - graphiteStorageOpts: graphiteStorageOpts, - m3dbOpts: m3dbOpts, - storeMetricsType: storeMetricsType, + queryRouter: queryRouter, + instantQueryRouter: instantQueryRouter, + graphiteStorageOpts: graphiteStorageOpts, + m3dbOpts: m3dbOpts, + storeMetricsType: storeMetricsType, }, nil } @@ -414,16 +397,6 @@ func (o *handlerOptions) SetTagOptions(tags models.TagOptions) HandlerOptions { return &opts } -func (o *handlerOptions) TimeoutOpts() *prometheus.TimeoutOpts { - return o.timeoutOpts -} - -func (o *handlerOptions) SetTimeoutOpts(t *prometheus.TimeoutOpts) HandlerOptions { - opts := *o - opts.timeoutOpts = t - return &opts -} - func (o *handlerOptions) FetchOptionsBuilder() handleroptions.FetchOptionsBuilder { return o.fetchOptionsBuilder } diff --git a/src/query/server/query.go b/src/query/server/query.go index 8dfdfa407b..303ea23fd4 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -273,18 +273,30 @@ func Run(runOpts RunOptions) { logger.Fatal("could not parse query restrict tags config", zap.Error(err)) } + timeout := cfg.Query.TimeoutOrDefault() + if runOpts.DBConfig != nil && + runOpts.DBConfig.Client.FetchTimeout != nil && + *runOpts.DBConfig.Client.FetchTimeout > timeout { + timeout = *runOpts.DBConfig.Client.FetchTimeout + } + + fetchOptsBuilderLimitsOpts := cfg.Limits.PerQuery.AsFetchOptionsBuilderLimitsOptions() + fetchOptsBuilder, err := handleroptions.NewFetchOptionsBuilder( + handleroptions.FetchOptionsBuilderOptions{ + Limits: fetchOptsBuilderLimitsOpts, + RestrictByTag: storageRestrictByTags, + Timeout: timeout, + }) + if err != nil { + logger.Fatal("could not set fetch options parser", zap.Error(err)) + } + var ( - clusterNamespacesWatcher m3.ClusterNamespacesWatcher - backendStorage storage.Storage - clusterClient clusterclient.Client - downsampler downsample.Downsampler - fetchOptsBuilderLimitsOpts = cfg.Limits.PerQuery.AsFetchOptionsBuilderLimitsOptions() - fetchOptsBuilder = handleroptions.NewFetchOptionsBuilder( - handleroptions.FetchOptionsBuilderOptions{ - Limits: fetchOptsBuilderLimitsOpts, - RestrictByTag: storageRestrictByTags, - }) - queryCtxOpts = models.QueryContextOptions{ + clusterNamespacesWatcher m3.ClusterNamespacesWatcher + backendStorage storage.Storage + clusterClient clusterclient.Client + downsampler downsample.Downsampler + queryCtxOpts = models.QueryContextOptions{ LimitMaxTimeseries: fetchOptsBuilderLimitsOpts.SeriesLimit, LimitMaxDocs: fetchOptsBuilderLimitsOpts.DocsLimit, RequireExhaustive: fetchOptsBuilderLimitsOpts.RequireExhaustive, diff --git a/src/query/util/logging/log.go b/src/query/util/logging/log.go index 07b31c7ce4..683eff8bfc 100644 --- a/src/query/util/logging/log.go +++ b/src/query/util/logging/log.go @@ -23,6 +23,7 @@ package logging import ( "context" "fmt" + "io" "net/http" "sync" "time" @@ -121,9 +122,24 @@ func withResponseTimeLogging( func withResponseTimeLoggingFunc( next func(w http.ResponseWriter, r *http.Request), instrumentOpts instrument.Options, + opts ...MiddlewareOption, ) http.HandlerFunc { + middlewareOpts := defaultMiddlewareOptions + for _, opt := range opts { + opt(&middlewareOpts) + } + + threshold := middlewareOpts.responseLogThreshold return func(w http.ResponseWriter, r *http.Request) { startTime := time.Now() + for _, fn := range middlewareOpts.preHooks { + fn(r) + } + + // Track status code. + statusCodeTracking := &statusCodeTracker{ResponseWriter: w} + w = statusCodeTracking.wrappedResponseWriter() + rqCtx := NewContextWithGeneratedID(r.Context(), instrumentOpts) logger := WithContext(rqCtx, instrumentOpts) @@ -137,10 +153,20 @@ func withResponseTimeLoggingFunc( next(w, r.WithContext(rqCtx)) endTime := time.Now() d := endTime.Sub(startTime) - if d > time.Second { + if threshold > 0 && d >= threshold { logger.Info("finished handling request", zap.Time("time", endTime), zap.Duration("response", d), zap.String("url", r.URL.RequestURI())) } + + for _, fn := range middlewareOpts.postHooks { + fn(r, RequestMiddlewareMetadata{ + Start: startTime, + End: endTime, + Duration: d, + WroteHeader: statusCodeTracking.wroteHeader, + StatusCode: statusCodeTracking.status, + }) + } } } @@ -225,8 +251,10 @@ func (w *responseWrittenResponseWriter) WriteHeader(statusCode int) { func WithResponseTimeAndPanicErrorLogging( next http.Handler, instrumentOpts instrument.Options, + opts ...MiddlewareOption, ) http.Handler { - return WithResponseTimeAndPanicErrorLoggingFunc(next.ServeHTTP, instrumentOpts) + return WithResponseTimeAndPanicErrorLoggingFunc(next.ServeHTTP, + instrumentOpts, opts...) } // WithResponseTimeAndPanicErrorLoggingFunc wraps around the http request @@ -234,10 +262,318 @@ func WithResponseTimeAndPanicErrorLogging( func WithResponseTimeAndPanicErrorLoggingFunc( next func(w http.ResponseWriter, r *http.Request), instrumentOpts instrument.Options, + opts ...MiddlewareOption, ) http.Handler { // Wrap panic first, to be able to capture slow requests that panic in the // logs. return withResponseTimeLoggingFunc( withPanicErrorResponderFunc(next, instrumentOpts), - instrumentOpts) + instrumentOpts, opts...) +} + +type middlewareOptions struct { + responseLogThreshold time.Duration + preHooks []PreRequestMiddleware + postHooks []PostRequestMiddleware +} + +var ( + defaultMiddlewareOptions = middlewareOptions{ + responseLogThreshold: time.Second, + } +) + +// PreRequestMiddleware is middleware that runs before a request. +type PreRequestMiddleware func(req *http.Request) + +// RequestMiddlewareMetadata is metadata available to middleware about a request. +type RequestMiddlewareMetadata struct { + Start time.Time + End time.Time + Duration time.Duration + WroteHeader bool + StatusCode int +} + +// PostRequestMiddleware is middleware that runs before a request. +type PostRequestMiddleware func(req *http.Request, meta RequestMiddlewareMetadata) + +// MiddlewareOption is an option to pass to a middleware. +type MiddlewareOption func(*middlewareOptions) + +// WithResponseLogThreshold is a middleware option to set response log threshold. +func WithResponseLogThreshold(threshold time.Duration) MiddlewareOption { + return func(opts *middlewareOptions) { + opts.responseLogThreshold = threshold + } +} + +// WithNoResponseLog is a middleware option to disable response logging. +func WithNoResponseLog() MiddlewareOption { + return func(opts *middlewareOptions) { + opts.responseLogThreshold = 0 + } +} + +// WithPreRequestMiddleware is a middleware option to set pre-request middleware. +func WithPreRequestMiddleware(m PreRequestMiddleware) MiddlewareOption { + return func(opts *middlewareOptions) { + opts.preHooks = append(opts.preHooks, m) + } +} + +// WithPostRequestMiddleware is a middleware option to set post-request middleware. +func WithPostRequestMiddleware(m PostRequestMiddleware) MiddlewareOption { + return func(opts *middlewareOptions) { + opts.postHooks = append(opts.postHooks, m) + } +} + +type statusCodeTracker struct { + http.ResponseWriter + status int + wroteHeader bool +} + +func (w *statusCodeTracker) WriteHeader(status int) { + w.status = status + w.wroteHeader = true + w.ResponseWriter.WriteHeader(status) +} + +func (w *statusCodeTracker) Write(b []byte) (int, error) { + if !w.wroteHeader { + w.wroteHeader = true + w.status = 200 + } + return w.ResponseWriter.Write(b) +} + +// wrappedResponseWriter returns a wrapped version of the original +// ResponseWriter and only implements the same combination of additional +// interfaces as the original. This implementation is based on +// https://github.com/felixge/httpsnoop. +func (w *statusCodeTracker) wrappedResponseWriter() http.ResponseWriter { + var ( + hj, i0 = w.ResponseWriter.(http.Hijacker) + cn, i1 = w.ResponseWriter.(http.CloseNotifier) + pu, i2 = w.ResponseWriter.(http.Pusher) + fl, i3 = w.ResponseWriter.(http.Flusher) + rf, i4 = w.ResponseWriter.(io.ReaderFrom) + ) + + switch { + case !i0 && !i1 && !i2 && !i3 && !i4: + return struct { + http.ResponseWriter + }{w} + case !i0 && !i1 && !i2 && !i3 && i4: + return struct { + http.ResponseWriter + io.ReaderFrom + }{w, rf} + case !i0 && !i1 && !i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.Flusher + }{w, fl} + case !i0 && !i1 && !i2 && i3 && i4: + return struct { + http.ResponseWriter + http.Flusher + io.ReaderFrom + }{w, fl, rf} + case !i0 && !i1 && i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.Pusher + }{w, pu} + case !i0 && !i1 && i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.Pusher + io.ReaderFrom + }{w, pu, rf} + case !i0 && !i1 && i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.Pusher + http.Flusher + }{w, pu, fl} + case !i0 && !i1 && i2 && i3 && i4: + return struct { + http.ResponseWriter + http.Pusher + http.Flusher + io.ReaderFrom + }{w, pu, fl, rf} + case !i0 && i1 && !i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.CloseNotifier + }{w, cn} + case !i0 && i1 && !i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.CloseNotifier + io.ReaderFrom + }{w, cn, rf} + case !i0 && i1 && !i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.CloseNotifier + http.Flusher + }{w, cn, fl} + case !i0 && i1 && !i2 && i3 && i4: + return struct { + http.ResponseWriter + http.CloseNotifier + http.Flusher + io.ReaderFrom + }{w, cn, fl, rf} + case !i0 && i1 && i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.CloseNotifier + http.Pusher + }{w, cn, pu} + case !i0 && i1 && i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.CloseNotifier + http.Pusher + io.ReaderFrom + }{w, cn, pu, rf} + case !i0 && i1 && i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.CloseNotifier + http.Pusher + http.Flusher + }{w, cn, pu, fl} + case !i0 && i1 && i2 && i3 && i4: + return struct { + http.ResponseWriter + http.CloseNotifier + http.Pusher + http.Flusher + io.ReaderFrom + }{w, cn, pu, fl, rf} + case i0 && !i1 && !i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + }{w, hj} + case i0 && !i1 && !i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + io.ReaderFrom + }{w, hj, rf} + case i0 && !i1 && !i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.Flusher + }{w, hj, fl} + case i0 && !i1 && !i2 && i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.Flusher + io.ReaderFrom + }{w, hj, fl, rf} + case i0 && !i1 && i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.Pusher + }{w, hj, pu} + case i0 && !i1 && i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.Pusher + io.ReaderFrom + }{w, hj, pu, rf} + case i0 && !i1 && i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.Pusher + http.Flusher + }{w, hj, pu, fl} + case i0 && !i1 && i2 && i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.Pusher + http.Flusher + io.ReaderFrom + }{w, hj, pu, fl, rf} + case i0 && i1 && !i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + }{w, hj, cn} + case i0 && i1 && !i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + io.ReaderFrom + }{w, hj, cn, rf} + case i0 && i1 && !i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + http.Flusher + }{w, hj, cn, fl} + case i0 && i1 && !i2 && i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + http.Flusher + io.ReaderFrom + }{w, hj, cn, fl, rf} + case i0 && i1 && i2 && !i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + http.Pusher + }{w, hj, cn, pu} + case i0 && i1 && i2 && !i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + http.Pusher + io.ReaderFrom + }{w, hj, cn, pu, rf} + case i0 && i1 && i2 && i3 && !i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + http.Pusher + http.Flusher + }{w, hj, cn, pu, fl} + case i0 && i1 && i2 && i3 && i4: + return struct { + http.ResponseWriter + http.Hijacker + http.CloseNotifier + http.Pusher + http.Flusher + io.ReaderFrom + }{w, hj, cn, pu, fl, rf} + default: + return struct { + http.ResponseWriter + }{w} + } } diff --git a/src/query/util/queryhttp/queryhttp.go b/src/query/util/queryhttp/queryhttp.go new file mode 100644 index 0000000000..9ad8fbcf47 --- /dev/null +++ b/src/query/util/queryhttp/queryhttp.go @@ -0,0 +1,273 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package queryhttp + +import ( + "fmt" + "net/http" + "strconv" + "sync" + + "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/x/instrument" + "github.com/uber-go/tally" + + "github.com/gorilla/mux" +) + +var ( + histogramTimerOptions = instrument.NewHistogramTimerOptions( + instrument.HistogramTimerOptions{ + // Use sparse histogram timer buckets to not overload with latency metrics. + HistogramBuckets: instrument.SparseHistogramTimerHistogramBuckets(), + }) +) + +// NewEndpointRegistry returns a new endpoint registry. +func NewEndpointRegistry( + router *mux.Router, + instrumentOpts instrument.Options, +) *EndpointRegistry { + return &EndpointRegistry{ + router: router, + instrumentOpts: instrumentOpts.SetMetricsScope( + instrumentOpts.MetricsScope().SubScope("http_handler")), + registered: make(map[routeKey]*mux.Route), + } +} + +// EndpointRegistry is an endpoint registry that can register routes +// and instrument them. +type EndpointRegistry struct { + router *mux.Router + instrumentOpts instrument.Options + registered map[routeKey]*mux.Route +} + +type routeKey struct { + path string + pathPrefix string + method string +} + +// RegisterOptions are options for registering a handler. +type RegisterOptions struct { + Path string + PathPrefix string + Handler http.Handler + Methods []string +} + +// Register registers an endpoint. +func (r *EndpointRegistry) Register( + opts RegisterOptions, + middlewareOpts ...logging.MiddlewareOption, +) error { + // Wrap requests with response time logging as well as panic recovery. + var ( + route *mux.Route + metrics = newRouteMetrics(r.instrumentOpts) + middlewareOptions []logging.MiddlewareOption + ) + postRequestOption := logging.WithPostRequestMiddleware( + logging.PostRequestMiddleware(func( + r *http.Request, + meta logging.RequestMiddlewareMetadata, + ) { + if !meta.WroteHeader { + return + } + + p, err := route.GetPathTemplate() + if err != nil { + p = "unknown" + } + + counter, timer := metrics.metric(p, meta.StatusCode) + counter.Inc(1) + timer.Record(meta.Duration) + })) + middlewareOptions = append(middlewareOptions, postRequestOption) + middlewareOptions = append(middlewareOptions, middlewareOpts...) + + wrapped := func(n http.Handler) http.Handler { + return logging.WithResponseTimeAndPanicErrorLogging(n, r.instrumentOpts, + middlewareOptions...) + } + + handler := wrapped(opts.Handler) + if p := opts.Path; p != "" && len(opts.Methods) > 0 { + for _, method := range opts.Methods { + key := routeKey{ + path: p, + method: method, + } + if _, ok := r.registered[key]; ok { + return fmt.Errorf("route already exists: path=%s, method=%s", + p, method) + } + + route = r.router.HandleFunc(p, handler.ServeHTTP).Methods(method) + r.registered[key] = route + } + } else if p := opts.PathPrefix; p != "" { + key := routeKey{ + pathPrefix: p, + } + if _, ok := r.registered[key]; ok { + return fmt.Errorf("route already exists: pathPrefix=%s", p) + } + route = r.router.PathPrefix(p).Handler(handler) + r.registered[key] = route + } else { + return fmt.Errorf("no path and methods or path prefix set: +%v", opts) + } + + return nil +} + +// RegisterPathsOptions is options for registering multiple paths +// with the same handler. +type RegisterPathsOptions struct { + Handler http.Handler + Methods []string +} + +// RegisterPaths registers multiple paths for the same handler. +func (r *EndpointRegistry) RegisterPaths( + paths []string, + opts RegisterPathsOptions, + middlewareOpts ...logging.MiddlewareOption, +) error { + for _, p := range paths { + if err := r.Register(RegisterOptions{ + Path: p, + Handler: opts.Handler, + Methods: opts.Methods, + }); err != nil { + return err + } + } + return nil +} + +// PathRoute resolves a registered route that was registered by path and method, +// not by path prefix. +func (r *EndpointRegistry) PathRoute(path, method string) (*mux.Route, bool) { + key := routeKey{ + path: path, + method: method, + } + h, ok := r.registered[key] + return h, ok +} + +// PathPrefixRoute resolves a registered route that was registered by path +// prefix, not by path and method. +func (r *EndpointRegistry) PathPrefixRoute(pathPrefix string) (*mux.Route, bool) { + key := routeKey{ + pathPrefix: pathPrefix, + } + h, ok := r.registered[key] + return h, ok +} + +// Walk walks the router and all its sub-routers, calling walkFn for each route +// in the tree. The routes are walked in the order they were added. Sub-routers +// are explored depth-first. +func (r *EndpointRegistry) Walk(walkFn mux.WalkFunc) error { + return r.router.Walk(walkFn) +} + +func routeName(p string, method string) string { + if method == "" { + return p + } + return fmt.Sprintf("%s %s", p, method) +} + +type routeMetrics struct { + sync.RWMutex + instrumentOpts instrument.Options + metrics map[routeMetricKey]routeMetric + timers map[string]tally.Timer +} + +type routeMetricKey struct { + path string + status int +} + +type routeMetric struct { + status tally.Counter +} + +func newRouteMetrics(instrumentOpts instrument.Options) *routeMetrics { + return &routeMetrics{ + instrumentOpts: instrumentOpts, + metrics: make(map[routeMetricKey]routeMetric), + timers: make(map[string]tally.Timer), + } +} + +func (m *routeMetrics) metric(path string, status int) (tally.Counter, tally.Timer) { + key := routeMetricKey{ + path: path, + status: status, + } + m.RLock() + metric, ok1 := m.metrics[key] + timer, ok2 := m.timers[path] + m.RUnlock() + if ok1 && ok2 { + return metric.status, timer + } + + m.Lock() + defer m.Unlock() + + metric, ok1 = m.metrics[key] + timer, ok2 = m.timers[path] + if ok1 && ok2 { + return metric.status, timer + } + + scopePath := m.instrumentOpts.MetricsScope().Tagged(map[string]string{ + "path": path, + }) + + scopePathAndStatus := scopePath.Tagged(map[string]string{ + "status": strconv.Itoa(status), + }) + + if !ok1 { + metric = routeMetric{ + status: scopePathAndStatus.Counter("request"), + } + m.metrics[key] = metric + } + if !ok2 { + timer = instrument.NewTimer(scopePath, "latency", histogramTimerOptions) + m.timers[path] = timer + } + + return metric.status, timer +} diff --git a/src/x/debug/debug.go b/src/x/debug/debug.go index 47d109c5b5..341c562c54 100644 --- a/src/x/debug/debug.go +++ b/src/x/debug/debug.go @@ -40,6 +40,8 @@ import ( const ( // DebugURL is the url for the debug dump endpoint. DebugURL = "/debug/dump" + // DebugMethod is the HTTP method. + DebugMethod = http.MethodGet ) // Source is the interface that must be implemented to provide a new debug diff --git a/src/x/headers/headers.go b/src/x/headers/headers.go index 230521b744..65a4a74fb8 100644 --- a/src/x/headers/headers.go +++ b/src/x/headers/headers.go @@ -110,6 +110,9 @@ const ( // LimitHeader is the header added when returned series are limited. LimitHeader = M3HeaderPrefix + "Results-Limited" + // TimeoutHeader is the header added with the effective timeout. + TimeoutHeader = M3HeaderPrefix + "Timeout" + // LimitHeaderSeriesLimitApplied is the header applied when fetch results // are maxed. LimitHeaderSeriesLimitApplied = "max_fetch_series_limit_applied" diff --git a/src/x/instrument/methods.go b/src/x/instrument/methods.go index fd61e976ab..e3423d4010 100644 --- a/src/x/instrument/methods.go +++ b/src/x/instrument/methods.go @@ -54,6 +54,34 @@ func (o TimerOptions) NewTimer(scope tally.Scope, name string) tally.Timer { return NewTimer(scope, name, o) } +// SparseHistogramTimerHistogramBuckets returns a small spare set of +// histogram timer histogram buckets, from 1ms up to 8m. +func SparseHistogramTimerHistogramBuckets() tally.Buckets { + return tally.ValueBuckets{ + 0.001, + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1, + 2.5, + 5, + 7.5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + } +} + // DefaultHistogramTimerHistogramBuckets returns a set of default // histogram timer histogram buckets, from 2ms up to 1hr. func DefaultHistogramTimerHistogramBuckets() tally.Buckets {