Skip to content

Commit

Permalink
Prom read instant handler refactoring (#2928)
Browse files Browse the repository at this point in the history
* [query] Allow customizing request parsing for /query handler

* fix lint errors

* fix another lint error

* PR comments

* add an interface for hooks

* Removed read instant handler.

* Added some comments.

* Uses functional options pattern.

* Added comments for exported functions.

* Simplified options.

* Got rid of 2 fields in readHandler.

Co-authored-by: Vilius Pranckaitis <[email protected]>
Co-authored-by: Rob Skillington <[email protected]>
  • Loading branch information
3 people authored Nov 24, 2020
1 parent 1cd71f1 commit 5f8ca2c
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 201 deletions.
9 changes: 6 additions & 3 deletions src/query/api/v1/handler/prom/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ const (

type errorType string

type queryData struct {
// QueryData struct to be used when responding from HTTP handler.
type QueryData struct {
ResultType promql.ValueType `json:"resultType"`
Result promql.Value `json:"result"`
}
Expand All @@ -66,7 +67,8 @@ type response struct {
Warnings []string `json:"warnings,omitempty"`
}

func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
// Responds with HTTP OK status code and writes response JSON to response body.
func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
Expand All @@ -88,7 +90,8 @@ func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warni
w.Write(b)
}

func respondError(w http.ResponseWriter, err error) {
// Responds with error status code and writes error JSON to response body.
func RespondError(w http.ResponseWriter, err error) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
b, marshalErr := json.Marshal(&response{
Status: statusError,
Expand Down
67 changes: 53 additions & 14 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
"net/http"
"time"

promstorage "github.com/prometheus/prometheus/storage"

"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/storage/prometheus"

"github.com/prometheus/prometheus/promql"
Expand All @@ -38,29 +41,65 @@ func init() {
promql.SetDefaultEvaluationInterval(time.Minute)
}

// Options defines options for PromQL handler.
type Options struct {
PromQLEngine *promql.Engine
// opts defines options for PromQL handler.
type opts struct {
promQLEngine *promql.Engine
instant bool
queryable promstorage.Queryable
newQueryFn NewQueryFn
}

// NewReadHandler creates a handler to handle PromQL requests.
func NewReadHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadHandler(opts, hOpts, queryable)
// Option is a Prometheus handler option.
type Option func(*opts) error

// WithEngine sets the PromQL engine.
func WithEngine(promQLEngine *promql.Engine) Option {
return withEngine(promQLEngine, false)
}

// WithInstantEngine sets the PromQL instant engine.
func WithInstantEngine(promQLEngine *promql.Engine) Option {
return withEngine(promQLEngine, true)
}

func withEngine(promQLEngine *promql.Engine, instant bool) Option {
return func(o *opts) error {
if promQLEngine == nil {
return errors.New("invalid engine")
}
o.instant = instant
o.promQLEngine = promQLEngine
o.newQueryFn = newRangeQueryFn(promQLEngine, o.queryable)
if instant {
o.newQueryFn = newInstantQueryFn(promQLEngine, o.queryable)
}
return nil
}
}

// NewReadInstantHandler creates a handler to handle PromQL requests.
func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
func newDefaultOptions(hOpts options.HandlerOptions) opts {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadInstantHandler(opts, hOpts, queryable)
return opts{
promQLEngine: hOpts.PrometheusEngine(),
queryable: queryable,
instant: false,
newQueryFn: newRangeQueryFn(hOpts.PrometheusEngine(), queryable),
}
}

// NewReadHandler creates a handler to handle PromQL requests.
func NewReadHandler(hOpts options.HandlerOptions, options ...Option) (http.Handler, error) {
opts := newDefaultOptions(hOpts)
for _, optionFn := range options {
if err := optionFn(&opts); err != nil {
return nil, err
}
}
return newReadHandler(hOpts, opts)
}

// ApplyRangeWarnings applies warnings encountered during execution.
Expand Down
97 changes: 63 additions & 34 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/native"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/prometheus"
xerrors "github.com/m3db/m3/src/x/errors"

Expand All @@ -40,94 +41,122 @@ import (
"go.uber.org/zap"
)

// NewQueryFn creates a new promql Query.
type NewQueryFn func(params models.RequestParams) (promql.Query, error)

var (
newRangeQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
) NewQueryFn {
return func(params models.RequestParams) (promql.Query, error) {
return engine.NewRangeQuery(
queryable,
params.Query,
params.Start,
params.End,
params.Step)
}
}

newInstantQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
) NewQueryFn {
return func(params models.RequestParams) (promql.Query, error) {
return engine.NewInstantQuery(
queryable,
params.Query,
params.Now)
}
}
)

type readHandler struct {
engine *promql.Engine
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
opts opts
}

func newReadHandler(
opts Options,
hOpts options.HandlerOptions,
queryable promstorage.Queryable,
) http.Handler {
options opts,
) (http.Handler, error) {
scope := hOpts.InstrumentOpts().MetricsScope().Tagged(
map[string]string{"handler": "prometheus-read"},
)
return &readHandler{
engine: opts.PromQLEngine,
queryable: queryable,
hOpts: hOpts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
}
hOpts: hOpts,
opts: options,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
}, nil
}

func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

request, err := native.ParseRequest(ctx, r, false, h.hOpts)
request, err := native.ParseRequest(ctx, r, h.opts.instant, h.hOpts)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

params := request.Params

// NB (@shreyas): We put the FetchOptions in context so it can be
// retrieved in the queryable object as there is no other way to pass
// that through.
var resultMetadata block.ResultMetadata
ctx = context.WithValue(ctx, prometheus.FetchOptionsContextKey, fetchOptions)
ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata)

if request.Params.Timeout > 0 {
if params.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, request.Params.Timeout)
ctx, cancel = context.WithTimeout(ctx, params.Timeout)
defer cancel()
}

qry, err := h.engine.NewRangeQuery(
h.queryable,
request.Params.Query,
request.Params.Start,
request.Params.End,
request.Params.Step)
qry, err := h.opts.newQueryFn(params)
if err != nil {
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", request.Params.Query))
respondError(w, xerrors.NewInvalidParamsError(err))
h.logger.Error("error creating query",
zap.Error(err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, xerrors.NewInvalidParamsError(err))
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", request.Params.Query))
respondError(w, res.Err)
h.logger.Error("error executing query",
zap.Error(res.Err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, res.Err)
return
}

for _, warn := range resultMetadata.Warnings {
res.Warnings = append(res.Warnings, errors.New(warn.Message))
}

query := request.Params.Query
query := params.Query
err = ApplyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
zap.Error(err), zap.String("query", query),
zap.Bool("instant", h.opts.instant))
}

handleroptions.AddResponseHeaders(w, resultMetadata, fetchOptions)
respond(w, &queryData{
Respond(w, &QueryData{
Result: res.Value,
ResultType: res.Value.Type(),
}, res.Warnings)
Expand Down
Loading

0 comments on commit 5f8ca2c

Please sign in to comment.