From 973caaf2ae0f8e98cf38d6f26f0672a8fd53d2ed Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Mon, 1 Mar 2021 08:48:31 -0800 Subject: [PATCH] Restrict the time a query can hold an index worker (#3269) A new MaxWorkerTime option is introduced to cap how long a query can hold an index worker at once. If the max is exceeded, the query must yield the index worker back and acquire it again (potentially waiting) to continue processing the results. This cap ensures large queries don't dominate the finite number of index workers allowed to run concurrently and lock out smaller queries. The idea is users would want to set the max large enough so the vast majority of typical queries can finish with only a single worker acquisition. --- src/cmd/services/m3dbnode/config/config.go | 7 + .../services/m3dbnode/config/config_test.go | 1 + .../node/fetch_result_iter_test.go | 2 +- .../server/tchannelthrift/node/service.go | 5 +- src/dbnode/server/server.go | 26 +- src/dbnode/storage/index.go | 312 ++++++--- src/dbnode/storage/index/aggregate_iter.go | 23 +- src/dbnode/storage/index/aggregate_results.go | 22 - src/dbnode/storage/index/block.go | 201 +++--- src/dbnode/storage/index/block_prop_test.go | 50 +- src/dbnode/storage/index/block_test.go | 221 +++---- src/dbnode/storage/index/index_mock.go | 603 +++++++++++------- src/dbnode/storage/index/options.go | 15 + src/dbnode/storage/index/query_iter.go | 81 +++ src/dbnode/storage/index/query_options.go | 3 +- .../storage/index/query_options_test.go | 6 +- src/dbnode/storage/index/results.go | 24 +- src/dbnode/storage/index/types.go | 88 ++- .../storage/index/wide_query_results.go | 22 +- src/dbnode/storage/index_block_test.go | 192 +++++- .../storage/index_query_concurrent_test.go | 28 +- src/dbnode/storage/index_test.go | 15 +- .../storage/limits/permits/fixed_permits.go | 91 +++ .../limits/permits/fixed_permits_test.go | 69 ++ .../limits/permits/lookback_limit_permit.go | 2 +- .../storage/limits/permits/noop_permit.go | 2 +- src/dbnode/storage/limits/permits/options.go | 18 + src/dbnode/storage/limits/permits/types.go | 9 +- src/dbnode/storage/options.go | 19 - src/dbnode/storage/storage_mock.go | 29 - src/dbnode/storage/types.go | 7 - 31 files changed, 1379 insertions(+), 814 deletions(-) create mode 100644 src/dbnode/storage/index/query_iter.go create mode 100644 src/dbnode/storage/limits/permits/fixed_permits.go create mode 100644 src/dbnode/storage/limits/permits/fixed_permits_test.go diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 1f655f7533..e2a4db455d 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -387,6 +387,13 @@ type IndexConfiguration struct { // as they are very CPU-intensive (regex and FST matching). MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"` + // MaxWorkerTime is the maximum time a query can hold an index worker at once. If a query does not finish in this + // time it yields the worker and must wait again for another worker to resume. The number of workers available to + // all queries is defined by MaxQueryIDsConcurrency. + // Capping the maximum time per worker ensures a few large queries don't hold all the concurrent workers and lock + // out many small queries from running. + MaxWorkerTime time.Duration `yaml:"maxWorkerTime"` + // RegexpDFALimit is the limit on the max number of states used by a // regexp deterministic finite automaton. Default is 10,000 states. RegexpDFALimit *int `yaml:"regexpDFALimit"` diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 2a465803b6..3d03f31f06 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -339,6 +339,7 @@ func TestConfiguration(t *testing.T) { expected := `db: index: maxQueryIDsConcurrency: 0 + maxWorkerTime: 0s regexpDFALimit: null regexpFSALimit: null forwardIndexProbability: 0 diff --git a/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go b/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go index bc95fa2cd6..78ef5e03ba 100644 --- a/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/fetch_result_iter_test.go @@ -201,7 +201,7 @@ func (p *fakePermits) TryAcquire(_ context.Context) (bool, error) { return true, nil } -func (p *fakePermits) Release() { +func (p *fakePermits) Release(_ int64) { p.released++ p.available++ } diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 8b12fa9350..54ec72f068 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -1092,9 +1092,10 @@ func (i *fetchTaggedResultsIter) Close(err error) { i.seriesBlocks.RecordValue(float64(i.totalSeriesBlocks)) - for n := 0; n < i.batchesAcquired; n++ { - i.blockPermits.Release() + for n := 0; n < i.batchesAcquired-1; n++ { + i.blockPermits.Release(int64(i.blocksPerBatch)) } + i.blockPermits.Release(int64(i.blocksPerBatch - i.blocksAvailable)) } // IDResult is the FetchTagged result for a series ID. diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 8eb0c1b1a1..7125a05701 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -90,13 +90,12 @@ import ( xos "github.com/m3db/m3/src/x/os" "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/serialize" - xsync "github.com/m3db/m3/src/x/sync" apachethrift "github.com/apache/thrift/lib/go/thrift" "github.com/m3dbx/vellum/levenshtein" "github.com/m3dbx/vellum/levenshtein2" "github.com/m3dbx/vellum/regexp" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" "github.com/uber/tchannel-go" "go.etcd.io/etcd/embed" @@ -371,14 +370,6 @@ func Run(runOpts RunOptions) { opentracing.SetGlobalTracer(tracer) - if cfg.Index.MaxQueryIDsConcurrency != 0 { - queryIDsWorkerPool := xsync.NewWorkerPool(cfg.Index.MaxQueryIDsConcurrency) - queryIDsWorkerPool.Init() - opts = opts.SetQueryIDsWorkerPool(queryIDsWorkerPool) - } else { - logger.Warn("max index query IDs concurrency was not set, falling back to default value") - } - // Set global index options. if n := cfg.Index.RegexpDFALimitOrDefault(); n > 0 { regexp.SetStateLimit(n) @@ -481,8 +472,14 @@ func Run(runOpts RunOptions) { seriesReadPermits.Start() defer seriesReadPermits.Stop() - opts = opts.SetPermitsOptions(opts.PermitsOptions(). - SetSeriesReadPermitsManager(seriesReadPermits)) + permitOptions := opts.PermitsOptions().SetSeriesReadPermitsManager(seriesReadPermits) + if cfg.Index.MaxQueryIDsConcurrency != 0 { + permitOptions = permitOptions.SetIndexQueryPermitsManager( + permits.NewFixedPermitsManager(cfg.Index.MaxQueryIDsConcurrency)) + } else { + logger.Warn("max index query IDs concurrency was not set, falling back to default value") + } + opts = opts.SetPermitsOptions(permitOptions) // Setup postings list cache. var ( @@ -524,6 +521,11 @@ func Run(runOpts RunOptions) { }). SetMmapReporter(mmapReporter). SetQueryLimits(queryLimits) + + if cfg.Index.MaxWorkerTime > 0 { + indexOpts = indexOpts.SetMaxWorkerTime(cfg.Index.MaxWorkerTime) + } + opts = opts.SetIndexOptions(indexOpts) if tick := cfg.Tick; tick != nil { diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 097ab06243..18766491e9 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -25,6 +25,7 @@ import ( gocontext "context" "errors" "fmt" + "io" "math" goruntime "runtime" "sort" @@ -45,6 +46,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/limits" + "github.com/m3db/m3/src/dbnode/storage/limits/permits" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -54,6 +56,7 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + "github.com/m3db/m3/src/m3ninx/x" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" @@ -61,7 +64,6 @@ import ( "github.com/m3db/m3/src/x/instrument" xopentracing "github.com/m3db/m3/src/x/opentracing" xresource "github.com/m3db/m3/src/x/resource" - xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" "github.com/m3db/bitset" @@ -89,9 +91,7 @@ const ( defaultFlushDocsBatchSize = 8192 ) -var ( - allQuery = idx.NewAllQuery() -) +var allQuery = idx.NewAllQuery() // nolint: maligned type nsIndex struct { @@ -122,9 +122,8 @@ type nsIndex struct { resultsPool index.QueryResultsPool aggregateResultsPool index.AggregateResultsPool - // NB(r): Use a pooled goroutine worker once pooled goroutine workers - // support timeouts for query workers pool. - queryWorkersPool xsync.WorkerPool + permitsManager permits.Manager + maxWorkerTime time.Duration // queriesWg tracks outstanding queries to ensure // we wait for all queries to complete before actually closing @@ -210,18 +209,37 @@ type newNamespaceIndexOpts struct { type execBlockQueryFn func( ctx context.Context, block index.Block, - query index.Query, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, logFields []opentracinglog.Field, ) -// asyncQueryExecState tracks the async execution errors and results for a query. +// newBlockIterFn returns a new ResultIterator for the query. +type newBlockIterFn func( + ctx context.Context, + block index.Block, + query index.Query, + results index.BaseResults, +) (index.ResultIterator, error) + +// asyncQueryExecState tracks the async execution errors for a query. type asyncQueryExecState struct { - sync.Mutex - multiErr xerrors.MultiError - exhaustive bool + sync.RWMutex + multiErr xerrors.MultiError +} + +func (s *asyncQueryExecState) hasErr() bool { + s.RLock() + defer s.RUnlock() + return s.multiErr.NumErrors() > 0 +} + +func (s *asyncQueryExecState) addErr(err error) { + s.Lock() + s.multiErr = s.multiErr.Add(err) + s.Unlock() } // newNamespaceIndex returns a new namespaceIndex for the provided namespace. @@ -346,11 +364,12 @@ func newNamespaceIndexWithOptions( resultsPool: indexOpts.QueryResultsPool(), aggregateResultsPool: indexOpts.AggregateResultsPool(), - queryWorkersPool: newIndexOpts.opts.QueryIDsWorkerPool(), - metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts), + permitsManager: newIndexOpts.opts.PermitsOptions().IndexQueryPermitsManager(), + metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts), doNotIndexWithFields: doNotIndexWithFields, shardSet: shardSet, + maxWorkerTime: indexOpts.MaxWorkerTime(), } // Assign shard set upfront. @@ -1345,7 +1364,8 @@ func (i *nsIndex) Query( FilterID: i.shardsFilterID(), }) ctx.RegisterFinalizer(results) - exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn, logFields, i.metrics.queryMetrics) + exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn, i.newBlockQueryIterFn, + logFields, i.metrics.queryMetrics) if err != nil { sp.LogFields(opentracinglog.Error(err)) return index.QueryResult{}, err @@ -1387,7 +1407,8 @@ func (i *nsIndex) WideQuery( defer results.Finalize() queryOpts := opts.ToQueryOptions() - _, err := i.query(ctx, query, results, queryOpts, i.execBlockWideQueryFn, logFields, i.metrics.wideQueryMetrics) + _, err := i.query(ctx, query, results, queryOpts, i.execBlockWideQueryFn, i.newBlockQueryIterFn, logFields, + i.metrics.wideQueryMetrics) if err != nil { sp.LogFields(opentracinglog.Error(err)) return err @@ -1443,7 +1464,9 @@ func (i *nsIndex) AggregateQuery( } aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe() results.Reset(id, aopts) - exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields, i.metrics.aggQueryMetrics) + exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, + i.newBlockAggregatorIterFn, + logFields, i.metrics.aggQueryMetrics) if err != nil { return index.AggregateQueryResult{}, err } @@ -1459,6 +1482,7 @@ func (i *nsIndex) query( results index.BaseResults, opts index.QueryOptions, execBlockFn execBlockQueryFn, + newBlockIterFn newBlockIterFn, logFields []opentracinglog.Field, queryMetrics queryMetrics, //nolint: gocritic ) (bool, error) { @@ -1466,7 +1490,8 @@ func (i *nsIndex) query( sp.LogFields(logFields...) defer sp.Finish() - exhaustive, err := i.queryWithSpan(ctx, query, results, opts, execBlockFn, sp, logFields, queryMetrics) + exhaustive, err := i.queryWithSpan(ctx, query, results, opts, execBlockFn, newBlockIterFn, sp, logFields, + queryMetrics) if err != nil { sp.LogFields(opentracinglog.Error(err)) @@ -1511,12 +1536,23 @@ func (i *nsIndex) query( return exhaustive, nil } +// blockIter is a composite type to hold various state about a block while iterating over the results. +type blockIter struct { + iter index.ResultIterator + iterCloser io.Closer + block index.Block + waitTime time.Duration + searchTime time.Duration + processingTime time.Duration +} + func (i *nsIndex) queryWithSpan( ctx context.Context, query index.Query, results index.BaseResults, opts index.QueryOptions, execBlockFn execBlockQueryFn, + newBlockIterFn newBlockIterFn, span opentracing.Span, logFields []opentracinglog.Field, queryMetrics queryMetrics, //nolint: gocritic @@ -1555,52 +1591,110 @@ func (i *nsIndex) queryWithSpan( var ( // State contains concurrent mutable state for async execution below. - state = asyncQueryExecState{ - exhaustive: true, - } - wg sync.WaitGroup - totalWaitTime time.Duration + state = &asyncQueryExecState{} + wg sync.WaitGroup ) + permits, err := i.permitsManager.NewPermits(ctx) + if err != nil { + return false, err + } + blockIters := make([]*blockIter, 0, len(blocks)) for _, block := range blocks { - // Capture block for async query execution below. - block := block - - // We're looping through all the blocks that we need to query and kicking - // off parallel queries which are bounded by the queryWorkersPool's maximum - // concurrency. This means that it's possible at this point that we've - // completed querying one or more blocks and already exhausted the maximum - // number of results that we're allowed to return. If thats the case, there - // is no value in kicking off more parallel queries, so we break out of - // the loop. - seriesCount := results.Size() - docsCount := results.TotalDocsCount() - alreadyExceededLimit := opts.SeriesLimitExceeded(seriesCount) || opts.DocsLimitExceeded(docsCount) - if alreadyExceededLimit { - state.Lock() - state.exhaustive = false - state.Unlock() - // Break out if already not exhaustive. - break + iter, err := newBlockIterFn(ctx, block, query, results) + if err != nil { + return false, err } - - // Calculate time spent waiting for a worker - wg.Add(1) - scheduleResult := i.queryWorkersPool.GoWithContext(ctx, func() { - startProcessing := time.Now() - execBlockFn(ctx, block, query, opts, &state, results, logFields) - i.metrics.queryMetrics.blockProcessingTime.RecordDuration(time.Since(startProcessing)) - wg.Done() + blockIters = append(blockIters, &blockIter{ + iter: iter, + iterCloser: x.NewSafeCloser(iter), + block: block, }) - totalWaitTime += scheduleResult.WaitTime - if !scheduleResult.Available { - state.Lock() - state.multiErr = state.multiErr.Add(gocontext.Canceled) - state.Unlock() - // Did not launch task, need to ensure don't wait for it - wg.Done() + } + + defer func() { + for _, iter := range blockIters { + // safe to call Close multiple times, so it's fine to eagerly close in the loop below and here. + _ = iter.iterCloser.Close() + } + }() + + // queryCanceled returns true if the query has been canceled and the current iteration should terminate. + queryCanceled := func() bool { + return opts.LimitsExceeded(results.Size(), results.TotalDocsCount()) || state.hasErr() + } + // waitForPermit waits for a permit. returns true if the permit was acquired and the wait time. + waitForPermit := func() (bool, time.Duration) { + // make sure the query hasn't been canceled before waiting for a permit. + if queryCanceled() { + return false, 0 + } + startWait := time.Now() + err := permits.Acquire(ctx) + waitTime := time.Since(startWait) + if err != nil { + state.addErr(err) + return false, waitTime + } + // make sure the query hasn't been canceled while waiting for a permit. + if queryCanceled() { + permits.Release(0) + return false, waitTime + } + return true, waitTime + } + + // We're looping through all the blocks that we need to query and kicking + // off parallel queries which are bounded by the permits maximum + // concurrency. It's possible at this point that we've completed querying one or more blocks and already exhausted + // the maximum number of results that we're allowed to return. If thats the case, there is no value in kicking off + // more parallel queries, so we break out of the loop. + for _, blockIter := range blockIters { + // Capture for async query execution below. + blockIter := blockIter + + // acquire a permit before kicking off the goroutine to process the iterator. this limits the number of + // concurrent goroutines to # of permits + large queries that needed multiple iterations to finish. + acq, waitTime := waitForPermit() + blockIter.waitTime += waitTime + if !acq { break } + + wg.Add(1) + // kick off a go routine to process the entire iterator. + go func() { + defer wg.Done() + first := true + for !blockIter.iter.Done() { + // if this is not the first iteration of the iterator, need to acquire another permit. + if !first { + acq, waitTime := waitForPermit() + blockIter.waitTime += waitTime + if !acq { + break + } + } + first = false + startProcessing := time.Now() + execBlockFn(ctx, blockIter.block, blockIter.iter, opts, state, results, logFields) + processingTime := time.Since(startProcessing) + queryMetrics.blockProcessingTime.RecordDuration(processingTime) + blockIter.processingTime += processingTime + permits.Release(int64(processingTime)) + } + if first { + // this should never happen since a new iter cannot be Done, but just to be safe. + permits.Release(0) + } + blockIter.searchTime += blockIter.iter.SearchDuration() + + // close the iterator since it's no longer needed. it's safe to call Close multiple times, here and in the + // defer when the function returns. + if err := blockIter.iterCloser.Close(); err != nil { + state.addErr(err) + } + }() } // wait for all workers to finish. if the caller cancels the call, the workers will be interrupted and eventually @@ -1609,11 +1703,9 @@ func (i *nsIndex) queryWithSpan( i.metrics.loadedDocsPerQuery.RecordValue(float64(results.TotalDocsCount())) - state.Lock() - // Take reference to vars to return while locked. - exhaustive := state.exhaustive + exhaustive := opts.Exhaustive(results.Size(), results.TotalDocsCount()) + // ok to read state without lock since all parallel queries are done. multiErr := state.multiErr - state.Unlock() err = multiErr.FinalError() if err != nil && !multiErr.Contains(gocontext.DeadlineExceeded) && !multiErr.Contains(gocontext.Canceled) { @@ -1624,18 +1716,40 @@ func (i *nsIndex) queryWithSpan( // update timing metrics even if the query was canceled due to a timeout queryRuntime := time.Since(start) + var ( + totalWaitTime time.Duration + totalProcessingTime time.Duration + totalSearchTime time.Duration + ) + + for _, blockIter := range blockIters { + totalWaitTime += blockIter.waitTime + totalProcessingTime += blockIter.processingTime + totalSearchTime += blockIter.searchTime + } + queryMetrics.queryTotalTime.ByDocs.Record(results.TotalDocsCount(), queryRuntime) queryMetrics.queryWaitTime.ByDocs.Record(results.TotalDocsCount(), totalWaitTime) - queryMetrics.queryProcessingTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Processing) - queryMetrics.querySearchTime.ByDocs.Record(results.TotalDocsCount(), results.TotalDuration().Search) + queryMetrics.queryProcessingTime.ByDocs.Record(results.TotalDocsCount(), totalProcessingTime) + queryMetrics.querySearchTime.ByDocs.Record(results.TotalDocsCount(), totalSearchTime) return exhaustive, err } -func (i *nsIndex) execBlockQueryFn( +func (i *nsIndex) newBlockQueryIterFn( ctx context.Context, block index.Block, query index.Query, + _ index.BaseResults, +) (index.ResultIterator, error) { + return block.QueryIter(ctx, query) +} + +//nolint: dupl +func (i *nsIndex) execBlockQueryFn( + ctx context.Context, + block index.Block, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, @@ -1652,14 +1766,16 @@ func (i *nsIndex) execBlockQueryFn( docResults, ok := results.(index.DocumentResults) if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during query", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() + state.addErr(fmt.Errorf("unknown results type [%T] received during query", results)) + return + } + queryIter, ok := iter.(index.QueryIterator) + if !ok { // should never happen + state.addErr(fmt.Errorf("unknown results type [%T] received during query", iter)) return } - blockExhaustive, err := block.Query(ctx, query, opts, docResults, logFields) + err := block.QueryWithIter(ctx, opts, queryIter, docResults, time.Now().Add(i.maxWorkerTime), logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1668,20 +1784,16 @@ func (i *nsIndex) execBlockQueryFn( err = nil } - state.Lock() - defer state.Unlock() - if err != nil { sp.LogFields(opentracinglog.Error(err)) - state.multiErr = state.multiErr.Add(err) + state.addErr(err) } - state.exhaustive = state.exhaustive && blockExhaustive } func (i *nsIndex) execBlockWideQueryFn( ctx context.Context, block index.Block, - query index.Query, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, @@ -1698,14 +1810,16 @@ func (i *nsIndex) execBlockWideQueryFn( docResults, ok := results.(index.DocumentResults) if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during wide query", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() + state.addErr(fmt.Errorf("unknown results type [%T] received during wide query", results)) + return + } + queryIter, ok := iter.(index.QueryIterator) + if !ok { // should never happen + state.addErr(fmt.Errorf("unknown results type [%T] received during query", iter)) return } - _, err := block.Query(ctx, query, opts, docResults, logFields) + err := block.QueryWithIter(ctx, opts, queryIter, docResults, time.Now().Add(i.maxWorkerTime), logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1718,22 +1832,29 @@ func (i *nsIndex) execBlockWideQueryFn( err = nil } - state.Lock() - defer state.Unlock() - if err != nil { sp.LogFields(opentracinglog.Error(err)) - state.multiErr = state.multiErr.Add(err) + state.addErr(err) } +} - // NB: wide queries are always exhaustive. - state.exhaustive = true +func (i *nsIndex) newBlockAggregatorIterFn( + ctx context.Context, + block index.Block, + _ index.Query, + results index.BaseResults, +) (index.ResultIterator, error) { + aggResults, ok := results.(index.AggregateResults) + if !ok { // should never happen + return nil, fmt.Errorf("unknown results type [%T] received during aggregation", results) + } + return block.AggregateIter(ctx, aggResults.AggregateResultsOptions()) } func (i *nsIndex) execBlockAggregateQueryFn( ctx context.Context, block index.Block, - _ index.Query, + iter index.ResultIterator, opts index.QueryOptions, state *asyncQueryExecState, results index.BaseResults, @@ -1750,14 +1871,16 @@ func (i *nsIndex) execBlockAggregateQueryFn( aggResults, ok := results.(index.AggregateResults) if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during aggregation", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() + state.addErr(fmt.Errorf("unknown results type [%T] received during aggregation", results)) + return + } + aggIter, ok := iter.(index.AggregateIterator) + if !ok { // should never happen + state.addErr(fmt.Errorf("unknown results type [%T] received during query", iter)) return } - blockExhaustive, err := block.Aggregate(ctx, opts, aggResults, logFields) + err := block.AggregateWithIter(ctx, aggIter, opts, aggResults, time.Now().Add(i.maxWorkerTime), logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1766,13 +1889,10 @@ func (i *nsIndex) execBlockAggregateQueryFn( err = nil } - state.Lock() - defer state.Unlock() if err != nil { sp.LogFields(opentracinglog.Error(err)) - state.multiErr = state.multiErr.Add(err) + state.addErr(err) } - state.exhaustive = state.exhaustive && blockExhaustive } func (i *nsIndex) overriddenOptsForQueryWithRLock( diff --git a/src/dbnode/storage/index/aggregate_iter.go b/src/dbnode/storage/index/aggregate_iter.go index f4cbbe1d95..1d136bc6d5 100644 --- a/src/dbnode/storage/index/aggregate_iter.go +++ b/src/dbnode/storage/index/aggregate_iter.go @@ -41,11 +41,12 @@ type aggregateIter struct { searchDuration time.Duration // mutable state - idx int - err error - done bool - currField, currTerm []byte - nextField, nextTerm []byte + idx int + err error + done bool + currField, currTerm []byte + nextField, nextTerm []byte + docsCount, seriesCount int } func (it *aggregateIter) Next(ctx context.Context) bool { @@ -144,3 +145,15 @@ func (it *aggregateIter) Close() error { func (it *aggregateIter) SearchDuration() time.Duration { return it.searchDuration } + +func (it *aggregateIter) AddSeries(count int) { + it.seriesCount += count +} + +func (it *aggregateIter) AddDocs(count int) { + it.docsCount += count +} + +func (it *aggregateIter) Counts() (series, docs int) { + return it.seriesCount, it.docsCount +} diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index ac3cd7c194..05d6c5ac43 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -23,7 +23,6 @@ package index import ( "math" "sync" - "time" "github.com/uber-go/tally" @@ -49,7 +48,6 @@ type aggregatedResults struct { pool AggregateResultsPool valuesPool AggregateValuesPool encodedDocReader docs.EncodedDocumentReader - resultDuration ResultDurations iOpts instrument.Options } @@ -145,24 +143,6 @@ func NewAggregateResults( } } -func (r *aggregatedResults) TotalDuration() ResultDurations { - r.RLock() - defer r.RUnlock() - return r.resultDuration -} - -func (r *aggregatedResults) AddBlockProcessingDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddProcessing(duration) -} - -func (r *aggregatedResults) AddBlockSearchDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddSearch(duration) -} - func (r *aggregatedResults) EnforceLimits() bool { return true } func (r *aggregatedResults) Reset( @@ -197,8 +177,6 @@ func (r *aggregatedResults) Reset( r.totalDocsCount = 0 r.size = 0 - r.resultDuration = ResultDurations{} - // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. r.Unlock() diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 10696dd333..0fd12bc1e1 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -25,7 +25,6 @@ import ( "errors" "fmt" "io" - "math" "sync" "time" @@ -40,7 +39,6 @@ import ( "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/m3ninx/search/executor" - "github.com/m3db/m3/src/m3ninx/x" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" @@ -48,7 +46,6 @@ import ( xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" - "github.com/opentracing/opentracing-go" opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "go.uber.org/zap" @@ -410,64 +407,11 @@ func (b *block) segmentReadersWithRLock() ([]segment.Reader, error) { return readers, nil } -// Query acquires a read lock on the block so that the segments -// are guaranteed to not be freed/released while accumulating results. -// This allows references to the mmap'd segment data to be accumulated -// and then copied into the results before this method returns (it is not -// safe to return docs directly from the segments from this method, the -// results datastructure is used to copy it every time documents are added -// to the results datastructure). -func (b *block) Query( - ctx context.Context, - query Query, - opts QueryOptions, - results DocumentResults, - logFields []opentracinglog.Field, -) (bool, error) { - ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) - sp.LogFields(logFields...) - defer sp.Finish() - - start := time.Now() - exhaustive, err := b.queryWithSpan(ctx, query, opts, results) - if err != nil { - sp.LogFields(opentracinglog.Error(err)) - } - results.AddBlockProcessingDuration(time.Since(start)) - return exhaustive, err -} - -func (b *block) queryWithSpan( - ctx context.Context, - query Query, - opts QueryOptions, - results DocumentResults, -) (bool, error) { - iter, err := b.QueryIter(ctx, query) - if err != nil { - return false, err - } - - iterCloser := x.NewSafeCloser(iter) - defer func() { - _ = iterCloser.Close() - b.metrics.queryDocsMatched.RecordValue(float64(results.TotalDocsCount())) - b.metrics.querySeriesMatched.RecordValue(float64(results.Size())) - }() - - if err := b.QueryWithIter(ctx, opts, iter, results, math.MaxInt64); err != nil { - return false, err - } - - if err := iterCloser.Close(); err != nil { - return false, err - } - results.AddBlockSearchDuration(iter.SearchDuration()) - - return opts.exhaustive(results.Size(), results.TotalDocsCount()), nil -} - -func (b *block) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterator, error) { +// QueryIter acquires a read lock on the block to get the set of segments for the returned iterator. However, the +// segments are searched and results are processed lazily in the returned iterator. The segments are finalized when +// the ctx is finalized to ensure the mmaps are not freed until the ctx closes. This allows the returned results to +// reference data in the mmap without copying. +func (b *block) QueryIter(ctx context.Context, query Query) (QueryIterator, error) { b.RLock() defer b.RUnlock() @@ -480,7 +424,7 @@ func (b *block) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterato } // FOLLOWUP(prateek): push down QueryOptions to restrict results - iter, err := exec.Execute(ctx, query.Query.SearchQuery()) + docIter, err := exec.Execute(ctx, query.Query.SearchQuery()) if err != nil { b.closeAsync(exec) return nil, err @@ -493,15 +437,40 @@ func (b *block) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterato b.closeAsync(exec) })) - return iter, nil + return NewQueryIter(docIter), nil } +// nolint: dupl func (b *block) QueryWithIter( ctx context.Context, opts QueryOptions, - docIter doc.Iterator, + iter QueryIterator, + results DocumentResults, + deadline time.Time, + logFields []opentracinglog.Field, +) error { + ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) + sp.LogFields(logFields...) + defer sp.Finish() + + err := b.queryWithSpan(ctx, opts, iter, results, deadline) + if err != nil { + sp.LogFields(opentracinglog.Error(err)) + } + if iter.Done() { + docs, series := iter.Counts() + b.metrics.queryDocsMatched.RecordValue(float64(docs)) + b.metrics.querySeriesMatched.RecordValue(float64(series)) + } + return err +} + +func (b *block) queryWithSpan( + ctx context.Context, + opts QueryOptions, + iter QueryIterator, results DocumentResults, - limit int, + deadline time.Time, ) error { var ( err error @@ -511,7 +480,6 @@ func (b *block) QueryWithIter( docsPool = b.opts.DocumentArrayPool() batch = docsPool.Get() batchSize = cap(batch) - count int ) if batchSize == 0 { batchSize = defaultQueryDocsBatchSize @@ -520,8 +488,7 @@ func (b *block) QueryWithIter( // Register local data structures that need closing. defer docsPool.Put(batch) - for count < limit && docIter.Next() { - count++ + for time.Now().Before(deadline) && iter.Next(ctx) { if opts.LimitsExceeded(size, docsCount) { break } @@ -539,7 +506,7 @@ func (b *block) QueryWithIter( } } - batch = append(batch, docIter.Current()) + batch = append(batch, iter.Current()) if len(batch) < batchSize { continue } @@ -549,7 +516,7 @@ func (b *block) QueryWithIter( return err } } - if err := docIter.Err(); err != nil { + if err := iter.Err(); err != nil { return err } @@ -561,6 +528,9 @@ func (b *block) QueryWithIter( } } + iter.AddSeries(size) + iter.AddDocs(docsCount) + return nil } @@ -604,57 +574,10 @@ func (b *block) addQueryResults( return batch, size, docsCount, err } -// Aggregate acquires a read lock on the block so that the segments -// are guaranteed to not be freed/released while accumulating results. -// NB: Aggregate is an optimization of the general aggregate Query approach -// for the case when we can skip going to raw documents, and instead rely on -// pre-aggregated results via the FST underlying the index. -func (b *block) Aggregate( - ctx context.Context, - opts QueryOptions, - results AggregateResults, - logFields []opentracinglog.Field, -) (bool, error) { - ctx, sp := ctx.StartTraceSpan(tracepoint.BlockAggregate) - sp.LogFields(logFields...) - defer sp.Finish() - - start := time.Now() - exhaustive, err := b.aggregateWithSpan(ctx, opts, results, sp) - if err != nil { - sp.LogFields(opentracinglog.Error(err)) - } - results.AddBlockProcessingDuration(time.Since(start)) - - return exhaustive, err -} - -func (b *block) aggregateWithSpan( - ctx context.Context, - opts QueryOptions, - results AggregateResults, - sp opentracing.Span, -) (bool, error) { - iter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) - if err != nil { - return false, err - } - - defer func() { - _ = iter.Close() - b.metrics.aggregateDocsMatched.RecordValue(float64(results.TotalDocsCount())) - b.metrics.aggregateSeriesMatched.RecordValue(float64(results.Size())) - }() - - if err := b.AggregateWithIter(ctx, iter, opts, results, math.MaxInt64); err != nil { - return false, err - } - - results.AddBlockSearchDuration(iter.SearchDuration()) - - return opts.exhaustive(results.Size(), results.TotalDocsCount()), nil -} - +// AggIter acquires a read lock on the block to get the set of segments for the returned iterator. However, the +// segments are searched and results are processed lazily in the returned iterator. The segments are finalized when +// the ctx is finalized to ensure the mmaps are not freed until the ctx closes. This allows the returned results to +// reference data in the mmap without copying. func (b *block) AggregateIter(ctx context.Context, aggOpts AggregateResultsOptions) (AggregateIterator, error) { b.RLock() defer b.RUnlock() @@ -711,14 +634,40 @@ func (b *block) AggregateIter(ctx context.Context, aggOpts AggregateResultsOptio }, nil } +// nolint: dupl func (b *block) AggregateWithIter( ctx context.Context, iter AggregateIterator, opts QueryOptions, results AggregateResults, - limit int) error { + deadline time.Time, + logFields []opentracinglog.Field, +) error { + ctx, sp := ctx.StartTraceSpan(tracepoint.BlockAggregate) + sp.LogFields(logFields...) + defer sp.Finish() + + err := b.aggregateWithSpan(ctx, iter, opts, results, deadline) + if err != nil { + sp.LogFields(opentracinglog.Error(err)) + } + if iter.Done() { + docs, series := iter.Counts() + b.metrics.aggregateDocsMatched.RecordValue(float64(docs)) + b.metrics.aggregateSeriesMatched.RecordValue(float64(series)) + } + + return err +} + +func (b *block) aggregateWithSpan( + ctx context.Context, + iter AggregateIterator, + opts QueryOptions, + results AggregateResults, + deadline time.Time, +) error { var ( - count int err error source = opts.Source size = results.Size() @@ -747,8 +696,7 @@ func (b *block) AggregateWithIter( maxBatch = opts.DocsLimit } - for count < limit && iter.Next(ctx) { - count++ + for time.Now().Before(deadline) && iter.Next(ctx) { if opts.LimitsExceeded(size, docsCount) { break } @@ -832,6 +780,9 @@ func (b *block) AggregateWithIter( } } + iter.AddSeries(size) + iter.AddDocs(docsCount) + return nil } diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 0c912eb930..4e7c5d3c23 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -120,23 +120,29 @@ func TestPostingsListCacheDoesNotAffectBlockQueryResults(t *testing.T) { } uncachedResults := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := uncachedBlock.Query(context.NewBackground(), indexQuery, - queryOpts, uncachedResults, emptyLogFields) + ctx := context.NewBackground() + queryIter, err := uncachedBlock.QueryIter(ctx, indexQuery) if err != nil { - return false, fmt.Errorf("error querying uncached block: %v", err) + return false, err } - if !exhaustive { - return false, errors.New("querying uncached block was not exhaustive") + require.NoError(t, err) + for !queryIter.Done() { + err = uncachedBlock.QueryWithIter(ctx, + queryOpts, queryIter, uncachedResults, time.Now().Add(time.Millisecond * 10), emptyLogFields) + if err != nil { + return false, fmt.Errorf("error querying uncached block: %v", err) + } } cachedResults := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err = cachedBlock.Query(context.NewBackground(), indexQuery, - queryOpts, cachedResults, emptyLogFields) - if err != nil { - return false, fmt.Errorf("error querying cached block: %v", err) - } - if !exhaustive { - return false, errors.New("querying cached block was not exhaustive") + ctx = context.NewBackground() + queryIter, err = cachedBlock.QueryIter(ctx, indexQuery) + for !queryIter.Done() { + err = cachedBlock.QueryWithIter(ctx, queryOpts, queryIter, cachedResults, + time.Now().Add(time.Millisecond * 10), emptyLogFields) + if err != nil { + return false, fmt.Errorf("error querying cached block: %v", err) + } } uncachedMap := uncachedResults.Map() @@ -362,17 +368,23 @@ func TestAggregateDocLimits(t *testing.T) { ctx := context.NewBackground() defer ctx.BlockingClose() - exhaustive, err := b.Aggregate( - ctx, - QueryOptions{}, - results, - emptyLogFields) - + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) if err != nil { return false, err } + for !aggIter.Done() { + err = b.AggregateWithIter( + ctx, + aggIter, + QueryOptions{}, + results, + time.Now().Add(time.Millisecond * 10), + emptyLogFields) - require.True(t, exhaustive, errors.New("not exhaustive")) + if err != nil { + return false, err + } + } verifyResults(t, results, testSegment.segmentMap) snap := scope.Snapshot() tallytest.AssertCounterValue(t, testSegment.exCount, snap, diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 8d55cb1429..eb6489956c 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -373,9 +373,7 @@ func TestBlockQueryAfterClose(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) require.NoError(t, b.Close()) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Error(t, err) } @@ -401,7 +399,7 @@ func TestBlockQueryWithCancelledQuery(t *testing.T) { gomock.InOrder( exec.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), - dIter.EXPECT().Close().Return(nil), + dIter.EXPECT().Done().Return(false), exec.EXPECT().Close().Return(nil), ) @@ -416,7 +414,9 @@ func TestBlockQueryWithCancelledQuery(t *testing.T) { results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - _, err = b.Query(ctx, defaultQuery, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) require.Equal(t, stdlibctx.Canceled, err) } @@ -435,9 +435,7 @@ func TestBlockQueryExecutorError(t *testing.T) { return nil, fmt.Errorf("random-err") } - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Error(t, err) } @@ -459,9 +457,7 @@ func TestBlockQuerySegmentReaderError(t *testing.T) { randErr := fmt.Errorf("random-err") seg.EXPECT().Reader().Return(nil, randErr) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Equal(t, randErr, err) } @@ -500,9 +496,7 @@ func TestBlockQueryAddResultsSegmentsError(t *testing.T) { randErr := fmt.Errorf("random-err") seg3.EXPECT().Reader().Return(nil, randErr) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Equal(t, randErr, err) } @@ -529,8 +523,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) { exec.EXPECT().Close(), ) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - _, err = b.Query(context.NewBackground(), defaultQuery, QueryOptions{}, results, emptyLogFields) + _, err = b.QueryIter(context.NewBackground(), defaultQuery) require.Error(t, err) } @@ -559,15 +552,17 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(fmt.Errorf("randomerr")), - dIter.EXPECT().Close(), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close(), ) ctx := context.NewBackground() - _, err = b.Query(ctx, - defaultQuery, QueryOptions{}, - NewQueryResults(nil, QueryResultsOptions{}, testOpts), emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, + NewQueryResults(nil, QueryResultsOptions{}, testOpts), time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) // NB(r): Make sure to call finalizers blockingly (to finish @@ -600,8 +595,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) limit := 1 @@ -610,9 +604,11 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) d, ok := results.Map().Get(testDoc1().ID) @@ -628,44 +624,6 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { ctx.BlockingClose() } -func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - testMD := newTestNSMetadata(t) - start := time.Now().Truncate(time.Hour) - blk, err := NewBlock(start, testMD, BlockOptions{}, - namespace.NewRuntimeOptionsManager("foo"), testOpts) - require.NoError(t, err) - - b, ok := blk.(*block) - require.True(t, ok) - - exec := search.NewMockExecutor(ctrl) - b.newExecutorWithRLockFn = func() (search.Executor, error) { - return exec, nil - } - - dIter := doc.NewMockQueryDocIterator(ctrl) - gomock.InOrder( - exec.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(dIter, nil), - dIter.EXPECT().Next().Return(false), - dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(fmt.Errorf("random-err")), - exec.EXPECT().Close().Return(nil), - ) - results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - - ctx := context.NewBackground() - - _, err = b.Query(ctx, defaultQuery, QueryOptions{}, results, emptyLogFields) - require.Error(t, err) - - // NB(r): Make sure to call finalizers blockingly (to finish - // the expected close calls) - ctx.BlockingClose() -} - func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -691,8 +649,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) limit := 1 @@ -700,9 +657,11 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) @@ -744,8 +703,7 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) limit := 2 @@ -754,9 +712,11 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) rMap := results.Map() require.Equal(t, 1, rMap.Len()) @@ -797,8 +757,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) docsLimit := 1 @@ -806,9 +765,11 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{DocsLimit: docsLimit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{DocsLimit: docsLimit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Map().Len()) d, ok := results.Map().Get(testDoc1().ID) @@ -848,8 +809,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc1())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(false), exec.EXPECT().Close().Return(nil), ) docsLimit := 2 @@ -858,9 +818,11 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{DocsLimit: docsLimit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{DocsLimit: docsLimit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) rMap := results.Map() require.Equal(t, 1, rMap.Len()) @@ -908,16 +870,17 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { exec.EXPECT().Execute(gomock.Any(), gomock.Any()).Return(dIter, nil), dIter.EXPECT().Next().Return(true), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) rMap := results.Map() require.Equal(t, 1, rMap.Len()) @@ -966,16 +929,17 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { dIter.EXPECT().Current().Return(doc.NewDocumentFromMetadata(testDoc2())), dIter.EXPECT().Next().Return(false), dIter.EXPECT().Err().Return(nil), - dIter.EXPECT().Close().Return(nil), - dIter.EXPECT().SearchDuration().Return(time.Second), + dIter.EXPECT().Done().Return(true), exec.EXPECT().Close().Return(nil), ) ctx := context.NewBackground() - exhaustive, err := b.Query(ctx, defaultQuery, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, defaultQuery) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), + emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) rMap := results.Map() require.Equal(t, 2, rMap.Len()) @@ -1443,9 +1407,10 @@ func TestBlockE2EInsertQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, Query{q}, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) require.Equal(t, 2, results.Size()) rMap := results.Map() @@ -1522,10 +1487,12 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) { limit := 1 results := NewQueryResults(nil, QueryResultsOptions{SizeLimit: limit}, testOpts) - exhaustive, err := b.Query(context.NewBackground(), Query{q}, QueryOptions{SeriesLimit: limit}, results, + ctx := context.NewBackground() + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{SeriesLimit: limit}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) require.Equal(t, 1, results.Size()) rMap := results.Map() @@ -1613,9 +1580,10 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, Query{q}, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) require.Equal(t, 2, results.Size()) rMap := results.Map() @@ -1697,9 +1665,10 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, Query{q}, QueryOptions{}, results, emptyLogFields) + queryIter, err := b.QueryIter(ctx, Query{q}) + require.NoError(t, err) + err = b.QueryWithIter(ctx, QueryOptions{}, queryIter, results, time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) require.Equal(t, 2, results.Size()) rMap := results.Map() @@ -1845,7 +1814,7 @@ func TestBlockAggregateAfterClose(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) require.NoError(t, b.Close()) - _, err = b.Aggregate(context.NewBackground(), QueryOptions{}, &aggregatedResults{}, emptyLogFields) + _, err = b.AggregateIter(context.NewBackground(), AggregateResultsOptions{}) require.Error(t, err) } @@ -1885,16 +1854,19 @@ func TestBlockAggregateIterationErr(t *testing.T) { iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")), iter.EXPECT().Next().Return(false), iter.EXPECT().Err().Return(fmt.Errorf("unknown error")), - iter.EXPECT().Close().Return(nil), ) ctx := context.NewBackground() defer ctx.BlockingClose() - _, err = b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: 3}, results, + time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) } @@ -1963,13 +1935,16 @@ func TestBlockAggregate(t *testing.T) { iter.EXPECT().Err().Return(nil) iter.EXPECT().Close().Return(nil) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: seriesLimit}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "f1": {"t1", "t2", "t3"}, @@ -2048,17 +2023,19 @@ func TestBlockAggregateWithAggregateLimits(t *testing.T) { curr := []byte(fmt.Sprint(i)) iter.EXPECT().Current().Return([]byte("f1"), curr) } - iter.EXPECT().Close().Return(nil) iter.EXPECT().SearchDuration().Return(time.Second) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: seriesLimit}, results, + time.Now().Add(time.Minute), emptyLogFields) require.Error(t, err) assert.True(t, strings.Contains(err.Error(), "query aborted due to limit")) - require.False(t, exhaustive) sp.Finish() spans := mtr.FinishedSpans() @@ -2130,15 +2107,17 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { iter.EXPECT().Current().Return([]byte("f2"), []byte("t2")), iter.EXPECT().Next().Return(true), iter.EXPECT().Current().Return([]byte("f3"), []byte("f3")), - iter.EXPECT().Close().Return(nil), ) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: 1}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.False(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "f1": {}, @@ -2224,13 +2203,16 @@ func TestBlockE2EInsertAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{SeriesLimit: 1000}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "bar": {"baz", "qux"}, "some": {"more", "other"}, @@ -2241,13 +2223,16 @@ func TestBlockE2EInsertAggregate(t *testing.T) { Type: AggregateTagNamesAndValues, FieldFilter: AggregateFieldFilter{[]byte("bar")}, }, testOpts) - exhaustive, err = b.Aggregate( + aggIter, err = b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, - QueryOptions{SeriesLimit: 10}, + aggIter, + QueryOptions{SeriesLimit: 1000}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ "bar": {"baz", "qux"}, }, results) @@ -2257,13 +2242,16 @@ func TestBlockE2EInsertAggregate(t *testing.T) { Type: AggregateTagNamesAndValues, FieldFilter: AggregateFieldFilter{[]byte("random")}, }, testOpts) - exhaustive, err = b.Aggregate( + aggIter, err = b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, - QueryOptions{SeriesLimit: 100}, + aggIter, + QueryOptions{SeriesLimit: 1000}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{}, results) sp.Finish() @@ -2556,13 +2544,16 @@ func TestBlockAggregateBatching(t *testing.T) { ctx := context.NewBackground() defer ctx.BlockingClose() - exhaustive, err := b.Aggregate( + aggIter, err := b.AggregateIter(ctx, results.AggregateResultsOptions()) + require.NoError(t, err) + err = b.AggregateWithIter( ctx, + aggIter, QueryOptions{}, results, + time.Now().Add(time.Minute), emptyLogFields) require.NoError(t, err) - require.True(t, exhaustive) snap := scope.Snapshot() tallytest.AssertCounterValue(t, tt.expectedDocsMatched, snap, diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index c503f85053..fa10affc80 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -112,44 +112,6 @@ func (mr *MockBaseResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockBaseResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockBaseResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockBaseResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockBaseResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockBaseResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockBaseResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockBaseResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockBaseResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockBaseResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockBaseResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockBaseResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -241,44 +203,6 @@ func (mr *MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockDocumentResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockDocumentResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockDocumentResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockDocumentResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockDocumentResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockDocumentResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockDocumentResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockDocumentResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockDocumentResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -386,44 +310,6 @@ func (mr *MockQueryResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockQueryResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockQueryResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockQueryResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockQueryResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockQueryResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockQueryResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockQueryResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockQueryResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockQueryResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockQueryResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockQueryResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -618,44 +504,6 @@ func (mr *MockAggregateResultsMockRecorder) TotalDocsCount() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockAggregateResults)(nil).TotalDocsCount)) } -// TotalDuration mocks base method -func (m *MockAggregateResults) TotalDuration() ResultDurations { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDuration") - ret0, _ := ret[0].(ResultDurations) - return ret0 -} - -// TotalDuration indicates an expected call of TotalDuration -func (mr *MockAggregateResultsMockRecorder) TotalDuration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDuration", reflect.TypeOf((*MockAggregateResults)(nil).TotalDuration)) -} - -// AddBlockProcessingDuration mocks base method -func (m *MockAggregateResults) AddBlockProcessingDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockProcessingDuration", duration) -} - -// AddBlockProcessingDuration indicates an expected call of AddBlockProcessingDuration -func (mr *MockAggregateResultsMockRecorder) AddBlockProcessingDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockProcessingDuration", reflect.TypeOf((*MockAggregateResults)(nil).AddBlockProcessingDuration), duration) -} - -// AddBlockSearchDuration mocks base method -func (m *MockAggregateResults) AddBlockSearchDuration(duration time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddBlockSearchDuration", duration) -} - -// AddBlockSearchDuration indicates an expected call of AddBlockSearchDuration -func (mr *MockAggregateResultsMockRecorder) AddBlockSearchDuration(duration interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBlockSearchDuration", reflect.TypeOf((*MockAggregateResults)(nil).AddBlockSearchDuration), duration) -} - // EnforceLimits mocks base method func (m *MockAggregateResults) EnforceLimits() bool { m.ctrl.T.Helper() @@ -1081,40 +929,25 @@ func (mr *MockBlockMockRecorder) WriteBatch(inserts interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*MockBlock)(nil).WriteBatch), inserts) } -// Query mocks base method -func (m *MockBlock) Query(ctx context.Context, query Query, opts QueryOptions, results DocumentResults, logFields []log.Field) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Query", ctx, query, opts, results, logFields) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Query indicates an expected call of Query -func (mr *MockBlockMockRecorder) Query(ctx, query, opts, results, logFields interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockBlock)(nil).Query), ctx, query, opts, results, logFields) -} - // QueryWithIter mocks base method -func (m *MockBlock) QueryWithIter(ctx context.Context, opts QueryOptions, docIter doc.Iterator, results DocumentResults, limit int) error { +func (m *MockBlock) QueryWithIter(ctx context.Context, opts QueryOptions, iter QueryIterator, results DocumentResults, deadline time.Time, logFields []log.Field) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "QueryWithIter", ctx, opts, docIter, results, limit) + ret := m.ctrl.Call(m, "QueryWithIter", ctx, opts, iter, results, deadline, logFields) ret0, _ := ret[0].(error) return ret0 } // QueryWithIter indicates an expected call of QueryWithIter -func (mr *MockBlockMockRecorder) QueryWithIter(ctx, opts, docIter, results, limit interface{}) *gomock.Call { +func (mr *MockBlockMockRecorder) QueryWithIter(ctx, opts, iter, results, deadline, logFields interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryWithIter", reflect.TypeOf((*MockBlock)(nil).QueryWithIter), ctx, opts, docIter, results, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryWithIter", reflect.TypeOf((*MockBlock)(nil).QueryWithIter), ctx, opts, iter, results, deadline, logFields) } // QueryIter mocks base method -func (m *MockBlock) QueryIter(ctx context.Context, query Query) (doc.QueryDocIterator, error) { +func (m *MockBlock) QueryIter(ctx context.Context, query Query) (QueryIterator, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueryIter", ctx, query) - ret0, _ := ret[0].(doc.QueryDocIterator) + ret0, _ := ret[0].(QueryIterator) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -1125,33 +958,18 @@ func (mr *MockBlockMockRecorder) QueryIter(ctx, query interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIter", reflect.TypeOf((*MockBlock)(nil).QueryIter), ctx, query) } -// Aggregate mocks base method -func (m *MockBlock) Aggregate(ctx context.Context, opts QueryOptions, results AggregateResults, logFields []log.Field) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Aggregate", ctx, opts, results, logFields) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Aggregate indicates an expected call of Aggregate -func (mr *MockBlockMockRecorder) Aggregate(ctx, opts, results, logFields interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Aggregate", reflect.TypeOf((*MockBlock)(nil).Aggregate), ctx, opts, results, logFields) -} - // AggregateWithIter mocks base method -func (m *MockBlock) AggregateWithIter(ctx context.Context, iter AggregateIterator, opts QueryOptions, results AggregateResults, limit int) error { +func (m *MockBlock) AggregateWithIter(ctx context.Context, iter AggregateIterator, opts QueryOptions, results AggregateResults, deadline time.Time, logFields []log.Field) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AggregateWithIter", ctx, iter, opts, results, limit) + ret := m.ctrl.Call(m, "AggregateWithIter", ctx, iter, opts, results, deadline, logFields) ret0, _ := ret[0].(error) return ret0 } // AggregateWithIter indicates an expected call of AggregateWithIter -func (mr *MockBlockMockRecorder) AggregateWithIter(ctx, iter, opts, results, limit interface{}) *gomock.Call { +func (mr *MockBlockMockRecorder) AggregateWithIter(ctx, iter, opts, results, deadline, logFields interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateWithIter", reflect.TypeOf((*MockBlock)(nil).AggregateWithIter), ctx, iter, opts, results, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateWithIter", reflect.TypeOf((*MockBlock)(nil).AggregateWithIter), ctx, iter, opts, results, deadline, logFields) } // AggregateIter mocks base method @@ -1384,6 +1202,152 @@ func (mr *MockBlockStatsReporterMockRecorder) ReportIndexingStats(stats interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportIndexingStats", reflect.TypeOf((*MockBlockStatsReporter)(nil).ReportIndexingStats), stats) } +// MockQueryIterator is a mock of QueryIterator interface +type MockQueryIterator struct { + ctrl *gomock.Controller + recorder *MockQueryIteratorMockRecorder +} + +// MockQueryIteratorMockRecorder is the mock recorder for MockQueryIterator +type MockQueryIteratorMockRecorder struct { + mock *MockQueryIterator +} + +// NewMockQueryIterator creates a new mock instance +func NewMockQueryIterator(ctrl *gomock.Controller) *MockQueryIterator { + mock := &MockQueryIterator{ctrl: ctrl} + mock.recorder = &MockQueryIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockQueryIterator) EXPECT() *MockQueryIteratorMockRecorder { + return m.recorder +} + +// Done mocks base method +func (m *MockQueryIterator) Done() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Done") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Done indicates an expected call of Done +func (mr *MockQueryIteratorMockRecorder) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockQueryIterator)(nil).Done)) +} + +// Next mocks base method +func (m *MockQueryIterator) Next(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockQueryIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockQueryIterator)(nil).Next), ctx) +} + +// Err mocks base method +func (m *MockQueryIterator) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockQueryIteratorMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockQueryIterator)(nil).Err)) +} + +// SearchDuration mocks base method +func (m *MockQueryIterator) SearchDuration() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchDuration") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// SearchDuration indicates an expected call of SearchDuration +func (mr *MockQueryIteratorMockRecorder) SearchDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockQueryIterator)(nil).SearchDuration)) +} + +// Close mocks base method +func (m *MockQueryIterator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockQueryIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockQueryIterator)(nil).Close)) +} + +// AddSeries mocks base method +func (m *MockQueryIterator) AddSeries(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSeries", count) +} + +// AddSeries indicates an expected call of AddSeries +func (mr *MockQueryIteratorMockRecorder) AddSeries(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSeries", reflect.TypeOf((*MockQueryIterator)(nil).AddSeries), count) +} + +// AddDocs mocks base method +func (m *MockQueryIterator) AddDocs(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDocs", count) +} + +// AddDocs indicates an expected call of AddDocs +func (mr *MockQueryIteratorMockRecorder) AddDocs(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocs", reflect.TypeOf((*MockQueryIterator)(nil).AddDocs), count) +} + +// Counts mocks base method +func (m *MockQueryIterator) Counts() (int, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Counts") + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Counts indicates an expected call of Counts +func (mr *MockQueryIteratorMockRecorder) Counts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Counts", reflect.TypeOf((*MockQueryIterator)(nil).Counts)) +} + +// Current mocks base method +func (m *MockQueryIterator) Current() doc.Document { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Current") + ret0, _ := ret[0].(doc.Document) + return ret0 +} + +// Current indicates an expected call of Current +func (mr *MockQueryIteratorMockRecorder) Current() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockQueryIterator)(nil).Current)) +} + // MockAggregateIterator is a mock of AggregateIterator interface type MockAggregateIterator struct { ctrl *gomock.Controller @@ -1407,32 +1371,32 @@ func (m *MockAggregateIterator) EXPECT() *MockAggregateIteratorMockRecorder { return m.recorder } -// Next mocks base method -func (m *MockAggregateIterator) Next(ctx context.Context) bool { +// Done mocks base method +func (m *MockAggregateIterator) Done() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Next", ctx) + ret := m.ctrl.Call(m, "Done") ret0, _ := ret[0].(bool) return ret0 } -// Next indicates an expected call of Next -func (mr *MockAggregateIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { +// Done indicates an expected call of Done +func (mr *MockAggregateIteratorMockRecorder) Done() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockAggregateIterator)(nil).Next), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockAggregateIterator)(nil).Done)) } -// Done mocks base method -func (m *MockAggregateIterator) Done() bool { +// Next mocks base method +func (m *MockAggregateIterator) Next(ctx context.Context) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Done") + ret := m.ctrl.Call(m, "Next", ctx) ret0, _ := ret[0].(bool) return ret0 } -// Done indicates an expected call of Done -func (mr *MockAggregateIteratorMockRecorder) Done() *gomock.Call { +// Next indicates an expected call of Next +func (mr *MockAggregateIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockAggregateIterator)(nil).Done)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockAggregateIterator)(nil).Next), ctx) } // Err mocks base method @@ -1449,6 +1413,73 @@ func (mr *MockAggregateIteratorMockRecorder) Err() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockAggregateIterator)(nil).Err)) } +// SearchDuration mocks base method +func (m *MockAggregateIterator) SearchDuration() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchDuration") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// SearchDuration indicates an expected call of SearchDuration +func (mr *MockAggregateIteratorMockRecorder) SearchDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockAggregateIterator)(nil).SearchDuration)) +} + +// Close mocks base method +func (m *MockAggregateIterator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockAggregateIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAggregateIterator)(nil).Close)) +} + +// AddSeries mocks base method +func (m *MockAggregateIterator) AddSeries(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSeries", count) +} + +// AddSeries indicates an expected call of AddSeries +func (mr *MockAggregateIteratorMockRecorder) AddSeries(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSeries", reflect.TypeOf((*MockAggregateIterator)(nil).AddSeries), count) +} + +// AddDocs mocks base method +func (m *MockAggregateIterator) AddDocs(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDocs", count) +} + +// AddDocs indicates an expected call of AddDocs +func (mr *MockAggregateIteratorMockRecorder) AddDocs(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocs", reflect.TypeOf((*MockAggregateIterator)(nil).AddDocs), count) +} + +// Counts mocks base method +func (m *MockAggregateIterator) Counts() (int, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Counts") + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Counts indicates an expected call of Counts +func (mr *MockAggregateIteratorMockRecorder) Counts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Counts", reflect.TypeOf((*MockAggregateIterator)(nil).Counts)) +} + // Current mocks base method func (m *MockAggregateIterator) Current() ([]byte, []byte) { m.ctrl.T.Helper() @@ -1464,22 +1495,87 @@ func (mr *MockAggregateIteratorMockRecorder) Current() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockAggregateIterator)(nil).Current)) } -// Close mocks base method -func (m *MockAggregateIterator) Close() error { +// fieldsAndTermsIteratorOpts mocks base method +func (m *MockAggregateIterator) fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") + ret := m.ctrl.Call(m, "fieldsAndTermsIteratorOpts") + ret0, _ := ret[0].(fieldsAndTermsIteratorOpts) + return ret0 +} + +// fieldsAndTermsIteratorOpts indicates an expected call of fieldsAndTermsIteratorOpts +func (mr *MockAggregateIteratorMockRecorder) fieldsAndTermsIteratorOpts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "fieldsAndTermsIteratorOpts", reflect.TypeOf((*MockAggregateIterator)(nil).fieldsAndTermsIteratorOpts)) +} + +// MockResultIterator is a mock of ResultIterator interface +type MockResultIterator struct { + ctrl *gomock.Controller + recorder *MockResultIteratorMockRecorder +} + +// MockResultIteratorMockRecorder is the mock recorder for MockResultIterator +type MockResultIteratorMockRecorder struct { + mock *MockResultIterator +} + +// NewMockResultIterator creates a new mock instance +func NewMockResultIterator(ctrl *gomock.Controller) *MockResultIterator { + mock := &MockResultIterator{ctrl: ctrl} + mock.recorder = &MockResultIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockResultIterator) EXPECT() *MockResultIteratorMockRecorder { + return m.recorder +} + +// Done mocks base method +func (m *MockResultIterator) Done() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Done") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Done indicates an expected call of Done +func (mr *MockResultIteratorMockRecorder) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockResultIterator)(nil).Done)) +} + +// Next mocks base method +func (m *MockResultIterator) Next(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockResultIteratorMockRecorder) Next(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockResultIterator)(nil).Next), ctx) +} + +// Err mocks base method +func (m *MockResultIterator) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") ret0, _ := ret[0].(error) return ret0 } -// Close indicates an expected call of Close -func (mr *MockAggregateIteratorMockRecorder) Close() *gomock.Call { +// Err indicates an expected call of Err +func (mr *MockResultIteratorMockRecorder) Err() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAggregateIterator)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockResultIterator)(nil).Err)) } // SearchDuration mocks base method -func (m *MockAggregateIterator) SearchDuration() time.Duration { +func (m *MockResultIterator) SearchDuration() time.Duration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SearchDuration") ret0, _ := ret[0].(time.Duration) @@ -1487,23 +1583,62 @@ func (m *MockAggregateIterator) SearchDuration() time.Duration { } // SearchDuration indicates an expected call of SearchDuration -func (mr *MockAggregateIteratorMockRecorder) SearchDuration() *gomock.Call { +func (mr *MockResultIteratorMockRecorder) SearchDuration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockAggregateIterator)(nil).SearchDuration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchDuration", reflect.TypeOf((*MockResultIterator)(nil).SearchDuration)) } -// fieldsAndTermsIteratorOpts mocks base method -func (m *MockAggregateIterator) fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts { +// Close mocks base method +func (m *MockResultIterator) Close() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "fieldsAndTermsIteratorOpts") - ret0, _ := ret[0].(fieldsAndTermsIteratorOpts) + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) return ret0 } -// fieldsAndTermsIteratorOpts indicates an expected call of fieldsAndTermsIteratorOpts -func (mr *MockAggregateIteratorMockRecorder) fieldsAndTermsIteratorOpts() *gomock.Call { +// Close indicates an expected call of Close +func (mr *MockResultIteratorMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "fieldsAndTermsIteratorOpts", reflect.TypeOf((*MockAggregateIterator)(nil).fieldsAndTermsIteratorOpts)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockResultIterator)(nil).Close)) +} + +// AddSeries mocks base method +func (m *MockResultIterator) AddSeries(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSeries", count) +} + +// AddSeries indicates an expected call of AddSeries +func (mr *MockResultIteratorMockRecorder) AddSeries(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSeries", reflect.TypeOf((*MockResultIterator)(nil).AddSeries), count) +} + +// AddDocs mocks base method +func (m *MockResultIterator) AddDocs(count int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDocs", count) +} + +// AddDocs indicates an expected call of AddDocs +func (mr *MockResultIteratorMockRecorder) AddDocs(count interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocs", reflect.TypeOf((*MockResultIterator)(nil).AddDocs), count) +} + +// Counts mocks base method +func (m *MockResultIterator) Counts() (int, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Counts") + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Counts indicates an expected call of Counts +func (mr *MockResultIteratorMockRecorder) Counts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Counts", reflect.TypeOf((*MockResultIterator)(nil).Counts)) } // MockfieldsAndTermsIterator is a mock of fieldsAndTermsIterator interface @@ -2252,3 +2387,31 @@ func (mr *MockOptionsMockRecorder) QueryLimits() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryLimits", reflect.TypeOf((*MockOptions)(nil).QueryLimits)) } + +// MaxWorkerTime mocks base method +func (m *MockOptions) MaxWorkerTime() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxWorkerTime") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// MaxWorkerTime indicates an expected call of MaxWorkerTime +func (mr *MockOptionsMockRecorder) MaxWorkerTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxWorkerTime", reflect.TypeOf((*MockOptions)(nil).MaxWorkerTime)) +} + +// SetMaxWorkerTime mocks base method +func (m *MockOptions) SetMaxWorkerTime(value time.Duration) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetMaxWorkerTime", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetMaxWorkerTime indicates an expected call of SetMaxWorkerTime +func (mr *MockOptionsMockRecorder) SetMaxWorkerTime(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxWorkerTime", reflect.TypeOf((*MockOptions)(nil).SetMaxWorkerTime), value) +} diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 055ab95cac..3e4328a473 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -22,6 +22,7 @@ package index import ( "errors" + "time" "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/limits" @@ -81,6 +82,8 @@ var ( defaultForegroundCompactionOpts compaction.PlannerOptions defaultBackgroundCompactionOpts compaction.PlannerOptions + // defaultMaxWorkerTime sets the default time a query can hold an index worker. + defaultMaxWorkerTime = time.Second ) func init() { @@ -135,6 +138,7 @@ type opts struct { readThroughSegmentOptions ReadThroughSegmentOptions mmapReporter mmap.Reporter queryLimits limits.QueryLimits + maxWorkerTime time.Duration } var undefinedUUIDFn = func() ([]byte, error) { return nil, errIDGenerationDisabled } @@ -195,6 +199,7 @@ func NewOptions() Options { foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts, backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts, queryLimits: limits.NoOpQueryLimits(), + maxWorkerTime: defaultMaxWorkerTime, } resultsPool.Init(func() QueryResults { return NewQueryResults(nil, QueryResultsOptions{}, opts) @@ -460,3 +465,13 @@ func (o *opts) SetQueryLimits(value limits.QueryLimits) Options { func (o *opts) QueryLimits() limits.QueryLimits { return o.queryLimits } + +func (o *opts) MaxWorkerTime() time.Duration { + return o.maxWorkerTime +} + +func (o *opts) SetMaxWorkerTime(value time.Duration) Options { + opts := *o + opts.maxWorkerTime = value + return &opts +} diff --git a/src/dbnode/storage/index/query_iter.go b/src/dbnode/storage/index/query_iter.go new file mode 100644 index 0000000000..97e3d8f703 --- /dev/null +++ b/src/dbnode/storage/index/query_iter.go @@ -0,0 +1,81 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "time" + + "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/x/context" +) + +type queryIter struct { + // immutable state + docIter doc.QueryDocIterator + + // mutable state + seriesCount, docCount int +} + +var _ QueryIterator = &queryIter{} + +// NewQueryIter wraps the provided QueryDocIterator as a QueryIterator +func NewQueryIter(docIter doc.QueryDocIterator) QueryIterator { + return &queryIter{ + docIter: docIter, + } +} + +func (q *queryIter) Done() bool { + return q.docIter.Done() +} + +func (q *queryIter) Next(_ context.Context) bool { + return q.docIter.Next() +} + +func (q *queryIter) Err() error { + return q.docIter.Err() +} + +func (q *queryIter) SearchDuration() time.Duration { + return q.docIter.SearchDuration() +} + +func (q *queryIter) Close() error { + return q.docIter.Close() +} + +func (q *queryIter) AddSeries(count int) { + q.seriesCount += count +} + +func (q *queryIter) AddDocs(count int) { + q.docCount += count +} + +func (q *queryIter) Counts() (series, docs int) { + return q.seriesCount, q.docCount +} + +func (q *queryIter) Current() doc.Document { + return q.docIter.Current() +} diff --git a/src/dbnode/storage/index/query_options.go b/src/dbnode/storage/index/query_options.go index ff7ffb5b4a..eccbc08c1d 100644 --- a/src/dbnode/storage/index/query_options.go +++ b/src/dbnode/storage/index/query_options.go @@ -43,7 +43,8 @@ func (o QueryOptions) LimitsExceeded(seriesCount, docsCount int) bool { return o.SeriesLimitExceeded(seriesCount) || o.DocsLimitExceeded(docsCount) } -func (o QueryOptions) exhaustive(seriesCount, docsCount int) bool { +// Exhaustive returns true if the provided counts did not exceeded the query limits. +func (o QueryOptions) Exhaustive(seriesCount, docsCount int) bool { return !o.SeriesLimitExceeded(seriesCount) && !o.DocsLimitExceeded(docsCount) } diff --git a/src/dbnode/storage/index/query_options_test.go b/src/dbnode/storage/index/query_options_test.go index 01add7e973..4706f297b7 100644 --- a/src/dbnode/storage/index/query_options_test.go +++ b/src/dbnode/storage/index/query_options_test.go @@ -45,9 +45,9 @@ func TestQueryOptions(t *testing.T) { assert.True(t, opts.LimitsExceeded(20, 9)) assert.False(t, opts.LimitsExceeded(19, 9)) - assert.False(t, opts.exhaustive(19, 10)) - assert.False(t, opts.exhaustive(20, 9)) - assert.True(t, opts.exhaustive(19, 9)) + assert.False(t, opts.Exhaustive(19, 10)) + assert.False(t, opts.Exhaustive(20, 9)) + assert.True(t, opts.Exhaustive(19, 9)) } func TestInvalidWideQueryOptions(t *testing.T) { diff --git a/src/dbnode/storage/index/results.go b/src/dbnode/storage/index/results.go index 385fa92bda..58a30a0f6c 100644 --- a/src/dbnode/storage/index/results.go +++ b/src/dbnode/storage/index/results.go @@ -23,7 +23,6 @@ package index import ( "errors" "sync" - "time" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" @@ -52,8 +51,7 @@ type results struct { idPool ident.Pool bytesPool pool.CheckedBytesPool - pool QueryResultsPool - resultDuration ResultDurations + pool QueryResultsPool } // NewQueryResults returns a new query results object. @@ -73,24 +71,6 @@ func NewQueryResults( } } -func (r *results) TotalDuration() ResultDurations { - r.RLock() - defer r.RUnlock() - return r.resultDuration -} - -func (r *results) AddBlockProcessingDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddProcessing(duration) -} - -func (r *results) AddBlockSearchDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddSearch(duration) -} - func (r *results) EnforceLimits() bool { return true } func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { @@ -110,8 +90,6 @@ func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { r.resultsMap.Reset() r.totalDocsCount = 0 - r.resultDuration = ResultDurations{} - r.opts = opts r.Unlock() diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 56ef408915..5e30c4f72b 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -154,15 +154,6 @@ type BaseResults interface { // TotalDocsCount returns the total number of documents observed. TotalDocsCount() int - // TotalDuration is the total ResultDurations for the query. - TotalDuration() ResultDurations - - // AddBlockProcessingDuration adds the processing duration for a single block to the TotalDuration. - AddBlockProcessingDuration(duration time.Duration) - - // AddBlockSearchDuration adds the search duration for a single block to the TotalDuration. - AddBlockSearchDuration(duration time.Duration) - // EnforceLimits returns whether this should enforce and increment limits. EnforceLimits() bool @@ -410,36 +401,18 @@ type Block interface { // WriteBatch writes a batch of provided entries. WriteBatch(inserts *WriteBatch) (WriteBatchResult, error) - // Query resolves the given query into known IDs. - Query( - ctx context.Context, - query Query, - opts QueryOptions, - results DocumentResults, - logFields []opentracinglog.Field, - ) (bool, error) - // QueryWithIter processes n docs from the iterator into known IDs. QueryWithIter( ctx context.Context, opts QueryOptions, - docIter doc.Iterator, + iter QueryIterator, results DocumentResults, - limit int, + deadline time.Time, + logFields []opentracinglog.Field, ) error - // QueryIter returns a new QueryDocIterator for the query. - QueryIter(ctx context.Context, query Query) (doc.QueryDocIterator, error) - - // Aggregate aggregates known tag names/values. - // NB(prateek): different from aggregating by means of Query, as we can - // avoid going to documents, relying purely on the indexed FSTs. - Aggregate( - ctx context.Context, - opts QueryOptions, - results AggregateResults, - logFields []opentracinglog.Field, - ) (bool, error) + // QueryIter returns a new QueryIterator for the query. + QueryIter(ctx context.Context, query Query) (QueryIterator, error) // AggregateWithIter aggregates N known tag names/values from the iterator. AggregateWithIter( @@ -447,7 +420,8 @@ type Block interface { iter AggregateIterator, opts QueryOptions, results AggregateResults, - limit int, + deadline time.Time, + logFields []opentracinglog.Field, ) error // AggregateIter returns a new AggregatorIterator. @@ -922,31 +896,49 @@ func (e WriteBatchEntry) Result() WriteBatchEntryResult { return *e.result } +// QueryIterator iterates through the documents for a block. +type QueryIterator interface { + ResultIterator + + // Current returns the current (field, term). + Current() doc.Document +} + // AggregateIterator iterates through the (field,term)s for a block. type AggregateIterator interface { + ResultIterator + + // Current returns the current (field, term). + Current() (field, term []byte) + + fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts +} + +// ResultIterator is a common interface for query and aggregate result iterators. +type ResultIterator interface { + // Done returns true if there are no more elements in the iterator. Allows checking if the query should acquire + // a permit, which might block, before calling Next(). + Done() bool + // Next processes the next (field,term) available with Current. Returns true if there are more to process. // Callers need to check Err after this returns false to check if an error occurred while iterating. Next(ctx context.Context) bool - // Done returns true if the iterator is exhausted. This non-standard iterating method allows any index query to - // check if there is more work to be done before waiting for a worker from the pool. - // If this method returns true, Next is guaranteed to return false. However, on the first iteration this will always - // return false and Next may return false for an empty iterator. - Done() bool - // Err returns an non-nil error if an error occurred calling Next. Err() error - // Current returns the current (field, term). - Current() (field, term []byte) + // SearchDuration is how long it took search the FSTs for the results returned by the iterator. + SearchDuration() time.Duration - // Close the iterator and underlying resources. + // Close the iterator. Close() error - // SearchDuration is how long it took to search the segments in the block. - SearchDuration() time.Duration + AddSeries(count int) - fieldsAndTermsIteratorOpts() fieldsAndTermsIteratorOpts + AddDocs(count int) + + // Counts returns the number of series and documents processed by the iterator. + Counts() (series, docs int) } // fieldsAndTermsIterator iterates over all known fields and terms for a segment. @@ -1105,4 +1097,10 @@ type Options interface { // QueryLimits returns the current query limits. QueryLimits() limits.QueryLimits + + // MaxWorkerTime returns the max time a query can hold an index worker. + MaxWorkerTime() time.Duration + + // SetMaxWorkerTime sets MaxWorkerTime. + SetMaxWorkerTime(value time.Duration) Options } diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index f209184034..cb5ad85312 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "sync" - "time" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" @@ -62,8 +61,7 @@ type wideResults struct { // document is discovered whose shard exceeds the last shard this results // is responsible for, using the fact that incoming documents are sorted by // shard then by ID. - pastLastShard bool - resultDuration ResultDurations + pastLastShard bool } // NewWideQueryResults returns a new wide query results object. @@ -94,24 +92,6 @@ func NewWideQueryResults( return results } -func (r *wideResults) TotalDuration() ResultDurations { - r.RLock() - defer r.RUnlock() - return r.resultDuration -} - -func (r *wideResults) AddBlockProcessingDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddProcessing(duration) -} - -func (r *wideResults) AddBlockSearchDuration(duration time.Duration) { - r.Lock() - defer r.Unlock() - r.resultDuration = r.resultDuration.AddSearch(duration) -} - func (r *wideResults) EnforceLimits() bool { // NB: wide results should not enforce limits, as they may span an entire // block in a memory constrained batch-wise fashion. diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 31ffdc82d1..71f5fb8284 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -45,6 +45,7 @@ import ( "github.com/golang/mock/gomock" opentracing "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/require" ) @@ -642,7 +643,12 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + mockIter0 := index.NewMockQueryIterator(ctrl) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) + result, err := idx.Query(ctx, q, qOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -653,8 +659,17 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { EndExclusive: t2.Add(time.Minute), RequireExhaustive: test.requireExhaustive, } - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) + + mockIter1 := index.NewMockQueryIterator(ctrl) + b1.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter1, nil) + mockIter1.EXPECT().Done().Return(true) + mockIter1.EXPECT().SearchDuration().Return(time.Minute) + mockIter1.EXPECT().Close().Return(nil) + result, err = idx.Query(ctx, q, qOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -664,8 +679,32 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { StartInclusive: t0, EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, + SeriesLimit: 1, } - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter0, nil) + b0.EXPECT().QueryWithIter(gomock.Any(), qOpts, mockIter0, gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + ctx context.Context, + opts index.QueryOptions, + iter index.QueryIterator, + r index.QueryResults, + deadline time.Time, + logFields []opentracinglog.Field, + ) error { + _, _, err = r.AddDocuments([]doc.Document{ + doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("A")}), + doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("B")}), + }) + require.NoError(t, err) + return nil + }) + gomock.InOrder( + mockIter0.EXPECT().Done().Return(false), + mockIter0.EXPECT().Done().Return(true), + mockIter0.EXPECT().SearchDuration().Return(time.Minute), + mockIter0.EXPECT().Close().Return(nil), + ) + result, err = idx.Query(ctx, q, qOpts) if test.requireExhaustive { require.Error(t, err) @@ -677,7 +716,7 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { sp.Finish() spans := mtr.FinishedSpans() - require.Len(t, spans, 15) + require.Len(t, spans, 8) }) } } @@ -775,15 +814,6 @@ func TestLimits(t *testing.T) { requireExhaustive: false, expectedErr: "", }, - { - name: "no limits", - seriesLimit: 0, - docsLimit: 0, - requireExhaustive: true, - expectedErr: "query exceeded limit: require_exhaustive=true, " + - "series_limit=0, series_matched=1, docs_limit=0, docs_matched=2", - expectedQueryLimitExceededError: true, - }, { name: "series limit only", seriesLimit: 1, @@ -829,12 +859,22 @@ func TestLimits(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()). + mockIter := index.NewMockQueryIterator(ctrl) + b0.EXPECT().QueryIter(gomock.Any(), q).Return(mockIter, nil) + gomock.InOrder( + mockIter.EXPECT().Done().Return(false), + mockIter.EXPECT().Done().Return(true), + mockIter.EXPECT().SearchDuration().Return(time.Minute), + mockIter.EXPECT().Close().Return(err), + ) + + b0.EXPECT().QueryWithIter(gomock.Any(), qOpts, mockIter, gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func(ctx context.Context, - query interface{}, opts interface{}, + iter interface{}, results index.DocumentResults, - logFields interface{}) (bool, error) { + deadline interface{}, + logFields interface{}) error { _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2. // Byte array represents ID encoded as bytes. @@ -844,11 +884,16 @@ func TestLimits(t *testing.T) { doc.NewDocumentFromMetadata(doc.Metadata{ID: []byte("A")}), }) require.NoError(t, err) - return false, nil + return nil }) result, err := idx.Query(ctx, q, qOpts) - require.False(t, result.Exhaustive) + if test.seriesLimit == 0 && test.docsLimit == 0 { + require.True(t, result.Exhaustive) + } else { + require.False(t, result.Exhaustive) + } + if test.requireExhaustive { require.Error(t, err) require.Equal(t, test.expectedErr, err.Error()) @@ -951,9 +996,13 @@ func TestNamespaceIndexBlockQueryReleasingContext(t *testing.T) { StartInclusive: t0, EndExclusive: now.Add(time.Minute), } + mockIter := index.NewMockQueryIterator(ctrl) gomock.InOrder( mockPool.EXPECT().Get().Return(stubResult), - b0.EXPECT().Query(ctx, q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil), + b0.EXPECT().QueryIter(ctx, q).Return(mockIter, nil), + mockIter.EXPECT().Done().Return(true), + mockIter.EXPECT().SearchDuration().Return(time.Minute), + mockIter.EXPECT().Close().Return(nil), mockPool.EXPECT().Put(stubResult), ) _, err = idx.Query(ctx, q, qOpts) @@ -1062,7 +1111,11 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { } aggOpts := index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + mockIter0 := index.NewMockAggregateIterator(ctrl) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) result, err := idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1074,8 +1127,16 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { RequireExhaustive: test.requireExhaustive, } aggOpts = index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Minute) + mockIter0.EXPECT().Close().Return(nil) + + mockIter1 := index.NewMockAggregateIterator(ctrl) + b1.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter1, nil) + mockIter1.EXPECT().Done().Return(true) + mockIter1.EXPECT().SearchDuration().Return(time.Minute) + mockIter1.EXPECT().Close().Return(nil) result, err = idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1085,8 +1146,35 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { StartInclusive: t0, EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, + DocsLimit: 1, } - b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + //nolint: dupl + b0.EXPECT(). + AggregateWithIter(gomock.Any(), mockIter0, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + ctx context.Context, + iter index.AggregateIterator, + opts index.QueryOptions, + results index.AggregateResults, + deadline time.Time, + logFields []opentracinglog.Field, + ) error { + _, _ = results.AddFields([]index.AggregateResultsEntry{{ + Field: ident.StringID("A"), + Terms: []ident.ID{ident.StringID("foo")}, + }, { + Field: ident.StringID("B"), + Terms: []ident.ID{ident.StringID("bar")}, + }}) + return nil + }) + gomock.InOrder( + mockIter0.EXPECT().Done().Return(false), + mockIter0.EXPECT().Done().Return(true), + mockIter0.EXPECT().SearchDuration().Return(time.Minute), + mockIter0.EXPECT().Close().Return(nil), + ) aggOpts = index.AggregationOptions{QueryOptions: qOpts} result, err = idx.AggregateQuery(ctx, q, aggOpts) if test.requireExhaustive { @@ -1099,7 +1187,7 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { sp.Finish() spans := mtr.FinishedSpans() - require.Len(t, spans, 15) + require.Len(t, spans, 8) }) } } @@ -1201,9 +1289,13 @@ func TestNamespaceIndexBlockAggregateQueryReleasingContext(t *testing.T) { } aggOpts := index.AggregationOptions{QueryOptions: qOpts} + mockIter := index.NewMockAggregateIterator(ctrl) gomock.InOrder( mockPool.EXPECT().Get().Return(stubResult), - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil), + b0.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter, nil), + mockIter.EXPECT().Done().Return(true), + mockIter.EXPECT().SearchDuration().Return(time.Minute), + mockIter.EXPECT().Close().Return(nil), mockPool.EXPECT().Put(stubResult), ) _, err = idx.AggregateQuery(ctx, q, aggOpts) @@ -1307,7 +1399,11 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { q := index.Query{ Query: query, } - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + mockIter0 := index.NewMockAggregateIterator(ctrl) + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Second) + mockIter0.EXPECT().Close().Return(nil) + b0.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter0, nil) result, err := idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1319,8 +1415,17 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { RequireExhaustive: test.requireExhaustive, } aggOpts = index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + + mockIter0.EXPECT().Done().Return(true) + mockIter0.EXPECT().SearchDuration().Return(time.Second) + mockIter0.EXPECT().Close().Return(nil) + b0.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter0, nil) + + mockIter1 := index.NewMockAggregateIterator(ctrl) + mockIter1.EXPECT().Done().Return(true) + mockIter1.EXPECT().SearchDuration().Return(time.Second) + mockIter1.EXPECT().Close().Return(nil) + b1.EXPECT().AggregateIter(ctx, gomock.Any()).Return(mockIter1, nil) result, err = idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1330,8 +1435,35 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { StartInclusive: t0, EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, + DocsLimit: 1, } - b0.EXPECT().Aggregate(ctx, qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().AggregateIter(gomock.Any(), gomock.Any()).Return(mockIter0, nil) + //nolint: dupl + b0.EXPECT(). + AggregateWithIter(gomock.Any(), mockIter0, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + ctx context.Context, + iter index.AggregateIterator, + opts index.QueryOptions, + results index.AggregateResults, + deadline time.Time, + logFields []opentracinglog.Field, + ) error { + _, _ = results.AddFields([]index.AggregateResultsEntry{{ + Field: ident.StringID("A"), + Terms: []ident.ID{ident.StringID("foo")}, + }, { + Field: ident.StringID("B"), + Terms: []ident.ID{ident.StringID("bar")}, + }}) + return nil + }) + gomock.InOrder( + mockIter0.EXPECT().Done().Return(false), + mockIter0.EXPECT().Done().Return(true), + mockIter0.EXPECT().SearchDuration().Return(time.Minute), + mockIter0.EXPECT().Close().Return(nil), + ) aggOpts = index.AggregationOptions{QueryOptions: qOpts} result, err = idx.AggregateQuery(ctx, q, aggOpts) if test.requireExhaustive { diff --git a/src/dbnode/storage/index_query_concurrent_test.go b/src/dbnode/storage/index_query_concurrent_test.go index 4443693e51..fb5f24127e 100644 --- a/src/dbnode/storage/index_query_concurrent_test.go +++ b/src/dbnode/storage/index_query_concurrent_test.go @@ -34,12 +34,12 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/convert" + "github.com/m3db/m3/src/dbnode/storage/limits/permits" testutil "github.com/m3db/m3/src/dbnode/test" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" - xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" "github.com/fortytw2/leaktest" @@ -123,8 +123,7 @@ func testNamespaceIndexHighConcurrentQueries( nsIdx := test.index.(*nsIndex) nsIdx.state.Lock() // Make the query pool really high to improve concurrency likelihood - nsIdx.queryWorkersPool = xsync.NewWorkerPool(1000) - nsIdx.queryWorkersPool.Init() + nsIdx.permitsManager = permits.NewFixedPermitsManager(1000) currNow := min nowLock := &sync.Mutex{} @@ -223,32 +222,39 @@ func testNamespaceIndexHighConcurrentQueries( EndTime(). DoAndReturn(func() time.Time { return block.EndTime() }). AnyTimes() + mockBlock.EXPECT().QueryIter(gomock.Any(), gomock.Any()).DoAndReturn(func( + ctx context.Context, query index.Query) (index.QueryIterator, error) { + return block.QueryIter(ctx, query) + }, + ).AnyTimes() if opts.blockErrors { mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + QueryWithIter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( _ context.Context, - _ index.Query, _ index.QueryOptions, + _ index.QueryIterator, _ index.QueryResults, + _ time.Time, _ []opentracinglog.Field, - ) (bool, error) { - return false, errors.New("some-error") + ) error { + return errors.New("some-error") }). AnyTimes() } else { mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + QueryWithIter(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( ctx context.Context, - q index.Query, opts index.QueryOptions, + iter index.QueryIterator, r index.QueryResults, + deadline time.Time, logFields []opentracinglog.Field, - ) (bool, error) { + ) error { time.Sleep(timeoutValue + time.Second) - return block.Query(ctx, q, opts, r, logFields) + return block.QueryWithIter(ctx, opts, iter, r, deadline, logFields) }). AnyTimes() } diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go index b551869bec..07b7014f8a 100644 --- a/src/dbnode/storage/index_test.go +++ b/src/dbnode/storage/index_test.go @@ -400,22 +400,29 @@ func TestNamespaceIndexQueryTimeout(t *testing.T) { ctx := context.NewWithGoContext(stdCtx) defer ctx.Close() + mockIter := index.NewMockQueryIterator(ctrl) + mockIter.EXPECT().Done().Return(false).Times(2) + mockIter.EXPECT().SearchDuration().Return(time.Minute * 1) + mockIter.EXPECT().Close().Return(nil) + mockBlock := index.NewMockBlock(ctrl) mockBlock.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() blockTime := now.Add(-1 * test.indexBlockSize) mockBlock.EXPECT().StartTime().Return(blockTime).AnyTimes() mockBlock.EXPECT().EndTime().Return(blockTime.Add(test.indexBlockSize)).AnyTimes() + mockBlock.EXPECT().QueryIter(gomock.Any(), gomock.Any()).Return(mockIter, nil) mockBlock.EXPECT(). - Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + QueryWithIter(gomock.Any(), gomock.Any(), mockIter, gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( ctx context.Context, - q index.Query, opts index.QueryOptions, + iter index.QueryIterator, r index.QueryResults, + deadline time.Time, logFields []opentracinglog.Field, - ) (bool, error) { + ) error { <-ctx.GoContext().Done() - return false, ctx.GoContext().Err() + return ctx.GoContext().Err() }) mockBlock.EXPECT().Close().Return(nil) idx.state.blocksByTime[xtime.ToUnixNano(blockTime)] = mockBlock diff --git a/src/dbnode/storage/limits/permits/fixed_permits.go b/src/dbnode/storage/limits/permits/fixed_permits.go new file mode 100644 index 0000000000..13db162038 --- /dev/null +++ b/src/dbnode/storage/limits/permits/fixed_permits.go @@ -0,0 +1,91 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package permits + +import ( + "github.com/m3db/m3/src/x/context" +) + +type fixedPermits struct { + permits chan struct{} +} + +type fixedPermitsManager struct { + fp fixedPermits +} + +var ( + _ Permits = &fixedPermits{} + _ Manager = &fixedPermitsManager{} +) + +// NewFixedPermitsManager returns a permits manager that uses a fixed size of permits. +func NewFixedPermitsManager(size int) Manager { + fp := fixedPermits{permits: make(chan struct{}, size)} + for i := 0; i < size; i++ { + fp.permits <- struct{}{} + } + return &fixedPermitsManager{fp} +} + +func (f *fixedPermitsManager) NewPermits(_ context.Context) (Permits, error) { + return &f.fp, nil +} + +func (f *fixedPermits) Acquire(ctx context.Context) error { + // don't acquire a permit if ctx is already done. + select { + case <-ctx.GoContext().Done(): + return ctx.GoContext().Err() + default: + } + + select { + case <-ctx.GoContext().Done(): + return ctx.GoContext().Err() + case <-f.permits: + return nil + } +} + +func (f *fixedPermits) TryAcquire(ctx context.Context) (bool, error) { + // don't acquire a permit if ctx is already done. + select { + case <-ctx.GoContext().Done(): + return false, ctx.GoContext().Err() + default: + } + + select { + case <-f.permits: + return true, nil + default: + return false, nil + } +} + +func (f *fixedPermits) Release(_ int64) { + select { + case f.permits <- struct{}{}: + default: + panic("more permits released than acquired") + } +} diff --git a/src/dbnode/storage/limits/permits/fixed_permits_test.go b/src/dbnode/storage/limits/permits/fixed_permits_test.go new file mode 100644 index 0000000000..7b6f0dd80e --- /dev/null +++ b/src/dbnode/storage/limits/permits/fixed_permits_test.go @@ -0,0 +1,69 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package permits + +import ( + stdctx "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/x/context" +) + +func TestFixedPermits(t *testing.T) { + ctx := context.NewBackground() + fp, err := NewFixedPermitsManager(3).NewPermits(ctx) + require.NoError(t, err) + require.NoError(t, fp.Acquire(ctx)) + require.NoError(t, fp.Acquire(ctx)) + require.NoError(t, fp.Acquire(ctx)) + + acq, err := fp.TryAcquire(ctx) + require.NoError(t, err) + require.False(t, acq) + + fp.Release(0) + require.NoError(t, fp.Acquire(ctx)) +} + +func TestFixedPermitsTimeouts(t *testing.T) { + ctx := context.NewBackground() + fp, err := NewFixedPermitsManager(1).NewPermits(ctx) + require.NoError(t, err) + require.NoError(t, fp.Acquire(ctx)) + + acq, err := fp.TryAcquire(ctx) + require.NoError(t, err) + require.False(t, acq) + + stdCtx, cancel := stdctx.WithCancel(stdctx.Background()) + cancel() + ctx = context.NewWithGoContext(stdCtx) + + fp.Release(0) + + err = fp.Acquire(ctx) + require.Error(t, err) + + _, err = fp.TryAcquire(ctx) + require.Error(t, err) +} diff --git a/src/dbnode/storage/limits/permits/lookback_limit_permit.go b/src/dbnode/storage/limits/permits/lookback_limit_permit.go index 38e4284c38..b495e37596 100644 --- a/src/dbnode/storage/limits/permits/lookback_limit_permit.go +++ b/src/dbnode/storage/limits/permits/lookback_limit_permit.go @@ -99,7 +99,7 @@ func (p *LookbackLimitPermit) TryAcquire(context.Context) (bool, error) { } // Release is a no-op in this implementation. -func (p *LookbackLimitPermit) Release() { +func (p *LookbackLimitPermit) Release(_ int64) { } func sourceFromContext(ctx context.Context) []byte { diff --git a/src/dbnode/storage/limits/permits/noop_permit.go b/src/dbnode/storage/limits/permits/noop_permit.go index f9f9e4833f..6afb174b4a 100644 --- a/src/dbnode/storage/limits/permits/noop_permit.go +++ b/src/dbnode/storage/limits/permits/noop_permit.go @@ -51,5 +51,5 @@ func (p noOpPermits) TryAcquire(context.Context) (bool, error) { return true, nil } -func (p noOpPermits) Release() { +func (p noOpPermits) Release(_ int64) { } diff --git a/src/dbnode/storage/limits/permits/options.go b/src/dbnode/storage/limits/permits/options.go index fe0bedcd28..7e74cb1899 100644 --- a/src/dbnode/storage/limits/permits/options.go +++ b/src/dbnode/storage/limits/permits/options.go @@ -20,14 +20,22 @@ package permits +import ( + "math" + "runtime" +) + type options struct { seriesReadManager Manager + indexQueryManager Manager } // NewOptions return a new set of default permit managers. func NewOptions() Options { return &options{ seriesReadManager: NewNoOpPermitsManager(), + // Default to using half of the available cores for querying IDs + indexQueryManager: NewFixedPermitsManager(int(math.Ceil(float64(runtime.NumCPU()) / 2))), } } @@ -42,3 +50,13 @@ func (o *options) SetSeriesReadPermitsManager(value Manager) Options { func (o *options) SeriesReadPermitsManager() Manager { return o.seriesReadManager } + +func (o *options) IndexQueryPermitsManager() Manager { + return o.indexQueryManager +} + +func (o *options) SetIndexQueryPermitsManager(value Manager) Options { + opts := *o + opts.indexQueryManager = value + return &opts +} diff --git a/src/dbnode/storage/limits/permits/types.go b/src/dbnode/storage/limits/permits/types.go index 504e0a29f7..61c69248b8 100644 --- a/src/dbnode/storage/limits/permits/types.go +++ b/src/dbnode/storage/limits/permits/types.go @@ -25,6 +25,10 @@ import "github.com/m3db/m3/src/x/context" // Options is the permit options. type Options interface { + // IndexQueryPermitsManager returns the index query permits manager. + IndexQueryPermitsManager() Manager + // SetIndexQueryPermitsManager sets the index query permits manager. + SetIndexQueryPermitsManager(manager Manager) Options // SeriesReadPermitsManager returns the series read permits manager. SeriesReadPermitsManager() Manager // SetSeriesReadPermitsManager sets the series read permits manager. @@ -46,7 +50,8 @@ type Permits interface { // true if an resource was acquired. TryAcquire(ctx context.Context) (bool, error) - // Release gives back one acquired permit from the specific permits instance. + // Release gives back one acquired permit from the specific permits instance. The user can pass an optional quota + // indicating how much of quota was used while holding the permit. // Cannot release more permits than have been acquired. - Release() + Release(quota int64) } diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index ec56c22bda..7e61b283a1 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -23,8 +23,6 @@ package storage import ( "errors" "fmt" - "math" - "runtime" "time" "github.com/m3db/m3/src/dbnode/client" @@ -52,7 +50,6 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - xsync "github.com/m3db/m3/src/x/sync" ) const ( @@ -159,7 +156,6 @@ type options struct { identifierPool ident.Pool fetchBlockMetadataResultsPool block.FetchBlockMetadataResultsPool fetchBlocksMetadataResultsPool block.FetchBlocksMetadataResultsPool - queryIDsWorkerPool xsync.WorkerPool writeBatchPool *writes.WriteBatchPool bufferBucketPool *series.BufferBucketPool bufferBucketVersionsPool *series.BufferBucketVersionsPool @@ -197,10 +193,6 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { bytesPool.Init() seriesOpts := series.NewOptions() - // Default to using half of the available cores for querying IDs - queryIDsWorkerPool := xsync.NewWorkerPool(int(math.Ceil(float64(runtime.NumCPU()) / 2))) - queryIDsWorkerPool.Init() - writeBatchPool := writes.NewWriteBatchPool(poolOpts, nil, nil) writeBatchPool.Init() @@ -245,7 +237,6 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { }), fetchBlockMetadataResultsPool: block.NewFetchBlockMetadataResultsPool(poolOpts, 0), fetchBlocksMetadataResultsPool: block.NewFetchBlocksMetadataResultsPool(poolOpts, 0), - queryIDsWorkerPool: queryIDsWorkerPool, writeBatchPool: writeBatchPool, bufferBucketVersionsPool: series.NewBufferBucketVersionsPool(poolOpts), bufferBucketPool: series.NewBufferBucketPool(poolOpts), @@ -703,16 +694,6 @@ func (o *options) FetchBlocksMetadataResultsPool() block.FetchBlocksMetadataResu return o.fetchBlocksMetadataResultsPool } -func (o *options) SetQueryIDsWorkerPool(value xsync.WorkerPool) Options { - opts := *o - opts.queryIDsWorkerPool = value - return &opts -} - -func (o *options) QueryIDsWorkerPool() xsync.WorkerPool { - return o.queryIDsWorkerPool -} - func (o *options) SetWriteBatchPool(value *writes.WriteBatchPool) Options { opts := *o opts.writeBatchPool = value diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 18d227f3aa..74378deb2f 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -56,7 +56,6 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - sync0 "github.com/m3db/m3/src/x/sync" time0 "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -4609,34 +4608,6 @@ func (mr *MockOptionsMockRecorder) FetchBlocksMetadataResultsPool() *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBlocksMetadataResultsPool", reflect.TypeOf((*MockOptions)(nil).FetchBlocksMetadataResultsPool)) } -// SetQueryIDsWorkerPool mocks base method -func (m *MockOptions) SetQueryIDsWorkerPool(value sync0.WorkerPool) Options { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetQueryIDsWorkerPool", value) - ret0, _ := ret[0].(Options) - return ret0 -} - -// SetQueryIDsWorkerPool indicates an expected call of SetQueryIDsWorkerPool -func (mr *MockOptionsMockRecorder) SetQueryIDsWorkerPool(value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetQueryIDsWorkerPool", reflect.TypeOf((*MockOptions)(nil).SetQueryIDsWorkerPool), value) -} - -// QueryIDsWorkerPool mocks base method -func (m *MockOptions) QueryIDsWorkerPool() sync0.WorkerPool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "QueryIDsWorkerPool") - ret0, _ := ret[0].(sync0.WorkerPool) - return ret0 -} - -// QueryIDsWorkerPool indicates an expected call of QueryIDsWorkerPool -func (mr *MockOptionsMockRecorder) QueryIDsWorkerPool() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIDsWorkerPool", reflect.TypeOf((*MockOptions)(nil).QueryIDsWorkerPool)) -} - // SetWriteBatchPool mocks base method func (m *MockOptions) SetWriteBatchPool(value *writes.WriteBatchPool) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 0e5d313003..e1b27f1616 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -54,7 +54,6 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" ) @@ -1217,12 +1216,6 @@ type Options interface { // FetchBlocksMetadataResultsPool returns the fetchBlocksMetadataResultsPool. FetchBlocksMetadataResultsPool() block.FetchBlocksMetadataResultsPool - // SetQueryIDsWorkerPool sets the QueryIDs worker pool. - SetQueryIDsWorkerPool(value xsync.WorkerPool) Options - - // QueryIDsWorkerPool returns the QueryIDs worker pool. - QueryIDsWorkerPool() xsync.WorkerPool - // SetWriteBatchPool sets the WriteBatch pool. SetWriteBatchPool(value *writes.WriteBatchPool) Options