From f4d13bce0e14a14cab0905e3074de66a7713583d Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Fri, 26 Oct 2018 12:24:22 -0400 Subject: [PATCH 1/4] Cost accounting hooked up with removal of cost after query finishes --- src/cmd/services/m3query/config/config.go | 83 ++++-- .../services/m3query/config/config_test.go | 110 ++++++- .../m3query/config/testdata/config.yml | 59 ++++ .../{sample_config.yml => config_test.yml} | 6 +- .../api/v1/handler/prometheus/native/read.go | 10 +- .../v1/handler/prometheus/native/read_test.go | 10 +- .../v1/handler/prometheus/remote/read_test.go | 8 +- .../handler/prometheus/validator/handler.go | 2 +- .../prometheus/validator/handler_test.go | 3 +- src/query/api/v1/httpd/handler.go | 2 +- src/query/api/v1/httpd/handler_test.go | 7 +- src/query/block/accounted.go | 44 +++ src/query/block/accounted_test.go | 41 +++ src/query/block/column.go | 33 +- src/query/config/config_test.go | 1 - src/query/cost/cost.go | 208 +++++++++++++ src/query/cost/cost_mock.go | 236 +++++++++++++++ src/query/cost/cost_prop_test.go | 77 +++++ src/query/cost/cost_test.go | 242 +++++++++++++++ src/query/executor/engine.go | 17 +- src/query/executor/engine_test.go | 39 ++- src/query/executor/transform/controller.go | 2 +- src/query/functions/fetch.go | 2 + src/query/functions/tag/join_test.go | 2 +- src/query/functions/tag/replace_test.go | 2 +- src/query/functions/temporal/base_test.go | 2 +- src/query/generated/mocks/generate.go | 1 + src/query/models/query_context.go | 19 +- src/query/server/cost_reporters.go | 209 +++++++++++++ src/query/server/cost_reporters_test.go | 282 ++++++++++++++++++ src/query/server/server.go | 7 +- src/query/server/server_test.go | 54 ++++ src/query/storage/block.go | 6 +- src/query/storage/converter.go | 4 + src/query/storage/converter_test.go | 23 +- src/query/storage/m3/accounted_series_iter.go | 89 ++++++ .../storage/m3/accounted_series_iter_test.go | 150 ++++++++++ src/query/storage/m3/storage.go | 24 +- src/query/storage/types.go | 10 + src/query/storage/validator/storage.go | 2 +- src/query/test/block.go | 2 +- src/query/test/seriesiter/mock_iter.go | 53 ++-- src/query/ts/m3db/convert_test.go | 6 +- src/query/tsdb/remote/client.go | 2 +- src/x/cost/enforcer.go | 80 ++--- src/x/cost/enforcer_test.go | 26 +- src/x/cost/options.go | 38 +-- src/x/cost/test/assert.go | 45 +++ src/x/cost/types.go | 14 + 49 files changed, 2245 insertions(+), 149 deletions(-) create mode 100644 src/cmd/services/m3query/config/testdata/config.yml rename src/cmd/services/m3query/config/testdata/{sample_config.yml => config_test.yml} (91%) create mode 100644 src/query/block/accounted.go create mode 100644 src/query/block/accounted_test.go create mode 100644 src/query/cost/cost.go create mode 100644 src/query/cost/cost_mock.go create mode 100644 src/query/cost/cost_prop_test.go create mode 100644 src/query/cost/cost_test.go create mode 100644 src/query/server/cost_reporters.go create mode 100644 src/query/server/cost_reporters_test.go create mode 100644 src/query/storage/m3/accounted_series_iter.go create mode 100644 src/query/storage/m3/accounted_series_iter_test.go create mode 100644 src/x/cost/test/assert.go diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index e02c950a1a..5d58c3951c 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/x/cost" xdocs "github.com/m3db/m3/src/x/docs" xconfig "github.com/m3db/m3x/config" "github.com/m3db/m3x/config/listenaddress" @@ -62,12 +63,6 @@ var ( defaultLookbackDuration = 5 * time.Minute defaultCarbonIngesterAggregationType = aggregation.Mean - - // defaultLimitsConfiguration is applied if `limits` isn't specified. - defaultLimitsConfiguration = &LimitsConfiguration{ - // this is sufficient for 1 day span / 1s step, or 60 days with a 1m step. - MaxComputedDatapoints: 86400, - } ) // Configuration is the configuration for the query service. @@ -121,7 +116,7 @@ type Configuration struct { Carbon *CarbonConfiguration `yaml:"carbon"` // Limits specifies limits on per-query resource usage. - Limits *LimitsConfiguration `yaml:"limits"` + Limits LimitsConfiguration `yaml:"limits"` // LookbackDuration determines the lookback duration for queries LookbackDuration *time.Duration `yaml:"lookbackDuration"` @@ -191,9 +186,69 @@ func (q *QueryConversionCacheConfiguration) Validate() error { return nil } -// LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit. +// LimitsConfiguration represents limitations on resource usage in the query instance. Limits are split between per-query +// and global limits. type LimitsConfiguration struct { - MaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` + // deprecated: use PerQuery.MaxComputedDatapoints instead. + DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` + + // Global configures limits which apply across all queries running on this + // instance. + Global GlobalLimitsConfiguration `yaml:"global"` + + // PerQuery configures limits which apply to each query individually. + PerQuery PerQueryLimitsConfiguration `yaml:"perQuery"` +} + +// MaxComputedDatapoints is a getter providing backwards compatibility between +// LimitsConfiguration.DeprecatedMaxComputedDatapoints and +// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints. See +// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints for a comment on +// the semantics. +func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 { + if lc.PerQuery.PrivateMaxComputedDatapoints != 0 { + return lc.PerQuery.PrivateMaxComputedDatapoints + } + + return lc.DeprecatedMaxComputedDatapoints +} + +// GlobalLimitsConfiguration represents limits on resource usage across a query instance. Zero or negative values imply no limit. +type GlobalLimitsConfiguration struct { + // MaxFetchedDatapoints limits the total number of datapoints actually fetched by all queries at any given time. + MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"` +} + +// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints. +func (l *GlobalLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions { + return toLimitManagerOptions(l.MaxFetchedDatapoints) +} + +// PerQueryLimitsConfiguration represents limits on resource usage within a single query. Zero or negative values imply no limit. +type PerQueryLimitsConfiguration struct { + // PrivateMaxComputedDatapoints limits the number of datapoints that can be + // returned by a query. It's determined purely + // from the size of the time range and the step size (end - start / step). + // + // N.B.: the hacky "Private" prefix is to indicate that callers should use + // LimitsConfiguration.MaxComputedDatapoints() instead of accessing + // this field directly. + PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` + + // MaxFetchedDatapoints limits the number of datapoints actually used by a given query. + MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"` +} + +// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints. +func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions { + return toLimitManagerOptions(l.MaxFetchedDatapoints) +} + +func toLimitManagerOptions(limit int64) cost.LimitManagerOptions { + return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{ + Threshold: cost.Cost(limit), + Enabled: limit > 0, + }) } // IngestConfiguration is the configuration for ingestion server. @@ -232,16 +287,6 @@ func (c Configuration) LookbackDurationOrDefault() (time.Duration, error) { return v, nil } -// LimitsOrDefault returns the specified limit configuration if provided, or the -// default value otherwise. -func (c Configuration) LimitsOrDefault() *LimitsConfiguration { - if c.Limits != nil { - return c.Limits - } - - return defaultLimitsConfiguration -} - // ListenAddressOrDefault returns the specified carbon ingester listen address if provided, or the // default value if not. func (c *CarbonIngesterConfiguration) ListenAddressOrDefault() string { diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index 45fe549343..81a841d477 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/x/cost" xdocs "github.com/m3db/m3/src/x/docs" xconfig "github.com/m3db/m3x/config" @@ -34,6 +35,8 @@ import ( yaml "gopkg.in/yaml.v2" ) +const testConfigFile = "./testdata/config.yml" + func TestTagOptionsFromEmptyConfigErrors(t *testing.T) { cfg := TagOptionsConfiguration{} opts, err := TagOptionsFromConfig(cfg) @@ -69,20 +72,112 @@ func TestTagOptionsFromConfig(t *testing.T) { assert.Equal(t, []byte(name), opts.MetricName()) } +func TestLimitsConfiguration_AsLimitManagerOptions(t *testing.T) { + cases := []struct { + Input interface { + AsLimitManagerOptions() cost.LimitManagerOptions + } + ExpectedDefault int64 + }{{ + Input: &PerQueryLimitsConfiguration{ + MaxFetchedDatapoints: 5, + }, + ExpectedDefault: 5, + }, { + Input: &GlobalLimitsConfiguration{ + MaxFetchedDatapoints: 6, + }, + ExpectedDefault: 6, + }} + + for _, tc := range cases { + t.Run(fmt.Sprintf("type_%T", tc.Input), func(t *testing.T) { + res := tc.Input.AsLimitManagerOptions() + assert.Equal(t, cost.Limit{ + Threshold: cost.Cost(tc.ExpectedDefault), + Enabled: true, + }, res.DefaultLimit()) + }) + } +} + +func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) { + t.Run("uses PerQuery value if provided", func(t *testing.T) { + lc := &LimitsConfiguration{ + DeprecatedMaxComputedDatapoints: 6, + PerQuery: PerQueryLimitsConfiguration{ + PrivateMaxComputedDatapoints: 5, + }, + } + + assert.Equal(t, int64(5), lc.MaxComputedDatapoints()) + }) + + t.Run("uses deprecated value if PerQuery not provided", func(t *testing.T) { + lc := &LimitsConfiguration{ + DeprecatedMaxComputedDatapoints: 6, + } + + assert.Equal(t, int64(6), lc.MaxComputedDatapoints()) + }) +} + +func TestToLimitManagerOptions(t *testing.T) { + cases := []struct { + Name string + Input int64 + ExpectedLimit cost.Limit + }{{ + Name: "negative is disabled", + Input: -5, + ExpectedLimit: cost.Limit{ + Threshold: cost.Cost(-5), + Enabled: false, + }, + }, { + Name: "zero is disabled", + Input: 0, + ExpectedLimit: cost.Limit{ + Threshold: cost.Cost(0), + Enabled: false, + }, + }, { + Name: "positive is enabled", + Input: 5, + ExpectedLimit: cost.Limit{ + Threshold: cost.Cost(5), + Enabled: true, + }, + }} + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + assert.Equal(t, tc.ExpectedLimit, toLimitManagerOptions(tc.Input).DefaultLimit()) + }) + } +} + func TestConfigLoading(t *testing.T) { var cfg Configuration - require.NoError(t, xconfig.LoadFile(&cfg, "./testdata/sample_config.yml", xconfig.Options{})) + require.NoError(t, xconfig.LoadFile(&cfg, testConfigFile, xconfig.Options{})) assert.Equal(t, &LimitsConfiguration{ - MaxComputedDatapoints: 12000, - }, cfg.Limits) + DeprecatedMaxComputedDatapoints: 10555, + PerQuery: PerQueryLimitsConfiguration{ + PrivateMaxComputedDatapoints: 12000, + MaxFetchedDatapoints: 11000, + }, + Global: GlobalLimitsConfiguration{ + MaxFetchedDatapoints: 13000, + }, + }, &cfg.Limits) // TODO: assert on more fields here. } func TestConfigValidation(t *testing.T) { baseCfg := func(t *testing.T) *Configuration { var cfg Configuration - require.NoError(t, xconfig.LoadFile(&cfg, "./testdata/sample_config.yml", xconfig.Options{}), + require.NoError(t, xconfig.LoadFile(&cfg, testConfigFile, xconfig.Options{}), "sample configuration is no longer valid or loadable. Fix it up to provide a base config here") return &cfg @@ -106,9 +201,10 @@ func TestConfigValidation(t *testing.T) { for _, tc := range limitsCfgCases { t.Run(tc.Name, func(t *testing.T) { cfg := baseCfg(t) - cfg.Limits = &LimitsConfiguration{ - MaxComputedDatapoints: 5, - } + cfg.Limits = LimitsConfiguration{ + PerQuery: PerQueryLimitsConfiguration{ + PrivateMaxComputedDatapoints: tc.Limit, + }} assert.NoError(t, validator.Validate(cfg)) }) diff --git a/src/cmd/services/m3query/config/testdata/config.yml b/src/cmd/services/m3query/config/testdata/config.yml new file mode 100644 index 0000000000..f1d69a30e7 --- /dev/null +++ b/src/cmd/services/m3query/config/testdata/config.yml @@ -0,0 +1,59 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +clusters: + - namespaces: + - namespace: default + type: unaggregated + retention: 48h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - 127.0.0.1:2379 + seedNodes: + initialCluster: + - hostID: m3db_local + endpoint: http://127.0.0.1:2380 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + writeTimeout: 10s + fetchTimeout: 15s + connectTimeout: 20s + writeRetry: + initialBackoff: 500ms + backoffFactor: 3 + maxRetries: 2 + jitter: true + fetchRetry: + initialBackoff: 500ms + backoffFactor: 2 + maxRetries: 3 + jitter: true + backgroundHealthCheckFailLimit: 4 + backgroundHealthCheckFailThrottleFactor: 0.5 + +limits: + maxComputedDatapoints: 10555 + perQuery: + maxComputedDatapoints: 12000 + maxFetchedDatapoints: 11000 + global: + maxFetchedDatapoints: 13000 diff --git a/src/cmd/services/m3query/config/testdata/sample_config.yml b/src/cmd/services/m3query/config/testdata/config_test.yml similarity index 91% rename from src/cmd/services/m3query/config/testdata/sample_config.yml rename to src/cmd/services/m3query/config/testdata/config_test.yml index 645cc165b1..3df29096f3 100644 --- a/src/cmd/services/m3query/config/testdata/sample_config.yml +++ b/src/cmd/services/m3query/config/testdata/config_test.yml @@ -51,4 +51,8 @@ clusters: backgroundHealthCheckFailThrottleFactor: 0.5 limits: - maxComputedDatapoints: 12000 \ No newline at end of file + perQuery: + maxComputedDatapoints: 12000 + maxFetchedDatapoints: 11000 + global: + maxFetchedDatapoints: 13000 diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index b10ad7c89a..fb7fb3ff9c 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -118,7 +118,7 @@ func NewPromReadHandler( timeoutOps: timeoutOpts, } - h.promReadMetrics.maxDatapoints.Update(float64(limitsCfg.MaxComputedDatapoints)) + h.promReadMetrics.maxDatapoints.Update(float64(limitsCfg.MaxComputedDatapoints())) return h } @@ -178,6 +178,9 @@ func (h *PromReadHandler) ServeHTTPWithEngine( return nil, emptyReqParams, &RespError{Err: err, Code: http.StatusInternalServerError} } + // TODO: Support multiple result types + w.Header().Set("Content-Type", "application/json") + return result, params, nil } @@ -186,12 +189,13 @@ func (h *PromReadHandler) validateRequest(params *models.RequestParams) error { // querying from the beginning of time with a 1s step size. // Approach taken directly from prom. numSteps := int64(params.End.Sub(params.Start) / params.Step) - if h.limitsCfg.MaxComputedDatapoints > 0 && numSteps > h.limitsCfg.MaxComputedDatapoints { + maxComputedDatapoints := h.limitsCfg.MaxComputedDatapoints() + if maxComputedDatapoints > 0 && numSteps > maxComputedDatapoints { return fmt.Errorf( "querying from %v to %v with step size %v would result in too many datapoints "+ "(end - start / step > %d). Either decrease the query resolution (?step=XX), decrease the time window, "+ "or increase the limit (`limits.maxComputedDatapoints`)", - params.Start, params.End, params.Step, h.limitsCfg.MaxComputedDatapoints, + params.Start, params.End, params.Step, maxComputedDatapoints, ) } diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index 765b82c27f..d2c8946eba 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -126,7 +126,7 @@ func newTestSetup() *testSetup { return &testSetup{ Storage: mockStorage, Handler: NewPromReadHandler( - executor.NewEngine(mockStorage, tally.NewTestScope("test", nil), time.Minute), + executor.NewEngine(mockStorage, tally.NewTestScope("test", nil), time.Minute, nil), models.NewTagOptions(), &config.LimitsConfiguration{}, tally.NewTestScope("", nil), @@ -138,7 +138,9 @@ func newTestSetup() *testSetup { func TestPromReadHandler_ServeHTTP_maxComputedDatapoints(t *testing.T) { setup := newTestSetup() setup.Handler.limitsCfg = &config.LimitsConfiguration{ - MaxComputedDatapoints: 3599, + PerQuery: config.PerQueryLimitsConfiguration{ + PrivateMaxComputedDatapoints: 3599, + }, } params := defaultParams() @@ -247,7 +249,9 @@ func TestPromReadHandler_validateRequest(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { setup := newTestSetup() setup.Handler.limitsCfg = &config.LimitsConfiguration{ - MaxComputedDatapoints: tc.Max, + PerQuery: config.PerQueryLimitsConfiguration{ + PrivateMaxComputedDatapoints: tc.Max, + }, } err := setup.Handler.validateRequest(tc.Params) diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index 9a6d5feea8..8c13a6b7ae 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -69,7 +69,7 @@ func setupServer(t *testing.T) *httptest.Server { } func readHandler(store storage.Storage, timeoutOpts *prometheus.TimeoutOpts) *PromReadHandler { - return &PromReadHandler{engine: executor.NewEngine(store, tally.NewTestScope("test", nil), defaultLookbackDuration), + return &PromReadHandler{engine: executor.NewEngine(store, tally.NewTestScope("test", nil), defaultLookbackDuration, nil), promReadMetrics: promReadTestMetrics, timeoutOpts: timeoutOpts, } @@ -79,7 +79,7 @@ func TestPromReadParsing(t *testing.T) { logging.InitWithCores(nil) ctrl := gomock.NewController(t) storage, _ := m3.NewStorageAndSession(t, ctrl) - promRead := &PromReadHandler{engine: executor.NewEngine(storage, tally.NewTestScope("test", nil), defaultLookbackDuration), promReadMetrics: promReadTestMetrics} + promRead := &PromReadHandler{engine: executor.NewEngine(storage, tally.NewTestScope("test", nil), defaultLookbackDuration, nil), promReadMetrics: promReadTestMetrics} req, _ := http.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) r, err := promRead.parseRequest(req) @@ -92,7 +92,7 @@ func TestPromFetchTimeoutParsing(t *testing.T) { ctrl := gomock.NewController(t) storage, _ := m3.NewStorageAndSession(t, ctrl) promRead := &PromReadHandler{ - engine: executor.NewEngine(storage, tally.NewTestScope("test", nil), defaultLookbackDuration), + engine: executor.NewEngine(storage, tally.NewTestScope("test", nil), defaultLookbackDuration, nil), promReadMetrics: promReadTestMetrics, timeoutOpts: &prometheus.TimeoutOpts{ FetchTimeout: 2 * time.Minute, @@ -180,7 +180,7 @@ func TestReadErrorMetricsCount(t *testing.T) { defer closer.Close() readMetrics := newPromReadMetrics(scope) - promRead := &PromReadHandler{engine: executor.NewEngine(storage, scope, defaultLookbackDuration), promReadMetrics: readMetrics, timeoutOpts: timeoutOpts} + promRead := &PromReadHandler{engine: executor.NewEngine(storage, scope, defaultLookbackDuration, nil), promReadMetrics: readMetrics, timeoutOpts: timeoutOpts} req, _ := http.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) promRead.ServeHTTP(httptest.NewRecorder(), req) diff --git a/src/query/api/v1/handler/prometheus/validator/handler.go b/src/query/api/v1/handler/prometheus/validator/handler.go index 902be378ca..ac251f28fc 100644 --- a/src/query/api/v1/handler/prometheus/validator/handler.go +++ b/src/query/api/v1/handler/prometheus/validator/handler.go @@ -109,7 +109,7 @@ func (h *PromDebugHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - engine := executor.NewEngine(s, h.scope.SubScope("debug_engine"), h.lookbackDuration) + engine := executor.NewEngine(s, h.scope.SubScope("debug_engine"), h.lookbackDuration, nil) results, _, respErr := h.readHandler.ServeHTTPWithEngine(w, r, engine) if respErr != nil { logger.Error("unable to read data", zap.Error(respErr.Err)) diff --git a/src/query/api/v1/handler/prometheus/validator/handler_test.go b/src/query/api/v1/handler/prometheus/validator/handler_test.go index 850b52cdd1..36088e501f 100644 --- a/src/query/api/v1/handler/prometheus/validator/handler_test.go +++ b/src/query/api/v1/handler/prometheus/validator/handler_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage/mock" @@ -301,7 +302,7 @@ func newServer() (*httptest.Server, *PromDebugHandler) { mockStorage := mock.NewMockStorage() debugHandler := NewPromDebugHandler( native.NewPromReadHandler( - executor.NewEngine(mockStorage, tally.NewTestScope("test_engine", nil), defaultLookbackDuration), + executor.NewEngine(mockStorage, tally.NewTestScope("test_engine", nil), defaultLookbackDuration, cost.NoopChainedEnforcer()), models.NewTagOptions(), &config.LimitsConfiguration{}, tally.NewTestScope("test", nil), diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 36072c89a4..925bee6d29 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -178,7 +178,7 @@ func (h *Handler) RegisterRoutes() error { nativePromReadHandler := native.NewPromReadHandler( h.engine, h.tagOptions, - h.config.LimitsOrDefault(), + &h.config.Limits, h.scope.Tagged(nativeSource), h.timeoutOpts, ) diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 7aed787828..b077779770 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -65,7 +65,8 @@ func setupHandler(store storage.Storage) (*Handler, error) { return NewHandler( downsamplerAndWriter, makeTagOptions(), - executor.NewEngine(store, tally.NewTestScope("test", nil), time.Minute), + executor.NewEngine(store, tally.NewTestScope("test", nil), + time.Minute, nil), nil, nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, @@ -82,7 +83,7 @@ func TestHandlerFetchTimeoutError(t *testing.T) { negValue := -1 * time.Second dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &negValue}} - _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute), nil, nil, + _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) require.Error(t, err) } @@ -96,7 +97,7 @@ func TestHandlerFetchTimeout(t *testing.T) { fourMin := 4 * time.Minute dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &fourMin}} - h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute), nil, nil, + h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) require.NoError(t, err) assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout) diff --git a/src/query/block/accounted.go b/src/query/block/accounted.go new file mode 100644 index 0000000000..3538b7812c --- /dev/null +++ b/src/query/block/accounted.go @@ -0,0 +1,44 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +import "github.com/m3db/m3/src/query/cost" + +// AccountedBlock is a wrapper for a block which enforces limits on the number of datapoints used by the block. +type AccountedBlock struct { + Block + + enforcer cost.ChainedEnforcer +} + +// NewAccountedBlock wraps the given block and enforces datapoint limits. +func NewAccountedBlock(wrapped Block, enforcer cost.ChainedEnforcer) *AccountedBlock { + return &AccountedBlock{ + Block: wrapped, + enforcer: enforcer, + } +} + +// Close closes the block, and marks the number of datapoints used by this block as finished. +func (ab *AccountedBlock) Close() error { + ab.enforcer.Close() + return ab.Block.Close() +} diff --git a/src/query/block/accounted_test.go b/src/query/block/accounted_test.go new file mode 100644 index 0000000000..8172e341a9 --- /dev/null +++ b/src/query/block/accounted_test.go @@ -0,0 +1,41 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package block + +import ( + "testing" + + "github.com/m3db/m3/src/query/cost" + + "github.com/golang/mock/gomock" +) + +func TestAccountedBlock_Close(t *testing.T) { + ctrl := gomock.NewController(t) + + wrapped := NewMockBlock(ctrl) + wrapped.EXPECT().Close() + + mockEnforcer := cost.NewMockChainedEnforcer(ctrl) + mockEnforcer.EXPECT().Close() + + NewAccountedBlock(wrapped, mockEnforcer).Close() +} diff --git a/src/query/block/column.go b/src/query/block/column.go index 336bd1830a..6e171e1631 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -23,11 +23,19 @@ package block import ( "fmt" "time" + + "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/query/models" + xcost "github.com/m3db/m3/src/x/cost" + + "github.com/uber-go/tally" ) // ColumnBlockBuilder builds a block optimized for column iteration type ColumnBlockBuilder struct { - block *columnBlock + block *columnBlock + enforcer cost.ChainedEnforcer + blockDatapoints tally.Counter } type columnBlock struct { @@ -164,8 +172,13 @@ func NewColStep(t time.Time, values []float64) Step { } // NewColumnBlockBuilder creates a new column block builder -func NewColumnBlockBuilder(meta Metadata, seriesMeta []SeriesMeta) Builder { +func NewColumnBlockBuilder( + queryCtx *models.QueryContext, + meta Metadata, + seriesMeta []SeriesMeta) Builder { return ColumnBlockBuilder{ + enforcer: queryCtx.Enforcer.Child(cost.BlockLevel), + blockDatapoints: queryCtx.Scope.Tagged(map[string]string{"type": "generated"}).Counter("datapoints"), block: &columnBlock{ meta: meta, seriesMeta: seriesMeta, @@ -180,6 +193,13 @@ func (cb ColumnBlockBuilder) AppendValue(idx int, value float64) error { return fmt.Errorf("idx out of range for append: %d", idx) } + r := cb.enforcer.Add(1) + if r.Error != nil { + return r.Error + } + + cb.blockDatapoints.Inc(1) + columns[idx].Values = append(columns[idx].Values, value) return nil } @@ -191,6 +211,13 @@ func (cb ColumnBlockBuilder) AppendValues(idx int, values []float64) error { return fmt.Errorf("idx out of range for append: %d", idx) } + r := cb.enforcer.Add(xcost.Cost(len(values))) + if r.Error != nil { + return r.Error + } + + cb.blockDatapoints.Inc(int64(len(values))) + columns[idx].Values = append(columns[idx].Values, values...) return nil } @@ -205,7 +232,7 @@ func (cb ColumnBlockBuilder) AddCols(num int) error { // Build extracts the block // TODO: Return an immutable copy func (cb ColumnBlockBuilder) Build() Block { - return cb.block + return NewAccountedBlock(cb.block, cb.enforcer) } type column struct { diff --git a/src/query/config/config_test.go b/src/query/config/config_test.go index d3570602cb..2b65d57b3b 100644 --- a/src/query/config/config_test.go +++ b/src/query/config/config_test.go @@ -17,7 +17,6 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// package config diff --git a/src/query/cost/cost.go b/src/query/cost/cost.go new file mode 100644 index 0000000000..30e7fdafcb --- /dev/null +++ b/src/query/cost/cost.go @@ -0,0 +1,208 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cost + +import ( + "errors" + "fmt" + + "github.com/m3db/m3/src/x/cost" +) + +const ( + // BlockLevel identifies per-block enforcers. + BlockLevel = "block" + // QueryLevel identifies per-query enforcers. + QueryLevel = "query" + // GlobalLevel identifies global enforcers. + GlobalLevel = "global" +) + +// ChainedEnforcer is a cost.Enforcer implementation which tracks resource usage implements cost.Enforcer to enforce +// limits on multiple resources at once, linked together in a tree. +type ChainedEnforcer interface { + cost.Enforcer + + // Child creates a new ChainedEnforcer which rolls up to this one. + Child(resourceName string) ChainedEnforcer + + // Close indicates that all resources have been returned for this + // ChainedEnforcer. It should inform all parent enforcers that the + // resources have been freed. + Close() +} + +type noopChainedReporter struct{} + +func (noopChainedReporter) ReportCost(_ cost.Cost) {} +func (noopChainedReporter) ReportCurrent(_ cost.Cost) {} +func (noopChainedReporter) ReportOverLimit(_ bool) {} +func (noopChainedReporter) OnChildClose(_ cost.Cost) {} +func (noopChainedReporter) OnClose(_ cost.Cost) {} + +var noopChainedReporterInstance = noopChainedReporter{} + +// ChainedReporter is a listener for chainedEnforcer methods, which listens to Close events in addition to +// events used by cost.EnforcerReporter. +type ChainedReporter interface { + cost.EnforcerReporter + + // OnChildClose is called whenever a child of this reporter's chainedEnforcer is released. + OnChildClose(currentCost cost.Cost) + + // OnClose is called whenever this reporter's chainedEnforcer is released. + OnClose(currentCost cost.Cost) +} + +// chainedEnforcer is the actual implementation of ChainedEnforcer. +type chainedEnforcer struct { + resourceName string + local cost.Enforcer + parent *chainedEnforcer + models []cost.Enforcer + reporter ChainedReporter +} + +var noopChainedEnforcer = mustNoopChainedEnforcer() + +func mustNoopChainedEnforcer() ChainedEnforcer { + rtn, err := NewChainedEnforcer("", []cost.Enforcer{cost.NoopEnforcer()}) + if err != nil { + panic(err.Error()) + } + + return rtn +} + +// NoopChainedEnforcer returns a chainedEnforcer which enforces no limits and does no reporting. +func NoopChainedEnforcer() ChainedEnforcer { + return noopChainedEnforcer +} + +// NewChainedEnforcer constructs a chainedEnforcer which creates children using the provided models. +// models[0] enforces this instance; models[1] enforces the first level of children, and so on. +func NewChainedEnforcer(rootResourceName string, models []cost.Enforcer) (ChainedEnforcer, error) { + if len(models) == 0 { + return nil, errors.New("must provide at least one Enforcer instance for a chainedEnforcer") + } + + local := models[0] + + return &chainedEnforcer{ + resourceName: rootResourceName, + parent: nil, // root has nil parent + local: local, + models: models[1:], + reporter: upcastReporterOrNoop(local.Reporter()), + }, nil +} + +func upcastReporterOrNoop(r cost.EnforcerReporter) ChainedReporter { + if r, ok := r.(ChainedReporter); ok { + return r + } + + return noopChainedReporterInstance +} + +// Add adds the given cost both to this enforcer and any parents, working recursively until the root is reached. +// The most local error is preferred. +func (ce *chainedEnforcer) Add(c cost.Cost) cost.Report { + if ce.parent == nil { + return ce.wrapLocalResult(ce.local.Add(c)) + } + + localR := ce.local.Add(c) + globalR := ce.parent.Add(c) + + // check our local limit first + if localR.Error != nil { + return ce.wrapLocalResult(localR) + } + + // check the global limit + if globalR.Error != nil { + return globalR + } + + return localR +} + +func (ce *chainedEnforcer) wrapLocalResult(localR cost.Report) cost.Report { + if localR.Error != nil { + return cost.Report{ + Cost: localR.Cost, + Error: fmt.Errorf("exceeded %s limit: %s", ce.resourceName, localR.Error.Error()), + } + } + return localR +} + +// Child creates a new chainedEnforcer whose resource consumption rolls up into this instance. +func (ce *chainedEnforcer) Child(resourceName string) ChainedEnforcer { + // no more models; just return a noop default. TODO: this could be a panic case? Technically speaking it's + // misconfiguration. + if len(ce.models) == 0 { + return NoopChainedEnforcer() + } + + newLocal := ce.models[0] + return &chainedEnforcer{ + resourceName: resourceName, + parent: ce, + // make sure to clone the local enforcer, so that we're using an + // independent instance with the same configuration. + local: newLocal.Clone(), + models: ce.models[1:], + reporter: upcastReporterOrNoop(newLocal.Reporter()), + } +} + +// Clone on a chainedEnforcer is a noop--TODO: implement? +func (ce *chainedEnforcer) Clone() cost.Enforcer { + return ce +} + +// State returns the local state of this enforcer (ignoring anything further up the chain). +func (ce *chainedEnforcer) State() (cost.Report, cost.Limit) { + return ce.local.State() +} + +// Close releases all resources tracked by this enforcer back to the global enforcer +func (ce *chainedEnforcer) Close() { + r, _ := ce.local.State() + ce.reporter.OnClose(r.Cost) + + if ce.parent != nil { + parentR, _ := ce.parent.State() + ce.parent.reporter.OnChildClose(parentR.Cost) + } + + ce.Add(-r.Cost) +} + +func (ce *chainedEnforcer) Limit() cost.Limit { + return ce.local.Limit() +} + +func (ce *chainedEnforcer) Reporter() cost.EnforcerReporter { + return ce.local.Reporter() +} diff --git a/src/query/cost/cost_mock.go b/src/query/cost/cost_mock.go new file mode 100644 index 0000000000..b0d92fae56 --- /dev/null +++ b/src/query/cost/cost_mock.go @@ -0,0 +1,236 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/query/cost/go + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package cost is a generated GoMock package. +package cost + +import ( + "reflect" + + cost0 "github.com/m3db/m3/src/x/cost" + + "github.com/golang/mock/gomock" +) + +// MockChainedEnforcer is a mock of ChainedEnforcer interface +type MockChainedEnforcer struct { + ctrl *gomock.Controller + recorder *MockChainedEnforcerMockRecorder +} + +// MockChainedEnforcerMockRecorder is the mock recorder for MockChainedEnforcer +type MockChainedEnforcerMockRecorder struct { + mock *MockChainedEnforcer +} + +// NewMockChainedEnforcer creates a new mock instance +func NewMockChainedEnforcer(ctrl *gomock.Controller) *MockChainedEnforcer { + mock := &MockChainedEnforcer{ctrl: ctrl} + mock.recorder = &MockChainedEnforcerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockChainedEnforcer) EXPECT() *MockChainedEnforcerMockRecorder { + return m.recorder +} + +// Add mocks base method +func (m *MockChainedEnforcer) Add(op cost0.Cost) cost0.Report { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", op) + ret0, _ := ret[0].(cost0.Report) + return ret0 +} + +// Add indicates an expected call of Add +func (mr *MockChainedEnforcerMockRecorder) Add(op interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockChainedEnforcer)(nil).Add), op) +} + +// State mocks base method +func (m *MockChainedEnforcer) State() (cost0.Report, cost0.Limit) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "State") + ret0, _ := ret[0].(cost0.Report) + ret1, _ := ret[1].(cost0.Limit) + return ret0, ret1 +} + +// State indicates an expected call of State +func (mr *MockChainedEnforcerMockRecorder) State() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "State", reflect.TypeOf((*MockChainedEnforcer)(nil).State)) +} + +// Limit mocks base method +func (m *MockChainedEnforcer) Limit() cost0.Limit { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Limit") + ret0, _ := ret[0].(cost0.Limit) + return ret0 +} + +// Limit indicates an expected call of Limit +func (mr *MockChainedEnforcerMockRecorder) Limit() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Limit", reflect.TypeOf((*MockChainedEnforcer)(nil).Limit)) +} + +// Clone mocks base method +func (m *MockChainedEnforcer) Clone() cost0.Enforcer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Clone") + ret0, _ := ret[0].(cost0.Enforcer) + return ret0 +} + +// Clone indicates an expected call of Clone +func (mr *MockChainedEnforcerMockRecorder) Clone() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clone", reflect.TypeOf((*MockChainedEnforcer)(nil).Clone)) +} + +// Reporter mocks base method +func (m *MockChainedEnforcer) Reporter() cost0.EnforcerReporter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Reporter") + ret0, _ := ret[0].(cost0.EnforcerReporter) + return ret0 +} + +// Reporter indicates an expected call of Reporter +func (mr *MockChainedEnforcerMockRecorder) Reporter() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reporter", reflect.TypeOf((*MockChainedEnforcer)(nil).Reporter)) +} + +// Child mocks base method +func (m *MockChainedEnforcer) Child(resourceName string) ChainedEnforcer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Child", resourceName) + ret0, _ := ret[0].(ChainedEnforcer) + return ret0 +} + +// Child indicates an expected call of Child +func (mr *MockChainedEnforcerMockRecorder) Child(resourceName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Child", reflect.TypeOf((*MockChainedEnforcer)(nil).Child), resourceName) +} + +// Close mocks base method +func (m *MockChainedEnforcer) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close +func (mr *MockChainedEnforcerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockChainedEnforcer)(nil).Close)) +} + +// MockChainedReporter is a mock of ChainedReporter interface +type MockChainedReporter struct { + ctrl *gomock.Controller + recorder *MockChainedReporterMockRecorder +} + +// MockChainedReporterMockRecorder is the mock recorder for MockChainedReporter +type MockChainedReporterMockRecorder struct { + mock *MockChainedReporter +} + +// NewMockChainedReporter creates a new mock instance +func NewMockChainedReporter(ctrl *gomock.Controller) *MockChainedReporter { + mock := &MockChainedReporter{ctrl: ctrl} + mock.recorder = &MockChainedReporterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockChainedReporter) EXPECT() *MockChainedReporterMockRecorder { + return m.recorder +} + +// ReportCost mocks base method +func (m *MockChainedReporter) ReportCost(c cost0.Cost) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReportCost", c) +} + +// ReportCost indicates an expected call of ReportCost +func (mr *MockChainedReporterMockRecorder) ReportCost(c interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportCost", reflect.TypeOf((*MockChainedReporter)(nil).ReportCost), c) +} + +// ReportCurrent mocks base method +func (m *MockChainedReporter) ReportCurrent(c cost0.Cost) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReportCurrent", c) +} + +// ReportCurrent indicates an expected call of ReportCurrent +func (mr *MockChainedReporterMockRecorder) ReportCurrent(c interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportCurrent", reflect.TypeOf((*MockChainedReporter)(nil).ReportCurrent), c) +} + +// ReportOverLimit mocks base method +func (m *MockChainedReporter) ReportOverLimit(enabled bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReportOverLimit", enabled) +} + +// ReportOverLimit indicates an expected call of ReportOverLimit +func (mr *MockChainedReporterMockRecorder) ReportOverLimit(enabled interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportOverLimit", reflect.TypeOf((*MockChainedReporter)(nil).ReportOverLimit), enabled) +} + +// OnChildClose mocks base method +func (m *MockChainedReporter) OnChildClose(currentCost cost0.Cost) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnChildClose", currentCost) +} + +// OnChildClose indicates an expected call of OnChildClose +func (mr *MockChainedReporterMockRecorder) OnChildClose(currentCost interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnChildClose", reflect.TypeOf((*MockChainedReporter)(nil).OnChildClose), currentCost) +} + +// OnClose mocks base method +func (m *MockChainedReporter) OnClose(currentCost cost0.Cost) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnClose", currentCost) +} + +// OnClose indicates an expected call of OnClose +func (mr *MockChainedReporterMockRecorder) OnClose(currentCost interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnClose", reflect.TypeOf((*MockChainedReporter)(nil).OnClose), currentCost) +} diff --git a/src/query/cost/cost_prop_test.go b/src/query/cost/cost_prop_test.go new file mode 100644 index 0000000000..5bcc726b04 --- /dev/null +++ b/src/query/cost/cost_prop_test.go @@ -0,0 +1,77 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cost + +import ( + "math" + "sync" + "testing" + + "github.com/m3db/m3/src/x/cost" + + "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" + "github.com/leanovate/gopter/prop" + "github.com/stretchr/testify/require" +) + +func TestPropertyPerQueryEnforcerAlwaysEndsUpZero(t *testing.T) { + testParams := gopter.DefaultTestParameters() + testParams.MinSuccessfulTests = 1000 + props := gopter.NewProperties(testParams) + + globalEndsUpZero := func(costs []float64, perQueryThreshold, globalThreshold float64) bool { + pqfIFace, err := NewChainedEnforcer( + "", + []cost.Enforcer{newTestEnforcer(cost.Limit{Threshold: cost.Cost(globalThreshold), Enabled: true}), + newTestEnforcer(cost.Limit{Threshold: cost.Cost(perQueryThreshold), Enabled: true})}) + require.NoError(t, err) + pqf := pqfIFace.(*chainedEnforcer) + wg := sync.WaitGroup{} + for _, c := range costs { + wg.Add(1) + go func(c float64) { + defer wg.Done() + + perQuery := pqf.Child("query") + defer perQuery.Close() + perQuery.Add(cost.Cost(c)) + }(c) + } + + wg.Wait() + r, _ := pqf.local.State() + + // do delta comparison to deal with floating point errors. TODO: cost could potentially be an int + const tolerance = 0.000001 + return math.Abs(float64(r.Cost)) < tolerance + } + + props.Property("global enforcer >= 0", + prop.ForAll( + globalEndsUpZero, + gen.SliceOf(gen.Float64Range(0, 10000)), + gen.Float64Range(0.0, 10000), + gen.Float64Range(0.0, 10000), + )) + + props.TestingRun(t) +} diff --git a/src/query/cost/cost_test.go b/src/query/cost/cost_test.go new file mode 100644 index 0000000000..a554f45d98 --- /dev/null +++ b/src/query/cost/cost_test.go @@ -0,0 +1,242 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cost + +import ( + "fmt" + "math" + "testing" + + "github.com/m3db/m3/src/x/cost" + "github.com/m3db/m3/src/x/cost/test" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestChainedEnforcer_Child(t *testing.T) { + t.Run("creates independent local enforcers with shared global enforcer", func(t *testing.T) { + globalEnforcer := newTestEnforcer(cost.Limit{Threshold: 10.0, Enabled: true}) + localEnforcer := newTestEnforcer(cost.Limit{Threshold: 5.0, Enabled: true}) + + pef, err := NewChainedEnforcer("", []cost.Enforcer{globalEnforcer, localEnforcer}) + require.NoError(t, err) + + l1, l2 := pef.Child("foo"), pef.Child("foo") + + l1.Add(2) + + test.AssertCurrentCost(t, 2.0, l1) + test.AssertCurrentCost(t, 0.0, l2) + test.AssertCurrentCost(t, 2.0, globalEnforcer) + }) + + t.Run("returns a noop enforcer once out of models", func(t *testing.T) { + globalEnforcer := newTestEnforcer(cost.Limit{Threshold: 10.0, Enabled: true}) + pef, err := NewChainedEnforcer("", []cost.Enforcer{globalEnforcer}) + require.NoError(t, err) + + child := pef.Child("foo") + assert.Equal(t, noopChainedEnforcer, child) + }) +} + +func TestChainedEnforcer_Close(t *testing.T) { + t.Run("removes local total from global", func(t *testing.T) { + parentIface, err := NewChainedEnforcer( + "", + []cost.Enforcer{newTestEnforcer(cost.Limit{Threshold: 10.0, Enabled: true}), + newTestEnforcer(cost.Limit{Threshold: 5.0, Enabled: true})}) + require.NoError(t, err) + parent := parentIface.(*chainedEnforcer) + + pqe1, pqe2 := parent.Child("query"), parent.Child("query") + + pqe1.Add(cost.Cost(5.0)) + pqe1.Add(cost.Cost(6.0)) + + pqe2.Add(cost.Cost(7.0)) + + pqe1.Close() + + test.AssertCurrentCost(t, cost.Cost(7.0), parent.local) + pqe2.Close() + test.AssertCurrentCost(t, cost.Cost(0.0), parent.local) + }) + + t.Run("calls into reporter on release", func(t *testing.T) { + ctrl := gomock.NewController(t) + makeEnforcer := func(cr ChainedReporter) cost.Enforcer { + return cost.NewEnforcer(cost.NewStaticLimitManager(cost.NewLimitManagerOptions()), cost.NewTracker(), + cost.NewEnforcerOptions().SetReporter(cr)) + } + + makeReporter := func() *MockChainedReporter { + r := NewMockChainedReporter(ctrl) + r.EXPECT().ReportCurrent(gomock.Any()).AnyTimes() + r.EXPECT().ReportOverLimit(gomock.Any()).AnyTimes() + r.EXPECT().ReportCost(gomock.Any()).AnyTimes() + return r + } + + globalReporter, localReporter := makeReporter(), makeReporter() + + ce, err := NewChainedEnforcer( + "global", + []cost.Enforcer{makeEnforcer(globalReporter), makeEnforcer(localReporter)}) + + child := ce.Child("foo") + child.Add(1.0) + + require.NoError(t, err) + + globalReporter.EXPECT().OnChildClose(floatMatcher(1.0)) + localReporter.EXPECT().OnClose(floatMatcher(1.0)) + + child.Close() + }) +} + +// floatMatcher does a janky delta comparison between floats, since rounding error makes float equality treacherous +type floatMatcher float64 + +func (f floatMatcher) Matches(x interface{}) bool { + other, ok := x.(cost.Cost) + if !ok { + return false + } + + // janky delta comparison + return math.Abs(float64(f)-float64(other)) < 0.00001 +} + +func (f floatMatcher) String() string { + return fmt.Sprintf("%f", f) +} + +func TestChainedEnforcer_Add(t *testing.T) { + assertGlobalError := func(t *testing.T, err error) { + if assert.Error(t, err) { + assert.Regexp(t, "exceeded global limit", err.Error()) + } + } + + assertLocalError := func(t *testing.T, err error) { + if assert.Error(t, err) { + assert.Regexp(t, "exceeded query limit", err.Error()) + } + } + + t.Run("errors on global error", func(t *testing.T) { + pqe := newTestChainedEnforcer(5.0, 100.0) + r := pqe.Add(cost.Cost(6.0)) + assertGlobalError(t, r.Error) + }) + + t.Run("errors on local error", func(t *testing.T) { + pqe := newTestChainedEnforcer(100.0, 5.0) + r := pqe.Add(cost.Cost(6.0)) + assertLocalError(t, r.Error) + }) + + t.Run("adds to local in case of global error", func(t *testing.T) { + pqe := newTestChainedEnforcer(5.0, 100.0) + r := pqe.Add(cost.Cost(6.0)) + assertGlobalError(t, r.Error) + + r, _ = pqe.State() + assert.Equal(t, cost.Report{ + Error: nil, + Cost: 6.0}, + r) + }) + + t.Run("adds to global in case of local error", func(t *testing.T) { + pqe := newTestChainedEnforcer(100.0, 5.0) + r := pqe.Add(cost.Cost(6.0)) + assertLocalError(t, r.Error) + + r, _ = pqe.parent.State() + assert.Equal(t, cost.Report{ + Error: nil, + Cost: 6.0}, + r) + }) + + t.Run("release after local error", func(t *testing.T) { + pqe := newTestChainedEnforcer(10.0, 5.0) + + // exceeds local + r := pqe.Add(6.0) + assertLocalError(t, r.Error) + + pqe.Close() + test.AssertCurrentCost(t, 0.0, pqe.local) + }) + + t.Run("release after global error", func(t *testing.T) { + pqe := newTestChainedEnforcer(5.0, 10.0) + // exceeds global + r := pqe.Add(6.0) + assertGlobalError(t, r.Error) + pqe.Close() + test.AssertCurrentCost(t, 0.0, pqe.local) + }) +} + +func TestChainedEnforcer_State(t *testing.T) { + pqe := newTestChainedEnforcer(10.0, 5.0) + pqe.Add(15.0) + + r, l := pqe.State() + assert.Equal(t, cost.Cost(15), r.Cost) + test.AssertLimitError(t, r.Error, 15.0, 5.0) + assert.Equal(t, cost.Limit{Threshold: 5.0, Enabled: true}, l) +} + +func TestNoopChainedEnforcer_Close(t *testing.T) { + ce := NoopChainedEnforcer() + ce.Close() + test.AssertCurrentCost(t, 0.0, ce) +} + +// utils + +func newTestEnforcer(limit cost.Limit) cost.Enforcer { + return cost.NewEnforcer( + cost.NewStaticLimitManager(cost.NewLimitManagerOptions().SetDefaultLimit(limit)), + cost.NewTracker(), + nil, + ) +} + +func newTestChainedEnforcer(globalLimit float64, localLimit float64) *chainedEnforcer { + rtn, err := NewChainedEnforcer( + "global", + []cost.Enforcer{newTestEnforcer(cost.Limit{Threshold: cost.Cost(globalLimit), Enabled: true}), + newTestEnforcer(cost.Limit{Threshold: cost.Cost(localLimit), Enabled: true})}) + if err != nil { + panic(err.Error()) + } + + return rtn.Child("query").(*chainedEnforcer) +} diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index b1a70078a8..cab58d2dbf 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -24,6 +24,7 @@ import ( "context" "time" + qcost "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage" @@ -35,6 +36,8 @@ import ( // Engine executes a Query. type Engine struct { metrics *engineMetrics + costScope tally.Scope + globalEnforcer qcost.ChainedEnforcer store storage.Storage lookbackDuration time.Duration } @@ -50,11 +53,16 @@ type Query struct { } // NewEngine returns a new instance of QueryExecutor. -func NewEngine(store storage.Storage, scope tally.Scope, lookbackDuration time.Duration) *Engine { +func NewEngine(store storage.Storage, scope tally.Scope, lookbackDuration time.Duration, factory qcost.ChainedEnforcer) *Engine { + if factory == nil { + factory = qcost.NoopChainedEnforcer() + } return &Engine{ metrics: newEngineMetrics(scope), + costScope: scope, store: store, lookbackDuration: lookbackDuration, + globalEnforcer: factory, } } @@ -134,6 +142,11 @@ func (e *Engine) ExecuteExpr( ) { defer close(results) + perQueryEnforcer := e.globalEnforcer.Child(qcost.QueryLevel) + defer func() { + perQueryEnforcer.Close() + }() + req := newRequest(e, params) nodes, edges, err := req.compile(ctx, parser) @@ -161,7 +174,7 @@ func (e *Engine) ExecuteExpr( result := state.resultNode results <- Query{Result: result} - if err := state.Execute(models.NewQueryContext(ctx, tally.NoopScope)); err != nil { + if err := state.Execute(models.NewQueryContext(ctx, e.costScope, perQueryEnforcer)); err != nil { result.abort(err) } else { result.done() diff --git a/src/query/executor/engine_test.go b/src/query/executor/engine_test.go index e68c75e42b..6425bfe5c7 100644 --- a/src/query/executor/engine_test.go +++ b/src/query/executor/engine_test.go @@ -26,12 +26,17 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/storage/mock" "github.com/m3db/m3/src/query/test/m3" "github.com/m3db/m3/src/query/util/logging" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) @@ -44,8 +49,40 @@ func TestEngine_Execute(t *testing.T) { // Results is closed by execute results := make(chan *storage.QueryResult) - engine := NewEngine(store, tally.NewTestScope("test", nil), time.Minute) + engine := NewEngine(store, tally.NewTestScope("test", nil), time.Minute, nil) go engine.Execute(context.TODO(), &storage.FetchQuery{}, &EngineOptions{}, results) res := <-results assert.NotNil(t, res.Err) } + +func TestEngine_ExecuteExpr(t *testing.T) { + t.Run("releases and reports on completion", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockEnforcer := cost.NewMockChainedEnforcer(ctrl) + mockEnforcer.EXPECT().Close().Times(1) + + mockParent := cost.NewMockChainedEnforcer(ctrl) + mockParent.EXPECT().Child(gomock.Any()).Return(mockEnforcer) + + parser, err := promql.Parse("foo", models.NewTagOptions()) + require.NoError(t, err) + + results := make(chan Query) + engine := NewEngine(mock.NewMockStorage(), tally.NewTestScope("", nil), defaultLookbackDuration, mockParent) + go engine.ExecuteExpr(context.TODO(), parser, &EngineOptions{}, models.RequestParams{ + Start: time.Now().Add(-2 * time.Second), + End: time.Now(), + Step: time.Second, + }, results) + + // drain the channel + var resSl []Query + for r := range results { + resSl = append(resSl, r) + } + require.Len(t, resSl, 1) + + res := resSl[0] + require.NoError(t, res.Err) + }) +} diff --git a/src/query/executor/transform/controller.go b/src/query/executor/transform/controller.go index 12efded782..ba9a72bd28 100644 --- a/src/query/executor/transform/controller.go +++ b/src/query/executor/transform/controller.go @@ -53,7 +53,7 @@ func (t *Controller) BlockBuilder( queryCtx *models.QueryContext, blockMeta block.Metadata, seriesMeta []block.SeriesMeta) (block.Builder, error) { - return block.NewColumnBlockBuilder(blockMeta, seriesMeta), nil + return block.NewColumnBlockBuilder(queryCtx, blockMeta, seriesMeta), nil } // HasMultipleOperations returns true if there are multiple operations. diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index 5778438087..2a0760eb30 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -100,6 +100,8 @@ func (n *FetchNode) fetch(queryCtx *models.QueryContext) (block.Result, error) { opts := storage.NewFetchOptions() opts.BlockType = n.blockType + opts.Scope = queryCtx.Scope + opts.Enforcer = queryCtx.Enforcer return n.storage.FetchBlocks(ctx, &storage.FetchQuery{ Start: startTime, diff --git a/src/query/functions/tag/join_test.go b/src/query/functions/tag/join_test.go index d6982e39f4..08a1733167 100644 --- a/src/query/functions/tag/join_test.go +++ b/src/query/functions/tag/join_test.go @@ -184,7 +184,7 @@ func TestTagJoinOp(t *testing.T) { seriesMeta[i] = block.SeriesMeta{Tags: test.StringTagsToTags(t)} } - bl := block.NewColumnBlockBuilder(meta, seriesMeta).Build() + bl := block.NewColumnBlockBuilder(models.NoopQueryContext(), meta, seriesMeta).Build() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) diff --git a/src/query/functions/tag/replace_test.go b/src/query/functions/tag/replace_test.go index b82d76cdf0..a1a6b0402e 100644 --- a/src/query/functions/tag/replace_test.go +++ b/src/query/functions/tag/replace_test.go @@ -162,7 +162,7 @@ func TestTagReplaceOp(t *testing.T) { seriesMeta[i] = block.SeriesMeta{Tags: test.StringTagsToTags(t)} } - bl := block.NewColumnBlockBuilder(meta, seriesMeta).Build() + bl := block.NewColumnBlockBuilder(models.NoopQueryContext(), meta, seriesMeta).Build() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) node := op.(baseOp).Node(c, transform.Options{}) err = node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index b5aee6af43..fe63446303 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -464,7 +464,7 @@ func setupCloseableBlock(ctrl *gomock.Controller, node *baseNode) closeableBlock blockMeta block.Metadata, seriesMeta []block.SeriesMeta) (block.Builder, error) { mb := &closeSpyBlockBuilder{ - Builder: block.NewColumnBlockBuilder(blockMeta, seriesMeta), + Builder: block.NewColumnBlockBuilder(models.NoopQueryContext(), blockMeta, seriesMeta), } mockBuilders = append(mockBuilders, mb) return mb, nil diff --git a/src/query/generated/mocks/generate.go b/src/query/generated/mocks/generate.go index 5794930481..337f080c57 100644 --- a/src/query/generated/mocks/generate.go +++ b/src/query/generated/mocks/generate.go @@ -28,5 +28,6 @@ //go:generate sh -c "mockgen -package=m3ql -destination=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types.go" //go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/github.com/m3db/m3/src/query/executor/transform/exec_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/executor/transform/exec.go" //go:generate sh -c "mockgen -package=temporal -destination=$GOPATH/src/github.com/m3db/m3/src/query/functions/temporal/dependencies_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/functions/temporal/dependencies.go" controller +//go:generate sh -c "mockgen -package=cost -destination=$GOPATH/src/github.com/m3db/m3/src/query/cost/cost_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/cost/cost.go" package mocks diff --git a/src/query/models/query_context.go b/src/query/models/query_context.go index 5da6e50dd7..5fa1de52bb 100644 --- a/src/query/models/query_context.go +++ b/src/query/models/query_context.go @@ -23,6 +23,8 @@ package models import ( "context" + "github.com/m3db/m3/src/query/cost" + "github.com/uber-go/tally" ) @@ -30,22 +32,27 @@ import ( // It acts as a hook back into the execution engine for things like // cost accounting. type QueryContext struct { - Ctx context.Context - Scope tally.Scope + Ctx context.Context + Scope tally.Scope + Enforcer cost.ChainedEnforcer } // NewQueryContext constructs a QueryContext using the given Enforcer to // enforce per query limits. -func NewQueryContext(ctx context.Context, scope tally.Scope) *QueryContext { +func NewQueryContext( + ctx context.Context, + scope tally.Scope, + enforcer cost.ChainedEnforcer) *QueryContext { return &QueryContext{ - Ctx: ctx, - Scope: scope, + Ctx: ctx, + Scope: scope, + Enforcer: enforcer, } } // NoopQueryContext returns a query context with no active components. func NoopQueryContext() *QueryContext { - return NewQueryContext(context.Background(), tally.NoopScope) + return NewQueryContext(context.Background(), tally.NoopScope, cost.NoopChainedEnforcer()) } // WithContext creates a shallow copy of this QueryContext using the new context. diff --git a/src/query/server/cost_reporters.go b/src/query/server/cost_reporters.go new file mode 100644 index 0000000000..76f4d2e0bb --- /dev/null +++ b/src/query/server/cost_reporters.go @@ -0,0 +1,209 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package server + +// This file contains reporters and setup for our query/cost.ChainedEnforcer +// instances. +import ( + "sync" + + "github.com/m3db/m3/src/cmd/services/m3query/config" + qcost "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/x/cost" + "github.com/m3db/m3x/instrument" + + "github.com/uber-go/tally" +) + +const ( + queriesOverLimitMetric = "over_datapoints_limit" + datapointsMetric = "datapoints" + datapointsCounterMetric = "datapoints_counter" + maxDatapointsHistMetric = "max_datapoints_hist" +) + +// newConfiguredChainedEnforcer returns a ChainedEnforcer with 3 configured +// levels: global, per-query, per-block. Global and per-query both have limits +// on them (as configured by cfg.Limits); per-block is purely for accounting +// purposes. +// Our enforcers report at least these stats: +// cost.global.datapoints: gauge; the number of datapoints currently in use +// by this instance. +// +// cost.global.datapoints_counter: counter; counter representation of the +// number of datapoints in use by this instance +// +// cost.{per_query,global}.over_datapoints_limit: counter; how many queries are over the +// datapoint limit +// +// cost.per_query.max_datapoints_hist: histogram; represents the +// distribution of the maximum datapoints used at any point in each query. +func newConfiguredChainedEnforcer(cfg *config.Configuration, instrumentOptions instrument.Options) (qcost.ChainedEnforcer, error) { + costScope := instrumentOptions.MetricsScope().SubScope("cost") + costIops := instrumentOptions.SetMetricsScope(costScope) + limitMgr := cost.NewStaticLimitManager(cfg.Limits.Global.AsLimitManagerOptions().SetInstrumentOptions(costIops)) + tracker := cost.NewTracker() + + globalEnforcer := cost.NewEnforcer(limitMgr, tracker, + cost.NewEnforcerOptions().SetReporter( + newGlobalReporter(costScope.SubScope("global")), + ).SetCostExceededMessage("limits.global.maxFetchedDatapoints exceeded"), + ) + + queryEnforcerOpts := cost.NewEnforcerOptions().SetCostExceededMessage("limits.perQuery.maxFetchedDatapoints exceeded"). + SetReporter(newPerQueryReporter(costScope. + SubScope("per_query"))) + + queryEnforcer := cost.NewEnforcer( + cost.NewStaticLimitManager(cfg.Limits.PerQuery.AsLimitManagerOptions()), + cost.NewTracker(), + queryEnforcerOpts) + + blockEnforcer := cost.NewEnforcer( + cost.NewStaticLimitManager(cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{Enabled: false})), + cost.NewTracker(), + nil, + ) + + return qcost.NewChainedEnforcer(qcost.GlobalLevel, []cost.Enforcer{ + globalEnforcer, + queryEnforcer, + blockEnforcer, + }) +} + +// globalReporter records ChainedEnforcer statistics for the global enforcer. +type globalReporter struct { + datapoints tally.Gauge + datapointsCounter tally.Counter + overLimit overLimitReporter +} + +// assert we implement the interface +var _ cost.EnforcerReporter = (*globalReporter)(nil) + +func newGlobalReporter(s tally.Scope) *globalReporter { + return &globalReporter{ + datapoints: s.Gauge(datapointsMetric), + datapointsCounter: s.Counter(datapointsCounterMetric), + overLimit: newOverLimitReporter(s), + } +} + +func (gr *globalReporter) ReportCurrent(c cost.Cost) { + gr.datapoints.Update(float64(c)) +} + +// ReportCost for global reporters sends the new incoming cost to a counter. +// Since counters can only be incremented, it ignores negative values. +func (gr *globalReporter) ReportCost(c cost.Cost) { + if c > 0 { + gr.datapointsCounter.Inc(int64(c)) + } +} + +// ReportOverLimit delegates to gr.overLimit +func (gr *globalReporter) ReportOverLimit(enabled bool) { + gr.overLimit.ReportOverLimit(enabled) +} + +// perQueryReporter records ChainedEnforcer statistics on a per query level. +type perQueryReporter struct { + mu *sync.Mutex + maxDatapoints cost.Cost + queryHisto tally.Histogram + overLimit overLimitReporter +} + +// assert we implement the interface +var _ qcost.ChainedReporter = (*perQueryReporter)(nil) + +func newPerQueryReporter(scope tally.Scope) *perQueryReporter { + return &perQueryReporter{ + mu: &sync.Mutex{}, + maxDatapoints: 0, + queryHisto: scope.Histogram(maxDatapointsHistMetric, + tally.MustMakeExponentialValueBuckets(10.0, 10.0, 6)), + overLimit: newOverLimitReporter(scope), + } +} + +// ReportCost is a noop for perQueryReporter because it's noisy to report +// the current cost for every query (hard to meaningfully divide out). +// Instead, we report the max datapoints at the end of the query--see on +// release. +func (perQueryReporter) ReportCost(c cost.Cost) {} + +// ReportCurrent is a noop for perQueryReporter--see ReportCost for +// explanation. +func (perQueryReporter) ReportCurrent(c cost.Cost) {} + +// ReportOverLimit reports when a query is over its per query limit. +func (pr *perQueryReporter) ReportOverLimit(enabled bool) { + pr.overLimit.ReportOverLimit(enabled) +} + +// OnChildClose takes the max of the current cost for this query and the +// previously recorded cost. We do this OnChildRelease instead of on +// ReportCurrent to avoid locking every time we add to the Enforcer. +func (pr *perQueryReporter) OnChildClose(curCost cost.Cost) { + pr.mu.Lock() + if curCost > pr.maxDatapoints { + pr.maxDatapoints = curCost + } + pr.mu.Unlock() +} + +// OnClose records the maximum cost seen by this reporter. +func (pr *perQueryReporter) OnClose(curCost cost.Cost) { + pr.mu.Lock() + pr.queryHisto.RecordValue(float64(pr.maxDatapoints)) + pr.mu.Unlock() +} + +// overLimitReporter factors out reporting over limit cases for both global +// and per query enforcer reporters. +type overLimitReporter struct { + queriesOverLimitDisabled tally.Counter + queriesOverLimitEnabled tally.Counter +} + +func newOverLimitReporter(scope tally.Scope) overLimitReporter { + return overLimitReporter{ + queriesOverLimitDisabled: scope.Tagged(map[string]string{ + "enabled": "false", + }).Counter(queriesOverLimitMetric), + + queriesOverLimitEnabled: scope.Tagged(map[string]string{ + "enabled": "true", + }).Counter(queriesOverLimitMetric), + } +} + +// ReportOverLimit increments .over_limit, tagged by +// "enabled". +func (olr overLimitReporter) ReportOverLimit(enabled bool) { + if enabled { + olr.queriesOverLimitEnabled.Inc(1) + } else { + olr.queriesOverLimitDisabled.Inc(1) + } +} diff --git a/src/query/server/cost_reporters_test.go b/src/query/server/cost_reporters_test.go new file mode 100644 index 0000000000..5030077905 --- /dev/null +++ b/src/query/server/cost_reporters_test.go @@ -0,0 +1,282 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package server + +import ( + "fmt" + "math" + "testing" + + "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/x/cost/test" + "github.com/m3db/m3x/instrument" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" +) + +func TestNewConfiguredChainedEnforcer(t *testing.T) { + type testCtx struct { + Scope tally.TestScope + GlobalEnforcer cost.ChainedEnforcer + } + + setup := func(t *testing.T, perQueryLimit, globalLimit int64) testCtx { + s := tally.NewTestScope("", nil) + iopts := instrument.NewOptions().SetMetricsScope(s) + + globalEnforcer, err := newConfiguredChainedEnforcer(&config.Configuration{ + Limits: config.LimitsConfiguration{ + PerQuery: config.PerQueryLimitsConfiguration{ + MaxFetchedDatapoints: perQueryLimit, + }, + Global: config.GlobalLimitsConfiguration{ + MaxFetchedDatapoints: globalLimit, + }, + }, + }, iopts) + + require.NoError(t, err) + + return testCtx{ + Scope: s, + GlobalEnforcer: globalEnforcer, + } + } + + t.Run("has 3 valid levels", func(t *testing.T) { + tctx := setup(t, 6, 10) + + assertValid := func(ce cost.ChainedEnforcer) { + assert.NotEqual(t, ce, cost.NoopChainedEnforcer()) + } + + assertValid(tctx.GlobalEnforcer) + + qe := tctx.GlobalEnforcer.Child(cost.QueryLevel) + assertValid(qe) + + block := qe.Child(cost.BlockLevel) + assertValid(block) + + badLevel := block.Child("nonExistent") + assert.Equal(t, cost.NoopChainedEnforcer(), + badLevel) + }) + + t.Run("configures reporters", func(t *testing.T) { + tctx := setup(t, 6, 10) + queryEf := tctx.GlobalEnforcer.Child(cost.QueryLevel) + blockEf := queryEf.Child(cost.BlockLevel) + blockEf.Add(7) + + assertHasGauge(t, + tctx.Scope.Snapshot(), + tally.KeyForPrefixedStringMap( + fmt.Sprintf("cost.global.%s", datapointsMetric), nil), + 7, + ) + + blockEf.Close() + queryEf.Close() + + assertHasHistogram(t, + tctx.Scope.Snapshot(), + tally.KeyForPrefixedStringMap( + fmt.Sprintf("cost.per_query.%s", maxDatapointsHistMetric), nil), + map[float64]int64{10: 1}, + ) + }) + + t.Run("block level doesn't have a limit", func(t *testing.T) { + tctx := setup(t, -1, -1) + block := tctx.GlobalEnforcer.Child(cost.QueryLevel).Child(cost.BlockLevel) + assert.NoError(t, block.Add(math.MaxFloat64-1).Error) + }) + + t.Run("works e2e", func(t *testing.T) { + tctx := setup(t, 6, 10) + + qe1, qe2 := tctx.GlobalEnforcer.Child(cost.QueryLevel), tctx.GlobalEnforcer.Child(cost.QueryLevel) + r := qe1.Add(6) + test.AssertLimitErrorWithMsg( + t, + r.Error, + "exceeded query limit: limits.perQuery.maxFetchedDatapoints exceeded", + 6, + 6) + + r = qe2.Add(3) + require.NoError(t, r.Error) + + r = qe2.Add(2) + test.AssertLimitErrorWithMsg( + t, + r.Error, + "exceeded global limit: limits.global.maxFetchedDatapoints exceeded", + 11, + 10) + + test.AssertCurrentCost(t, 11, tctx.GlobalEnforcer) + + qe2.Close() + test.AssertCurrentCost(t, 6, tctx.GlobalEnforcer) + + // check the block level + blockEf := qe1.Child(cost.BlockLevel) + blockEf.Add(2) + + test.AssertCurrentCost(t, 2, blockEf) + test.AssertCurrentCost(t, 8, qe1) + test.AssertCurrentCost(t, 8, tctx.GlobalEnforcer) + }) +} + +func setupGlobalReporter() (tally.TestScope, *globalReporter) { + s := tally.NewTestScope("", nil) + gr := newGlobalReporter(s) + return s, gr +} + +func TestGlobalReporter_ReportCurrent(t *testing.T) { + s, gr := setupGlobalReporter() + + gr.ReportCurrent(5.0) + assertHasGauge(t, s.Snapshot(), tally.KeyForPrefixedStringMap(datapointsMetric, nil), 5.0) +} + +func TestGlobalReporter_ReportCost(t *testing.T) { + t.Run("reports positive", func(t *testing.T) { + s, gr := setupGlobalReporter() + gr.ReportCost(5.0) + assertHasCounter(t, s.Snapshot(), tally.KeyForPrefixedStringMap(datapointsCounterMetric, nil), 5.0) + }) + + t.Run("skips negative", func(t *testing.T) { + s, gr := setupGlobalReporter() + gr.ReportCost(-5.0) + + assertHasCounter(t, s.Snapshot(), tally.KeyForPrefixedStringMap(datapointsCounterMetric, nil), 0.0) + }) +} + +func TestGlobalReporter_ReportOverLimit(t *testing.T) { + s, gr := setupGlobalReporter() + gr.ReportOverLimit(true) + assertHasCounter(t, s.Snapshot(), tally.KeyForPrefixedStringMap(queriesOverLimitMetric, map[string]string{ + "enabled": "true", + }), 1) +} + +func setupPerQueryReporter() (tally.TestScope, *perQueryReporter) { + s := tally.NewTestScope("", nil) + gr := newPerQueryReporter(s) + return s, gr +} + +func TestPerQueryReporter_ReportOverLimit(t *testing.T) { + s, pqr := setupPerQueryReporter() + pqr.ReportOverLimit(true) + assertHasCounter(t, s.Snapshot(), tally.KeyForPrefixedStringMap(queriesOverLimitMetric, map[string]string{ + "enabled": "true", + }), 1) +} + +func TestPerQueryReporter_OnClose(t *testing.T) { + s, pqr := setupPerQueryReporter() + pqr.OnChildClose(5.0) + pqr.OnChildClose(110.0) + + // ignores current cost + pqr.OnClose(100.0) + assertHasHistogram(t, s.Snapshot(), + tally.KeyForPrefixedStringMap(maxDatapointsHistMetric, nil), + map[float64]int64{ + 1000.0: 1, + }) +} + +func TestPerQueryReporter_OnChildClose(t *testing.T) { + _, pqr := setupPerQueryReporter() + pqr.OnChildClose(5.0) + pqr.OnChildClose(110.0) + + assert.InDelta(t, 110.0, float64(pqr.maxDatapoints), 0.0001) +} + +func TestOverLimitReporter_ReportOverLimit(t *testing.T) { + s := tally.NewTestScope("", nil) + orl := newOverLimitReporter(s) + + orl.ReportOverLimit(true) + assertHasCounter(t, s.Snapshot(), tally.KeyForPrefixedStringMap(queriesOverLimitMetric, map[string]string{ + "enabled": "true", + }), 1) + + orl.ReportOverLimit(false) + assertHasCounter(t, s.Snapshot(), tally.KeyForPrefixedStringMap(queriesOverLimitMetric, map[string]string{ + "enabled": "true", + }), 1) +} + +func assertHasCounter(t *testing.T, snapshot tally.Snapshot, key string, v int) { + counters := snapshot.Counters() + if !assert.Contains(t, counters, key, "No such metric: %s", key) { + return + } + + counter := counters[key] + + assert.Equal(t, int(counter.Value()), v, "Incorrect value for counter %s", key) +} + +func assertHasGauge(t *testing.T, snapshot tally.Snapshot, key string, v int) { + gauges := snapshot.Gauges() + if !assert.Contains(t, gauges, key, "No such metric: %s", key) { + return + } + + gauge := gauges[key] + + assert.Equal(t, int(gauge.Value()), v, "Incorrect value for gauge %s", key) +} + +func assertHasHistogram(t *testing.T, snapshot tally.Snapshot, key string, values map[float64]int64) { + histograms := snapshot.Histograms() + if !assert.Contains(t, histograms, key, "No such metric: %s", key) { + return + } + + hist := histograms[key] + + actualValues := hist.Values() + + // filter zero values + for k, v := range actualValues { + if v == 0 { + delete(actualValues, k) + } + } + + assert.Equal(t, values, actualValues) +} diff --git a/src/query/server/server.go b/src/query/server/server.go index 38f940feb5..4194d238fd 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -248,7 +248,12 @@ func Run(runOpts RunOptions) { defer cleanup() } - engine := executor.NewEngine(backendStorage, scope.SubScope("engine"), *cfg.LookbackDuration) + perQueryEnforcer, err := newConfiguredChainedEnforcer(&cfg, instrumentOptions) + if err != nil { + logger.Fatal("unable to setup perQueryEnforcer", zap.Error(err)) + } + + engine := executor.NewEngine(backendStorage, scope.SubScope("engine"), *cfg.LookbackDuration, perQueryEnforcer) downsamplerAndWriter, err := newDownsamplerAndWriter(backendStorage, downsampler) if err != nil { diff --git a/src/query/server/server_test.go b/src/query/server/server_test.go index cdf8b72a59..e8d2a88c33 100644 --- a/src/query/server/server_test.go +++ b/src/query/server/server_test.go @@ -35,10 +35,12 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote/test" + "github.com/m3db/m3/src/query/cost" rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb" "github.com/m3db/m3/src/query/storage/m3" xconfig "github.com/m3db/m3x/config" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" xtest "github.com/m3db/m3x/test" "github.com/golang/mock/gomock" @@ -322,3 +324,55 @@ writeWorkerPoolPolicy: interruptCh <- fmt.Errorf("interrupt") <-doneCh } + +func TestNewPerQueryEnforcer(t *testing.T) { + type testContext struct { + Global cost.ChainedEnforcer + Query cost.ChainedEnforcer + Block cost.ChainedEnforcer + } + + setup := func(t *testing.T, globalLimit, queryLimit int) testContext { + cfg := &config.Configuration{ + Limits: config.LimitsConfiguration{ + Global: config.GlobalLimitsConfiguration{ + MaxFetchedDatapoints: 100, + }, + PerQuery: config.PerQueryLimitsConfiguration{ + MaxFetchedDatapoints: 10, + }, + }, + } + + global, err := newConfiguredChainedEnforcer(cfg, instrument.NewOptions()) + require.NoError(t, err) + + queryLvl := global.Child(cost.QueryLevel) + blockLvl := queryLvl.Child(cost.BlockLevel) + + return testContext{ + Global: global, + Query: queryLvl, + Block: blockLvl, + } + } + + tctx := setup(t, 100, 10) + + // spot check that limits are setup properly for each level + r := tctx.Block.Add(11) + require.Error(t, r.Error) + + floatsEqual := func(f1, f2 float64) { + assert.InDelta(t, f1, f2, 0.0000001) + } + + floatsEqual(float64(r.Cost), 11) + + r, _ = tctx.Query.State() + floatsEqual(float64(r.Cost), 11) + + r, _ = tctx.Global.State() + floatsEqual(float64(r.Cost), 11) + require.NoError(t, r.Error) +} diff --git a/src/query/storage/block.go b/src/query/storage/block.go index 140e483275..ed6f830824 100644 --- a/src/query/storage/block.go +++ b/src/query/storage/block.go @@ -26,19 +26,21 @@ import ( "time" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" ) // FetchResultToBlockResult converts a fetch result into coordinator blocks -func FetchResultToBlockResult(result *FetchResult, query *FetchQuery, lookbackDuration time.Duration) (block.Result, error) { +func FetchResultToBlockResult(result *FetchResult, query *FetchQuery, lookbackDuration time.Duration, enforcer cost.ChainedEnforcer) (block.Result, error) { multiBlock, err := NewMultiSeriesBlock(result.SeriesList, query, lookbackDuration) if err != nil { return block.Result{}, err } + accountedBlock := block.NewAccountedBlock(NewMultiBlockWrapper(multiBlock), enforcer) return block.Result{ - Blocks: []block.Block{NewMultiBlockWrapper(multiBlock)}, + Blocks: []block.Block{accountedBlock}, }, nil } diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 5cf11d0f80..9fb0697733 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -279,6 +279,10 @@ func iteratorToTsSeries( datapoints = append(datapoints, ts.Datapoint{Timestamp: dp.Timestamp, Value: dp.Value}) } + if err := iter.Err(); err != nil { + return nil, err + } + return ts.NewSeries(metric.ID, datapoints, metric.Tags), nil } diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 72da7001e6..104954c890 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -126,12 +126,8 @@ func TestFailingExpandSeriesValidPools(t *testing.T) { require.NoError(t, err) pool.Init() ctrl := gomock.NewController(t) - testTags := seriesiter.GenerateTag() - validTagGenerator := func() ident.TagIterator { - return seriesiter.GenerateSingleSampleTagIterator(ctrl, testTags) - } - iters := seriesiter.NewMockSeriesIterSlice(ctrl, validTagGenerator, numValidSeries, numValues) + iters := seriesiter.NewMockSeriesIterSlice(ctrl, seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries, numValues) // Add poolSize + 1 failing iterators; there can be slight timing // inconsistencies which can sometimes cause failures in this test // as one of the `uncalled` iterators gets unexpectedly used. @@ -307,3 +303,20 @@ func BenchmarkFetchResultToPromResult(b *testing.B) { benchResult = FetchResultToPromResult(fr) } } + +func TestIteratorToTsSeries(t *testing.T) { + t.Run("errors on iterator error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockIter := encoding.NewMockSeriesIterator(ctrl) + + expectedErr := errors.New("expected") + mockIter.EXPECT().Err().Return(expectedErr) + + mockIter = seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 1) + + dps, err := iteratorToTsSeries(mockIter, models.NewTagOptions()) + + assert.Nil(t, dps) + assert.EqualError(t, err, expectedErr.Error()) + }) +} diff --git a/src/query/storage/m3/accounted_series_iter.go b/src/query/storage/m3/accounted_series_iter.go new file mode 100644 index 0000000000..67bee4913c --- /dev/null +++ b/src/query/storage/m3/accounted_series_iter.go @@ -0,0 +1,89 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/cost" + + "github.com/uber-go/tally" +) + +// AccountedSeriesIter wraps a series iterator to track and enforce limits on datapoint usage. Datapoint usage +// is tracked on each call to Next(). +type AccountedSeriesIter struct { + encoding.SeriesIterator + + scope tally.Scope + fetchedDps tally.Counter + enforcer cost.ChainedEnforcer + enforcerErr error +} + +// NewAccountedSeriesIter constructs an AccountedSeriesIter which uses wrapped as its source. +func NewAccountedSeriesIter(wrapped encoding.SeriesIterator, enforcer cost.ChainedEnforcer, scope tally.Scope) *AccountedSeriesIter { + return &AccountedSeriesIter{ + SeriesIterator: wrapped, + enforcer: enforcer, + scope: scope, + fetchedDps: scope.Tagged(map[string]string{"type": "fetched"}).Counter("datapoints"), + } +} + +// Err returns the underlying iterator's error if present, or any limit exceeded error. +func (as *AccountedSeriesIter) Err() error { + if err := as.SeriesIterator.Err(); err != nil { + return err + } + return as.enforcerErr +} + +// Next advances the underlying iterator and adds to the datapoint count. If that count exceeds the limit, it will +// set this iterator's error. +func (as *AccountedSeriesIter) Next() bool { + if as.enforcerErr != nil { + return false + } + + hasNext := as.SeriesIterator.Next() + + if !hasNext { + // nothing left; don't inform the enforcer + return false + } + + as.fetchedDps.Inc(1) + // we actually advanced the iterator; inform the enforcer + r := as.enforcer.Add(1.0) + + if err := r.Error; err != nil { + as.enforcerErr = err + return false + } + + return hasNext +} + +// Close closes the underlying iterator, and marks datapoints as released to our enforcer. +func (as *AccountedSeriesIter) Close() { + as.SeriesIterator.Close() + as.enforcer.Close() +} diff --git a/src/query/storage/m3/accounted_series_iter_test.go b/src/query/storage/m3/accounted_series_iter_test.go new file mode 100644 index 0000000000..6517d82aca --- /dev/null +++ b/src/query/storage/m3/accounted_series_iter_test.go @@ -0,0 +1,150 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "errors" + "testing" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/ts" + qcost "github.com/m3db/m3/src/query/cost" + "github.com/m3db/m3/src/query/test/seriesiter" + "github.com/m3db/m3/src/x/cost" + "github.com/m3db/m3/src/x/cost/test" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" +) + +func newTestEnforcer(limit cost.Cost) qcost.ChainedEnforcer { + limitObj := cost.Limit{Threshold: limit, Enabled: true} + rtn, err := qcost.NewChainedEnforcer("block", []cost.Enforcer{cost.NewEnforcer( + cost.NewStaticLimitManager(cost.NewLimitManagerOptions().SetDefaultLimit(limitObj)), + + cost.NewTracker(), + nil, + )}) + if err != nil { + panic(err.Error()) + } + + return rtn +} + +type accountedSeriesIterSetup struct { + Ctrl *gomock.Controller + Enforcer qcost.ChainedEnforcer + Iter *AccountedSeriesIter +} + +func setupAccountedSeriesIter(t *testing.T, numValues int, limit cost.Cost) *accountedSeriesIterSetup { + ctrl := gomock.NewController(t) + enforcer := newTestEnforcer(limit) + + mockWrappedIter := seriesiter.NewMockSeriesIterator(ctrl, seriesiter.NewMockValidTagGenerator(ctrl), numValues) + return &accountedSeriesIterSetup{ + Ctrl: ctrl, + Enforcer: enforcer, + Iter: NewAccountedSeriesIter(mockWrappedIter, enforcer, tally.NoopScope), + } +} + +func TestAccountedSeriesIter_Next(t *testing.T) { + t.Run("adds to enforcer", func(t *testing.T) { + setup := setupAccountedSeriesIter(t, 5, 5) + setup.Iter.Next() + test.AssertCurrentCost(t, 1, setup.Enforcer) + }) + + t.Run("returns all values", func(t *testing.T) { + setup := setupAccountedSeriesIter(t, 5, 6) + + values := make([]ts.Datapoint, 0) + require.Len(t, values, 0) // I don't trust myself :D + for setup.Iter.Next() { + d, _, _ := setup.Iter.Current() + values = append(values, d) + } + + assert.NoError(t, setup.Iter.Err()) + assert.Len(t, values, 5) + for _, d := range values { + assert.NotEmpty(t, d) + } + }) + + t.Run("sets error on enforcer error", func(t *testing.T) { + setup := setupAccountedSeriesIter(t, 5, 2) + + iter := setup.Iter + assert.True(t, iter.Next()) + require.NoError(t, iter.Err()) + + assert.False(t, iter.Next()) + test.AssertLimitErrorWithMsg(t, iter.Err(), "exceeded block limit", 2, 2) + }) + + t.Run("delegates on wrapped error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockIter := mockSeriesIterWithErr(ctrl) + iter := NewAccountedSeriesIter(mockIter, newTestEnforcer(5), tally.NoopScope) + + assert.True(t, iter.Next(), "the wrapped iterator returns true, so the AcccountedSeriesIterator should return true") + }) +} + +func mockSeriesIterWithErr(ctrl *gomock.Controller) *encoding.MockSeriesIterator { + mockIter := encoding.NewMockSeriesIterator(ctrl) + mockIter.EXPECT().Err().Return(errors.New("test error")) + return seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 5) +} + +func TestAccountedSeriesIter_Err(t *testing.T) { + t.Run("returns wrapped error over enforcer error", func(t *testing.T) { + ctrl := gomock.NewController(t) + iter := NewAccountedSeriesIter(mockSeriesIterWithErr(ctrl), newTestEnforcer(1), tally.NoopScope) + iter.Next() + assert.EqualError(t, iter.Err(), "test error") + }) + + t.Run("returns enforcer error", func(t *testing.T) { + setup := setupAccountedSeriesIter(t, 3, 1) + setup.Iter.Next() + + test.AssertLimitErrorWithMsg(t, setup.Iter.Err(), "exceeded block limit", 1, 1) + }) +} + +func TestAccountedSeriesIter_Close(t *testing.T) { + t.Run("releases enforcer and closes underlying iter", func(t *testing.T) { + setup := setupAccountedSeriesIter(t, 3, 5) + assert.True(t, setup.Iter.Next()) + require.NoError(t, setup.Iter.Err()) + + test.AssertCurrentCost(t, 1, setup.Enforcer) + setup.Iter.Close() + + test.AssertCurrentCost(t, 0, setup.Enforcer) + }) +} diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 5627447e30..de6a10a2fe 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/errors" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" @@ -141,7 +142,7 @@ func (s *m3storage) FetchBlocks( return block.Result{}, err } - return storage.FetchResultToBlockResult(fetchResult, query, s.opts.LookbackDuration()) + return storage.FetchResultToBlockResult(fetchResult, query, s.opts.LookbackDuration(), options.Enforcer) } // If using multiblock, update options to reflect this. @@ -161,7 +162,26 @@ func (s *m3storage) FetchBlocks( StepSize: query.Interval, } - blocks, err := m3db.ConvertM3DBSeriesIterators(raw, bounds, opts) + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + + // TODO: mutating this array breaks the abstraction a bit, but it's the least fussy way I can think of to do this + // while maintaining the original pooling. + // Alternative would be to fetch a new MutableSeriesIterators() instance from the pool, populate it, + // and then return the original to the pool, which feels wasteful. + iters := raw.Iters() + for i, iter := range iters { + iters[i] = NewAccountedSeriesIter(iter, enforcer, options.Scope) + } + + blocks, err := m3db.ConvertM3DBSeriesIterators( + raw, + bounds, + opts, + ) + if err != nil { return block.Result{}, err } diff --git a/src/query/storage/types.go b/src/query/storage/types.go index dd01d8d34b..e117ec96d5 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -26,9 +26,12 @@ import ( "time" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" xtime "github.com/m3db/m3x/time" + + "github.com/uber-go/tally" ) // Type describes the type of storage @@ -86,6 +89,11 @@ type FetchOptions struct { BlockType models.FetchedBlockType // FanoutOptions are the options for the fetch namespace fanout. FanoutOptions *FanoutOptions + // Enforcer is used to enforce resource limits on the number of datapoints + // used by a given query. Limits are imposed at time of decompression. + Enforcer cost.ChainedEnforcer + // Scope is used to report metrics about the fetch. + Scope tally.Scope } // FanoutOptions describes which namespaces should be fanned out to for @@ -124,6 +132,8 @@ func NewFetchOptions() *FetchOptions { FanoutAggregated: FanoutDefault, FanoutAggregatedOptimized: FanoutDefault, }, + Enforcer: cost.NoopChainedEnforcer(), + Scope: tally.NoopScope, } } diff --git a/src/query/storage/validator/storage.go b/src/query/storage/validator/storage.go index d934ed76b7..41cb3fb301 100644 --- a/src/query/storage/validator/storage.go +++ b/src/query/storage/validator/storage.go @@ -73,7 +73,7 @@ func (s *debugStorage) FetchBlocks( return block.Result{}, err } - return storage.FetchResultToBlockResult(fetchResult, query, s.lookbackDuration) + return storage.FetchResultToBlockResult(fetchResult, query, s.lookbackDuration, options.Enforcer) } // PromResultToSeriesList converts a prom result to a series list diff --git a/src/query/test/block.go b/src/query/test/block.go index 5ddace92f0..21ed930410 100644 --- a/src/query/test/block.go +++ b/src/query/test/block.go @@ -130,7 +130,7 @@ func NewBlockFromValuesWithMetaAndSeriesMeta( seriesMeta []block.SeriesMeta, seriesValues [][]float64, ) block.Block { - columnBuilder := block.NewColumnBlockBuilder(meta, seriesMeta) + columnBuilder := block.NewColumnBlockBuilder(models.NoopQueryContext(), meta, seriesMeta) if err := columnBuilder.AddCols(len(seriesValues[0])); err != nil { panic(err) diff --git a/src/query/test/seriesiter/mock_iter.go b/src/query/test/seriesiter/mock_iter.go index adb04f7944..3e35e63c32 100644 --- a/src/query/test/seriesiter/mock_iter.go +++ b/src/query/test/seriesiter/mock_iter.go @@ -62,28 +62,40 @@ func NewMockSeriesIterSlice( ) []encoding.SeriesIterator { iteratorList := make([]encoding.SeriesIterator, 0, len) for i := 0; i < len; i++ { - mockIter := encoding.NewMockSeriesIterator(ctrl) - mockIter.EXPECT().Next().Return(true).MaxTimes(numValues) - mockIter.EXPECT().Next().Return(false).MaxTimes(1) - now := time.Now() - for i := 0; i < numValues; i++ { - mockIter.EXPECT().Current().Return(m3ts.Datapoint{Timestamp: now.Add(time.Duration(i*10) * time.Second), Value: float64(i)}, xtime.Millisecond, nil).MaxTimes(1) - } - - tags := tagGenerator() - mockIter.EXPECT().Namespace().Return(ident.StringID("foo")).AnyTimes() - mockIter.EXPECT().ID().Return(ident.StringID("bar")).AnyTimes() - mockIter.EXPECT().Tags().Return(tags).MaxTimes(1) - mockIter.EXPECT().Close().Do(func() { - // Make sure to close the tags generated when closing the iter - tags.Close() - }).AnyTimes() - + mockIter := NewMockSeriesIterator(ctrl, tagGenerator, numValues) iteratorList = append(iteratorList, mockIter) } return iteratorList } +// NewMockSeriesIterator constructs a MockSeriesIterator return numValues datapoints, using tagGenerator, and otherwise +// configured with sensible defaults. +func NewMockSeriesIterator(ctrl *gomock.Controller, tagGenerator func() ident.TagIterator, numValues int) *encoding.MockSeriesIterator { + return NewMockSeriesIteratorFromBase(encoding.NewMockSeriesIterator(ctrl), tagGenerator, numValues) +} + +// NewMockSeriesIteratorFromBase constructs a MockSeriesIterator return numValues datapoints, using tagGenerator, and otherwise +// configured with sensible defaults. Any expectations already set on mockIter will be respected. +func NewMockSeriesIteratorFromBase(mockIter *encoding.MockSeriesIterator, tagGenerator func() ident.TagIterator, numValues int) *encoding.MockSeriesIterator { + mockIter.EXPECT().Next().Return(true).MaxTimes(numValues) + mockIter.EXPECT().Next().Return(false).MaxTimes(1) + now := time.Now() + for i := 0; i < numValues; i++ { + mockIter.EXPECT().Current().Return(m3ts.Datapoint{Timestamp: now.Add(time.Duration(i*10) * time.Second), Value: float64(i)}, xtime.Millisecond, nil).MaxTimes(1) + } + + tags := tagGenerator() + mockIter.EXPECT().Namespace().Return(ident.StringID("foo")).AnyTimes() + mockIter.EXPECT().ID().Return(ident.StringID("bar")).AnyTimes() + mockIter.EXPECT().Tags().Return(tags).MaxTimes(1) + mockIter.EXPECT().Close().Do(func() { + // Make sure to close the tags generated when closing the iter + tags.Close() + }).AnyTimes() + mockIter.EXPECT().Err().Return(nil).AnyTimes() + return mockIter +} + // NewMockSeriesIters generates a new mock series iters func NewMockSeriesIters( ctrl *gomock.Controller, @@ -110,3 +122,10 @@ func NewMockSeriesIters( return mockIters } + +// NewMockValidTagGenerator wraps around GenerateSimpleTagIterator to construct a default TagIterator function. +func NewMockValidTagGenerator(ctrl *gomock.Controller) func() ident.TagIterator { + return func() ident.TagIterator { + return GenerateSingleSampleTagIterator(ctrl, GenerateTag()) + } +} diff --git a/src/query/ts/m3db/convert_test.go b/src/query/ts/m3db/convert_test.go index 287cda3d55..c80bf763a0 100644 --- a/src/query/ts/m3db/convert_test.go +++ b/src/query/ts/m3db/convert_test.go @@ -45,7 +45,7 @@ func generateIterators( t *testing.T, stepSize time.Duration, ) ( - encoding.SeriesIterators, + []encoding.SeriesIterator, models.Bounds, ) { datapoints := [][][]test.Datapoint{ @@ -110,7 +110,7 @@ func generateIterators( iters[i] = iter } - return encoding.NewSeriesIterators(iters, nil), bounds + return iters, bounds } func buildCustomIterator( @@ -175,7 +175,7 @@ func generateBlocks( ) ([]block.Block, models.Bounds) { iterators, bounds := generateIterators(t, stepSize) blocks, err := ConvertM3DBSeriesIterators( - iterators, + encoding.NewSeriesIterators(iterators, nil), bounds, opts, ) diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index c56cc55847..f864443d6f 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -192,7 +192,7 @@ func (c *grpcClient) FetchBlocks( return block.Result{}, err } - res, err := storage.FetchResultToBlockResult(fetchResult, query, c.lookbackDuration) + res, err := storage.FetchResultToBlockResult(fetchResult, query, c.lookbackDuration, options.Enforcer) if err != nil { return block.Result{}, err } diff --git a/src/x/cost/enforcer.go b/src/x/cost/enforcer.go index 3bafecc01d..5667735632 100644 --- a/src/x/cost/enforcer.go +++ b/src/x/cost/enforcer.go @@ -22,13 +22,10 @@ package cost import ( "fmt" - - "github.com/uber-go/tally" ) const ( - defaultCostExceededErrorFmt = "%s exceeds limit of %s" - customCostExceededErrorFmt = "%s exceeds limit of %s: %s" + defaultCostExceededErrorFmt = "limit reached (current = %v, limit = %v)" ) var ( @@ -55,7 +52,7 @@ type enforcer struct { tracker Tracker costMsg string - metrics enforcerMetrics + metrics EnforcerReporter } // NewEnforcer returns a new enforcer for cost limits. @@ -64,28 +61,36 @@ func NewEnforcer(m LimitManager, t Tracker, opts EnforcerOptions) Enforcer { opts = NewEnforcerOptions() } + reporter := opts.Reporter() + if reporter == nil { + reporter = NoopEnforcerReporter() + } + return &enforcer{ LimitManager: m, tracker: t, costMsg: opts.CostExceededMessage(), - metrics: newEnforcerMetrics(opts.InstrumentOptions().MetricsScope(), opts.ValueBuckets()), + metrics: reporter, } } +func (e *enforcer) Reporter() EnforcerReporter { + return e.metrics +} + // Add adds the cost of an operation to the enforcer's current total. If the operation exceeds // the enforcer's limit the enforcer will return a CostLimit error in addition to the new total. func (e *enforcer) Add(cost Cost) Report { + e.metrics.ReportCost(cost) current := e.tracker.Add(cost) + e.metrics.ReportCurrent(current) limit := e.Limit() overLimit := e.checkLimit(current, limit) if overLimit != nil { // Emit metrics on number of operations that are over the limit even when not enabled. - e.metrics.overLimit.Inc(1) - if limit.Enabled { - e.metrics.overLimitAndEnabled.Inc(1) - } + e.metrics.ReportOverLimit(limit.Enabled) } return Report{ @@ -122,27 +127,35 @@ func (e *enforcer) checkLimit(cost Cost, limit Limit) error { return nil } - if e.costMsg == "" { - return defaultCostExceededError(cost, limit) - } - return costExceededError(e.costMsg, cost, limit) + return NewCostExceededError(e.costMsg, cost, limit.Threshold) +} + +type costExceededError struct { + Threshold Cost + Current Cost + CustomMsg string } -func defaultCostExceededError(cost Cost, limit Limit) error { - return fmt.Errorf( +func (ce costExceededError) Error() string { + baseMsg := fmt.Sprintf( defaultCostExceededErrorFmt, - fmt.Sprintf("%v", float64(cost)), - fmt.Sprintf("%v", float64(limit.Threshold)), + float64(ce.Current), + float64(ce.Threshold), ) + if ce.CustomMsg == "" { + return baseMsg + } + + return fmt.Sprintf("%s: %s", ce.CustomMsg, baseMsg) } -func costExceededError(customMessage string, cost Cost, limit Limit) error { - return fmt.Errorf( - customCostExceededErrorFmt, - fmt.Sprintf("%v", float64(cost)), - fmt.Sprintf("%v", float64(limit.Threshold)), - customMessage, - ) +// NewCostExceededError returns an error for going over an Enforcer's limit. +func NewCostExceededError(customMessage string, cost Cost, threshold Cost) error { + return costExceededError{ + CustomMsg: customMessage, + Current: cost, + Threshold: threshold, + } } // NoopEnforcer returns a new enforcer that always returns a current cost of 0 and @@ -151,14 +164,13 @@ func NoopEnforcer() Enforcer { return noopEnforcer } -type enforcerMetrics struct { - overLimit tally.Counter - overLimitAndEnabled tally.Counter -} +type noopEnforcerReporter struct{} -func newEnforcerMetrics(s tally.Scope, b tally.ValueBuckets) enforcerMetrics { - return enforcerMetrics{ - overLimit: s.Counter("over-limit"), - overLimitAndEnabled: s.Counter("over-limit-and-enabled"), - } +func (noopEnforcerReporter) ReportCost(c Cost) {} +func (noopEnforcerReporter) ReportCurrent(c Cost) {} +func (noopEnforcerReporter) ReportOverLimit(enabled bool) {} + +// NoopEnforcerReporter returns an EnforcerReporter which does nothing on all events. +func NoopEnforcerReporter() EnforcerReporter { + return noopEnforcerReporter{} } diff --git a/src/x/cost/enforcer_test.go b/src/x/cost/enforcer_test.go index 22cbdf4311..27b9a9aa1f 100644 --- a/src/x/cost/enforcer_test.go +++ b/src/x/cost/enforcer_test.go @@ -86,7 +86,7 @@ func TestEnforcer(t *testing.T) { require.Equal(t, test.expected, report.Cost) if test.exceededThreshold { - require.EqualError(t, report.Error, costExceededError(msg, 13, Limit{Threshold: 10}).Error()) + require.EqualError(t, report.Error, NewCostExceededError(msg, 13, 10).Error()) } else { require.NoError(t, report.Error) } @@ -98,10 +98,10 @@ func TestEnforcer(t *testing.T) { require.Equal(t, Cost(13), report.Cost) require.Equal(t, Cost(10), limit.Threshold) require.True(t, limit.Enabled) - require.EqualError(t, report.Error, costExceededError(msg, 13, Limit{Threshold: 10}).Error()) + require.EqualError(t, report.Error, NewCostExceededError(msg, 13, 10).Error()) - // The error message should end with the message provided in the options. - require.True(t, strings.HasSuffix(report.Error.Error(), msg)) + // The error message should start with the message provided in the options. + require.True(t, strings.HasPrefix(report.Error.Error(), msg)) // When the threshold is raised, any new operations that stay below it should be legal again. store.Set(testThresholdKey, &commonpb.Float64Proto{Value: float64(20)}) @@ -120,7 +120,7 @@ func TestEnforcer(t *testing.T) { report = e.Add(Cost(5)) require.NoError(t, err) require.NoError(t, err) - require.EqualError(t, report.Error, costExceededError(msg, 21, Limit{Threshold: 20}).Error()) + require.EqualError(t, report.Error, NewCostExceededError(msg, 21, 20).Error()) require.Equal(t, Cost(21), report.Cost) // When the enforcer is disabled any input above the threshold should become legal. @@ -145,6 +145,22 @@ func TestEnforcer(t *testing.T) { require.NoError(t, report.Error) } +func TestNewCostExceedError(t *testing.T) { + t.Run("with custom error message", func(t *testing.T) { + assert.EqualError( + t, + NewCostExceededError("custom", 1, 2), + "custom: limit reached (current = 1, limit = 2)") + }) + + t.Run("with default message", func(t *testing.T) { + assert.EqualError( + t, + NewCostExceededError("", 1, 2), + "limit reached (current = 1, limit = 2)") + }) +} + func TestEnforcerClone(t *testing.T) { store := mem.NewStore() threshold := Cost(30) diff --git a/src/x/cost/options.go b/src/x/cost/options.go index f3fb59d3be..8131a42901 100644 --- a/src/x/cost/options.go +++ b/src/x/cost/options.go @@ -97,31 +97,25 @@ func (o *limitManagerOptions) InstrumentOptions() instrument.Options { // EnforcerOptions provides a set of options for an enforcer. type EnforcerOptions interface { - // SetCostExceededMessage sets the message appended to cost limit errors to provide - // more context on the cost limit that was exceeded. + // Reporter is the reporter which will be used on Enforcer events. + Reporter() EnforcerReporter + + // SetReporter sets Reporter() + SetReporter(r EnforcerReporter) EnforcerOptions + + // SetCostExceededMessage sets CostExceededMessage SetCostExceededMessage(val string) EnforcerOptions // CostExceededMessage returns the message appended to cost limit errors to provide // more context on the cost limit that was exceeded. CostExceededMessage() string - - // SetValueBuckets sets the tally buckets used in histogram metrics. - SetValueBuckets(val tally.ValueBuckets) EnforcerOptions - - // ValueBuckets returns the tally buckets used in histogram metrics. - ValueBuckets() tally.ValueBuckets - - // SetInstrumentOptions sets the instrument options. - SetInstrumentOptions(val instrument.Options) EnforcerOptions - - // InstrumentOptions returns the instrument options. - InstrumentOptions() instrument.Options } type enforcerOptions struct { - msg string - buckets tally.ValueBuckets - iOpts instrument.Options + msg string + buckets tally.ValueBuckets + reporter EnforcerReporter + iOpts instrument.Options } // NewEnforcerOptions returns a new set of enforcer options. @@ -132,6 +126,16 @@ func NewEnforcerOptions() EnforcerOptions { } } +func (o *enforcerOptions) Reporter() EnforcerReporter { + return o.reporter +} + +func (o *enforcerOptions) SetReporter(r EnforcerReporter) EnforcerOptions { + opts := *o + opts.reporter = r + return &opts +} + func (o *enforcerOptions) SetCostExceededMessage(val string) EnforcerOptions { opts := *o opts.msg = val diff --git a/src/x/cost/test/assert.go b/src/x/cost/test/assert.go new file mode 100644 index 0000000000..b7c4ac40e3 --- /dev/null +++ b/src/x/cost/test/assert.go @@ -0,0 +1,45 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package test contains testing utilities for the cost package. +package test + +import ( + "github.com/m3db/m3/src/x/cost" + + "github.com/stretchr/testify/assert" +) + +// AssertCurrentCost is a helper assertion to check that an enforcer has the +// given cost. +func AssertCurrentCost(t assert.TestingT, expectedCost cost.Cost, ef cost.Enforcer) { + actual, _ := ef.State() + assert.Equal(t, expectedCost, actual.Cost) +} + +// AssertLimitError checks that err is a limit error with the given parameters +func AssertLimitError(t assert.TestingT, err error, current, threshold cost.Cost) { + AssertLimitErrorWithMsg(t, err, "", current, threshold) +} + +// AssertLimitErrorWithMsg checks that err is a limit error with the given parameters +func AssertLimitErrorWithMsg(t assert.TestingT, err error, msg string, current, threshold cost.Cost) { + assert.EqualError(t, err, cost.NewCostExceededError(msg, current, threshold).Error()) +} diff --git a/src/x/cost/types.go b/src/x/cost/types.go index 829f5219d6..0e103d4210 100644 --- a/src/x/cost/types.go +++ b/src/x/cost/types.go @@ -68,4 +68,18 @@ type Enforcer interface { State() (Report, Limit) Limit() Limit Clone() Enforcer + Reporter() EnforcerReporter +} + +// An EnforcerReporter is a listener for Enforcer events. +type EnforcerReporter interface { + // ReportCost is called on every call to Enforcer#Add with the added cost + ReportCost(c Cost) + + // ReportCurrent reports the current total on every call to Enforcer#Add + ReportCurrent(c Cost) + + // ReportOverLimit is called every time an enforcer goes over its limit. enabled is true if the limit manager + // says the limit is currently enabled. + ReportOverLimit(enabled bool) } From fe10d6bce2356f7e735bbe2169ce2b57715a1b33 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 13 Mar 2019 10:38:36 -0400 Subject: [PATCH 2/4] [query] Add cost enforcer to graphite queries --- src/query/api/v1/handler/graphite/render.go | 4 +- .../api/v1/handler/graphite/render_test.go | 10 +- src/query/api/v1/httpd/handler.go | 6 +- src/query/api/v1/httpd/handler_test.go | 13 ++- src/query/graphite/storage/m3_wrapper.go | 16 ++- src/query/graphite/storage/m3_wrapper_test.go | 10 +- src/query/server/server.go | 2 +- src/query/storage/converter.go | 38 ++++++- src/query/storage/converter_test.go | 101 ++++++++++++++---- src/query/storage/m3/storage.go | 6 ++ src/query/storage/mock/storage.go | 17 ++- src/query/tsdb/remote/client.go | 15 ++- 12 files changed, 191 insertions(+), 47 deletions(-) diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index b87d6d4f4f..2596eb033e 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -29,6 +29,7 @@ import ( "sync" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/native" @@ -62,8 +63,9 @@ type respError struct { // NewRenderHandler returns a new render handler around the given storage. func NewRenderHandler( storage storage.Storage, + enforcer cost.ChainedEnforcer, ) http.Handler { - wrappedStore := graphite.NewM3WrappedStorage(storage) + wrappedStore := graphite.NewM3WrappedStorage(storage, enforcer) return &renderHandler{ engine: native.NewEngine(wrappedStore), } diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index f35bfe5d9b..83cbc4fe9c 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -39,7 +39,7 @@ import ( func TestParseNoQuery(t *testing.T) { mockStorage := mock.NewMockStorage() - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, newGraphiteReadHTTPRequest(t)) @@ -51,7 +51,7 @@ func TestParseNoQuery(t *testing.T) { func TestParseQueryNoResults(t *testing.T) { mockStorage := mock.NewMockStorage() mockStorage.SetFetchResult(&storage.FetchResult{}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = "target=foo.bar&from=-2h&until=now" @@ -82,7 +82,7 @@ func TestParseQueryResults(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = fmt.Sprintf("target=foo.bar&from=%d&until=%d", @@ -123,7 +123,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = "target=foo.bar&from=" + startStr + "&until=" + endStr + "&maxDataPoints=1" @@ -158,7 +158,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=baz.qux&from=%d&until=%d", diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 925bee6d29..5a172e60b0 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -44,6 +44,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/validator" "github.com/m3db/m3/src/query/api/v1/handler/topic" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" @@ -85,6 +86,7 @@ type Handler struct { createdAt time.Time tagOptions models.TagOptions timeoutOpts *prometheus.TimeoutOpts + enforcer cost.ChainedEnforcer } // Router returns the http handler registered with all relevant routes for query. @@ -101,6 +103,7 @@ func NewHandler( clusterClient clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, + enforcer cost.ChainedEnforcer, scope tally.Scope, ) (*Handler, error) { r := mux.NewRouter() @@ -132,6 +135,7 @@ func NewHandler( createdAt: time.Now(), tagOptions: tagOptions, timeoutOpts: timeoutOpts, + enforcer: enforcer, } return h, nil } @@ -224,7 +228,7 @@ func (h *Handler) RegisterRoutes() error { // Graphite endpoints h.router.HandleFunc(graphite.ReadURL, - wrapped(graphite.NewRenderHandler(h.storage)).ServeHTTP, + wrapped(graphite.NewRenderHandler(h.storage, h.enforcer)).ServeHTTP, ).Methods(graphite.ReadHTTPMethods...) h.router.HandleFunc(graphite.FindURL, diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index b077779770..d85e62852a 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -71,6 +71,7 @@ func setupHandler(store storage.Storage) (*Handler, error) { nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, + nil, tally.NewTestScope("", nil)) } @@ -83,8 +84,10 @@ func TestHandlerFetchTimeoutError(t *testing.T) { negValue := -1 * time.Second dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &negValue}} - _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil, - config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) + engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil) + cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} + _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, nil, nil, + cfg, dbconfig, nil, tally.NewTestScope("", nil)) require.Error(t, err) } @@ -97,8 +100,10 @@ func TestHandlerFetchTimeout(t *testing.T) { fourMin := 4 * time.Minute dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &fourMin}} - h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil, - config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) + engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil) + cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} + h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, + nil, nil, cfg, dbconfig, nil, tally.NewTestScope("", nil)) require.NoError(t, err) assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout) } diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index b93df4a60a..929547392e 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -25,6 +25,7 @@ import ( "errors" "time" + "github.com/m3db/m3/src/query/cost" xctx "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/graphite/ts" @@ -38,13 +39,21 @@ var ( ) type m3WrappedStore struct { - m3 storage.Storage + m3 storage.Storage + enforcer cost.ChainedEnforcer } // NewM3WrappedStorage creates a graphite storage wrapper around an m3query // storage instance. -func NewM3WrappedStorage(m3storage storage.Storage) Storage { - return &m3WrappedStore{m3: m3storage} +func NewM3WrappedStorage( + m3storage storage.Storage, + enforcer cost.ChainedEnforcer, +) Storage { + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + + return &m3WrappedStore{m3: m3storage, enforcer: enforcer} } // TranslateQueryToMatchers converts a graphite query to tag matcher pairs. @@ -130,6 +139,7 @@ func (s *m3WrappedStore) FetchByQuery( m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout) defer cancel() fetchOptions := storage.NewFetchOptions() + fetchOptions.Enforcer = s.enforcer fetchOptions.FanoutOptions = &storage.FanoutOptions{ FanoutUnaggregated: storage.FanoutForceDisable, FanoutAggregated: storage.FanoutDefault, diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index 8fd7dfe60c..02eb5e677a 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -26,12 +26,14 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/cost" xctx "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/mock" m3ts "github.com/m3db/m3/src/query/ts" + xcost "github.com/m3db/m3/src/x/cost" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -137,7 +139,10 @@ func TestFetchByQuery(t *testing.T) { } store.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - wrapper := NewM3WrappedStorage(store) + enforcers := []xcost.Enforcer{xcost.NewEnforcer(nil, nil, nil)} + enforcer, err := cost.NewChainedEnforcer("name", enforcers) + require.NoError(t, err) + wrapper := NewM3WrappedStorage(store, enforcer) ctx := xctx.New() ctx.SetRequestContext(context.TODO()) end := time.Now() @@ -156,4 +161,7 @@ func TestFetchByQuery(t *testing.T) { series := result.SeriesList[0] assert.Equal(t, "a", series.Name()) assert.Equal(t, []float64{3, 3, 3}, series.SafeValues()) + + // NB: ensure the fetch was called with enforcer propagated correctly + assert.Equal(t, enforcer, store.LastFetchOptions().Enforcer) } diff --git a/src/query/server/server.go b/src/query/server/server.go index 4194d238fd..a5ffffd354 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -261,7 +261,7 @@ func Run(runOpts RunOptions) { } handler, err := httpd.NewHandler(downsamplerAndWriter, tagOptions, engine, - m3dbClusters, clusterClient, cfg, runOpts.DBConfig, scope) + m3dbClusters, clusterClient, cfg, runOpts.DBConfig, perQueryEnforcer, scope) if err != nil { logger.Fatal("unable to set up handlers", zap.Error(err)) } diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 9fb0697733..38936b6dc7 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -28,9 +28,11 @@ import ( "time" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" + xcost "github.com/m3db/m3/src/x/cost" xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" ) @@ -266,6 +268,7 @@ func SeriesToPromSamples(series *ts.Series) []*prompb.Sample { func iteratorToTsSeries( iter encoding.SeriesIterator, + enforcer enforcer, tagOptions models.TagOptions, ) (*ts.Series, error) { metric, err := FromM3IdentToMetric(iter.ID(), iter.Tags(), tagOptions) @@ -283,6 +286,11 @@ func iteratorToTsSeries( return nil, err } + r := enforcer.Add(xcost.Cost(len(datapoints))) + if r.Error != nil { + return nil, r.Error + } + return ts.NewSeries(metric.ID, datapoints, metric.Tags), nil } @@ -290,11 +298,12 @@ func iteratorToTsSeries( func decompressSequentially( iterLength int, iters []encoding.SeriesIterator, + enforcer enforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { seriesList := make([]*ts.Series, 0, len(iters)) for _, iter := range iters { - series, err := iteratorToTsSeries(iter, tagOptions) + series, err := iteratorToTsSeries(iter, enforcer, tagOptions) if err != nil { return nil, err } @@ -310,6 +319,7 @@ func decompressConcurrently( iterLength int, iters []encoding.SeriesIterator, readWorkerPool xsync.PooledWorkerPool, + enforcer enforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { seriesList := make([]*ts.Series, iterLength) @@ -336,7 +346,7 @@ func decompressConcurrently( return } - series, err := iteratorToTsSeries(iter, tagOptions) + series, err := iteratorToTsSeries(iter, enforcer, tagOptions) if err != nil { // Return the first error that is encountered. select { @@ -361,11 +371,29 @@ func decompressConcurrently( }, nil } +// Wrap enforcer in a +type enforcer interface { + Add(xcost.Cost) xcost.Report +} + +type syncEnforcer struct { + enforcer cost.ChainedEnforcer + mu sync.Mutex +} + +func (e *syncEnforcer) Add(c xcost.Cost) xcost.Report { + e.mu.Lock() + r := e.enforcer.Add(c) + e.mu.Unlock() + return r +} + // SeriesIteratorsToFetchResult converts SeriesIterators into a fetch result func SeriesIteratorsToFetchResult( seriesIterators encoding.SeriesIterators, readWorkerPool xsync.PooledWorkerPool, cleanupSeriesIters bool, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { if cleanupSeriesIters { @@ -375,8 +403,10 @@ func SeriesIteratorsToFetchResult( iters := seriesIterators.Iters() iterLength := seriesIterators.Len() if readWorkerPool == nil { - return decompressSequentially(iterLength, iters, tagOptions) + return decompressSequentially(iterLength, iters, enforcer, tagOptions) } - return decompressConcurrently(iterLength, iters, readWorkerPool, tagOptions) + syncedEnforcer := &syncEnforcer{enforcer: enforcer} + return decompressConcurrently(iterLength, iters, readWorkerPool, + syncedEnforcer, tagOptions) } diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 104954c890..219230c46c 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -28,10 +28,12 @@ import ( "time" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test/seriesiter" "github.com/m3db/m3/src/query/ts" + xcost "github.com/m3db/m3/src/x/cost" "github.com/m3db/m3x/ident" xsync "github.com/m3db/m3x/sync" @@ -71,17 +73,25 @@ func TestLabelConversion(t *testing.T) { assert.Equal(t, labels, reverted) } -func verifyExpandSeries(t *testing.T, ctrl *gomock.Controller, num int, pools xsync.PooledWorkerPool) { +func verifyExpandSeries( + t *testing.T, + ctrl *gomock.Controller, + num int, + pools xsync.PooledWorkerPool, +) { testTags := seriesiter.GenerateTag() iters := seriesiter.NewMockSeriesIters(ctrl, testTags, num, 2) - results, err := SeriesIteratorsToFetchResult(iters, pools, true, nil) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(num) + results, err := SeriesIteratorsToFetchResult(iters, pools, true, enforcer, nil) assert.NoError(t, err) require.NotNil(t, results) require.NotNil(t, results.SeriesList) require.Len(t, results.SeriesList, num) - expectedTags := []models.Tag{{Name: testTags.Name.Bytes(), Value: testTags.Value.Bytes()}} + expectedTags := []models.Tag{{Name: testTags.Name.Bytes(), + Value: testTags.Value.Bytes()}} for i := 0; i < num; i++ { series := results.SeriesList[i] require.NotNil(t, series) @@ -122,12 +132,14 @@ func TestFailingExpandSeriesValidPools(t *testing.T) { poolSize = 2 numUncalled = 10 ) - pool, err := xsync.NewPooledWorkerPool(poolSize, xsync.NewPooledWorkerPoolOptions()) + pool, err := xsync.NewPooledWorkerPool(poolSize, + xsync.NewPooledWorkerPoolOptions()) require.NoError(t, err) pool.Init() ctrl := gomock.NewController(t) - iters := seriesiter.NewMockSeriesIterSlice(ctrl, seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries, numValues) + iters := seriesiter.NewMockSeriesIterSlice(ctrl, + seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries, numValues) // Add poolSize + 1 failing iterators; there can be slight timing // inconsistencies which can sometimes cause failures in this test // as one of the `uncalled` iterators gets unexpectedly used. @@ -155,11 +167,54 @@ func TestFailingExpandSeriesValidPools(t *testing.T) { mockIters.EXPECT().Iters().Return(iters).Times(1) mockIters.EXPECT().Len().Return(len(iters)).Times(1) mockIters.EXPECT().Close().Times(1) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(numValidSeries) result, err := SeriesIteratorsToFetchResult( mockIters, pool, true, + enforcer, + nil, + ) + require.Nil(t, result) + require.EqualError(t, err, "error") +} + +func TestOverLimit(t *testing.T) { + var ( + numValidSeries = 8 + numValues = 2 + poolSize = 2 + numUncalled = 10 + ) + pool, err := xsync.NewPooledWorkerPool(poolSize, + xsync.NewPooledWorkerPoolOptions()) + require.NoError(t, err) + pool.Init() + ctrl := gomock.NewController(t) + + iters := seriesiter.NewMockSeriesIterSlice(ctrl, + seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries+poolSize+1, numValues) + for i := 0; i < numUncalled; i++ { + uncalledIter := encoding.NewMockSeriesIterator(ctrl) + iters = append(iters, uncalledIter) + } + + mockIters := encoding.NewMockSeriesIterators(ctrl) + mockIters.EXPECT().Iters().Return(iters).Times(1) + mockIters.EXPECT().Len().Return(len(iters)).Times(1) + mockIters.EXPECT().Close().Times(1) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(numValidSeries) + enforcer.EXPECT().Add(xcost.Cost(2)). + Return(xcost.Report{Error: errors.New("error")}).MinTimes(1) + + result, err := SeriesIteratorsToFetchResult( + mockIters, + pool, + true, + enforcer, nil, ) require.Nil(t, result) @@ -265,6 +320,25 @@ var ( benchResult *prompb.QueryResult ) +func TestIteratorToTsSeries(t *testing.T) { + t.Run("errors on iterator error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockIter := encoding.NewMockSeriesIterator(ctrl) + + expectedErr := errors.New("expected") + mockIter.EXPECT().Err().Return(expectedErr) + + mockIter = seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 1) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(1) + + dps, err := iteratorToTsSeries(mockIter, enforcer, models.NewTagOptions()) + + assert.Nil(t, dps) + assert.EqualError(t, err, expectedErr.Error()) + }) +} + // BenchmarkFetchResultToPromResult-8 100 10563444 ns/op 25368543 B/op 4443 allocs/op func BenchmarkFetchResultToPromResult(b *testing.B) { var ( @@ -303,20 +377,3 @@ func BenchmarkFetchResultToPromResult(b *testing.B) { benchResult = FetchResultToPromResult(fr) } } - -func TestIteratorToTsSeries(t *testing.T) { - t.Run("errors on iterator error", func(t *testing.T) { - ctrl := gomock.NewController(t) - mockIter := encoding.NewMockSeriesIterator(ctrl) - - expectedErr := errors.New("expected") - mockIter.EXPECT().Err().Return(expectedErr) - - mockIter = seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 1) - - dps, err := iteratorToTsSeries(mockIter, models.NewTagOptions()) - - assert.Nil(t, dps) - assert.EqualError(t, err, expectedErr.Error()) - }) -} diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index de6a10a2fe..cb4ca646e7 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -107,10 +107,16 @@ func (s *m3storage) Fetch( return nil, err } + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + fetchResult, err := storage.SeriesIteratorsToFetchResult( iters, s.readWorkerPool, false, + enforcer, s.opts.TagOptions(), ) diff --git a/src/query/storage/mock/storage.go b/src/query/storage/mock/storage.go index ed7133f3e2..1ff562c513 100644 --- a/src/query/storage/mock/storage.go +++ b/src/query/storage/mock/storage.go @@ -34,6 +34,7 @@ type Storage interface { storage.Storage SetTypeResult(storage.Type) + LastFetchOptions() *storage.FetchOptions SetFetchResult(*storage.FetchResult, error) SetFetchTagsResult(*storage.SearchResults, error) SetCompleteTagsResult(*storage.CompleteTagsResult, error) @@ -48,7 +49,8 @@ type mockStorage struct { typeResult struct { result storage.Type } - fetchResult struct { + lastFetchOptions *storage.FetchOptions + fetchResult struct { result *storage.FetchResult err error } @@ -130,13 +132,20 @@ func (s *mockStorage) Writes() []*storage.WriteQuery { return s.writes } +func (s *mockStorage) LastFetchOptions() *storage.FetchOptions { + s.RLock() + defer s.RUnlock() + return s.lastFetchOptions +} + func (s *mockStorage) Fetch( ctx context.Context, query *storage.FetchQuery, - _ *storage.FetchOptions, + opts *storage.FetchOptions, ) (*storage.FetchResult, error) { - s.RLock() - defer s.RUnlock() + s.Lock() + defer s.Unlock() + s.lastFetchOptions = opts return s.fetchResult.result, s.fetchResult.err } diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index f864443d6f..5121aee90a 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/errors" rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb" "github.com/m3db/m3/src/query/models" @@ -103,7 +104,13 @@ func (c *grpcClient) Fetch( return nil, err } - return storage.SeriesIteratorsToFetchResult(iters, c.readWorkerPool, true, c.tagOptions) + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + + return storage.SeriesIteratorsToFetchResult(iters, c.readWorkerPool, + true, enforcer, c.tagOptions) } func (c *grpcClient) waitForPools() (encoding.IteratorPools, error) { @@ -182,10 +189,16 @@ func (c *grpcClient) FetchBlocks( return block.Result{}, err } + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + fetchResult, err := storage.SeriesIteratorsToFetchResult( iters, c.readWorkerPool, true, + enforcer, c.tagOptions, ) if err != nil { From 87cce05b0076189a6b9d6445f121c7206c4a0030 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 13 Mar 2019 12:28:50 -0400 Subject: [PATCH 3/4] Removed explicitly sync'd enforcer since the base enforcer is threadsafe --- src/query/storage/converter.go | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 38936b6dc7..c956b94a22 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -268,7 +268,7 @@ func SeriesToPromSamples(series *ts.Series) []*prompb.Sample { func iteratorToTsSeries( iter encoding.SeriesIterator, - enforcer enforcer, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*ts.Series, error) { metric, err := FromM3IdentToMetric(iter.ID(), iter.Tags(), tagOptions) @@ -298,7 +298,7 @@ func iteratorToTsSeries( func decompressSequentially( iterLength int, iters []encoding.SeriesIterator, - enforcer enforcer, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { seriesList := make([]*ts.Series, 0, len(iters)) @@ -319,7 +319,7 @@ func decompressConcurrently( iterLength int, iters []encoding.SeriesIterator, readWorkerPool xsync.PooledWorkerPool, - enforcer enforcer, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { seriesList := make([]*ts.Series, iterLength) @@ -371,23 +371,6 @@ func decompressConcurrently( }, nil } -// Wrap enforcer in a -type enforcer interface { - Add(xcost.Cost) xcost.Report -} - -type syncEnforcer struct { - enforcer cost.ChainedEnforcer - mu sync.Mutex -} - -func (e *syncEnforcer) Add(c xcost.Cost) xcost.Report { - e.mu.Lock() - r := e.enforcer.Add(c) - e.mu.Unlock() - return r -} - // SeriesIteratorsToFetchResult converts SeriesIterators into a fetch result func SeriesIteratorsToFetchResult( seriesIterators encoding.SeriesIterators, @@ -406,7 +389,6 @@ func SeriesIteratorsToFetchResult( return decompressSequentially(iterLength, iters, enforcer, tagOptions) } - syncedEnforcer := &syncEnforcer{enforcer: enforcer} return decompressConcurrently(iterLength, iters, readWorkerPool, - syncedEnforcer, tagOptions) + enforcer, tagOptions) } From fc7a3347e16b8192353cc0c0b924957575b72243 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 13 Mar 2019 15:24:56 -0400 Subject: [PATCH 4/4] Fix bad merge in test --- src/query/storage/converter_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 21062137b4..219230c46c 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -377,20 +377,3 @@ func BenchmarkFetchResultToPromResult(b *testing.B) { benchResult = FetchResultToPromResult(fr) } } - -func TestIteratorToTsSeries(t *testing.T) { - t.Run("errors on iterator error", func(t *testing.T) { - ctrl := gomock.NewController(t) - mockIter := encoding.NewMockSeriesIterator(ctrl) - - expectedErr := errors.New("expected") - mockIter.EXPECT().Err().Return(expectedErr) - - mockIter = seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 1) - - dps, err := iteratorToTsSeries(mockIter, models.NewTagOptions()) - - assert.Nil(t, dps) - assert.EqualError(t, err, expectedErr.Error()) - }) -}