diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index b87d6d4f4f..2596eb033e 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -29,6 +29,7 @@ import ( "sync" "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/native" @@ -62,8 +63,9 @@ type respError struct { // NewRenderHandler returns a new render handler around the given storage. func NewRenderHandler( storage storage.Storage, + enforcer cost.ChainedEnforcer, ) http.Handler { - wrappedStore := graphite.NewM3WrappedStorage(storage) + wrappedStore := graphite.NewM3WrappedStorage(storage, enforcer) return &renderHandler{ engine: native.NewEngine(wrappedStore), } diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index f35bfe5d9b..83cbc4fe9c 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -39,7 +39,7 @@ import ( func TestParseNoQuery(t *testing.T) { mockStorage := mock.NewMockStorage() - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, newGraphiteReadHTTPRequest(t)) @@ -51,7 +51,7 @@ func TestParseNoQuery(t *testing.T) { func TestParseQueryNoResults(t *testing.T) { mockStorage := mock.NewMockStorage() mockStorage.SetFetchResult(&storage.FetchResult{}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = "target=foo.bar&from=-2h&until=now" @@ -82,7 +82,7 @@ func TestParseQueryResults(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = fmt.Sprintf("target=foo.bar&from=%d&until=%d", @@ -123,7 +123,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = "target=foo.bar&from=" + startStr + "&until=" + endStr + "&maxDataPoints=1" @@ -158,7 +158,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage) + handler := NewRenderHandler(mockStorage, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=baz.qux&from=%d&until=%d", diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 925bee6d29..5a172e60b0 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -44,6 +44,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/validator" "github.com/m3db/m3/src/query/api/v1/handler/topic" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" @@ -85,6 +86,7 @@ type Handler struct { createdAt time.Time tagOptions models.TagOptions timeoutOpts *prometheus.TimeoutOpts + enforcer cost.ChainedEnforcer } // Router returns the http handler registered with all relevant routes for query. @@ -101,6 +103,7 @@ func NewHandler( clusterClient clusterclient.Client, cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, + enforcer cost.ChainedEnforcer, scope tally.Scope, ) (*Handler, error) { r := mux.NewRouter() @@ -132,6 +135,7 @@ func NewHandler( createdAt: time.Now(), tagOptions: tagOptions, timeoutOpts: timeoutOpts, + enforcer: enforcer, } return h, nil } @@ -224,7 +228,7 @@ func (h *Handler) RegisterRoutes() error { // Graphite endpoints h.router.HandleFunc(graphite.ReadURL, - wrapped(graphite.NewRenderHandler(h.storage)).ServeHTTP, + wrapped(graphite.NewRenderHandler(h.storage, h.enforcer)).ServeHTTP, ).Methods(graphite.ReadHTTPMethods...) h.router.HandleFunc(graphite.FindURL, diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index b077779770..161c4f15a8 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -71,6 +71,7 @@ func setupHandler(store storage.Storage) (*Handler, error) { nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, + nil, tally.NewTestScope("", nil)) } @@ -83,8 +84,11 @@ func TestHandlerFetchTimeoutError(t *testing.T) { negValue := -1 * time.Second dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &negValue}} - _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil, - config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) + engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil) + cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} + _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, nil, nil, + cfg, dbconfig, nil, tally.NewTestScope("", nil)) + require.Error(t, err) } @@ -97,8 +101,10 @@ func TestHandlerFetchTimeout(t *testing.T) { fourMin := 4 * time.Minute dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &fourMin}} - h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil, - config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil)) + engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil) + cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} + h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, + nil, nil, cfg, dbconfig, nil, tally.NewTestScope("", nil)) require.NoError(t, err) assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout) } diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index b93df4a60a..929547392e 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -25,6 +25,7 @@ import ( "errors" "time" + "github.com/m3db/m3/src/query/cost" xctx "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/graphite/ts" @@ -38,13 +39,21 @@ var ( ) type m3WrappedStore struct { - m3 storage.Storage + m3 storage.Storage + enforcer cost.ChainedEnforcer } // NewM3WrappedStorage creates a graphite storage wrapper around an m3query // storage instance. -func NewM3WrappedStorage(m3storage storage.Storage) Storage { - return &m3WrappedStore{m3: m3storage} +func NewM3WrappedStorage( + m3storage storage.Storage, + enforcer cost.ChainedEnforcer, +) Storage { + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + + return &m3WrappedStore{m3: m3storage, enforcer: enforcer} } // TranslateQueryToMatchers converts a graphite query to tag matcher pairs. @@ -130,6 +139,7 @@ func (s *m3WrappedStore) FetchByQuery( m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout) defer cancel() fetchOptions := storage.NewFetchOptions() + fetchOptions.Enforcer = s.enforcer fetchOptions.FanoutOptions = &storage.FanoutOptions{ FanoutUnaggregated: storage.FanoutForceDisable, FanoutAggregated: storage.FanoutDefault, diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index 8fd7dfe60c..02eb5e677a 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -26,12 +26,14 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/cost" xctx "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/mock" m3ts "github.com/m3db/m3/src/query/ts" + xcost "github.com/m3db/m3/src/x/cost" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -137,7 +139,10 @@ func TestFetchByQuery(t *testing.T) { } store.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - wrapper := NewM3WrappedStorage(store) + enforcers := []xcost.Enforcer{xcost.NewEnforcer(nil, nil, nil)} + enforcer, err := cost.NewChainedEnforcer("name", enforcers) + require.NoError(t, err) + wrapper := NewM3WrappedStorage(store, enforcer) ctx := xctx.New() ctx.SetRequestContext(context.TODO()) end := time.Now() @@ -156,4 +161,7 @@ func TestFetchByQuery(t *testing.T) { series := result.SeriesList[0] assert.Equal(t, "a", series.Name()) assert.Equal(t, []float64{3, 3, 3}, series.SafeValues()) + + // NB: ensure the fetch was called with enforcer propagated correctly + assert.Equal(t, enforcer, store.LastFetchOptions().Enforcer) } diff --git a/src/query/server/server.go b/src/query/server/server.go index 4194d238fd..a5ffffd354 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -261,7 +261,7 @@ func Run(runOpts RunOptions) { } handler, err := httpd.NewHandler(downsamplerAndWriter, tagOptions, engine, - m3dbClusters, clusterClient, cfg, runOpts.DBConfig, scope) + m3dbClusters, clusterClient, cfg, runOpts.DBConfig, perQueryEnforcer, scope) if err != nil { logger.Fatal("unable to set up handlers", zap.Error(err)) } diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 9fb0697733..c956b94a22 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -28,9 +28,11 @@ import ( "time" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" + xcost "github.com/m3db/m3/src/x/cost" xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" ) @@ -266,6 +268,7 @@ func SeriesToPromSamples(series *ts.Series) []*prompb.Sample { func iteratorToTsSeries( iter encoding.SeriesIterator, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*ts.Series, error) { metric, err := FromM3IdentToMetric(iter.ID(), iter.Tags(), tagOptions) @@ -283,6 +286,11 @@ func iteratorToTsSeries( return nil, err } + r := enforcer.Add(xcost.Cost(len(datapoints))) + if r.Error != nil { + return nil, r.Error + } + return ts.NewSeries(metric.ID, datapoints, metric.Tags), nil } @@ -290,11 +298,12 @@ func iteratorToTsSeries( func decompressSequentially( iterLength int, iters []encoding.SeriesIterator, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { seriesList := make([]*ts.Series, 0, len(iters)) for _, iter := range iters { - series, err := iteratorToTsSeries(iter, tagOptions) + series, err := iteratorToTsSeries(iter, enforcer, tagOptions) if err != nil { return nil, err } @@ -310,6 +319,7 @@ func decompressConcurrently( iterLength int, iters []encoding.SeriesIterator, readWorkerPool xsync.PooledWorkerPool, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { seriesList := make([]*ts.Series, iterLength) @@ -336,7 +346,7 @@ func decompressConcurrently( return } - series, err := iteratorToTsSeries(iter, tagOptions) + series, err := iteratorToTsSeries(iter, enforcer, tagOptions) if err != nil { // Return the first error that is encountered. select { @@ -366,6 +376,7 @@ func SeriesIteratorsToFetchResult( seriesIterators encoding.SeriesIterators, readWorkerPool xsync.PooledWorkerPool, cleanupSeriesIters bool, + enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { if cleanupSeriesIters { @@ -375,8 +386,9 @@ func SeriesIteratorsToFetchResult( iters := seriesIterators.Iters() iterLength := seriesIterators.Len() if readWorkerPool == nil { - return decompressSequentially(iterLength, iters, tagOptions) + return decompressSequentially(iterLength, iters, enforcer, tagOptions) } - return decompressConcurrently(iterLength, iters, readWorkerPool, tagOptions) + return decompressConcurrently(iterLength, iters, readWorkerPool, + enforcer, tagOptions) } diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 104954c890..219230c46c 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -28,10 +28,12 @@ import ( "time" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test/seriesiter" "github.com/m3db/m3/src/query/ts" + xcost "github.com/m3db/m3/src/x/cost" "github.com/m3db/m3x/ident" xsync "github.com/m3db/m3x/sync" @@ -71,17 +73,25 @@ func TestLabelConversion(t *testing.T) { assert.Equal(t, labels, reverted) } -func verifyExpandSeries(t *testing.T, ctrl *gomock.Controller, num int, pools xsync.PooledWorkerPool) { +func verifyExpandSeries( + t *testing.T, + ctrl *gomock.Controller, + num int, + pools xsync.PooledWorkerPool, +) { testTags := seriesiter.GenerateTag() iters := seriesiter.NewMockSeriesIters(ctrl, testTags, num, 2) - results, err := SeriesIteratorsToFetchResult(iters, pools, true, nil) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(num) + results, err := SeriesIteratorsToFetchResult(iters, pools, true, enforcer, nil) assert.NoError(t, err) require.NotNil(t, results) require.NotNil(t, results.SeriesList) require.Len(t, results.SeriesList, num) - expectedTags := []models.Tag{{Name: testTags.Name.Bytes(), Value: testTags.Value.Bytes()}} + expectedTags := []models.Tag{{Name: testTags.Name.Bytes(), + Value: testTags.Value.Bytes()}} for i := 0; i < num; i++ { series := results.SeriesList[i] require.NotNil(t, series) @@ -122,12 +132,14 @@ func TestFailingExpandSeriesValidPools(t *testing.T) { poolSize = 2 numUncalled = 10 ) - pool, err := xsync.NewPooledWorkerPool(poolSize, xsync.NewPooledWorkerPoolOptions()) + pool, err := xsync.NewPooledWorkerPool(poolSize, + xsync.NewPooledWorkerPoolOptions()) require.NoError(t, err) pool.Init() ctrl := gomock.NewController(t) - iters := seriesiter.NewMockSeriesIterSlice(ctrl, seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries, numValues) + iters := seriesiter.NewMockSeriesIterSlice(ctrl, + seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries, numValues) // Add poolSize + 1 failing iterators; there can be slight timing // inconsistencies which can sometimes cause failures in this test // as one of the `uncalled` iterators gets unexpectedly used. @@ -155,11 +167,54 @@ func TestFailingExpandSeriesValidPools(t *testing.T) { mockIters.EXPECT().Iters().Return(iters).Times(1) mockIters.EXPECT().Len().Return(len(iters)).Times(1) mockIters.EXPECT().Close().Times(1) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(numValidSeries) result, err := SeriesIteratorsToFetchResult( mockIters, pool, true, + enforcer, + nil, + ) + require.Nil(t, result) + require.EqualError(t, err, "error") +} + +func TestOverLimit(t *testing.T) { + var ( + numValidSeries = 8 + numValues = 2 + poolSize = 2 + numUncalled = 10 + ) + pool, err := xsync.NewPooledWorkerPool(poolSize, + xsync.NewPooledWorkerPoolOptions()) + require.NoError(t, err) + pool.Init() + ctrl := gomock.NewController(t) + + iters := seriesiter.NewMockSeriesIterSlice(ctrl, + seriesiter.NewMockValidTagGenerator(ctrl), numValidSeries+poolSize+1, numValues) + for i := 0; i < numUncalled; i++ { + uncalledIter := encoding.NewMockSeriesIterator(ctrl) + iters = append(iters, uncalledIter) + } + + mockIters := encoding.NewMockSeriesIterators(ctrl) + mockIters.EXPECT().Iters().Return(iters).Times(1) + mockIters.EXPECT().Len().Return(len(iters)).Times(1) + mockIters.EXPECT().Close().Times(1) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(numValidSeries) + enforcer.EXPECT().Add(xcost.Cost(2)). + Return(xcost.Report{Error: errors.New("error")}).MinTimes(1) + + result, err := SeriesIteratorsToFetchResult( + mockIters, + pool, + true, + enforcer, nil, ) require.Nil(t, result) @@ -265,6 +320,25 @@ var ( benchResult *prompb.QueryResult ) +func TestIteratorToTsSeries(t *testing.T) { + t.Run("errors on iterator error", func(t *testing.T) { + ctrl := gomock.NewController(t) + mockIter := encoding.NewMockSeriesIterator(ctrl) + + expectedErr := errors.New("expected") + mockIter.EXPECT().Err().Return(expectedErr) + + mockIter = seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 1) + enforcer := cost.NewMockChainedEnforcer(ctrl) + enforcer.EXPECT().Add(xcost.Cost(2)).Times(1) + + dps, err := iteratorToTsSeries(mockIter, enforcer, models.NewTagOptions()) + + assert.Nil(t, dps) + assert.EqualError(t, err, expectedErr.Error()) + }) +} + // BenchmarkFetchResultToPromResult-8 100 10563444 ns/op 25368543 B/op 4443 allocs/op func BenchmarkFetchResultToPromResult(b *testing.B) { var ( @@ -303,20 +377,3 @@ func BenchmarkFetchResultToPromResult(b *testing.B) { benchResult = FetchResultToPromResult(fr) } } - -func TestIteratorToTsSeries(t *testing.T) { - t.Run("errors on iterator error", func(t *testing.T) { - ctrl := gomock.NewController(t) - mockIter := encoding.NewMockSeriesIterator(ctrl) - - expectedErr := errors.New("expected") - mockIter.EXPECT().Err().Return(expectedErr) - - mockIter = seriesiter.NewMockSeriesIteratorFromBase(mockIter, seriesiter.NewMockValidTagGenerator(ctrl), 1) - - dps, err := iteratorToTsSeries(mockIter, models.NewTagOptions()) - - assert.Nil(t, dps) - assert.EqualError(t, err, expectedErr.Error()) - }) -} diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index de6a10a2fe..cb4ca646e7 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -107,10 +107,16 @@ func (s *m3storage) Fetch( return nil, err } + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + fetchResult, err := storage.SeriesIteratorsToFetchResult( iters, s.readWorkerPool, false, + enforcer, s.opts.TagOptions(), ) diff --git a/src/query/storage/mock/storage.go b/src/query/storage/mock/storage.go index ed7133f3e2..1ff562c513 100644 --- a/src/query/storage/mock/storage.go +++ b/src/query/storage/mock/storage.go @@ -34,6 +34,7 @@ type Storage interface { storage.Storage SetTypeResult(storage.Type) + LastFetchOptions() *storage.FetchOptions SetFetchResult(*storage.FetchResult, error) SetFetchTagsResult(*storage.SearchResults, error) SetCompleteTagsResult(*storage.CompleteTagsResult, error) @@ -48,7 +49,8 @@ type mockStorage struct { typeResult struct { result storage.Type } - fetchResult struct { + lastFetchOptions *storage.FetchOptions + fetchResult struct { result *storage.FetchResult err error } @@ -130,13 +132,20 @@ func (s *mockStorage) Writes() []*storage.WriteQuery { return s.writes } +func (s *mockStorage) LastFetchOptions() *storage.FetchOptions { + s.RLock() + defer s.RUnlock() + return s.lastFetchOptions +} + func (s *mockStorage) Fetch( ctx context.Context, query *storage.FetchQuery, - _ *storage.FetchOptions, + opts *storage.FetchOptions, ) (*storage.FetchResult, error) { - s.RLock() - defer s.RUnlock() + s.Lock() + defer s.Unlock() + s.lastFetchOptions = opts return s.fetchResult.result, s.fetchResult.err } diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index f864443d6f..5121aee90a 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/errors" rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb" "github.com/m3db/m3/src/query/models" @@ -103,7 +104,13 @@ func (c *grpcClient) Fetch( return nil, err } - return storage.SeriesIteratorsToFetchResult(iters, c.readWorkerPool, true, c.tagOptions) + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + + return storage.SeriesIteratorsToFetchResult(iters, c.readWorkerPool, + true, enforcer, c.tagOptions) } func (c *grpcClient) waitForPools() (encoding.IteratorPools, error) { @@ -182,10 +189,16 @@ func (c *grpcClient) FetchBlocks( return block.Result{}, err } + enforcer := options.Enforcer + if enforcer == nil { + enforcer = cost.NoopChainedEnforcer() + } + fetchResult, err := storage.SeriesIteratorsToFetchResult( iters, c.readWorkerPool, true, + enforcer, c.tagOptions, ) if err != nil {