diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index f7960909b9..950bed6396 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -21,16 +21,12 @@ package database import ( - "net/http" - clusterclient "github.com/m3db/m3/src/cluster/client" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" - - "github.com/gorilla/mux" ) // Handler represents a generic handler for namespace endpoints. @@ -43,31 +39,27 @@ type Handler struct { // RegisterRoutes registers the namespace routes func RegisterRoutes( - r *mux.Router, + addRoute handler.AddRouteFn, client clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, ) error { - wrapped := func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) - } - createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg, defaults, instrumentOpts) if err != nil { return err } - r.HandleFunc(CreateURL, - wrapped(createHandler).ServeHTTP). - Methods(CreateHTTPMethod) - // Register the same handler under two different endpoints. This just makes explaining things in // our documentation easier so we can separate out concepts, but share the underlying code. - r.HandleFunc(CreateURL, wrapped(createHandler).ServeHTTP).Methods(CreateHTTPMethod) - r.HandleFunc(CreateNamespaceURL, wrapped(createHandler).ServeHTTP).Methods(CreateNamespaceHTTPMethod) + if err := addRoute(CreateURL, createHandler, CreateHTTPMethod); err != nil { + return err + } + if err := addRoute(CreateNamespaceURL, createHandler, CreateNamespaceHTTPMethod); err != nil { + return err + } return nil } diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index 4e95e121ef..8290490395 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -31,13 +31,11 @@ import ( "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/storage/m3" - "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" - - "github.com/gorilla/mux" ) const ( @@ -102,66 +100,93 @@ func Metadata(store kv.Store) ([]namespace.Metadata, int, error) { return nsMap.Metadatas(), value.Version(), nil } +type applyMiddlewareFn func( + svc handleroptions.ServiceNameAndDefaults, + w http.ResponseWriter, + r *http.Request, +) + +type addRouteFn func( + path string, + applyMiddlewareFn applyMiddlewareFn, + methods ...string, +) error + // RegisterRoutes registers the namespace routes. func RegisterRoutes( - r *mux.Router, + addRouteFn handler.AddRouteFn, client clusterclient.Client, clusters m3.Clusters, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, -) { - wrapped := func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) - } - applyMiddleware := func( - f func(svc handleroptions.ServiceNameAndDefaults, - w http.ResponseWriter, r *http.Request), - defaults []handleroptions.ServiceOptionsDefault, - ) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - svc := handleroptions.ServiceNameAndDefaults{ - ServiceName: handleroptions.M3DBServiceName, - Defaults: defaults, - } - f(svc, w, r) - }) - } +) error { + addRoute := applyMiddlewareToRoute(addRouteFn, defaults) // Get M3DB namespaces. - getHandler := wrapped( - applyMiddleware(NewGetHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBGetURL, getHandler.ServeHTTP).Methods(GetHTTPMethod) + getHandler := NewGetHandler(client, instrumentOpts).ServeHTTP + if err := addRoute(M3DBGetURL, getHandler, GetHTTPMethod); err != nil { + return err + } // Add M3DB namespaces. - addHandler := wrapped( - applyMiddleware(NewAddHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBAddURL, addHandler.ServeHTTP).Methods(AddHTTPMethod) + addHandler := NewAddHandler(client, instrumentOpts).ServeHTTP + if err := addRoute(M3DBAddURL, addHandler, AddHTTPMethod); err != nil { + return err + } // Update M3DB namespaces. - updateHandler := wrapped( - applyMiddleware(NewUpdateHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBUpdateURL, updateHandler.ServeHTTP).Methods(UpdateHTTPMethod) + updateHandler := NewUpdateHandler(client, instrumentOpts).ServeHTTP + if err := addRoute(M3DBUpdateURL, updateHandler, UpdateHTTPMethod); err != nil { + return err + } // Delete M3DB namespaces. - deleteHandler := wrapped( - applyMiddleware(NewDeleteHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBDeleteURL, deleteHandler.ServeHTTP).Methods(DeleteHTTPMethod) + deleteHandler := NewDeleteHandler(client, instrumentOpts).ServeHTTP + if err := addRoute(M3DBDeleteURL, deleteHandler, DeleteHTTPMethod); err != nil { + return err + } // Deploy M3DB schemas. - schemaHandler := wrapped( - applyMiddleware(NewSchemaHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBSchemaURL, schemaHandler.ServeHTTP).Methods(SchemaDeployHTTPMethod) + schemaHandler := NewSchemaHandler(client, instrumentOpts).ServeHTTP + if err := addRoute(M3DBSchemaURL, schemaHandler, SchemaDeployHTTPMethod); err != nil { + return err + } // Reset M3DB schemas. - schemaResetHandler := wrapped( - applyMiddleware(NewSchemaResetHandler(client, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBSchemaURL, schemaResetHandler.ServeHTTP).Methods(DeleteHTTPMethod) + schemaResetHandler := NewSchemaResetHandler(client, instrumentOpts).ServeHTTP + if err := addRoute(M3DBSchemaURL, schemaResetHandler, DeleteHTTPMethod); err != nil { + return err + } // Mark M3DB namespace as ready. - readyHandler := wrapped( - applyMiddleware(NewReadyHandler(client, clusters, instrumentOpts).ServeHTTP, defaults)) - r.HandleFunc(M3DBReadyURL, readyHandler.ServeHTTP).Methods(ReadyHTTPMethod) + readyHandler := NewReadyHandler(client, clusters, instrumentOpts).ServeHTTP + if err := addRoute(M3DBReadyURL, readyHandler, ReadyHTTPMethod); err != nil { + return err + } + + return nil +} +func applyMiddlewareToRoute( + addRouteFn handler.AddRouteFn, + defaults []handleroptions.ServiceOptionsDefault, +) addRouteFn { + applyMiddleware := func( + applyMiddlewareFn applyMiddlewareFn, + defaults []handleroptions.ServiceOptionsDefault, + ) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + svc := handleroptions.ServiceNameAndDefaults{ + ServiceName: handleroptions.M3DBServiceName, + Defaults: defaults, + } + applyMiddlewareFn(svc, w, r) + }) + } + + return func(path string, f applyMiddlewareFn, methods ...string) error { + return addRouteFn(path, applyMiddleware(f, defaults), methods...) + } } func validateNamespaceAggregationOptions(mds []namespace.Metadata) error { diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index 66d732442c..4afba97187 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -37,13 +37,12 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" - - "github.com/gorilla/mux" ) const ( @@ -222,72 +221,117 @@ func ConvertInstancesProto(instancesProto []*placementpb.Instance) ([]placement. // RegisterRoutes registers the placement routes func RegisterRoutes( - r *mux.Router, + addRoute handler.AddRouteFn, defaults []handleroptions.ServiceOptionsDefault, opts HandlerOptions, -) { +) error { // Init var ( initHandler = NewInitHandler(opts) initFn = applyMiddleware(initHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBInitURL, initFn).Methods(InitHTTPMethod) - r.HandleFunc(M3AggInitURL, initFn).Methods(InitHTTPMethod) - r.HandleFunc(M3CoordinatorInitURL, initFn).Methods(InitHTTPMethod) + + if err := addRoute(M3DBInitURL, initFn, InitHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggInitURL, initFn, InitHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorInitURL, initFn, InitHTTPMethod); err != nil { + return err + } // Get var ( getHandler = NewGetHandler(opts) getFn = applyMiddleware(getHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBGetURL, getFn).Methods(GetHTTPMethod) - r.HandleFunc(M3AggGetURL, getFn).Methods(GetHTTPMethod) - r.HandleFunc(M3CoordinatorGetURL, getFn).Methods(GetHTTPMethod) + if err := addRoute(M3DBGetURL, getFn, GetHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggGetURL, getFn, GetHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorGetURL, getFn, GetHTTPMethod); err != nil { + return err + } // Delete all var ( deleteAllHandler = NewDeleteAllHandler(opts) deleteAllFn = applyMiddleware(deleteAllHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) - r.HandleFunc(M3AggDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) - r.HandleFunc(M3CoordinatorDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod) + if err := addRoute(M3DBDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorDeleteAllURL, deleteAllFn, DeleteAllHTTPMethod); err != nil { + return err + } // Add var ( addHandler = NewAddHandler(opts) addFn = applyMiddleware(addHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBAddURL, addFn).Methods(AddHTTPMethod) - r.HandleFunc(M3AggAddURL, addFn).Methods(AddHTTPMethod) - r.HandleFunc(M3CoordinatorAddURL, addFn).Methods(AddHTTPMethod) + if err := addRoute(M3DBAddURL, addFn, AddHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggAddURL, addFn, AddHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorAddURL, addFn, AddHTTPMethod); err != nil { + return err + } // Delete var ( deleteHandler = NewDeleteHandler(opts) deleteFn = applyMiddleware(deleteHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBDeleteURL, deleteFn).Methods(DeleteHTTPMethod) - r.HandleFunc(M3AggDeleteURL, deleteFn).Methods(DeleteHTTPMethod) - r.HandleFunc(M3CoordinatorDeleteURL, deleteFn).Methods(DeleteHTTPMethod) + if err := addRoute(M3DBDeleteURL, deleteFn, DeleteHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggDeleteURL, deleteFn, DeleteHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorDeleteURL, deleteFn, DeleteHTTPMethod); err != nil { + return err + } // Replace var ( replaceHandler = NewReplaceHandler(opts) replaceFn = applyMiddleware(replaceHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) - r.HandleFunc(M3AggReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) - r.HandleFunc(M3CoordinatorReplaceURL, replaceFn).Methods(ReplaceHTTPMethod) + if err := addRoute(M3DBReplaceURL, replaceFn, ReplaceHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggReplaceURL, replaceFn, ReplaceHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorReplaceURL, replaceFn, ReplaceHTTPMethod); err != nil { + return err + } // Set var ( setHandler = NewSetHandler(opts) setFn = applyMiddleware(setHandler.ServeHTTP, defaults, opts.instrumentOptions) ) - r.HandleFunc(M3DBSetURL, setFn).Methods(SetHTTPMethod) - r.HandleFunc(M3AggSetURL, setFn).Methods(SetHTTPMethod) - r.HandleFunc(M3CoordinatorSetURL, setFn).Methods(SetHTTPMethod) + if err := addRoute(M3DBSetURL, setFn, SetHTTPMethod); err != nil { + return err + } + if err := addRoute(M3AggSetURL, setFn, SetHTTPMethod); err != nil { + return err + } + if err := addRoute(M3CoordinatorSetURL, setFn, SetHTTPMethod); err != nil { + return err + } + + return nil } func newPlacementCutoverNanosFn( @@ -386,11 +430,11 @@ func applyMiddleware( f func(svc handleroptions.ServiceNameAndDefaults, w http.ResponseWriter, r *http.Request), defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, -) func(w http.ResponseWriter, r *http.Request) { +) http.Handler { return logging.WithResponseTimeAndPanicErrorLoggingFunc( parseServiceMiddleware(f, defaults), instrumentOpts, - ).ServeHTTP + ) } func parseServiceMiddleware( diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index cbb72f4ad3..be153d0d4c 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -37,14 +37,26 @@ import ( ) const ( - // PromReadURL is the url for native prom read handler, this matches the + // PromReadURL is the URL for native prom read handler, this matches the // default URL for the query range endpoint found on a Prometheus server. PromReadURL = handler.RoutePrefixV1 + "/query_range" - // PromReadInstantURL is the url for native instantaneous prom read + // PromReadInstantURL is the URL for native instantaneous prom read // handler, this matches the default URL for the query endpoint // found on a Prometheus server. PromReadInstantURL = handler.RoutePrefixV1 + "/query" + + // PrometheusReadURL is the URL for native prom read handler. + PrometheusReadURL = "/prometheus" + PromReadURL + + // PrometheusReadInstantURL is the URL for native instantaneous prom read handler. + PrometheusReadInstantURL = "/prometheus" + PromReadInstantURL + + // M3QueryReadURL is the URL for native m3 query read handler. + M3QueryReadURL = "/m3query" + PromReadURL + + // M3QueryReadInstantURL is the URL for native instantaneous m3 query read handler. + M3QueryReadInstantURL = "/m3query" + PromReadInstantURL ) var ( diff --git a/src/query/api/v1/handler/topic/common.go b/src/query/api/v1/handler/topic/common.go index 24112ceb4b..dc8f461e36 100644 --- a/src/query/api/v1/handler/topic/common.go +++ b/src/query/api/v1/handler/topic/common.go @@ -28,14 +28,13 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/msg/topic" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" - "github.com/gorilla/mux" ) const ( @@ -71,30 +70,33 @@ func Service(clusterClient clusterclient.Client, opts handleroptions.ServiceOpti // RegisterRoutes registers the topic routes func RegisterRoutes( - r *mux.Router, + addRoute handler.AddRouteFn, client clusterclient.Client, cfg config.Configuration, instrumentOpts instrument.Options, -) { - wrapped := func(n http.Handler) http.Handler { - return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) +) error { + if err := addRoute(InitURL, newInitHandler(client, cfg, instrumentOpts), + InitHTTPMethod); err != nil { + return err + } + if err := addRoute(GetURL, newGetHandler(client, cfg, instrumentOpts), + GetHTTPMethod); err != nil { + return err + } + if err := addRoute(AddURL, newAddHandler(client, cfg, instrumentOpts), + AddHTTPMethod); err != nil { + return err + } + if err := addRoute(UpdateURL, newUpdateHandler(client, cfg, instrumentOpts), + UpdateHTTPMethod); err != nil { + return err + } + if err := addRoute(DeleteURL, newDeleteHandler(client, cfg, instrumentOpts), + DeleteHTTPMethod); err != nil { + return err } - r.HandleFunc(InitURL, - wrapped(newInitHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(InitHTTPMethod) - r.HandleFunc(GetURL, - wrapped(newGetHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(GetHTTPMethod) - r.HandleFunc(AddURL, - wrapped(newAddHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(AddHTTPMethod) - r.HandleFunc(UpdateURL, - wrapped(newUpdateHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(UpdateHTTPMethod) - r.HandleFunc(DeleteURL, - wrapped(newDeleteHandler(client, cfg, instrumentOpts)).ServeHTTP). - Methods(DeleteHTTPMethod) + return nil } func topicName(headers http.Header) string { diff --git a/src/query/api/v1/handler/types.go b/src/query/api/v1/handler/types.go index 54a6a605ae..5b0f523987 100644 --- a/src/query/api/v1/handler/types.go +++ b/src/query/api/v1/handler/types.go @@ -20,9 +20,14 @@ package handler +import "net/http" + // HeaderKeyType is the type for the header key. type HeaderKeyType int +// AddRouteFn is the function type for adding new HTTP route. +type AddRouteFn func(path string, handler http.Handler, methods ...string) error + const ( // HeaderKey is the key which headers will be added to in the request context. HeaderKey HeaderKeyType = iota diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 37ebd072b7..8e188b6a74 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -52,6 +52,7 @@ import ( "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/util/httputil" + "go.uber.org/zap" ) const ( @@ -79,6 +80,7 @@ type Handler struct { handler http.Handler options options.HandlerOptions customHandlers []options.CustomHandler + logger *zap.Logger } // Router returns the http handler registered with all relevant routes for query. @@ -93,12 +95,14 @@ func NewHandler( ) *Handler { r := mux.NewRouter() handlerWithMiddleware := applyMiddleware(r, opentracing.GlobalTracer()) + logger := handlerOptions.InstrumentOpts().Logger() return &Handler{ router: r, handler: handlerWithMiddleware, options: handlerOptions, customHandlers: customHandlers, + logger: logger, } } @@ -127,9 +131,10 @@ func applyMiddleware(base *mux.Router, tracer opentracing.Tracer) http.Handler { // RegisterRoutes registers all http routes. func (h *Handler) RegisterRoutes() error { - instrumentOpts := h.options.InstrumentOpts() - // Wrap requests with response time logging as well as panic recovery. var ( + instrumentOpts = h.options.InstrumentOpts() + + // Wrap requests with response time logging as well as panic recovery. wrapped = func(n http.Handler) http.Handler { return logging.WithResponseTimeAndPanicErrorLogging(n, instrumentOpts) } @@ -137,13 +142,25 @@ func (h *Handler) RegisterRoutes() error { panicOnly = func(n http.Handler) http.Handler { return logging.WithPanicErrorResponder(n, instrumentOpts) } + + wrappedRouteFn = func(path string, handler http.Handler, methods ...string) error { + return h.addRouteHandlerFn(h.router, path, wrapped(handler).ServeHTTP, methods...) + } + + routeFn = func(path string, handler http.Handler, methods ...string) error { + return h.addRouteHandlerFn(h.router, path, handler.ServeHTTP, methods...) + } ) - h.router.HandleFunc(openapi.URL, - wrapped(openapi.NewDocHandler(instrumentOpts)).ServeHTTP, - ).Methods(openapi.HTTPMethod) + if err := wrappedRouteFn(openapi.URL, openapi.NewDocHandler(instrumentOpts), + openapi.HTTPMethod); err != nil { + return err + } + h.router.PathPrefix(openapi.StaticURLPrefix). - Handler(wrapped(openapi.StaticHandler())) + Handler(wrapped(openapi.StaticHandler())). + Name(openapi.StaticURLPrefix) + // Prometheus remote read/write endpoints. remoteSourceOpts := h.options.SetInstrumentOpts(instrumentOpts. @@ -164,24 +181,13 @@ func (h *Handler) RegisterRoutes() error { Tagged(v1APIGroup), )) - // Register custom endpoints. - for _, custom := range h.customHandlers { - handler, err := custom.Handler(nativeSourceOpts) - if err != nil { - return err - } - - h.router.HandleFunc(custom.Route(), handler.ServeHTTP). - Methods(custom.Methods()...) - } - opts := prom.Options{ PromQLEngine: h.options.PrometheusEngine(), } - promqlQueryHandler := wrapped(prom.NewReadHandler(opts, nativeSourceOpts)) - promqlInstantQueryHandler := wrapped(prom.NewReadInstantHandler(opts, nativeSourceOpts)) - nativePromReadHandler := wrapped(native.NewPromReadHandler(nativeSourceOpts)) - nativePromReadInstantHandler := wrapped(native.NewPromReadInstantHandler(nativeSourceOpts)) + promqlQueryHandler := prom.NewReadHandler(opts, nativeSourceOpts) + promqlInstantQueryHandler := prom.NewReadInstantHandler(opts, nativeSourceOpts) + nativePromReadHandler := native.NewPromReadHandler(nativeSourceOpts) + nativePromReadInstantHandler := native.NewPromReadInstantHandler(nativeSourceOpts) h.options.QueryRouter().Setup(options.QueryRouterOptions{ DefaultQueryEngine: h.options.DefaultQueryEngine(), @@ -195,71 +201,101 @@ func (h *Handler) RegisterRoutes() error { M3QueryHandler: nativePromReadInstantHandler.ServeHTTP, }) - h.router. - HandleFunc(native.PromReadURL, h.options.QueryRouter().ServeHTTP). - Methods(native.PromReadHTTPMethods...) - h.router. - HandleFunc(native.PromReadInstantURL, h.options.InstantQueryRouter().ServeHTTP). - Methods(native.PromReadInstantHTTPMethods...) + if err := wrappedRouteFn(native.PromReadURL, h.options.QueryRouter(), + native.PromReadHTTPMethods...); err != nil { + return err + } + if err := wrappedRouteFn(native.PromReadInstantURL, h.options.InstantQueryRouter(), + native.PromReadInstantHTTPMethods...); err != nil { + return err + } - h.router.HandleFunc("/prometheus"+native.PromReadURL, promqlQueryHandler.ServeHTTP).Methods(native.PromReadHTTPMethods...) - h.router.HandleFunc("/prometheus"+native.PromReadInstantURL, promqlInstantQueryHandler.ServeHTTP).Methods(native.PromReadInstantHTTPMethods...) + if err := wrappedRouteFn(native.PrometheusReadURL, promqlQueryHandler, + native.PromReadHTTPMethods...); err != nil { + return err + } + + if err := wrappedRouteFn(native.PrometheusReadInstantURL, promqlInstantQueryHandler, + native.PromReadInstantHTTPMethods...); err != nil { + return err + } - h.router.HandleFunc(remote.PromReadURL, - wrapped(promRemoteReadHandler).ServeHTTP, - ).Methods(remote.PromReadHTTPMethods...) - h.router.HandleFunc(remote.PromWriteURL, - panicOnly(promRemoteWriteHandler).ServeHTTP, - ).Methods(remote.PromWriteHTTPMethod) - h.router.HandleFunc("/m3query"+native.PromReadURL, nativePromReadHandler.ServeHTTP).Methods(native.PromReadHTTPMethods...) - h.router.HandleFunc("/m3query"+native.PromReadInstantURL, nativePromReadInstantHandler.ServeHTTP).Methods(native.PromReadInstantHTTPMethods...) + if err := wrappedRouteFn(remote.PromReadURL, promRemoteReadHandler, + remote.PromReadHTTPMethods...); err != nil { + return err + } + if err := routeFn(remote.PromWriteURL, panicOnly(promRemoteWriteHandler), + remote.PromWriteHTTPMethod); err != nil { + return err + } + + if err := wrappedRouteFn(native.M3QueryReadURL, nativePromReadHandler, + native.PromReadHTTPMethods...); err != nil { + return err + } + if err := wrappedRouteFn(native.M3QueryReadInstantURL, nativePromReadInstantHandler, + native.PromReadInstantHTTPMethods...); err != nil { + return err + } // InfluxDB write endpoint. - h.router.HandleFunc(influxdb.InfluxWriteURL, - wrapped(influxdb.NewInfluxWriterHandler(h.options)).ServeHTTP).Methods(influxdb.InfluxWriteHTTPMethod) + if err := wrappedRouteFn(influxdb.InfluxWriteURL, influxdb.NewInfluxWriterHandler(h.options), + influxdb.InfluxWriteHTTPMethod); err != nil { + return err + } // Native M3 search and write endpoints. - h.router.HandleFunc(handler.SearchURL, - wrapped(handler.NewSearchHandler(h.options)).ServeHTTP, - ).Methods(handler.SearchHTTPMethod) - h.router.HandleFunc(m3json.WriteJSONURL, - wrapped(m3json.NewWriteJSONHandler(h.options)).ServeHTTP, - ).Methods(m3json.JSONWriteHTTPMethod) + if err := wrappedRouteFn(handler.SearchURL, handler.NewSearchHandler(h.options), + handler.SearchHTTPMethod); err != nil { + return err + } + if err := wrappedRouteFn(m3json.WriteJSONURL, m3json.NewWriteJSONHandler(h.options), + m3json.JSONWriteHTTPMethod); err != nil { + return err + } // Tag completion endpoints. - h.router.HandleFunc(native.CompleteTagsURL, - wrapped(native.NewCompleteTagsHandler(h.options)).ServeHTTP, - ).Methods(native.CompleteTagsHTTPMethod) - h.router.HandleFunc(remote.TagValuesURL, - wrapped(remote.NewTagValuesHandler(h.options)).ServeHTTP, - ).Methods(remote.TagValuesHTTPMethod) + if err := wrappedRouteFn(native.CompleteTagsURL, native.NewCompleteTagsHandler(h.options), + native.CompleteTagsHTTPMethod); err != nil { + return err + } + if err := wrappedRouteFn(remote.TagValuesURL, remote.NewTagValuesHandler(h.options), + remote.TagValuesHTTPMethod); err != nil { + return err + } // List tag endpoints. - h.router.HandleFunc(native.ListTagsURL, - wrapped(native.NewListTagsHandler(h.options)).ServeHTTP, - ).Methods(native.ListTagsHTTPMethods...) + if err := wrappedRouteFn(native.ListTagsURL, native.NewListTagsHandler(h.options), + native.ListTagsHTTPMethods...); err != nil { + return err + } // Query parse endpoints. - h.router.HandleFunc(native.PromParseURL, - wrapped(native.NewPromParseHandler(h.options)).ServeHTTP, - ).Methods(native.PromParseHTTPMethod) - h.router.HandleFunc(native.PromThresholdURL, - wrapped(native.NewPromThresholdHandler(h.options)).ServeHTTP, - ).Methods(native.PromThresholdHTTPMethod) + if err := wrappedRouteFn(native.PromParseURL, native.NewPromParseHandler(h.options), + native.PromParseHTTPMethod); err != nil { + return err + } + if err := wrappedRouteFn(native.PromThresholdURL, native.NewPromThresholdHandler(h.options), + native.PromThresholdHTTPMethod); err != nil { + return err + } // Series match endpoints. - h.router.HandleFunc(remote.PromSeriesMatchURL, - wrapped(remote.NewPromSeriesMatchHandler(h.options)).ServeHTTP, - ).Methods(remote.PromSeriesMatchHTTPMethods...) + if err := wrappedRouteFn(remote.PromSeriesMatchURL, + remote.NewPromSeriesMatchHandler(h.options), + remote.PromSeriesMatchHTTPMethods...); err != nil { + return err + } // Graphite endpoints. - h.router.HandleFunc(graphite.ReadURL, - wrapped(graphite.NewRenderHandler(h.options)).ServeHTTP, - ).Methods(graphite.ReadHTTPMethods...) - - h.router.HandleFunc(graphite.FindURL, - wrapped(graphite.NewFindHandler(h.options)).ServeHTTP, - ).Methods(graphite.FindHTTPMethods...) + if err := wrappedRouteFn(graphite.ReadURL, graphite.NewRenderHandler(h.options), + graphite.ReadHTTPMethods...); err != nil { + return err + } + if err := wrappedRouteFn(graphite.FindURL, graphite.NewFindHandler(h.options), + graphite.FindHTTPMethods...); err != nil { + return err + } placementOpts, err := h.placementOpts() if err != nil { @@ -293,21 +329,28 @@ func (h *Handler) RegisterRoutes() error { } // Register debug dump handler. - h.router.HandleFunc(xdebug.DebugURL, - wrapped(debugWriter.HTTPHandler()).ServeHTTP) + if err := wrappedRouteFn(xdebug.DebugURL, debugWriter.HTTPHandler()); err != nil { + return err + } if clusterClient != nil { - err = database.RegisterRoutes(h.router, clusterClient, + if err := database.RegisterRoutes(wrappedRouteFn, clusterClient, h.options.Config(), h.options.EmbeddedDbCfg(), - serviceOptionDefaults, instrumentOpts) - if err != nil { + serviceOptionDefaults, instrumentOpts); err != nil { + return err + } + if err := placement.RegisterRoutes(routeFn, serviceOptionDefaults, + placementOpts); err != nil { + return err + } + if err := namespace.RegisterRoutes(wrappedRouteFn, clusterClient, + h.options.Clusters(), serviceOptionDefaults, instrumentOpts); err != nil { + return err + } + if err := topic.RegisterRoutes(wrappedRouteFn, clusterClient, config, + instrumentOpts); err != nil { return err } - - placement.RegisterRoutes(h.router, - serviceOptionDefaults, placementOpts) - namespace.RegisterRoutes(h.router, clusterClient, h.options.Clusters(), serviceOptionDefaults, instrumentOpts) - topic.RegisterRoutes(h.router, clusterClient, config, instrumentOpts) // Experimental endpoints. if config.Experimental.Enabled { @@ -318,19 +361,77 @@ func (h *Handler) RegisterRoutes() error { Tagged(remoteSource). Tagged(experimentalAPIGroup), ) - h.router.HandleFunc(annotated.WriteURL, - wrapped(experimentalAnnotatedWriteHandler).ServeHTTP, - ).Methods(annotated.WriteHTTPMethod) + if err := wrappedRouteFn(annotated.WriteURL, experimentalAnnotatedWriteHandler, + annotated.WriteHTTPMethod); err != nil { + return err + } } } - h.registerHealthEndpoints() + if err := h.registerHealthEndpoints(); err != nil { + return err + } h.registerProfileEndpoints() - h.registerRoutesEndpoint() + if err := h.registerRoutesEndpoint(); err != nil { + return err + } + + // Register custom endpoints. + for _, custom := range h.customHandlers { + for _, method := range custom.Methods() { + routeName := routeName(custom.Route(), method) + route := h.router.Get(routeName) + var prevHandler http.Handler + if route != nil { + prevHandler = route.GetHandler() + } + customHandler, err := custom.Handler(nativeSourceOpts, prevHandler) + if err != nil { + return fmt.Errorf("failed to register custom handler with path %s: %w", + routeName, err) + } + + if route == nil { + if err := wrappedRouteFn(custom.Route(), customHandler, method); err != nil { + return err + } + } else { + route.Handler(wrapped(customHandler)) + } + } + } return nil } +func (h *Handler) addRouteHandlerFn( + router *mux.Router, + path string, + handlerFn http.HandlerFunc, + methods ...string, +) error { + for _, method := range methods { + routeName := routeName(path, method) + if previousRoute := router.Get(routeName); previousRoute != nil { + return fmt.Errorf("route already exists: %s", routeName) + } + + router. + HandleFunc(path, handlerFn). + Name(routeName). + Methods(method) + } + + return nil +} + +func routeName(p string, method string) string { + if method == "" { + return p + } + return fmt.Sprintf("%s %s", p, method) +} + func (h *Handler) placementOpts() (placement.HandlerOptions, error) { return placement.NewHandlerOptions( h.options.ClusterClient(), @@ -364,24 +465,27 @@ func (h *Handler) m3AggServiceOptions() *handleroptions.M3AggServiceOptions { } // Endpoints useful for profiling the service. -func (h *Handler) registerHealthEndpoints() { - h.router.HandleFunc(healthURL, func(w http.ResponseWriter, r *http.Request) { +func (h *Handler) registerHealthEndpoints() error { + return h.addRouteHandlerFn(h.router, healthURL, func(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(struct { Uptime string `json:"uptime"` }{ Uptime: time.Since(h.options.CreatedAt()).String(), }) - }).Methods(http.MethodGet) + }, http.MethodGet) } // Endpoints useful for profiling the service. func (h *Handler) registerProfileEndpoints() { - h.router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) + h.router. + PathPrefix("/debug/pprof/"). + Handler(http.DefaultServeMux). + Name("/debug/pprof/") } // Endpoints useful for viewing routes directory. -func (h *Handler) registerRoutesEndpoint() { - h.router.HandleFunc(routesURL, func(w http.ResponseWriter, r *http.Request) { +func (h *Handler) registerRoutesEndpoint() error { + return h.addRouteHandlerFn(h.router, routesURL, func(w http.ResponseWriter, r *http.Request) { var routes []string err := h.router.Walk( func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { @@ -401,5 +505,5 @@ func (h *Handler) registerRoutesEndpoint() { }{ Routes: routes, }) - }).Methods(http.MethodGet) + }, http.MethodGet) } diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index f0e64b3c65..69ad5ba9ae 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -377,26 +377,32 @@ func init() { testWorkerPool.Init() } +type assertFn func(t *testing.T, prev http.Handler, r *http.Request) + type customHandler struct { t *testing.T + routeName string + methods []string + assertFn assertFn } -func (h *customHandler) Route() string { return "/custom" } -func (h *customHandler) Methods() []string { return []string{http.MethodGet} } +func (h *customHandler) Route() string { return h.routeName } +func (h *customHandler) Methods() []string { return h.methods } func (h *customHandler) Handler( opts options.HandlerOptions, + prev http.Handler, ) (http.Handler, error) { assert.Equal(h.t, "z", string(opts.TagOptions().MetricName())) fn := func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("success!")) + h.assertFn(h.t, prev, r) + _, err := w.Write([]byte("success!")) + require.NoError(h.t, err) } return http.HandlerFunc(fn), nil } func TestCustomRoutes(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "/custom", nil) - res := httptest.NewRecorder() ctrl := gomock.NewController(t) store, _ := m3.NewStorageAndSession(t, ctrl) instrumentOpts := instrument.NewOptions() @@ -411,11 +417,78 @@ func TestCustomRoutes(t *testing.T) { graphite.M3WrappedStorageOptions{}, testM3DBOpts) require.NoError(t, err) - custom := &customHandler{t: t} - handler := NewHandler(opts, custom) + custom := &customHandler{ + t: t, + routeName: "/custom", + methods: []string{http.MethodGet, http.MethodHead}, + assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { + assert.Nil(t, prev, "Should not shadow already existing handler") + }, + } + customShadowGet := &customHandler{ + t: t, + routeName: "/custom", + methods: []string{http.MethodGet}, + assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { + assert.NotNil(t, prev, "Should shadow already existing handler") + }, + } + customShadowHead := &customHandler{ + t: t, + routeName: "/custom", + methods: []string{http.MethodHead}, + assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { + assert.NotNil(t, prev, "Should shadow already existing handler") + }, + } + customNew := &customHandler{ + t: t, + routeName: "/custom/new", + methods: []string{http.MethodGet, http.MethodHead}, + assertFn: func(t *testing.T, prev http.Handler, r *http.Request) { + assert.Nil(t, prev, "Should not shadow already existing handler") + }, + } + handler := NewHandler(opts, custom, customShadowGet, customShadowHead, customNew) require.NoError(t, err, "unable to setup handler") err = handler.RegisterRoutes() require.NoError(t, err, "unable to register routes") + + for _, method := range custom.methods { + assertRoute(t, custom.routeName, method, handler, http.StatusOK) + } + + for _, method := range customNew.methods { + assertRoute(t, customNew.routeName, method, handler, http.StatusOK) + } + + assertRoute(t, customNew.routeName, http.MethodPost, handler, http.StatusMethodNotAllowed) + assertRoute(t, "/unknown", http.MethodGet, handler, http.StatusNotFound) +} + +func assertRoute(t *testing.T, routeName string, method string, handler *Handler, expectedStatusCode int) { + req := httptest.NewRequest(method, routeName, nil) + res := httptest.NewRecorder() handler.Router().ServeHTTP(res, req) - require.Equal(t, res.Code, http.StatusOK) + require.Equal(t, expectedStatusCode, res.Code) +} + +func TestRouteName(t *testing.T) { + assert.Equal( + t, + "/api/v1/test GET", + routeName("/api/v1/test", "GET"), + ) + + assert.Equal( + t, + "/api/v1/test", + routeName("/api/v1/test", ""), + ) + + assert.Equal( + t, + "/api/v1/test POST", + routeName("/api/v1/test", "POST"), + ) } diff --git a/src/query/api/v1/options/handler.go b/src/query/api/v1/options/handler.go index 15eab3f99f..a178b1bd33 100644 --- a/src/query/api/v1/options/handler.go +++ b/src/query/api/v1/options/handler.go @@ -71,7 +71,9 @@ type CustomHandler interface { // Methods is the list of http methods this handler services. Methods() []string // Handler is the custom handler itself. - Handler(handlerOptions HandlerOptions) (http.Handler, error) + // prev is optional argument for getting already registered handler for the same route. + // If there is nothing to override, prev will be nil. + Handler(handlerOptions HandlerOptions, prev http.Handler) (http.Handler, error) } // QueryRouter is responsible for routing queries between promql and m3query.