diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 4e8a8aa402..4c0bcce7a4 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -33,7 +33,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util" "github.com/m3db/m3/src/query/util/json" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/golang/snappy" "github.com/gorilla/mux" @@ -47,15 +47,18 @@ const ( filterNameTagsParam = "tag" errFormatStr = "error parsing param: %s, error: %v" - // TODO: get timeouts from configs - maxTimeout = time.Minute - defaultTimeout = time.Second * 15 + maxTimeout = 5 * time.Minute ) var ( matchValues = []byte(".*") ) +// TimeoutOpts stores options related to various timeout configurations +type TimeoutOpts struct { + FetchTimeout time.Duration +} + // ParsePromCompressedRequest parses a snappy compressed request from Prometheus func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { body := r.Body @@ -83,10 +86,10 @@ func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { } // ParseRequestTimeout parses the input request timeout with a default -func ParseRequestTimeout(r *http.Request) (time.Duration, error) { +func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (time.Duration, error) { timeout := r.Header.Get("timeout") if timeout == "" { - return defaultTimeout, nil + return configFetchTimeout, nil } duration, err := time.ParseDuration(timeout) diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index 69ba5d8b88..cd02b4d9ac 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -64,17 +64,17 @@ func TestTimeoutParse(t *testing.T) { req, _ := http.NewRequest("POST", "dummy", nil) req.Header.Add("timeout", "1ms") - timeout, err := ParseRequestTimeout(req) + timeout, err := ParseRequestTimeout(req, time.Second) assert.NoError(t, err) assert.Equal(t, timeout, time.Millisecond) req.Header.Del("timeout") - timeout, err = ParseRequestTimeout(req) + timeout, err = ParseRequestTimeout(req, 2*time.Minute) assert.NoError(t, err) - assert.Equal(t, timeout, defaultTimeout) + assert.Equal(t, timeout, 2*time.Minute) req.Header.Add("timeout", "invalid") - _, err = ParseRequestTimeout(req) + _, err = ParseRequestTimeout(req, 15*time.Second) assert.Error(t, err) } diff --git a/src/query/api/v1/handler/prometheus/native/common.go b/src/query/api/v1/handler/prometheus/native/common.go index 7a051a0f1c..d25b13208b 100644 --- a/src/query/api/v1/handler/prometheus/native/common.go +++ b/src/query/api/v1/handler/prometheus/native/common.go @@ -36,7 +36,7 @@ import ( "github.com/m3db/m3/src/query/util" "github.com/m3db/m3/src/query/util/json" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -83,12 +83,12 @@ func parseDuration(r *http.Request, key string) (time.Duration, error) { } // parseParams parses all params from the GET request -func parseParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) { +func parseParams(r *http.Request, timeoutOpts *prometheus.TimeoutOpts) (models.RequestParams, *xhttp.ParseError) { params := models.RequestParams{ Now: time.Now(), } - t, err := prometheus.ParseRequestTimeout(r) + t, err := prometheus.ParseRequestTimeout(r, timeoutOpts.FetchTimeout) if err != nil { return params, xhttp.NewParseError(err, http.StatusBadRequest) } @@ -179,14 +179,14 @@ func parseBlockType(r *http.Request) models.FetchedBlockType { } // parseInstantaneousParams parses all params from the GET request -func parseInstantaneousParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) { +func parseInstantaneousParams(r *http.Request, timeoutOpts *prometheus.TimeoutOpts) (models.RequestParams, *xhttp.ParseError) { params := models.RequestParams{ Now: time.Now(), Step: time.Second, IncludeEnd: true, } - t, err := prometheus.ParseRequestTimeout(r) + t, err := prometheus.ParseRequestTimeout(r, timeoutOpts.FetchTimeout) if err != nil { return params, xhttp.NewParseError(err, http.StatusBadRequest) } diff --git a/src/query/api/v1/handler/prometheus/native/common_test.go b/src/query/api/v1/handler/prometheus/native/common_test.go index 64b43e6fb1..391a7448f7 100644 --- a/src/query/api/v1/handler/prometheus/native/common_test.go +++ b/src/query/api/v1/handler/prometheus/native/common_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/ts" @@ -42,6 +43,12 @@ const ( promQuery = `http_requests_total{job="prometheus",group="canary"}` ) +var ( + timeoutOpts = &prometheus.TimeoutOpts{ + FetchTimeout: 15 * time.Second, + } +) + func defaultParams() url.Values { vals := url.Values{} now := time.Now() @@ -56,7 +63,7 @@ func TestParamParsing(t *testing.T) { req, _ := http.NewRequest("GET", PromReadURL, nil) req.URL.RawQuery = defaultParams().Encode() - r, err := parseParams(req) + r, err := parseParams(req, timeoutOpts) require.Nil(t, err, "unable to parse request") require.Equal(t, promQuery, r.Query) } @@ -69,7 +76,7 @@ func TestInstantaneousParamParsing(t *testing.T) { params.Add(timeParam, now.Format(time.RFC3339)) req.URL.RawQuery = params.Encode() - r, err := parseInstantaneousParams(req) + r, err := parseInstantaneousParams(req, timeoutOpts) require.Nil(t, err, "unable to parse request") require.Equal(t, promQuery, r.Query) } @@ -79,7 +86,7 @@ func TestInvalidStart(t *testing.T) { vals := defaultParams() vals.Del(startParam) req.URL.RawQuery = vals.Encode() - _, err := parseParams(req) + _, err := parseParams(req, timeoutOpts) require.NotNil(t, err, "unable to parse request") require.Equal(t, err.Code(), http.StatusBadRequest) } @@ -90,7 +97,7 @@ func TestInvalidTarget(t *testing.T) { vals.Del(queryParam) req.URL.RawQuery = vals.Encode() - p, err := parseParams(req) + p, err := parseParams(req, timeoutOpts) require.NotNil(t, err, "unable to parse request") assert.NotNil(t, p.Start) require.Equal(t, err.Code(), http.StatusBadRequest) diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index 41d645cf5c..bd157ffe94 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" @@ -61,6 +62,7 @@ type PromReadHandler struct { tagOpts models.TagOptions limitsCfg *config.LimitsConfiguration promReadMetrics promReadMetrics + timeoutOps *prometheus.TimeoutOpts } type promReadMetrics struct { @@ -103,12 +105,14 @@ func NewPromReadHandler( tagOpts models.TagOptions, limitsCfg *config.LimitsConfiguration, scope tally.Scope, + timeoutOpts *prometheus.TimeoutOpts, ) *PromReadHandler { h := &PromReadHandler{ engine: engine, tagOpts: tagOpts, limitsCfg: limitsCfg, promReadMetrics: newPromReadMetrics(scope), + timeoutOps: timeoutOpts, } h.promReadMetrics.maxDatapoints.Update(float64(limitsCfg.MaxComputedDatapoints)) @@ -143,7 +147,7 @@ func (h *PromReadHandler) ServeHTTPWithEngine(w http.ResponseWriter, r *http.Req ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) logger := logging.WithContext(ctx) - params, rErr := parseParams(r) + params, rErr := parseParams(r, h.timeoutOps) if rErr != nil { h.promReadMetrics.fetchErrorsClient.Inc(1) return nil, emptyReqParams, &RespError{Err: rErr.Inner(), Code: rErr.Code()} diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go index 0975707e8e..31aa489408 100644 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go +++ b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go @@ -25,10 +25,11 @@ import ( "net/http" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -45,25 +46,28 @@ const ( // PromReadInstantHandler represents a handler for prometheus instantaneous read endpoint. type PromReadInstantHandler struct { - engine *executor.Engine - tagOpts models.TagOptions + engine *executor.Engine + tagOpts models.TagOptions + timeoutOpts *prometheus.TimeoutOpts } // NewPromReadInstantHandler returns a new instance of handler. func NewPromReadInstantHandler( engine *executor.Engine, tagOpts models.TagOptions, + timeoutOpts *prometheus.TimeoutOpts, ) *PromReadInstantHandler { return &PromReadInstantHandler{ - engine: engine, - tagOpts: tagOpts, + engine: engine, + tagOpts: tagOpts, + timeoutOpts: timeoutOpts, } } func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) logger := logging.WithContext(ctx) - params, rErr := parseInstantaneousParams(r) + params, rErr := parseInstantaneousParams(r, h.timeoutOpts) if rErr != nil { xhttp.Error(w, rErr.Inner(), rErr.Code()) return diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index 317fdf2452..765b82c27f 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -57,7 +57,7 @@ func TestPromReadHandler_Read(t *testing.T) { req, _ := http.NewRequest("GET", PromReadURL, nil) req.URL.RawQuery = defaultParams().Encode() - r, parseErr := parseParams(req) + r, parseErr := parseParams(req, timeoutOpts) require.Nil(t, parseErr) assert.Equal(t, models.FormatPromQL, r.FormatType) seriesList, err := read(context.TODO(), promRead.engine, promRead.tagOpts, httptest.NewRecorder(), r) @@ -130,6 +130,7 @@ func newTestSetup() *testSetup { models.NewTagOptions(), &config.LimitsConfiguration{}, tally.NewTestScope("", nil), + timeoutOpts, ), } } diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 0746692017..6f073f0179 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -32,7 +32,7 @@ import ( "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/golang/protobuf/proto" "github.com/golang/snappy" @@ -52,13 +52,15 @@ const ( type PromReadHandler struct { engine *executor.Engine promReadMetrics promReadMetrics + timeoutOpts *prometheus.TimeoutOpts } // NewPromReadHandler returns a new instance of handler. -func NewPromReadHandler(engine *executor.Engine, scope tally.Scope) http.Handler { +func NewPromReadHandler(engine *executor.Engine, scope tally.Scope, timeoutOpts *prometheus.TimeoutOpts) http.Handler { return &PromReadHandler{ engine: engine, promReadMetrics: newPromReadMetrics(scope), + timeoutOpts: timeoutOpts, } } @@ -87,7 +89,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - timeout, err := prometheus.ParseRequestTimeout(r) + timeout, err := prometheus.ParseRequestTimeout(r, h.timeoutOpts.FetchTimeout) if err != nil { h.promReadMetrics.fetchErrorsClient.Inc(1) xhttp.Error(w, err, http.StatusBadRequest) diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index 7a39075d03..9a6d5feea8 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -30,6 +30,7 @@ import ( "time" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" @@ -47,6 +48,10 @@ import ( var ( promReadTestMetrics = newPromReadMetrics(tally.NewTestScope("", nil)) defaultLookbackDuration = time.Minute + + timeoutOpts = &prometheus.TimeoutOpts{ + FetchTimeout: 15 * time.Second, + } ) func setupServer(t *testing.T) *httptest.Server { @@ -58,13 +63,16 @@ func setupServer(t *testing.T) *httptest.Server { FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, false, fmt.Errorf("not initialized")) storage := test.NewSlowStorage(lstore, 10*time.Millisecond) - promRead := readHandler(storage) + promRead := readHandler(storage, timeoutOpts) server := httptest.NewServer(test.NewSlowHandler(promRead, 10*time.Millisecond)) return server } -func readHandler(store storage.Storage) *PromReadHandler { - return &PromReadHandler{engine: executor.NewEngine(store, tally.NewTestScope("test", nil), defaultLookbackDuration), promReadMetrics: promReadTestMetrics} +func readHandler(store storage.Storage, timeoutOpts *prometheus.TimeoutOpts) *PromReadHandler { + return &PromReadHandler{engine: executor.NewEngine(store, tally.NewTestScope("test", nil), defaultLookbackDuration), + promReadMetrics: promReadTestMetrics, + timeoutOpts: timeoutOpts, + } } func TestPromReadParsing(t *testing.T) { @@ -79,11 +87,29 @@ func TestPromReadParsing(t *testing.T) { require.Equal(t, len(r.Queries), 1) } +func TestPromFetchTimeoutParsing(t *testing.T) { + logging.InitWithCores(nil) + ctrl := gomock.NewController(t) + storage, _ := m3.NewStorageAndSession(t, ctrl) + promRead := &PromReadHandler{ + engine: executor.NewEngine(storage, tally.NewTestScope("test", nil), defaultLookbackDuration), + promReadMetrics: promReadTestMetrics, + timeoutOpts: &prometheus.TimeoutOpts{ + FetchTimeout: 2 * time.Minute, + }, + } + + req, _ := http.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) + dur, err := prometheus.ParseRequestTimeout(req, promRead.timeoutOpts.FetchTimeout) + require.NoError(t, err) + assert.Equal(t, 2*time.Minute, dur) +} + func TestPromReadParsingBad(t *testing.T) { logging.InitWithCores(nil) ctrl := gomock.NewController(t) storage, _ := m3.NewStorageAndSession(t, ctrl) - promRead := readHandler(storage) + promRead := readHandler(storage, timeoutOpts) req, _ := http.NewRequest("POST", PromReadURL, strings.NewReader("bad body")) _, err := promRead.parseRequest(req) require.NotNil(t, err, "unable to parse request") @@ -97,7 +123,7 @@ func TestPromReadStorageWithFetchError(t *testing.T) { Return(nil, true, fmt.Errorf("unable to get data")) session.EXPECT().IteratorPools(). Return(nil, nil) - promRead := readHandler(storage) + promRead := readHandler(storage, timeoutOpts) req := test.GeneratePromReadRequest() _, err := promRead.read(context.TODO(), httptest.NewRecorder(), req, time.Hour) require.NotNil(t, err, "unable to read from storage") @@ -154,7 +180,7 @@ func TestReadErrorMetricsCount(t *testing.T) { defer closer.Close() readMetrics := newPromReadMetrics(scope) - promRead := &PromReadHandler{engine: executor.NewEngine(storage, scope, defaultLookbackDuration), promReadMetrics: readMetrics} + promRead := &PromReadHandler{engine: executor.NewEngine(storage, scope, defaultLookbackDuration), promReadMetrics: readMetrics, timeoutOpts: timeoutOpts} req, _ := http.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) promRead.ServeHTTP(httptest.NewRecorder(), req) diff --git a/src/query/api/v1/handler/prometheus/validator/handler_test.go b/src/query/api/v1/handler/prometheus/validator/handler_test.go index f48640ece2..850b52cdd1 100644 --- a/src/query/api/v1/handler/prometheus/validator/handler_test.go +++ b/src/query/api/v1/handler/prometheus/validator/handler_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" @@ -43,6 +44,10 @@ import ( var ( defaultLookbackDuration = time.Minute + + timeoutOpts = &prometheus.TimeoutOpts{ + FetchTimeout: 15 * time.Second, + } ) func newBodyWithMismatch() io.Reader { @@ -300,6 +305,7 @@ func newServer() (*httptest.Server, *PromDebugHandler) { models.NewTagOptions(), &config.LimitsConfiguration{}, tally.NewTestScope("test", nil), + timeoutOpts, ), tally.NewTestScope("test", nil), defaultLookbackDuration, ) diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index c88a84c261..e5637ae6c1 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -22,6 +22,7 @@ package httpd import ( "encoding/json" + "errors" "net/http" _ "net/http/pprof" // needed for pprof handler registration "time" @@ -37,6 +38,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/openapi" "github.com/m3db/m3/src/query/api/v1/handler/placement" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/validator" @@ -61,6 +63,8 @@ const ( var ( remoteSource = map[string]string{"source": "remote"} nativeSource = map[string]string{"source": "native"} + + defaultTimeout = 30 * time.Second ) // Handler represents an HTTP handler. @@ -77,6 +81,7 @@ type Handler struct { scope tally.Scope createdAt time.Time tagOptions models.TagOptions + timeoutOpts *prometheus.TimeoutOpts } // Router returns the http handler registered with all relevant routes for query. @@ -105,6 +110,17 @@ func NewHandler( }, } + var timeoutOpts = &prometheus.TimeoutOpts{} + if embeddedDbCfg == nil { + timeoutOpts.FetchTimeout = defaultTimeout + } else { + if embeddedDbCfg.Client.FetchTimeout < 0 { + return nil, errors.New("m3db client fetch timeout should be > 0") + } + + timeoutOpts.FetchTimeout = embeddedDbCfg.Client.FetchTimeout + } + h := &Handler{ router: r, handler: withMiddleware, @@ -118,6 +134,7 @@ func NewHandler( scope: scope, createdAt: time.Now(), tagOptions: tagOptions, + timeoutOpts: timeoutOpts, } return h, nil } @@ -132,7 +149,7 @@ func (h *Handler) RegisterRoutes() error { h.router.PathPrefix(openapi.StaticURLPrefix).Handler(logged(openapi.StaticHandler())) // Prometheus remote read/write endpoints - promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource)) + promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource), h.timeoutOpts) promRemoteWriteHandler, err := remote.NewPromWriteHandler( h.downsamplerAndWriter, h.tagOptions, @@ -147,6 +164,7 @@ func (h *Handler) RegisterRoutes() error { h.tagOptions, &h.config.Limits, h.scope.Tagged(nativeSource), + h.timeoutOpts, ) h.router.HandleFunc(remote.PromReadURL, @@ -159,7 +177,7 @@ func (h *Handler) RegisterRoutes() error { logged(nativePromReadHandler).ServeHTTP, ).Methods(native.PromReadHTTPMethod) h.router.HandleFunc(native.PromReadInstantURL, - logged(native.NewPromReadInstantHandler(h.engine, h.tagOptions)).ServeHTTP, + logged(native.NewPromReadInstantHandler(h.engine, h.tagOptions, h.timeoutOpts)).ServeHTTP, ).Methods(native.PromReadInstantHTTPMethod) // Native M3 search and write endpoints diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index d2696d08a2..24865ded47 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -29,7 +29,9 @@ import ( "time" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/dbnode/client" m3json "github.com/m3db/m3/src/query/api/v1/handler/json" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" @@ -62,6 +64,33 @@ func setupHandler(store storage.Storage) (*Handler, error) { config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, tally.NewTestScope("", nil)) } +func TestHandlerFetchTimeoutError(t *testing.T) { + logging.InitWithCores(nil) + + ctrl := gomock.NewController(t) + storage, _ := m3.NewStorageAndSession(t, ctrl) + downsamplerAndWriter := ingest.NewDownsamplerAndWriter(storage, nil, testWorkerPool) + + dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: -1 * time.Second}} + _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute), nil, nil, + config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) + require.Error(t, err) +} + +func TestHandlerFetchTimeout(t *testing.T) { + logging.InitWithCores(nil) + + ctrl := gomock.NewController(t) + storage, _ := m3.NewStorageAndSession(t, ctrl) + downsamplerAndWriter := ingest.NewDownsamplerAndWriter(storage, nil, testWorkerPool) + + dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: 4 * time.Minute}} + h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute), nil, nil, + config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) + require.NoError(t, err) + assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout) +} + func TestPromRemoteReadGet(t *testing.T) { logging.InitWithCores(nil) @@ -72,6 +101,7 @@ func TestPromRemoteReadGet(t *testing.T) { h, err := setupHandler(storage) require.NoError(t, err, "unable to setup handler") + assert.Equal(t, 30*time.Second, h.timeoutOpts.FetchTimeout) err = h.RegisterRoutes() require.NoError(t, err, "unable to register routes") h.Router().ServeHTTP(res, req)