diff --git a/scripts/development/m3_stack/m3dbnode.yml b/scripts/development/m3_stack/m3dbnode.yml
index a4fc0a3bf2..0add9908d2 100644
--- a/scripts/development/m3_stack/m3dbnode.yml
+++ b/scripts/development/m3_stack/m3dbnode.yml
@@ -41,6 +41,8 @@ db:
   cache:
     series:
       policy: lru
+    postingsList:
+      size: 262144
 
   commitlog:
     flushMaxBytes: 524288
diff --git a/src/cmd/services/m3dbnode/config/cache.go b/src/cmd/services/m3dbnode/config/cache.go
index 7e29c6e040..d088a4c026 100644
--- a/src/cmd/services/m3dbnode/config/cache.go
+++ b/src/cmd/services/m3dbnode/config/cache.go
@@ -22,10 +22,19 @@ package config
 
 import "github.com/m3db/m3/src/dbnode/storage/series"
 
+var (
+	defaultPostingsListCacheSize   = 2 << 17 // 262,144
+	defaultPostingsListCacheRegexp = true
+	defaultPostingsListCacheTerms  = true
+)
+
 // CacheConfigurations is the cache configurations.
 type CacheConfigurations struct {
 	// Series cache policy.
 	Series *SeriesCacheConfiguration `yaml:"series"`
+
+	// PostingsList cache policy.
+	PostingsList *PostingsListCacheConfiguration `yaml:"postingsList"`
 }
 
 // SeriesConfiguration returns the series cache configuration or default
@@ -38,6 +47,16 @@ func (c CacheConfigurations) SeriesConfiguration() SeriesCacheConfiguration {
 	return *c.Series
 }
 
+// PostingsListConfiguration returns the postings list cache configuration
+// or default if none is specified.
+func (c CacheConfigurations) PostingsListConfiguration() PostingsListCacheConfiguration {
+	if c.PostingsList == nil {
+		return PostingsListCacheConfiguration{}
+	}
+
+	return *c.PostingsList
+}
+
 // SeriesCacheConfiguration is the series cache configuration.
 type SeriesCacheConfiguration struct {
 	Policy series.CachePolicy `yaml:"policy"`
@@ -50,3 +69,40 @@ type LRUSeriesCachePolicyConfiguration struct {
 	MaxBlocks         uint `yaml:"maxBlocks" validate:"nonzero"`
 	EventsChannelSize uint `yaml:"eventsChannelSize" validate:"nonzero"`
 }
+
+// PostingsListCacheConfiguration is the postings list cache configuration.
+type PostingsListCacheConfiguration struct {
+	Size        *int  `yaml:"size"`
+	CacheRegexp *bool `yaml:"cacheRegexp"`
+	CacheTerms  *bool `yaml:"cacheTerms"`
+}
+
+// SizeOrDefault returns the provided size or the default value if none is
+// provided.
+func (p *PostingsListCacheConfiguration) SizeOrDefault() int {
+	if p.Size == nil {
+		return defaultPostingsListCacheSize
+	}
+
+	return *p.Size
+}
+
+// CacheRegexpOrDefault returns the provided cache regexp configuration value
+// or the default value if none is provided.
+func (p *PostingsListCacheConfiguration) CacheRegexpOrDefault() bool {
+	if p.CacheRegexp == nil {
+		return defaultPostingsListCacheRegexp
+	}
+
+	return *p.CacheRegexp
+}
+
+// CacheTermsOrDefault returns the provided cache terms configuration value
+// or the default value if none is provided.
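+// (With an empty PostingsListCacheConfiguration the defaults above apply:
+// a size of 2 << 17 = 262,144 entries, with both regexp and term caching
+// enabled.)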
+func (p *PostingsListCacheConfiguration) CacheTermsOrDefault() bool {
+	if p.CacheTerms == nil {
+		return defaultPostingsListCacheTerms
+	}
+
+	return *p.CacheTerms
+}
diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go
index fd9b18664e..297d5c5a06 100644
--- a/src/cmd/services/m3dbnode/config/config_test.go
+++ b/src/cmd/services/m3dbnode/config/config_test.go
@@ -40,6 +40,12 @@ db:
         level: info
         file: /var/log/m3dbnode.log
 
+    cache:
+        postingsList:
+            size: 100
+            cacheRegexp: false
+            cacheTerms: false
+
     metrics:
         prometheus:
             handlerPath: /metrics
@@ -381,6 +387,10 @@ func TestConfiguration(t *testing.T) {
   blockRetrieve: null
   cache:
     series: null
+    postingsList:
+      size: 100
+      cacheRegexp: false
+      cacheTerms: false
   fs:
     filePathPrefix: /var/lib/m3db
     writeBufferSize: 65536
diff --git a/src/dbnode/config/m3dbnode-cluster-template.yml b/src/dbnode/config/m3dbnode-cluster-template.yml
index 0771ab56a4..cec60eb72a 100644
--- a/src/dbnode/config/m3dbnode-cluster-template.yml
+++ b/src/dbnode/config/m3dbnode-cluster-template.yml
@@ -83,6 +83,8 @@ db:
   cache:
     series:
       policy: lru
+    postingsList:
+      size: 262144
 
   commitlog:
     flushMaxBytes: 524288
diff --git a/src/dbnode/config/m3dbnode-local-etcd.yml b/src/dbnode/config/m3dbnode-local-etcd.yml
index a8fb1249be..38c1b47b4a 100644
--- a/src/dbnode/config/m3dbnode-local-etcd.yml
+++ b/src/dbnode/config/m3dbnode-local-etcd.yml
@@ -65,6 +65,8 @@ db:
   cache:
     series:
       policy: lru
+    postingsList:
+      size: 262144
 
   commitlog:
     flushMaxBytes: 524288
diff --git a/src/dbnode/config/m3dbnode-local.yml b/src/dbnode/config/m3dbnode-local.yml
index 75797f3066..59bf0261af 100644
--- a/src/dbnode/config/m3dbnode-local.yml
+++ b/src/dbnode/config/m3dbnode-local.yml
@@ -62,6 +62,8 @@ db:
   cache:
     series:
       policy: lru
+    postingsList:
+      size: 262144
 
   commitlog:
     flushMaxBytes: 524288
diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go
index cf7aa47ec3..e7dba5bd4b 100644
--- a/src/dbnode/integration/setup.go
+++ b/src/dbnode/integration/setup.go
@@ -166,7 +166,21 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup
 	if opts.WriteNewSeriesAsync() {
 		indexMode = index.InsertAsync
 	}
-	storageOpts = storageOpts.SetIndexOptions(storageOpts.IndexOptions().SetInsertMode(indexMode))
+
+	plCache, stopReporting, err := index.NewPostingsListCache(10, index.PostingsListCacheOptions{
+		InstrumentOptions: iOpts,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("unable to create postings list cache: %v", err)
+	}
+	// Ok to run immediately since it just closes the background reporting loop. This is
+	// only acceptable because it's a test setup; in production we would want the metrics.
+	stopReporting()
+
+	indexOpts := storageOpts.IndexOptions().
+		SetInsertMode(indexMode).
+		SetPostingsListCache(plCache)
+	storageOpts = storageOpts.SetIndexOptions(indexOpts)
 
 	runtimeOptsMgr := storageOpts.RuntimeOptionsManager()
 	runtimeOpts := runtimeOptsMgr.Get().
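For reference, a caller outside of this diff would wire the cache into index
options roughly as follows. This is a minimal sketch using only the APIs added
in this change; the size, instrument options, and variable names are
illustrative, not part of the patch:

	package main

	import (
		"log"

		"github.com/m3db/m3/src/dbnode/storage/index"
		"github.com/m3db/m3x/instrument"
	)

	func main() {
		// 1024 is an illustrative size; the shipped configs default to 262144.
		plCache, stopReporting, err := index.NewPostingsListCache(1024, index.PostingsListCacheOptions{
			InstrumentOptions: instrument.NewOptions(),
		})
		if err != nil {
			log.Fatalf("unable to create postings list cache: %v", err)
		}
		// Stops the background metrics reporting loop on shutdown.
		defer stopReporting()

		indexOpts := index.NewOptions().
			SetPostingsListCache(plCache).
			SetReadThroughSegmentOptions(index.ReadThroughSegmentOptions{
				CacheRegexp: true,
				CacheTerms:  true,
			})
		_ = indexOpts // A real caller passes this on to the storage options.
	}
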
diff --git a/src/dbnode/persist/fs/index_read_segments.go b/src/dbnode/persist/fs/index_read_segments.go
index e3131e25f3..b31070b499 100644
--- a/src/dbnode/persist/fs/index_read_segments.go
+++ b/src/dbnode/persist/fs/index_read_segments.go
@@ -101,7 +101,8 @@ func ReadIndexSegments(
 			return nil, err
 		}
 
-		seg, err := newPersistentSegment(fileset, fsOpts.FSTOptions())
+		fstOpts := fsOpts.FSTOptions()
+		seg, err := newPersistentSegment(fileset, fstOpts)
 		if err != nil {
 			return nil, err
 		}
diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go
index b53077a0c4..668b32323e 100644
--- a/src/dbnode/persist/fs/types.go
+++ b/src/dbnode/persist/fs/types.go
@@ -313,57 +313,57 @@ type IndexFileSetReader interface {
 	Validate() error
 }
 
-// Options represents the options for filesystem persistence
+// Options represents the options for filesystem persistence.
 type Options interface {
-	// Validate will validate the options and return an error if not valid
+	// Validate will validate the options and return an error if not valid.
 	Validate() error
 
-	// SetClockOptions sets the clock options
+	// SetClockOptions sets the clock options.
 	SetClockOptions(value clock.Options) Options
 
-	// ClockOptions returns the clock options
+	// ClockOptions returns the clock options.
 	ClockOptions() clock.Options
 
-	// SetInstrumentOptions sets the instrumentation options
+	// SetInstrumentOptions sets the instrumentation options.
 	SetInstrumentOptions(value instrument.Options) Options
 
-	// InstrumentOptions returns the instrumentation options
+	// InstrumentOptions returns the instrumentation options.
 	InstrumentOptions() instrument.Options
 
-	// SetRuntimeOptionsManager sets the runtime options manager
+	// SetRuntimeOptionsManager sets the runtime options manager.
 	SetRuntimeOptionsManager(value runtime.OptionsManager) Options
 
-	// RuntimeOptionsManager returns the runtime options manager
+	// RuntimeOptionsManager returns the runtime options manager.
 	RuntimeOptionsManager() runtime.OptionsManager
 
-	// SetDecodingOptions sets the decoding options
+	// SetDecodingOptions sets the decoding options.
 	SetDecodingOptions(value msgpack.DecodingOptions) Options
 
-	// DecodingOptions returns the decoding options
+	// DecodingOptions returns the decoding options.
 	DecodingOptions() msgpack.DecodingOptions
 
-	// SetFilePathPrefix sets the file path prefix for sharded TSDB files
+	// SetFilePathPrefix sets the file path prefix for sharded TSDB files.
 	SetFilePathPrefix(value string) Options
 
-	// FilePathPrefix returns the file path prefix for sharded TSDB files
+	// FilePathPrefix returns the file path prefix for sharded TSDB files.
 	FilePathPrefix() string
 
-	// SetNewFileMode sets the new file mode
+	// SetNewFileMode sets the new file mode.
 	SetNewFileMode(value os.FileMode) Options
 
-	// NewFileMode returns the new file mode
+	// NewFileMode returns the new file mode.
 	NewFileMode() os.FileMode
 
-	// SetNewDirectoryMode sets the new directory mode
+	// SetNewDirectoryMode sets the new directory mode.
 	SetNewDirectoryMode(value os.FileMode) Options
 
-	// NewDirectoryMode returns the new directory mode
+	// NewDirectoryMode returns the new directory mode.
 	NewDirectoryMode() os.FileMode
 
-	// SetIndexSummariesPercent size sets the percent of index summaries to write
+	// SetIndexSummariesPercent sets the percent of index summaries to write.
 	SetIndexSummariesPercent(value float64) Options
 
-	// IndexSummariesPercent size returns the percent of index summaries to write
+	// IndexSummariesPercent returns the percent of index summaries to write.
 	IndexSummariesPercent() float64
 
 	// SetIndexBloomFilterFalsePositivePercent size sets the percent of false positive
@@ -371,7 +371,7 @@ type Options interface {
 	SetIndexBloomFilterFalsePositivePercent(value float64) Options
 
 	// IndexBloomFilterFalsePositivePercent size returns the percent of false positive
-	// rate to use for the index bloom filter size and k hashes estimation
+	// rate to use for the index bloom filter size and k hashes estimation.
 	IndexBloomFilterFalsePositivePercent() float64
 
 	// SetForceIndexSummariesMmapMemory sets whether the summaries files will be mmap'd
@@ -382,66 +382,66 @@ type Options interface {
 	// as an anonymous region, or as a file.
 	ForceIndexSummariesMmapMemory() bool
 
 	// SetForceBloomFilterMmapMemory sets whether the bloom filters will be mmap'd
 	// as an anonymous region, or as a file.
 	SetForceBloomFilterMmapMemory(value bool) Options
 
 	// ForceBloomFilterMmapMemory returns whether the bloom filters will be mmap'd
 	// as an anonymous region, or as a file.
 	ForceBloomFilterMmapMemory() bool
 
-	// SetWriterBufferSize sets the buffer size for writing TSDB files
+	// SetWriterBufferSize sets the buffer size for writing TSDB files.
 	SetWriterBufferSize(value int) Options
 
-	// WriterBufferSize returns the buffer size for writing TSDB files
+	// WriterBufferSize returns the buffer size for writing TSDB files.
 	WriterBufferSize() int
 
-	// SetInfoReaderBufferSize sets the buffer size for reading TSDB info, digest and checkpoint files
+	// SetInfoReaderBufferSize sets the buffer size for reading TSDB info, digest and checkpoint files.
 	SetInfoReaderBufferSize(value int) Options
 
-	// InfoReaderBufferSize returns the buffer size for reading TSDB info, digest and checkpoint files
+	// InfoReaderBufferSize returns the buffer size for reading TSDB info, digest and checkpoint files.
 	InfoReaderBufferSize() int
 
-	// SetDataReaderBufferSize sets the buffer size for reading TSDB data and index files
+	// SetDataReaderBufferSize sets the buffer size for reading TSDB data and index files.
 	SetDataReaderBufferSize(value int) Options
 
-	// DataReaderBufferSize returns the buffer size for reading TSDB data and index files
+	// DataReaderBufferSize returns the buffer size for reading TSDB data and index files.
 	DataReaderBufferSize() int
 
-	// SetSeekReaderBufferSize size sets the buffer size for seeking TSDB files
+	// SetSeekReaderBufferSize sets the buffer size for seeking TSDB files.
 	SetSeekReaderBufferSize(value int) Options
 
-	// SeekReaderBufferSize size returns the buffer size for seeking TSDB files
+	// SeekReaderBufferSize returns the buffer size for seeking TSDB files.
 	SeekReaderBufferSize() int
 
-	// SetMmapEnableHugeTLB sets whether mmap huge pages are enabled when running on linux
+	// SetMmapEnableHugeTLB sets whether mmap huge pages are enabled when running on linux.
 	SetMmapEnableHugeTLB(value bool) Options
 
-	// MmapEnableHugeTLB returns whether mmap huge pages are enabled when running on linux
+	// MmapEnableHugeTLB returns whether mmap huge pages are enabled when running on linux.
 	MmapEnableHugeTLB() bool
 
-	// SetMmapHugeTLBThreshold sets the threshold when to use mmap huge pages for mmap'd files on linux
+	// SetMmapHugeTLBThreshold sets the threshold when to use mmap huge pages for mmap'd files on linux.
 	SetMmapHugeTLBThreshold(value int64) Options
 
-	// MmapHugeTLBThreshold returns the threshold when to use mmap huge pages for mmap'd files on linux
+	// MmapHugeTLBThreshold returns the threshold when to use mmap huge pages for mmap'd files on linux.
 	MmapHugeTLBThreshold() int64
 
-	// SetTagEncoderPool sets the tag encoder pool
+	// SetTagEncoderPool sets the tag encoder pool.
 	SetTagEncoderPool(value serialize.TagEncoderPool) Options
 
-	// TagEncoderPool returns the tag encoder pool
+	// TagEncoderPool returns the tag encoder pool.
 	TagEncoderPool() serialize.TagEncoderPool
 
-	// SetTagDecoderPool sets the tag decoder pool
+	// SetTagDecoderPool sets the tag decoder pool.
 	SetTagDecoderPool(value serialize.TagDecoderPool) Options
 
-	// TagDecoderPool returns the tag decoder pool
+	// TagDecoderPool returns the tag decoder pool.
 	TagDecoderPool() serialize.TagDecoderPool
 
-	// SetFStOptions sets the fst options
+	// SetFSTOptions sets the FST options.
 	SetFSTOptions(value fst.Options) Options
 
-	// FSTOptions returns the fst options
+	// FSTOptions returns the FST options.
 	FSTOptions() fst.Options
 }
diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go
index 695751837d..5ae40627fd 100644
--- a/src/dbnode/server/server.go
+++ b/src/dbnode/server/server.go
@@ -246,14 +246,34 @@ func Run(runOpts RunOptions) {
 		runtimeOpts = runtimeOpts.SetMaxWiredBlocks(lruCfg.MaxBlocks)
 	}
 
+	// Set up the postings list cache.
+	var (
+		plCacheConfig  = cfg.Cache.PostingsListConfiguration()
+		plCacheSize    = plCacheConfig.SizeOrDefault()
+		plCacheOptions = index.PostingsListCacheOptions{
+			InstrumentOptions: opts.InstrumentOptions().
+				SetMetricsScope(scope.SubScope("postings-list-cache")),
+		}
+	)
+	postingsListCache, stopReporting, err := index.NewPostingsListCache(plCacheSize, plCacheOptions)
+	if err != nil {
+		logger.Fatalf("could not construct postings list cache: %s", err.Error())
+	}
+	defer stopReporting()
+
 	// FOLLOWUP(prateek): remove this once we have the runtime options<->index wiring done
 	indexOpts := opts.IndexOptions()
 	insertMode := index.InsertSync
 	if cfg.WriteNewSeriesAsync {
 		insertMode = index.InsertAsync
 	}
-	opts = opts.SetIndexOptions(
-		indexOpts.SetInsertMode(insertMode))
+	indexOpts = indexOpts.SetInsertMode(insertMode).
+		SetPostingsListCache(postingsListCache).
+		SetReadThroughSegmentOptions(index.ReadThroughSegmentOptions{
+			CacheRegexp: plCacheConfig.CacheRegexpOrDefault(),
+			CacheTerms:  plCacheConfig.CacheTermsOrDefault(),
+		})
+	opts = opts.SetIndexOptions(indexOpts)
 
 	if tick := cfg.Tick; tick != nil {
 		runtimeOpts = runtimeOpts.
@@ -1162,15 +1182,12 @@ func withEncodingAndPoolingOptions(
 			SetBytesPool(bytesPool).
SetIdentifierPool(identifierPool)) - resultsPool := index.NewResultsPool( - poolOptions( - policy.IndexResultsPool, - scope.SubScope("index-results-pool"))) - - postingsListOpts := poolOptions( - policy.PostingsListPool, - scope.SubScope("postingslist-pool")) - postingsList := postings.NewPool(postingsListOpts, roaring.NewPostingsList) + var ( + resultsPool = index.NewResultsPool( + poolOptions(policy.IndexResultsPool, scope.SubScope("index-results-pool"))) + postingsListOpts = poolOptions(policy.PostingsListPool, scope.SubScope("postingslist-pool")) + postingsList = postings.NewPool(postingsListOpts, roaring.NewPostingsList) + ) indexOpts := opts.IndexOptions(). SetInstrumentOptions(iopts). @@ -1188,6 +1205,7 @@ func withEncodingAndPoolingOptions( SetIdentifierPool(identifierPool). SetCheckedBytesPool(bytesPool). SetResultsPool(resultsPool) + resultsPool.Init(func() index.Results { return index.NewResults(indexOpts) }) return opts.SetIndexOptions(indexOpts) diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index dd12d1068d..7940c9b872 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -85,7 +85,18 @@ func init() { panic(err) } + plCache, stopReporting, err := index.NewPostingsListCache(10, index.PostingsListCacheOptions{ + InstrumentOptions: opts.InstrumentOptions(), + }) + if err != nil { + panic(err) + } + defer stopReporting() + + indexOpts := opts.IndexOptions(). + SetPostingsListCache(plCache) defaultTestDatabaseOptions = opts. + SetIndexOptions(indexOpts). SetSeriesCachePolicy(series.CacheAll). SetPersistManager(pm). SetRepairEnabled(false). diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 146b02b6b7..997d28b367 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -833,9 +833,24 @@ func (b *block) AddResults( results.Fulfilled().SummaryString(), blockRange.String()) } + var ( + plCache = b.opts.PostingsListCache() + readThroughOpts = b.opts.ReadThroughSegmentOptions() + segments = results.Segments() + ) + readThroughSegments := make([]segment.Segment, 0, len(segments)) + for _, seg := range segments { + readThroughSeg := seg + if _, ok := seg.(segment.MutableSegment); !ok { + // Only wrap the immutable segments with a read through cache. 
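+			// Caching a mutable segment's postings lists would risk serving
+			// stale results, since its contents can still change as new
+			// writes are indexed.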
+ readThroughSeg = NewReadThroughSegment(seg, plCache, readThroughOpts) + } + readThroughSegments = append(readThroughSegments, readThroughSeg) + } + entry := blockShardRangesSegments{ shardTimeRanges: results.Fulfilled(), - segments: results.Segments(), + segments: readThroughSegments, } // First see if this block can cover all our current blocks covering shard diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 721a609f30..14a6facf2b 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -773,6 +773,7 @@ func TestBlockAddResultsAddsSegment(t *testing.T) { result.NewIndexBlock(start, []segment.Segment{seg1}, result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) require.Equal(t, 1, len(b.shardRangesSegments)) + require.Equal(t, seg1, b.shardRangesSegments[0].segments[0]) } @@ -810,6 +811,7 @@ func TestBlockAddResultsAfterSealWorks(t *testing.T) { result.NewIndexBlock(start, []segment.Segment{seg1}, result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) require.Equal(t, 1, len(b.shardRangesSegments)) + require.Equal(t, seg1, b.shardRangesSegments[0].segments[0]) } diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index f884e66f6e..c63c70eef5 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -132,6 +132,18 @@ func (mr *MockResultsMockRecorder) Namespace() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Namespace", reflect.TypeOf((*MockResults)(nil).Namespace)) } +// NoFinalize mocks base method +func (m *MockResults) NoFinalize() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "NoFinalize") +} + +// NoFinalize indicates an expected call of NoFinalize +func (mr *MockResultsMockRecorder) NoFinalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NoFinalize", reflect.TypeOf((*MockResults)(nil).NoFinalize)) +} + // Reset mocks base method func (m *MockResults) Reset(arg0 ident.ID) { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index f7fb27db3c..b10a1473f9 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -52,6 +52,7 @@ var ( errOptionsBytesPoolUnspecified = errors.New("checkedbytes pool is unset") errOptionsResultsPoolUnspecified = errors.New("results pool is unset") errIDGenerationDisabled = errors.New("id generation is disabled") + errPostingsListCacheUnspecified = errors.New("postings list cache is unset") defaultForegroundCompactionOpts compaction.PlannerOptions defaultBackgroundCompactionOpts compaction.PlannerOptions @@ -98,11 +99,13 @@ type opts struct { docArrayPool doc.DocumentArrayPool foregroundCompactionPlannerOpts compaction.PlannerOptions backgroundCompactionPlannerOpts compaction.PlannerOptions + postingsListCache *PostingsListCache + readThroughSegmentOptions ReadThroughSegmentOptions } var undefinedUUIDFn = func() ([]byte, error) { return nil, errIDGenerationDisabled } -// NewOptions returns a new index.Options object with default properties. +// NewOptions returns a new Options object with default properties. 
 func NewOptions() Options {
 	resultsPool := NewResultsPool(pool.NewObjectPoolOptions())
@@ -150,6 +153,9 @@ func (o *opts) Validate() error {
 	if o.resultsPool == nil {
 		return errOptionsResultsPoolUnspecified
 	}
+	if o.postingsListCache == nil {
+		return errPostingsListCacheUnspecified
+	}
 	return nil
 }
 
@@ -276,3 +282,23 @@ func (o *opts) SetBackgroundCompactionPlannerOptions(value compaction.PlannerOpt
 func (o *opts) BackgroundCompactionPlannerOptions() compaction.PlannerOptions {
 	return o.backgroundCompactionPlannerOpts
 }
+
+func (o *opts) SetPostingsListCache(value *PostingsListCache) Options {
+	opts := *o
+	opts.postingsListCache = value
+	return &opts
+}
+
+func (o *opts) PostingsListCache() *PostingsListCache {
+	return o.postingsListCache
+}
+
+func (o *opts) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Options {
+	opts := *o
+	opts.readThroughSegmentOptions = value
+	return &opts
+}
+
+func (o *opts) ReadThroughSegmentOptions() ReadThroughSegmentOptions {
+	return o.readThroughSegmentOptions
+}
diff --git a/src/dbnode/storage/index/postings_list_cache.go b/src/dbnode/storage/index/postings_list_cache.go
new file mode 100644
index 0000000000..700f4883a1
--- /dev/null
+++ b/src/dbnode/storage/index/postings_list_cache.go
@@ -0,0 +1,270 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"sync"
+	"time"
+
+	"github.com/m3db/m3/src/m3ninx/postings"
+	"github.com/m3db/m3x/instrument"
+
+	"github.com/pborman/uuid"
+	"github.com/uber-go/tally"
+)
+
+// PatternType is an enum for the various pattern types. It allows us to
+// separate them logically within the cache.
+type PatternType int
+
+// Closer represents a function that will close managed resources.
+type Closer func()
+
+const (
+	// PatternTypeRegexp indicates that the pattern is of type regexp.
+	PatternTypeRegexp PatternType = iota
+	// PatternTypeTerm indicates that the pattern is of type term.
+	PatternTypeTerm
+
+	reportLoopInterval = 10 * time.Second
+)
+
+// PostingsListCacheOptions is the options struct for the postings list cache.
+type PostingsListCacheOptions struct {
+	InstrumentOptions instrument.Options
+}
+
+// PostingsListCache implements an LRU for caching queries and their results.
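+//
+// All methods are safe for concurrent use; a single mutex serializes access
+// because even a lookup mutates the LRU's recency order. The intended
+// read-through usage (mirroring readThroughSegmentReader below; the names
+// here are illustrative) is roughly:
+//
+//	pl, ok := cache.GetTerm(segmentUUID, term)
+//	if !ok {
+//		// Miss: query the segment's reader, then cache the result.
+//		pl, err = reader.MatchTerm(field, []byte(term))
+//		if err == nil {
+//			cache.PutTerm(segmentUUID, term, pl)
+//		}
+//	}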
+type PostingsListCache struct {
+	sync.Mutex
+
+	lru *postingsListLRU
+
+	size    int
+	opts    PostingsListCacheOptions
+	metrics *postingsListCacheMetrics
+
+	stopReporting chan struct{}
+}
+
+// NewPostingsListCache creates a new postings list cache.
+func NewPostingsListCache(size int, opts PostingsListCacheOptions) (*PostingsListCache, Closer, error) {
+	lru, err := newPostingsListLRU(size)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	plc := &PostingsListCache{
+		lru:     lru,
+		size:    size,
+		opts:    opts,
+		metrics: newPostingsListCacheMetrics(opts.InstrumentOptions.MetricsScope()),
+	}
+
+	closer := plc.startReportLoop()
+	return plc, closer, nil
+}
+
+// GetRegexp returns the cached results for the provided regexp query, if any.
+func (q *PostingsListCache) GetRegexp(
+	segmentUUID uuid.UUID,
+	pattern string,
+) (postings.List, bool) {
+	return q.get(
+		segmentUUID,
+		pattern,
+		PatternTypeRegexp)
+}
+
+// GetTerm returns the cached results for the provided term query, if any.
+func (q *PostingsListCache) GetTerm(
+	segmentUUID uuid.UUID,
+	pattern string,
+) (postings.List, bool) {
+	return q.get(
+		segmentUUID,
+		pattern,
+		PatternTypeTerm)
+}
+
+func (q *PostingsListCache) get(
+	segmentUUID uuid.UUID,
+	pattern string,
+	patternType PatternType,
+) (postings.List, bool) {
+	// No RLock because a Get() operation mutates the LRU.
+	q.Lock()
+	p, ok := q.lru.Get(segmentUUID, pattern, patternType)
+	q.Unlock()
+
+	q.emitCacheGetMetrics(patternType, ok)
+
+	if !ok {
+		return nil, false
+	}
+
+	return p, ok
+}
+
+// PutRegexp updates the LRU with the result of the regexp query.
+func (q *PostingsListCache) PutRegexp(
+	segmentUUID uuid.UUID,
+	pattern string,
+	pl postings.List,
+) {
+	q.put(segmentUUID, pattern, PatternTypeRegexp, pl)
+}
+
+// PutTerm updates the LRU with the result of the term query.
+func (q *PostingsListCache) PutTerm(
+	segmentUUID uuid.UUID,
+	pattern string,
+	pl postings.List,
+) {
+	q.put(segmentUUID, pattern, PatternTypeTerm, pl)
+}
+
+func (q *PostingsListCache) put(
+	segmentUUID uuid.UUID,
+	pattern string,
+	patternType PatternType,
+	pl postings.List,
+) {
+	q.Lock()
+	q.lru.Add(
+		segmentUUID,
+		pattern,
+		patternType,
+		pl,
+	)
+	q.Unlock()
+	q.emitCachePutMetrics(patternType)
+}
+
+// PurgeSegment removes all postings lists associated with the specified
+// segment from the cache.
+func (q *PostingsListCache) PurgeSegment(segmentUUID uuid.UUID) {
+	q.Lock()
+	q.lru.PurgeSegment(segmentUUID)
+	q.Unlock()
+}
+
+// startReportLoop starts a background process that will call Report()
+// on a regular basis and returns a function that will end the background
+// process.
+func (q *PostingsListCache) startReportLoop() Closer {
+	doneCh := make(chan struct{})
+
+	go func() {
+		for {
+			select {
+			case <-doneCh:
+				return
+			default:
+			}
+
+			// Report acquires the lock itself; locking around it here
+			// would deadlock on the non-reentrant mutex.
+			q.Report()
+			time.Sleep(reportLoopInterval)
+		}
+	}()
+
+	return func() { close(doneCh) }
+}
+
+// Report will emit metrics about the status of the cache.
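+// It is also invoked every reportLoopInterval by the background loop
+// started in startReportLoop.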
+func (q *PostingsListCache) Report() {
+	var (
+		size     float64
+		capacity float64
+	)
+
+	q.Lock()
+	size = float64(q.lru.Len())
+	capacity = float64(q.size)
+	q.Unlock()
+
+	q.metrics.size.Update(size)
+	q.metrics.capacity.Update(capacity)
+}
+
+func (q *PostingsListCache) emitCacheGetMetrics(patternType PatternType, hit bool) {
+	switch {
+	case patternType == PatternTypeRegexp && hit:
+		q.metrics.regexp.hits.Inc(1)
+	case patternType == PatternTypeRegexp && !hit:
+		q.metrics.regexp.misses.Inc(1)
+	case patternType == PatternTypeTerm && hit:
+		q.metrics.term.hits.Inc(1)
+	case patternType == PatternTypeTerm && !hit:
+		q.metrics.term.misses.Inc(1)
+	}
+}
+
+func (q *PostingsListCache) emitCachePutMetrics(patternType PatternType) {
+	if patternType == PatternTypeRegexp {
+		q.metrics.regexp.puts.Inc(1)
+	} else {
+		q.metrics.term.puts.Inc(1)
+	}
+}
+
+type postingsListCacheMetrics struct {
+	regexp *postingsListCacheMethodMetrics
+	term   *postingsListCacheMethodMetrics
+
+	size     tally.Gauge
+	capacity tally.Gauge
+}
+
+func newPostingsListCacheMetrics(scope tally.Scope) *postingsListCacheMetrics {
+	return &postingsListCacheMetrics{
+		regexp: newPostingsListCacheMethodMetrics(scope.Tagged(map[string]string{
+			"query_type": "regexp",
+		})),
+		term: newPostingsListCacheMethodMetrics(scope.Tagged(map[string]string{
+			"query_type": "term",
+		})),
+
+		size:     scope.Gauge("size"),
+		capacity: scope.Gauge("capacity"),
+	}
+}
+
+type postingsListCacheMethodMetrics struct {
+	hits   tally.Counter
+	misses tally.Counter
+	puts   tally.Counter
+}
+
+func newPostingsListCacheMethodMetrics(scope tally.Scope) *postingsListCacheMethodMetrics {
+	return &postingsListCacheMethodMetrics{
+		hits:   scope.Counter("hits"),
+		misses: scope.Counter("misses"),
+		puts:   scope.Counter("puts"),
+	}
+}
diff --git a/src/dbnode/storage/index/postings_list_cache_lru.go b/src/dbnode/storage/index/postings_list_cache_lru.go
new file mode 100644
index 0000000000..e59f46c1b4
--- /dev/null
+++ b/src/dbnode/storage/index/postings_list_cache_lru.go
@@ -0,0 +1,225 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"container/list"
+	"errors"
+
+	"github.com/m3db/m3/src/m3ninx/postings"
+
+	"github.com/pborman/uuid"
+)
+
+// postingsListLRU implements a non-thread-safe, fixed-size LRU cache of postings
+// lists that were resolved by running a given query against a particular segment.
+// Normally a key in the LRU would look like:
+//
+//	type key struct {
+//		segmentUUID uuid.UUID
+//		pattern     string
+//		patternType PatternType
+//	}
+//
+// However, some of the postings lists that we will store in the LRU have a fixed lifecycle
+// because they reference mmap'd byte slices which will eventually be unmap'd. To prevent
+// these postings lists that point to unmap'd regions from remaining in the LRU, we want to
+// support the ability to efficiently purge the LRU of any postings list that belongs to a
+// given segment. This isn't technically required for correctness, as once a segment has been
+// closed its old postings lists in the LRU will never be accessed again (since they are only
+// addressable by that segment's UUID), but we purge them from the LRU before closing the
+// segment anyway as an additional safety precaution.
+//
+// Instead of adding additional tracking on top of an existing generic LRU, we've created a
+// specialized LRU that, instead of having a single top-level map pointing into the linked
+// list, has a two-level map where the top-level map is keyed by segment UUID and the
+// second-level map is keyed by pattern and pattern type.
+//
+// As a result, when a segment is ready to be closed, it can call into the cache with its
+// UUID and we can efficiently remove all the entries corresponding to that segment from the
+// LRU. The specialization has the additional nice property that we don't need to allocate
+// every time we add an item to the LRU due to the interface{} conversion.
+type postingsListLRU struct {
+	size      int
+	evictList *list.List
+	items     map[uuid.Array]map[patternAndPatternType]*list.Element
+}
+
+type cachedQuery struct {
+	postingsList postings.List
+}
+
+// entry is used to hold a value in the evictList.
+type entry struct {
+	uuid         uuid.UUID
+	pattern      string
+	patternType  PatternType
+	postingsList postings.List
+}
+
+type key struct {
+	uuid        uuid.UUID
+	pattern     string
+	patternType PatternType
+}
+
+type patternAndPatternType struct {
+	pattern     string
+	patternType PatternType
+}
+
+// newPostingsListLRU constructs an LRU of the given size.
+func newPostingsListLRU(size int) (*postingsListLRU, error) {
+	if size <= 0 {
+		return nil, errors.New("must provide a positive size")
+	}
+
+	return &postingsListLRU{
+		size:      size,
+		evictList: list.New(),
+		items:     make(map[uuid.Array]map[patternAndPatternType]*list.Element),
+	}, nil
+}
+
+// Add adds a value to the cache. Returns true if an eviction occurred.
+func (c *postingsListLRU) Add(
+	segmentUUID uuid.UUID,
+	pattern string,
+	patternType PatternType,
+	pl postings.List,
+) (evicted bool) {
+	// Check for an existing item.
+	uuidArray := segmentUUID.Array()
+	if uuidEntries, ok := c.items[uuidArray]; ok {
+		key := newPatternAndPatternType(pattern, patternType)
+		if ent, ok := uuidEntries[key]; ok {
+			// If it already exists, just move it to the front. This avoids storing
+			// the same item in the LRU twice, which is important because the maps
+			// can only point to one entry at a time and we use them for purges. Also,
+			// it saves space by avoiding storing duplicate values.
+			c.evictList.MoveToFront(ent)
+			ent.Value.(*entry).postingsList = pl
+			return false
+		}
+	}
+
+	// Add new item.
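+	// The new entry must be linked into both the eviction list and the
+	// two-level map so that PurgeSegment can later find it by segment
+	// UUID alone.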
+	var (
+		ent = &entry{
+			uuid:         segmentUUID,
+			pattern:      pattern,
+			patternType:  patternType,
+			postingsList: pl,
+		}
+		element = c.evictList.PushFront(ent)
+		key     = newPatternAndPatternType(pattern, patternType)
+	)
+	if patterns, ok := c.items[uuidArray]; ok {
+		patterns[key] = element
+	} else {
+		c.items[uuidArray] = map[patternAndPatternType]*list.Element{
+			key: element,
+		}
+	}
+
+	// Evict the oldest entry if the size limit is now exceeded.
+	evict := c.evictList.Len() > c.size
+	if evict {
+		c.removeOldest()
+	}
+	return evict
+}
+
+// Get looks up a key's value from the cache.
+func (c *postingsListLRU) Get(
+	segmentUUID uuid.UUID,
+	pattern string,
+	patternType PatternType,
+) (postings.List, bool) {
+	uuidArray := segmentUUID.Array()
+	if uuidEntries, ok := c.items[uuidArray]; ok {
+		key := newPatternAndPatternType(pattern, patternType)
+		if ent, ok := uuidEntries[key]; ok {
+			c.evictList.MoveToFront(ent)
+			return ent.Value.(*entry).postingsList, true
+		}
+	}
+
+	return nil, false
+}
+
+// Remove removes the provided key from the cache, returning whether the
+// key was contained.
+func (c *postingsListLRU) Remove(
+	segmentUUID uuid.UUID,
+	pattern string,
+	patternType PatternType,
+) bool {
+	uuidArray := segmentUUID.Array()
+	if uuidEntries, ok := c.items[uuidArray]; ok {
+		key := newPatternAndPatternType(pattern, patternType)
+		if ent, ok := uuidEntries[key]; ok {
+			c.removeElement(ent)
+			return true
+		}
+	}
+
+	return false
+}
+
+func (c *postingsListLRU) PurgeSegment(segmentUUID uuid.UUID) {
+	if uuidEntries, ok := c.items[segmentUUID.Array()]; ok {
+		for _, ent := range uuidEntries {
+			c.removeElement(ent)
+		}
+	}
+}
+
+// Len returns the number of items in the cache.
+func (c *postingsListLRU) Len() int {
+	return c.evictList.Len()
+}
+
+// removeOldest removes the oldest item from the cache.
+func (c *postingsListLRU) removeOldest() {
+	ent := c.evictList.Back()
+	if ent != nil {
+		c.removeElement(ent)
+	}
+}
+
+// removeElement is used to remove a given list element from the cache.
+func (c *postingsListLRU) removeElement(e *list.Element) {
+	c.evictList.Remove(e)
+	ent := e.Value.(*entry)
+
+	if patterns, ok := c.items[ent.uuid.Array()]; ok {
+		key := newPatternAndPatternType(ent.pattern, ent.patternType)
+		delete(patterns, key)
+		if len(patterns) == 0 {
+			delete(c.items, ent.uuid.Array())
+		}
+	}
+}
+
+func newPatternAndPatternType(pattern string, patternType PatternType) patternAndPatternType {
+	return patternAndPatternType{pattern: pattern, patternType: patternType}
+}
diff --git a/src/dbnode/storage/index/postings_list_cache_lru_test.go b/src/dbnode/storage/index/postings_list_cache_lru_test.go
new file mode 100644
index 0000000000..558020527f
--- /dev/null
+++ b/src/dbnode/storage/index/postings_list_cache_lru_test.go
@@ -0,0 +1,36 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+// keys returns a slice of the keys in the cache, from oldest to newest. It is
+// used for testing only.
+func (c *postingsListLRU) keys() []key {
+	keys := make([]key, 0, len(c.items))
+	for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() {
+		e := ent.Value.(*entry)
+		keys = append(keys, key{
+			uuid:        e.uuid,
+			pattern:     e.pattern,
+			patternType: e.patternType,
+		})
+	}
+	return keys
+}
diff --git a/src/dbnode/storage/index/postings_list_cache_test.go b/src/dbnode/storage/index/postings_list_cache_test.go
new file mode 100644
index 0000000000..753ebb9106
--- /dev/null
+++ b/src/dbnode/storage/index/postings_list_cache_test.go
@@ -0,0 +1,345 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"fmt"
+	"sort"
+	"strconv"
+	"sync"
+	"testing"
+
+	"github.com/m3db/m3/src/m3ninx/postings"
+	"github.com/m3db/m3/src/m3ninx/postings/roaring"
+	"github.com/m3db/m3x/instrument"
+
+	"github.com/pborman/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	numTestPlEntries = 1000
+)
+
+var (
+	// Filled in by init().
+	testPlEntries               []testEntry
+	testPostingListCacheOptions = PostingsListCacheOptions{
+		InstrumentOptions: instrument.NewOptions(),
+	}
+)
+
+func init() {
+	// Generate test data.
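+	// Each entry gets its own segment UUID and pattern; pattern types
+	// alternate between term (even i) and regexp (odd i).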
+ for i := 0; i < numTestPlEntries; i++ { + segmentUUID := uuid.Parse( + fmt.Sprintf("00000000-0000-0000-0000-000000000%03d", i)) + pattern := fmt.Sprintf("%d", i) + pl := roaring.NewPostingsList() + pl.Insert(postings.ID(i)) + + patternType := PatternTypeRegexp + if i%2 == 0 { + patternType = PatternTypeTerm + } + testPlEntries = append(testPlEntries, testEntry{ + segmentUUID: segmentUUID, + pattern: pattern, + patternType: patternType, + postingsList: pl, + }) + } +} + +type testEntry struct { + segmentUUID uuid.UUID + pattern string + patternType PatternType + postingsList postings.List +} + +func TestSimpleLRUBehavior(t *testing.T) { + size := 3 + plCache, stopReporting, err := NewPostingsListCache(size, testPostingListCacheOptions) + require.NoError(t, err) + defer stopReporting() + + var ( + e0 = testPlEntries[0] + e1 = testPlEntries[1] + e2 = testPlEntries[2] + e3 = testPlEntries[3] + e4 = testPlEntries[4] + e5 = testPlEntries[5] + ) + putEntry(plCache, 0) + putEntry(plCache, 1) + putEntry(plCache, 2) + + expectedOrder := []string{e0.pattern, e1.pattern, e2.pattern} + for i, key := range plCache.lru.keys() { + require.Equal(t, expectedOrder[i], key.pattern) + } + + putEntry(plCache, 3) + expectedOrder = []string{e1.pattern, e2.pattern, e3.pattern} + for i, key := range plCache.lru.keys() { + require.Equal(t, expectedOrder[i], key.pattern) + } + + putEntry(plCache, 4) + putEntry(plCache, 4) + putEntry(plCache, 5) + putEntry(plCache, 5) + putEntry(plCache, 0) + putEntry(plCache, 0) + + expectedOrder = []string{e4.pattern, e5.pattern, e0.pattern} + for i, key := range plCache.lru.keys() { + require.Equal(t, expectedOrder[i], key.pattern) + } + + // Miss, no expected change. + getEntry(plCache, 100) + for i, key := range plCache.lru.keys() { + require.Equal(t, expectedOrder[i], key.pattern) + } + + // Hit. + getEntry(plCache, 4) + expectedOrder = []string{e5.pattern, e0.pattern, e4.pattern} + for i, key := range plCache.lru.keys() { + require.Equal(t, expectedOrder[i], key.pattern) + } + + // Multiple hits. + getEntry(plCache, 4) + getEntry(plCache, 0) + getEntry(plCache, 5) + getEntry(plCache, 5) + expectedOrder = []string{e4.pattern, e0.pattern, e5.pattern} + for i, key := range plCache.lru.keys() { + require.Equal(t, expectedOrder[i], key.pattern) + } +} + +func TestPurgeSegment(t *testing.T) { + size := len(testPlEntries) + plCache, stopReporting, err := NewPostingsListCache(size, testPostingListCacheOptions) + require.NoError(t, err) + defer stopReporting() + + // Write many entries with the same segment UUID. + for i := 0; i < 100; i++ { + if testPlEntries[i].patternType == PatternTypeRegexp { + plCache.PutRegexp( + testPlEntries[0].segmentUUID, + testPlEntries[i].pattern, + testPlEntries[i].postingsList, + ) + } else { + plCache.PutTerm( + testPlEntries[0].segmentUUID, + testPlEntries[i].pattern, + testPlEntries[i].postingsList, + ) + } + } + + // Write the remaining entries. + for i := 100; i < len(testPlEntries); i++ { + putEntry(plCache, i) + } + + // Purge all entries related to the segment. + plCache.PurgeSegment(testPlEntries[0].segmentUUID) + + // All entries related to the purged segment should be gone. 
+	require.Equal(t, size-100, plCache.lru.Len())
+	for i := 0; i < 100; i++ {
+		if testPlEntries[i].patternType == PatternTypeRegexp {
+			_, ok := plCache.GetRegexp(
+				testPlEntries[0].segmentUUID,
+				testPlEntries[i].pattern,
+			)
+			require.False(t, ok)
+		} else {
+			_, ok := plCache.GetTerm(
+				testPlEntries[0].segmentUUID,
+				testPlEntries[i].pattern,
+			)
+			require.False(t, ok)
+		}
+	}
+
+	// Remaining entries should still be present.
+	for i := 100; i < len(testPlEntries); i++ {
+		getEntry(plCache, i)
+	}
+}
+
+func TestEverythingInsertedCanBeRetrieved(t *testing.T) {
+	plCache, stopReporting, err := NewPostingsListCache(len(testPlEntries), testPostingListCacheOptions)
+	require.NoError(t, err)
+	defer stopReporting()
+
+	for i := range testPlEntries {
+		putEntry(plCache, i)
+	}
+
+	for i, entry := range testPlEntries {
+		cached, ok := getEntry(plCache, i)
+		require.True(t, ok)
+		require.True(t, cached.Equal(entry.postingsList))
+	}
+}
+
+func TestConcurrencyWithEviction(t *testing.T) {
+	testConcurrency(t, len(testPlEntries)/10, true, false)
+}
+
+func TestConcurrencyVerifyResultsNoEviction(t *testing.T) {
+	testConcurrency(t, len(testPlEntries), false, true)
+}
+
+func testConcurrency(t *testing.T, size int, purge bool, verify bool) {
+	plCache, stopReporting, err := NewPostingsListCache(size, testPostingListCacheOptions)
+	require.NoError(t, err)
+	defer stopReporting()
+
+	wg := sync.WaitGroup{}
+	// Spin up writers.
+	for i := range testPlEntries {
+		wg.Add(1)
+		go func(i int) {
+			for j := 0; j < 100; j++ {
+				putEntry(plCache, i)
+			}
+			wg.Done()
+		}(i)
+	}
+
+	// Spin up readers.
+	for i := range testPlEntries {
+		wg.Add(1)
+		go func(i int) {
+			for j := 0; j < 100; j++ {
+				getEntry(plCache, j)
+			}
+			wg.Done()
+		}(i)
+	}
+
+	stopPurge := make(chan struct{})
+	if purge {
+		go func() {
+			for {
+				select {
+				case <-stopPurge:
+					// Without this return the goroutine would spin forever
+					// once stopPurge is closed.
+					return
+				default:
+					for _, entry := range testPlEntries {
+						plCache.PurgeSegment(entry.segmentUUID)
+					}
+				}
+			}
+		}()
+	}
+
+	wg.Wait()
+	close(stopPurge)
+
+	if !verify {
+		return
+	}
+
+	for i, entry := range testPlEntries {
+		cached, ok := getEntry(plCache, i)
+		if !ok {
+			// Debug.
+			printSortedKeys(t, plCache)
+		}
+		require.True(t, ok)
+		require.True(t, cached.Equal(entry.postingsList))
+	}
+}
+
+func putEntry(cache *PostingsListCache, i int) {
+	// Do each put twice to test the logic that avoids storing
+	// multiple entries for the same value.
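+	// (The second put should take the existing-entry branch in Add and
+	// simply move the element to the front of the LRU rather than
+	// inserting a duplicate.)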
+	if testPlEntries[i].patternType == PatternTypeRegexp {
+		cache.PutRegexp(
+			testPlEntries[i].segmentUUID,
+			testPlEntries[i].pattern,
+			testPlEntries[i].postingsList,
+		)
+		cache.PutRegexp(
+			testPlEntries[i].segmentUUID,
+			testPlEntries[i].pattern,
+			testPlEntries[i].postingsList,
+		)
+	} else {
+		cache.PutTerm(
+			testPlEntries[i].segmentUUID,
+			testPlEntries[i].pattern,
+			testPlEntries[i].postingsList,
+		)
+		cache.PutTerm(
+			testPlEntries[i].segmentUUID,
+			testPlEntries[i].pattern,
+			testPlEntries[i].postingsList,
+		)
+	}
+}
+
+func getEntry(cache *PostingsListCache, i int) (postings.List, bool) {
+	if testPlEntries[i].patternType == PatternTypeRegexp {
+		return cache.GetRegexp(
+			testPlEntries[i].segmentUUID,
+			testPlEntries[i].pattern,
+		)
+	}
+
+	return cache.GetTerm(
+		testPlEntries[i].segmentUUID,
+		testPlEntries[i].pattern,
+	)
+}
+
+func printSortedKeys(t *testing.T, cache *PostingsListCache) {
+	keys := cache.lru.keys()
+	sort.Slice(keys, func(i, j int) bool {
+		iIdx, err := strconv.ParseInt(keys[i].pattern, 10, 64)
+		if err != nil {
+			t.Fatalf("unable to parse: %s into int", keys[i].pattern)
+		}
+
+		jIdx, err := strconv.ParseInt(keys[j].pattern, 10, 64)
+		if err != nil {
+			t.Fatalf("unable to parse: %s into int", keys[j].pattern)
+		}
+
+		return iIdx < jIdx
+	})
+
+	for _, key := range keys {
+		fmt.Println("key: ", key.pattern)
+	}
+}
diff --git a/src/dbnode/storage/index/read_through_segment.go b/src/dbnode/storage/index/read_through_segment.go
new file mode 100644
index 0000000000..af835f42ea
--- /dev/null
+++ b/src/dbnode/storage/index/read_through_segment.go
@@ -0,0 +1,185 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/m3db/m3/src/m3ninx/index"
+	"github.com/m3db/m3/src/m3ninx/index/segment"
+	"github.com/m3db/m3/src/m3ninx/postings"
+
+	"github.com/pborman/uuid"
+)
+
+var (
+	errCantGetReaderFromClosedSegment = errors.New("can't get reader from closed segment")
+	errCantCloseClosedSegment         = errors.New("can't close a closed segment")
+)
+
+// ReadThroughSegment wraps a segment with a postings list cache so that
+// queries can be transparently cached in a read-through manner. In addition,
+// the postings lists returned by the segments may not be safe to use once the
+// underlying segments are closed, since those postings lists point into the
+// segment's mmap'd region.
+// As a result, the Close method of the ReadThroughSegment will make sure that
+// the cache is purged of all of the segment's postings lists before the segment
+// itself is closed.
+type ReadThroughSegment struct {
+	segment.Segment
+	sync.RWMutex
+
+	opts              ReadThroughSegmentOptions
+	uuid              uuid.UUID
+	postingsListCache *PostingsListCache
+
+	closed bool
+}
+
+// ReadThroughSegmentOptions is the options struct for the
+// ReadThroughSegment.
+type ReadThroughSegmentOptions struct {
+	// Whether the postings list for regexp queries should be cached.
+	CacheRegexp bool
+	// Whether the postings list for term queries should be cached.
+	CacheTerms bool
+}
+
+// NewReadThroughSegment creates a new read through segment.
+func NewReadThroughSegment(
+	seg segment.Segment,
+	cache *PostingsListCache,
+	opts ReadThroughSegmentOptions,
+) segment.Segment {
+	return &ReadThroughSegment{
+		Segment: seg,
+
+		opts:              opts,
+		uuid:              uuid.NewUUID(),
+		postingsListCache: cache,
+	}
+}
+
+// Reader returns a read through reader for the read through segment.
+func (r *ReadThroughSegment) Reader() (index.Reader, error) {
+	r.RLock()
+	defer r.RUnlock()
+	if r.closed {
+		return nil, errCantGetReaderFromClosedSegment
+	}
+
+	reader, err := r.Segment.Reader()
+	if err != nil {
+		return nil, err
+	}
+	return newReadThroughSegmentReader(
+		reader, r.uuid, r.postingsListCache, r.opts), nil
+}
+
+// Close purges all entries in the cache associated with this segment,
+// and then closes the underlying segment.
+func (r *ReadThroughSegment) Close() error {
+	r.Lock()
+	defer r.Unlock()
+	if r.closed {
+		return errCantCloseClosedSegment
+	}
+
+	r.closed = true
+
+	if r.postingsListCache != nil {
+		// Purge segments from the cache before closing the segment to avoid
+		// temporarily having postings lists in the cache whose underlying
+		// bytes are no longer mmap'd.
+		r.postingsListCache.PurgeSegment(r.uuid)
+	}
+	return r.Segment.Close()
+}
+
+type readThroughSegmentReader struct {
+	index.Reader
+
+	opts              ReadThroughSegmentOptions
+	uuid              uuid.UUID
+	postingsListCache *PostingsListCache
+}
+
+func newReadThroughSegmentReader(
+	reader index.Reader,
+	uuid uuid.UUID,
+	cache *PostingsListCache,
+	opts ReadThroughSegmentOptions,
+) index.Reader {
+	return &readThroughSegmentReader{
+		Reader: reader,
+
+		opts:              opts,
+		uuid:              uuid,
+		postingsListCache: cache,
+	}
+}
+
+// MatchRegexp returns a cached postings list or queries the underlying
+// segment if there is a cache miss.
+func (s *readThroughSegmentReader) MatchRegexp(
+	field []byte,
+	c index.CompiledRegex,
+) (postings.List, error) {
+	if s.postingsListCache == nil || !s.opts.CacheRegexp {
+		return s.Reader.MatchRegexp(field, c)
+	}
+
+	// TODO(rartoul): Would be nice not to allocate a string here.
+	pattern := c.FSTSyntax.String()
+	pl, ok := s.postingsListCache.GetRegexp(s.uuid, pattern)
+	if ok {
+		return pl, nil
+	}
+
+	pl, err := s.Reader.MatchRegexp(field, c)
+	if err == nil {
+		s.postingsListCache.PutRegexp(s.uuid, pattern, pl)
+	}
+	return pl, err
+}
+
+// MatchTerm returns a cached postings list or queries the underlying
+// segment if there is a cache miss.
+func (s *readThroughSegmentReader) MatchTerm(
+	field []byte, term []byte,
+) (postings.List, error) {
+	if s.postingsListCache == nil || !s.opts.CacheTerms {
+		return s.Reader.MatchTerm(field, term)
+	}
+
+	// TODO(rartoul): Would be nice to not allocate a string here.
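+	// (The conversion is also what allows the term to be part of the LRU's
+	// map key, since a []byte is not comparable in Go.)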
+	termString := string(term)
+	pl, ok := s.postingsListCache.GetTerm(s.uuid, termString)
+	if ok {
+		return pl, nil
+	}
+
+	pl, err := s.Reader.MatchTerm(field, term)
+	if err == nil {
+		s.postingsListCache.PutTerm(s.uuid, termString, pl)
+	}
+	return pl, err
+}
diff --git a/src/dbnode/storage/index/read_through_segment_test.go b/src/dbnode/storage/index/read_through_segment_test.go
new file mode 100644
index 0000000000..7acf5dae53
--- /dev/null
+++ b/src/dbnode/storage/index/read_through_segment_test.go
@@ -0,0 +1,309 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"regexp/syntax"
+	"testing"
+
+	"github.com/m3db/m3/src/m3ninx/index"
+	"github.com/m3db/m3/src/m3ninx/index/segment/fst"
+	"github.com/m3db/m3/src/m3ninx/postings/roaring"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	defaultReadThroughSegmentOptions = ReadThroughSegmentOptions{
+		CacheRegexp: true,
+		CacheTerms:  true,
+	}
+)
+
+func TestReadThroughSegmentMatchRegexp(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	segment := fst.NewMockSegment(ctrl)
+	reader := index.NewMockReader(ctrl)
+	segment.EXPECT().Reader().Return(reader, nil)
+
+	cache, stopReporting, err := NewPostingsListCache(1, testPostingListCacheOptions)
+	require.NoError(t, err)
+	defer stopReporting()
+
+	field := []byte("some-field")
+	parsedRegex, err := syntax.Parse(".*this-will-be-slow.*", syntax.Simple)
+	require.NoError(t, err)
+	compiledRegex := index.CompiledRegex{
+		FSTSyntax: parsedRegex,
+	}
+
+	readThrough, err := NewReadThroughSegment(
+		segment, cache, defaultReadThroughSegmentOptions).Reader()
+	require.NoError(t, err)
+
+	originalPL := roaring.NewPostingsList()
+	require.NoError(t, originalPL.Insert(1))
+	reader.EXPECT().MatchRegexp(field, gomock.Any()).Return(originalPL, nil)
+
+	// Make sure it goes to the segment when the cache misses.
+	pl, err := readThrough.MatchRegexp(field, compiledRegex)
+	require.NoError(t, err)
+	require.True(t, pl.Equal(originalPL))
+
+	// Make sure it relies on the cache if it's present (the mock only
+	// expects one call).
+	pl, err = readThrough.MatchRegexp(field, compiledRegex)
+	require.NoError(t, err)
+	require.True(t, pl.Equal(originalPL))
+}
+
+func TestReadThroughSegmentMatchRegexpCacheDisabled(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	segment := fst.NewMockSegment(ctrl)
+	reader := index.NewMockReader(ctrl)
+	segment.EXPECT().Reader().Return(reader, nil)
+
+	cache, stopReporting, err := NewPostingsListCache(1, testPostingListCacheOptions)
+	require.NoError(t, err)
+	defer stopReporting()
+
+	field := []byte("some-field")
+	parsedRegex, err := syntax.Parse(".*this-will-be-slow.*", syntax.Simple)
+	require.NoError(t, err)
+	compiledRegex := index.CompiledRegex{
+		FSTSyntax: parsedRegex,
+	}
+
+	readThrough, err := NewReadThroughSegment(segment, cache, ReadThroughSegmentOptions{
+		CacheRegexp: false,
+	}).Reader()
+	require.NoError(t, err)
+
+	originalPL := roaring.NewPostingsList()
+	require.NoError(t, originalPL.Insert(1))
+	reader.EXPECT().
+		MatchRegexp(field, gomock.Any()).
+		Return(originalPL, nil).
+		Times(2)
+
+	// Make sure it goes to the segment.
+	pl, err := readThrough.MatchRegexp(field, compiledRegex)
+	require.NoError(t, err)
+	require.True(t, pl.Equal(originalPL))
+
+	// Make sure it goes to the segment the second time - meaning the cache was
+	// disabled.
+	pl, err = readThrough.MatchRegexp(field, compiledRegex)
+	require.NoError(t, err)
+	require.True(t, pl.Equal(originalPL))
+}
+
+func TestReadThroughSegmentMatchRegexpNoCache(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	var (
+		segment          = fst.NewMockSegment(ctrl)
+		reader           = index.NewMockReader(ctrl)
+		field            = []byte("some-field")
+		parsedRegex, err = syntax.Parse(".*this-will-be-slow.*", syntax.Simple)
+	)
+	require.NoError(t, err)
+
+	segment.EXPECT().Reader().Return(reader, nil)
+	compiledRegex := index.CompiledRegex{
+		FSTSyntax: parsedRegex,
+	}
+
+	readThrough, err := NewReadThroughSegment(
+		segment, nil, defaultReadThroughSegmentOptions).Reader()
+	require.NoError(t, err)
+
+	originalPL := roaring.NewPostingsList()
+	require.NoError(t, originalPL.Insert(1))
+	reader.EXPECT().MatchRegexp(field, gomock.Any()).Return(originalPL, nil)
+
+	// Make sure it works with no cache.
+	pl, err := readThrough.MatchRegexp(field, compiledRegex)
+	require.NoError(t, err)
+	require.True(t, pl.Equal(originalPL))
+}
+
+func TestReadThroughSegmentMatchTerm(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	segment := fst.NewMockSegment(ctrl)
+	reader := index.NewMockReader(ctrl)
+	segment.EXPECT().Reader().Return(reader, nil)
+
+	cache, stopReporting, err := NewPostingsListCache(1, testPostingListCacheOptions)
+	require.NoError(t, err)
+	defer stopReporting()
+
+	var (
+		field = []byte("some-field")
+		term  = []byte("some-term")
+
+		originalPL = roaring.NewPostingsList()
+	)
+	require.NoError(t, originalPL.Insert(1))
+
+	readThrough, err := NewReadThroughSegment(
+		segment, cache, defaultReadThroughSegmentOptions).Reader()
+	require.NoError(t, err)
+
+	reader.EXPECT().MatchTerm(field, term).Return(originalPL, nil)
+
+	// Make sure it goes to the segment when the cache misses.
+	pl, err := readThrough.MatchTerm(field, term)
+	require.NoError(t, err)
+	require.True(t, pl.Equal(originalPL))
+
+	// Make sure it relies on the cache if it's present (the mock only
+	// expects one call).
+    pl, err = readThrough.MatchTerm(field, term)
+    require.NoError(t, err)
+    require.True(t, pl.Equal(originalPL))
+}
+
+func TestReadThroughSegmentMatchTermCacheDisabled(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    segment := fst.NewMockSegment(ctrl)
+    reader := index.NewMockReader(ctrl)
+    segment.EXPECT().Reader().Return(reader, nil)
+
+    cache, stopReporting, err := NewPostingsListCache(1, testPostingListCacheOptions)
+    require.NoError(t, err)
+    defer stopReporting()
+
+    var (
+        field = []byte("some-field")
+        term  = []byte("some-term")
+
+        originalPL = roaring.NewPostingsList()
+    )
+    require.NoError(t, originalPL.Insert(1))
+
+    readThrough, err := NewReadThroughSegment(segment, cache, ReadThroughSegmentOptions{
+        CacheTerms: false,
+    }).Reader()
+    require.NoError(t, err)
+
+    reader.EXPECT().
+        MatchTerm(field, term).
+        Return(originalPL, nil).
+        Times(2)
+
+    // Make sure it goes to the segment when the cache misses.
+    pl, err := readThrough.MatchTerm(field, term)
+    require.NoError(t, err)
+    require.True(t, pl.Equal(originalPL))
+
+    // Make sure it goes to the segment the second time, meaning the cache
+    // was disabled.
+    pl, err = readThrough.MatchTerm(field, term)
+    require.NoError(t, err)
+    require.True(t, pl.Equal(originalPL))
+}
+
+func TestReadThroughSegmentMatchTermNoCache(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    var (
+        segment = fst.NewMockSegment(ctrl)
+        reader  = index.NewMockReader(ctrl)
+
+        field = []byte("some-field")
+        term  = []byte("some-term")
+
+        originalPL = roaring.NewPostingsList()
+    )
+    require.NoError(t, originalPL.Insert(1))
+
+    segment.EXPECT().Reader().Return(reader, nil)
+
+    readThrough, err := NewReadThroughSegment(
+        segment, nil, defaultReadThroughSegmentOptions).Reader()
+    require.NoError(t, err)
+
+    reader.EXPECT().MatchTerm(field, term).Return(originalPL, nil)
+
+    // Make sure it works with no cache.
+    pl, err := readThrough.MatchTerm(field, term)
+    require.NoError(t, err)
+    require.True(t, pl.Equal(originalPL))
+}
+
+func TestClose(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    segment := fst.NewMockSegment(ctrl)
+    cache, stopReporting, err := NewPostingsListCache(1, testPostingListCacheOptions)
+    require.NoError(t, err)
+    defer stopReporting()
+
+    readThroughSeg := NewReadThroughSegment(
+        segment, cache, defaultReadThroughSegmentOptions)
+
+    segmentUUID := readThroughSeg.(*ReadThroughSegment).uuid
+
+    // Store an entry for the segment in the cache so we can check that it
+    // gets purged once the segment is closed.
+    cache.PutRegexp(segmentUUID, "some-regexp", roaring.NewPostingsList())
+
+    segment.EXPECT().Close().Return(nil)
+    err = readThroughSeg.Close()
+    require.NoError(t, err)
+    require.True(t, readThroughSeg.(*ReadThroughSegment).closed)
+
+    // Make sure it does not allow double closes.
+    err = readThroughSeg.Close()
+    require.Equal(t, errCantCloseClosedSegment, err)
+
+    // Make sure it does not allow readers to be created after closing.
+    _, err = readThroughSeg.Reader()
+    require.Equal(t, errCantGetReaderFromClosedSegment, err)
+}
+
+func TestCloseNoCache(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    segment := fst.NewMockSegment(ctrl)
+
+    readThrough := NewReadThroughSegment(
+        segment, nil, defaultReadThroughSegmentOptions)
+
+    segment.EXPECT().Close().Return(nil)
+    err := readThrough.Close()
+    require.NoError(t, err)
+    require.True(t, readThrough.(*ReadThroughSegment).closed)
+}
diff --git a/src/dbnode/storage/index/results.go b/src/dbnode/storage/index/results.go
index 74ccc72d6b..b09ffe5a14 100644
--- a/src/dbnode/storage/index/results.go
+++ b/src/dbnode/storage/index/results.go
@@ -39,7 +39,8 @@ type results struct {
     idPool    ident.Pool
     bytesPool pool.CheckedBytesPool
 
-    pool ResultsPool
+    pool       ResultsPool
+    noFinalize bool
 }
 
 // NewResults returns a new results object.
@@ -156,6 +157,10 @@ func (r *results) Reset(nsID ident.ID) {
 }
 
 func (r *results) Finalize() {
+    if r.noFinalize {
+        return
+    }
+
     r.Reset(nil)
 
     if r.pool == nil {
@@ -163,3 +168,14 @@
     }
     r.pool.Put(r)
 }
+
+func (r *results) NoFinalize() {
+    // Ensure that neither the results object itself nor any of its
+    // underlying IDs and tags will be finalized.
+    r.noFinalize = true
+    for _, entry := range r.resultsMap.Iter() {
+        id, tags := entry.Key(), entry.Value()
+        id.NoFinalize()
+        tags.NoFinalize()
+    }
+}
diff --git a/src/dbnode/storage/index/results_test.go b/src/dbnode/storage/index/results_test.go
index b7da8c5e4b..a2ffb0b563 100644
--- a/src/dbnode/storage/index/results_test.go
+++ b/src/dbnode/storage/index/results_test.go
@@ -231,3 +231,67 @@ func tagsFromFields(fields []doc.Field) ident.Tags {
     }
     return tags
 }
+
+func TestFinalize(t *testing.T) {
+    // Create a Results and insert some data.
+    res := NewResults(testOpts)
+    d1 := doc.Document{ID: []byte("abc")}
+    added, size, err := res.AddDocument(d1)
+    require.NoError(t, err)
+    require.True(t, added)
+    require.Equal(t, 1, size)
+
+    // Ensure the data is present.
+    tags, ok := res.Map().Get(ident.StringID("abc"))
+    require.True(t, ok)
+    require.Equal(t, 0, len(tags.Values()))
+
+    // Call Finalize() to reset the Results.
+    res.Finalize()
+
+    // Ensure the data was removed by the call to Finalize().
+    tags, ok = res.Map().Get(ident.StringID("abc"))
+    require.False(t, ok)
+    require.Equal(t, 0, len(tags.Values()))
+    require.Equal(t, 0, res.Size())
+
+    for _, entry := range res.Map().Iter() {
+        id, _ := entry.Key(), entry.Value()
+        require.False(t, id.IsNoFinalize())
+        // TODO(rartoul): Could verify tags are NoFinalize() as well if
+        // they had that method.
+    }
+}
+
+func TestNoFinalize(t *testing.T) {
+    // Create a Results and insert some data.
+    res := NewResults(testOpts)
+    d1 := doc.Document{ID: []byte("abc")}
+    added, size, err := res.AddDocument(d1)
+    require.NoError(t, err)
+    require.True(t, added)
+    require.Equal(t, 1, size)
+
+    // Ensure the data is present.
+    tags, ok := res.Map().Get(ident.StringID("abc"))
+    require.True(t, ok)
+    require.Equal(t, 0, len(tags.Values()))
+
+    // A call to NoFinalize indicates that a subsequent call
+    // to Finalize should be a no-op.
+    res.NoFinalize()
+    res.Finalize()
+
+    // Ensure the data was not removed by the call to Finalize().
+    tags, ok = res.Map().Get(ident.StringID("abc"))
+    require.True(t, ok)
+    require.Equal(t, 0, len(tags.Values()))
+    require.Equal(t, 1, res.Size())
+
+    for _, entry := range res.Map().Iter() {
+        id, _ := entry.Key(), entry.Value()
+        require.True(t, id.IsNoFinalize())
+        // TODO(rartoul): Could verify tags are NoFinalize() as well if
+        // they had that method.
+    }
+}
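The `noFinalize` guard added above inverts the usual pooled-object lifecycle: once `NoFinalize` is called, `Finalize` must become a no-op so that IDs and tags already handed to a caller stay valid. A minimal self-contained sketch of that pattern follows; the `Resource` and `data` names are hypothetical stand-ins, not M3 types:

```go
package main

import "fmt"

// Resource mimics the results object's lifecycle: pooled and reset by
// default, but exempt from finalization once NoFinalize is called.
type Resource struct {
	data       []string
	noFinalize bool
}

// NoFinalize marks the resource so a later Finalize call is a no-op.
func (r *Resource) NoFinalize() { r.noFinalize = true }

// Finalize resets the resource for reuse unless NoFinalize was called.
func (r *Resource) Finalize() {
	if r.noFinalize {
		return // data stays valid for whoever still holds it
	}
	r.data = r.data[:0] // reset; a real pool would also Put(r) back
}

func main() {
	r := &Resource{data: []string{"abc"}}
	r.NoFinalize()
	r.Finalize()
	fmt.Println(len(r.data)) // 1: Finalize was a no-op
}
```

Note that `NoFinalize` in the real code also marks each entry's ID and tags, since those may outlive the map that owns them.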
diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go
index 6901ea2f0c..6397181fe5 100644
--- a/src/dbnode/storage/index/types.go
+++ b/src/dbnode/storage/index/types.go
@@ -94,6 +94,11 @@
     // including returning it to a backing pool.
     Finalize()
 
+    // NoFinalize marks the Results such that a subsequent call to Finalize() will
+    // be a no-op and will not return the object to the pool or release any of its
+    // resources.
+    NoFinalize()
+
     // Size returns the number of IDs tracked.
     Size() int
 
@@ -665,4 +670,16 @@
 
     // BackgroundCompactionPlannerOptions returns the compaction planner options.
     BackgroundCompactionPlannerOptions() compaction.PlannerOptions
+
+    // SetPostingsListCache sets the postings list cache.
+    SetPostingsListCache(value *PostingsListCache) Options
+
+    // PostingsListCache returns the postings list cache.
+    PostingsListCache() *PostingsListCache
+
+    // SetReadThroughSegmentOptions sets the read through segment cache options.
+    SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Options
+
+    // ReadThroughSegmentOptions returns the read through segment cache options.
+    ReadThroughSegmentOptions() ReadThroughSegmentOptions
 }
diff --git a/src/dbnode/storage/shard_ref_count_test.go b/src/dbnode/storage/shard_ref_count_test.go
index 21ca0a4737..6a4b31ec97 100644
--- a/src/dbnode/storage/shard_ref_count_test.go
+++ b/src/dbnode/storage/shard_ref_count_test.go
@@ -134,8 +134,15 @@ func TestShardWriteTaggedSyncRefCountSyncIndex(t *testing.T) {
     }
     md, err := namespace.NewMetadata(defaultTestNs1ID, defaultTestNs1Opts)
     require.NoError(t, err)
-    idx, err := newNamespaceIndexWithInsertQueueFn(md, newFn, testDatabaseOptions().
-        SetIndexOptions(index.NewOptions().SetInsertMode(index.InsertSync)))
+
+    var (
+        opts      = testDatabaseOptions()
+        indexOpts = opts.IndexOptions().
+            SetInsertMode(index.InsertSync)
+    )
+    opts = opts.SetIndexOptions(indexOpts)
+
+    idx, err := newNamespaceIndexWithInsertQueueFn(md, newFn, opts)
     assert.NoError(t, err)
 
     defer func() {
@@ -303,8 +310,15 @@ func TestShardWriteTaggedAsyncRefCountSyncIndex(t *testing.T) {
     }
     md, err := namespace.NewMetadata(defaultTestNs1ID, defaultTestNs1Opts)
     require.NoError(t, err)
-    idx, err := newNamespaceIndexWithInsertQueueFn(md, newFn, testDatabaseOptions().
-        SetIndexOptions(index.NewOptions().SetInsertMode(index.InsertSync)))
+
+    var (
+        opts      = testDatabaseOptions()
+        indexOpts = opts.IndexOptions().
+            SetInsertMode(index.InsertSync)
+    )
+    opts = opts.SetIndexOptions(indexOpts)
+
+    idx, err := newNamespaceIndexWithInsertQueueFn(md, newFn, opts)
     assert.NoError(t, err)
 
     defer func() {
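Taken together, the read-through segment tests above pin down a simple contract: consult the cache, fall back to the segment reader on a miss, and populate the cache only on a successful lookup. Below is a self-contained sketch of that shape; the map-based `cache` and the `backend` function are illustrative stand-ins, not the real `PostingsListCache` or segment reader:

```go
package main

import "fmt"

// postings is a stand-in for a postings list.
type postings []uint64

// readThrough consults the cache first and fills it from the backend on a
// miss, caching only successful lookups (mirroring MatchTerm above).
func readThrough(
	cache map[string]postings,
	backend func(term string) (postings, error),
	term string,
) (postings, error) {
	if pl, ok := cache[term]; ok {
		return pl, nil // cache hit: skip the expensive segment read
	}
	pl, err := backend(term)
	if err == nil {
		cache[term] = pl // only cache successful results
	}
	return pl, err
}

func main() {
	cache := map[string]postings{}
	calls := 0
	backend := func(term string) (postings, error) {
		calls++ // counts how often the "segment" is actually queried
		return postings{1}, nil
	}
	readThrough(cache, backend, "some-term")
	readThrough(cache, backend, "some-term")
	fmt.Println(calls) // 1: the second lookup was served from the cache
}
```

Caching only successful lookups keeps transient reader errors from being pinned in the cache, which is the same reason the `MatchTerm` implementation above checks `err == nil` before calling `PutTerm`.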